工作队列模式
工作队列或者竞争消费者模式结构如下图:
工作队列模式与简单模式相比,多了一个消费端,两个消费端(或多个消费端)共同消费同一个队列中的消息,但是一个消息只能被一个消费者获取。
它解决了当消息队列的消息过多的情况,单消费者消费速率有限导致的消息堆积的问题。
工作队列模式分为两种:轮询分发、公平分发
- 轮询分发:任务平均分配。不管谁忙,都不会多给消息,总是你一个我一个
- 公平分发:能者多劳。谁消费得快,谁就消费得多。这种模式下,必须关闭自动应答 ACK,改成手动应答,并且设置
channel.basicQos(1);
,表示消费者一次只处理一条消息
后台页面操作
这个工作队列模式中的队列是和默认的交换机 AMQP default
进行绑定(和简单模式一样),在页面中无法很形象地表现出来,在下面会用代码的方式进行操作,会更直观。
先建 2 个队列:work_queue1、work_queue2
然后,在默认的交换机中给这两个队列分别发送消息(需要指定路由键:路由键为队列名称)
上述给 work_queue1、work_queue2 分别发送消息
发送完消息后,到队列中查看消息
这样,便在页面中完成了工作队列模式。页面并不能很直观地感受到工作队列。接下来看看使用代码如何实现吧。
代码实现
P:生产者:任务的发布者
C1:消费者1 Consumer :领取任务并且完成任务,假设完成速度较慢(模拟耗时)
C2:消费者2 Consumer2 :领取任务并且完成任务,假设完成速度较快
轮询分发
生产者循环发送 20 条消息
public class Producer {
public static void main(String[] args) throws Exception{
// 1. 获取连接
Connection connection = RabbitMqUtil.getConnection("生产者");
// 2. 通过连接获取通道 Channel
Channel channel = connection.createChannel();
String queueName = "code_work_queue1";
// 3. 通过通道创建声明队列
channel.queueDeclare(queueName, false, false, false, null);
// 4. 发送消息给队列 Queue
for (int i = 0; i < 20; i++) {
// 5. 准备消息内容
String message = "Hello RabbitMQ" + i;
channel.basicPublish("", queueName, null, message.getBytes());
System.out.println("消息发送完成~~~发送的消息为:" + message);
}
// 6. 关闭信道、连接
RabbitMqUtil.close(connection, channel);
}
}
消费者 Consumer:
public class Consumer {
public static void main(String[] args) throws Exception{
// 获取连接
Connection connection = RabbitMqUtil.getConnection("消费者");
final Channel channel = connection.createChannel();
String queueName = "code_work_queue1";
// 定义消费者
com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// body 消息体
String msg = new String(body,"utf-8");
System.out.println("收到消息:" + msg);
try {
// 模拟任务耗时 1s
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
// 监听队列
channel.basicConsume(queueName, true, consumer);
System.out.println("开始接收消息~~~");
System.in.read();
// 关闭信道、连接
RabbitMqUtil.close(connection, channel);
}
}
消费者 Consumer2 与 消费者 Consumer 代码基本一样。只是消费者 Consumer2 没有设置消费耗时间
先启动生产者,再启动两个消费者,结果如下:
消费者 Consumer 消费了 10 条消息
消费者 Consumer2 也消费了 10 条消息
可以发现,两个消费者各自消费了不同 10 条消息,这就实现了任务的分发。
公平分发
上面的实现有什么问题吗?
消费者 Consumer 比消费者 Consumer2 的效率要低,一次任务的耗时较长,然而两人最终消费的消息数量是一样的。消费者2大量时间处于空闲状态,消费者1一直忙碌。
正确的做法应该是消费越快的人,消费的越多。
那么如何实现呢?
- 设置手动 ACK
- 设置
channel.basicQos(1);
只修改消费者代码:
// 定义消费者
com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 消息id,mq 在 channel 中用来标识消息的 id,可用于确认消息已接收
long deliveryTag = envelope.getDeliveryTag();
// body 消息体
String msg = new String(body,"utf-8");
System.out.println("收到消息:" + msg);
/**
* @param1:deliveryTag:用来标识消息的id
* @param2:multiple:是否批量。true:将一次性 ACK 所有小于 deliveryTag 的消息
*/
// 手动确认
channel.basicAck(deliveryTag, false);
}
};
channel.basicQos(1);
// 监听队列
channel.basicConsume(queueName, false, consumer);
再次执行生产者、消费者程序,结果如下:
消费者 Consumer 只消费了两条消息
消费者 Consumer2 消费了十八条消息
这就实现了多劳多得!!