RabbitMQ客户端开发

RabbitMQ客户端开发

本篇博客思维导图:

思维导图

RabbitMQ中关键的Class有ChannelConnectionConnectionFactoryConsumer等,AMQP协议层面的操作通过Channel接口实现(C#中现在为IModel接口)

Connection用来开启一个Channel,RabbitMQ的开发工作基本也是围绕ConnetionChannel两个类展开的。

本篇通过一个完整运转流程进行讲解:

  1. 连接
  2. 交换机和队列的声明和绑定
  3. 发送消息
  4. 消费消息
  5. 消息确认
  6. 关闭连接

1. 连接RabbitMQ

1.1创建Connection和Channel

给定参数连接RabbitMQ,并且创建多个Channel

        //创建连接工厂,提供用户名等信息
        ConnectionFactory factory = new ConnectionFactory();
        factory.UserName = "test";
        factory.Password = "123465";
        factory.VirtualHost = "virtual";
        factory.HostName = "127.0.0.1";
        factory.Port = 2020;
        //创建连接
        IConnection connection = factory.CreateConnection();
        //创建多个信道
        var channel1 = connection.CreateModel();
        var channel2 = connection.CreateModel();

可以看到一个Connection可以创建多个Channel,但是我们应该为每一个线程开辟一个Channel,因为Channel实例是非线程安全的,并发情况下可能会导致网络上出现错误的通信帧交错,也会影响到发送方确认机制(publisher confirm)的运行。

创建好Channel之后,我们就可以使用它来发送或者接收消息了。

1.2 连接状态判定

IsOpen属性判定

ChannleConnection都有一个IsOpen的属性,这个属性可以判断ChannelConnection是否开启,但是并不推荐在实际生产环境中使用IsOpen,因为这个属性依赖于CloseReason(关闭原因),如果此属性为null就代表Channel或者Connection是开启状态。

但是需要注意的是:

CloseReason可能会产生竞争,所以使用IsOpen判断ConnectionChannel是否在线是不可靠的。

看下面这一段代码:

            if (channel.IsOpen) //此刻IsOpen为true,判断通过
            {
                //做某些事
                //...
                //....
                //做某些事的这段时间之间,channel发生离线
                //也就是说CloseReason不为null,执行下面的代码程序就会报错
                channel.BasicQos(10, 10, false); //因为离线,所以执行失败,可能导致程序奔溃
            }

RabbitMQ关于IsOpen的摘要:

IsOpen

RabbitMQ关于CloseReason的摘要:

CloseReason

try/catch判定

当我们使用Channel的时候,如果其处于关闭状态,那么RabbitMQ可能会抛出诸如RabbitMQ.Client.Exceptions.OperationInterruptedException之类的错误,我们只需要捕获这些一场即可。

        try
        {
            //...
            channel.BasicQos(10,10,false);
        }
        catch (RabbitMQ.Client.Exceptions.OperationInterruptedException e)
        {
            channel.Close();
            throw e;
        }

2. 声明交换机和队列

交换机Exchange和队列QueueAMQP高层面构建的模块,应用程序需要确保在使用它们的时候其已存在,在使用它们的时候我们必须先声明(Delare)它们。

以下代码演示如何声明一个交换机和队列:

        //声明一个持久化、非自动删除、Direct类型的交换机
        channel.ExchangeDeclare(
            exchange: EXCHANGE_NAME,
            type: ExchangeType.Direct,
            durable: true,
            autoDelete: false,
            arguments: null);
        //声明一个默认的队列(非持久化、排他的、自动删除),名称由RabbitMQ自动生成
        string queueName = channel.QueueDeclare().QueueName;
        //绑定队列和交换机
        channel.QueueBind(queueName, EXCHANGE_NAME, ROUTING_KEY);

当然你也可以不声明默认队列,根据自己的需求配置参数。

2.1 ExchangeDeclare方法详解

方法1:ExchangeDeclare

void ExchangeDeclare(
    string exchange, 
    string type, 
    bool durable, 
    bool autoDelete, 
    IDictionary<string, object> arguments);

参数详解:

  • exchange:交换机的名称
  • type:交换机类型,可以使用枚举ExchangeType设定
  • durable:是否持久化,设置为true的时候表示持久化,反之表示非持久化。持久化可以将交换机存在硬盘中,即使服务器重启,交换机的信息也不会丢失。
  • autoDelete:是否自动删除,true表示自动删除。自动删除有一个前提,在删除前至少有一个队列或者交换机与之绑定,之后所有的绑定都与之解绑,才会执行自动删除。
  • arguments:其它一些结构化的参数,指定扩展属性

方法2:ExchangeDeclareNoWait

void ExchangeDeclareNoWait(
    string exchange, 
    string type, 
    bool durable, 
    bool autoDelete, 
    IDictionary<string, object> arguments);

此声明方法特点是不需要服务节点(Broker)返回确认,而ExchangeDeclare方法声明交换机需要服务器返回AMQP命令Exchange.Declare-Ok表示交换机在服务器中创建成功。

使用ExchangeDeclareNoWait方法可能会出现声明交换机之后立即使用,但是服务器可能没有创建完成这一情况,所有没有特殊原因和应用场景不建议使用此方法声明交换机。

方法3:ExchangeDeclarePassive

void ExchangeDeclarePassive(string exchange);

此方法检测指定交换机是否存在,如果交换机在服务节点中已存在,则正常返回,如果不存在则抛出异常404 channel exception同时Channel也会被关闭,这种方法在实际应用中很常见。

我们可以写一个检测交换机是否存在的工具方法:

    public bool IsExchangeExisted(IModel channel, string exchangeName)
    {
        bool result = false;
        try
        {
            channel.ExchangeDeclarePassive(exchangeName);
            result = true;
        }
        catch (Exception e) //不存在则抛出错误
        {
            //no - op
        }
        return result;
    }

删除交换机

删除交换机的方法如下:

    void ExchangeDelete(string exchange, bool ifUnused);

对应参数:

  • exchange:交换机名
  • ifUnuser:设置是否在交换机没有被使用的情况下删除,设置true则只有在此交换机没有被使用的情况下被删除。

当然还有ExchangeDeleteNoWait方法,Like ExchangeDelete but sets nowait to true.

void ExchangeDeleteNoWait(string exchange, bool ifUnused);

2.2 QueueDeclare方法详解

声明队列有四种方法

方法1:QueueDeclare

QueueDeclareOk QueueDeclare(
    string queue, 
    bool durable, 
    bool exclusive, 
    bool autoDelete, 
    IDictionary<string, object> arguments);

方法2:声明默认的队列,名称由RabbitMQ生成

public static QueueDeclareOk QueueDeclare(this IModel model, string queue = "", bool durable = false, bool exclusive = true, bool autoDelete = true, IDictionary<string, object> arguments = null);

方法3:QueueDeclarePassive

QueueDeclareOk QueueDeclarePassive(string queue);

方法4:QueueDeclareNoWait

void QueueDeclareNoWait(
    string queue, 
    bool durable, 
    bool exclusive, 
    bool autoDelete, 
    IDictionary<string, object> arguments);

参数作用类似之前的交换机声明,就不再细说,需要注意的一个参数是:exclusive

exclusive:设置是否排他,设置为true时队列是排他队列

如果队列被设置为排他队列,那么此队列有以下特点

  1. 仅对首次声明它的连接可见

  2. 连接断开时自动删除(即使设置为持久化)

对于第一点:另一个连接(Connection)无法声明一个同样的队列,并且只区别连接(Connection)而不是通道(Channel),即同一连接(Connection)的不同信道(Channel)是可以同时访问此队列的。

如果试图在一个不同的连接中重新声明或访问(如publish,consume)该排他性队列,会得到资源被锁定的错误:

ESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'UserLogin2'

对于第二点:RabbitMQ会自动删除这个队列,而不管这个队列是否被声明成持久性的(Durable =true)。 也就是说即使客户端程序将一个排他性的队列声明成了Durable的,只要调用了连接的Close方法或者客户端程序退出了,RabbitMQ都会删除这个队列。注意这里是连接断开的时候,而不是通道断开。这个其实前一点保持一致,只区别连接而非通道。

这种队列适用于一个客户端同时发送和读取消息的应用场景。

注意要点

生产者和消费者都能使用QueueDeclare声明一个队列,但是如果消费者在同一个信道上订阅了另一个队列,就无法再声明队列了,必须先取消订阅,然后将信道置为“传输”模式,之后才能声明队列。

删除队列

QueueDelete

//Returns the number of messages purged during queue deletion
uint QueueDelete(string queue, bool ifUnused, bool ifEmpty);

它返回一个uint类型的值,代表队列删除期间清除的消息数量。

与删除交换机不同的是多了个参数ifEmpty,当此参数设置为true之时,表示队列里面没有任何消息堆积的情况下才能够删除。

QueuePurge 此方法用于删除队列上的消息而不删除队列本身

uint QueuePurge(string queue);

3. 绑定

3.1 交换机和队列之间的绑定

将交换机和队列绑定的方法如下两种,可以和之前的方法定义进行类比:

QueueBind

void QueueBind(
    string queue, 
    string exchange, 
    string routingKey, I
    Dictionary<string, object> arguments);

QueueBindNoWait

void QueueBindNoWait(
    string queue, 
    string exchange, 
    string routingKey, 
    IDictionary<string, object> arguments);

3.2 交换机和交换机之间的绑定

不仅队列和交换机之间可以进行绑定,两个交换机之间也能绑定,两个交换机绑定之后,消息从source交换机转发到destination交换机,从某种程度上来说destination交换机可以看做一个队列。

ExchangeBind

void ExchangeBind(
    string destination, 
    string source, 
    string routingKey, I
    Dictionary<string, object> arguments);

ExchangeBindNoWait

void ExchangeBindNoWait(
string destination, 
string source, 
string routingKey, 
IDictionary<string, object> arguments);

其中参数:

  • destination:目的交换机
  • source:源交换机

思考一下如何实现下面图片上所示的绑定关系

绑定交换机

代码:

            //声明源交换机
            channel.ExchangeDeclare(
                exchange: "source",
                type: ExchangeType.Direct);
            //声明目的交换机
            channel.ExchangeDeclare(
                exchange: "destination",
                type: ExchangeType.Fanout);
            //声明队列
            channel.QueueDeclare(
                queue: "queue");
            //绑定
            channel.ExchangeBind(destination: "destination", source: "source", routingKey: "routingKey");
            channel.QueueBind(queue: "queue", exchange: "destination", "routingKey");
            //发布消息至源交换机
            channel.BasicPublish(
                exchange: "source",
                routingKey: "routingKey",
                basicProperties: channel.CreateBasicProperties(),
                Encoding.UTF8.GetBytes("message"));

4.消息发送与消费

4.1 发送消息

使用ChannelBasicPublish方法可以发送消息。

例如一下代码所示,发送一条"hello"的消息到名为"myExchange"的交换机。

 channel.BasicPublish(
                exchange: "myExchange",
                routingKey: "routingKey",
                basicProperties: channel.CreateBasicProperties(),
                Encoding.UTF8.GetBytes("hello"));

我们看一下BasicPublish方法:

void BasicPublish(
string exchange, 
string routingKey, 
bool mandatory, 
IBasicProperties basicProperties,
byte[] body)

其它参数大同小异,主要关注mandatorybasicProperties这两个参数:

  • mandatory:
  • basicProperties:消息的基本属性集,其中包括多个属性成员

Prop

basicProperties可以定义消息的特定属性,之前用的都是默认属性,现在我们定义消息优先级(priority)和内容类型:

            var prop = channel.CreateBasicProperties();
            prop.Priority = 2; //优先级设置为2
            prop.ContentType = "text/plan"; //content-type
            //发布消息至源交换机
            channel.BasicPublish(
                exchange: "source",
                routingKey: "routingKey",
                basicProperties: prop,
                Encoding.UTF8.GetBytes("message"));

也可以定义过期时间(expiration):

            var prop = channel.CreateBasicProperties();
            prop.Expiration = "60000";

还可以定义头部(header):

            var prop = channel.CreateBasicProperties();
            Dictionary<string, object> header = 
            	new Dictionary<string, object>();
            header.Add("localtion", "here");
            header.Add("time", "today");
            prop.Headers = header;

4.2 消费消息

RabbitMQ有两种消费消息的方式:推模式(Push)和拉模式(Pull)。推模式对应的AMQP协议是Basic.Consume,拉模式对应的AMQP协议是Basic.Get

推模式

在推模式中,我们可以通过持续订阅的方式来消费消息。

使用BasicConsume方法我们可以设置并且订阅某一队列:

string BasicConsume(
string queue, 
bool autoAck, 
string consumerTag, 
bool noLocal, 
bool exclusive, 
IDictionary<string, object> arguments, 
IBasicConsumer consumer);

其中各项参数:

  • queue:队列名
  • autoAck:设置是否自动确认(建议设置成false,即消费者手动确认)
  • consumerTag:消费者标签,用来区分多个消费者
  • noLocal:设置同一个Connection中生产者发送的消息能否被这个Connection中的消费者消费(是否自产自销)
  • arguments:设置消费者其它参数
  • consumer:消费者回调类(可以继承IBasicConsumer重写其中的方法)

我们看一下一个继承了IBasicConsumer接口的消费者类是怎么样的:

    public class TestConsumer : IBasicConsumer
    {
        public IModel Model { get; private set; }
        public TestConsumer(IModel channel)
        {
            this.Model = channel;
        }
        public event EventHandler<ConsumerEventArgs> ConsumerCancelled;
        /// <summary>
        /// 消费者显式(basicCancel)以外的其它原因被取消时订阅式调用,例如:队列被删除
        /// </summary>
        /// <param name="consumerTag"></param>
        public void HandleBasicCancel(string consumerTag)
        {
            throw new NotImplementedException();
        }
        /// <summary>
        /// 消费者通过显式(basicCancel)取消订阅后调用
        /// </summary>
        /// <param name="consumerTag"></param>
        public void HandleBasicCancelOk(string consumerTag)
        {
            Console.WriteLine("Cancel-OK");
            //throw new NotImplementedException();
        }
        /// <summary>
        /// 在其他方法前都会调用此方法
        /// </summary>
        /// <param name="consumerTag"></param>
        public void HandleBasicConsumeOk(string consumerTag)
        {
            Console.WriteLine("Ok");
        }
        /// <summary>
        /// 接收到任一消息时调用
        /// </summary>
        public void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, IBasicProperties properties, byte[] body)
        {
            Console.WriteLine("message arrive");
            //....消费消息....
            //消费完后手动Ack
           // Model.BasicAck(deliveryTag, false);
        }
        /// <summary>
        /// Channel和Connetion关闭的时候调用
        /// </summary>
        /// <param name="model"></param>
        /// <param name="reason"></param>
        public void HandleModelShutdown(object model, ShutdownEventArgs reason)
        {
            throw new NotImplementedException();
        }
    }

