基本思路:在一个普通的mq 队列中设置死信交换器、死信路由键,然后把一个存放死信的队列与死信交换器、死信路由键三者捆绑。普通队列中设置消息过期时间,时间到了以后就会把消息发送到死信队列里面,等待消费者使用。具体代码如下:
1.常量
package com.lyf.www.testDelayQqueue;
/**
* @author liuyanfei
* @description
* @date 2020/4/11 10:59 PM
**/
public class MqConstant {
//正常 交换器、队列、key
public static final String ORDINARY_EXCHANGE = "ordinary_exchange";
public static final String ORDINARY_ROUTEKEY = "ordinary_routekey";
public static final String ORDINARY_QUEUE = "ordinary_queue";
// 死信 交换器、队列、key
public static final String DEAD_EXCHANGE = "dead_exchange";
public static final String DEAD_ROUTEKEY = "dead_routekey";
public static final String DEAD_QUEUEU = "dead_queue";
}
2.配置文件
package com.lyf.www.testDelayQqueue;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitConfig {
@Bean
public DirectExchange ordinaryExchange() {
return new DirectExchange(MqConstant.ORDINARY_EXCHANGE, true, false);
}
@Bean
public Queue ordinaryQueue() {
Map<String, Object> map = new HashMap<>();
//把死信 交换器和路由键 绑定到普通队列中
map.put("x-dead-letter-exchange", MqConstant.DEAD_EXCHANGE);
map.put("x-dead-letter-routing-key", MqConstant.DEAD_ROUTEKEY);
Queue queue = new Queue(MqConstant.ORDINARY_QUEUE, true, false, false, map);
return queue;
}
//把普通 交换器、队列、路由键 绑定在一块
@Bean
public Binding queueDeadBinding() {
return BindingBuilder.bind(ordinaryQueue()).to(ordinaryExchange()).with(MqConstant.ORDINARY_ROUTEKEY);
}
//把死信 交换器、队列、路由键 绑定在一块
@Bean
public DirectExchange deadExchange() {
return new DirectExchange(MqConstant.DEAD_EXCHANGE, true, false);
}
@Bean
public Queue deadQueue() {
return new Queue(MqConstant.DEAD_QUEUEU, true, false, false);
}
@Bean
public Binding queueTransBinding() {
return BindingBuilder.bind(deadQueue()).to(deadExchange()).with(MqConstant.DEAD_ROUTEKEY);
}
}
3.生产者
package com.lyf.www.testDelayQqueue;
/**
* @author liuyanfei
* @description
* @date 2020/4/11 9:03 PM
**/
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class Producr {
@Autowired
private AmqpTemplate amqpTemplate;
public void send(String msg, long time) {
//rabbit默认为毫秒级
long times = time * 1000;
MessagePostProcessor processor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setExpiration(String.valueOf(times));
return message;
}
};
amqpTemplate.convertAndSend(MqConstant.ORDINARY_EXCHANGE, MqConstant.ORDINARY_ROUTEKEY, msg, processor);
}
}
4.消费者
package com.lyf.www.testDelayQqueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.text.SimpleDateFormat;
import java.util.Date;
@Service
public class TradeProcess {
private static final Logger LOGGER = LoggerFactory.getLogger(TradeProcess.class);
@Autowired
private AmqpTemplate amqpTemplate;
//注意此时监听的是死信队列
@RabbitListener(queues=MqConstant.DEAD_QUEUEU)
@RabbitHandler
public void process(String content) {
LOGGER.info("接收到延时消息:"+content);
}
}
5.controller
package com.lyf.www.testDelayQqueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.text.SimpleDateFormat;
import java.util.Date;
@RestController
@RequestMapping("/delayQueue")
public class DelayQueueController {
private static final Logger LOGGER = LoggerFactory.getLogger(DelayQueueController.class);
@Autowired
private Producr producr;
@GetMapping("/send/{time}")
public String send(@PathVariable("time") int time){
LOGGER.info("现在时间是:{},baby,你来吧我到了,你大概{}秒后能到达,我等你。。。。么么", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()),time);
producr.send("我是你珊珊来迟的初恋女友,来啵一个。。。。", time);
return "ok";
}
}
6.application.properties
server.port=9002
#mq 配置
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
7.maven
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>springboot_rabbitmq</groupId>
<artifactId>springboot_rabbitmq</artifactId>
<version>1.0-SNAPSHOT</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.3.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.8.1</version>
</dependency>
</dependencies>
</project>
浏览器执行:http://127.0.0.1:9002/delayQueue/send/5
运行结果:每次去约会 男士一定要先到几秒等初恋女友,不能让人家女孩子等你(敲黑板)。所以用延时队列再合适不过了,
当然这样写 是为了更方便理解。真实项目中还需要做下一步处理。不能我来个延时处理就弄一副上面的对应那一整套,这样做是违背人道的。 下节我将写一个更接近于真实项目中的 demo
下面是一个悲伤的故事
由于开始做单元测试的时候死信交换器我就随便叫了一个名字 比如"aaaa"。所以在运行了程序后,程序自动把这个 aaaa 绑定到了普通队列上面。
开始时我定义的常量,对应上面的 1.常量
public class MqConstant {
//正常 交换器、队列、key
public static final String ORDINARY_EXCHANGE = "ordinary_exchange";
public static final String ORDINARY_ROUTEKEY = "ordinary_routekey";
public static final String ORDINARY_QUEUE = "ordinary_queue";
// 死信 交换器、队列、key
//此时我的 死信交换器是 aaaa
public static final String DEAD_EXCHANGE = "aaaa";
public static final String DEAD_ROUTEKEY = "dead_routekey";
public static final String DEAD_QUEUEU = "dead_queue";
}
后来想起了当年被一个据说是阿里出来的大神面试,因为类名首字母没有大写而被 pass 的惨痛经历,我决定要从小事做起,命名定义变量一定要更规范,所以把 aaaa 改成"ordinary_exchange"。当然开始这是做了一个 demo 试试,很多写的都不规范,当我全部都整改完以后,再次执行程序,结果。。。。
结果就是,我再也没有等到我的初恋女友出现。熙熙攘攘的大街上,双双对对,而我的身边空空荡荡,路过音像店门口播放着当年乡村爱情的片尾曲:『是谁为我穿上嫁妆,是谁伴我走进洞房,谁是我的新郎我是谁的新娘』。我犹如看见我的女友在哼着这首歌牵起来别人的手。。。。我再也抑制不住我的悲伤,举起手对着老天喊:为什么。。。。,
在我检查了好几遍变量是否定义错误,程序是否有问题后,多次实验,女友终究还是没有回来。此时已经临晨两点!!!为什么,为什么。。。。我的一切都没有问题啊,为什么。此时程序启动几行小字引起了我的注意:
蓝色的框框里面全文是:
Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'x-dead-letter-exchange' for queue 'ordinary_queue' in vhost '/': received 'dead_exchange' but current is 'aaaa', class-id=50, method-id=10)
意思是:信道罢工了,ordinary_queue 这个队列的 x-dead-letter-exchange这个属性接收到的是dead_exchange,但是当前是 aaaa。什么意思呢???再来一张图
这是普通队列(死信交换器绑定在他的身上)的信息,一切真相大白。普通队列还绑定着我之前的那个死信交换器 aaaa,而我现在的死信交换器叫 dead_exchange。
我哭了,原来女友从来就没有变心,而是我变心了。她还傻傻的抱着我以前的相片等着我,而我得意于现在的新装对她视而不见。
之所以会犯这个错误,是因为被『Started App in 14.693 seconds (JVM running for 15.494)』这句话给蒙蔽了,其实那堆小字前面的信息有个 大红『Error』呢,我也没看见。一般运行 java 程序,看见 start…seconds就万事大吉了,但是使用中间件的时候,就一定要小心了,java 程序正确启动起来,不一定中间件会配合你。
结局:
以现在电视剧的套路,我最后一定和女友没羞没臊的生活在了一起。但是现实就是现实,题目说了这是个悲伤的故事。
后来我把普通队列(ordinary_queue)删除了,再次启动程序一切正常。然后也能收到延时消息了
当然来约会的也不再是初恋女友了。。。。。