RocketMQ4.9.1 源码分析-定时回查任务

启动任务

broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java类中。

@Override
public void run() {
    log.info("Start transaction check service thread!");
    long checkInterval = brokerController.getBrokerConfig().getTransactionCheckInterval();  // 默认值为60000,即60s
    while (!this.isStopped()) {
        this.waitForRunning(checkInterval);
    }
    log.info("End transaction check service thread!");
}

@Override
protected void onWaitEnd() {
    long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
    int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
    long begin = System.currentTimeMillis();
    log.info("Begin to check prepare message, begin time:{}", begin);
    // 调用chekc方法
    this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());
    log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);
}
复制代码

这里采用了模板设计模式,重写了run()onWaitEnd()两个方法,其中可以看到onWaitEnd()方法中调用了check()方法,那么在哪里调用了这个onWaitEnd()方法呢?看下waitForRunning()方法,在src/main/java/org/apache/rocketmq/common/ServiceThread.java中。

protected void waitForRunning(long interval) {
    if (hasNotified.compareAndSet(true, false)) {
        // 调用子类重写的逻辑
        this.onWaitEnd();
        return;
    }

    //entry to wait
    waitPoint.reset();

    try {
        // 等待
        waitPoint.await(interval, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
        log.error("Interrupted", e);
    } finally {
        hasNotified.set(false);
        this.onWaitEnd();
    }
}
复制代码

总结就是以默认60s的间隔执行一次check()方法,check()方法就是定时回查方法,下面看下。

check 流程

broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java类中,先看下全部的代码:

@Override
    public void check(long transactionTimeout, int transactionCheckMax, AbstractTransactionalMessageCheckListener listener) {
        try {
            // 定义topic为"RMQ_SYS_TRANS_HALF_TOPIC"
            String topic = TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC;
            // 获取half队列
            Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);
            if (msgQueues == null || msgQueues.size() == 0) {
                log.warn("The queue of topic is empty :" + topic);
                return;
            }
            log.debug("Check topic={}, queues={}", topic, msgQueues);
            for (MessageQueue messageQueue : msgQueues) {
                long startTime = System.currentTimeMillis();
                // 获取opQueue
                MessageQueue opQueue = getOpQueue(messageQueue);
                // 获取half队列的消费offset
                long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue);
                // 获取op队列的offset
                long opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue);
                log.info("Before check, the queue={} msgOffset={} opOffset={}", messageQueue, halfOffset, opOffset);
                if (halfOffset < 0 || opOffset < 0) {
                    log.error("MessageQueue: {} illegal offset read: {}, op offset: {},skip this queue", messageQueue, halfOffset, opOffset);
                    continue;
                }

                List<Long/*处理过得opQueue的offset*/> doneOpOffset = new ArrayList<>();
                HashMap<Long/*halfQueue offset*/, Long/*opQueue offset*/> removeMap = new HashMap<>();  //存储已经处理过,但还未添加到doneOpOffset中的消息
                // 将已处理但未更新的消息保存到removeMap中,后续进行判断时需要
                PullResult pullResult = fillOpRemoveMap(removeMap, opQueue, opOffset, halfOffset, doneOpOffset);
                if (null == pullResult) {
                    log.error("The queue={} check msgOffset={} with opOffset={} failed, pullResult is null", messageQueue, halfOffset, opOffset);
                    continue;
                }
                int getMessageNullCount = 1;  //获取空消息的次数
                long newOffset = halfOffset;  //当前处理half队列的最新进度
                long i = halfOffset; //当前处理消息的half队列偏移量
                while (true) {
                    // 检查当前任务的时间片是否用完
                    if (System.currentTimeMillis() - startTime > MAX_PROCESS_TIME_LIMIT) {
                        log.info("Queue={} process time reach max={}", messageQueue, MAX_PROCESS_TIME_LIMIT);
                        break;
                    }
                    // 如果half消息已经被处理过,继续处理下一条消息
                    if (removeMap.containsKey(i)) {
                        log.debug("Half offset {} has been committed/rolled back", i);
                        Long removedOpOffset = removeMap.remove(i);
                        doneOpOffset.add(removedOpOffset);
                    } else {
                        // 根据half队列消费offset获取消息
                        GetResult getResult = getHalfMsg(messageQueue, i);
                        MessageExt msgExt = getResult.getMsg();
                        if (msgExt == null) {
                            // 是否最大重试次数
                            if (getMessageNullCount++ > MAX_RETRY_COUNT_WHEN_HALF_NULL) {
                                break;
                            }
                            if (getResult.getPullResult().getPullStatus() == PullStatus.NO_NEW_MSG) {
                                log.debug("No new msg, the miss offset={} in={}, continue check={}, pull result={}", i, messageQueue, getMessageNullCount, getResult.getPullResult());
                                break;
                            } else {
                                log.info("Illegal offset, the miss offset={} in={}, continue check={}, pull result={}", i, messageQueue, getMessageNullCount, getResult.getPullResult());
                                i = getResult.getPullResult().getNextBeginOffset();
                                newOffset = i;
                                continue;
                            }
                        }

                        // 如果超过存储时间(默认3天)或者超过回查时间(默认15次)
                        if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) {
                            listener.resolveDiscardMsg(msgExt);
                            newOffset = i + 1;
                            i++;
                            continue;
                        }

                        // 如果存储时间大于开始时间,不处理
                        if (msgExt.getStoreTimestamp() >= startTime) {
                            log.debug("Fresh stored. the miss offset={}, check it later, store={}", i, new Date(msgExt.getStoreTimestamp()));
                            break;
                        }

                        long valueOfCurrentMinusBorn = System.currentTimeMillis() - msgExt.getBornTimestamp();  //消息已存储时间
                        long checkImmunityTime = transactionTimeout; //立即检测事务消息的时间
                        String checkImmunityTimeStr = msgExt.getUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS); //获取消息的最晚回查时间
                        if (null != checkImmunityTimeStr) {
                            checkImmunityTime = getImmunityTime(checkImmunityTimeStr, transactionTimeout);
                            // 如果未到回查时间,不处理
                            if (valueOfCurrentMinusBorn < checkImmunityTime) {
                                if (checkPrepareQueueOffset(removeMap, doneOpOffset, msgExt)) {
                                    newOffset = i + 1;
                                    i++;
                                    continue;
                                }
                            }
                        } else {
                            if ((0 <= valueOfCurrentMinusBorn) && (valueOfCurrentMinusBorn < checkImmunityTime)) {
                                log.debug("New arrived, the miss offset={}, check it later checkImmunity={}, born={}", i, checkImmunityTime, new Date(msgExt.getBornTimestamp()));
                                break;
                            }
                        }

                        // 获取消息列表
                        List<MessageExt> opMsg = pullResult.getMsgFoundList();
                        // 判断是否需要进行回查
                        boolean isNeedCheck = (opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime) || (opMsg != null && (opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout)) || (valueOfCurrentMinusBorn <= -1);
                        if (isNeedCheck) {
                            // 消息重新put到halfQueue中
                            if (!putBackHalfMsgQueue(msgExt, i)) {
                                continue;
                            }
                            // 执行回查
                            listener.resolveHalfMsg(msgExt);
                        } else {
                            pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset);
                            log.debug("The miss offset:{} in messageQueue:{} need to get more opMsg, result is:{}", i, messageQueue, pullResult);
                            continue;
                        }
                    }
                    newOffset = i + 1;
                    i++;
                }
                if (newOffset != halfOffset) {
                    // 更新halfQueue消费进度
                    transactionalMessageBridge.updateConsumeOffset(messageQueue, newOffset);
                }
                long newOpOffset = calculateOpOffset(doneOpOffset, opOffset);
                if (newOpOffset != opOffset) {
                    // 更新opQueue消费进度
                    transactionalMessageBridge.updateConsumeOffset(opQueue, newOpOffset);
                }
            }
        } catch (Throwable e) {
            log.error("Check error", e);
        }

    }
