springboot整合rabbitMQ
简介
-
rabbitMQ是部署最广泛的开源消息代理。
-
rabbitMQ轻量级,易于在内部和云中部署。 它支持多种消息传递协议。 RabbitMQ可以部署在分布式和联合配置中,以满足高规模,高可用性要求。
rabbitMQ的安装
博客中有介绍Linux系统与Windows系统的安装。
- Linux Linux下安装rabbitmq3.7.8。
- win Windows下RabbitMQ安装及注意事项
准备工作
- 1 pom.xml jar引入:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- 2 application.yml 文件配置(如果是本地安装,使用默认密码端口,此配置可以忽略)
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
- 3 消息接收者Receiver
@Component
public class Receiver {
private CountDownLatch latch = new CountDownLatch(1);
public void receiveMessage(String message) {
System.out.println("Received <" + message + ">");
latch.countDown();
}
public CountDownLatch getLatch() {
return latch;
}
}
- 4 写一个RabbitMqConfig文件
@Configuration
public class RabbitMqConfig {
final static String queueName = "spring-boot";
@Bean
Queue queue() {
return new Queue(queueName, false);
}
@Bean
TopicExchange exchange() {
return new TopicExchange("spring-boot-exchange");
}
@Bean
Binding binding(Queue queue, TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(queueName);
}
@Bean
SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(queueName);
container.setMessageListener(listenerAdapter);
return container;
}
@Bean
MessageListenerAdapter listenerAdapter(Receiver receiver) {
return new MessageListenerAdapter(receiver, "receiveMessage");
}
}
- 5 消息发送者 Runner
@Component
public class Runner implements CommandLineRunner {
final static String queueName = "spring-boot";
private final RabbitTemplate rabbitTemplate;
private final Receiver receiver;
private final ConfigurableApplicationContext context;
public Runner(Receiver receiver, RabbitTemplate rabbitTemplate,
ConfigurableApplicationContext context) {
this.receiver = receiver;
this.rabbitTemplate = rabbitTemplate;
this.context = context;
}
@Override
public void run(String... args) throws Exception {
System.out.println("Sending message...");
rabbitTemplate.convertAndSend(queueName, "Hello from RabbitMQ!");
receiver.getLatch().await(10000, TimeUnit.MILLISECONDS);
context.close();
}
}
- 6 项目运行看到以下结果
Sending message...
Received <Hello from RabbitMQ!>
这是简单的springboot rabbitMQ 整合,当然rabbitMQ的支持不仅仅如此,他还提供了延迟队列
什么是延时队列,延时队列应用于什么场景
延时队列顾名思义,即放置在该队列里面的消息是不需要立即消费的,而是等待一段时间之后取出消费。
那么,为什么需要延迟消费呢?我们来看以下的场景
- 网上商城下订单后30分钟后没有完成支付,取消订单(如:淘宝、去哪儿网)
- 系统创建了预约之后,需要在预约时间到达前一小时提醒被预约的双方参会
- 系统中的业务失败之后,需要重试
Rabbitmq实现延时队列一般而言有两种形式:
- 第一种方式:利用两个特性: Time To Live(TTL)、Dead Letter Exchanges(DLX)
- 第二种方式:利用rabbitmq中的插件x-delay-message
利用TTL DLX实现延时队列的方式
TTL DLX是什么
-
TTL
RabbitMQ可以针对队列设置x-expires(则队列中所有的消息都有相同的过期时间)或者针对Message设置x-message-ttl(对消息进行单独设置,每条消息TTL可以不同),来控制消息的生存时间,如果超时(两者同时设置以最先到期的时间为准),则消息变为dead letter(死信)
-
Dead Letter Exchanges(DLX)
RabbitMQ的Queue可以配置x-dead-letter-exchange和x-dead-letter-routing-key(可选)两个参数,如果队列内出现了dead letter,则按照这两个参数重新路由转发到指定的队列。
x-dead-letter-exchange:出现dead letter之后将dead letter重新发送到指定exchange
x-dead-letter-routing-key:出现dead letter之后将dead letter重新按照指定的routing-key发送
我们实现一下第一种方式
直接贴代码
- 创建一个RabbitMqConfig (初始化queue及exchange之间的binding关系)
@Configuration
public class RabbitMqConfig {
/**
* 创建一个立即消费队列
* @return
*/
@Bean
public Queue immediateQueue() {
// 第一个参数是创建的queue的名字,第二个参数是是否支持持久化
return new Queue(Constants.IMMEDIATE_QUEUE, true);
}
/**
* 创建一个延时队列
* @return
*/
@Bean
public Queue delayQueue() {
Map<String, Object> params = new HashMap<>();
// x-dead-letter-exchange 声明了队列里的死信转发到的DLX名称,
params.put("x-dead-letter-exchange", Constants.IMMEDIATE_EXCHANGE);
// x-dead-letter-routing-key 声明了这些死信在转发时携带的 routing-key 名称。
params.put("x-dead-letter-routing-key", Constants.IMMEDIATE_ROUTING_KEY);
return new Queue(Constants.DELAY_QUEUE, true, false, false, params);
}
@Bean
public DirectExchange immediateExchange() {
// 一共有三种构造方法:
// 第一种,可以只传exchange的名字,
// 第二种,可以传exchange名字,是否支持持久化,是否可以自动删除,
// 第三种,在第二种参数上可以增加Map,Map中可以存放自定义exchange中的参数
return new DirectExchange(Constants.IMMEDIATE_EXCHANGE, true, false);
}
@Bean
public DirectExchange deadLetterExchange() {
// 一共有三种构造方法,可以只传exchange的名字, 第二种,可以传exchange名字,是否支持持久化,是否可以自动删除,
//第三种在第二种参数上可以增加Map,Map中可以存放自定义exchange中的参数
return new DirectExchange(Constants.DEAD_LETTER_EXCHANGE, true, false);
}
/**
* 把立即消费的队列和立即消费的exchange绑定在一起
* @return
*/
@Bean
public Binding immediateBinding() {
return BindingBuilder.bind(immediateQueue()).to(immediateExchange()).with(Constants.IMMEDIATE_ROUTING_KEY);
}
/**
* 把立即消费的队列和立即消费的exchange绑定在一起
* @return
*/
@Bean
public Binding delayBinding() {
return BindingBuilder.bind(delayQueue()).to(deadLetterExchange()).with(Constants.DELAY_ROUTING_KEY);
}
}
- 生产者
@Component
public class Sender {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* Description : 发送延迟消息
* Group :
*
* @author honghh
* @date 2019/3/8 0008 18:03
* @param msg
* @param delayTime
*/
public void send(String msg, int delayTime) {
System.out.println("delayTime:[ms]" + delayTime);
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
this.rabbitTemplate.convertAndSend(
Constants.DEAD_LETTER_EXCHANGE,
Constants.DELAY_ROUTING_KEY,
msg,
message -> {
message.getMessageProperties().setExpiration(delayTime + "");
System.out.println(sdf.format(new Date()) + " 发送完成.");
return message;
});
}
}
- 消费者 Receiver
@Component
@EnableRabbit
@Configuration
public class Receiver {
@RabbitListener(queues = Constants.IMMEDIATE_QUEUE)
@RabbitHandler
public void get(String msg) {
System.out.println(new Date() + " 收到延时消息了: " + msg);
}
}
- 测试类
@Autowired
Sender sender;
@Test
public void contextLoads() {
sender.send("msg--send", 10000);
}
- 启动项目 看控制台打印效果
delayTime:[ms]10000
2019-03-08 18:30:40 发送完成.
Fri Mar 08 18:30:50 CST 2019 收到延时消息了: msg--send
## 十秒后收到了发送的消息
代码获取
https://gitee.com/honghh/boot-demo.git