举个例子,例如我们在客户端调用了BasicCancel()方法:

 channel.BasicCancel("MyConsumerTag");

此时调用方法的顺序是:HandleBasicConsumeOk==>HandleBasicCancelOk

需要注意的是:

和生产者一样,消费者客户端同样需要考虑线程安全的问题。消费者客户端的这些callback会被分配到与Channel不同的线程上,这意味着消费者客户端可以安全的调用一些阻塞方法,例如channel.QueueDeclarechannel.BasicCancel(就算你在一个Consumer 的HandleBasicDeliver方法中阻塞了很长时间也不会影响客户端调用)

一般的做法是每个Channel对应一个消费者,当然也可以在一个Channel中维护多个消费者,但是这样会产生一个问题,如果Channel中的一个消费者一直在运行,那么它就会阻塞其它消费者。

拉模式

拉模式通过channel.BasicGet方法单条地获取消息,其返回值是BasicGetResult(GetResponse)

BasicGet方法:

BasicGetResult BasicGet(string queue, bool autoAck);

以下代码是拉模式的关键代码:

            BasicGetResult result = channel.BasicGet("MyQueue", false);
            Console.WriteLine(result.Body);
            channel.BasicAck(result.DeliveryTag, false);

需要注意的是:

只想获得单条消息而不是持续订阅,建议使用拉模式,但是我们不能将 BasicGet放在一个While循环里来代替Basic.Consume,这样会严重影响RabbitMQ的性能。

