mq调用
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace MqTest2
{
    /// <summary>
    /// Console demo showing how to receive and publish messages through <see cref="MqHelper"/>.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Program entry point. Currently empty: call <see cref="GetData"/> or
        /// <see cref="SendData"/> from here to run either half of the demo.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
        }

        /// <summary>
        /// Takes one delivery from the queue and decodes its body as UTF-8 text.
        /// </summary>
        /// <remarks>
        /// NOTE(review): the delivery is never acknowledged here, and the decoded text is not
        /// used any further. If the helper consumes with manual acknowledgement, the message
        /// stays un-acked on the broker — confirm whether an explicit ack is wanted.
        /// </remarks>
        public static void GetData()
        {
            MqHelper mqHelper = new MqHelper();
            var delivery = mqHelper.GetMQMsg();

            // Guard against a missing delivery/body so the decode below cannot throw
            // ArgumentNullException.
            byte[] body = delivery == null ? null : delivery.Body;
            if (body == null)
            {
                return;
            }

            var result = Encoding.UTF8.GetString(body);
        }

        /// <summary>
        /// Publishes a sample <see cref="Student"/> message and reports a failed send on
        /// standard error instead of silently discarding it.
        /// </summary>
        public static void SendData()
        {
            MqHelper mqHelper = new MqHelper();
            Student student = new Student()
            {
                name = "测试",
                Age = "12"
            };

            string errorMsg;
            bool sent = mqHelper.sendMQMessage(student, out errorMsg);
            if (!sent)
            {
                // sendMQMessage reports failure through its return value and errorMsg;
                // surface it so a broken publish is visible to whoever runs the demo.
                Console.Error.WriteLine("MQ send failed: " + errorMsg);
            }
        }
    }

    /// <summary>
    /// Sample payload published by <see cref="Program.SendData"/>.
    /// Property names are kept as-is because they are part of the serialized message.
    /// </summary>
    public class Student
    {
        /// <summary>Student name.</summary>
        public string name { get; set; }

        /// <summary>Student age, carried as a string.</summary>
        public string Age { get; set; }
    }
}
mq帮助类
using Newtonsoft.Json;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace MqTest2
{
    /// <summary>
    /// Thin wrapper around a RabbitMQ connection: declares and binds one queue, exposes a
    /// blocking receive (<see cref="GetMQMsg"/>) and a publish (<see cref="sendMQMessage"/>).
    /// Connection and channel creation are serialized through a private lock.
    /// Dispose the helper when finished to release the connection and its channels.
    /// </summary>
    class MqHelper : IDisposable
    {
        // Number of attempts made when obtaining a channel.
        private const int MaxChannelAttempts = 4;

        // Fixed pause, in milliseconds, after discarding a closed channel and before retrying.
        private const int RetryDelayMilliseconds = 1000;

        // Exchange type used whenever the exchange is declared.
        private const string ExchangeKind = "direct";

        private IModel _channel;
        private QueueingBasicConsumer _consumer;
        private readonly IDictionary<string, IModel> _channels;
        private readonly object _syncRoot = new object();
        private bool _disposed;

        /// <summary>Connection settings (URI, user name, password) used to build <see cref="Factory"/>.</summary>
        public RabbitMQConfigSection_New RabbitMQConfigSection { get; private set; }

        /// <summary>Factory used to open the broker connection.</summary>
        public ConnectionFactory Factory { get; private set; }

        private IConnection Connection { get; set; }

        private readonly string RoutingKeys = "CDP.Finish";            // binding keys; separate several with ','
        private readonly string QueueName = "Citms.Queue.VideoTest";   // queue name
        private readonly string ExchangeName = "Citms.Exchange.Test";  // exchange name
        private readonly string SendRoutingKey = "CDP.Finish";         // routing key used when publishing

        /// <summary>
        /// Builds the connection factory from <see cref="RabbitMQConfigSection_New"/> defaults and
        /// immediately sets up the consumer, so constructing the helper contacts the broker.
        /// </summary>
        public MqHelper()
        {
            this._channels = new Dictionary<string, IModel>();
            this.RabbitMQConfigSection = new RabbitMQConfigSection_New();

            Factory = new ConnectionFactory();
            Factory.UserName = this.RabbitMQConfigSection.RabbitMQUserName;
            Factory.Password = this.RabbitMQConfigSection.RabbitMQPassword;
            Factory.VirtualHost = "/";
            Factory.Uri = this.RabbitMQConfigSection.RabbitMQUri;

            CreateConsumer();
        }

        /// <summary>
        /// Declares the exchange and the queue, binds the queue with every configured routing
        /// key, and starts a consumer on the queue with automatic acknowledgement turned off.
        /// </summary>
        /// <exception cref="InvalidOperationException">No channel could be obtained.</exception>
        public void CreateConsumer()
        {
            // No catch-and-rethrow here: exceptions propagate with their original stack trace.
            _channel = GetNamedChannel(QueueName);
            if (_channel == null)
            {
                throw new InvalidOperationException("Unable to obtain a RabbitMQ channel for queue '" + QueueName + "'.");
            }

            _channel.BasicQos(0, 100, false);

            // GetChannel() re-validates the channel and may replace a closed one.
            IModel channel = GetChannel();
            if (channel == null)
            {
                throw new InvalidOperationException("Unable to obtain an open RabbitMQ channel.");
            }

            // The exchange must exist before a queue can be bound to it; declaring it here
            // (same arguments as the publisher uses) makes the bind work on a fresh broker.
            DeclareExchange(channel);
            channel.QueueDeclare(QueueName, true, false, false, null);

            if (!string.IsNullOrEmpty(RoutingKeys))
            {
                // Several binding keys may be supplied, separated by commas.
                string[] bindingKeys = RoutingKeys.Split(new char[] { ',' }, StringSplitOptions.RemoveEmptyEntries);
                foreach (var bindingKey in bindingKeys)
                {
                    channel.QueueBind(QueueName, ExchangeName, bindingKey.Trim());
                }
            }

            this._consumer = new QueueingBasicConsumer(channel);
            channel.BasicConsume(QueueName, false, this._consumer);
        }

        /// <summary>
        /// Blocks until the consumer's queue yields the next delivery and returns it.
        /// </summary>
        /// <remarks>
        /// The consumer is started with automatic acknowledgement off, and this class never
        /// acknowledges deliveries itself; callers that want them removed from the queue must
        /// acknowledge on the channel (for example via <see cref="GetChannel"/>).
        /// </remarks>
        /// <returns>The dequeued delivery.</returns>
        public BasicDeliverEventArgs GetMQMsg()
        {
            try
            {
                return this._consumer.Queue.Dequeue();
            }
            catch (Exception)
            {
                // Rebuild the consumer so the next call has a chance to succeed, then let the
                // original failure reach the caller.
                if (this._consumer == null || !this._consumer.IsRunning)
                {
                    this.CreateConsumer();
                }
                throw;
            }
        }

        /// <summary>
        /// Serializes <paramref name="iv"/> inside a small JSON envelope and publishes it to the
        /// configured exchange with the configured routing key.
        /// </summary>
        /// <param name="iv">Payload placed in the envelope's <c>Data</c> member.</param>
        /// <param name="errorMsg">Empty on success; otherwise the failure message.</param>
        /// <returns><c>true</c> when the message was handed to the channel; <c>false</c> on any failure.</returns>
        public bool sendMQMessage(Student iv, out string errorMsg)
        {
            errorMsg = "";

            // NOTE(review): ReportedTime is a fixed literal — confirm whether the current time was intended.
            var envelope = new
            {
                DataType = "Student",
                Data = iv,
                ReportedTime = "20190826161051"
            };
            string value = JsonConvert.SerializeObject(envelope);

            try
            {
                // Lock on the dedicated gate rather than on _channel, which can be null or be
                // swapped for a new instance while another thread holds the old one.
                lock (_syncRoot)
                {
                    IModel channel = GetChannel();
                    if (channel == null)
                    {
                        errorMsg = "Unable to obtain an open RabbitMQ channel.";
                        return false;
                    }

                    DeclareExchange(channel);
                    byte[] bytes = Encoding.UTF8.GetBytes(value);
                    channel.BasicPublish(ExchangeName, SendRoutingKey, null, bytes);
                    return true;
                }
            }
            catch (Exception ex)
            {
                errorMsg = ex.Message;
                return false;
            }
        }

        /// <summary>
        /// Returns the helper's channel, creating it when missing and replacing it when closed.
        /// Makes up to four attempts.
        /// </summary>
        /// <returns>An open channel, or <c>null</c> when every attempt failed.</returns>
        public IModel GetChannel()
        {
            lock (this._syncRoot)
            {
                for (int attempt = 0; attempt < MaxChannelAttempts; ++attempt)
                {
                    try
                    {
                        // No channel yet: create one.
                        if (this._channel == null)
                        {
                            this._channel = this.GetConnection().CreateModel();
                            return this._channel;
                        }

                        // Channel exists and is open: reuse it.
                        if (this._channel.IsOpen)
                        {
                            return this._channel;
                        }

                        // Channel exists but is closed. Clear the field BEFORE disposing so a
                        // throwing Dispose cannot leave the dead channel in place for the next attempt.
                        IModel stale = this._channel;
                        this._channel = null;
                        SafeDispose(stale);

                        // Fixed one-second pause before trying again.
                        Thread.Sleep(RetryDelayMilliseconds);
                    }
                    catch (Exception)
                    {
                        // Deliberate best effort: a failed attempt is retried, and callers are
                        // told about total failure through the null return value.
                    }
                }

                return null;
            }
        }

        /// <summary>
        /// Returns the channel registered under <paramref name="name"/>, creating it when missing
        /// and replacing it when closed. Makes up to four attempts; exceptions from connection or
        /// channel creation propagate to the caller.
        /// </summary>
        /// <param name="name">Key under which the channel is cached.</param>
        /// <returns>An open channel, or <c>null</c> if the attempts were exhausted.</returns>
        public IModel GetNamedChannel(string name)
        {
            lock (this._syncRoot)
            {
                for (int attempt = 0; attempt < MaxChannelAttempts; ++attempt)
                {
                    IModel channel;

                    // Not cached: create, cache and return.
                    if (!this._channels.TryGetValue(name, out channel))
                    {
                        channel = this.GetConnection().CreateModel();
                        this._channels[name] = channel;
                        return channel;
                    }

                    // Cached and open: reuse.
                    if (channel.IsOpen)
                    {
                        return channel;
                    }

                    // Cached but closed: unregister first, then dispose, so a failing Dispose
                    // cannot keep the dead channel in the cache.
                    this._channels.Remove(name);
                    SafeDispose(channel);

                    // Fixed one-second pause before trying again.
                    Thread.Sleep(RetryDelayMilliseconds);
                }

                return null;
            }
        }

        /// <summary>
        /// Returns the open broker connection, creating it on first use and recreating it when
        /// the existing one is no longer open.
        /// </summary>
        /// <returns>An open connection.</returns>
        /// <exception cref="ObjectDisposedException">The helper has been disposed.</exception>
        public IConnection GetConnection()
        {
            lock (_syncRoot)
            {
                if (_disposed)
                {
                    throw new ObjectDisposedException(nameof(MqHelper));
                }

                if (this.Connection != null && this.Connection.IsOpen)
                {
                    return this.Connection;
                }

                if (this.Connection != null)
                {
                    // Existing connection is closed: release it (ignoring release errors) and
                    // fall through to create a fresh one.
                    IConnection stale = this.Connection;
                    this.Connection = null;
                    SafeDispose(stale);
                }

                // Creation failures propagate unchanged, keeping their original stack trace.
                this.Connection = this.Factory.CreateConnection();
                return this.Connection;
            }
        }

        /// <summary>
        /// Releases every cached channel and the connection. Never throws.
        /// </summary>
        public void Dispose()
        {
            lock (_syncRoot)
            {
                if (_disposed)
                {
                    return;
                }
                _disposed = true;

                foreach (IModel channel in _channels.Values)
                {
                    SafeDispose(channel);
                }
                _channels.Clear();

                SafeDispose(_channel);
                _channel = null;

                SafeDispose(Connection);
                Connection = null;
            }
        }

        // Declares the exchange with the arguments used throughout this class (direct, durable).
        private void DeclareExchange(IModel channel)
        {
            channel.ExchangeDeclare(ExchangeName, ExchangeKind, true);
        }

        // Disposes a resource that is being discarded, ignoring errors from the release itself:
        // the resource is already considered unusable, so a failed release must not block recovery.
        private static void SafeDispose(IDisposable resource)
        {
            try
            {
                resource?.Dispose();
            }
            catch (Exception)
            {
            }
        }
    }

    /// <summary>
    /// Default broker connection settings.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the broker address and credentials are hard-coded in source. Values are
    /// left unchanged here; consider loading them from configuration or a secret store.
    /// </remarks>
    public class RabbitMQConfigSection_New
    {
        /// <summary>Broker URI.</summary>
        public string RabbitMQUri { get; set; } = "amqp://192.168.0.37:5672";

        /// <summary>Broker user name.</summary>
        public string RabbitMQUserName { get; set; } = "citms";

        /// <summary>Broker password.</summary>
        public string RabbitMQPassword { get; set; } = "citms@b7";
    }
}