前言
上一篇博客已经总结了消费延时队列的两种方式,同时也根据实例说明了spring boot 消费消息的两种配置方式。这一篇博客在上一篇博客的基础上总结延时队列(其实就是常说的死信队列,但是不太喜欢这个名字,所以我这里称为延时队列)
什么是延时队列
简单点理解,就是在一定时长之后,消息不能在该队列中存在了。这个队列就称为延时队列,在实际中常见的订单在指定时间之后失效,背后就有延时队列的影子。如下图所示,生产者发送的消息,通过第一个交换机发送到延时队列,然后在延时队列中消息存放达到一定时长之后,消息被重新投递到第二个交换机中,然后第二个交换机将消息投递到消费者真正监听的消费队列中,其中延时队列的左右其实很简单,只是在消息达到一定时长之后重新投递消息。这里需要说明一点的是,延时队列与二个交换机的绑定是根据队列创建时指定的参数绑定的。这一点在后面的实例中会有所体现。
springboot中集成延时队列
这里直接进入实例部分,第一个交换机与延时队列的绑定,这里为了简单,就不用@RabbitListener注解实现了,毕竟这里是没有消费者的,我们在@Configuration容器中直接进行绑定。
1、引入各种消息队列已经routingKey的配置
#延时队列的生产端配置
simple.produce.exchange.name=simple.produce.exchange
simple.produce.routing.key.name=simple.produce.routing.key
##真正的延时队列
simple.delay.queue.name=simple.delay.queue
##延时队列再次投放消息的队列和交换机
simple.deal.routing.key.name=simple.deal.routing.key
simple.deal.exchange.name=simple.deal.exchange.name
simple.deal.queue.name=simple.deal.queue.name
2、将生产者的exchange与延时队列绑定
在创建延时队列的时候,通过指定x-dead-letter-exchange、x-dead-letter-routing-key和x-message-ttl三个参数,完成延时队列与二次发送消息的交换机的绑定,各个参数看名字就知道是什么意思,这里不再赘述。
/**
* 创建延时队列,并将其与二次消息投放的交换机进行绑定
* @return
*/
@Bean
public Queue delayQueue(){
Map<String, Object> args=new HashMap();
args.put("x-dead-letter-exchange", env.getProperty("simple.deal.exchange.name"));
args.put("x-dead-letter-routing-key", env.getProperty("simple.deal.routing.key.name"));
args.put("x-message-ttl", 10000);
return new Queue(env.getProperty("simple.delay.queue.name"),true,false,false,args);
}
@Bean
public TopicExchange produceDelayExchange(){
return new TopicExchange(env.getProperty("simple.produce.exchange.name"));
}
/**
* 将生产端与延时队列进行绑定
* @return
*/
@Bean
public Binding bindProduceAndDelayQueue(){
return BindingBuilder.bind(delayQueue()).to(produceDelayExchange())
.with(env.getProperty("simple.produce.routing.key.name"));
}
3、真正的消费端,通过@RabbitListener注解绑定
/**
* autor:liman
* createtime:2019/10/31
* comment: 延时队列最终处理消息的队列和交换机
*/
@Component
public class DelayRealDealQueueListener {
private static final Logger log = LoggerFactory.getLogger(DelayRealDealQueueListener.class);
@Autowired
private ObjectMapper objectMapper;
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(value = "${simple.deal.queue.name}", durable = "true")
, exchange = @Exchange(value = "${simple.deal.exchange.name}", type = ExchangeTypes.TOPIC)
, key = "${simple.deal.routing.key.name}")
,containerFactory = "singleListenerContainer"
)
public void dealDelayMessage(@Payload String message, @Header(AmqpHeaders.DELIVERY_TAG) long delivertTag, Channel channel){
try{
String result = new String(message);
log.info("开始处理真正的延时处理消息:{}",result);
channel.basicAck(delivertTag,true);
}catch (Exception e){
log.error("延时消息异常:{}",e.fillInStackTrace());
}
}
}
3、编写一个简单的生产者,用于测试
/**
* autor:liman
* createtime:2019/10/31
* comment:
* 延迟队列的controller
*/
@RestController
public class DelayQueueController {
private static final Logger log= LoggerFactory.getLogger(DelayQueueController.class);
private static final String Prefix="delay/queue";
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private ObjectMapper objectMapper;
@Autowired
private Environment env;
@RequestMapping(value = Prefix+"/send",method = RequestMethod.GET)
public BaseResponse sendMail(@RequestParam String message){
BaseResponse response=new BaseResponse(StatusCode.Success);
try {
rabbitTemplate.setExchange(env.getProperty("simple.produce.exchange.name"));
rabbitTemplate.setRoutingKey(env.getProperty("simple.produce.routing.key.name"));
// String str="延迟队列的消息";
Message msg=MessageBuilder.withBody(objectMapper.writeValueAsBytes(message)).build();
rabbitTemplate.convertAndSend(msg);
}catch (Exception e){
e.printStackTrace();
}
log.info("发送消息完毕----");
return response;
}
}
测试结果:
消息在延时队列中的时候
消息进入真正的处理队里中的时候(关闭了消息确认)
总结
至此,关于RabbitMQ的简单集成,几乎完成,后面会继续结合实际场景总结几篇博客