本系列RocketMQ4.8注释github地址,希望对大家有所帮助,要是觉得可以的话麻烦给点一下Star哈
1.拉取消息前的准备工作
在去
broker
拉取消息前必须要做的一件很重要的操作:触发重平衡
前面我们分析了Consumer
的启动流程,其中有两个服务类是特别关注的,一个是重平衡服务类RebalanceService
,一个是拉取消息的服务类PullMessageService
。
我们先看下拉取消息的服务类PullMessageService
,他是一个异步线程
,启动后将阻塞。
PullMessageService
对象是拉取消息的入口
public class PullMessageService extends ServiceThread {
private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<PullRequest>();
//TODO:...省略其他代码.....
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
//TODO:从队列中获取 PullRequest, 刚开始肯定获取不到,那么我们就要看是什么时候将PullRequest放入队列中的呢?
PullRequest pullRequest = this.pullRequestQueue.take();
this.pullMessage(pullRequest);
} catch (InterruptedException ignored) {
} catch (Exception e) {
log.error("Pull Message Service Run Method exception", e);
}
}
log.info(this.getServiceName() + " service end");
}
}
复制代码
它要从队列pullRequestQueue
中获取PullRequest
对象,但是刚开始肯定获取不到,所以我们就要看是什么时候将PullRequest
放入到队列中去的。
没错,这就要看
重平衡服务
了。
那么接下来我们就看下重平衡服务
都做了什么?重平衡服务也是一个异步线程服务
,我们就看下核心逻辑:
private void rebalanceByTopic(final String topic, final boolean isOrder) {
switch (messageModel) {
// 广播模式:不需要处理负载均衡,每个消费者都要消费,只需要更新负载信息
case BROADCASTING: {
// 更新负载均衡信息,这里传入的参数是mqSet,即所有队列
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
if (mqSet != null) {
boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
if (changed) {
this.messageQueueChanged(topic, mqSet, mqSet);
log.info("messageQueueChanged {} {} {} {}",
consumerGroup,
topic,
mqSet,
mqSet);
}
} else {
log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
}
break;
}
// todo 集群模式
case CLUSTERING: {
// 从主题订阅信息缓存表中获取主题的队列信息, 获取这个topic下的所有队列(默认是4个)
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
//发送请求从Broker中获取该消费组内当前所有的消费者客户端ID,主题的队
//列可能分布在多个Broker上,那么请求该发往哪个Broker呢?
//RocketeMQ从主题的路由信息表中随机选择一个Broker
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
if (null == mqSet) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
}
}
if (null == cidAll) {
log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
}
// 如果mqSet、cidAll任意一个为空,则忽略本次消息队列负载
if (mqSet != null && cidAll != null) {
List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
mqAll.addAll(mqSet);
// 对cidAll、mqAll进行排序
// 这一步很重要,同一个消费组内看到的视图应保持一致,确保同一个消费队列不会被多个消费者分配
Collections.sort(mqAll);
Collections.sort(cidAll);
// 默认是平均分配策略
AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
List<MessageQueue> allocateResult = null;
try {
// todo 分配算法
allocateResult = strategy.allocate(
this.consumerGroup,
this.mQClientFactory.getClientId(),
mqAll,
cidAll);
} catch (Throwable e) {
log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
e);
return;
}
//TODO:保存了当前消费者需要消费的队列
Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
if (allocateResult != null) {
allocateResultSet.addAll(allocateResult);
}
// todo 对比消息队列是否发生变化 更新负载均衡信息,传入参数是 allocateResultSet,即当前consumer分配到的队列
boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
if (changed) {
log.info(
"rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
allocateResultSet.size(), allocateResultSet);
this.messageQueueChanged(topic, mqSet, allocateResultSet);
}
}
break;
}
default:
break;
}
}
复制代码
然后继续看下updateProcessQueueTableInRebalance(...)
方法的核心逻辑:
/**
* 对比消息队列是否发生变化,主要思路是遍历当前负载队列集
* 合,如果队列不在新分配队列的集合中,需要将该队列停止消费并保
* 存消费进度;遍历已分配的队列,如果队列不在队列负载表中
* (processQueueTable),则需要创建该队列拉取任务PullRequest,
* 然后添加到PullMessageService线程的pullRequestQueue中,
* PullMessageService才会继续拉取任务
*/
private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
final boolean isOrder) {
boolean changed = false;
/**
* ConcurrentMap〈MessageQueue, ProcessQueue〉
* processQueueTable是当前消费者负载的消息队列缓存表,如果缓存表
* 中的MessageQueue不包含在mqSet中,说明经过本次消息队列负载后,
* 该mq被分配给其他消费者,需要暂停该消息队列消息的消费。方法是
* 将ProccessQueue的状态设置为droped=true,该ProcessQueue中的消
* 息将不会再被消费,调用removeUnnecessaryMessageQueue方法判断是
* 否将MessageQueue、ProccessQueue从缓存表中移除。
* removeUnnecessaryMessageQueue在RebalanceImple中定义为抽象方
* 法。removeUnnecessaryMessageQueue方法主要用于持久化待移除
* MessageQueue的消息消费进度。在推模式下,如果是集群模式并且是
* 顺序消息消费,还需要先解锁队列
*/
Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
while (it.hasNext()) {
Entry<MessageQueue, ProcessQueue> next = it.next();
MessageQueue mq = next.getKey();
ProcessQueue pq = next.getValue();
if (mq.getTopic().equals(topic)) {
if (!mqSet.contains(mq)) {
pq.setDropped(true);
if (this.removeUnnecessaryMessageQueue(mq, pq)) {
it.remove();
changed = true;
log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
}
} else if (pq.isPullExpired()) {
switch (this.consumeType()) {
case CONSUME_ACTIVELY:
break;
case CONSUME_PASSIVELY:
pq.setDropped(true);
if (this.removeUnnecessaryMessageQueue(mq, pq)) {
it.remove();
changed = true;
log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
consumerGroup, mq);
}
break;
default:
break;
}
}
}
}
/**
* 遍历本次负载分配到的队列集合,如果
* processQueueTable中没有包含该消息队列,表明这是本次新增加的消
* 息队列,首先从内存中移除该消息队列的消费进度,然后从磁盘中读
* 取该消息队列的消费进度,创建PullRequest对象。这里有一个关键,
* 如果读取到的消费进度小于0,则需要校对消费进度。RocketMQ提供了
* CONSUME_FROM_LAST_OFFSET、CONSUME_FROM_FIRST_OFFSET、
* CONSUME_FROM_TIMESTAMP方式,在创建消费者时可以通过调用
* DefaultMQPushConsumer#setConsumeFromWhere方法进行设置
*/
List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
for (MessageQueue mq : mqSet) {
if (!this.processQueueTable.containsKey(mq)) {
/**
* 经过消息队列重新负载(分配)后,分配到新的消息队列时,首
* 先需要尝试向Broker发起锁定该消息队列的请求,如果返回加锁成
* 功,则创建该消息队列的拉取任务,否则跳过,等待其他消费者释放
* 该消息队列的锁,然后在下一次队列重新负载时再尝试加锁
*/
// 顺序消息
if (isOrder && !this.lock(mq)) {
log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
continue;
}
this.removeDirtyOffset(mq);
ProcessQueue pq = new ProcessQueue();
// todo PullRequest的nextOffset计算逻辑位于RebalancePushImpl#computePullFromWhere
long nextOffset = this.computePullFromWhere(mq);
if (nextOffset >= 0) {
ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
if (pre != null) {
log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
} else {
log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);
pullRequest.setNextOffset(nextOffset);
pullRequest.setMessageQueue(mq);
pullRequest.setProcessQueue(pq);
pullRequestList.add(pullRequest);
changed = true;
}
} else {
log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
}
}
}
// todo 将PullRequest加入PullMessageService,以便唤醒PullMessageService线程
this.dispatchPullRequest(pullRequestList);
return changed;
}
复制代码
这里说一下,它会遍历MessageQueue
集合,这里的MessageQueue
集合是经过分配策略
分配的结果(默认是平均分配策略
),然后根据每个MessageQueue
会构建PullRequest
对象。
假如我有两个消费者A,B;并且是集群消费,然后队列数是4个,按照平均分配策略,那么A分配到的队列是2个(queueid=0,1),而B也分配2个(queueid=2,3); 集群模式下,一个队列只能被同一个消费者组下的一个消费者去消费。 现在是只有一个消费者A,队列数是4个,所以当前消费者A根据分配策略会分配到4个queue,然后遍历属于消费者A的4个queue,为每个queue创建一个
PullRequest
,然后放入集合中。
然后我们继续看下分发PullRequest
方法的逻辑: org.apache.rocketmq.client.impl.consumer.RebalancePushImpl#dispatchPullRequest
@Override
public void dispatchPullRequest(List<PullRequest> pullRequestList) {
for (PullRequest pullRequest : pullRequestList) {
this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest);
}
}
复制代码
这里就是遍历PullRequest
集合,然后我们在继续往下看:
public void executePullRequestImmediately(final PullRequest pullRequest) {
try {
this.pullRequestQueue.put(pullRequest);
} catch (InterruptedException e) {
log.error("executePullRequestImmediately pullRequestQueue.put", e);
}
}
复制代码
这里就是重点了,它将PullRequest
对象放入了pullRequestQueue
队列中,这个队列
还熟悉吗?没错,这就是文章开始提到的PullMessageService
从pullRequestQueue
队列中获取PullRequest
对象的队列。刚开始获取不到,它会一直阻塞
,现在经过重平衡
后,队列中有了数据,现在就可以获取了,然后接下来就是拉取消息。
那么接下来我们就看消费者是如何通过PullMessageService
拉取消息的
2. 拉取消息
public class PullMessageService extends ServiceThread {
//TODO:.....
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
//TODO:这里最开始是阻塞的,经过重平衡后可以获取到PullRequest
PullRequest pullRequest = this.pullRequestQueue.take();
//TODO:拉取消息
this.pullMessage(pullRequest);
} catch (InterruptedException ignored) {
} catch (Exception e) {
log.error("Pull Message Service Run Method exception", e);
}
}
log.info(this.getServiceName() + " service end");
}
}
复制代码
现在经过重平衡
后,这里就可以获取到PullRequest
对象,现在就可以拉取消息
了;那么我们继续跟进去,最后来到 DefaultMQPushConsumerImpl#pullMessage(PullRequest request)
中. 我们看具体都干了什么?
代码就不贴出来了,比较多,我就直接分析比较重要的部分
2.1 判断是否触发流控
流控主要是保护消费者。当消费者消费能力不够时,拉取速度太快会导致大量消息积压,很可能内存溢出
- 判断queue缓存的消息数量是否超过1000(可以根据
pullThresholdForQueue
参数配置),如果超过1000,则先不去broker拉取消息,而是先暂停50ms,然后重新将对象放入队列中(this.pullRequestQueue.put(pullRequest)
),然后重新拉取(就是上面代码中的this.pullReuqestQueue.take()
) - 判断queue缓存的消息大小是否超过100M(可以根据
pullThresholdSizeForQueue
参数配置),如果超过100M,则先不去broker拉取消息,而是先暂停50ms,然后重新将对象放入队列中(this.pullRequestQueue.put(pullRequest)
),然后重新拉取(就是上面代码中的this.pullReuqestQueue.take()
)
2.2 构建消息处理的回调对象 PullCallback
它是非常重要的,但是我这里先不说它,等从broker拉取到消息后,会交给它来处理,到时候我们在返回来看它,这里先跳过。
2.3PullAPIWrapper
拉取消息
2.3.1 客户端构建拉取消息的请求
//TODO:简单看下参数
this.pullAPIWrapper.pullKernelImpl(
//TODO: 指定去哪个queue拉取消息
pullRequest.getMessageQueue(),
//TODO:表达式,就是tag/sql
subExpression,
//TODO: 表达式类型,TAG/SQL
subscriptionData.getExpressionType(),
subscriptionData.getSubVersion(),
//TODO: 这个非常重要的,第一次拉取它的值是 0
pullRequest.getNextOffset(),
//TODO: 这个参数值默认是32
this.defaultMQPushConsumer.getPullBatchSize(),
sysFlag,
commitOffsetValue,
//TODO:当consumer拉取消息但broker没有时,此时broker会将请求挂起,默认是15s
BROKER_SUSPEND_MAX_TIME_MILLIS,
CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
//TODO: 异步
CommunicationMode.ASYNC,
//TODO: 它就是2.2中的回调对象
pullCallback
);
复制代码
nextOffset
参数比较重要,RocketMQ就是通过这个参数来保证消息不会重复消息的(宏观上) 将上面的参数信息封装到PullMessageRequestHeader
对象中,然后拉取消息
org.apache.rocketmq.client.impl.consumer.PullAPIWrapper#pullKernelImpl:
PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
requestHeader.setConsumerGroup(this.consumerGroup);
requestHeader.setTopic(mq.getTopic());
//TODO:消费哪个queue,重平衡服务做的就是这个
requestHeader.setQueueId(mq.getQueueId());
//TODO:从哪个queue的offset开始消费
requestHeader.setQueueOffset(offset);
//TODO: pullBatchSize, 默认是32
requestHeader.setMaxMsgNums(maxNums);
requestHeader.setSysFlag(sysFlagInner);
requestHeader.setCommitOffset(commitOffset);
//TODO:当consumer拉取消息但broker没有时,此时broker会将请求挂起,默认是15s
requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
requestHeader.setSubscription(subExpression);
requestHeader.setSubVersion(subVersion);
requestHeader.setExpressionType(expressionType);
//TODO: 拉取消息
PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
brokerAddr,
requestHeader,
timeoutMillis,
communicationMode,
pullCallback);
return pullResult;
复制代码
创建拉取消息的netty指令,发送到broker: org.apache.rocketmq.client.impl.MQClientAPIImpl#pullMessage
public PullResult pullMessage(
final String addr,
final PullMessageRequestHeader requestHeader,
final long timeoutMillis,
final CommunicationMode communicationMode,
final PullCallback pullCallback
) throws RemotingException, MQBrokerException, InterruptedException {
//TODO: 构建拉取消息的netty指令:PULL_MESSAGE
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);
switch (communicationMode) {
case ONEWAY:
assert false;
return null;
case ASYNC:
//TODO: 异步拉取,将拉取消息的结果交给 PullCallback 处理
this.pullMessageAsync(addr, request, timeoutMillis, pullCallback);
return null;
case SYNC:
return this.pullMessageSync(addr, request, timeoutMillis);
default:
assert false;
break;
}
return null;
}
复制代码
异步拉取消息,将拉取到的消息交给 PullCallback 进行处理,后面我们在回来看
PullCallback
2.3.2 服务端接收拉取消息的请求
服务端接收拉取消息请求的处理器是:
PullMessageProcessor
首先是一系列的参数,权限判断,我们直接跳过,来到拉取消息的核心代码
public class PullMessageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend) {
RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
//TODO:省略部分代码
//TODO: 从broker 拉取消息
final GetMessageResult getMessageResult =
this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);
//TODO:....省略大段代码......
//TODO:...校验拉取结果
switch (response.getCode()) {
case ResponseCode.SUCCESS:
this.brokerController.getBrokerStatsManager().incGroupGetNums(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
getMessageResult.getMessageCount());
this.brokerController.getBrokerStatsManager().incGroupGetSize(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
getMessageResult.getBufferTotalSize());
this.brokerController.getBrokerStatsManager().incBrokerGetNums(getMessageResult.getMessageCount());
if (this.brokerController.getBrokerConfig().isTransferMsgByHeap()) {
final long beginTimeMills = this.brokerController.getMessageStore().now();
//TODO: 从 getMessageResult 对象中获取消息内容
final byte[] r = this.readGetMessageResult(getMessageResult, requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId());
this.brokerController.getBrokerStatsManager().incGroupGetLatency(requestHeader.getConsumerGroup(),
requestHeader.getTopic(), requestHeader.getQueueId(),
(int) (this.brokerController.getMessageStore().now() - beginTimeMills));
//TODO:设置到response中,返回给消费者
response.setBody(r);
}else {
//TODO:...省略else....
}
//TODO:如果没有拉取到消息,则挂起请求
case ResponseCode.PULL_NOT_FOUND:
if (brokerAllowSuspend && hasSuspendFlag) {
long pollingTimeMills = suspendTimeoutMillisLong;
if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
}
String topic = requestHeader.getTopic();
long offset = requestHeader.getQueueOffset();
int queueId = requestHeader.getQueueId();
//TODO:构建新的拉取对象
//TODO:pollingTimeMills = 15 * 1000
PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);
//TODO:放入 pullRequestTable 中,等待 PullRequestHoldService 唤醒
this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
//TODO:如果挂起请求,则将response=null, 返回给消费者
response = null;
break;
}
}
//TODO:....省略部分代码.....
}
retrun respone;
}
复制代码
然后我们继续点进去看,它会来到DefaultMessageStore
对象的 getMessage(final String group, final String topic, final int queueId, final long offset,final int maxMsgNums, final MessageFilter messageFilter)
方法
getMessge()方法参数简单说明一下
-
第一个参数是消费者组
-
第二个参数是topic
-
第三个参数是queueid,表示消费哪个队列
-
第四个参数是offset,表示消费的起始偏移量,这个参数比较重要的
-
第五个参数是pullBatchSize的值,默认是32
-
第六个参数是消息过滤器(消费端可以根据TAG/SQL过滤消息,但是SQL过滤要在broker端完成过滤)
接下来就是拉取消息的核心逻辑了:
//TODO: maxMsgNums 默认是32,取的是 pullBatchSize的值
public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset,
final int maxMsgNums,
final MessageFilter messageFilter) {
//TODO: ......省略一些不关注的代码......
long beginTime = this.getSystemClock().now();
GetMessageStatus status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
//TODO: 下一次消费的起始偏移量,这里先将客户端传递过来的offset赋值给它,继续往后看
long nextBeginOffset = offset;
long minOffset = 0;
long maxOffset = 0;
//TODO: 创建保存消息的容器
GetMessageResult getResult = new GetMessageResult();
//TODO:commmitlog的最大物理偏移量
final long maxOffsetPy = this.commitLog.getMaxOffset();
//TODO: 根据 topic 和 queueId 获取 ConsumeQueue
// 一个 ConsumeQueue 对应一个 MappedFileQueue
// 一个 MappedFileQueue 对应多个 MappedFile
ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
if (consumeQueue != null) {
//TODO: 队列中保存了最大,最小 offset, 会设置到保存消息的容器GetMessageResult中
minOffset = consumeQueue.getMinOffsetInQueue();
maxOffset = consumeQueue.getMaxOffsetInQueue();
//TODO: ...... 省略offset的边缘检测.......
} else {
//TODO: 从 consumequeue 中读取索引数据
//TODO: 这个和消息分发 ReputMessageService 从 commitlog 中读取消息是一样的
//TODO: 第一次 offset=0, 从 consumequeue中读取多少消息呢?
//TODO: 在数据分发后,MappedFile wrotePosition 会记录写入的位置(就是记录写到哪里了)
SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset);
if (bufferConsumeQueue != null) {
try {
status = GetMessageStatus.NO_MATCHED_MESSAGE;
long nextPhyFileStartOffset = Long.MIN_VALUE;
long maxPhyOffsetPulling = 0;
int i = 0;
//TODO: pullBatchSize(32) 好像并没有用, 只有当 pullBatchSize > 800 时才有用?
final int maxFilterMessageCount = Math.max(16000, maxMsgNums * ConsumeQueue.CQ_STORE_UNIT_SIZE);
final boolean diskFallRecorded = this.messageStoreConfig.isDiskFallRecorded();
ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
//TODO: bufferConsumeQueue.getSize() 就是consumequeue 中的消息索引单元的总size(size/20 = 索引个数)
//TODO: 每20个字节往前推
for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
//TODO: 一个索引单元包含三个元素:消息偏移量,消息大小,消息tag的hashcode
long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();
int sizePy = bufferConsumeQueue.getByteBuffer().getInt();
long tagsCode = bufferConsumeQueue.getByteBuffer().getLong();
//TODO: offsetPy + sizePy = 确定一条消息
maxPhyOffsetPulling = offsetPy;
if (nextPhyFileStartOffset != Long.MIN_VALUE) {
if (offsetPy < nextPhyFileStartOffset)
continue;
}
boolean isInDisk = checkInDiskByCommitOffset(offsetPy, maxOffsetPy);
//TODO: pullBatchSize在这里会工作,当超过默认的32条后,就会跳出循环
if (this.isTheBatchFull(sizePy, maxMsgNums, getResult.getBufferTotalSize(), getResult.getMessageCount(),
isInDisk)) {
break;
}
//TODO: ..... 忽略判断代码.......
//TODO: 从commitlog 读取消息,一次读取一条消息
SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
//TODO: .....忽略判断代码,比如没有读取到消息就continue....
//TODO: 将读到的消息放入容器中,然后继续循环
getResult.addMessage(selectResult);
status = GetMessageStatus.FOUND;
nextPhyFileStartOffset = Long.MIN_VALUE;
}
if (diskFallRecorded) {
long fallBehind = maxOffsetPy - maxPhyOffsetPulling;
brokerStatsManager.recordDiskFallBehindSize(group, topic, queueId, fallBehind);
}
//TODO: 下一次的 queue offset
//TODO: 假如第一次读取,并且只有一条,那么 nextBeginOffset = 0 + 20 / 20 = 1;
nextBeginOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
long diff = maxOffsetPy - maxPhyOffsetPulling;
long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE
* (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
getResult.setSuggestPullingFromSlave(diff > memory);
} finally {
bufferConsumeQueue.release();
}
} else {
status = GetMessageStatus.OFFSET_FOUND_NULL;
nextBeginOffset = nextOffsetCorrection(offset, consumeQueue.rollNextFile(offset));
log.warn("consumer request topic: " + topic + "offset: " + offset + " minOffset: " + minOffset + " maxOffset: "
+ maxOffset + ", but access logic queue failed.");
}
}
}
//TODO: 忽略else
getResult.setStatus(status);
//TODO: 设置 nextBeginOffset ,消费者拿到 nextBeginOffset 后会设置到 nextOffset
//TODO: 然后消费者下次传过来,他就是这个方法的参数的 offset
getResult.setNextBeginOffset(nextBeginOffset);
getResult.setMaxOffset(maxOffset);
getResult.setMinOffset(minOffset);
return getResult;
}
复制代码
接下来我们对拉取逻辑做个总结:
- 创建保存消息的容器对象GetMessageResult,它重要保存4部分内容
- 1)它会保存拉取到的信息
- 2)逻辑消费队列的
nextBeginOffset
,这个参数非常非常的重要,它就表示消费者下次消费时从哪开始读取消息(指的是消息索引,根据索引读取真正的消息),后面我们会看到这个参数; - 3)逻辑消费队列的
minOffset
,最小消费偏移量 - 4)逻辑消费队列的
maxOffset
, 最大消费偏移量
- 根据topic和queueid获取
ConsumeQueue
,它就是逻辑消费队列,保存着索引单元数据,以及最大offset, 最小offset,以及最大物理偏移量maxPhysicOffset
这个queueid 就是通过重平衡后分配的
- 从
ConsumeQueue
中读取索引数据,从offset位置开始读取,那么这个offset是多少?它取的值是从消费端传过来的nextOffset(请看2.3.1);而这个nextOffset,就是从broker返回的nextBeginOffset,后面还会看到它。我假设是第一次读取消息,那么它肯定是0,那么读取多少消息索引数据呢?有提到,当消息索引写入后,会有一个wrotePosition
参数,记录已经写到的位置; 所以,我这里就读取从 offset到wrotePositon间的索引数据。
//TODO: 这个offset是消费端传过来的 nextOffset,而这个nextOffst 是broker返回的nextBeginOffset
SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset);
复制代码
假设我在queueid=0的队列写入了33条消息,那么这里返回的
SelectMappedBufferResult
对象中,其内部的size=660,因为每个消息索引单元是固定20字节,所以20*33=660
- 遍历消息的索引单元(我假设消息生产者向queueid=0队列写入了33条数据,而我读取的也是queueid=0的队列,而且是第一次消费)
//TODO: bufferConsumeQueue.getSize()就是consumequeue 中的消息索引单元的总size(size/20 = 索引个数),如果是写入了33条数据,则size=660
//TODO: 循环条件是每次递增20byte(因为每个索引单元固定20byte)
int i = 0;
for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
//TODO:......
}
复制代码
i = 0, 遍历第1条消息索引单元,读取这个索引单元存储的3个数据,分别是消息偏移量offsetPy,消息大小sizePy,消息tag的hashcode
,然后根据消息的物理偏移量offsetPy
和 消息大小sizePy
,从commitlog中读取一条消息
SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
复制代码
复制代码
然后将消息保存到容器对象GetMessageResult中.
i = 20 .....遍历第2条消息索引单元.......
i = 40 .....遍历第3条消息索引单元.......
........
i = 620 .....遍历第32条消息索引单元,将消息保存到容器中,此时容器中已经有了32条消息
i = 640 .....遍历第33条消息索引单元,但是此时(pullBatchSize的值<=容器中消息的总数
)的结果是true, 如果是true,则跳出循环,不在遍历索引数据。
- 计算下一次从队列的哪个位置开始消费,也就是计算队列新的起始offset
//TODO: 下一次的 queue offset
nextBeginOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
复制代码
前面我读到 i=640 时,也就是读到第32条消息后退出了循环。由于是第一次消费,所以offset=0, i = 640, 640/20 =32, 所以
nextBeginOffset=32
(当我下次读取时,这个nextBeginOffset 就变成了上面的offset)
那么本次共计读取了32条消息(还有1条消息没有读取消费,等待下一次读取)
- 给容器对象
GetMessageResult
设置offset值(重要)
//TODO: 设置 nextBeginOffset ,消费者拿到 nextBeginOffset 后会设置到 nextOffset
//TODO: 然后消费者下次传过来,他就是这个方法的参数的 offset
getResult.setNextBeginOffset(nextBeginOffset);
getResult.setMaxOffset(maxOffset);
getResult.setMinOffset(minOffset);
复制代码
nextBeginOffset=32,maxOffset=33,minOffset=0
- 检查拉取结果
2.3.3 检查拉取结果
- 如果拉取到了消息,则将消息内容设置到响应对象
RemotingCommand
(PullMessageResponseHeader
)中,然后返回给客户端。 - 如果没有拉取到消息,则将请求挂起,然后
RemotingCommand
将置为null,返回给消费者。然后通过异步任务PullRequestHoldService
实时扫描挂起的拉取请求。详见# broker 消息投递流程(处理PULL_MESSAGE请求)
- 如果有消息达到,则立刻唤醒挂起的请求,主动去broker拉取消息,然后主动推送给消费者
- 如果没有消息到达,但是挂起时间到了(20s),则也会主动去broker拉取消息,如果此时有消息,则将拉取结果主动推送给消费者,如果没有,则继续挂起请求。
2.3.4 客户端获取broker的响应结果
就是将broker响应的PullMessageResponseHeader
对象转换成客户端本地对象PullResult
,然后将PullResult
对象交给回调函数PullCallback
处理(就是 2.2 步骤)
2.4 回调函数PullCallback
处理拉取到消息(参考2.2步骤)
org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage
//TODO: 拉取消息回调,这里非常重要,不过这里是从broker拉取消息成功后才执行的,继续往后看
PullCallback pullCallback = new PullCallback() {
@Override
public void onSuccess(PullResult pullResult) {
if (pullResult != null) {
// todo 执行结果
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
subscriptionData);
switch (pullResult.getPullStatus()) {
// todo
case FOUND:
// 更新PullRequest的下一次拉取偏移量,如果
//msgFoundList为空,则立即将PullReqeuest放入PullMessageService
//的pullRequestQueue,以便PullMessageSerivce能及时唤醒并再次执
//行消息拉取
long prevRequestOffset = pullRequest.getNextOffset();
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
long pullRT = System.currentTimeMillis() - beginTimestamp;
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullRT);
long firstMsgOffset = Long.MAX_VALUE;
// 因为RocketMQ根据TAG进行消息过滤时,在服务端只是验
//证了TAG的哈希码,所以客户端再次对消息进行过滤时,可能会出现
//msgFoundList为空的情况
if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
// todo 立即加入
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
} else {
firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
// todo 首先将拉取到的消息存入ProcessQueue,然后将拉取到
//的消息提交到Consume MessageService中供消费者消费
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
// todo 将消息提交给消费者线程, PullCallBack将立即返回,可以说本次消息拉取顺利完成
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume);
// 然后查看pullInterval参数,如果pullInterval>0,则等待pullInterval毫秒后将PullRequest对象放
//入PullMessageService的pullRequestQueue中,该消息队列的下次拉
//取即将被激活,达到持续消息拉取,实现准实时拉取消息的效果
// todo 准备下一次的运行
if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
// 延迟 xx秒后 进行一次pullRequest
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
} else {
// 立即进行一次 pullRequest
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
}
}
if (pullResult.getNextBeginOffset() < prevRequestOffset
|| firstMsgOffset < prevRequestOffset) {
log.warn(
"[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
pullResult.getNextBeginOffset(),
firstMsgOffset,
prevRequestOffset);
}
break;
// todo NO_NEW_MSG(没有新消息)、NO_MATCHED_MSG(没有匹配消息)
case NO_NEW_MSG:
case NO_MATCHED_MSG:
// 则直接使用服务器端校正的偏移量进行下一次消息的拉取
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
break;
// todo 非法
case OFFSET_ILLEGAL:
log.warn("the pull request offset illegal, {} {}",
pullRequest.toString(), pullResult.toString());
//如果拉取结果显示偏移量非法,首先将ProcessQueue的dropped设
//为true,表示丢弃该消费队列,意味着ProcessQueue中拉取的消息将
//停止消费,然后根据服务端下一次校对的偏移量尝试更新消息消费进
//度(内存中),然后尝试持久化消息消费进度,并将该消息队列从
//RebalacnImpl的处理队列中移除,意味着暂停该消息队列的消息拉
//取,等待下一次消息队列重新负载
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
pullRequest.getProcessQueue().setDropped(true);
DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {
@Override
public void run() {
try {
DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
pullRequest.getNextOffset(), false);
DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());
DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());
log.warn("fix the pull request offset, {}", pullRequest);
} catch (Throwable e) {
log.error("executeTaskLater Exception", e);
}
}
}, 10000);
break;
default:
break;
}
}
}
@Override
public void onException(Throwable e) {
if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("execute the pull request exception", e);
}
// 这个方法会把 pullRequest 丢到 pullRequestQueue 中
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
}
};
复制代码
2.4.1 如果拉取出现异常
如果拉取出现异常,则执行异常回调
@Override
public void onException(Throwable e) {
if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("execute the pull request exception", e);
}
//TODO:延迟3s钟,将`PullRequest`对象再次放入队列`pullRequestQueue`中,等待再次`take()`,然后继续拉取消息的逻辑
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
}
复制代码
如果broker没有消息,则将拉取请求挂起,然后返回一个null对象给消费者,消费者如果拿到的是null,则视为是异常情况,然后执行异常回调。
所以说,如果没有消息,则broker将拉取请求挂起,其目的是如果有消息到达,能立刻写给消费者;同时消费者也会每隔3s
去broker拉取一次,如果这次依然没有消息,则继续将本次拉取请求挂起。
2.4.2 拉取成功,开始处理消息
2.4.2.1 消息转换和过滤
将二进制内容转换成 MessageExt
对象;并根据tag
进行过滤:
public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,
final SubscriptionData subscriptionData) {
PullResultExt pullResultExt = (PullResultExt) pullResult;
this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());
if (PullStatus.FOUND == pullResult.getPullStatus()) {
ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary());
//TODO:将二进制消息转换成MessageExt对象
List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer);
List<MessageExt> msgListFilterAgain = msgList;
//TODO:根据TAG过滤消息
if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) {
msgListFilterAgain = new ArrayList<MessageExt>(msgList.size());
for (MessageExt msg : msgList) {
if (msg.getTags() != null) {
if (subscriptionData.getTagsSet().contains(msg.getTags())) {
msgListFilterAgain.add(msg);
}
}
}
}
//TODO:....省略......
}
复制代码
2.4.2.2 更新PullRequest
对象的nextOffset
属性值
//TODO: 发现了消息
case FOUND:
long prevRequestOffset = pullRequest.getNextOffset();
//TODO:更新nextOffset的值
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
long pullRT = System.currentTimeMillis() - beginTimestamp;
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullRT);
复制代码
在前面2.3.2步骤中,我们举例读取了32条消息后,
nextBegingOffset
经过计算是32,然后将消息和offset值一并返回给消费者。所以这里PullRequest
的nextOffset
值是32.
2.4.2.3 将读取到的消息保存到本地缓存队列ProcessQueue
中
//TODO:将本次读取到的所有消息(经过了TAG/sql过滤了)保存到队列中
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
复制代码
2.4.2.4 将消息提交到线程池中进行消费(重要)
这里我先不展开说,我放到第3大步骤中详细展开
2.4.2.5 再次将 PullRequest 放到阻塞队列
//TODO: 上面是异步消费(5.4.2.4),然后这里是将PullRequest放入 队列中,这样take()方法将不会
//TODO: 阻塞,然后继续从broker拉取消息,从而达到持续从broker拉取消息
//延迟 pullInterval 时间再去拉取消息
if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
} else {
//立即拉取消息
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
}
复制代码
这里有一个
pullInterval
参数,表示间隔多长时间在放入队列中(实际上就是间隔多长时间再去broker拉取消息)。当消费者消费速度比生产者快的时候,可以考虑设置这个值,这样可以避免大概率拉取到空消息。 上面将新的对象PullRequest
放入队列中(这个新仅仅是因为nextOffset
值变了),然后还是执行第2大步骤,当broker接收到拉取请求后,然后将根据nextOffset
(值=32)读取逻辑索引的值,我当初举例的时候,是总共写入了33条消息(那么就有33条索引数据),所以,他会读取出[32,33]区间的索引数据,也就是最后一条消息索引,然后读取出真正的消息,再次计算 nextBeginOffset的值,然后返回给消费者。如此反复,从broker读取消息消费。
3.ConsumeMessageService
消费消息
就是2.4.2.4步骤的逻辑
//TODO: 将消息提交到线程池中,由ConsumeMessageService 进行消费
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume);
复制代码
由于我们的是普通消息(不是顺序消息),所以由ConsumeMessageConcurrentlyService
类来消费消息。
ConsumeMessageConcurrentlyService
内部会创建一个线程池ThreadPoolExecutor
,这个线程池非常重要,消息最终将提交到这个线程池中。
但是在提交到线程池之前,还要做一件事 ---》 分割消息
3.1 分割消息
public void submitConsumeRequest(
final List<MessageExt> msgs,
final ProcessQueue processQueue,
final MessageQueue messageQueue,
final boolean dispatchToConsume) {
// 表示消息批次,也就是一次消息消费任务ConsumeRequest中包含的消息条数,默认为1
final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
/**
* msgs.size()默认最多为32条消息,受
* DefaultMQPushConsumer.pullBatchSize属性控制,如果msgs.size()
* 小于consumeMessage BatchMaxSize,则直接将拉取到的消息放入
* ConsumeRequest,然后将consumeRequest提交到消息消费者线程池
* 中。如果提交过程中出现拒绝提交异常,则延迟5s再提交
*/
if (msgs.size() <= consumeBatchSize) {
ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
try {
// todo 提交
this.consumeExecutor.submit(consumeRequest);
} catch (RejectedExecutionException e) {
// 延迟 提交
this.submitConsumeRequestLater(consumeRequest);
}
/**
* 如果拉取的消息条数大于
* consumeMessageBatchMaxSize,则对拉取消息进行分页,每页
* consumeMessageBatchMaxSize条消息,创建多个ConsumeRequest任务
* 并提交到消费线程池
*/
} else {
//TODO:分割消息
for (int total = 0; total < msgs.size(); ) {
List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
for (int i = 0; i < consumeBatchSize; i++, total++) {
if (total < msgs.size()) {
msgThis.add(msgs.get(total));
} else {
break;
}
}
ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
try {
// todo 提交
this.consumeExecutor.submit(consumeRequest);
} catch (RejectedExecutionException e) {
for (; total < msgs.size(); total++) {
msgThis.add(msgs.get(total));
}
// 延迟提交
this.submitConsumeRequestLater(consumeRequest);
}
}
}
}
复制代码
就是比较本次拉取到的消息总数size与consumeMessageBatchMaxSize
(默认=1)值的大小.如果size > consumeMessageBatchMaxSize
,则按照consumeMessageBatchMaxSize
将消息分割,然后分批次将消息submit到线程池中。
3.2 将消息submit到线程池中开始消费
提交到线程池中的是 ConsumeRequest
对象,他是一个Runnable
, 所以我们就看ConsumeRequest
的 run()
方法就好。
获取消息监听器然后开始消费
public void run() {
/**
* 先检查processQueue的dropped,如果设置为true,则停止该队列的消费。在进行消息重新负
* 载时,如果该消息队列被分配给消费组内的其他消费者,需要将
* droped设置为true,阻止消费者继续消费不属于自己的消息队列
*/
if (this.processQueue.isDropped()) {
log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
return;
}
// 取出消息监听器 获取消息监听器MessageListenerConcurrently
MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
ConsumeConcurrentlyStatus status = null;
/**
* 恢复重试消息主题名。这是为什么呢?这是由消息重试
* 机制决定的,RocketMQ将消息存入CommitLog文件时,如果发现消息的
* 延时级别delayTimeLevel大于0,会先将重试主题存入消息的属性,然
* 后将主题名称设置为SCHEDULE_TOPIC_XXXX,以便之后重新参与消息消
* 费
*/
defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());
...
try {
...
// todo 执行具体的消息消费,调用应用程序消息监听器的
//consumeMessage方法,进入具体的消息消费业务逻辑,返回该批消息
//的消费结果,即CONSUME_SUCCESS(消费成功)或
//RECONSUME_LATER(需要重新消费)
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
}
...
ConsumeMessageConcurrentlyService.this.getConsumerStatsManager()
.incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
/**
* 执行业务消息消费后,在处理结果前再次验证一次
* ProcessQueue的isDroped状态值。如果状态值为true,将不对结果进
* 行任何处理。也就是说,在消息消费进入第四步时,如果因新的消费
* 者加入或原先的消费者出现宕机,导致原先分配给消费者的队列在负
* 载之后分配给了别的消费者,那么消息会被重复消费
*/
if (!processQueue.isDropped()) {
// todo
ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
} else {
log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
}
}
复制代码
这个监听器以及消费方法熟悉吗? 没错,他就是我们消费代码中指定的回调监听器
到这里,消费者就真正开始消费消息了。。。。。
3.3 处理消费结果
ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
复制代码
直接看它的核心逻辑:
public void processConsumeResult(
final ConsumeConcurrentlyStatus status,
final ConsumeConcurrentlyContext context,
final ConsumeRequest consumeRequest
) {
//获取ackIndex的值,Integer.MAX_VALUE
int ackIndex = context.getAckIndex();
if (consumeRequest.getMsgs().isEmpty())
return;
/**
* 根据消息监听器返回的结果计算ackIndex,如果返回
* CONSUME_SUCCESS,则将ackIndex设置为msgs.size()-1,如果返回
* RECONSUME_LATER,则将ackIndex设置为-1,这是为下文发送msg
* back(ACK)消息做的准备
*/
switch (status) {
case CONSUME_SUCCESS:
if (ackIndex >= consumeRequest.getMsgs().size()) {
ackIndex = consumeRequest.getMsgs().size() - 1;
}
int ok = ackIndex + 1;
int failed = consumeRequest.getMsgs().size() - ok;
this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
break;
case RECONSUME_LATER:
//如果消费失败,则将ackIndex置为-1
ackIndex = -1;
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
consumeRequest.getMsgs().size());
break;
default:
break;
}
/**
* 如果是广播模式,业务方会返回RECONSUME_LATER,消息
* 并不会被重新消费,而是以警告级别输出到日志文件中。如果是集群
* 模式,消息消费成功,因为ackIndex=
* consumeRequest.getMsgs().size()-1,所以i=ackIndex+1等于
* consumeRequest.getMsgs().size(),并不会执行sendMessageBack。
* 只有在业务方返回RECONSUME_LATER时,该批消息都需要发送ACK消
* 息,如果消息发送失败,则直接将本批ACK消费发送失败的消息再次封
* 装为ConsumeRequest,然后延迟5s重新消费。如果ACK消息发送成功,
* 则该消息会延迟消费
*/
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
}
break;
case CLUSTERING:
List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
boolean result = this.sendMessageBack(msg, context);
if (!result) {
msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
msgBackFailed.add(msg);
}
}
if (!msgBackFailed.isEmpty()) {
consumeRequest.getMsgs().removeAll(msgBackFailed);
this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
}
break;
default:
break;
}
/**
* 从ProcessQueue中移除这批消息,这里返回的偏移量
* 是移除该批消息后最小的偏移量。然后用该偏移量更新消息消费进
* 度,以便消费者重启后能从上一次的消费进度开始消费,避免消息重
* 复消费。值得注意的是,当消息监听器返回RECONSUME_LATER时,消息
* 消费进度也会向前推进,并用ProcessQueue中最小的队列偏移量调用
* 消息消费进度存储器OffsetStore更新消费进度。这是因为当返回
* RECONSUME_LATER时,RocketMQ会创建一条与原消息属性相同的消息,
* 拥有一个唯一的新msgId,并存储原消息ID,该消息会存入CommitLog
* 文件,与原消息没有任何关联,所以该消息也会进入ConsuemeQueue,
* 并拥有一个全新的队列偏移量
*/
long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
// todo 更新偏移量
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
}
}
复制代码
3.3.1 消费失败or成功
3.3.1.1 广播模式消费
广播模式消费成功,然后执行3.3.2步骤
广播模式消费失败,直接丢弃消息,什么也不做
3.3.1.2 集群模式消费
集群模式消费成功,然后执行3.3.2步骤
集群模式消费失败,则遍历消息,将每条消息重新发回到broker;如果消息发回到broker失败,也不能丢弃,则将消息重新放到ConsumeMessageConcurrentlyService
内部的线程池中,等待再次消费。
这里简单说下,发回broker都做了什么?
- 根据消费者组构建重试topic
"%RETRY%GroupName"
- 从commitlog再次读取出这条消息,在其properties中标记为retry。读取这条消息的目的是为了使用它的一些消息内容
- 设置延迟等级(再次说明消息的重试是利用延迟消息机制),第一次delayLevel默认是3,对应的延迟时间是10s,每次重试延迟等级+1;超过默认的16次后,则放入死信队列。
- 构建消息体对象
MessageExtBrokerInner
,设置topic为重试topic,设置重试次数+1 - 然后将消息写入到commitlog中broker 消息接收流程(写入commitLog),然后消息分发创建索引。
- 然后等待10s后,读取消息重试
这也说明,重试的消息虽然和原来的消息一模一样,但本质已经是新的消息了(原来的消息实际上已经被消费过了)
这里有一个疑问?
假如我提交到线程池中的消息总数是10条,我前面9条都消费成功了,但是最后一条消费失败了,那么前面9条也要重试吗?
答案是:是的。这也说明重试可能会导致重复消费。这一点还是要注意的
。
3.3.2 从本地缓存队列中移除消息并持久化offset
long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
}
复制代码
无论是否消费成功,都会将队列缓存的消息remove掉,然后更新offset
到offset表中(offsetTable
)
@Override
public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) {
if (mq != null) {
AtomicLong offsetOld = this.offsetTable.get(mq);
if (null == offsetOld) {
offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset));
}
if (null != offsetOld) {
if (increaseOnly) {
MixAll.compareAndIncreaseOnly(offsetOld, offset);
} else {
offsetOld.set(offset);
}
}
}
}
复制代码
在Consumer客户端启动的时候,会启动很多定时任务,其中就有持久化offset的定时任务:
//TODO: 延迟10s之后,每隔5s执行一次持久化任务
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.persistAllConsumerOffset();
} catch (Exception e) {
log.error("ScheduledTask persistAllConsumerOffset exception", e);
}
}
}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
复制代码
遍历所有queue,然后将其offset持久化到文件中。
总结下步骤:
- 构建
UpdateConsumerOffsetRequestHeader
对象,设置topic,queueid,offset
值 - 构建netty指令(
RequestCode.UPDATE_CONSUMER_OFFSET
),发送到broker端
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_CONSUMER_OFFSET, requestHeader);
复制代码
- broker接收到指令后,将信息保存到
ConsumerOffsetManager
对象的offsetTable
属性中
注意:这个offsetTable是服务端的,前面那个是消费者客户端的
- 服务端持久化offset,在broker(BrokerController)启动的时候,也会启动很多定时任务,其中就有持久化offset的,就是将上面的
offsetTable
内容写到文件中;代码如下:
//TODO:延迟10s,每隔5s持久化一次offset
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.consumerOffsetManager.persist();
} catch (Throwable e) {
log.error("schedule persist consumerOffset error.", e);
}
}
}, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
复制代码
持久化的默认文件路径是:$home/store/config/consumerOffset.json
文件内容如下:
至此,消费者的消费就结束了。
4.总结
消费的过程要比消息的生产复杂的多,简单总结下
-
消费者启动拉取消息的服务
PullMessageService
,先阻塞在这里 -
然后启动重平衡服务,重平衡就是给当前消费者指定要去消费哪个
queue
,然后上面的就可以拉取消息了 -
发送
netty
请求到broker
,从commitlog
读取消息 -
读取到消息后开始处理消息,解码消息,根据TAG过滤消息
-
消费者消费消息
-
处理消费者的消费结果