复制代码

代码太多了,一个个拆解分析:

获取half队列

// 定义topic为"RMQ_SYS_TRANS_HALF_TOPIC"
String topic = TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC;
// 获取half队列
Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);
if (msgQueues == null || msgQueues.size() == 0) {
    log.warn("The queue of topic is empty :" + topic);
    return;
}
复制代码

在rocketmq中,所有事务消息会保存在两个topic队列中,分别是RMQ_SYS_TRANS_HALF_TOPIC(简称halfQueue)和RMQ_SYS_TRANS_OP _HALF _TOPIC(简称opQueue)

  • RMQ_SYS_TRANS _HALF _TOPIC : 保存 half 消息
  • RMQ_SYS_TRANS_OP _HALF _TOPIC : 当 half消息收到 commit/rollback 后,会保存到opQueue中

定义偏移量

// 开始时间
long startTime = System.currentTimeMillis();
// 获取opQueue
MessageQueue opQueue = getOpQueue(messageQueue);
// 获取half队列的消费offset
long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue);
// 获取op队列的offset
long opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue);
复制代码
  • startTime : 作用是记录当前时间,后续在判断任务的执行时间时需要
  • opQueue : 即RMQ_SYS_TRANS_OP_HALF_TOPIC队列,保存已处理过的消息。
  • halfOffset : halfQueue 中的消费记录位置,可以保证的halfOffset之后的消息才需要进行回查操作
  • opOffset : opQueue中的消费记录,大于opOffset的消息就是距离上次定时回查任务结束后,这段时间新增的消息。

标记已处理消息

List<Long/*处理过得opQueue的offset*/> doneOpOffset = new ArrayList<>();
HashMap<Long/*halfQueue offset*/, Long/*opQueue offset*/> removeMap = new HashMap<>();  //存储已经处理过,但还未添加到doneOpOffset中的消息
// 将已处理但未更新的消息保存到removeMap中,后续进行判断时需要
PullResult pullResult = fillOpRemoveMap(removeMap, opQueue, opOffset, halfOffset, doneOpOffset);
复制代码
  • doneOpOffset: 保存opQueue中已经处理过得消息的offset,后续更新opQueue的offset时候要使用
  • removeMap : 保存已经保存到opQueue中,但仍在halfQueue中的消息,后续判断消息是否需要回查时的依据之一。

fillOpRemove()

方法的目的就是填充已处理的消息到 removeMap

private PullResult fillOpRemoveMap(HashMap<Long, Long> removeMap, MessageQueue opQueue, long pullOffsetOfOp, long miniOffset, List<Long> doneOpOffset) {
    // step1 : 获取op消息,拉取32条消息
    PullResult pullResult = pullOpMsg(opQueue, pullOffsetOfOp, 32);
    if (null == pullResult) {
        return null;
    }
    // step2 : 判断状态
    if (pullResult.getPullStatus() == PullStatus.OFFSET_ILLEGAL || pullResult.getPullStatus() == PullStatus.NO_MATCHED_MSG) {
        log.warn("The miss op offset={} in queue={} is illegal, pullResult={}", pullOffsetOfOp, opQueue, pullResult);
        transactionalMessageBridge.updateConsumeOffset(opQueue, pullResult.getNextBeginOffset());
        return pullResult;
    } else if (pullResult.getPullStatus() == PullStatus.NO_NEW_MSG) { //没有新消息
        log.warn("The miss op offset={} in queue={} is NO_NEW_MSG, pullResult={}", pullOffsetOfOp, opQueue, pullResult);
        return pullResult;
    }
    List<MessageExt> opMsg = pullResult.getMsgFoundList();
    if (opMsg == null) {
        log.warn("The miss op offset={} in queue={} is empty, pullResult={}", pullOffsetOfOp, opQueue, pullResult);
        return pullResult;
    }
    // step3 : 遍历所有消息
    for (MessageExt opMessageExt : opMsg) {
        // 获取opQueue的offset
        Long queueOffset = getLong(new String(opMessageExt.getBody(), TransactionalMessageUtil.charset));
        log.debug("Topic: {} tags: {}, OpOffset: {}, HalfOffset: {}", opMessageExt.getTopic(), opMessageExt.getTags(), opMessageExt.getQueueOffset(), queueOffset);
        if (TransactionalMessageUtil.REMOVETAG.equals(opMessageExt.getTags())) {
            if (queueOffset < miniOffset) {
                // 正常情况,opQueue里的消息在halfQueue中offset要小于当前halfQueue的offset
                doneOpOffset.add(opMessageExt.getQueueOffset());
            } else {
                // 消息已经存储到opQueue中,但是halfQueue中的offset还没有更新
                // 保存到removeMap中,防止后续重复进行回查操作
                removeMap.put(queueOffset, opMessageExt.getQueueOffset());
            }
        } else {
            log.error("Found a illegal tag in opMessageExt= {} ", opMessageExt);
        }
    }
    log.debug("Remove map: {}", removeMap);
    log.debug("Done op list: {}", doneOpOffset);
    return pullResult;
}
复制代码

