官方文档地址: 2 Work Queues
在worker
之间分配任务(竞争的消费者模式)。
前提条件
本教程假设你已经安装了 RabbitMQ 并在本地主机端口(5672)上运行。
工作队列
在第一篇教程中,我们编写了从命名队列发送和接收消息的程序。在本文中,我们将创建一个工作队列,用于在多个worker
之间分配耗时的任务。
工作队列(即任务队列)背后的主要思想是避免立即执行资源密集型任务,并且必须等待它完成(这种情况比比较耗时,影响其他任务的执行)。相反,我们把任务安排在以后完成(同步执行改异步执行)。将任务封装为消息并将其发送到队列。在后台运行的工作进程将取出消息并进行处理。当您运行多个worker
时,它们之间可以共享任务。
这个概念在 web 应用程序中特别有用,因为在较短的 HTTP 请求窗口期中不可能处理复杂的任务。
准备
在本教程的上一篇中,我们发送了一条"Hello World!"
的消息。现在我们将发送表示复杂任务的字符串。通过使用Thread.sleep()
函数来模拟任务所需处理时间。我们取字符串中点的数量作为复杂度,每个点表示需要处理一秒。例如,Hello…
需要三秒钟。
我们将略微修改前面示例中的Send.java
代码,使其可以接受命令行的输入内容。这个程序将发送消息到工作队列,我们命名程序为NewTask.java
:
String message = String.join(" ", argv);
channel.basicPublish("", "hello", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
我们原来的Recv.java
程序也需要做一些更改:它需要为消息体中的每个点伪造一秒的工作时间。它将处理传递的消息并执行任务,所以我们称它为Worker.java
:
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
doWork(message);
} finally {
System.out.println(" [x] Done");
}
};
//关闭自动消息确认机制(参见下面)
boolean autoAck = true;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> {
});
我们模拟任务的执行时间:
private static void doWork(String task) throws InterruptedException {
for (char ch: task.toCharArray()) {
if (ch == '.') Thread.sleep(1000);
}
}
按照教程1
的方式编译它们(使用工作目录中的jar
文件和环境变量CP
):
javac -cp $CP NewTask.java Worker.java
在 Windows 上:
javac -cp %CP% NewTask.java Worker.java
循环调度
使用任务队列的优点之一是能够轻松地并行处理工作。如果我们积累了积压的工作,我们可以增加更多的worker
,这样就很容易扩大规模。
首先,让我们尝试同时运行两个worker
实例。它们都将从队列中获取消息,但具体是如何操作的呢?让我们来看看。
您需要打开三个控制台。其中两个将运行worker
程序。这两控制台将是我们的两个消费者 - C1
和C2
。为了方便,我们将对类路径使用环境变量$CP
(Windows 上是%CP%
):
# shell 1
java -cp $CP Worker
# => [*] Waiting for messages. To exit press CTRL+C
# shell 2
java -cp $CP Worker
# => [*] Waiting for messages. To exit press CTRL+C
在第三个控制台中,我们用来发布消息:
# shell 3
java -cp $CP NewTask First message.
# => [x] Sent 'First message.'
java -cp $CP NewTask Second message..
# => [x] Sent 'Second message..'
java -cp $CP NewTask Third message...
# => [x] Sent 'Third message...'
java -cp $CP NewTask Fourth message....
# => [x] Sent 'Fourth message....'
java -cp $CP NewTask Fifth message.....
# => [x] Sent 'Fifth message.....'
让我们看看我们的workers
接收到了什么:
java -cp $CP Worker
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'First message.'
# => [x] Received 'Third message...'
# => [x] Received 'Fifth message.....'
java -cp $CP Worker
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'Second message..'
# => [x] Received 'Fourth message....'
默认情况下,RabbitMQ 会将每条消息依次发送给下一个消费者。平均而言,每个消费者将获得相同数量的消息。这种分发消息的方式称为round-robin
(循环)。尝试让三个或更多的工人worker
使用这个方法。
消息确认
完成一项任务可能需要几秒钟的时间。如果其中一个消费者启动了一个耗时很长的任务,但只完成了一部分就挂掉了,会发生什么情况。对于我们当前的代码,RabbitMQ 一旦将消息传递给消费者,就会立即将其标记为删除。在这种情况下,如果你杀死了一个worker
,将丢失它正在处理的消息。还将丢失所有发送给这个特定worker
但尚未处理的消息。
但我们不想丢失任何任务。如果一个worker
死了,我们希望把这个任务交给另一个worker
。
为了确保消息永不丢失,RabbitMQ 支持消息确认。消费者返回一个确认信息,告诉 RabbitMQ 已经接收、处理了特定的消息,RabbitMQ 可以自由地删除它。
如果消费者在不发送ack
的情况下死亡(其通道关闭、连接关闭或 TCP 连接丢失),RabbitMQ 将理解为消息未被完全处理,并将重新对其排队。如果同时有其他的消费者在线,它很快会将其重新发送给另一个消费者。这样即使worker
偶尔死亡,也可以确保没有信息丢失。
没有任何消息会超时,当消费者死亡时,RabbitMQ 将重新传递消息。即使处理一条消息需要很长很长的时间,也没关系。
默认情况下,手动确认是打开的。在前面的示例中,我们通过设置autoAck=true
打开了自动确认,关闭了手动确认。下面我们关闭自动确认,使用手动确认消息接收成功。
channel.basicQos(1); // 一次只接受一条未加密的消息(参见下面的内容)
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
doWork(message);
} finally {
System.out.println(" [x] Done");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> {
});
使用这段代码,我们可以确保即使您使用Ctrl+C
杀死了一个正在处理消息的worker
,也不会丢失任何东西。在这个worker
死后不久,所有未确认的消息将被重新发送。
确认信息必须和接收消息在同一通道上发送。尝试使用不同的通道发送确认信息将导致通道级协议异常。请参阅有关消息确认的文档指南以了解更多信息。
忘记确认消息
忘记basicAck
是一个常见的错误。这是一个简单的错误,但后果是严重的。当你的客户端退出时,消息将被重新发送(可能看起来像随机的重新发送),但是 RabbitMQ 将消耗越来越多的内存,因为它将无法释放任何未被确认的消息。
为了调试这种错误,可以使用rabbitmqctl
打印messages_unacknowledged
字段:
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
在 Windows 上:
rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
消息持久化
我们已经学会了如何确保即使消费者死亡,任务也不会丢失。但是,如果 RabbitMQ 服务器停止,我们的任务仍然会丢失。
当 RabbitMQ 退出或崩溃时,将丢失队列和消息。为了确保消息不丢失我们需要将队列和消息都标记为持久的。
首先,我们需要确保队列在 RabbitMQ 节点重启后仍然存在。为了做到这一点,我们需要声明它是持久的:
boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);
虽然这个命令本身是正确的,但是它在我们当前的设置中不能工作。那是因为我们已经定义了一个名为hello
的队列,它不是持久的。RabbitMQ
不允许您使用不同的参数重新定义现有队列,并将向任何试图这样做的程序返回一个错误。所以我们需要声明一个不同名称的队列,例如task_queue
:
boolean durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);
此队列声明queueDeclare
更改需要同时应用于生产者代码和消费者代码。
此时,我们确定即使 RabbitMQ 重新启动,task_queue
队列也不会丢失。现在,我们需要将消息标记为持久的 - 通过将MessageProperties
(它实现了BasicProperties
)的值设置为PERSISTENT_TEXT_PLAIN
。
import com.rabbitmq.client.MessageProperties;
channel.basicPublish("", "task_queue",
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
注意事项
将消息标记为持久性并不能完全保证消息不会丢失。虽然它告诉 RabbitMQ 将消息保存到磁盘,但是当 RabbitMQ 接收了一条消息并且还没有保存它时,仍然会有一个短的时间窗口。此外,RabbitMQ 不会对每条消息都执行fsync(2)
– 它可能只是保存到缓存中,而不是真正写到磁盘上。持久性保证并不强,但对于我们的简单任务队列来说已经足够了。如果你需要更强的保证,那么你可以使用发布者确认。
公平分发
你可能已经注意到,调度仍然不能完全按照我们希望的方式工作。例如,在两个worker
的情况下,当所有奇数信息都很重,偶数信息很轻时,一个worker
会一直很忙,而另一个worker
几乎什么都不做。好吧,RabbitMQ 对此一无所知,仍然会均匀地分发消息。
这是因为 RabbitMQ 只在消息进入队列时分发消息。它不会查看消费者未确认的消息的数量。它只是盲目地将第n
条消息发送给第n
个消费者。
为了解决这个问题,我们可以使用basicQos
方法,并设置prefetchCount = 1
。这告诉 RabbitMQ 一次不要向一个worker
发送多个消息。或者,换句话说,在worker
处理并确认一个消息之前,不要向其发送新消息。相反,它将把它发送给另一个不太忙的worker
。
int prefetchCount = 1;
channel.basicQos(prefetchCount);
注意队列大小
如果所有的worker
都很忙,你的队列就会排满。你要注意这一点,可以通过增加更多的worker
,或者其他的策略来解决这个问题。
把它们放一起
我们NewTask.java
类的最终代码:
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author wangbo
* @date 2019/10/23 11:24
*/
public class NewTask {
private final static String QUEUE_NAME = "task_queue";
public static void main(String[] args) throws IOException, TimeoutException {
//创建一个连接器连接到服务器
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try(Connection connection = factory.newConnection()){
//创建一个通道
Channel channel = connection.createChannel();
//声明一个队列,并将队列设置为持久的
boolean durable = true;
channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
//从命令行接受参数
String message = String.join(" ", args);
//发布一条消息,并将消息设置为持久的
AMQP.BasicProperties props = MessageProperties.PERSISTENT_TEXT_PLAIN;
channel.basicPublish("", QUEUE_NAME, props, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
}
}
}
Worker.java
类的最终代码:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author wangbo
* @date 2019/10/22 18:25
*/
public class Worker {
private final static String QUEUE_NAME = "task_queue";
public static void main(String[] args) throws IOException, TimeoutException {
//创建一个连接器连接到服务器
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
//创建一个通道
Channel channel = connection.createChannel();
//声明一个队列,并将队列设置为持久的
boolean durable = true;
channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
//一次只接受一条未加密的消息
int prefetchCount = 1;
channel.basicQos(prefetchCount);
//回调对象
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
doWork(message);
} finally {
System.out.println(" [x] Done");
//手动确认已收到一个消息
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
//消息自动确标志
boolean autoAck = false;
//消费者监听
channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> {
});
}
private static void doWork(String task) {
for (char ch: task.toCharArray()) {
if (ch == '.') {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
使用消息确认和预取计数prefetchCount
可以设置工作队列。使用持久性设置可以保证 RabbitMQ 高可用。
有关Channel
方法和MessageProperties
的更多信息,可以在线浏览 JavaDocs。
现在我们可以继续教程3
,学习如何向许多用户传递相同的信息。