1 场景导入
在一些场景中,程序不需要同步执行,如果邮件提醒之类的,我们只需要在当前线程之外开启一个新线程,然后当前线程继续往下执行即可,无需等待新线程。
但有时我们没有这么好运,想象一下,我们在网上购物,提交订单之后30分钟内必须付款,否则就会取消订单,这种情况下,我们不可能单单只开启一个异步线程而不做其他处理。在这里,我可以提供一种方案来解决这个问题,那就是使用延时消息队列。在消息发布之后,保存在消息中间件中,延时一段时间之后才会发布至队列中。延时队列监听器在这段时间结束后才能监听到消息并开始执行。
2 延时队列实现的原理
RabbitMQ 本身并没有直接实现延时队列,但是我们可以使用 RabbitMQ 的属性(TTL,DLX)来模拟一个延时队列。
我们可以在消息上设置过期时间,然后在消息队列上为死信指定转发器,这样消息过期后会转发到与指定转发器匹配的死信队列上,从而实现延时队列。
2.1 TTL
TTL,即 Time To Live,过期时间。消息在超过过期时间将会变成死信,不会被消费者消费。那我们又应该如何设置消息的 TTL 呢?其实有两种方法,一是在创建队列时指定 x-message-ttl ,这样子队列中所有的消息都有着统一的 TTL,二是在发送消息的时候为每条消息设置不同的 TTL,这样每条消息之间的过期时间是不同的。若两者均被设置,则过期时间取两者中的最小值。
2.2 DLX
DLX ,即 Dead Letter Exchange,死信交换机,详情请看我的上一篇文章,这里不多重复。
3 代码
话不多说,我们直接看代码
配置文件
server.port: 8080
spring.application.name: provider
spring.rabbitmq.host: 127.0.0.1
spring.rabbitmq.port: 5672
spring.rabbitmq.username: guest
spring.rabbitmq.password: guest
spring.rabbitmq.virtual-host: /
# 开启 confirm 确认机制
spring.rabbitmq.publisher-confirms: true
# 开启 return 确认机制
spring.rabbitmq.publisher-returns: true
# 手动应答
spring.rabbitmq.listener.simple.acknowledge-mode: manual
# 指定最小的消费者数量
spring.rabbitmq.listener.simple.concurrency: 1
# 指定最大的消费者数量
spring.rabbitmq.listener.simple.max-concurrency: 2
# 是否支持重试
spring.rabbitmq.listener.simple.retry.enabled: true
# 指定一个请求能处理多少消息
spring.rabbitmq.listener.simple.prefetch: 1
配置类,这里我们的死信队列实际上就是我们的延时队列
package com.example.provider.config;
import java.util.HashMap;
import java.util.Map;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* 直连型交换机
* @author 30309
*
*/
@Configuration
public class DirectRabbitConfig {
//队列,名称为DirectQueue
//同时将DirectQueue绑定到死信队列交换机上
@Bean
Queue DirectQueue() {
Map<String, Object> args = new HashMap<>(2);
//交换机标识符
args.put("x-dead-letter-exchange", "DeadExchange");
//绑定键标识符
args.put("x-dead-letter-routing-key", "DeadRoutingKey");
Queue queue = new Queue("DirectQueue", true, false, false, args);
return queue;
}
//直连型交换机,名称为DirectExchange
@Bean
DirectExchange DirectExchange() {
return new DirectExchange("DirectExchange");
}
//将队列和交换机绑定, 并设置用于匹配键:DirectRouting
@Bean
Binding bindingDirect() {
return BindingBuilder.bind(DirectQueue()).to(DirectExchange()).with("DirectRouting");
}
//创建死信队列
@Bean
Queue DeadQueue() {
return new Queue("DeadQueue", true);
}
//创建死信交换机
@Bean
DirectExchange DeadExchange() {
return new DirectExchange("DeadExchange");
}
//死信队列与死信交换机绑定
@Bean
Binding bindingDead() {
return BindingBuilder.bind(DeadQueue()).to(DeadExchange()).with("DeadRoutingKey");
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
// 消息发送失败返回到队列中, 配置文件需要配置 publisher-returns: true
rabbitTemplate.setMandatory(true);
return rabbitTemplate;
}
}
生产者,我们这里将消息发到一个没有消费者的队列中,让消息度过它的 TTL。
package com.example.provider.controller;
import java.util.Date;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* 生产者
* @author 30309
*
*/
@RestController
public class SendMessageController{
@Autowired
RabbitTemplate rabbitTemplate;
@GetMapping("/sendDirectMessage")
public String sendDirectMessage() {
rabbitTemplate.convertAndSend("DirectExchange", "DirectRouting", "Hello World", message -> {
//设置延时时间
//也可以在队列中设置
message.getMessageProperties().setExpiration(5 * 1000 + "");
return message;
});
System.out.println("生产时间:" + new Date());
return "ok";
}
}
消息初始发送的队列没有消费者,只有延时队列存在消费者
package com.example.consumer.receiver;
import java.io.IOException;
import java.util.Date;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import com.rabbitmq.client.Channel;
/**
* 延时队列消费者
* @author 30309
*
*/
@Component
public class DirectReceiver2 {
@RabbitListener(queues = "DeadQueue")
@RabbitHandler
public void process(String str,Channel channel, Message message) {
System.out.println("消费时间:" + new Date());
System.out.println("DirectReceiver2消费者收到消息: " + str );
try {
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
} catch (IOException e) {
e.printStackTrace();
}
}
}
测试
我们的测试结果显示,生产者在53秒的时候发送消息,我们的消费者在58秒的时候接收到消息,证明延时队列的延时功能基本正常。
4 延时队列的应用场景
- 订单:比如我们文章开头提到过的电商业务,提交订单之后30分钟内未完成付款将取消订单。
- 通知:在任务完成后的一段时间给用户发送通知
- 重试:业务操作失败后,间隔一段时间后进行失败重试