定义变量

int getMessageNullCount = 1;  //获取空消息的次数
long newOffset = halfOffset;  //当前处理half队列的最新进度
long i = halfOffset; //当前处理消息的half队列偏移量
复制代码
  • getMessageNullCount : rocketMQ允许进行重试,该字段是重试次数判断的依据

判断是否满足回查条件

// 检查当前任务的时间片是否用完
if (System.currentTimeMillis() - startTime > MAX_PROCESS_TIME_LIMIT) {
    log.info("Queue={} process time reach max={}", messageQueue, MAX_PROCESS_TIME_LIMIT);
    break;
}
// 如果half消息已经被处理过,继续处理下一条消息
if (removeMap.containsKey(i)) {
    log.debug("Half offset {} has been committed/rolled back", i);
    Long removedOpOffset = removeMap.remove(i);
    doneOpOffset.add(removedOpOffset);
} else {
    // 根据half队列消费offset获取消息
    GetResult getResult = getHalfMsg(messageQueue, i);
    MessageExt msgExt = getResult.getMsg();
    if (msgExt == null) {
        // 是否最大重试次数
        if (getMessageNullCount++ > MAX_RETRY_COUNT_WHEN_HALF_NULL) {
            break;
        }
        if (getResult.getPullResult().getPullStatus() == PullStatus.NO_NEW_MSG) {
            log.debug("No new msg, the miss offset={} in={}, continue check={}, pull result={}", i, messageQueue, getMessageNullCount, getResult.getPullResult());
            break;
        } else {
            log.info("Illegal offset, the miss offset={} in={}, continue check={}, pull result={}", i, messageQueue, getMessageNullCount, getResult.getPullResult());
            i = getResult.getPullResult().getNextBeginOffset();
            newOffset = i;
            continue;
        }
    }

    // 如果超过存储时间(默认3天)或者超过回查时间(默认15次)
    if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) {
        listener.resolveDiscardMsg(msgExt);
        newOffset = i + 1;
        i++;
        continue;
    }

    // 如果存储时间大于开始时间,不处理
    if (msgExt.getStoreTimestamp() >= startTime) {
        log.debug("Fresh stored. the miss offset={}, check it later, store={}", i, new Date(msgExt.getStoreTimestamp()));
        break;
    }

    long valueOfCurrentMinusBorn = System.currentTimeMillis() - msgExt.getBornTimestamp();  //消息已存储时间
    long checkImmunityTime = transactionTimeout; //立即检测事务消息的时间
    String checkImmunityTimeStr = msgExt.getUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS); //获取消息的最晚回查时间
    if (null != checkImmunityTimeStr) {
        checkImmunityTime = getImmunityTime(checkImmunityTimeStr, transactionTimeout);
        // 如果未到回查时间,不处理
        if (valueOfCurrentMinusBorn < checkImmunityTime) {
            if (checkPrepareQueueOffset(removeMap, doneOpOffset, msgExt)) {
                newOffset = i + 1;
                i++;
                continue;
            }
        }
    } else {
        if ((0 <= valueOfCurrentMinusBorn) && (valueOfCurrentMinusBorn < checkImmunityTime)) {
            log.debug("New arrived, the miss offset={}, check it later checkImmunity={}, born={}", i, checkImmunityTime, new Date(msgExt.getBornTimestamp()));
            break;
        }
    }
