工作队列(Work Queues)
在第一节中介绍了将消息发送到指定队列并从该队列中接收消息。在这一节中我们将创建 Work Queue
来分发耗时的任务给多个 worker。
工作队列(行话称为 Task Queues) 的主要思想是避免立即做一些资源密集型的任务,而必须等待其完成。相反,我们将任务安排在稍后完成。我们封装一个任务作为一个消息并将它发送到一个队列。一个在后台运行的工作进程将任务取出并最终执行工作。可以将任务分给多个工作进程。
这一概念在网络应用程序中特别有用,在这些应用程序中,不可能在一个短暂的 HTTP 请求内处理复杂的任务。
准备
在本教程的前一部分,我们发送了一条包含 “Hello World!” 的消息。现在我们将发送字符串代表复杂的任务。我们没有一个真实的任务,比如要调整大小的图像或者要渲染 pdf 文件,所以让我们通过使用 sleep()
函数假装我们很忙。我们将用字符串中的点数代表其复杂性;每个点将需要让任务多执行一秒钟。例如,一个写作 Hello...
这样的假任务需要执行 3 秒。
我们将从前面的例子中稍微修改一下 send.php
代码,来允许从命令行发送任意消息。这个程序将把任务发送到工作队列中,所以将它命名为 new_task.php
:
$data = implode(' ', array_slice($argv, 1));
if (empty($data)) {
$data = "Hello World!";
}
$msg = new AMQPMessage($data);
$channel->basic_publish($msg, '', 'hello');
echo ' [x] Sent ', $data, "\n";
我们旧的 receive.php
脚本也需要一些改变:它需要为消息体中的每个点伪造一秒钟的工作时间。它会从队列中取出消息并执行任务,所以我们称之为 worker.php
:
$callback = function ($msg) {
echo ' [x] Received ', $msg->body, "\n";
sleep(substr_count($msg->body, '.'));
echo " [x] Done\n";
};
$channel->basic_consume('hello', '', false, true, false, false, $callback);
请注意,我们的假任务模拟了执行的时间。
按照教程一运行它们:
# shell 1
php worker.php
# shell 2
php new_task.php "A very hard task which takes two seconds.."
循环调度(Round-robin dispatching)
使用 Task Queue
的优势之一是能够轻松地实现并行工作。如果我们的任务有积压,可以通过增加工作进程轻松的扩展。
首先,让我们尝试同时运行两个 worker.php
脚本。他们都将从队列中获得消息,但是具体是怎样的呢?具体如下。
你需要打开三个控制台。两个将运行 worker.php
脚本,这些是我们的两个消费者——C1 和 C2。
# shell 1
php worker.php
# => [*] Waiting for messages. To exit press CTRL+C
# shell 2
php worker.php
# => [*] Waiting for messages. To exit press CTRL+C
在第三个控制台中,我们将发布新任务。一旦你开始开启消费脚本,你就可以发布一些信息了:
# shell 3
php new_task.php First message.
php new_task.php Second message..
php new_task.php Third message...
php new_task.php Fourth message....
php new_task.php Fifth message.....
让我们看看它向工作进程传递的消息:
# shell 1
php worker.php
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'First message.'
# => [x] Received 'Third message...'
# => [x] Received 'Fifth message.....'
# shell 2
php worker.php
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'Second message..'
# => [x] Received 'Fourth message....'
默认情况下,RabbitMQ 会将每个消息依次发送给下一个消费者。平均来说,每个消费者都会收到相同数量的信息。这种分发消息的方式叫做循环调度。你可以用三个或更多的工作进程尝试。
消息通知(Message acknowledgment)
做一项任务可能需要几秒钟。你可能会想,如果其中一个消费者开始一项漫长的任务,却仅仅完成了一部分就挂掉了,会是什么原因呢?当前的代码逻辑是,一旦RabbitMQ向消费者发送了一条消息,它会立即将其标记为删除。在这种情况下,如果你杀掉一个工作进程,我们将失去它刚刚处理的信息。我们还会丢失发送给该特定进程尚未处理的所有消息。
但是我们不想丢失任何任务。当一个进程挂掉,我们希望这个任务分发给其他进程。
为了确保消息永远不会丢失,RabbitMQ 支持 消息确认( message acknowledgments)。当消费着接收、处理了特定的消息后,会返回一个确认告知 RabbitMQ,然后它就可以自由删除该消息。
如果消费进程挂掉(比如其信道被关闭、连接被关闭或TCP连接丢失)而没有发送 ack,RabbitMQ 将会认为该消息没有被完全处理,并将它重新入队列。如果同时有其他消费者在线,它将很快把它转交给另一个消费者。这样即使工作进程偶尔挂掉,你可以确保没有信息丢失。
这里没有任何消息超时的概念。只要工作进程挂掉,RabbitMQ 就会重新投递消息,即使处理一个消息需要非常长的时间也没有关系。
消息确认默认是关闭的。通过将basic_consume
中第四个参数设置为 false( true 表示没有确认)来打开它们,当工作进程完成一项任务后,发送恰当的通知就可以了。
$callback = function ($msg) {
echo ' [x] Received ', $msg->body, "\n";
sleep(substr_count($msg->body, '.'));
echo " [x] Done\n";
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);
使用上面这段代码,我们可以确保即使在一个工作进程正在处理消息时用CTRL+C杀掉,也不会丢掉信息。进程挂掉后不久,所有未确认的消息都将被重新分发。
消息确认必须发送到分发出该消息的 channel 上,尝试确认通知其他 channel 将导致 channel 级协议异常。请参阅 确认文档 指南了解更多信息。
忘记确认通知
遗忘通知很常见。这是一个容易犯的错误,但是后果是严重。当客户端退出时,消息会被重新分发(这看起来像是随机的重新分发),但是 RabbitMQ 会消耗越来越多的内存,因为它不能释放任何未被确认过的消息。
为了调试这种错误,您可以使用
rabbitmqctl
打印messages_unaccounted
字段:
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
在windows 上使用:
rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
消息持久化
我们已经了解过如何保证即使消费者死掉,任务也不会丢失的操作。但是如果 RabbitMQ 服务器宕机,我们的消息仍然会丢失。
当 RabbitMQ 退出或者崩溃,如果没有设置,队列和消息就会丢失。要保证消息不丢,需要做两件事:需要标记队列和消息都持久化。
首先,我们需要确保 RabbitMQ 永远不会丢失队列。为了做到这个,需要声明它是 durable
。通过给函数 queue_declare
第三个参数赋值 true
:
$channel->queue_declare('hello', false, true, false, false);
尽管这个命令本身是正确的,它并不会在现在的设置中生效。那是因为我们已经定义了一个 hello
的队列,而它不是持久化的。RabbitMQ 不允许重新定义已经存在不同配置的队列,如果试图这样做,它会返回一个错误。这里有一个快速的解决办法,我们重新声明一个其他名字的队列。比如 task_queue
:
$channel->queue_declare('task_queue', false, true, false, false);
这个标记设置为 true
需要同时应用在生产者和消费着的代码中。
这是我们可以确保 task_queue
在 RabbitMQ 服务器重启也不会丢失。现在我们需要标记我们的消息为持久化 – 通过恰当的设置消息 delivery_mode = 2
,因为 AMQPMessage 需要将该属性作为数组传值。
$msg = new AMQPMessage(
$data,
array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
);
关于消息持久性的注意事项
标记消息为持久的并不能完全保证消息不丢失。尽管它告诉 RabbitMQ 将消息存入硬盘,这期间仍然有很短的时间窗口当 RabbitMQ 接收了消息却并没有来的及保存它。而且,RabbitMQ 并不会为每一个消息做
fsync(2)
– 它可能仅仅是保存在缓存中,并不是真的写入硬盘。该持久配置并不能强保障,但是对于我们简单的任务队列已经足够了。如果我们需要强一致性,可以使用 publisher confirms。
公平调度
你可能已经注意到调度仍然没有按照我们想要的那样工作。例如,在有两个工作进程的情况下:分发到奇数进程的消息复杂,到偶数进程的消息简单,其中一个进程将会持续繁忙的状态,而另一个几乎无事可做。RabbitMQ 对此一无所知,仍然会平均发送消息。
发生这种情况是因为当消息进入队列,RabbitMQ 就会将消息分发出去。它并不会考虑消费者未确认消息的数量。只是盲目地把第 n 条消息发送给第 n 个消费者。
为了克服这一点,我们可以使用 basic_qos
方法,并设置参数 prefetch_count = 1
。这告诉 RabbitMQ 不要一次给一个工作进程发送多个消息。或者换句话说,在一个工作进程没有处理完并确认前一个消息时,不要向它分发新的消息。相反,它会将该新消息分发给下一个不忙的工作进程。
$channel->basic_qos(null, 1, null);
注意队列的大小
如果所有的工作进程都在忙,你的队列就会排满。你要留意这一点,也许增加更多的工作进程,或者有一些其他的策略。
总结
new_task.php
的最终代码:
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('task_queue', false, true, false, false);
$data = implode(' ', array_slice($argv, 1));
if (empty($data)) {
$data = "Hello World!";
}
$msg = new AMQPMessage(
$data,
array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
);
$channel->basic_publish($msg, '', 'task_queue');
echo ' [x] Sent ', $data, "\n";
$channel->close();
$connection->close();
以及 worker.php
:
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('task_queue', false, true, false, false);
echo " [*] Waiting for messages. To exit press CTRL+C\n";
$callback = function ($msg) {
echo ' [x] Received ', $msg->body, "\n";
sleep(substr_count($msg->body, '.'));
echo " [x] Done\n";
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};
$channel->basic_qos(null, 1, null);
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);
while (count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
使用消息确认和 prefetch
可以设置一个工作队列。持久性的选项可以使任务即使在重启 RabbitMQ 也不会丢。
现在你可以移步到教学 3,学习如何向许多消费者传递相同的信息。
生产环境【非】适用性声明
请记住,本教程和其他教程都是教程。它们一次只展示一个新的概念,而且故意使一些事情简单化,而忽略了另一些。比如:简洁起见,连接管理、错误处理,连接恢复、并发以及**度量收集(metric collection)**等主题很大程度上被省略。这些简化的代码不应该直接拿来用在生产环境。
建议阅读以下的指导: Publisher Confirms and Consumer Acknowledgements, Production Checklist and Monitoring。
翻译自:https://www.rabbitmq.com/tutorials/tutorial-two-php.html