


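When multiple is set to true, RabbitMQ will acknowledge all outstanding delivery tags up to and including the tag specified in the acknowledgement. Like everything else related to acknowledgements, this is scoped per channel. For example, given that delivery tags 5, 6, 7, and 8 are unacknowledged on channel Ch, when an acknowledgement frame arrives on that channel with the delivery tag set to 8 and multiple set to true, all tags from 5 to 8 will be acknowledged. If multiple were set to false, deliveries 5, 6, and 7 would remain unacknowledged.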



// this example assumes an existing channel instance and imports of
// com.rabbitmq.client.* and java.io.IOException

boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "a-consumer-tag",
     new DefaultConsumer(channel) {
         @Override
         public void handleDelivery(String consumerTag,
                                    Envelope envelope,
                                    AMQP.BasicProperties properties,
                                    byte[] body)
             throws IOException
         {
             long deliveryTag = envelope.getDeliveryTag();
             // positively acknowledge all deliveries up to
             // this delivery tag
             channel.basicAck(deliveryTag, true);
         }
     });

// this example assumes an existing channel (IModel) instance

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (ch, ea) =>
                {
                    var body = ea.Body;
                    // positively acknowledge all deliveries up to
                    // this delivery tag
                    channel.BasicAck(ea.DeliveryTag, true);
                };
string consumerTag = channel.BasicConsume(queueName, false, consumer);

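Sometimes a consumer cannot process a delivery immediately but other instances might be able to. In this case it may be desirable to requeue it and let another consumer receive and handle it. basic.reject and basic.nack are the two protocol methods used for that.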


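Both methods are usually exposed as operations on a channel in client libraries. Java client users will use Channel#basicReject and Channel#basicNack to perform a basic.reject and a basic.nack, respectively: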

// this example assumes an existing channel instance and imports of
// com.rabbitmq.client.* and java.io.IOException

boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "a-consumer-tag",
     new DefaultConsumer(channel) {
         @Override
         public void handleDelivery(String consumerTag,
                                    Envelope envelope,
                                    AMQP.BasicProperties properties,
                                    byte[] body)
             throws IOException
         {
             long deliveryTag = envelope.getDeliveryTag();
             // negatively acknowledge, the message will
             // be discarded
             channel.basicReject(deliveryTag, false);
         }
     });

// this example assumes an existing channel instance and imports of
// com.rabbitmq.client.* and java.io.IOException

boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "a-consumer-tag",
     new DefaultConsumer(channel) {
         @Override
         public void handleDelivery(String consumerTag,
                                    Envelope envelope,
                                    AMQP.BasicProperties properties,
                                    byte[] body)
             throws IOException
         {
             long deliveryTag = envelope.getDeliveryTag();
             // requeue the delivery
             channel.basicReject(deliveryTag, true);
         }
     });

In the .NET client the methods are IModel#BasicReject and IModel#BasicNack, respectively:

// this example assumes an existing channel (IModel) instance

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (ch, ea) =>
                {
                    var body = ea.Body;
                    // negatively acknowledge, the message will
                    // be discarded
                    channel.BasicReject(ea.DeliveryTag, false);
                };
string consumerTag = channel.BasicConsume(queueName, false, consumer);

// this example assumes an existing channel (IModel) instance

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (ch, ea) =>
                {
                    var body = ea.Body;
                    // requeue the delivery
                    channel.BasicReject(ea.DeliveryTag, true);
                };
string consumerTag = channel.BasicConsume(queueName, false, consumer);

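basic.nack can also reject or requeue multiple messages at once, which is what differentiates it from basic.reject. Here is a Java client example:

// this example assumes an existing channel instance and imports of
// com.rabbitmq.client.* and java.io.IOException

boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "a-consumer-tag",
     new DefaultConsumer(channel) {
         @Override
         public void handleDelivery(String consumerTag,
                                    Envelope envelope,
                                    AMQP.BasicProperties properties,
                                    byte[] body)
             throws IOException
         {
             long deliveryTag = envelope.getDeliveryTag();
             // requeue all unacknowledged deliveries up to
             // this delivery tag
             channel.basicNack(deliveryTag, true, true);
         }
     });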

Things work very similarly with the .NET client:

// this example assumes an existing channel (IModel) instance

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (ch, ea) =>
                {
                    var body = ea.Body;
                    // requeue all unacknowledged deliveries up to
                    // this delivery tag
                    channel.BasicNack(ea.DeliveryTag, true, true);
                };
string consumerTag = channel.BasicConsume(queueName, false, consumer);

Acknowledging Multiple Deliveries at Once

Manual acknowledgements can be batched to reduce network traffic. This is done by setting the multiple field of acknowledgement methods (see above) to true. Note that basic.reject doesn't historically have the field and that's why basic.nack was introduced by RabbitMQ as a protocol extension.

When the multiple field is set to true, RabbitMQ will acknowledge all outstanding delivery tags up to and including the tag specified in the acknowledgement. Like everything else related to acknowledgements, this is scoped per channel. For example, given that there are delivery tags 5, 6, 7, and 8 unacknowledged on channel Ch, when an acknowledgement frame arrives on that channel with delivery_tag set to 8 and multiple set to true, all tags from 5 to 8 will be acknowledged. If multiple was set to false, deliveries 5, 6, and 7 would still be unacknowledged.
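The 5-to-8 example can be replayed with a small model that needs no broker. This is only a sketch of the bookkeeping described above, not RabbitMQ's implementation: the class UnackedTags and its methods are hypothetical names, and a TreeSet stands in for one channel's outstanding delivery tags.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableSet;
import java.util.TreeSet;

/** Toy model (hypothetical) of one channel's unacknowledged delivery tags. */
final class UnackedTags {
    private final TreeSet<Long> outstanding = new TreeSet<>();

    /** Records that a delivery with this tag was sent on the channel. */
    void delivered(long tag) {
        outstanding.add(tag);
    }

    /** Applies an acknowledgement and returns the tags it settled. */
    List<Long> ack(long deliveryTag, boolean multiple) {
        List<Long> settled = new ArrayList<>();
        if (multiple) {
            // Live view of every outstanding tag <= deliveryTag.
            NavigableSet<Long> upTo = outstanding.headSet(deliveryTag, true);
            settled.addAll(upTo);
            upTo.clear(); // also removes them from 'outstanding'
        } else if (outstanding.remove(deliveryTag)) {
            settled.add(deliveryTag);
        }
        return settled;
    }

    List<Long> stillOutstanding() {
        return new ArrayList<>(outstanding);
    }

    public static void main(String[] args) {
        UnackedTags a = new UnackedTags();
        UnackedTags b = new UnackedTags();
        for (long tag = 5; tag <= 8; tag++) {
            a.delivered(tag);
            b.delivered(tag);
        }
        System.out.println(a.ack(8, true));   // [5, 6, 7, 8]
        System.out.println(b.ack(8, false));  // [8]
        System.out.println(b.stillOutstanding()); // [5, 6, 7]
    }
}
```

Each UnackedTags instance plays the role of a separate channel, which mirrors the per-channel scoping of delivery tags.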

To acknowledge multiple deliveries with the RabbitMQ Java client, pass true for the multiple parameter to Channel#basicAck:

// this example assumes an existing channel instance and imports of
// com.rabbitmq.client.* and java.io.IOException

boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "a-consumer-tag",
     new DefaultConsumer(channel) {
         @Override
         public void handleDelivery(String consumerTag,
                                    Envelope envelope,
                                    AMQP.BasicProperties properties,
                                    byte[] body)
             throws IOException
         {
             long deliveryTag = envelope.getDeliveryTag();
             // positively acknowledge all deliveries up to
             // this delivery tag
             channel.basicAck(deliveryTag, true);
         }
     });

The idea is very much the same with the .NET client:

// this example assumes an existing channel (IModel) instance

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (ch, ea) =>
                {
                    var body = ea.Body;
                    // positively acknowledge all deliveries up to
                    // this delivery tag
                    channel.BasicAck(ea.DeliveryTag, true);
                };
string consumerTag = channel.BasicConsume(queueName, false, consumer);
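
The examples above acknowledge with multiple set to true on every delivery, which does not by itself reduce traffic. One way to actually batch, sketched here under assumptions, is to acknowledge only every Nth processed delivery. BatchingAcker and AckSink are hypothetical names; AckSink stands in for Channel#basicAck(long, boolean) so the sketch runs without a broker.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that sends one multiple=true ack per batch of deliveries. */
final class BatchingAcker {
    /** Stand-in for Channel#basicAck(long deliveryTag, boolean multiple). */
    interface AckSink {
        void ack(long deliveryTag, boolean multiple);
    }

    private final AckSink sink;
    private final int batchSize;
    private int pending = 0;
    private long lastTag = -1;

    BatchingAcker(AckSink sink, int batchSize) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be >= 1");
        }
        this.sink = sink;
        this.batchSize = batchSize;
    }

    /** Call once a delivery has been fully processed, in delivery order. */
    void processed(long deliveryTag) {
        lastTag = deliveryTag;
        pending++;
        if (pending >= batchSize) {
            flush();
        }
    }

    /** Acknowledges whatever is pending, e.g. before shutdown. */
    void flush() {
        if (pending == 0) {
            return;
        }
        sink.ack(lastTag, true); // settles every tag up to lastTag
        pending = 0;
    }

    public static void main(String[] args) {
        List<String> sent = new ArrayList<>();
        BatchingAcker acker =
            new BatchingAcker((tag, multiple) -> sent.add(tag + ":" + multiple), 3);
        for (long tag = 1; tag <= 7; tag++) {
            acker.processed(tag);
        }
        acker.flush();
        System.out.println(sent); // [3:true, 6:true, 7:true]
    }
}
```

In a real consumer the lambda would presumably call channel.basicAck(tag, multiple). The sketch assumes deliveries on the channel are processed in order; deliveries processed but not yet acknowledged when the channel closes would be delivered again, so larger batches trade fewer frames for more potential redelivery.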

Negative Acknowledgement and Requeuing of Deliveries

Sometimes a consumer cannot process a delivery immediately but other instances might be able to. In this case it may be desired to requeue it and let another consumer receive and handle it. basic.reject and basic.nack are two protocol methods that are used for that.

The methods are generally used to negatively acknowledge a delivery. Such deliveries can be discarded by the broker or requeued. This behaviour is controlled by the requeue field. When the field is set to true, the broker will requeue the delivery (or multiple deliveries, as will be explained shortly) with the specified delivery tag.

Both methods are usually exposed as operations on a channel in client libraries. Java client users will use Channel#basicReject and Channel#basicNack to perform a basic.reject and basic.nack, respectively:

// this example assumes an existing channel instance and imports of
// com.rabbitmq.client.* and java.io.IOException

boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "a-consumer-tag",
     new DefaultConsumer(channel) {
         @Override
         public void handleDelivery(String consumerTag,
                                    Envelope envelope,
                                    AMQP.BasicProperties properties,
                                    byte[] body)
             throws IOException
         {
             long deliveryTag = envelope.getDeliveryTag();
             // negatively acknowledge, the message will
             // be discarded
             channel.basicReject(deliveryTag, false);
         }
     });

// this example assumes an existing channel instance and imports of
// com.rabbitmq.client.* and java.io.IOException

boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "a-consumer-tag",
     new DefaultConsumer(channel) {
         @Override
         public void handleDelivery(String consumerTag,
                                    Envelope envelope,
                                    AMQP.BasicProperties properties,
                                    byte[] body)
             throws IOException
         {
             long deliveryTag = envelope.getDeliveryTag();
             // requeue the delivery
             channel.basicReject(deliveryTag, true);
         }
     });

In the .NET client the methods are IModel#BasicReject and IModel#BasicNack, respectively:

// this example assumes an existing channel (IModel) instance

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (ch, ea) =>
                {
                    var body = ea.Body;
                    // negatively acknowledge, the message will
                    // be discarded
                    channel.BasicReject(ea.DeliveryTag, false);
                };
string consumerTag = channel.BasicConsume(queueName, false, consumer);

// this example assumes an existing channel (IModel) instance

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (ch, ea) =>
                {
                    var body = ea.Body;
                    // requeue the delivery
                    channel.BasicReject(ea.DeliveryTag, true);
                };
string consumerTag = channel.BasicConsume(queueName, false, consumer);

When a message is requeued, it will be placed at its original position in its queue, if possible. If not (due to concurrent deliveries and acknowledgements from other consumers when multiple consumers share a queue), the message will be requeued to a position closer to the queue head.

Requeued messages may be immediately ready for redelivery depending on their position in the queue and the prefetch value used by the channels with active consumers. This means that if all consumers requeue because they cannot process a delivery due to a transient condition, they will create a requeue/redelivery loop. Such loops can be costly in terms of network bandwidth and CPU resources. Consumer implementations can track the number of redeliveries and reject messages for good (discard them) or schedule requeueing after a delay.
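
Tracking redeliveries on the consumer side could look like the following sketch. It is an illustration under assumptions rather than a recommended implementation: RedeliveryPolicy is a hypothetical class, messageId is assumed to be an application-level identifier that stays the same across redeliveries, and the counts live only in this consumer's memory, so they are not shared with other consumers on the queue.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical per-consumer policy that caps how often a message is requeued. */
final class RedeliveryPolicy {
    enum Decision { REQUEUE, DISCARD }

    private final int maxAttempts;
    private final Map<String, Integer> attempts = new HashMap<>();

    RedeliveryPolicy(int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        this.maxAttempts = maxAttempts;
    }

    /** Records one failed attempt and decides what to do with the delivery. */
    Decision onFailure(String messageId) {
        int seen = attempts.merge(messageId, 1, Integer::sum);
        if (seen >= maxAttempts) {
            attempts.remove(messageId); // stop tracking, the message is given up on
            return Decision.DISCARD;
        }
        return Decision.REQUEUE;
    }

    /** Forgets a message once it has been processed successfully. */
    void onSuccess(String messageId) {
        attempts.remove(messageId);
    }

    public static void main(String[] args) {
        RedeliveryPolicy policy = new RedeliveryPolicy(3);
        System.out.println(policy.onFailure("m-1")); // REQUEUE
        System.out.println(policy.onFailure("m-1")); // REQUEUE
        System.out.println(policy.onFailure("m-1")); // DISCARD
    }
}
```

A consumer would map the decision onto the requeue argument: REQUEUE corresponds to channel.basicReject(deliveryTag, true) and DISCARD to channel.basicReject(deliveryTag, false).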

It is possible to reject or requeue multiple messages at once using the basic.nack method. This is what differentiates it from basic.reject. It accepts an additional parameter, multiple. Here's a Java client example:

// this example assumes an existing channel instance and imports of
// com.rabbitmq.client.* and java.io.IOException

boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "a-consumer-tag",
     new DefaultConsumer(channel) {
         @Override
         public void handleDelivery(String consumerTag,
                                    Envelope envelope,
                                    AMQP.BasicProperties properties,
                                    byte[] body)
             throws IOException
         {
             long deliveryTag = envelope.getDeliveryTag();
             // requeue all unacknowledged deliveries up to
             // this delivery tag
             channel.basicNack(deliveryTag, true, true);
         }
     });

Things work very similarly with the .NET client:

// this example assumes an existing channel (IModel) instance

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (ch, ea) =>
                {
                    var body = ea.Body;
                    // requeue all unacknowledged deliveries up to
                    // this delivery tag
                    channel.BasicNack(ea.DeliveryTag, true, true);
                };
string consumerTag = channel.BasicConsume(queueName, false, consumer);
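
To make the difference between the two methods concrete, here is a broker-free sketch of what the multiple and requeue fields select. NackModel is a hypothetical toy, not RabbitMQ's implementation, and it simplifies requeue placement by always putting requeued messages at the head of the queue in their original order.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Toy model (hypothetical) of negative acknowledgements on one channel. */
final class NackModel {
    /** Unacknowledged deliveries on the channel: delivery tag -> message body. */
    final NavigableMap<Long, String> unacked = new TreeMap<>();
    /** The queue; the first element is the next message to be delivered. */
    final Deque<String> queue = new ArrayDeque<>();
    int discarded = 0;

    /** Models basic.nack: settles one tag, or all tags up to it when multiple is true. */
    void nack(long deliveryTag, boolean multiple, boolean requeue) {
        NavigableMap<Long, String> hit = multiple
            ? unacked.headMap(deliveryTag, true)
            : unacked.subMap(deliveryTag, true, deliveryTag, true);
        // Walk newest-first so that addFirst keeps the original order at the head.
        for (String body : hit.descendingMap().values()) {
            if (requeue) {
                queue.addFirst(body);
            } else {
                discarded++;
            }
        }
        hit.clear(); // the view is live, so this removes the tags from 'unacked'
    }

    /** Models basic.reject: always exactly one delivery tag. */
    void reject(long deliveryTag, boolean requeue) {
        nack(deliveryTag, false, requeue);
    }

    public static void main(String[] args) {
        NackModel ch = new NackModel();
        ch.unacked.put(5L, "a");
        ch.unacked.put(6L, "b");
        ch.unacked.put(7L, "c");
        ch.unacked.put(8L, "d");

        ch.nack(7, true, true);                  // requeue tags 5, 6 and 7
        System.out.println(ch.queue);            // [a, b, c]
        System.out.println(ch.unacked.keySet()); // [8]

        ch.reject(8, false);                     // discard tag 8 only
        System.out.println(ch.discarded);        // 1
    }
}
```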

