个人名片:
博主:酒徒ᝰ.
个人简介:沉醉在酒中,借着一股酒劲,去拼搏一个未来。
本篇励志:三人行,必有我师焉。
本项目基于B站黑马程序员Java《SpringCloud微服务技术栈》,SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式
【SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,系统详解springcloud微服务技术栈课程|黑马程序员Java微服务】 点击观看
三、死信交换机
3. 延迟队列
利用TTL结合死信交换机,我们实现了消息发出后,消费者延迟收到消息的效果。这种消息模式就称为延迟队列(Delay Queue)模式。
延迟队列的使用场景包括:
- 延迟发送短信
- 用户下单,如果用户在15 分钟内未支付,则自动取消
- 预约工作会议,20分钟后自动通知所有参会人员
因为延迟队列的需求非常多,所以RabbitMQ的官方也推出了一个插件,原生支持延迟队列效果。
- 安装DelayExchange插件
官方的安装指南地址为:https://blog.rabbitmq.com/posts/2015/04/scheduling-messages-with-rabbitmq
因为之前是基于Docker安装RabbitMQ,所以基于Docker来安装RabbitMQ插件。
1)下载插件
RabbitMQ有一个官方的插件社区,地址为:https://www.rabbitmq.com/community-plugins.html
其中包含各种各样的DelayExchange插件:
2)上传插件
之前设定的RabbitMQ的数据卷名称为
mq-plugins
。
使用命令查看数据卷:
docker volume inspect mq-plugins
可以得到下面结果:
接下来,将课前资料中的插件(rabbitmq_delayed_message_exchange-3.8.9-0199d11c.ez
)上传到这个目录即可:
cd /var/lib/docker/volumes/mq-plugins/_data
3)安装插件
最后就是安装了,需要进入MQ容器内部来执行安装。我的容器名为mq
,所以执行下面命令:
docker exec -it mq bash
进入容器内部后,执行下面命令开启插件:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
结果如下:
- DelayExchange原理
DelayExchange需要将一个交换机声明为delayed类型。当我们发送消息到delayExchange时,流程如下:
- 接收消息
- 判断消息是否具备x-delay属性
- 如果有x-delay属性,说明是延迟消息,持久化到硬盘,读取x-delay值,作为延迟时间
- 返回routing not found结果给消息发送者
- x-delay时间到期后,重新投递消息到指定队列
- 使用DelayExchange
插件的使用也非常简单:声明一个交换机,交换机的类型可以是任意类型,只需要设定delayed属性为true即可,然后声明队列与其绑定即可。
1)声明DelayExchange交换机
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "delay.queue", durable = "true"),
exchange = @Exchange(name = "delay.direct", delayed = "true"),
key = "delay"
))
public void listenDelayQueue(String msg) {
log.info("接收到延迟队列的消息:{}", msg);
}
2)发送消息
发送消息时,一定要携带x-delay属性,指定延迟的时间:
@Test
public void testDelay() {
Message message = MessageBuilder
.withBody("hello, delayed message".getBytes(StandardCharsets.UTF_8))
.setHeader("x-delay", 10000)
.build();
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend("delay.direct", "delay", message, correlationData);
log.debug("延时消息发送成功");
}
测试:
delay消息成功,但是下面报了错。
这是消息在delay时处于暂存状态,队列没有收到,delay结束后才发送队列,修改代码,判断是否是delay消息,是的话不走报错环节。
报错的内容是在publisher中的CommonConfig.java中,修改代码,添加判断delay环节。
@Test
public void testDelay() {
Message message = MessageBuilder
.withBody("hello, delayed message".getBytes(StandardCharsets.UTF_8))
.setHeader("x-delay", 10000)
.build();
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend("delay.direct", "delay", message, correlationData);
log.debug("延时消息发送成功");
}
结果:
不在报错
总结
延迟队列插件的使用步骤包括哪些?
声明一个交换机,添加delayed属性为true
发送消息时,添加x-delay头,值为超时时间