https://blog.csdn.net/leeasony/article/details/104857576
https://blog.csdn.net/daijiguo/article/details/89059963
一. RocketMQ如何保证分布式事务最终一致性?
1.1 rocketmq执行过程
我们需要先了解一下事务的执行流程
- producer 会向broker 发送一个prepare消息,此消息不可以被counsumer看到,然后执行本地事务并提交。
- 执行本地事务的回调函数
executeLocalTransaction()
有三种返回值:
- LocalTransactionState.COMMIT_MESSAGE; //执行成功可以提交
- LocalTransactionState.ROLLBACK_MESSAGE;//遇到异常回滚
- LocalTransactionState.UNKNOW//未知错误:宕机、停电
将本地事务执行结果返回给broker
- 如果是
COMMIT_MESSAGE
那么把刚才的prepare
消息设置为可以被consumer消费 - 如果是
ROLLBACK_MESSAGE
那么broker
把这个prepare
消息删除 - 如果是
UNKNOW
那么broker
会定时调用producer
中的checkLocalTransaction
函数进行回查,判断本地事务执行是否正常(一般查询数据库数据是否已经持久化数据)
- 消费者消费消息
- 如果消费者消费消息失败,那么将失败的消息传回给
broker
,即重新写入commitLog
文件,消费者重新消费,假如消息回传的时候出现了问题,就是consumer
与broker
出现了断连,则consumer
会调用submitConsumeRequestLater
在consumer端重新消费,如果仍然消费失败,会不断重试,知道默认的16
次,使用msg.getReconsumeTimes()
方法来获取当前重试次数,如果重试次数足够多之后仍然无法消费成功,必须通过工单、日志等方式进行人工干预以让producer
事务进行回退处理。
二. RocketMQ如何保证消息不丢失
- 从
Producer
分析:如何确保消息正确的发送到了Broker?
-
默认情况下,可以通过同步的方式阻塞式的发送,check SendStatus,状态是OK,表示消息一定成功的投递到了Broker,状态超时或者失败,则会触发默认的2次重试。此方法的发送结果,可能Broker存储成功了,也可能没成功
-
采取事务消息的投递方式,并不能保证消息100%投递成功到了Broker,但是如果消息发送Ack失败的话,此消息会存储在CommitLog当中,但是对ConsumerQueue是不可见的。可以在日志中查看到这条异常的消息,严格意义上来讲,也并没有完全丢失
-
RocketMQ支持 日志的索引,如果一条消息发送之后超时,也可以通过查询日志的API,来check是否在Broker存储成功
- 从Broker分析:如果确保接收到的消息不会丢失?
- 消息支持持久化到Commitlog里面,即使宕机后重启,未消费的消息也是可以加载出来的
- Broker自身支持同步刷盘、异步刷盘的策略,可以保证接收到的消息一定存储在本地的内存中
- Broker集群支持 1主N从的策略,支持同步复制和异步复制的方式,同步复制可以保证即使Master 磁盘崩溃,消息仍然不会丢失
- 从Cunmser分析:如何确保拉取到的消息被成功消费?
- 消费者可以根据自身的策略批量Pull消息
Consumer自身维护一个持久化的offset(对应MessageQueue里面的min offset),标记已经成功消费或者已经成功发回到broker的消息下标 - 如果Consumer消费失败,那么它会把这个消息发回给Broker,发回成功后,再更新自己的offset
- 如果Consumer消费失败,发回给broker时,broker挂掉了,那么Consumer会定时重试这个操作
- 如果Consumer和broker一起挂了,消息也不会丢失,因为consumer 里面的offset是定时持久化的,重启之后,继续拉取offset之前的消息到本地
三. 消息重复
- 消费端处理消息的业务逻辑保持幂等性
- 保证每条消息都有唯一编号且保证消息处理成功与去重表的日志同时出现
第1条很好理解,只要保持幂等性,不管来多少条重复消息,最后处理的结果都一样。第2条原理就是利用一张日志表来记录已经处理成功的消息的ID,如果新到的消息ID已经在日志表中,那么就不再处理这条消息。
我们可以看到第1条的解决方式,很明显应该在消费端实现,不属于消息系统要实现的功能。第2条可以消息系统实现,也可以业务端实现。正常情况下出现重复消息的概率不一定大,且由消息系统实现的话,肯定会对消息系统的吞吐量和高可用有影响,所以最好还是由业务端自己处理消息重复的问题,这也是RocketMQ不解决消息重复的问题的原因。