首先讲解防止生产者丢失信息处理。
1、设置发送消息确认: publisher-confirms: true
2、启动消息失败返回: publisher-returns: true
在yml中配置如下
spring:
rabbitmq: host: localhost port: 5672 virtual-host: mall username: guest password: guest publisher-confirms: true #如果对异步消息需要回调必须设置为true publisher-returns: true listener: direct: acknowledge-mode: manual simple: retry: enabled: true max-attempts: 3 initial-interval: 2000 acknowledge-mode: manual default-requeue-rejected: false template: mandatory: true # 触发returnedMessage回调必须设置mandatory=true, 否则Exchange没有找到Queue就会丢弃掉消息, 而不会触发回调
在文件中增加
@Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(); rabbitTemplate.setConnectionFactory(connectionFactory); rabbitTemplate.setChannelTransacted(false); rabbitTemplate.setMandatory(true); rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if(ack){ log.info("消息发送成功:correlationData({}),ack({}),cause({})", correlationData, ack, cause); }else{ System.out.print("----"); } } }); rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { message.getMessageProperties().setDeliveryMode(MessageProperties.DEFAULT_DELIVERY_MODE); log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey, replyCode, replyText, message); } }); return rabbitTemplate; }
此两步可以解决生产者丢失消息问题。
对于消费者丢失消息
设置手动确认。在消费代码中try..catch.然后进行手动确认
@Component @RabbitListener(queues = "mall.order.cancel.ttl") public class CancelOrderReceiver { private static Logger LOGGER =LoggerFactory.getLogger(CancelOrderReceiver.class); /** * 默认情况下,如果没有配置手动ACK, 那么Spring Data AMQP 会在消息消费完毕后自动帮我们去ACK * 存在问题:如果报错了,消息不会丢失,但是会无限循环消费,一直报错,如果开启了错误日志很容易就吧磁盘空间耗完 * 解决方案:手动ACK,或者try-catch 然后在 catch 里面将错误的消息转移到其它的系列中去 * spring.rabbitmq.listener.simple.acknowledge-mode = manual */ @RabbitHandler public void cfgUserReceiveDealy(Long orderId, Message message, Channel channel) throws IOException { LOGGER.info("===============接收队列接收消息===================="); LOGGER.info("接收时间:{},接受内容:{}", LocalDateTime.now(), orderId.toString(),message.getMessageProperties()); //通知 MQ 消息已被接收,可以ACK(从队列中删除)了 boolean isAck = true; try { int f = 1/0; } catch (Exception e) { isAck = false; LOGGER.error("============消费失败,尝试消息补发再次消费!=============="); LOGGER.error(e.getMessage()); //这里最后一个参数是是否进入队列。我这里用法是死信队列所以必须设置为false。才能触发死信 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); } if(isAck){ channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } } }