上一章分析到consumer是如何发起拉起消息的请求的,主要还是利用Pull模式,当我们把拉取消息的请求发送给Broker后,Broker应该作何反映?以及Consumer接收到Broker反映后作何处理呢?
Broker处理拉取消息请求
前面我们在分析Consumer拉取消息发送的消息对象RemotingCommand的Code为PULL_MESSAGE,当Broker接收到消息后就会找到Code对应的Processor进行处理。
在BrokerController启动的时候会去调用registerProcessor()方法注册对应Code的处理器Processor。
this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor)
processRequest处理消息方法
当消息进行拉取Broker会调用pullMessageProcessor类的processRequest方法进行处理。
private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)
throws RemotingCommandException {
// 消息过滤构建
// 是否支持消息重试 isFilterSupportRetry默认false
// filterSupportRetry 用于控制消息过滤时是否支持消息重试
MessageFilter messageFilter;
if (this.brokerController.getBrokerConfig().isFilterSupportRetry()) {
messageFilter = new ExpressionForRetryMessageFilter(subscriptionData, consumerFilterData,
this.brokerController.getConsumerFilterManager());
} else {
messageFilter = new ExpressionMessageFilter(subscriptionData, consumerFilterData,
this.brokerController.getConsumerFilterManager());
}
// 获取消息,同时进行消息过滤
final GetMessageResult getMessageResult =
this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);
// 对于拉取结果进行处理
if (getMessageResult != null) {
response.setRemark(getMessageResult.getStatus().name());
responseHeader.setNextBeginOffset(getMessageResult.getNextBeginOffset());
responseHeader.setMinOffset(getMessageResult.getMinOffset());
responseHeader.setMaxOffset(getMessageResult.getMaxOffset());
// 建议消费者从备份节点拉取消息,以减轻主节点负载并提高系统可靠性和性能。
// 使用时需权衡数据同步延迟和网络情况,确保备份节点可用且数据与主节点保持一致。
if (getMessageResult.isSuggestPullingFromSlave()) {
responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
} else {
responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
}
// 判断拉取消息的状态码设置对应的响应码
switch (getMessageResult.getStatus()) {
// 成功找到返回SUCCESS
case FOUND:
response.setCode(ResponseCode.SUCCESS);
break;
// CommitLog中没有找到需要进行重试
case MESSAGE_WAS_REMOVING:
response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
break;
// 没有匹配到Queue或者Queue没有消息
case NO_MATCHED_LOGIC_QUEUE:
case NO_MESSAGE_IN_QUEUE:
if (0 != requestHeader.getQueueOffset()) {
response.setCode(ResponseCode.PULL_OFFSET_MOVED);
// XXX: warn and notify me
log.info("the broker store no queue data, fix the request offset {} to {}, Topic: {} QueueId: {} Consumer Group: {}",
requestHeader.getQueueOffset(),
getMessageResult.getNextBeginOffset(),
requestHeader.getTopic(),
requestHeader.getQueueId(),
requestHeader.getConsumerGroup()
);
} else {
response.setCode(ResponseCode.PULL_NOT_FOUND);
}
break;
case NO_MATCHED_MESSAGE:
response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
break;
// ...
}
// 如果存在消费钩子
if (this.hasConsumeMessageHook()) {
// 构建上下文消息
ConsumeMessageContext context = new ConsumeMessageContext();
context.setConsumerGroup(requestHeader.getConsumerGroup());
context.setTopic(requestHeader.getTopic());
context.setQueueId(requestHeader.getQueueId());
String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);
// 判断响应码
switch (response.getCode()) {
// 消息成功,则将对应消息进行参数设置
case ResponseCode.SUCCESS:
int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount();
int incValue = getMessageResult.getMsgCount4Commercial() * commercialBaseCount;
context.setCommercialRcvStats(BrokerStatsManager.StatsType.RCV_SUCCESS);
context.setCommercialRcvTimes(incValue);
context.setCommercialRcvSize(getMessageResult.getBufferTotalSize());
context.setCommercialOwner(owner);
break;
// 没有拉取到消息,则判断是否允许挂起,能就设置参数
case ResponseCode.PULL_NOT_FOUND:
if (!brokerAllowSuspend) {
context.setCommercialRcvStats(BrokerStatsManager.StatsType.RCV_EPOLLS);
context.setCommercialRcvTimes(1);
context.setCommercialOwner(owner);
}
break;
// 其它的直接设置挂起参数
case ResponseCode.PULL_RETRY_IMMEDIATELY:
case ResponseCode.PULL_OFFSET_MOVED:
context.setCommercialRcvStats(BrokerStatsManager.StatsType.RCV_EPOLLS);
context.setCommercialRcvTimes(1);
context.setCommercialOwner(owner);
break;
default:
assert false;
break;
}
// 执行前置构子方法
this.executeConsumeMessageHookBefore(context);
}
switch (response.getCode()) {
case ResponseCode.SUCCESS:
// 添加数量和大小
this.brokerController.getBrokerStatsManager().incGroupGetNums(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
getMessageResult.getMessageCount());
this.brokerController.getBrokerStatsManager().incGroupGetSize(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
getMessageResult.getBufferTotalSize());
this.brokerController.getBrokerStatsManager().incBrokerGetNums(getMessageResult.getMessageCount());
// 是否使用堆内存
if (this.brokerController.getBrokerConfig().isTransferMsgByHeap()) {
final byte[] r = this.readGetMessageResult(getMessageResult, requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId());
this.brokerController.getBrokerStatsManager().incGroupGetLatency(requestHeader.getConsumerGroup(),
requestHeader.getTopic(), requestHeader.getQueueId(),
(int) (this.brokerController.getMessageStore().now() - beginTimeMills));
response.setBody(r);
} else {
//堆外内存需要使用监听器触发
try {
FileRegion fileRegion =
new ManyMessageTransfer(response.encodeHeader(getMessageResult.getBufferTotalSize()), getMessageResult);
channel.writeAndFlush(fileRegion).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
getMessageResult.release();
if (!future.isSuccess()) {
log.error("transfer many message by pagecache failed, {}", channel.remoteAddress(), future.cause());
}
}
});
} catch (Throwable e) {
log.error("transfer many message by pagecache exception", e);
getMessageResult.release();
}
response = null;
}
break;
// 没有找到对应的消息
case ResponseCode.PULL_NOT_FOUND:
// 消息长轮询1:消费者消费时,没有消息就会被缓存起来。
//brokerAllowSuspend 客户端初次请求消息时是指定的true。重新唤醒时指定为false
//hasSuspendFlag默认都是true
if (brokerAllowSuspend && hasSuspendFlag) {
//默认最长跟消费者有关。
// DefaultMQPushConsumerImpl.BROKER_SUSPEND_MAX_TIME_MILLIS 默认阻塞15秒
// DefaultLitePullConsumer.brokerSuspendMaxTimeMillis 默认20秒
long pollingTimeMills = suspendTimeoutMillisLong;
//没打开长轮询,也默认等待1秒
if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
}
String topic = requestHeader.getTopic();
long offset = requestHeader.getQueueOffset();
int queueId = requestHeader.getQueueId();
//没有拉取到消息,就再创建一个拉取请求
PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);
//将请求放入ManyRequestPull请求队列 pullRequestTable
this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
response = null;
break;
}
// ...
}
/**
* 拉取消息完毕后,无论是否拉取到消息,满足两个条件都会上报消费者上一次的消费点位,其一broker支持挂起,其二当前broker的角色不是Slave
*/
boolean storeOffsetEnable = brokerAllowSuspend;
storeOffsetEnable = storeOffsetEnable && hasCommitOffsetFlag;
storeOffsetEnable = storeOffsetEnable
&& this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE;
if (storeOffsetEnable) {
// 上报偏移量
this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel),
requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());
}
return response;
}
}
整体的消息过于长逻辑相对复杂,只是单纯的记录主要的知识点。
构建消息过滤的类,然后调用getMessage方法进行消息的获取同时进行消息的过滤,最后通过获取消息的返回去判断状态设置响应码。
如果存在消费钩子就会根据响应码将相关属性设置到上下文消息对象中然后执行executeConsumeMessageHookBefore方法。
判断响应码,如果拉取成功就会更新一些统计同时将消息转换成byte数组然后存入response的body属性中,如果拉取失败,如果支持长轮询就会将其存入缓存,broker支持挂起就会挂起默认最长挂起15s,构建一个拉取请求pullRequest对象,通过suspendPullRequest方法提交pullRequest对象。
拉取消息完毕后,无论是否拉取到消息,满足两个条件都会上报消费者上一次的消费点位,其一broker支持挂起,其二当前broker的角色不是Slave。
getMessage()获取消息方法
获取消息以及过滤消息都是调用DefaultMessageStore的getMessage方法来操作,最终返回GetMessageResult其中数据也是原始的Buffer数组而不是反序列化后的Message对象,这是由于该处并不需要对消息体进行相关出来而是需要获取出来然后在传递出去,直接原封不动拿出来可以提升效率。
Consumer处理Broker的响应
通过前面分析当Consumer发起拉取消息请求时候,就是调用DefaultMQPushConsumerImpl的pullMessage方法,存在两种方法进行响应,如果是同步方法就是调用pullKernelImpl等待响应结果后进行响应的处理,如果是异步方法就是构建一个PullCallback回调类,当成功返回时候就会去执行onSuccess方法失败就会去执行onException方法,主要分析异步的逻辑。
pullMessage方法
异步的请求也会去执行pullKernelImpl方法去处理,最终会执行到pullMessage方法。
public PullResult pullMessage(
final String addr,
final PullMessageRequestHeader requestHeader,
final long timeoutMillis,
final CommunicationMode communicationMode,
final PullCallback pullCallback
) throws RemotingException, MQBrokerException, InterruptedException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);
switch (communicationMode) {
// 单向
case ONEWAY:
assert false;
return null;
// 异步
case ASYNC:
this.pullMessageAsync(addr, request, timeoutMillis, pullCallback);
return null;
// 同步
case SYNC:
return this.pullMessageSync(addr, request, timeoutMillis);
default:
assert false;
break;
}
return null;
}
这个方法在前面也分析过,异步会去执行pullMessageAsync,同步会去执行pullMessageSync方法等待响应结果然后进行处理。
pullMessageAsync方法
pullMessageAsync方法,异步拉取消息并触发回调函数
private void pullMessageAsync(
final String addr,
final RemotingCommand request,
final long timeoutMillis,
final PullCallback pullCallback
) throws RemotingException, InterruptedException {
// 执行invokeAsync得到响应结果就会去执行InvokeCallback回调函数
this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
@Override
public void operationComplete(ResponseFuture responseFuture) {
RemotingCommand response = responseFuture.getResponseCommand();
if (response != null) {
try {
// 响应结果获取
PullResult pullResult = MQClientAPIImpl.this.processPullResponse(response, addr);
assert pullResult != null;
// 正确执行onSuccess
pullCallback.onSuccess(pullResult);
} catch (Exception e) {
// 异常执行onException
pullCallback.onException(e);
}
} else {
if (!responseFuture.isSendRequestOK()) {
pullCallback.onException(new MQClientException("send request failed to " + addr + ". Request: " + request, responseFuture.getCause()));
} else if (responseFuture.isTimeout()) {
pullCallback.onException(new MQClientException("wait response from " + addr + " timeout :" + responseFuture.getTimeoutMillis() + "ms" + ". Request: " + request,
responseFuture.getCause()));
} else {
pullCallback.onException(new MQClientException("unknown reason. addr: " + addr + ", timeoutMillis: " + timeoutMillis + ". Request: " + request, responseFuture.getCause()));
}
}
}
});
}
接下来正式分析Consumer端处理Broker响应结果的回调方法。
回调成功onSuccess方法
public void onSuccess(PullResult pullResult) {
if (pullResult != null) {
// 获取请求结果,反序列化成Message对象
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
subscriptionData);
// 判断响应状态
switch (pullResult.getPullStatus()) {
// 正确返回
case FOUND:
// 更新offset
long prevRequestOffset = pullRequest.getNextOffset();
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
long pullRT = System.currentTimeMillis() - beginTimestamp;
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullRT);
long firstMsgOffset = Long.MAX_VALUE;
if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
// 拉取消息
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
} else {
firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
// 存放消息
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
// 提交拉取消息的请求(存在并发消息和顺序消息)
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume);
// 将pullRequest对象存入队列中,这样take()就不会阻塞,继续拉取消息,
// (参数 pullInterval 表示间隔多长时间放入队列,未放入队列就会一直阻塞,所以这里可以控制避免拉取空消息)
if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
} else {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
}
}
if (pullResult.getNextBeginOffset() < prevRequestOffset
|| firstMsgOffset < prevRequestOffset) {
log.warn(
"[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
pullResult.getNextBeginOffset(),
firstMsgOffset,
prevRequestOffset);
}
break;
//...
default:
break;
}
}
}
调用processPullResult方法进行消息的反序列化以及过滤等其他操作,返回pullRequest对象,然后进行响应结果状态判断,如果成功返回就会去更新offset,拉取消息,提交消息,其中提交消息即消费消息存在两种模式一种为ConsumeMessageConcurrentlyService并发消费和ConsumeMessageOrderlyService消费。
最后将pullRequest对象存入队列中,这样take()就不会阻塞,继续拉取消息,(参数 pullInterval 表示间隔多长时间放入队列,未放入队列就会一直阻塞,所以这里可以控制避免拉取空消息)。
回调失败onException方法
// 如果拉取出现异常,则执行异常回调
public void onException(Throwable e) {
if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("execute the pull request exception", e);
}
if (e instanceof MQBrokerException && ((MQBrokerException) e).getResponseCode() == ResponseCode.FLOW_CONTROL) {
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_BROKER_FLOW_CONTROL);
} else {
// 延迟3s钟,将PullRequest对象再次放入队列pullRequestQueue中,等待再次take(),然后继续拉取消息的逻辑
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
}
}
判断异常类型是否为FLOW_CONTROL,如果是则会延迟20s后在进行拉取,反之则延迟3s后将PullRequest对象再次放入队列pullRequestQueue中,等待再次take(),然后继续拉取消息的逻辑。