公平分发也就是能者多劳模式,当有消费者处理完成后,MQ就会继续给它分发消息,直到MQ中所有消息都被消费。
公平分发需要注意的是,关闭自动应答模式,将每次分发的消息数进行限定,同时在消息消费完成后手动向MQ发送确认,之后MQ将该条消息移除,继续分发新的消息。
具体的介绍会在消费者1的代码中详细注释。
定义生产者
和简单队列一样,声明生产者(具体可查看之前的博客——简单队列)。本次循环发送50条消息到队列。
package com.mmr.rabbitmq.work;
import com.mmr.rabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class Send {
private final static String QUEUE_NAME = "test_queue_work";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
for (int i = 0; i < 50; i++) {
String msg="Hello work QUEUE " + i;
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
System.out.println("Send----->" + msg + "----" + i);
Thread.sleep(100);
}
channel.close();
connection.close();
}
}
定义消费者
本次消费者为两个,通过两个消费者模拟工作队列。
消费者1:
package com.mmr.rabbitmq.workfair;
import java.io.IOException;
import com.mmr.rabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class Recv1 {
private final static String QUEUE_NAME = "test_queue_work";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtils.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//声明在一条消息被确认消费前,不会再发给该消费者另外的消息
channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "utf-8");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
//一般用于异常处理,告知MQ消息处理失败,
//第二个参数为true,则该条消息被重新放回队列,为false则放弃该条消息
// channel.basicReject(envelope.getDeliveryTag(), false);
e.printStackTrace();
}finally {
System.out.println("Recv1---->" + msg + "---->Done");
//向MQ发送确认,告诉MQ该条消息已经消费,可以重新发送了
//第二个参数为false表示只确认当前这一条消息,
//如果为 true,则额外将比第一个参数指定的 delivery tag 小的消息一并确认(批量确认针对的是整个信道)
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
//相比于轮询分发,将第二个参数设置为false,也就是关闭自动应答模式
channel.basicConsume(QUEUE_NAME, false, consumer);
}
}
消费者2
package com.mmr.rabbitmq.workfair;
import java.io.IOException;
import com.mmr.rabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class Recv2 {
private final static String QUEUE_NAME = "test_queue_work";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtils.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel)
{
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String msg = new String(body, "utf-8");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
System.out.println("Recv2---->" + msg + "---->Done");
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
channel.basicConsume(QUEUE_NAME, false, consumer);
}
}
这时候查看控制台的输出记录便可清晰的看到,消费者1处理的消息比消费者2多。
以上就是rabbitMq的公平分发模式。