若要实现高吞吐量,建议使用推模式。

5.消费端的确认与拒绝

为了提高消息的可靠性,RabbitMQ提供了消息确认机制。

之前我们订阅队列时,有一个autoAck参数,定义是否设置自动确认模式,建议设置为false,既不自动确认。这样,RabbitMQ会显式地回复确认信号后才从内存(或者磁盘)中移除消息(实际上是先打上删除标记,之后再真正删除),如果我们设置为true,RabbitMQ自动就把发送的消息设置为已确认,然后从内存或者磁盘中删除,而不管消费端是否真正收到了这些消息。

消息确认

BasicAck方法

void BasicAck(ulong deliveryTag, bool multiple);

参数:

  • deliveryTag:消息唯一编号,区分每一条消息
  • multiple:是否批量确认

当设置autoAck参数为false的时候,RabbitMQ会一直等待消息被消费者显式调用Basic.Ack命令才将之删除并发送下一条消息。

此时队列中的消息会被分成两个部分:

  1. 已经投递给消费者但是还没有收到消费者确认信号的消息
  2. 等待投递给消费者的消息

如果RabbitMQ一直没有收到消费者的确认信号,并且消费者此时已经断开连接,那么RabbitMQ会安排消息重新入列,等待投递给下个一消费者,当然那个消费者也有可能是原来的消费者。

