死信队列(延迟队列)
死信,在官网中对应的单词为“Dead Letter”,它是 RabbitMQ 的一种消息机制
般来说,生产者将消息投递到 broker 或者直接到 queue 里了,consumer 从 queue 取出消息进行消费,如果它一直无法消费某条数据,那么可以把这条消息放入死信队列里面。等待 条件满足了再从死信队列中取出来再次消费,从而避免消息丢失。
死信消息来源:
消息 TTL 过期
队列满了,无法再次添加数据
消息被拒绝(reject 或 nack),并且 requeue =false
死信交换机(生产者
DeadConfig.java
package com.hmf.producer.mq; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; @Configuration @SuppressWarnings("all") public class DeadConfig { //1.需要正常的交换机 //2.正常队列发出消息(具备配置) //3.具备死信交换机,队列 @Bean public Queue normalQueue(){ Map<String,Object> config=new HashMap<>(); //过期时间 config.put("x-message-ttl", 10000); //死信交换机 config.put("x-dead-letter-exchange", "deadExchange"); //死信routing key config.put("x-dead-letter-routing-key", "DD"); return new Queue("normalQueue",true,false,false,config); } @Bean public Queue deadQueue(){ return new Queue("deadQueue",true); } @Bean public DirectExchange normalExchange() { return new DirectExchange("normalExchange"); } @Bean public DirectExchange deadExchange() { return new DirectExchange("deadExchange"); } @Bean public Binding normalBinding() { return BindingBuilder.bind(normalQueue()).to(normalExchange()).with("CC"); } @Bean public Binding deadBinding() { return BindingBuilder.bind(deadQueue()).to(deadExchange()).with("DD"); } }
ProducerController.java
package com.hmf.producer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@SuppressWarnings("all")
@Slf4j
public class ProducerController {
@Autowired
private RabbitTemplate template;
@RequestMapping("/directSend")
public String directSend(String routingKey){
template.convertAndSend("directExchange",routingKey,"Hello");
return "okok";
}
@RequestMapping("/topicSend")
public String topicSend(String routingKey){
template.convertAndSend("topicExchange",routingKey,"Hello");
return "okok";
}
@RequestMapping("/fanoutSend")
public String fanoutSend(){
//键写成了null 不然它直接进入到第二个参数 否则会报错
template.convertAndSend("fanoutExchange",null,"Hello");
return "okok";
}
@RequestMapping("/deadSend")
public String deadSend(){
log.warn("订单已经保存");
//保存了一个订单
template.convertAndSend("normalExchange","CC","order-1902");
return "yes";
}
}
访问成功
死信交换机(消费者
DeadReceiver
package com.hmf.cunsumer.mq; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component @SuppressWarnings("all") @RabbitListener(queues = "deadQueue") @Slf4j public class DeadReceiver { @RabbitHandler public void process(String message){ log.warn(message+":该订单已经过期"); } }