启动任务
以下代码位于 broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionalMessageCheckService.java 类中(该类继承自 ServiceThread,并通过 brokerController 调用 TransactionalMessageService 的 check() 方法)。
/**
 * Main loop of the transaction check service thread.
 *
 * <p>Reads the check interval once from the broker config, then keeps calling
 * {@code waitForRunning(interval)} until the service is stopped. The actual
 * check work is not done here; it happens in {@code onWaitEnd()}.
 */
@Override
public void run() {
    log.info("Start transaction check service thread!");
    // Pause between two rounds; the article notes the configured default is 60000 (60 s).
    final long intervalMillis = brokerController.getBrokerConfig().getTransactionCheckInterval();
    while (true) {
        if (this.isStopped()) {
            break;
        }
        this.waitForRunning(intervalMillis);
    }
    log.info("End transaction check service thread!");
}
/**
 * Hook that performs one transaction check round.
 *
 * <p>Reads the transaction timeout and the maximum check count from the broker
 * config, delegates to the transactional message service's {@code check(...)},
 * and logs when the round began and how long it took.
 */
@Override
protected void onWaitEnd() {
    final long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
    final int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
    final long startedAt = System.currentTimeMillis();
    log.info("Begin to check prepare message, begin time:{}", startedAt);
    // Delegate the real work to the check() method of the transactional message service.
    this.brokerController.getTransactionalMessageService()
        .check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());
    final long elapsed = System.currentTimeMillis() - startedAt;
    log.info("End to check prepare message, consumed time:{}", elapsed);
}
复制代码
这里采用了模板方法设计模式:子类重写了 run() 和 onWaitEnd() 两个方法,其中 onWaitEnd() 方法中调用了 check() 方法。那么 onWaitEnd() 方法是在哪里被调用的呢?看下父类的 waitForRunning() 方法,它位于 common/src/main/java/org/apache/rocketmq/common/ServiceThread.java 中。
/**
 * Waits up to {@code interval} milliseconds (or returns at once if a notification is
 * already pending) and then invokes the {@code onWaitEnd()} hook.
 *
 * @param interval maximum time to wait, in milliseconds
 */
protected void waitForRunning(long interval) {
// A notification is already pending: consume it and run the hook without waiting.
if (hasNotified.compareAndSet(true, false)) {
// Invoke the logic overridden by the subclass.
this.onWaitEnd();
return;
}
//entry to wait
waitPoint.reset();
try {
// Wait until the interval elapses (or await returns earlier).
waitPoint.await(interval, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
// NOTE(review): the interrupt is only logged, the thread's interrupt flag is not restored — confirm this is intended.
log.error("Interrupted", e);
} finally {
// Runs after a normal wait and after an interrupt alike: clear the flag, then run the hook.
hasNotified.set(false);
this.onWaitEnd();
}
}
复制代码
总结就是以默认60s的间隔执行一次check()方法,check()方法就是定时回查方法,下面看下。
check 流程
broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java类中,先看下全部的代码:
/**
 * Runs one check-back round over every queue of the half-message topic.
 *
 * <p>For each half queue it reads the consume offsets of the half queue and of its op queue,
 * loads op messages into {@code removeMap}, then walks the half queue from its consume offset:
 * offsets present in {@code removeMap} are skipped, messages rejected by
 * {@code needDiscard}/{@code needSkip} are handed to {@code listener.resolveDiscardMsg}, and
 * messages that need a check are re-appended to the half queue and handed to
 * {@code listener.resolveHalfMsg}. Finally the consume offsets of both queues are updated.
 * Any {@code Throwable} is caught and logged.
 *
 * @param transactionTimeout default immunity window; compared against millisecond differences
 * @param transactionCheckMax passed to {@code needDiscard} for every half message
 * @param listener receives the discard and check-back callbacks
 */
@Override
public void check(long transactionTimeout, int transactionCheckMax, AbstractTransactionalMessageCheckListener listener) {
try {
// Topic that stores the half (prepared) messages.
String topic = TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC;
// Fetch the queues of the half topic; nothing to do when there are none.
Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);
if (msgQueues == null || msgQueues.size() == 0) {
log.warn("The queue of topic is empty :" + topic);
return;
}
log.debug("Check topic={}, queues={}", topic, msgQueues);
for (MessageQueue messageQueue : msgQueues) {
// Start of this queue's time slice; compared with MAX_PROCESS_TIME_LIMIT inside the loop.
long startTime = System.currentTimeMillis();
// Op queue that belongs to this half queue.
MessageQueue opQueue = getOpQueue(messageQueue);
// Consume offset of the half queue.
long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue);
// Consume offset of the op queue.
long opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue);
log.info("Before check, the queue={} msgOffset={} opOffset={}", messageQueue, halfOffset, opOffset);
if (halfOffset < 0 || opOffset < 0) {
log.error("MessageQueue: {} illegal offset read: {}, op offset: {},skip this queue", messageQueue, halfOffset, opOffset);
continue;
}
List<Long/* opQueue offsets that have been processed */> doneOpOffset = new ArrayList<>();
HashMap<Long/*halfQueue offset*/, Long/*opQueue offset*/> removeMap = new HashMap<>(); // half messages that already have an op message but are not yet recorded in doneOpOffset
// Load op messages into removeMap (and doneOpOffset); the loop below consults both.
PullResult pullResult = fillOpRemoveMap(removeMap, opQueue, opOffset, halfOffset, doneOpOffset);
if (null == pullResult) {
log.error("The queue={} check msgOffset={} with opOffset={} failed, pullResult is null", messageQueue, halfOffset, opOffset);
continue;
}
int getMessageNullCount = 1; // how many times fetching a half message returned null
long newOffset = halfOffset; // half-queue progress to commit at the end of this round
long i = halfOffset; // half-queue offset currently being examined
while (true) {
// Stop once this queue's time slice is used up.
if (System.currentTimeMillis() - startTime > MAX_PROCESS_TIME_LIMIT) {
log.info("Queue={} process time reach max={}", messageQueue, MAX_PROCESS_TIME_LIMIT);
break;
}
// The half message at this offset already has an op message: record it as done and move on.
if (removeMap.containsKey(i)) {
log.debug("Half offset {} has been committed/rolled back", i);
Long removedOpOffset = removeMap.remove(i);
doneOpOffset.add(removedOpOffset);
} else {
// Fetch the half message at offset i.
GetResult getResult = getHalfMsg(messageQueue, i);
MessageExt msgExt = getResult.getMsg();
if (msgExt == null) {
// Give up on this queue after too many null results.
if (getMessageNullCount++ > MAX_RETRY_COUNT_WHEN_HALF_NULL) {
break;
}
if (getResult.getPullResult().getPullStatus() == PullStatus.NO_NEW_MSG) {
log.debug("No new msg, the miss offset={} in={}, continue check={}, pull result={}", i, messageQueue, getMessageNullCount, getResult.getPullResult());
break;
} else {
// Any other status: jump to the next begin offset reported by the pull and retry there.
log.info("Illegal offset, the miss offset={} in={}, continue check={}, pull result={}", i, messageQueue, getMessageNullCount, getResult.getPullResult());
i = getResult.getPullResult().getNextBeginOffset();
newOffset = i;
continue;
}
}
// Drop the message when needDiscard(...) or needSkip(...) says so
// (presumably check-count limit reached / message past its retention time — verify against those helpers).
if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) {
listener.resolveDiscardMsg(msgExt);
newOffset = i + 1;
i++;
continue;
}
// The message was stored at or after the start of this round: stop scanning this queue.
if (msgExt.getStoreTimestamp() >= startTime) {
log.debug("Fresh stored. the miss offset={}, check it later, store={}", i, new Date(msgExt.getStoreTimestamp()));
break;
}
long valueOfCurrentMinusBorn = System.currentTimeMillis() - msgExt.getBornTimestamp(); // milliseconds elapsed since the message's born timestamp
long checkImmunityTime = transactionTimeout; // immunity window: no check-back while the message is younger than this
String checkImmunityTimeStr = msgExt.getUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS); // per-message immunity setting, if the property is present
if (null != checkImmunityTimeStr) {
checkImmunityTime = getImmunityTime(checkImmunityTimeStr, transactionTimeout);
// Still inside the immunity window: advance only if checkPrepareQueueOffset(...) returns true.
if (valueOfCurrentMinusBorn < checkImmunityTime) {
if (checkPrepareQueueOffset(removeMap, doneOpOffset, msgExt)) {
newOffset = i + 1;
i++;
continue;
}
}
} else {
// No per-message setting and still inside the default window: stop scanning this queue.
if ((0 <= valueOfCurrentMinusBorn) && (valueOfCurrentMinusBorn < checkImmunityTime)) {
log.debug("New arrived, the miss offset={}, check it later checkImmunity={}, born={}", i, checkImmunityTime, new Date(msgExt.getBornTimestamp()));
break;
}
}
// Op messages returned by the most recent op-queue pull.
List<MessageExt> opMsg = pullResult.getMsgFoundList();
// A check-back is needed when: (1) the last pull had no op messages and the message is older than the
// immunity window, or (2) the last pulled op message was born more than transactionTimeout after startTime,
// or (3) the born timestamp lies ahead of the current time (negative age).
boolean isNeedCheck = (opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime) || (opMsg != null && (opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout)) || (valueOfCurrentMinusBorn <= -1);
if (isNeedCheck) {
// Re-append the message to the half queue; on failure retry the same offset.
if (!putBackHalfMsgQueue(msgExt, i)) {
continue;
}
// Hand the message to the listener for the check-back.
listener.resolveHalfMsg(msgExt);
} else {
// Pull the next batch of op messages and re-evaluate the same half offset.
pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset);
log.debug("The miss offset:{} in messageQueue:{} need to get more opMsg, result is:{}", i, messageQueue, pullResult);
continue;
}
}
// This offset is handled: advance to the next one.
newOffset = i + 1;
i++;
}
if (newOffset != halfOffset) {
// Commit the new consume progress of the half queue.
transactionalMessageBridge.updateConsumeOffset(messageQueue, newOffset);
}
long newOpOffset = calculateOpOffset(doneOpOffset, opOffset);
if (newOpOffset != opOffset) {
// Commit the new consume progress of the op queue.
transactionalMessageBridge.updateConsumeOffset(opQueue, newOpOffset);
}
}
} catch (Throwable e) {
log.error("Check error", e);
}
}
复制代码
代码太多了,一个个拆解分析:
获取half队列
// Excerpt from check(): look up the queues of the half-message topic.
// Topic that stores the half (prepared) messages.
String topic = TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC;
// Fetch the queues of the half topic; nothing to do when there are none.
Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);
if (msgQueues == null || msgQueues.size() == 0) {
log.warn("The queue of topic is empty :" + topic);
return;
}
复制代码
在 RocketMQ 中,事务消息会用到两个内部 topic,分别是 RMQ_SYS_TRANS_HALF_TOPIC(其队列简称 halfQueue)和 RMQ_SYS_TRANS_OP_HALF_TOPIC(其队列简称 opQueue):
- RMQ_SYS_TRANS_HALF_TOPIC : 保存 half 消息
- RMQ_SYS_TRANS_OP_HALF_TOPIC : 当 half 消息收到 commit/rollback 后,会向 opQueue 中写入一条 op 消息,其消息体记录了对应 half 消息在 halfQueue 中的 offset
定义偏移量
// Excerpt from check(): per-queue setup.
// Start time of this queue's processing.
long startTime = System.currentTimeMillis();
// Op queue that belongs to this half queue.
MessageQueue opQueue = getOpQueue(messageQueue);
// Consume offset of the half queue.
long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue);
// Consume offset of the op queue.
long opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue);
复制代码
- startTime : 记录当前队列开始处理的时间,后续判断任务执行时间(时间片)时需要用到
- opQueue : 即 RMQ_SYS_TRANS_OP_HALF_TOPIC 中与当前 halfQueue 对应的队列,保存已经 commit/rollback 的 half 消息所对应的 op 消息
- halfOffset : halfQueue 的消费进度,只有位于 halfOffset 及其之后的消息才需要进行回查判断
- opOffset : opQueue 的消费进度,位于 opOffset 及其之后的 op 消息,就是上次定时回查任务结束之后新增的 op 消息
标记已处理消息
// Excerpt from check(): collect the op messages that are already known.
List<Long/* opQueue offsets that have been processed */> doneOpOffset = new ArrayList<>();
HashMap<Long/*halfQueue offset*/, Long/*opQueue offset*/> removeMap = new HashMap<>(); // half messages that already have an op message but are not yet recorded in doneOpOffset
// Load op messages into removeMap (and doneOpOffset); the later checks consult both.
PullResult pullResult = fillOpRemoveMap(removeMap, opQueue, opOffset, halfOffset, doneOpOffset);
复制代码
- doneOpOffset : 保存 opQueue 中已经处理过的消息的 offset,后续更新 opQueue 的消费进度时要使用
- removeMap : 保存已经写入 opQueue、但对应的 half 消息仍位于 halfQueue 消费进度之后的记录(key 为 halfQueue offset,value 为 opQueue offset),是后续判断消息是否需要回查的依据之一。
fillOpRemoveMap() 方法的目的就是把已处理的消息填充到 removeMap(以及 doneOpOffset)中:
/**
 * Pulls one batch (up to 32) of op messages and classifies each of them.
 *
 * <p>The body of every op message is parsed as the half-queue offset it refers to.
 * If that offset is below {@code miniOffset}, the op message's own queue offset goes
 * into {@code doneOpOffset}; otherwise the pair (half offset -> op offset) goes into
 * {@code removeMap}. Op messages without the remove tag are only logged.
 *
 * @param removeMap      filled with half offset -> op offset for half messages at or beyond {@code miniOffset}
 * @param opQueue        op queue to pull from
 * @param pullOffsetOfOp op-queue offset to start pulling at
 * @param miniOffset     half-queue offset used as the boundary between "done" and "to remove"
 * @param doneOpOffset   filled with op-queue offsets whose half message is below {@code miniOffset}
 * @return the pull result, or {@code null} when the pull itself returned {@code null}
 */
private PullResult fillOpRemoveMap(HashMap<Long, Long> removeMap, MessageQueue opQueue, long pullOffsetOfOp, long miniOffset, List<Long> doneOpOffset) {
    // Step 1: pull up to 32 op messages.
    PullResult pullResult = pullOpMsg(opQueue, pullOffsetOfOp, 32);
    if (pullResult == null) {
        return null;
    }

    // Step 2: handle the statuses that carry no usable messages.
    PullStatus status = pullResult.getPullStatus();
    if (status == PullStatus.OFFSET_ILLEGAL || status == PullStatus.NO_MATCHED_MSG) {
        log.warn("The miss op offset={} in queue={} is illegal, pullResult={}", pullOffsetOfOp, opQueue, pullResult);
        // Move the op queue's consume offset to the position suggested by the pull.
        transactionalMessageBridge.updateConsumeOffset(opQueue, pullResult.getNextBeginOffset());
        return pullResult;
    }
    if (status == PullStatus.NO_NEW_MSG) {
        log.warn("The miss op offset={} in queue={} is NO_NEW_MSG, pullResult={}", pullOffsetOfOp, opQueue, pullResult);
        return pullResult;
    }

    List<MessageExt> pulledOps = pullResult.getMsgFoundList();
    if (pulledOps == null) {
        log.warn("The miss op offset={} in queue={} is empty, pullResult={}", pullOffsetOfOp, opQueue, pullResult);
        return pullResult;
    }

    // Step 3: classify every op message.
    for (MessageExt opMessageExt : pulledOps) {
        // The op message body carries the offset of the half message it belongs to.
        Long halfQueueOffset = getLong(new String(opMessageExt.getBody(), TransactionalMessageUtil.charset));
        log.debug("Topic: {} tags: {}, OpOffset: {}, HalfOffset: {}", opMessageExt.getTopic(), opMessageExt.getTags(), opMessageExt.getQueueOffset(), halfQueueOffset);
        if (!TransactionalMessageUtil.REMOVETAG.equals(opMessageExt.getTags())) {
            log.error("Found a illegal tag in opMessageExt= {} ", opMessageExt);
            continue;
        }
        if (halfQueueOffset < miniOffset) {
            // The half message lies before the boundary: this op message counts as done.
            doneOpOffset.add(opMessageExt.getQueueOffset());
        } else {
            // The half message is at or beyond the boundary: remember it so the caller can skip it.
            removeMap.put(halfQueueOffset, opMessageExt.getQueueOffset());
        }
    }
    log.debug("Remove map: {}", removeMap);
    log.debug("Done op list: {}", doneOpOffset);
    return pullResult;
}
复制代码
定义变量
// Excerpt from check(): loop state for walking one half queue.
int getMessageNullCount = 1; // how many times fetching a half message returned null
long newOffset = halfOffset; // half-queue progress to commit at the end of this round
long i = halfOffset; // half-queue offset currently being examined
复制代码
- getMessageNullCount : rocketMQ允许进行重试,该字段是重试次数判断的依据
判断是否满足回查条件
// Excerpt from the body of the while-loop in check(); the enclosing blocks are not closed here.
// Stop once this queue's time slice is used up.
if (System.currentTimeMillis() - startTime > MAX_PROCESS_TIME_LIMIT) {
log.info("Queue={} process time reach max={}", messageQueue, MAX_PROCESS_TIME_LIMIT);
break;
}
// The half message at this offset already has an op message: record it as done and move on.
if (removeMap.containsKey(i)) {
log.debug("Half offset {} has been committed/rolled back", i);
Long removedOpOffset = removeMap.remove(i);
doneOpOffset.add(removedOpOffset);
} else {
// Fetch the half message at offset i.
GetResult getResult = getHalfMsg(messageQueue, i);
MessageExt msgExt = getResult.getMsg();
if (msgExt == null) {
// Give up on this queue after too many null results.
if (getMessageNullCount++ > MAX_RETRY_COUNT_WHEN_HALF_NULL) {
break;
}
if (getResult.getPullResult().getPullStatus() == PullStatus.NO_NEW_MSG) {
log.debug("No new msg, the miss offset={} in={}, continue check={}, pull result={}", i, messageQueue, getMessageNullCount, getResult.getPullResult());
break;
} else {
// Any other status: jump to the next begin offset reported by the pull and retry there.
log.info("Illegal offset, the miss offset={} in={}, continue check={}, pull result={}", i, messageQueue, getMessageNullCount, getResult.getPullResult());
i = getResult.getPullResult().getNextBeginOffset();
newOffset = i;
continue;
}
}
// Drop the message when needDiscard(...) or needSkip(...) says so
// (presumably check-count limit reached / message past its retention time — verify against those helpers).
if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) {
listener.resolveDiscardMsg(msgExt);
newOffset = i + 1;
i++;
continue;
}
// The message was stored at or after the start of this round: stop scanning this queue.
if (msgExt.getStoreTimestamp() >= startTime) {
log.debug("Fresh stored. the miss offset={}, check it later, store={}", i, new Date(msgExt.getStoreTimestamp()));
break;
}
long valueOfCurrentMinusBorn = System.currentTimeMillis() - msgExt.getBornTimestamp(); // milliseconds elapsed since the message's born timestamp
long checkImmunityTime = transactionTimeout; // immunity window: no check-back while the message is younger than this
String checkImmunityTimeStr = msgExt.getUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS); // per-message immunity setting, if the property is present
if (null != checkImmunityTimeStr) {
checkImmunityTime = getImmunityTime(checkImmunityTimeStr, transactionTimeout);
// Still inside the immunity window: advance only if checkPrepareQueueOffset(...) returns true.
if (valueOfCurrentMinusBorn < checkImmunityTime) {
if (checkPrepareQueueOffset(removeMap, doneOpOffset, msgExt)) {
newOffset = i + 1;
i++;
continue;
}
}
} else {
// No per-message setting and still inside the default window: stop scanning this queue.
if ((0 <= valueOfCurrentMinusBorn) && (valueOfCurrentMinusBorn < checkImmunityTime)) {
log.debug("New arrived, the miss offset={}, check it later checkImmunity={}, born={}", i, checkImmunityTime, new Date(msgExt.getBornTimestamp()));
break;
}
}
复制代码
任务时间片机制:在rocketmq中,有一种通用的设计方案就是给任务分配时间片, 如果时间片用完则直接退出。
执行回查
// Excerpt from check(): decide whether to check back the current half message.
// Op messages returned by the most recent op-queue pull.
List<MessageExt> opMsg = pullResult.getMsgFoundList();
// A check-back is needed when: (1) the last pull had no op messages and the message is older than the
// immunity window, or (2) the last pulled op message was born more than transactionTimeout after startTime,
// or (3) the born timestamp lies ahead of the current time (negative age).
boolean isNeedCheck = (opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime) || (opMsg != null && (opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout)) || (valueOfCurrentMinusBorn <= -1);
if (isNeedCheck) {
// Re-append the message to the half queue; on failure retry the same offset.
if (!putBackHalfMsgQueue(msgExt, i)) {
continue;
}
// Hand the message to the listener for the check-back.
listener.resolveHalfMsg(msgExt);
} else {
// Pull the next batch of op messages and re-evaluate the same half offset.
pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset);
log.debug("The miss offset:{} in messageQueue:{} need to get more opMsg, result is:{}", i, messageQueue, pullResult);
continue;
}
复制代码
其中比较重要的两个部分是 putBackHalfMsgQueue() 和 resolveHalfMsg()。
putBackHalfMsgQueue()
这里消息会被重新追加到 commitlog 中,设计巧妙之处:
- 发送回查的接口是异步的,将消息重新追加到队列中,下一次回查的时候还能获取到该消息。如果回查有结果,对应的 op 消息就会保存到 opQueue 中,这样该消息也会在 fillOpRemoveMap() 中被过滤掉,不影响。
- 每次进行回查都要修改消息的回查次数属性,使用顺序追加写的性能要高于修改已存储消息的性能。
/**
 * Re-appends a half message to the half queue and, on success, updates the in-memory
 * message with its new position.
 *
 * @param msgExt half message to store again; its queue offset, commit log offset and
 *               message id are overwritten when the write succeeds
 * @param offset half-queue offset the message was read from (used for logging only)
 * @return {@code true} when the write result is non-null and reports {@code PUT_OK},
 *         {@code false} otherwise
 */
private boolean putBackHalfMsgQueue(MessageExt msgExt, long offset) {
    // Store the message once more.
    PutMessageResult putMessageResult = putBackToHalfQueueReturnResult(msgExt);

    boolean stored = putMessageResult != null
        && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK;
    if (!stored) {
        log.error("PutBackToHalfQueueReturnResult write failed, topic: {}, queueId: {}, " + "msgId: {}", msgExt.getTopic(), msgExt.getQueueId(), msgExt.getMsgId());
        return false;
    }

    // Point the message at its new copy: logical queue offset, commit log offset, message id.
    msgExt.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset());
    msgExt.setCommitLogOffset(putMessageResult.getAppendMessageResult().getWroteOffset());
    msgExt.setMsgId(putMessageResult.getAppendMessageResult().getMsgId());
    log.debug("Send check message, the offset={} restored in queueOffset={} " + "commitLogOffset={} " + "newMsgId={} realMsgId={} topic={}", offset, msgExt.getQueueOffset(), msgExt.getCommitLogOffset(), msgExt.getMsgId(), msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX), msgExt.getTopic());
    return true;
}
复制代码
更新索引
// Excerpt from check(): commit the progress made in this round.
if (newOffset != halfOffset) {
// Commit the new consume progress of the half queue.
transactionalMessageBridge.updateConsumeOffset(messageQueue, newOffset);
}
// Derive the op queue's new offset from the doneOpOffset list.
long newOpOffset = calculateOpOffset(doneOpOffset, opOffset);
if (newOpOffset != opOffset) {
// Commit the new consume progress of the op queue.
transactionalMessageBridge.updateConsumeOffset(opQueue, newOpOffset);
}
复制代码
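calculateOpOffset() 方法先对 doneOffset 列表排序,然后从 oldOffset 开始逐个比较:只要列表中的下一个值等于当前进度就把进度加一,遇到第一个不相等(不连续)的值就停止。因此它返回的是"从 oldOffset 起连续已处理区间之后的下一个 offset",而不一定是 doneOffset 列表中的最后一个值。例如 oldOffset=5、doneOffset=[5,6,7,9] 时返回 8。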
/**
 * Computes the new op-queue consume offset.
 *
 * <p>Sorts {@code doneOffset} in place, then advances from {@code oldOffset} for as long
 * as the sorted list continues with exactly the next expected offset. The scan stops at
 * the first element that does not match.
 *
 * @param doneOffset processed op-queue offsets; sorted in place by this call
 * @param oldOffset  current op-queue consume offset
 * @return the first offset at or after {@code oldOffset} that is not covered by the
 *         contiguous run found at the start of the sorted list
 */
private long calculateOpOffset(List<Long> doneOffset, long oldOffset) {
    Collections.sort(doneOffset);
    long next = oldOffset;
    for (Long done : doneOffset) {
        // Boxed Long vs primitive long: the Long is unboxed, so this is a numeric comparison.
        if (done != next) {
            break;
        }
        next++;
    }
    return next;
}
复制代码