消费者限流是用于消费者每次获取消息时限制条数。注意前提是手动确认模式。并且在手动确认后才能获取到消息。
消费者端
目录结构
导入依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.5.0</version>
</dependency>
</dependencies>
修改yml
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
listener:
simple:
acknowledge-mode: manual # 手动确认模式
prefetch: 1 # 每次消费仅1条消息
业务逻辑
为了验证是否一定要手动确认才能真正消费消息,如下我进行了测试:首先我先让生产者生产两条消息在队列当中,如下图1所示。其次再看代码逻辑。当启动了消费者代码后入下图2所示:虽然确确实实被限流了,有一条未确认的消息,但当我们关闭消费者端的应用后,就又会变成图1所示。
图1
/**
* 消费者的限流机制
* 1、确保Ack机制为手动机制:acknowledge-mode: manual
* 2、每次消费消息的个数:prefetch: 1 只有手动确认完后才会拉取下一条消息
*/
@Component
public class QosListener implements ChannelAwareMessageListener {
@RabbitListener(queues = "test_queue_name")
@Override
public void onMessage(Message message, Channel channel) throws Exception {
System.out.println("消费者接受的消息为:" + new String(message.getBody()));
}
}
图2
所以可以当我们再次改变业务逻辑:进行手动确认后就可以发现消息确确实实被消费了,如图3所示。要注意哈:第二个是否批量签收参数表示的是开启消费者后是否只会读取一次消息,而消费者限流prefetch表示的是每次读取只能为一条消息。两者的概念是不一样的。
/**
* 消费者的限流机制
* 1、确保Ack机制为手动机制:acknowledge-mode: manual
* 2、每次消费消息的个数:prefetch: 1 只有手动确认完后才会拉取下一条消息
*/
@Component
public class QosListener implements ChannelAwareMessageListener {
@RabbitListener(queues = "test_queue_name")
@Override
public void onMessage(Message message, Channel channel) throws Exception {
Thread.sleep(5000);
long deliveryTag = message.getMessageProperties().getDeliveryTag();// 消息的唯一标识id
System.out.println("消费者接受的消息为:" + new String(message.getBody()));
channel.basicAck(deliveryTag,true);//每5s读一次消息(限流后每次为一条)
}
}