一、消费端限流原理图
二、消费端限流实现步骤
1、确保Ack机制为手动确认机制
2、配置属性perfetch:设置每次拉取消息的数量
- 以上两个配置只需要在application.yml配置即可
# 配置RabbitMQ基本信息
spring:
rabbitmq:
host: 192.168.33.100
username: guest
password: guest
virtual-host: /
port: 5672
listener:
simple:
acknowledge-mode: manual # 设置消费端手动 ack
prefetch: 2 # 限流,设置消费端并发处理消息的数量,必须设置acknowledge-mode: manual 才有效
3、监听消息方法方法
- 处理完消息之后必须手动签收
@Component
public class RabbitMQConfig {
/**
* Consumer 消息限流
* 限流机制:
* 1. 确保ack机制为手动确认
* 2. 在application.yml设置prefetch属性:设置消费端并发处理消息的数量
* 比如:prefetch = 1 表示消费端每次从mq拉取一条消息,直到手动确认消费完毕之后,才会继续拉取下一条消息
*/
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(value = "xkp_topic_exchange",type = "topic"),
value = @Queue(value = "xkp_queue",durable = "true"),
key = "xkp.#"
))
public void handlerMessage(Message message, Channel channel)throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
Thread.sleep(1000);
// 1. 转换消息
System.out.println("消息内容: " + new String(message.getBody()));
// 处理消息
// .............
// 2. 手工签收
channel.basicAck(deliveryTag,true);
}
}