上文分析了消息推送核心机制和主要过程。本文的目的是破除迷信,笔者试图从源码上修正一些市面上大多数文章对Rocket MQ推送模式错误理解。
从几张截图开始
在这之前我曾经搜索过这么类似的几个问题--"Rocket MQ推模式弊端",我们看看大家的回答。
我截取了一些:
破除谣言
上述他们的理解中存在不少错误,我分别驳斥之:
- Push模式由MQ主动将消息推送给消费者
在笔者的RocketMQ消息"推送"浅析(上)上已经分析过,Push并不会主动推送消息,而是被动的接收到拉取请求之后讲消息告知消费者 - Push过程中,不会考虑消费者处理能力
拉取之前目前的源码中存在三个条件的流量控制,目的就是为了考虑消费者的消费能力 - 要求消费端有很强的消费能力,消费端缓冲区可能会溢出
默认情况下Client每个Consumer订阅的每个Queue暂存的消息占用内存不能超过100M,且可以调控,其实真的没有那么容易溢出
看一下代码
上文中我们介绍了拉取消息是由PullMessageService服务线程决定的,但是最终的落脚点都是DefaultMQPushConsumerImpl#pullMessage()。笔者分析一下ta的为了保护Consumer端都做了哪些努力:
- 消费累计数量的控制
- 计算当前拉取请求对象--PullRequest对应的processQueue已经暂存的消息数量
- pullThresholdForQueue是队列级别的流量控制阈值,每个消息队列默认最多缓存1000条消息在Consumer端,判断两者大小,很显然如果暂存的消息已经超过限制,则不会处理该拉取请求,而是将这个请求重新入队
- 等待50ms之后再次处理,假设50ms后依然大于阈值则一直重复1,2步骤
public void pullMessage(PullRequest pullRequest) {
/* 当前ProcessQueue暂存的消息总数 */
long cachedMessageCount = processQueue.getMsgCount().get();
if (
cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()
) {
/* 放弃本次拉取任务,等待50ms,pullRequest又入队了 */
executePullRequestLater(pullRequest,
PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL
);
return;
}
}
复制代码
pullThresholdForQueue这个阈值是可以调整的,
DefaultMQPushConsumerImpl暴露了setPullThresholdForQueue方法,
支持用户根据自身情况定制,默认等于1000
复制代码
- 占用内存的控制
- 计算当前拉取请求对象--PullRequest对应的processQueue已经暂存的消息占用内存总数
- pullThresholdSizeForQueue限制队列级别的缓存消息大小,每个消息队列默认最多缓存100M消息,判断两者大小,很显然如果暂存的消息已经超过限制,则不会处理该拉取请求,而是将这个请求重新入队
- 等待50ms之后再次处理,假设50ms后依然大于阈值则一直重复1,2步骤
public void pullMessage(PullRequest pullRequest) {
/* 当前ProcessQueue暂存消息占用内存空间(MB) */
long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
/* 当前消息堆积超过100M,触发流控*/
if (
cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()
) {
/* 放弃本次拉取任务,等待50ms,pullRequest又入队了 */
this.executePullRequestLater(pullRequest,
PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL
);
return;
}
}
复制代码
同样pullThresholdSizeForQueue这个阈值是也是可以调整的,
DefaultMQPushConsumerImpl暴露了setPullThresholdSizeForQueue
方法,
支持用户根据自身情况定制,默认等于100
复制代码
- 消费偏移量之差大小的控制
这个是难以理解的,因为刚刚接触Rocket MQ的时候,我先入为主的以为,需要将ProcessQueue中的暂存的消息消费完毕之后才会继续拉取,其实不然,只要没有触发Consumer端的流量控制,PullMessageService会一直按照自己的节奏进行消息拉取。
每次消息消费进度的上报则是ProcessQueue中目前暂存消息的最小的偏移量,这个偏移量则决定了消费端重启之后,需要从何处开始拉取,而PullRequest对象中维护的下一次拉取偏移量--nextOffset是针对当前Consumer拉取行为每次计算得出的。
正常情况下,明明已经限制了暂存消息不能超过1000条,而拉取顺序又是从前到后的,怎么也不可能偏移量之差等于2000啊。
但是你自己思考假设出现了如此情况呢:如果offset=1的消息所在的消费线程死锁,那么这个消息就会一直没有ACK,而其他消息都在正常消费,就会导致一直可以正常拉取消息,偏偏这个时候消费端重启,而你其实已经消费了大量消息,但是消息偏移量却是1,这就会带来超级大量的重复消费,这个对我们而言是不可接受的。有了此项限制就保证了即使出现了死锁现象,不至于重复消费太多也就是2000来个,瑕不掩瑜。
public void pullMessage(PullRequest pullRequest) {
if (!this.consumeOrderly) {
/* 当前 ProcessQueue 中队列最大偏移量与最小偏移量间距不得超过2000 */
if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
/* 放弃本次拉取任务,等待50ms,pullRequest又入队了 */
executePullRequestLater(pullRequest,
PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL
);
return;
}
}
}
复制代码
public long getMaxSpan() {
try {
this.lockTreeMap.readLock().lockInterruptibly();
try {
/**
* lastKey()当前暂存消息的最大偏移量
* firstKey()当前暂存消息的最小偏移量
*/
if (!this.msgTreeMap.isEmpty()) {
return this.msgTreeMap.lastKey()
- this.msgTreeMap.firstKey();
}
} finally {
this.lockTreeMap.readLock().unlock();
}
} catch (InterruptedException e) {
log.error("getMaxSpan exception", e);
}
return 0;
}
复制代码
总结一下
综上所述,Rocket MQ的推送模式请放心使用,根本不会出现文章开头所说的各种尴尬情况,不会破坏系统的稳定性。