Retry message delivery
- How does the client deliver messages to the retry topic?
- Through the consumer client (timeout case)
- Through the producer (consumption-failure case)
The consumer offers two consumption modes, pull and push. Push is in fact a wrapper around pull.
DefaultMQPushConsumerImpl#sendMessageBack
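When is sendMessageBack called?
- In concurrent consumption mode, a message's consumption times out (15 minutes by default).
- In concurrent consumption mode, consumption fails: the business code throws an exception, or explicitly returns ConsumeConcurrentlyStatus.RECONSUME_LATER.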
public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
try {
String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName)
: RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg,
this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());
} catch (Exception e) {
log.error("sendMessageBack Exception, " + this.defaultMQPushConsumer.getConsumerGroup(), e);
Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody());
String originMsgId = MessageAccessor.getOriginMessageId(msg);
MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);
newMsg.setFlag(msg.getFlag());
MessageAccessor.setProperties(newMsg, msg.getProperties());
MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());
MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes() + 1));
MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes()));
MessageAccessor.clearProperty(newMsg, MessageConst.PROPERTY_TRANSACTION_PREPARED);
newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());
this.mQClientFactory.getDefaultMQProducer().send(newMsg);
} finally {
msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace()));
}
}
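The control flow of sendMessageBack can be reduced to a small, self-contained sketch: try the broker's send-back call first, and only when it throws, republish through the producer to the retry topic with delay level 3 + reconsumeTimes. BrokerClient and the returned route strings are hypothetical stand-ins for illustration, not RocketMQ APIs, and the "%RETRY%" prefix is assumed to match what MixAll.getRetryTopic produces.

```java
/** Simplified model of "broker first, producer as fallback". Not RocketMQ code. */
class SendBackSketch {
    /** Hypothetical stand-in for the remoting call made to the broker. */
    interface BrokerClient {
        void consumerSendMessageBack(String msgId, int delayLevel) throws Exception;
    }

    /** Assumed retry topic naming: "%RETRY%" followed by the consumer group. */
    static String retryTopic(String consumerGroup) {
        return "%RETRY%" + consumerGroup;
    }

    /** Returns a description of the route the message took. */
    static String sendMessageBack(BrokerClient broker, String group, String msgId,
                                  int delayLevel, int reconsumeTimes) {
        try {
            // Normal path: ask the broker to move the message to the retry topic.
            broker.consumerSendMessageBack(msgId, delayLevel);
            return "broker, delayLevel=" + delayLevel;
        } catch (Exception e) {
            // Fallback path: build a new message and send it with the producer.
            int fallbackLevel = 3 + reconsumeTimes;
            return "producer -> " + retryTopic(group) + ", delayLevel=" + fallbackLevel;
        }
    }

    public static void main(String[] args) {
        BrokerClient healthy = (id, level) -> { };
        BrokerClient down = (id, level) -> { throw new Exception("broker unreachable"); };
        System.out.println(sendMessageBack(healthy, "orderGroup", "m1", 0, 0));
        System.out.println(sendMessageBack(down, "orderGroup", "m2", 0, 2));
    }
}
```

The point of the sketch is that the producer is only involved when the direct send-back fails, and in that case the delay level is computed on the client rather than by the broker.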
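Timeout scenario
The timeout check is run by a scheduled thread that is started together with the consumer client. Its period equals the consume timeout, which is 15 minutes by default.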
public void start() {
this.cleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
cleanExpireMsg();
} catch (Throwable e) {
log.error("scheduleAtFixedRate cleanExpireMsg exception", e);
}
}
}, this.defaultMQPushConsumer.getConsumeTimeout(), this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES);
}
public long getConsumeTimeout() {
return consumeTimeout;
}
/**
* Maximum amount of time in minutes a message may block the consuming thread.
*/
private long consumeTimeout = 15;
The call path for a consume timeout is:
ConsumeMessageConcurrentlyService#start -> ConsumeMessageConcurrentlyService#cleanExpireMsg -> ProcessQueue#cleanExpiredMsg -> DefaultMQPushConsumer#sendMessageBack -> DefaultMQPushConsumerImpl#sendMessageBack
ProcessQueue#cleanExpiredMsg is implemented as follows:
public void cleanExpiredMsg(DefaultMQPushConsumer pushConsumer) {
if (pushConsumer.getDefaultMQPushConsumerImpl().isConsumeOrderly()) {
return;
}
int loop = msgTreeMap.size() < 16 ? msgTreeMap.size() : 16;
for (int i = 0; i < loop; i++) {
MessageExt msg = null;
try {
this.treeMapLock.readLock().lockInterruptibly();
try {
if (!msgTreeMap.isEmpty()) {
String consumeStartTimeStamp = MessageAccessor.getConsumeStartTimeStamp(msgTreeMap.firstEntry().getValue());
if (StringUtils.isNotEmpty(consumeStartTimeStamp) && System.currentTimeMillis() - Long.parseLong(consumeStartTimeStamp) > pushConsumer.getConsumeTimeout() * 60 * 1000) {
msg = msgTreeMap.firstEntry().getValue();
} else {
break;
}
} else {
break;
}
} finally {
this.treeMapLock.readLock().unlock();
}
} catch (InterruptedException e) {
log.error("getExpiredMsg exception", e);
}
try {
pushConsumer.sendMessageBack(msg, 3);
log.info("send expire msg back. topic={}, msgId={}, storeHost={}, queueId={}, queueOffset={}", msg.getTopic(), msg.getMsgId(), msg.getStoreHost(), msg.getQueueId(), msg.getQueueOffset());
try {
this.treeMapLock.writeLock().lockInterruptibly();
try {
if (!msgTreeMap.isEmpty() && msg.getQueueOffset() == msgTreeMap.firstKey()) {
try {
removeMessage(Collections.singletonList(msg));
} catch (Exception e) {
log.error("send expired msg exception", e);
}
}
} finally {
this.treeMapLock.writeLock().unlock();
}
} catch (InterruptedException e) {
log.error("getExpiredMsg exception", e);
}
} catch (Exception e) {
log.error("send expired msg exception", e);
}
}
}
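The scan logic in cleanExpiredMsg is easier to follow without the locking. The sketch below is a simplified model under stated assumptions: inFlight is a hypothetical map from queue offset to consume-start timestamp (null meaning consumption has not started), and removing the head stands in for "send back, then removeMessage". It only ever inspects the head of the map and stops at the first entry that has not expired, at most 16 times per run.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Simplified model of the expiry scan; locking and the real send-back are omitted. */
class ExpireScanSketch {
    static final int MAX_PER_SCAN = 16;

    /**
     * @param inFlight queue offset -> consume-start timestamp in millis (null = not started)
     * @return offsets that would be sent back, in scan order
     */
    static List<Long> collectExpired(TreeMap<Long, Long> inFlight, long nowMillis, long timeoutMinutes) {
        List<Long> expired = new ArrayList<>();
        long timeoutMillis = timeoutMinutes * 60 * 1000;
        int loop = Math.min(inFlight.size(), MAX_PER_SCAN);
        for (int i = 0; i < loop; i++) {
            Map.Entry<Long, Long> head = inFlight.firstEntry();
            // Stop as soon as the head has not started or has not yet timed out.
            if (head == null || head.getValue() == null
                    || nowMillis - head.getValue() <= timeoutMillis) {
                break;
            }
            expired.add(head.getKey());
            inFlight.pollFirstEntry(); // stands in for sendMessageBack + removeMessage
        }
        return expired;
    }

    public static void main(String[] args) {
        TreeMap<Long, Long> inFlight = new TreeMap<>();
        inFlight.put(10L, 0L);        // started 16 minutes ago
        inFlight.put(11L, 50_000L);   // started a little over 15 minutes ago
        inFlight.put(12L, 100_000L);  // started less than 15 minutes ago
        inFlight.put(13L, 0L);        // expired, but hidden behind offset 12
        System.out.println(collectExpired(inFlight, 960_000L, 15));
    }
}
```

One consequence visible in the sketch: an expired message that sits behind a non-expired head is not picked up in that run.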
Note these two lines in cleanExpiredMsg:
pushConsumer.sendMessageBack(msg, 3);
log.info("send expire msg back. topic={}, msgId={}, storeHost={}, queueId={}, queueOffset={}", msg.getTopic(), msg.getMsgId(), msg.getStoreHost(), msg.getQueueId(), msg.getQueueOffset());
After a timeout, the message is sent back with a null brokerName; see DefaultMQPushConsumer#sendMessageBack:
public void sendMessageBack(MessageExt msg, int delayLevel)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
msg.setTopic(withNamespace(msg.getTopic()));
this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, null);
}
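In other words, a timed-out message is preferentially sent back to the broker it was stored on. The delayLevel is hard-coded to 3, which corresponds to 10s in the broker's default messageDelayLevel table (1s 5s 10s 30s 1m ...). Note that the timeout is measured from the message's consume-start timestamp: the message is still held in msgTreeMap, not yet removed, more than consumeTimeout minutes after that timestamp was set.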
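To make the level-to-delay mapping concrete, here is a minimal sketch that parses a delay-level table and looks up a level. It assumes the broker runs with the default table string shown in DEFAULT_LEVELS; a broker configured with a different messageDelayLevel would give different results. The class and method names are hypothetical.

```java
/** Looks up the delay for a 1-based delay level in a space-separated level table. */
class DelayLevelSketch {
    /** Assumed default broker table; level 1 is the first entry. */
    static final String DEFAULT_LEVELS =
            "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

    static long delayMillis(String levels, int level) {
        String[] parts = levels.trim().split("\\s+");
        if (level < 1 || level > parts.length) {
            throw new IllegalArgumentException("level out of range: " + level);
        }
        String token = parts[level - 1];
        long value = Long.parseLong(token.substring(0, token.length() - 1));
        char unit = token.charAt(token.length() - 1);
        switch (unit) {
            case 's': return value * 1_000L;
            case 'm': return value * 60_000L;
            case 'h': return value * 3_600_000L;
            case 'd': return value * 86_400_000L;
            default: throw new IllegalArgumentException("unknown unit: " + unit);
        }
    }

    public static void main(String[] args) {
        // Level 3 is what the timeout path passes to sendMessageBack.
        System.out.println(delayMillis(DEFAULT_LEVELS, 3)); // 10000
        System.out.println(delayMillis(DEFAULT_LEVELS, 4)); // 30000
    }
}
```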
Consumption-failure scenario
Consumption fails when the handler throws an exception, explicitly returns ConsumeConcurrentlyStatus.RECONSUME_LATER, or returns null. Each of these ends up sending the message back as a retry message.
The call path is:
ConsumeMessageConcurrentlyService$ConsumeRequest#run -> ConsumeMessageConcurrentlyService#processConsumeResult -> DefaultMQPushConsumerImpl#sendMessageBack
Why do throwing an exception, returning null, and explicitly returning ConsumeConcurrentlyStatus.RECONSUME_LATER all lead to redelivery?
See these lines in ConsumeMessageConcurrentlyService$ConsumeRequest#run:
if (null == status) {
if (hasException) {
returnType = ConsumeReturnType.EXCEPTION;
} else {
returnType = ConsumeReturnType.RETURNNULL;
}
...
if (null == status) {
log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",
ConsumeMessageConcurrentlyService.this.consumerGroup,
msgs,
messageQueue);
status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
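The effect of those lines is that three different handler outcomes collapse into one status. The following sketch models that normalization with a local Status enum and a hypothetical Listener interface (neither is the RocketMQ type); it assumes, as in the excerpt, that an exception leaves the status null.

```java
/** Simplified model of how the handler result is normalized before processing. */
class StatusSketch {
    enum Status { CONSUME_SUCCESS, RECONSUME_LATER }

    /** Hypothetical stand-in for the business message listener. */
    interface Listener {
        Status consume(String msg) throws Exception;
    }

    static Status invoke(Listener listener, String msg) {
        Status status = null;
        try {
            status = listener.consume(msg);
        } catch (Throwable e) {
            // The real client logs the error here; status stays null.
        }
        // A null status (from an exception or an explicit null) becomes RECONSUME_LATER.
        if (status == null) {
            status = Status.RECONSUME_LATER;
        }
        return status;
    }

    public static void main(String[] args) {
        System.out.println(invoke(m -> Status.CONSUME_SUCCESS, "a"));
        System.out.println(invoke(m -> null, "b"));
        System.out.println(invoke(m -> { throw new IllegalStateException("boom"); }, "c"));
    }
}
```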
processConsumeResult is implemented as follows:
public void processConsumeResult(
final ConsumeConcurrentlyStatus status,
final ConsumeConcurrentlyContext context,
final ConsumeRequest consumeRequest
) {
int ackIndex = context.getAckIndex();
if (consumeRequest.getMsgs().isEmpty())
return;
switch (status) {
case CONSUME_SUCCESS:
if (ackIndex >= consumeRequest.getMsgs().size()) {
ackIndex = consumeRequest.getMsgs().size() - 1;
}
int ok = ackIndex + 1;
int failed = consumeRequest.getMsgs().size() - ok;
this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
break;
case RECONSUME_LATER:
ackIndex = -1;
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
consumeRequest.getMsgs().size());
break;
default:
break;
}
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
}
break;
case CLUSTERING:
List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
boolean result = this.sendMessageBack(msg, context);
if (!result) {
msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
msgBackFailed.add(msg);
}
}
if (!msgBackFailed.isEmpty()) {
consumeRequest.getMsgs().removeAll(msgBackFailed);
this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
}
break;
default:
break;
}
long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
}
}
Why does this reach sendMessageBack? Look at these lines:
case RECONSUME_LATER:
ackIndex = -1;
...
case BROADCASTING:
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
}
break;
case CLUSTERING:
List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
boolean result = this.sendMessageBack(msg, context);
if (!result) {
msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
msgBackFailed.add(msg);
}
}
if (!msgBackFailed.isEmpty()) {
consumeRequest.getMsgs().removeAll(msgBackFailed);
this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
}
break;
default:
break;
Notice that in broadcasting mode the failed messages are simply dropped, while in clustering mode they are redelivered.
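Which messages count as failed is decided entirely by ackIndex: everything after that index in the batch is dropped or sent back. The sketch below models only that partition, with a local Status enum and plain strings in place of MessageExt. It assumes the context's ackIndex defaults to Integer.MAX_VALUE, so a plain CONSUME_SUCCESS acknowledges the whole batch.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Simplified model of how ackIndex splits a batch into acked and failed messages. */
class AckIndexSketch {
    enum Status { CONSUME_SUCCESS, RECONSUME_LATER }

    /** Returns the messages that would be dropped (broadcasting) or sent back (clustering). */
    static List<String> failedMessages(Status status, int ackIndex, List<String> msgs) {
        if (msgs.isEmpty()) {
            return Collections.emptyList();
        }
        switch (status) {
            case CONSUME_SUCCESS:
                // Clamp so that "ack everything" maps to the last index.
                if (ackIndex >= msgs.size()) {
                    ackIndex = msgs.size() - 1;
                }
                break;
            case RECONSUME_LATER:
                // Nothing is acknowledged: the whole batch is treated as failed.
                ackIndex = -1;
                break;
        }
        return new ArrayList<>(msgs.subList(ackIndex + 1, msgs.size()));
    }

    public static void main(String[] args) {
        List<String> batch = List.of("a", "b", "c");
        System.out.println(failedMessages(Status.RECONSUME_LATER, Integer.MAX_VALUE, batch));
        System.out.println(failedMessages(Status.CONSUME_SUCCESS, Integer.MAX_VALUE, batch));
        System.out.println(failedMessages(Status.CONSUME_SUCCESS, 0, batch));
    }
}
```

With a batch size of 1, which is the common configuration, this reduces to "the message is either acknowledged or sent back".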
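The arguments differ from the timeout case: on a failed consumption the delayLevel is taken from the context rather than hard-coded, and the brokerName is that of the message's queue instead of null.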
public boolean sendMessageBack(final MessageExt msg, final ConsumeConcurrentlyContext context) {
int delayLevel = context.getDelayLevelWhenNextConsume();
// Wrap topic with namespace before sending back message.
msg.setTopic(this.defaultMQPushConsumer.withNamespace(msg.getTopic()));
try {
this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, context.getMessageQueue().getBrokerName());
return true;
} catch (Exception e) {
log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + msg, e);
}
return false;
}
Core retry delivery function
DefaultMQPushConsumerImpl#sendMessageBack
public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
try {
String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName)
: RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg,
this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());
} catch (Exception e) {
log.error("sendMessageBack Exception, " + this.defaultMQPushConsumer.getConsumerGroup(), e);
Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody());
String originMsgId = MessageAccessor.getOriginMessageId(msg);
MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);
newMsg.setFlag(msg.getFlag());
MessageAccessor.setProperties(newMsg, msg.getProperties());
MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());
MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes() + 1));
MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes()));
MessageAccessor.clearProperty(newMsg, MessageConst.PROPERTY_TRANSACTION_PREPARED);
newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());
this.mQClientFactory.getDefaultMQProducer().send(newMsg);
} finally {
msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace()));
}
}
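Consumer-sent vs. producer-sent retry messages
- Sent by the consumer client
  - On a consume timeout, sendMessageBack is called with a null brokerName, so the timed-out message is sent back to the broker it came from, addressed by IP. If the message was pulled from a slave, the retry message is therefore sent to that slave.
  - In the timeout case the delay level is hard-coded to 3, which is 10s under the broker's default delay-level table, so a timed-out message is consumed again about 10s later.
- Sent by the producer
  - For a failed consumption, sendMessageBack is called with the message's brokerName, so the failed message goes to the master of the broker that owns the message. Even if it was consumed from a slave, it is sent to the master after the failure.
  - The delay level can be set explicitly on ConsumeConcurrentlyContext and defaults to 0 when unset. In that case the broker corrects it to delayLevel = 3 + msgExt.getReconsumeTimes(). The more retries a message has had, the longer the delay, with a minimum of 10s under the default table.
When the broker receives a CONSUMER_SEND_MSG_BACK request, the delayLevel causes the message to be placed in the delay queue first. When the delay expires, it is delivered to the retry topic and finally consumed by the consumer client. Note that if the retry topic does not exist when the broker handles CONSUMER_SEND_MSG_BACK, the broker creates it. In addition, once a message's reconsume count exceeds the group's maximum retry count (client default 16; clients older than 3.4.9 have no such setting and use the broker's value, also 16), it is delivered to the dead letter topic.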
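The broker-side decision described above can be summarized in a few lines. This is a sketch under assumptions, not the broker's code: the route strings are hypothetical, and the dead-letter check is modelled as reconsumeTimes >= maxReconsumeTimes, which is one reading of "exceeds the maximum retry count".

```java
/** Simplified model of the broker's handling of a send-back request. */
class BrokerRetryDecisionSketch {
    /**
     * @param requestedDelayLevel level sent by the client (0 means "let the broker decide")
     * @param reconsumeTimes      how many times the message has already been retried
     * @param maxReconsumeTimes   retry limit for the consumer group
     */
    static String route(int requestedDelayLevel, int reconsumeTimes, int maxReconsumeTimes) {
        // Assumed comparison: at or past the limit goes to the dead letter topic.
        if (reconsumeTimes >= maxReconsumeTimes) {
            return "DLQ";
        }
        // An unset level is replaced by 3 + reconsumeTimes, so delays grow per retry.
        int level = (requestedDelayLevel == 0) ? 3 + reconsumeTimes : requestedDelayLevel;
        return "RETRY@level" + level;
    }

    public static void main(String[] args) {
        System.out.println(route(0, 0, 16));  // first failure, level chosen by broker
        System.out.println(route(0, 2, 16));  // third failure
        System.out.println(route(3, 5, 16));  // timeout path always asks for level 3
        System.out.println(route(0, 16, 16)); // retries exhausted
    }
}
```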
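Summary:
- When is the retry topic created?
As covered in the previous article, both heartbeats and sending a retry message trigger its creation.
- How many queues does a retry topic have?
One: subscriptionGroupConfig.getRetryQueueNums(), which defaults to 1.
- RocketMQ has master and slave brokers. Does a slave also have the retry topic?
Yes. The heartbeat mechanism ensures the slave creates it, and the consume-retry path triggers it as well. The client keeps long-lived connections to all brokers, so both master and slave create it.
- If a message pulled from broker A fails to be consumed, is it sent back to broker A? What about load balancing?
As long as broker A is up, the message is sent back to broker A. On a network failure or when the broker is down, the producer's route selection takes over and load-balances the send. In the timeout case, as shown earlier, the message still goes back to the original broker.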
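The "which broker" answers above all come from one expression in sendMessageBack: a non-null brokerName is resolved to that broker's master address, otherwise the message's store host is used. A minimal sketch of that choice, with a hypothetical map standing in for the client's route table and made-up addresses:

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified model of how the send-back target address is chosen. */
class BrokerAddrSketch {
    /**
     * @param brokerName         name passed by the caller, null on the timeout path
     * @param masterAddrByBroker hypothetical route table: broker name -> master address
     * @param storeHost          address recorded on the message when it was stored
     */
    static String resolve(String brokerName, Map<String, String> masterAddrByBroker, String storeHost) {
        return (brokerName != null) ? masterAddrByBroker.get(brokerName) : storeHost;
    }

    public static void main(String[] args) {
        Map<String, String> routes = new HashMap<>();
        routes.put("broker-a", "10.0.0.1:10911");
        // Failure path: brokerName is known, so the master address is used.
        System.out.println(resolve("broker-a", routes, "10.0.0.2:10911"));
        // Timeout path: brokerName is null, so the store host is used.
        System.out.println(resolve(null, routes, "10.0.0.2:10911"));
    }
}
```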