RabbitMQ客户端开发
本篇博客思维导图:
RabbitMQ中关键的Class有Channel
、Connection
、ConnectionFactory
、Consumer
等,AMQP协议层面的操作通过Channel
接口实现(C#中现在为IModel
接口)
Connection
用来开启一个Channel
,RabbitMQ的开发工作基本也是围绕Connetion
和Channel
两个类展开的。
本篇通过一个完整运转流程进行讲解:
- 连接
- 交换机和队列的声明和绑定
- 发送消息
- 消费消息
- 消息确认
- 关闭连接
1. 连接RabbitMQ
1.1创建Connection和Channel
给定参数连接RabbitMQ,并且创建多个Channel
:
//创建连接工厂,提供用户名等信息
ConnectionFactory factory = new ConnectionFactory();
factory.UserName = "test";
factory.Password = "123465";
factory.VirtualHost = "virtual";
factory.HostName = "127.0.0.1";
factory.Port = 2020;
//创建连接
IConnection connection = factory.CreateConnection();
//创建多个信道
var channel1 = connection.CreateModel();
var channel2 = connection.CreateModel();
可以看到一个Connection
可以创建多个Channel
,但是我们应该为每一个线程开辟一个Channel
,因为Channel
实例是非线程安全的,并发情况下可能会导致网络上出现错误的通信帧交错,也会影响到发送方确认机制(publisher confirm)的运行。
创建好Channel
之后,我们就可以使用它来发送或者接收消息了。
1.2 连接状态判定
IsOpen属性判定
Channle
和Connection
都有一个IsOpen
的属性,这个属性可以判断Channel
和Connection
是否开启,但是并不推荐在实际生产环境中使用IsOpen
,因为这个属性依赖于CloseReason
(关闭原因),如果此属性为null
就代表Channel
或者Connection
是开启状态。
但是需要注意
的是:
CloseReason
可能会产生竞争,所以使用IsOpen
判断Connection
和Channel
是否在线是不可靠的。
看下面这一段代码:
if (channel.IsOpen) //此刻IsOpen为true,判断通过
{
//做某些事
//...
//....
//做某些事的这段时间之间,channel发生离线
//也就是说CloseReason不为null,执行下面的代码程序就会报错
channel.BasicQos(10, 10, false); //因为离线,所以执行失败,可能导致程序奔溃
}
RabbitMQ关于IsOpen
的摘要:
RabbitMQ关于CloseReason
的摘要:
try/catch判定
当我们使用Channel
的时候,如果其处于关闭状态,那么RabbitMQ可能会抛出诸如RabbitMQ.Client.Exceptions.OperationInterruptedException
之类的错误,我们只需要捕获这些一场即可。
try
{
//...
channel.BasicQos(10,10,false);
}
catch (RabbitMQ.Client.Exceptions.OperationInterruptedException e)
{
channel.Close();
throw e;
}
2. 声明交换机和队列
交换机Exchange
和队列Queue
是AMQP
高层面构建的模块,应用程序需要确保在使用它们的时候其已存在,在使用它们的时候我们必须先声明(Delare
)它们。
以下代码演示如何声明一个交换机和队列:
//声明一个持久化、非自动删除、Direct类型的交换机
channel.ExchangeDeclare(
exchange: EXCHANGE_NAME,
type: ExchangeType.Direct,
durable: true,
autoDelete: false,
arguments: null);
//声明一个默认的队列(非持久化、排他的、自动删除),名称由RabbitMQ自动生成
string queueName = channel.QueueDeclare().QueueName;
//绑定队列和交换机
channel.QueueBind(queueName, EXCHANGE_NAME, ROUTING_KEY);
当然你也可以不声明默认队列,根据自己的需求配置参数。
2.1 ExchangeDeclare方法详解
方法1:ExchangeDeclare
void ExchangeDeclare(
string exchange,
string type,
bool durable,
bool autoDelete,
IDictionary<string, object> arguments);
参数详解:
- exchange:交换机的名称
- type:交换机类型,可以使用枚举
ExchangeType
设定 - durable:是否持久化,设置为true的时候表示持久化,反之表示非持久化。持久化可以将交换机存在硬盘中,即使服务器重启,交换机的信息也不会丢失。
- autoDelete:是否自动删除,true表示自动删除。自动删除有一个前提,在删除前至少有一个队列或者交换机与之绑定,之后所有的绑定都与之解绑,才会执行自动删除。
- arguments:其它一些结构化的参数,指定扩展属性
方法2:ExchangeDeclareNoWait
void ExchangeDeclareNoWait(
string exchange,
string type,
bool durable,
bool autoDelete,
IDictionary<string, object> arguments);
此声明方法特点是不需要服务节点(Broker)返回确认,而ExchangeDeclare
方法声明交换机需要服务器返回AMQP命令Exchange.Declare-Ok
表示交换机在服务器中创建成功。
使用ExchangeDeclareNoWait
方法可能会出现声明交换机之后立即使用,但是服务器可能没有创建完成这一情况,所有没有特殊原因和应用场景不建议使用此方法声明交换机。
方法3:ExchangeDeclarePassive
void ExchangeDeclarePassive(string exchange);
此方法检测指定交换机是否存在,如果交换机在服务节点中已存在,则正常返回,如果不存在则抛出异常404 channel exception
同时Channel也会被关闭,这种方法在实际应用中很常见。
我们可以写一个检测交换机是否存在的工具方法:
public bool IsExchangeExisted(IModel channel, string exchangeName)
{
bool result = false;
try
{
channel.ExchangeDeclarePassive(exchangeName);
result = true;
}
catch (Exception e) //不存在则抛出错误
{
//no - op
}
return result;
}
删除交换机
删除交换机的方法如下:
void ExchangeDelete(string exchange, bool ifUnused);
对应参数:
- exchange:交换机名
- ifUnuser:设置是否在交换机没有被使用的情况下删除,设置true则只有在此交换机没有被使用的情况下被删除。
当然还有ExchangeDeleteNoWait
方法,Like ExchangeDelete but sets nowait to true.
void ExchangeDeleteNoWait(string exchange, bool ifUnused);
2.2 QueueDeclare方法详解
声明队列有四种方法
方法1:QueueDeclare
QueueDeclareOk QueueDeclare(
string queue,
bool durable,
bool exclusive,
bool autoDelete,
IDictionary<string, object> arguments);
方法2:声明默认的队列,名称由RabbitMQ生成
public static QueueDeclareOk QueueDeclare(this IModel model, string queue = "", bool durable = false, bool exclusive = true, bool autoDelete = true, IDictionary<string, object> arguments = null);
方法3:QueueDeclarePassive
QueueDeclareOk QueueDeclarePassive(string queue);
方法4:QueueDeclareNoWait
void QueueDeclareNoWait(
string queue,
bool durable,
bool exclusive,
bool autoDelete,
IDictionary<string, object> arguments);
参数作用类似之前的交换机声明,就不再细说,需要注意的一个参数是:exclusive
exclusive:设置是否排他,设置为true时队列是排他队列
如果队列被设置为排他队列,那么此队列有以下特点
-
仅对首次声明它的连接可见
-
连接断开时自动删除(即使设置为持久化)
对于第一点:另一个连接(Connection
)无法声明一个同样的队列,并且只区别连接(Connection)而不是通道(Channel),即同一连接(Connection
)的不同信道(Channel
)是可以同时访问此队列的。
如果试图在一个不同的连接中重新声明或访问(如publish,consume)该排他性队列,会得到资源被锁定的错误:
ESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'UserLogin2'
对于第二点:RabbitMQ会自动删除这个队列,而不管这个队列是否被声明成持久性的(Durable =true
)。 也就是说即使客户端程序将一个排他性的队列声明成了Durable
的,只要调用了连接的Close
方法或者客户端程序退出了,RabbitMQ都会删除这个队列。注意这里是连接断开的时候,而不是通道断开。这个其实前一点保持一致,只区别连接而非通道。
这种队列适用于一个客户端同时发送和读取消息的应用场景。
注意要点
生产者和消费者都能使用QueueDeclare
声明一个队列,但是如果消费者在同一个信道上订阅了另一个队列,就无法再声明队列了,必须先取消订阅,然后将信道置为“传输”模式,之后才能声明队列。
删除队列
QueueDelete
//Returns the number of messages purged during queue deletion
uint QueueDelete(string queue, bool ifUnused, bool ifEmpty);
它返回一个uint
类型的值,代表队列删除期间清除的消息数量。
与删除交换机不同的是多了个参数ifEmpty
,当此参数设置为true之时,表示队列里面没有任何消息堆积的情况下才能够删除。
QueuePurge 此方法用于删除队列上的消息而不删除队列本身
uint QueuePurge(string queue);
3. 绑定
3.1 交换机和队列之间的绑定
将交换机和队列绑定的方法如下两种,可以和之前的方法定义进行类比:
QueueBind
void QueueBind(
string queue,
string exchange,
string routingKey, I
Dictionary<string, object> arguments);
QueueBindNoWait
void QueueBindNoWait(
string queue,
string exchange,
string routingKey,
IDictionary<string, object> arguments);
3.2 交换机和交换机之间的绑定
不仅队列和交换机之间可以进行绑定,两个交换机之间也能绑定,两个交换机绑定之后,消息从source
交换机转发到destination
交换机,从某种程度上来说destination
交换机可以看做一个队列。
ExchangeBind
void ExchangeBind(
string destination,
string source,
string routingKey, I
Dictionary<string, object> arguments);
ExchangeBindNoWait
void ExchangeBindNoWait(
string destination,
string source,
string routingKey,
IDictionary<string, object> arguments);
其中参数:
- destination:目的交换机
- source:源交换机
思考一下如何实现下面图片上所示的绑定关系
代码:
//声明源交换机
channel.ExchangeDeclare(
exchange: "source",
type: ExchangeType.Direct);
//声明目的交换机
channel.ExchangeDeclare(
exchange: "destination",
type: ExchangeType.Fanout);
//声明队列
channel.QueueDeclare(
queue: "queue");
//绑定
channel.ExchangeBind(destination: "destination", source: "source", routingKey: "routingKey");
channel.QueueBind(queue: "queue", exchange: "destination", "routingKey");
//发布消息至源交换机
channel.BasicPublish(
exchange: "source",
routingKey: "routingKey",
basicProperties: channel.CreateBasicProperties(),
Encoding.UTF8.GetBytes("message"));
4.消息发送与消费
4.1 发送消息
使用Channel
的BasicPublish
方法可以发送消息。
例如一下代码所示,发送一条"hello"的消息到名为"myExchange"的交换机。
channel.BasicPublish(
exchange: "myExchange",
routingKey: "routingKey",
basicProperties: channel.CreateBasicProperties(),
Encoding.UTF8.GetBytes("hello"));
我们看一下BasicPublish
方法:
void BasicPublish(
string exchange,
string routingKey,
bool mandatory,
IBasicProperties basicProperties,
byte[] body)
其它参数大同小异,主要关注mandatory
和basicProperties
这两个参数:
- mandatory:
- basicProperties:消息的基本属性集,其中包括多个属性成员
basicProperties
可以定义消息的特定属性,之前用的都是默认属性,现在我们定义消息优先级(priority
)和内容类型:
var prop = channel.CreateBasicProperties();
prop.Priority = 2; //优先级设置为2
prop.ContentType = "text/plan"; //content-type
//发布消息至源交换机
channel.BasicPublish(
exchange: "source",
routingKey: "routingKey",
basicProperties: prop,
Encoding.UTF8.GetBytes("message"));
也可以定义过期时间(expiration
):
var prop = channel.CreateBasicProperties();
prop.Expiration = "60000";
还可以定义头部(header
):
var prop = channel.CreateBasicProperties();
Dictionary<string, object> header =
new Dictionary<string, object>();
header.Add("localtion", "here");
header.Add("time", "today");
prop.Headers = header;
4.2 消费消息
RabbitMQ有两种消费消息的方式:推模式(Push
)和拉模式(Pull
)。推模式对应的AMQP协议是Basic.Consume
,拉模式对应的AMQP协议是Basic.Get
。
推模式
在推模式中,我们可以通过持续订阅
的方式来消费消息。
使用BasicConsume
方法我们可以设置并且订阅某一队列:
string BasicConsume(
string queue,
bool autoAck,
string consumerTag,
bool noLocal,
bool exclusive,
IDictionary<string, object> arguments,
IBasicConsumer consumer);
其中各项参数:
- queue:队列名
- autoAck:设置是否自动确认(建议设置成false,即消费者手动确认)
- consumerTag:消费者标签,用来区分多个消费者
- noLocal:设置同一个Connection中生产者发送的消息能否被这个Connection中的消费者消费(是否自产自销)
- arguments:设置消费者其它参数
- consumer:消费者回调类(可以继承IBasicConsumer重写其中的方法)
我们看一下一个继承了IBasicConsumer接口
的消费者类是怎么样的:
public class TestConsumer : IBasicConsumer
{
public IModel Model { get; private set; }
public TestConsumer(IModel channel)
{
this.Model = channel;
}
public event EventHandler<ConsumerEventArgs> ConsumerCancelled;
/// <summary>
/// 消费者显式(basicCancel)以外的其它原因被取消时订阅式调用,例如:队列被删除
/// </summary>
/// <param name="consumerTag"></param>
public void HandleBasicCancel(string consumerTag)
{
throw new NotImplementedException();
}
/// <summary>
/// 消费者通过显式(basicCancel)取消订阅后调用
/// </summary>
/// <param name="consumerTag"></param>
public void HandleBasicCancelOk(string consumerTag)
{
Console.WriteLine("Cancel-OK");
//throw new NotImplementedException();
}
/// <summary>
/// 在其他方法前都会调用此方法
/// </summary>
/// <param name="consumerTag"></param>
public void HandleBasicConsumeOk(string consumerTag)
{
Console.WriteLine("Ok");
}
/// <summary>
/// 接收到任一消息时调用
/// </summary>
public void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, IBasicProperties properties, byte[] body)
{
Console.WriteLine("message arrive");
//....消费消息....
//消费完后手动Ack
// Model.BasicAck(deliveryTag, false);
}
/// <summary>
/// Channel和Connetion关闭的时候调用
/// </summary>
/// <param name="model"></param>
/// <param name="reason"></param>
public void HandleModelShutdown(object model, ShutdownEventArgs reason)
{
throw new NotImplementedException();
}
}
举个例子,例如我们在客户端调用了BasicCancel()
方法:
channel.BasicCancel("MyConsumerTag");
此时调用方法的顺序是:HandleBasicConsumeOk
==>HandleBasicCancelOk
需要注意的是:
和生产者一样,消费者客户端同样需要考虑线程安全
的问题。消费者客户端的这些callback
会被分配到与Channel
不同的线程上,这意味着消费者客户端可以安全的调用一些阻塞
方法,例如channel.QueueDeclare
、channel.BasicCancel
(就算你在一个Consumer 的HandleBasicDeliver方法中阻塞了很长时间也不会影响客户端调用)
一般的做法是每个Channel
对应一个消费者,当然也可以在一个Channel
中维护多个消费者,但是这样会产生一个问题,如果Channel
中的一个消费者一直在运行,那么它就会阻塞其它消费者。
拉模式
拉模式通过channel.BasicGet
方法单条地获取消息,其返回值是BasicGetResult
(GetResponse)
BasicGet方法:
BasicGetResult BasicGet(string queue, bool autoAck);
以下代码是拉模式的关键代码:
BasicGetResult result = channel.BasicGet("MyQueue", false);
Console.WriteLine(result.Body);
channel.BasicAck(result.DeliveryTag, false);
需要注意的是:
只想获得单条消息而不是持续订阅,建议使用拉模式,但是我们不能将 BasicGet
放在一个While
循环里来代替Basic.Consume
,这样会严重影响
RabbitMQ的性能。
若要实现高吞吐量,建议使用推模式。
5.消费端的确认与拒绝
为了提高消息的可靠性,RabbitMQ提供了消息确认机制。
之前我们订阅队列时,有一个autoAck
参数,定义是否设置自动确认模式,建议设置为false
,既不自动确认。这样,RabbitMQ会显式地回复确认信号后才从内存(或者磁盘)中移除消息(实际上是先打上删除标记,之后再真正删除),如果我们设置为true
,RabbitMQ自动就把发送的消息设置为已确认,然后从内存或者磁盘中删除,而不管消费端是否真正收到了这些消息。
消息确认
BasicAck方法
void BasicAck(ulong deliveryTag, bool multiple);
参数:
- deliveryTag:消息唯一编号,区分每一条消息
- multiple:是否批量确认
当设置autoAck
参数为false
的时候,RabbitMQ会一直等待
消息被消费者显式调用Basic.Ack
命令才将之删除并发送下一条消息。
此时队列中的消息会被分成两个部分:
- 已经投递给消费者但是还没有收到消费者确认信号的消息
- 等待投递给消费者的消息
如果RabbitMQ一直没有收到消费者的确认信号,并且消费者此时已经断开连接,那么RabbitMQ会安排消息重新入列,等待投递给下个一消费者,当然那个消费者也有可能是原来的消费者。
在Web上可以看到一个队列的两种消息:
- Ready:等待投递消息数
- Unacked:已投递未确认消息数
RabbitMQ允许消费者消费一条消息的时间可以很久,判断是否需要重新投递消息的依据只有一种:消费者是否断开连接
消息拒绝
有确认消息,也会有拒绝消息,拒绝消息的命令就是Basic.Reject
,消费者可以调用与其对应的Channel.BasicReject
方法来告诉RabbitMQ拒绝这个消息。
BasicReject方法
void BasicReject(ulong deliveryTag, bool requeue);
参数:
- deliveryTag:消息编号,区分每条消息
- requeue:是否重新入列
如果将requeue
设置为true
,那么RabbitMQ就会将这条被拒绝的消息重新入列,发送下一个订阅队列的消费者。如果设置为false
,则RabbitMQ就会立即把这条消息删除,不会被再次消费。
需要注意的是Basic.Reject
一次只能拒绝一条消息,如果需要批量拒绝消息,可以使用Basic.Nack
这个命令,对应的客户端方法为channel.BasicNack
void BasicNack(ulong deliveryTag, bool multiple, bool requeue)
这个方法比BasicReject
多了一个参数multiple
,设置是否批量拒绝消息,如果设置为true,则表示拒绝此deliveryTag
标签之前所有未被当前消费者确认的消息。
将参数`requeue`设置为`false`可以开启死信队列功能(详见下一篇)。
6. 关闭连接
显式关闭
在应用程序使用完之后我们需要关闭连接从而释放资源:
channel.Close();
connection.Close();
显式关闭是一个好习惯,但也不是必须的,当connection
关闭的时候,channel
也会自动的关闭。
生命周期
connection
和channel
采用的是同一种方式管理网络失败、内部错误和显式关闭连接,所以它们具有相同类型的生命周期:
- Open:开启状态,代表当前对象可以使用
- Closing:正在关闭状态。当前对象已经被显式地通知调用关闭方法(shutdown),这样就会出现这种状态,已发送关闭请求,并等待关闭操作的完成
- Closed:已经关闭状态
connection
和channel
最终会成为Closed
这一状态,不管是何原因导致的(例如网络异常、显式调用或者客户端异常)
监控关闭
我们可以订阅ConnectionShutdown
事件,当连接关闭时,就会触发此事件
private static void Connection_ConnectionShutdown(object sender, ShutdownEventArgs e)
{
//TODO:日志记录等
}
ShutdownEventArgs
这个参数包括了此次关闭的原因,其中ShutdownInitiator
包含了此次关闭的位置定位信息。
我们还可以使用 connection.CloseReason
方法获得相关关闭信息,如果连接还处于可用状态的话返回null
ShutdownEventArgs reason = connection.CloseReason