Springboot2.x集成Rabbitmq实现延时消费
前言
比如我们在使用mq的时候并不希望立即收到消息,比如实现3分钟后再查询订单,这时候我们就需要使用延迟消息发送了。
RabbitMQ如何实现迟队列?
AMQP协议和RabbitMQ队列本身没有直接支持延迟队列功能。但是我们可以通过RabbitMQ的两个特性来曲线实现延迟队列:
特性1、Time To Live(TTL)
1.RabbitMQ可以针对Queue设置x-expires
2.针对Message设置 x-message-ttl
通过这两种方式来控制消息的生存时间
- 如果通过队列属性设置,队列中所有消息都有相同的过期时间。
- 如果通过对消息进行单独设置,每条消息TTL可以不同。
- 如果超时(两者同时设置以最先到期的时间为准),则消息变为dead letter(死信)
特性2、Dead Letter Exchanges(DLX)
RabbitMQ的Queue可以配置x-dead-letter-exchange 和x-dead-letter-routing-key(可选)两个参数,如果队列内出现了dead letter,则按照这两个参数重新路由转发到指定的队列:
- x-dead-letter-exchange:出现dead letter之后将dead letter重新发送到指定exchange
- x-dead-letter-routing-key:出现dead letter之后将dead letter重新按照指定的routing-key发送
队列出现dead letter的情况有:
- 消息或者队列的TTL过期
- 队列达到最大长度
- 消息被消费端拒绝(basic.reject or basic.nack)并且requeue=false
综合上述两个特性,设置了TTL规则之后当消息在一个队列中变成死信时,利用DLX特性它能被重新转发到另一个Exchange或者Routing Key,这时候消息就可以重新被消费了。
方式一:
我们通过插件的方式来实现rabbitMQ的延迟消息发送,在rabbitmq 3.5.7及以上的版本提供了一个插件(rabbitmq-delayed-message-exchange)来实现延迟队列功能。同时插件依赖Erlang/OPT 18.0及以上。
插件下载地址:下载,具体教程可以参考我blog,具体可参考我插件安装
注意:使用rabbitmq-delayed-message-exchange插件时发送到队列的消息数量在web管理界面可能不可见,不影响正常功能使用
1.配置
/**
* 定义延迟消息发送的队列.
*/
@Bean
public Queue delayQueue() {
return QueueBuilder.nonDurable("delay-queue").build();
}
/**
* 定义一个用于延迟消息发送的交换机
* 延时队列交换机 注意这里的交换机类型:CustomExchange
*/
@Bean
public CustomExchange delayExchange() {
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
return new CustomExchange("delay-exchange", "x-delayed-message", true, false, args);
}
/**
* 给延时队列绑定交换机
*/
@Bean
public Binding delayBinding() {
return BindingBuilder.bind(delayQueue()).to(delayExchange()).with("delay-key").noargs();
}
消费者:
@RabbitListener(queues = {"delay-queue"})
public void redirectQueue(Message message, Channel channel) throws IOException {
log.info("delay-queue 10s 后 消费消息 {}", new String(message.getBody()));
}
测试:因为是延迟消息,所以这里编写一个接口来测试
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 通过插件设置消息的过期时间(10s)
*/
@GetMapping("/sendDelayMessage")
public void sendDelayMessage() {
rabbitTemplate.convertAndSend("delay-exchange", "delay-key","通过插件设置消息的过期时间(10s)", message -> {
message.getMessageProperties().setDelay(10 * 1000);
return message;
});
}
方式二:
通过rabbitMQ的特性来实现延迟消息发送
1.配置
/////////////////////////////////////////死信队列的应用//////////////////////////////////////////////////////////
/**
* DLX测试队列
*/
@Bean
public Queue DLXTestQueue() {
Map<String, Object> args = new HashMap<>(2);
args.put("x-dead-letter-exchange", "DL_exchange");
args.put("x-dead-letter-routing-key", "DL_queue");
return QueueBuilder.nonDurable("DLX-test-queue").withArguments(args).build();
}
@Bean
public Exchange DLXTestExchange() {
return ExchangeBuilder.directExchange("DLX-test-exchange").durable(true).build();
}
@Bean
public Binding DLXTestBinding() {
return new Binding("DLX-test-queue", Binding.DestinationType.QUEUE, "DLX-test-exchange", "DLX-test-queue", null);
}
///////////////////////////////////声明一个死信队列,交换机,绑定//////////////////////////////
@Bean
public Queue dlQueue() {
return QueueBuilder.nonDurable("DL_queue").build();
}
@Bean
public Exchange dlExchange() {
return ExchangeBuilder.directExchange("DL_exchange").durable(true).build();
}
/**
* 死信路由通过 DL_queue 绑定键绑定到死信队列上.
*/
@Bean
public Binding deadLetterBinding() {
return new Binding("DL_queue", Binding.DestinationType.QUEUE, "DL_exchange", "DL_queue", null);
}
2.消费者
/**
* 死信队列实现延迟消息
*/
@RabbitListener(queues = {"DL_queue"})
public void DLXTestQueue(Message message, Channel channel) throws IOException {
log.info("死信队列 10s 后 消费消息 {}", new String(message.getBody()));
}
3.测试
/**
* 测试死信队列
*/
@GetMapping("ttl")
public void testTTL() {
rabbitTemplate.convertAndSend("DLX-test-exchange", "DLX-test-queue", "去私信队列", messagePostProcessor -> {
messagePostProcessor.getMessageProperties().setExpiration("10000");
return messagePostProcessor;
});
}
总结:这里我们利用rabbitmq的特性和插件实现了消息的延迟投递,之前我也有用过ActiveMQ,ActiveMQ对消息延时和定时投递做了很好的支持,其内部启动Scheduled来对该功能支持,也提供了一个封装的消息类型ScheduledMessage,个人感觉如果量不大并且需要很多定时投递,ActiveMQ也是一个不错的选择,但是如果对消息有严格的准确性,保证消息不丢失,保证性能的话选择RabbitMQ。
源码地址:https://gitee.com/zoo-plus/springboot-learn/tree/2.x/springboot-middleware/rabbitmq