1.产生消息丢失的原因
1.发送者发送消息的时候消息丢失,看起来消息发送出去了
2.mq已经收到消息,但是消费者消费的时候没有收到
在p->b e->q 以及q->c 三个地方来保证可靠抵达;(图片来自尚硅谷)
2.生产端 关于回调函数以及配置
#开启发送端确认
spring.rabbitmq.publisher-confirm-type=correlated
#开启发送端消息抵达队列的确认
spring.rabbitmq.publisher-returns=true
#只要抵达队列 以异步方式优先回调returnconfirm
spring.rabbitmq.template.mandatory=true
@Slf4j
@Configuration
public class MyRabbitConfig {
@Autowired
RabbitTemplate rabbitTemplate;
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
/**
*定制RabbitTemplate
* 1.服务收到消息就回调
* 1)spring.rabbitmq.publisher-confirms=true
* 2)设置确认回调confirmCallback
* 2.消息正确抵达队列进行回调
* 1)spring.rabbitmq.publisher-returns=true
* 2) spring.rabbitmq.template.mandatory=true
* 设置确认回调ReturnCallback
* 3.消费端确认(保证每个消费被正确消费,此时才可以broker删除这个消息)
* 1)默认是自动确认的,只要消息接收到,客户端会自动确认,服务端就会移除这个消息
* 问题:
* 我们收到很多消息,自动回复服务器ack,只有一个消息处理成功,宕机了,就会发生消息丢失。
* 消费者手动确认模式,只要我们没有明确告诉MQ,货物被签收,没有ACK
* 消息就一直是unacked状态,即使Consumer宕机。消息不会丢失,会重新变成ready
* 2)如何签收:
* channel.basicAck(deliveryTag,false);签收获取
* channel.basicNack(deliveryTag,false,true);拒签
*
*/
@PostConstruct //再配置类对象创建完成以后,执行这个方法
public void initRabbitTemplate(){
RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack){
System.out.println("发送成功");
}else {
System.out.println("发送失败");
}
}
};
rabbitTemplate.setConfirmCallback(confirmCallback);
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println(message);
System.out.println(replyCode);
System.out.println(replyText);
}
});
}
}
(上述代码来自尚硅谷)
3.消费端 ack模式 以及聊天相关代码
springboot-rabbit提供了三种消息确认模式:
AcknowledgeMode.NONE:不确认模式(不管程序是否异常只要执行了监听方法,消息即被消费。相当于rabbitmq中的自动确认,这种方式不推荐使用)
AcknowledgeMode.AUTO:自动确认模式(默认,消费者没有异常会自动确认,有异常则不确认,无限重试,导致程序死循环。不要和rabbit中的自动确认混淆)
AcknowledgeMode.MANUAL:手动确认模式(需要手动调用channel.basicAck确认,可以捕获异常控制重试次数,甚至可以控制失败消息的处理方式)
一般推荐使用手动确认模式
例子如下:
springboot的配置
acknowledge-mode:none, auto, manual
none: 自动确认;auto:根据情况;manual:手动确认
spring:
rabbitmq:
host: localhost
username: guest
password: guest
virtual-host: /
listener:
simple:
acknowledge-mode: manual // 开启手动确认
retry:
enabled: true // 开启重试
max-attempts: 3 //最大重试次数
initial-interval: 2000ms //重试间隔时间
@RestController
@RequestMapping("/mq")
//@Api(tags = "mq相关接口" , description = "MqController | 消息模块")
public class MqController {
private static final Logger logger = LoggerFactory.getLogger(MqController.class);
@Autowired
private ProdServer prodServer;
@Autowired
WebSocketServer webSocketServer;
@RabbitListener(queues = {
FanoutRabbitConfig.DEFAULT_BOOK_QUEUE})
public void listenerAutoAck(String text, Message message, Channel channel) {
// TODO 如果手动ACK,消息会被监听消费,但是消息在队列中依旧存在,如果 未配置 acknowledge-mode 默认是会在消费完毕后自动ACK掉
final long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
logger.info("[消费者一监听的消息] - [{}]", text);
ChatDto.MqChat chatDto = JSONObject.parseObject(text, ChatDto.MqChat.class);
Integer rs = WebSocketServer.sendInfo(chatDto.getMessage(), chatDto.getToUserId());
// yanUserChatService.saveChat(chatDto.getOpenid(),chatDto,rs);
// webSocketServer.sendMqMessage(text);
// TODO 通知 MQ 消息已被成功消费,可以ACK了
channel.basicAck(deliveryTag, false);
} catch (IOException e) {
try {
// TODO 处理失败,重新压入MQ
channel.basicRecover();
} catch (IOException e1) {
e1.printStackTrace();
}
}
}
}
@ApiOperation("聊天核心方法 ")
public void defaultMessage(@RequestBody ChatDto chatDto,
HttpServletRequest request) throws IOException{
String message = chatDto.getContent();
String toUserId = chatDto.getEmail();
String openid = Util.fromRequestToOpenid(request);
Map<String, String> map = new HashMap<String, String>();
map.put("message",message);
map.put("toUserId",toUserId);
map.put("openid",openid);
rabbitTemplate.convertAndSend("fanoutExchange", "", JSONObject.toJSONString(map));
}
websocket的相关代码
public static Integer sendInfo(String message,@PathParam("userId") String userId) throws IOException {
log.info("发送消息到:"+userId+",报文:"+message);
if(StringUtils.isNotBlank(userId)&&webSocketMap.containsKey(userId)){
//这个地方最好封装一下报文请求 要不然前端不好处理
JSONObject jsonObject =new JSONObject();
//追加发送人(防止串改)
jsonObject.put("type","receive");
jsonObject.put("content",message);
webSocketMap.get(userId).sendMessage(jsonObject.toJSONString());
log.info("发送消息到成功");
return 1;
}else{
log.error("用户"+userId+",不在线!");
return 0;
}
}
在聊天的情境下面的rabbitmq 的应用;将发送消息之后的业务逻辑解耦,提高聊天过程中的性能。
4.如何保证消息可靠性
1、消息丢失
消息发送出去,由于网络问题没有抵达服务器
做好容错方法(try-catch),发送消息可能会网络失败,失败后要有重试机
制,可记录到数据库,采用定期扫描重发的方式
做好日志记录,每个消息状态是否都被服务器收到都应该记录
做好定期重发,如果消息没有发送成功,定期去数据库扫描未成功的消息进
行重发消息抵达Broker,Broker要将消息写入磁盘(持久化)才算成功。此时Broker尚未持久化完成,宕机。
publisher也必须加入确认回调机制,确认成功的消息,修改数据库消息状态。
自动ACK的状态下。消费者收到消息,但没来得及消息然后宕机
一定开启手动ACK,消费成功才移除,失败或者没来得及处理就noAck并重新入队
2、消息重复
消息消费成功,事务已经提交,ack时,机器宕机。导致没有ack成功,Broker的消息重新由unack变为ready,并发送给其他消费者
消息消费失败,由于重试机制,自动又将消息发送出去
成功消费,ack时宕机,消息由unack变为ready,Broker又重新发送
消费者的业务消费接口应该设计为幂等性的。比如扣库存有工作单的状态标志
使用防重表(redis/mysql),发送消息每一个都有业务的唯 一标识,处理过就不用处理
rabbitMQ的每一个消息都有redelivered字段,可以获取是否 是被重新投递过来的,而不是第一次投递过来的
3、消息积压
消费者宕机积压
消费者消费能力不足积压
发送者发送流量太大
上线更多的消费者,进行正常消费
上线专门的队列消费服务,将消息先批量取出来,记录数据库,离线慢慢处理
————————————————
第四部分原文链接:https://blog.csdn.net/weixin_40566934/article/details/119643740
5.rabbitmq死信队列和延时队列的使用
————————————————
原文链接:https://blog.csdn.net/fu_huo_1993/article/details/88350188