在Web上可以看到一个队列的两种消息:

  • Ready:等待投递消息数
  • Unacked:已投递未确认消息数

Web管理界面上的消息类型

RabbitMQ允许消费者消费一条消息的时间可以很久,判断是否需要重新投递消息的依据只有一种:消费者是否断开连接

消息拒绝

有确认消息,也会有拒绝消息,拒绝消息的命令就是Basic.Reject,消费者可以调用与其对应的Channel.BasicReject方法来告诉RabbitMQ拒绝这个消息。

BasicReject方法

void BasicReject(ulong deliveryTag, bool requeue);

参数:

  • deliveryTag:消息编号,区分每条消息
  • requeue:是否重新入列

如果将requeue设置为true,那么RabbitMQ就会将这条被拒绝的消息重新入列,发送下一个订阅队列的消费者。如果设置为false,则RabbitMQ就会立即把这条消息删除,不会被再次消费。

需要注意的是Basic.Reject一次只能拒绝一条消息,如果需要批量拒绝消息,可以使用Basic.Nack这个命令,对应的客户端方法为channel.BasicNack

void BasicNack(ulong deliveryTag, bool multiple, bool requeue)

这个方法比BasicReject多了一个参数multiple,设置是否批量拒绝消息,如果设置为true,则表示拒绝此deliveryTag标签之前所有未被当前消费者确认的消息。

