RabbitMQ 入门系列7 -- 延时队列

1 场景导入

在一些场景中,程序不需要同步执行,如果邮件提醒之类的,我们只需要在当前线程之外开启一个新线程,然后当前线程继续往下执行即可,无需等待新线程。

但有时我们没有这么好运,想象一下,我们在网上购物,提交订单之后30分钟内必须付款,否则就会取消订单,这种情况下,我们不可能单单只开启一个异步线程而不做其他处理。在这里,我可以提供一种方案来解决这个问题,那就是使用延时消息队列。在消息发布之后,保存在消息中间件中,延时一段时间之后才会发布至队列中。延时队列监听器在这段时间结束后才能监听到消息并开始执行。

2 延时队列实现的原理

RabbitMQ 本身并没有直接实现延时队列,但是我们可以使用 RabbitMQ 的属性(TTL,DLX)来模拟一个延时队列。

我们可以在消息上设置过期时间,然后在消息队列上为死信指定转发器,这样消息过期后会转发到与指定转发器匹配的死信队列上,从而实现延时队列。

2.1 TTL

TTL,即 Time To Live,过期时间。消息在超过过期时间将会变成死信,不会被消费者消费。那我们又应该如何设置消息的 TTL 呢?其实有两种方法,一是在创建队列时指定 x-message-ttl ,这样子队列中所有的消息都有着统一的 TTL,二是在发送消息的时候为每条消息设置不同的 TTL,这样每条消息之间的过期时间是不同的。若两者均被设置,则过期时间取两者中的最小值。

2.2 DLX

DLX ,即 Dead Letter Exchange,死信交换机,详情请看我的上一篇文章,这里不多重复。

3 代码

话不多说,我们直接看代码

配置文件

server.port: 8080
spring.application.name: provider
spring.rabbitmq.host: 127.0.0.1
spring.rabbitmq.port: 5672
spring.rabbitmq.username: guest
spring.rabbitmq.password: guest
spring.rabbitmq.virtual-host: /

# 开启 confirm 确认机制
spring.rabbitmq.publisher-confirms: true
# 开启 return 确认机制
spring.rabbitmq.publisher-returns: true
# 手动应答
spring.rabbitmq.listener.simple.acknowledge-mode: manual
# 指定最小的消费者数量
spring.rabbitmq.listener.simple.concurrency: 1
# 指定最大的消费者数量
spring.rabbitmq.listener.simple.max-concurrency: 2
# 是否支持重试
spring.rabbitmq.listener.simple.retry.enabled: true
# 指定一个请求能处理多少消息
spring.rabbitmq.listener.simple.prefetch: 1

配置类,这里我们的死信队列实际上就是我们的延时队列

package com.example.provider.config;

import java.util.HashMap;
import java.util.Map;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * 直连型交换机
 * @author 30309
 *
 */
@Configuration
public class DirectRabbitConfig {

	//队列,名称为DirectQueue
	//同时将DirectQueue绑定到死信队列交换机上
    @Bean
    Queue DirectQueue() {
    	Map<String, Object> args = new HashMap<>(2);
    	//交换机标识符
        args.put("x-dead-letter-exchange", "DeadExchange");
        //绑定键标识符
        args.put("x-dead-letter-routing-key", "DeadRoutingKey");
        Queue queue = new Queue("DirectQueue", true, false, false, args);
        return queue;
    }
 
    //直连型交换机,名称为DirectExchange
    @Bean
    DirectExchange DirectExchange() {
        return new DirectExchange("DirectExchange");
    }
 
    //将队列和交换机绑定, 并设置用于匹配键:DirectRouting
    @Bean
    Binding bindingDirect() {
        return BindingBuilder.bind(DirectQueue()).to(DirectExchange()).with("DirectRouting");
    }
    
    //创建死信队列
    @Bean
    Queue DeadQueue() {
        return new Queue("DeadQueue", true);
    }
    
    //创建死信交换机
    @Bean
    DirectExchange DeadExchange() {
        return new DirectExchange("DeadExchange");
    }
    
    //死信队列与死信交换机绑定
    @Bean
    Binding bindingDead() {
        return BindingBuilder.bind(DeadQueue()).to(DeadExchange()).with("DeadRoutingKey");
    }
    
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { 
    	RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        // 消息发送失败返回到队列中, 配置文件需要配置 publisher-returns: true
        rabbitTemplate.setMandatory(true);
        
        return rabbitTemplate;
    }
   
    
}

生产者,我们这里将消息发到一个没有消费者的队列中,让消息度过它的 TTL。

package com.example.provider.controller;

import java.util.Date;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
 
/**
 * 生产者
 * @author 30309
 *
 */
@RestController
public class SendMessageController{
 
    @Autowired
    RabbitTemplate rabbitTemplate; 
 
    @GetMapping("/sendDirectMessage")
    public String sendDirectMessage() {
    	
    	rabbitTemplate.convertAndSend("DirectExchange", "DirectRouting", "Hello World", message -> {
    		//设置延时时间
    		//也可以在队列中设置
    		message.getMessageProperties().setExpiration(5 * 1000 + "");
            return message;
    	});
    	
    	System.out.println("生产时间:" + new Date());
    	
        return "ok";
    }
 
}

消息初始发送的队列没有消费者,只有延时队列存在消费者

package com.example.consumer.receiver;

import java.io.IOException;
import java.util.Date;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import com.rabbitmq.client.Channel;

/**
 * 延时队列消费者
 * @author 30309
 *
 */
@Component
public class DirectReceiver2 {

	@RabbitListener(queues = "DeadQueue")
	@RabbitHandler
    public void process(String str,Channel channel, Message message) {
    	System.out.println("消费时间:" + new Date());
        System.out.println("DirectReceiver2消费者收到消息: " + str );
        
		try {
			channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
}

测试
在这里插入图片描述
在这里插入图片描述
我们的测试结果显示,生产者在53秒的时候发送消息,我们的消费者在58秒的时候接收到消息,证明延时队列的延时功能基本正常。

4 延时队列的应用场景

  1. 订单:比如我们文章开头提到过的电商业务,提交订单之后30分钟内未完成付款将取消订单。
  2. 通知:在任务完成后的一段时间给用户发送通知
  3. 重试:业务操作失败后,间隔一段时间后进行失败重试
发布了113 篇原创文章 · 获赞 206 · 访问量 1万+

猜你喜欢

转载自blog.csdn.net/Geffin/article/details/102701164