目录
延时队列
概念
延时队列是存储延时消息的队列,延时消息就是生产者发送了一条消息,但是不希望该消息不要被立即消费,而是设置一个延时时间,等过了这个时间再消费消息
使用场景
- 订单在十分钟之内未支付则自动取消
- 新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒
- 账单在一周内未支付,则自动结算
- 用户注册成功后,如果三天内没有登陆则进行短信提醒
- 用户发起退款,如果三天内没有得到处理则通知相关运营人员
这时候,延时队列就可以闪亮登场了,以上场景,正是延时队列的用武之地
延时队列插件
插件安装
在 RabbitMQ 3.6.x
之前我们一般采用 死信队列+ TTL
过期时间来实现延迟队列,我们这里不做过多介绍,从 RabbitMQ 3.6.x
开始,RabbitMQ
官方提供了延迟队列的插件。插件下载地址:https://www.rabbitmq.com/community-plugins.html
本人使用的 RabbitMQ 3.8.3
版本,所以下载 rabbitmq_delayed_message_exchange-3.8.0.ez
这个插件放到 RabbitMQ
安装目录的 plugins
文件中。在 RabbitMQ
安装 sbin
文件中用 cmd
执行命令
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
开启插件后,启动 RabbitMQ
,访问登录后访问 http://localhost:15672
,在交换机 exchanges
的 tab
下,底部新增将看到如下图设置,则表示插件已启动,以后直接就可以使用了
插件原理
延迟插件底层简单原理图
- 原始的
死信队列+ TTL
的模式,消息首先会路由到一个正常的队列,根据设置的TTL
进入死信队列,与之不同的是通过x-delayed-message
声明的交换机,它的消息在发布之后不会立即进入队列,先将消息保存至Mnesia
(一个分布式数据库管理系统,适合于电信和其它需要持续运行和具备软实时特性的Erlang
应用) - 这个插件将会尝试确认消息是否过期,首先要确保消息的延迟范围是
Delay > 0, Delay =< ?ERL_MAX_T
(在Erlang
中可以被设置的范围为(2^32)-1
毫秒),如果消息过期通过x-delayed-type
类型标记的交换机投递至目标队列,整个消息的投递过程也就完成了
插件实现延时队列
消息发送方
主要依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
配置文件
server.port=8080
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
#确认消息已发送到交换机
spring.rabbitmq.publisher-confirms=true
#确认消息已发送到队列
spring.rabbitmq.publisher-returns=true
配置类(1
)
主要是队列,交换机的配置绑定
@Configuration
public class RabbitConfig {
// 延时交换机
public static final String DELAY_EXCHANGE_NAME = "delay_exchange";
// 延时队列名称
public static final String DELAY_QUEUE_NAME = "delay_queue";
// 普通交换机
public static final String ORDER_PAY_Exchange_Name = "order_pay_exchange";
// 普通队列名称
public static final String ORDER_PAY_QUEUE_NAME = "order_pay_queue";
// 普通交换机路由键
public static final String ORDER_PAY_ROUTING_KEY = "order_pay_routing_key";
// ------------------------延时队列------------------------
// 延时队列
@Bean
public Queue delayPayQueue() {
return new Queue(RabbitConfig.DELAY_QUEUE_NAME, true);
}
// 延时交换机
public FanoutExchange delayExchange() {
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
FanoutExchange fanoutExchange = new FanoutExchange(RabbitConfig.DELAY_EXCHANGE_NAME, true, false, args);
fanoutExchange.setDelayed(true);
return fanoutExchange;
}
// 绑定延时队列与延时交换机
@Bean
public Binding delayPayBind() {
return BindingBuilder.bind(delayPayQueue()).to(delayExchange());
}
// ------------------------普通队列------------------------
// 普通队列
@Bean
public Queue orderPayQueue() {
return new Queue(RabbitConfig.ORDER_PAY_QUEUE_NAME, true);
}
// 普通交换机
@Bean
public DirectExchange orderExchange() {
return new DirectExchange(RabbitConfig.ORDER_PAY_Exchange_Name, true, false);
}
// 绑定普通消息队列
@Bean
public Binding orderPayBind() {
return BindingBuilder.bind(orderPayQueue()).to(orderExchange()).with(RabbitConfig.ORDER_PAY_ROUTING_KEY);
}
// 定义消息转换器
@Bean
public Jackson2JsonMessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
// 定义消息模板用于发布消息,并且设置其消息转换器
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(jsonMessageConverter());
return rabbitTemplate;
}
// --------------------------使用RabbitAdmin启动服务便创建交换机和队列--------------------------
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
// 只有设置为 true,spring 才会加载 RabbitAdmin 这个类
rabbitAdmin.setAutoStartup(true);
// 创建延时交换机和对列
rabbitAdmin.declareExchange(delayExchange());
rabbitAdmin.declareQueue(delayPayQueue());
// 创建普通交换机和对列
rabbitAdmin.declareExchange(orderExchange());
rabbitAdmin.declareQueue(orderPayQueue());
return new RabbitAdmin(connectionFactory);
}
}
配置类(2
)
消息发送到 exchange,queue
的回调函数
@Slf4j
@Configuration
public class RabbitConfirmConfig {
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);
/*设置开启Mandatory才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数*/
rabbitTemplate.setMandatory(true);
/*消息发送到Exchange的回调,无论成功与否*/
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
log.info("ConfirmCallback:" + "相关数据:" + correlationData);
log.info("ConfirmCallback:" + "确认情况:" + ack);
log.info("ConfirmCallback:" + "原因:" + cause);
});
/*消息从Exchange路由到Queue失败的回调*/
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
log.info("ReturnCallback:" + "消息:" + message);
log.info("ReturnCallback:" + "回应码:" + replyCode);
log.info("ReturnCallback:" + "回应信息:" + replyText);
log.info("ReturnCallback:" + "交换机:" + exchange);
log.info("ReturnCallback:" + "路由键:" + routingKey);
});
return rabbitTemplate;
}
}
发送方 service
@Service
public class MsgProductionService {
@Autowired
private RabbitTemplate rabbitTemplate;
// 发送延时信息
public void sendTimeoutMsg(String content, String routingKey, int delay) {
// 通过广播模式发布延时消息,会广播至每个绑定此交换机的队列,这里的路由键没有实质性作用
rabbitTemplate.convertAndSend(RabbitConfig.DELAY_EXCHANGE_NAME, routingKey, content, message -> {
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
// 毫秒为单位,指定此消息的延时时长
message.getMessageProperties().setDelay(delay * 1000);
return message;
});
}
// 发送普通消息
public void sendMsg(String routingKey, String content) {
// DirectExchange类型的交换机,必须指定对应的路由键
rabbitTemplate.convertAndSend(RabbitConfig.ORDER_PAY_Exchange_Name, routingKey, content);
}
}
发送方 controller
@Controller
public class MsgSendController {
@Autowired
private MsgProductionService msgProductionService;
@GetMapping(path = "/sendMsg")
@ResponseBody
public String sendMsg() {
// 发送多个延时消息
msgProductionService.sendTimeoutMsg("hello1", "routingKey1", 40);
msgProductionService.sendTimeoutMsg("hello2", "routingKey2", 20);
msgProductionService.sendTimeoutMsg("hello3", "routingKey3", 60);
// 发送普通消息
msgProductionService.sendMsg(RabbitConfig.ORDER_PAY_ROUTING_KEY, "weixin");
msgProductionService.sendMsg(RabbitConfig.ORDER_PAY_ROUTING_KEY, "alipay");
return "success";
}
}
消息接收方
配置文件
server.port=8081
#配置rabbitmq服务器
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.listener.type=simple
#消费方消息确认:手动确认
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.simple.default-requeue-rejected=false
消息的消费
@Slf4j
@Component
public class MsgComsumerService {
// 监听消费延时消息
@RabbitListener(queues = {
"delay_queue"})
@RabbitHandler
public void process(String content, Message message, Channel channel) throws IOException {
try {
log.info("延迟队列的内容[{}]", content);
// 消息的可定确认,第二个参数如果为true将一次性确认所有小于deliveryTag的消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
log.info("超时信息处理完毕");
} catch (Exception e) {
log.error("处理失败:{}", e.getMessage());
// 直接拒绝消费该消息,后面的参数一定要是false,否则会重新进入业务队列,不会进入死信队列
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
}
}
// 消费普通消息
@RabbitListener(queues = {
"order_pay_queue"})
@RabbitHandler
public void process1(String content, Message message, Channel channel) throws IOException {
try {
log.info("普通队列的内容[{}]", content);
// 消息的可定确认,第二个参数如果为true将一次性确认所有小于deliveryTag的消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
log.info("普通信息处理完毕");
} catch (Exception e) {
log.error("处理失败:{}", e.getMessage());
// 直接拒绝消费该消息,后面的参数一定要是false,否则会重新进入业务队列,不会进入死信队列
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
}
}
}
测试
启动 RabbitMQ
,及两个 springboot
项目,RabbitMQ
管理页面如下
队列
交换机
调用接口 http://localhost:8080/sendMsg
,查看控制台结果
消息消费方日志
hello2
的延时时间是20s
hello1
的延时时间是40s
hello3
的延时时间是60s
消息发送方日志
源码:https://gitee.com/chaojiangcj/springboot-rabbitmq-delay-queue.git