将参数`requeue`设置为`false`可以开启死信队列功能(详见下一篇)。

6. 关闭连接

显式关闭

在应用程序使用完之后我们需要关闭连接从而释放资源:

	channel.Close();
	connection.Close();

显式关闭是一个好习惯,但也不是必须的,当connection关闭的时候,channel也会自动的关闭。

生命周期

connectionchannel采用的是同一种方式管理网络失败、内部错误和显式关闭连接,所以它们具有相同类型的生命周期:

  • Open:开启状态,代表当前对象可以使用
  • Closing:正在关闭状态。当前对象已经被显式地通知调用关闭方法(shutdown),这样就会出现这种状态,已发送关闭请求,并等待关闭操作的完成
  • Closed:已经关闭状态

connectionchannel最终会成为Closed这一状态,不管是何原因导致的(例如网络异常、显式调用或者客户端异常)

监控关闭

我们可以订阅ConnectionShutdown事件,当连接关闭时,就会触发此事件

关闭事件

        private static void Connection_ConnectionShutdown(object sender, ShutdownEventArgs e)
        {
           //TODO:日志记录等
        }

ShutdownEventArgs这个参数包括了此次关闭的原因,其中ShutdownInitiator包含了此次关闭的位置定位信息。

我们还可以使用 connection.CloseReason方法获得相关关闭信息,如果连接还处于可用状态的话返回null

 ShutdownEventArgs reason = connection.CloseReason

猜你喜欢

转载自www.cnblogs.com/zaijianba/p/12626471.html