复制代码

任务时间片机制:在rocketmq中,有一种通用的设计方案就是给任务分配时间片, 如果时间片用完则直接退出。

执行回查

// 获取消息列表
List<MessageExt> opMsg = pullResult.getMsgFoundList();
// 判断是否需要进行回查
boolean isNeedCheck = (opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime) || (opMsg != null && (opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout)) || (valueOfCurrentMinusBorn <= -1);
if (isNeedCheck) {
    // 消息重新put到halfQueue中
    if (!putBackHalfMsgQueue(msgExt, i)) {
        continue;
    }
    // 执行回查
    listener.resolveHalfMsg(msgExt);
} else {
    pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset);
    log.debug("The miss offset:{} in messageQueue:{} need to get more opMsg, result is:{}", i, messageQueue, pullResult);
    continue;
}
复制代码

其中比较重要的两个部分,putBackHalfMsgQueue()resolveHalfMsg()

putBackHalfMsgQueue()

这里消息会重写追加到commitlog中,设计巧妙之处:

  1. 发送回查的接口是异步的,将消息重写追加到队列中,下一次回查的时候还能获取到该消息。如果回查有结果,消息就会保存到opQueue中,这样也会在fillOpRemove()中被过滤掉,不影响。
  2. 每次进行回查消息都要修改消息的回查次数属性,使用顺序写的性能要高于修改已存储消息的性能。
private boolean putBackHalfMsgQueue(MessageExt msgExt, long offset) {
    // 再次将消息存储到commitlog中
    PutMessageResult putMessageResult = putBackToHalfQueueReturnResult(msgExt);
    // 存储成功修改消息的属性
    if (putMessageResult != null && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
        // 设置新的消费队列逻辑offset
        msgExt.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset());
        // 设置新的commitlog的offset
        msgExt.setCommitLogOffset(putMessageResult.getAppendMessageResult().getWroteOffset());
        // 设置新的msgId
        msgExt.setMsgId(putMessageResult.getAppendMessageResult().getMsgId());
        log.debug("Send check message, the offset={} restored in queueOffset={} " + "commitLogOffset={} " + "newMsgId={} realMsgId={} topic={}", offset, msgExt.getQueueOffset(), msgExt.getCommitLogOffset(), msgExt.getMsgId(), msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX), msgExt.getTopic());
        return true;
    } else {
        log.error("PutBackToHalfQueueReturnResult write failed, topic: {}, queueId: {}, " + "msgId: {}", msgExt.getTopic(), msgExt.getQueueId(), msgExt.getMsgId());
        return false;
    }
}
复制代码

更新索引

if (newOffset != halfOffset) {
    // 更新halfQueue消费进度
    transactionalMessageBridge.updateConsumeOffset(messageQueue, newOffset);
}
// 根据doneOpOffset 列表获取opQueue的offset
long newOpOffset = calculateOpOffset(doneOpOffset, opOffset);
if (newOpOffset != opOffset) {
    // 更新opQueue消费进度
    transactionalMessageBridge.updateConsumeOffset(opQueue, newOpOffset);
}
复制代码

calculateOpOffset()方法简单说返回就是doneOffset列表中最后一个值。

private long calculateOpOffset(List<Long> doneOffset, long oldOffset) {
    Collections.sort(doneOffset);
    long newOffset = oldOffset;
    for (int i = 0; i < doneOffset.size(); i++) {
        if (doneOffset.get(i) == newOffset) {
            newOffset++;
        } else {
            break;
        }
    }
    return newOffset;

}
复制代码

猜你喜欢

转载自juejin.im/post/7050423789095485476