一、channel.basicReject与channel.basicNack的区别
- channel.basicReject 一次只能拒绝一条消息,而 channel.basicNack 可以批量拒绝多条消息。
- 如果使用 channel.basicReject 拒绝消息后,需要手动确认模式,如果 requeue 为 true,则消息会被重新放入队列。如果此时只有一个消费者监听该队列,则有发生死循环的风险。多消费端也会造成资源的极大浪费,这个在开发过程中一定要避免。
需要注意的是,以上结果适用于 RabbitMQ 的 RPC 机制。如果您使用的是其他消息队列,可能会有不同的结果。另外,在实际开发过程中,您可能需要参考具体的编程语言和消息队列客户端库的文档,以了解如何正确地使用 basicNack 和 basicReject 方法。
示例:
/**
* 客户取消订单,监听到消息
*/
@RabbitHandler
public void handleOrderCloseRelease(OrderTO orderTo, Message message, Channel channel) throws IOException {
log.debug("订单关闭准备解锁库存,订单号:" + orderTo.getOrderSn());
try {
wareSkuService.unLockStock(orderTo);
// 手动删除消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
// 解锁失败 将消息重新放回队列,让别人消费
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
}
}
二、@RabbitListener 与 @RabbitHandler 的区别
@RabbitListener 和 @RabbitHandler 都是 Spring Boot 集成 RabbitMQ 的消息监听器注解,但是它们的区别在于:
-
@RabbitListener 可以标注在类上面,需配合 @RabbitHandler 注解一起使用;而 @RabbitHandler 只能标注在方法上。
-
@RabbitListener 标注在方法上,直接监听指定的队列,此时接收的参数需要与发送市类型一致;而 @RabbitHandler 标注在类上,表示当有收到消息的时候,就交给该类中的方法处理。具体使用哪个方法处理,根据 MessageConverter 转换后的参数类型来确定。
示例:
/**
* 解锁库存,监听死信队列
*
* @author: charlin
**/
@Slf4j
@RabbitListener(queues = MQConstant.stock_release_queue)
@Component
public class StockReleaseListener {
@Autowired
private WareSkuService wareSkuService;
/**
* 库存解锁(监听死信队列)
* 场景:
* 1.下订单成功【需要解锁】(订单过期未支付、被用户手动取消、其他业务调用失败(订单回滚))
* 2.下订单失败【无需解锁】(库存锁定失败(库存锁定已回滚,但消息已发出))
* <p>
* 注意:需要开启手动确认,不要删除消息,当前解锁失败需要重复解锁
*/
@RabbitHandler
public void handleStockLockedRelease(StockLockedTO locked, Message message, Channel channel) throws IOException {
log.debug("库存解锁,库存工作单详情ID:" + locked.getDetail().getId());
//当前消息是否重新派发过来
// Boolean redelivered = message.getMessageProperties().getRedelivered();
try {
// 解锁库存
wareSkuService.unLockStock(locked);
// 解锁成功,手动确认
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
// 解锁失败,消息入队
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
}
}
/**
* 客户取消订单,监听到消息
*/
@RabbitHandler
public void handleOrderCloseRelease(OrderTO orderTo, Message message, Channel channel) throws IOException {
log.debug("订单关闭准备解锁库存,订单号:" + orderTo.getOrderSn());
try {
wareSkuService.unLockStock(orderTo);
// 手动删除消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
// 解锁失败 将消息重新放回队列,让别人消费
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
}
}
}
三、RabbitMQ如何保证消息可靠性-消息丢失
1. 消息丢失可能是由于以下原因导致的:
- 网络故障:如果生产者或者消费者所在的网络环境出现问题,会导致消息丢失。这可以通过监控网络环境、优化网络配置、使用更高效的消息传输协议等方式来解决。
- 消息队列容量不足:如果消息队列的容量过小,无法容纳过多的消息,也会导致消息丢失。可以通过增加消息队列的容量或者优化消息队列的配置来解决这个问题。
- 消费者处理能力不足:如果消费者处理消息的速度不够快,或者消费者节点数量过少,会导致消息队列中的消息积压,从而导致部分消息丢失。
- 消息确认机制问题:如果使用了手动确认机制,并且在确认消息之前宕机或者断线,会导致部分消息丢失。可以通过优化消息确认机制、使用自动确认机制等方式来解决这个问题。
- 硬件故障或者软件bug:如果RabbitMQ或者相关组件出现硬件故障或者软件bug,会导致消息丢失。可以通过监控相关组件的状态、及时修复bug等方式来解决这个问题。
2. RabbitMQ 通过以下几种方式来保证消息的可靠性:
- 消息确认机制:RabbitMQ 支持两种消息确认机制,分别是手动确认(Basic.ack)和自动确认(Auto-ack)。对于重要的消息,开发者通常使用手动确认机制,即在消息处理成功后手动调用 basicAck 方法来确认消息已经处理完成。如果消息处理失败或者消费者在指定的时间内没有确认消息,RabbitMQ 会认为消息失败,会将消息重新放回队列中。
- 消息持久化:RabbitMQ 支持将消息持久化到磁盘中,以保证消息在宕机或者重启之后不会丢失。当生产者将消息发送到 RabbitMQ 中时,可以选择将消息持久化,这样即使 RabbitMQ 宕机或者重启,已经发送的消息也不会丢失。
- 消息队列的备份和恢复:RabbitMQ 支持队列的备份和恢复,以保证在队列故障或者宕机时能够及时恢复数据。
- 消息重试机制:对于重要的消息,可以在消费者端实现消息重试机制,以保证消息在处理失败时能够被重新处理。可以在消费者端记录处理失败的消息,并在之后重新发送到 RabbitMQ 中进行处理。
- 做好日志记录:每个消息状态是否都被服务器收到都应该记录
需要注意的是,虽然 RabbitMQ 通过上述方式来保证消息的可靠性,但在一些极端情况下,如网络故障、硬件故障等,仍然可能发生消息丢失的情况。因此在实际开发中,需要根据具体场景和需求来选择合适的保证方式。
四、RabbitMQ如何保证消息可靠性-消息重复
1. 原因:
- 消息消费成功,事务已经提交,ack时,机器宕机。导致没有ack成功,Broker的消息
重新由unack变为ready,并发送给其他消费者 - 消息消费失败,由于重试机制,自动又将消息发送出去
- 成功消费,ack时宕机,消息由unack变为ready,Broker又重新发送
2. 解决方法:
RabbitMQ 本身并不会自动保证消息的可靠性,但是在使用 RabbitMQ 时,可以通过以下几种方式来避免消息重复:
- 保证业务接口的幂等性:在使用 RabbitMQ 时,需要保证业务接口的幂等性,即同一个接口在处理同一条消息时,处理结果应该是相同的。可以通过在业务接口中添加唯一标识符或者时间戳等方式来实现幂等性。
- 使用防重表(redis/mysql):可以在消费者端使用防重报机制,以避免重复处理同一条消息。可以在消息体中添加唯一标识符或者时间戳,然后在消费者端进行判断,以避免重复处理同一条消息。
- 查看消息的 redelivered 字段:RabbitMQ 的消息属性中包含了一个 redelivered 字段,该字段用于表示这条消息是否已经被重新投递过。在消费者端,可以通过判断该字段的值来确定是否已经处理过该条消息。
- 使用事务:在一些场景下,可以使用 RabbitMQ 的事务功能来保证消息的可靠性。通过将消息发送和确认放在同一个事务中,可以确保消息的可靠性。
五、RabbitMQ如何保证消息可靠性-消息积压
1. 导致积压原因:
- 消费者处理能力不足:如果消费者处理消息的速度不够快,或者消费者节点数量过少,会导致消息队列中的消息积压。
- 生产者发送消息过快:如果生产者发送消息的速度过快,会导致消息队列中的消息积压。这可以通过限制生产者的发送速度或者优化生产者的发送方式来解决。
- 消息队列容量不足:如果消息队列的容量过小,无法容纳过多的消息,也会导致消息积压。可以通过增加消息队列的容量或者优化消息队列的配置来解决这个问题。
- 消息处理逻辑复杂:如果消息处理逻辑过于复杂,需要耗费大量的时间和资源来处理,也会导致消息积压。可以通过优化消息处理逻辑、减少处理时间或者增加消费者节点数量来解决这个问题。
- 网络拥堵:如果网络拥堵,会导致消息的传输速度变慢,从而导致消息队列中的消息积压。可以通过优化网络环境或者使用更高效的消息传输协议来解决这个问题。
2. 解决方法:
- 消息队列的持久化:RabbitMQ 支持将消息队列持久化到磁盘中,以保证消息在宕机或者重启之后不会丢失。当生产者将消息发送到 RabbitMQ 中时,可以选择将消息持久化,这样即使 RabbitMQ 宕机或者重启,已经发送的消息也不会丢失。
- 消息确认机制:RabbitMQ 支持两种消息确认机制,分别是手动确认(Basic.ack)和自动确认(Auto-ack)。对于重要的消息,开发者通常使用手动确认机制,即在消息处理成功后手动调用 basicAck 方法来确认消息已经处理完成。如果消息处理失败或者消费者在指定的时间内没有确认消息,RabbitMQ 会认为消息失败,会将消息重新放回队列中。
- 消息重试机制:对于重要的消息,可以在消费者端实现消息重试机制,以保证消息在处理失败时能够被重新处理。可以在消费者端记录处理失败的消息,并在之后重新发送到 RabbitMQ 中进行处理。
- 消息队列的备份和恢复:RabbitMQ 支持队列的备份和恢复,以保证在队列故障或者宕机时能够及时恢复数据。