发送消息超时,咋办

同步发送,最多发送 3 次

这里指的是发送成功,等待 broker 的响应时发生超时,客户端有理由认为是网络不好,数据没有到达 broker,因此重复发送消息,也就是这种情况会导致 broker 存在重复消息。当发生 RemotingException 或 MQClientException 时,会重复发送。

// org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
for (; times < timesTotal; times++) {
    try {
        SendResult sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
    } catch (RemotingException e) {
        endTimestamp = System.currentTimeMillis();
        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
        log.warn(msg.toString());
        exception = e;
        continue;
    } catch (MQClientException e) {
        endTimestamp = System.currentTimeMillis();
        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
        log.warn(msg.toString());
        exception = e;
        continue;
    } catch (OtherException e) {
        break;
    ]

}

异步发送,只发送 1 次。

我本来打算,在 SendMessageProcessor#processRequest 方法中 sleep 一段时间,模拟网络延迟,结果发现 broker 有个自我保护,如果线程池队列中的请求自创建起 200 ms 时间没有处理完,则表明压力大,丢弃该请求,并通知客户端

// org.apache.rocketmq.broker.latency.BrokerFastFailure#cleanExpiredRequestInQueue

猜你喜欢

转载自www.cnblogs.com/allenwas3/p/12207075.html