NET RabbitMQ报错None of the specified endpoints were reachable
背景
由于项目用到多线程采集设备数据,需要频繁推送数据,之前搜到的方法都是using创建连接,
创建channel,推送报错"None of the specified endpoints were reachable".
解决方案
创建通用连接,避免频繁连接,至于channel,为了线程安全,暂时不提出来,期待大牛指点
代码
private static IConnection connection;
/// <summary>
/// 获取RabbitMQ连接对象方法(创建与RabbitMQ的连接)
/// </summary>
/// <returns></returns>
public static IConnection GetConnection()
{
if (connection == null)
{
//创建连接工厂【设置相关属性】
var factory = new ConnectionFactory()
{
//设置IP
HostName = "127.0.0.1",//RabbitMQ地址
Port = 5672,//端口
VirtualHost = "/mqtest",//RabbitMQ中要请求的VirtualHost名称
UserName = "用户名",//RabbitMQ用户
Password = "密码"//RabbitMQ用户密码
};
//通过工厂创建连接对象
connection = factory.CreateConnection();
}
return connection;
}
/// <summary>
/// 关闭连接
/// </summary>
public static void MQClose()
{
GetConnection().Close();
}
/// <summary>
/// 消息推送 字符串
/// </summary>
/// <param name="exchangeName">交换机名称</param>
/// <param name="queueName">队列名称</param>
/// <param name="Msg">消息</param>
public static void SendMessage(string exchangeName, string queueName, string Msg)
{
//创建管道
using (var channel = GetConnection().CreateModel())
{
channel.ExchangeDeclare(exchangeName, ExchangeType.Direct, durable: false, autoDelete: false, arguments: null);
channel.QueueDeclare(queueName, false, false, false, null);
channel.QueueBind(queueName, exchangeName, queueName, null);
var body = Encoding.UTF8.GetBytes(Msg);
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
channel.BasicPublish(exchange: exchangeName, routingKey: queueName, mandatory: false, basicProperties: properties, body: body);
}
}
/// <summary>
/// 消息推送List
/// </summary>
/// <typeparam name="T">消息对象类型</typeparam>
/// <param name="exchangeName">交换机名称</param>
/// <param name="queueName">队列名称</param>
/// <param name="Msg">消息对象集合</param>
public static void SendMessage<T>(string exchangeName, string queueName, T Msg)
{
//创建管道
using (var channel = GetConnection().CreateModel())
{
channel.ExchangeDeclare(exchangeName, ExchangeType.Direct, durable: false, autoDelete: false, arguments: null);
channel.QueueDeclare(queueName, false, false, false, null);
channel.QueueBind(queueName, exchangeName, queueName, null);
string json=JsonConvert.SerializeObject(Msg);
var body = Encoding.UTF8.GetBytes(json);
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
channel.BasicPublish(exchange: exchangeName, routingKey: queueName, mandatory: false, basicProperties: properties, body: body);
}
}
/// <summary>
/// 获取mq数据 字符串
/// </summary>
/// <param name="exchangeName">交换机名称</param>
/// <param name="queueName">通道名称</param>
/// <param name="action">回调方法</param>
public static void GetMessage(string exchangeName, string queueName, Action<string> action)
{
//创建管道
using (var channel = GetConnection().CreateModel())
{
//创建交换机
channel.ExchangeDeclare(exchangeName, ExchangeType.Direct, false, false, null);
channel.QueueDeclare(queueName, false, false, false, null);
channel.QueueBind(queueName, exchangeName, queueName, null);
channel.BasicQos(0, 10, false);
//事件对象
var consumer = new EventingBasicConsumer(channel);
//实现获取message处理事件
consumer.Received += (model, ea) =>
{
string datas = Encoding.UTF8.GetString(ea.Body.ToArray());
if (!string.IsNullOrEmpty(datas))
{
action(datas);
Thread.Sleep(1500);
}
//手动设置回复
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
channel.BasicConsume(queueName, true, consumer);
}
}
/// <summary>
/// 获取mq数据 字符串
/// </summary>
/// <param name="exchangeName">交换机名称</param>
/// <param name="queueName">通道名称</param>
/// <param name="action">回调方法</param>
public static void GetMessage<T>(string exchangeName, string queueName, Action<T> action)
{
//创建管道
using (var channel = GetConnection().CreateModel())
{
//创建交换机
channel.ExchangeDeclare(exchangeName, ExchangeType.Direct, false, false, null);
channel.QueueDeclare(queueName, false, false, false, null);
channel.QueueBind(queueName, exchangeName, queueName, null);
channel.BasicQos(0, 10, false);
//事件对象
var consumer = new EventingBasicConsumer(channel);
//实现获取message处理事件
consumer.Received += (model, ea) =>
{
string datas = Encoding.UTF8.GetString(ea.Body.ToArray());
if (!string.IsNullOrEmpty(datas))
{
T value = JsonConvert.DeserializeObject<T>(datas);
action(value);
Thread.Sleep(1500);
}
//手动设置回复
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
channel.BasicConsume(queueName, true, consumer);
}
}