个人名片:
博主:酒徒ᝰ.
个人简介:沉醉在酒中,借着一股酒劲,去拼搏一个未来。
本篇励志:三人行,必有我师焉。
本项目基于B站黑马程序员Java《SpringCloud微服务技术栈》,SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式
【SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,系统详解springcloud微服务技术栈课程|黑马程序员Java微服务】 点击观看
二、消息可靠性
3. 消费者消息确认
RabbitMQ是阅后即焚机制,RabbitMQ确认消息被消费者消费后会立刻删除。
而RabbitMQ是通过消费者回执来确认消费者是否成功处理消息的:消费者获取消息后,应该向RabbitMQ发送ACK回执,表明自己已经处理消息。
设想这样的场景:
- 1)RabbitMQ投递消息给消费者
- 2)消费者获取消息后,返回ACK给RabbitMQ
- 3)RabbitMQ删除消息
- 4)消费者宕机,消息尚未处理
这样,消息就丢失了。因此消费者返回ACK的时机非常重要。
SpringAMQP允许配置三种确认模式:
manual:手动ack,需要在业务代码结束后,调用api发送ack。
auto:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack
none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除
由此可知:
- none 模式下,消息投递是不可靠的,可能丢失
- auto 模式类似事务机制,出现异常时返回nack,消息回滚到mq;没有异常,返回ack
- manual:自己根据业务情况,判断什么时候该ack
一般,我们都是使用默认的auto即可。
- 演示none模式
修改consumer
服务的application.yml文件中的mq地址,用户名,密码等配置,同时添加下面内容:
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: none
修改consumer服务的SpringRabbitListener类中的方法,模拟一个消息处理异常:
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String msg) {
log.info("消费者接收到simple.queue的消息:{}", msg);
System.out.println(1/0);
log.debug("消息处理完成!");
}
测试:
@Test
public void testSimple() {
rabbitTemplate.convertAndSend("simple.queue", "hello, money");
}
测试可以发现,当消息处理抛异常时,消息还没有被消费者读取,就被RabbitMQ删除了。
- 演示auto模式
把确认机制修改为auto:
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: auto
添加监听类,监听队列,并抛出异常。
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String msg) {
System.out.println("消费者接收到simple.queue的消息:{" + msg + "}");
throw new RuntimeException();
}
启动consumer项目,控制台重复报错
在异常位置打断点,再次发送消息,程序卡在断点时,可以发现此时消息状态为unack(未确定状态):
抛出异常后,因为Spring会自动返回nack,所以消息恢复至Ready状态,并且没有被RabbitMQ删除: