MQ高级-延迟队列

请添加图片描述
个人名片:

博主酒徒ᝰ.
个人简介沉醉在酒中,借着一股酒劲,去拼搏一个未来。
本篇励志三人行,必有我师焉。

请添加图片描述
本项目基于B站黑马程序员Java《SpringCloud微服务技术栈》,SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式

【SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,系统详解springcloud微服务技术栈课程|黑马程序员Java微服务】 点击观看

三、死信交换机

3. 延迟队列

利用TTL结合死信交换机,我们实现了消息发出后,消费者延迟收到消息的效果。这种消息模式就称为延迟队列(Delay Queue)模式。

延迟队列的使用场景包括:

  • 延迟发送短信
  • 用户下单,如果用户在15 分钟内未支付,则自动取消
  • 预约工作会议,20分钟后自动通知所有参会人员

因为延迟队列的需求非常多,所以RabbitMQ的官方也推出了一个插件,原生支持延迟队列效果。

  1. 安装DelayExchange插件

官方的安装指南地址为:https://blog.rabbitmq.com/posts/2015/04/scheduling-messages-with-rabbitmq

因为之前是基于Docker安装RabbitMQ,所以基于Docker来安装RabbitMQ插件。

1)下载插件

RabbitMQ有一个官方的插件社区,地址为:https://www.rabbitmq.com/community-plugins.html

其中包含各种各样的DelayExchange插件:

2)上传插件

之前设定的RabbitMQ的数据卷名称为mq-plugins
在这里插入图片描述

使用命令查看数据卷:

docker volume inspect mq-plugins

可以得到下面结果:
在这里插入图片描述

接下来,将课前资料中的插件(rabbitmq_delayed_message_exchange-3.8.9-0199d11c.ez)上传到这个目录即可:

cd /var/lib/docker/volumes/mq-plugins/_data

3)安装插件

最后就是安装了,需要进入MQ容器内部来执行安装。我的容器名为mq,所以执行下面命令:

docker exec -it mq bash

进入容器内部后,执行下面命令开启插件:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

结果如下:
在这里插入图片描述

  1. DelayExchange原理

DelayExchange需要将一个交换机声明为delayed类型。当我们发送消息到delayExchange时,流程如下:

  • 接收消息
  • 判断消息是否具备x-delay属性
  • 如果有x-delay属性,说明是延迟消息,持久化到硬盘,读取x-delay值,作为延迟时间
  • 返回routing not found结果给消息发送者
  • x-delay时间到期后,重新投递消息到指定队列
  1. 使用DelayExchange

插件的使用也非常简单:声明一个交换机,交换机的类型可以是任意类型,只需要设定delayed属性为true即可,然后声明队列与其绑定即可。

1)声明DelayExchange交换机

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "delay.queue", durable = "true"),
        exchange = @Exchange(name = "delay.direct", delayed = "true"),
        key = "delay"
))
public void listenDelayQueue(String msg) {
    
    
    log.info("接收到延迟队列的消息:{}", msg);
}

2)发送消息

发送消息时,一定要携带x-delay属性,指定延迟的时间:

@Test
public void testDelay() {
    
    
    Message message = MessageBuilder
            .withBody("hello, delayed message".getBytes(StandardCharsets.UTF_8))
            .setHeader("x-delay", 10000)
            .build();
    CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
    rabbitTemplate.convertAndSend("delay.direct", "delay", message, correlationData);
    log.debug("延时消息发送成功");
}

测试:
在这里插入图片描述

delay消息成功,但是下面报了错。

这是消息在delay时处于暂存状态,队列没有收到,delay结束后才发送队列,修改代码,判断是否是delay消息,是的话不走报错环节。

报错的内容是在publisher中的CommonConfig.java中,修改代码,添加判断delay环节。

@Test
public void testDelay() {
    
    
    Message message = MessageBuilder
            .withBody("hello, delayed message".getBytes(StandardCharsets.UTF_8))
            .setHeader("x-delay", 10000)
            .build();
    CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
    rabbitTemplate.convertAndSend("delay.direct", "delay", message, correlationData);
    log.debug("延时消息发送成功");
}

结果:
不在报错
在这里插入图片描述

总结

延迟队列插件的使用步骤包括哪些?

  • 声明一个交换机,添加delayed属性为true

  • 发送消息时,添加x-delay头,值为超时时间

猜你喜欢

转载自blog.csdn.net/m0_65144570/article/details/133150879