服务异常通讯高级篇二(死信交换机)
1、初始死信交换机
当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):
- 消费者使用basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为false
- 消息是一个过期消息,超时无人消费
- 要投递的队列消息满了,无法投递
如果这个包含死信的队列配置了dead-letter-exchange
属性,指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机称为死信交换机(Dead Letter Exchange,查DLX)。
如图,一个消息被消费者拒绝了,变成了死信:
另外,队列将死信投递给死信交换机时,必须知道两个信息:
- 死信交换机名称
- 死信交换机与死信队列绑定的RoutingKey
这样才能确保投递的消息能到达死信交换机,并且正确的路由到死信队列。
我们在consumer服务中,定义一组死信交换机、死信队列:
// 声明普通的 simple.queue队列,并且为其指定死信交换机:dl.direct
@Bean
public Queue simpleQueue2(){
return QueueBuilder.durable("simple.queue") // 指定队列名称,并持久化
.deadLetterExchange("dl.direct") // 指定死信交换机
.build();
}
// 声明死信交换机 dl.direct
@Bean
public DirectExchange dlExchange(){
return new DirectExchange("dl.direct", true, false);
}
// 声明存储死信的队列 dl.queue
@Bean
public Queue dlQueue(){
return new Queue("dl.queue", true);
}
// 将死信队列 与 死信交换机绑定
@Bean
public Binding dlBinding(){
return BindingBuilder.bind(dlQueue()).to(dlExchange()).with("simple");
}
死信交换机总结
什么样的消息会成为死信?
- 消息被消费者reject或者返回nack
- 消息超时未消费
- 队列满了
死信交换机的使用场景是什么?
- 如果队列绑定了死信交换机,死信会投递到死信交换机;
- 可以利用死信交换机收集所有消费者处理失败的消息(死信),交由人工处理,进一步提高消息队列的可靠性。
2、TTL(Time-To-Live)
一个队列中的消息如果超时未消费,则会变为死信,超时分为两种情况:
- 消息所在的队列设置了超时时间
- 消息本身设置了超时时间
一定要理解下图的处理方向,死信交换机是dl.direct
,对应的消息队列是dl.queue
,我们一开始是将消息发送到ttl.direct
交换机的再发送到绑定的队列上,然后ttl.queue
队列超时未处理消息变为死信,将消息转移到死信交换机上
2.1.接收(消费)超时死信交换机的信息(ttl.queue预计时间内没有被消费)
在consumer服务的SpringRabbitListener中,定义一个新的消费者,并且声明 死信交换机、死信队列:
/**
* 创建死信交换机【消费者】
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "dl.queue",durable = "true"),
exchange = @Exchange(name = "dl.direct"),
key = "dl"
))
public void listenDlQueue(String msg){
log.info("消费者接收到了dl.queue的延迟消息:"+msg);
}
2.2、创建一个TTL的类创建交换机和队列配置(设置队列延时处理)
我们是要将消息发送到当前的交换机上,让消息变成死信
重点在于队列
ttl.queue
通过deadLetterExchange("dl.direct")
以及deadLetterRoutingKey("dl")
绑定了死信交换机和通讯之间的key
package cn.itcast.mq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* 项目名称:mq-advanced-demo
* 描述:TTL延迟创建、
*
* @author zhong
* @date 2022-06-01 11:17
*/
@Configuration
public class TTLMessageConfig {
/**
* 创建交换机
*/
@Bean
public DirectExchange ttlDirectExchange() {
return new DirectExchange("ttl.direct");
}
/**
* 创建队列,指定延迟时间
*/
@Bean
public Queue ttlQueue(){
return QueueBuilder
.durable("ttl.queue")
.ttl(10000)
.deadLetterExchange("dl.direct") // 绑定的死信交换机
.deadLetterRoutingKey("dl")
.build();
}
/**
* 绑定交换机和队列
*/
@Bean
public Binding ttlBinding(){
return BindingBuilder.bind(ttlQueue()).to(ttlDirectExchange()).with("ttl");
}
}
2.3、发送消息到延时队列(ttl.queue)
这边是将消息正常的发送到了
ttl.queue
队列上的,而他们之间的key就是ttl
/**
* 发送消息到延时队列中
*/
@Test
public void testTTLMessage(){
// (MessageDeliveryMode.PERSISTENT:设置消息持久
Message build = MessageBuilder.withBody("hellow ttl".getBytes(StandardCharsets.UTF_8))
.setDeliveryMode(MessageDeliveryMode.PERSISTENT)
.build();
// 发送消息,指定通道
rabbitTemplate.convertAndSend("ttl.direct","ttl",build);
log.info("消息已经成功发送了");
}
启动测试(查看日志输出)
- 启动消费者等待死信消息(延时10秒后接收到死信消息)
- 发送消息
2.4、发送延时消息
注意观察多了消息的延时:【.setExpiration(“5000”) // 设置消息的延时时间】
/**
* 发送消息到延时队列中
*/
@Test
public void testTTLMessage(){
// (MessageDeliveryMode.PERSISTENT:设置消息持久
Message build = MessageBuilder.withBody("hellow ttl".getBytes(StandardCharsets.UTF_8))
.setDeliveryMode(MessageDeliveryMode.PERSISTENT)
.setExpiration("5000") // 设置消息的延时时间
.build();
// 发送消息,指定通道
rabbitTemplate.convertAndSend("ttl.direct","ttl",build);
log.info("消息已经成功发送了");
}
启动测试(查看日志输出)
这次,发送与接收的延迟只有5秒。说明当队列、消息都设置了TTL时,任意一个到期就会成为死信。
TTL总结
消息超时的两种方式是?
- 给队列设置ttl属性,进入队列后超过ttl时间的消息变为死信
- 给消息设置ttl属性,队列接收到消息超过ttl时间后变为死信
如何实现发送一个消息20秒后消费者才收到消息?
- 给消息的目标队列指定死信交换机
- 将消费者监听的队列绑定到死信交换机
- 发送消息时给消息设置超时时间为20秒
3、延迟队列(DelayExchange插件)
利用TTL结合死信交换机,我们实现了消息发出后,消费者延迟收到消息的效果。这种消息模式就称为延迟队列(Delay Queue)模式。
延迟队列的使用场景包括:
- 延迟发送短信
- 用户下单,如果用户在15 分钟内未支付,则自动取消
- 预约工作会议,20分钟后自动通知所有参会人员
因为延迟队列的需求非常多,所以RabbitMQ的官方也推出了一个插件,原生支持延迟队列效果。
这个插件就是DelayExchange插件。参考RabbitMQ的插件列表页面:https://www.rabbitmq.com/community-plugins.html
使用方式可以参考官网地址:https://blog.rabbitmq.com/posts/2015/04/scheduling-messages-with-rabbitmq
3.1、安装DelayExchange插件
官方的安装指南地址为:https://blog.rabbitmq.com/posts/2015/04/scheduling-messages-with-rabbitmq
上述文档是基于linux原生安装RabbitMQ,然后安装插件。
因为我们之前是基于Docker安装RabbitMQ,所以下面我们会讲解基于Docker来安装RabbitMQ插件。
3.2.下载插件
RabbitMQ有一个官方的插件社区,地址为:https://www.rabbitmq.com/community-plugins.html
其中包含各种各样的插件,包括我们要使用的DelayExchange插件:
大家可以去对应的GitHub页面下载3.8.9版本的插件,地址为https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/tag/3.8.9这个对应RabbitMQ的3.8.5以上版本。
3.3、安装插件
因为我们是基于Docker安装,所以需要先查看RabbitMQ的插件目录对应的数据卷。如果不是基于Docker的同学,请参考第一章部分,重新创建Docker容器。
我们之前设定的RabbitMQ的数据卷名称为mq-plugins
,所以我们使用下面命令查看数据卷:
docker volume inspect mq-plugins
进入到数据卷的位置后就可以通过下载或者上传插件了
3.3.1、最后就是安装了,需要进入MQ容器内部来执行安装。我的容器名为mq
,所以执行下面命令:
docker exec -it mq bash
3.3.2、执行时,请将其中的 -it
后面的mq
替换为你自己的容器名.
进入容器内部后,执行下面命令开启插件:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
结果如下:
执行命令退出修改容器
exit
3.4、插件的使用【管理页面】
在管理页面中设置插件的使用【创建交换机,指定路由】
交换机设置出来了,那么我们该如何使用呢,需要在消息发送的时候指定一个延时时间
如下是在管理页面设置的一个延时的消息,需要指定一个头并设置一个延时时间
3.5、SpringAMQP代码中使用插件
3.5.1、基于注解的形式开发交换机
/**
* 基于注解的开发使用插件
* 交换机的差别就是设置了【delayed = "true"】属性
* @param msg
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "delay.queue",durable = "true"),
exchange = @Exchange(name = "delay.direct",delayed = "true"),
key = "delay"
))
public void listDelayExchange(String msg){
log.info("消费者接收到了delay.queue的延迟消息:"+msg);
}
基于Bean注解开发的交换机
.delayed() // 设置delay属性为true
/**
* 创建交换机
* @return
*/
@Bean
public DirectExchange delayedExchange() {
return ExchangeBuilder
.directExchange("delay.queue") // 指定交换机类型和名称
.delayed() // 设置delay属性为true
.durable(true) // 持久化
.build();
}
/**
* 创建队列
*/
@Bean
public org.springframework.amqp.core.Queue delayedQueue(){
return new org.springframework.amqp.core.Queue("delay.direct");
}
/**
* 绑定交换机和队列
*/
@Bean
public Binding delayedBinding(){
return BindingBuilder.bind(delayedQueue()).to(delayedExchange()).with("delay");
}
/**
* 监听延迟消息
* @param msg
*/
@RabbitListener(queues = "delay.queue")
public void listDelayExchange(String msg){
log.info("消费者接收到了delay.queue的延迟消息:"+msg);
}
重启消费者服务
3.5.2、发送延时消息
使用==.setHeader(“x-delay”,5000) // 设置插件消息的延时时间==来指定的该延时时间
/**
* 发送插件的延时时间
*/
@Test
public void testSendDelayMessage() {
// 1、准备消息
Message build = MessageBuilder.withBody("hellow delay messages".getBytes(StandardCharsets.UTF_8))
.setDeliveryMode(MessageDeliveryMode.PERSISTENT)
.setHeader("x-delay",5000) // 设置插件消息的延时时间
.build();
// 2、准备correlationData
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 3、发送消息
rabbitTemplate.convertAndSend("delay.direct","delay",build,correlationData);
log.info("消息已经成功发送了");
}
3.5.3、运行结果
3.5.4、解决发送延时消息成功的报错消息
主要解决办法是判断一个是否有那个延时时间
receivedDelay
在消息发布的配置类上添加延迟判断
@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
// 1、获取rabbittemplate对象
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
// 2、配置ReturnCallback【使用了lambda表达式】
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
// 判断是否未延迟消息
if(message.getMessageProperties().getReceivedDelay() > 0){
// 是一个延迟消息,忽略这个错误消息
return;
}
// 投递失败,记录日志
log.info("消息发送到队列失败,错误码{},失败原因{},交换机{},路由键key{},消息{}",
replyCode, replyText, exchange, routingKey, message.toString());
// 如果有业务需要,可以重发消息
});
}
}