在producer.start()后,就可以进行消息发送了,在ProducerImpl中
public SendResult send(Message message) {
this.checkONSProducerServiceState(this.defaultMQProducer.getDefaultMQProducerImpl());
com.aliyun.openservices.shade.com.alibaba.rocketmq.common.message.Message msgRMQ = ONSUtil.msgConvert(message);
try {
com.aliyun.openservices.shade.com.alibaba.rocketmq.client.producer.SendResult sendResultRMQ = this.defaultMQProducer.send(msgRMQ);
前面两行是检查状态和将消息转为正确的格式。核心代码是最后一行。
上面方法,最终调用了DefaultMQProducerImpl的sendDefaultImpl方法。
一、验证
这个方法的前两行还是做验证,不细究
this.makeSureStateOK();
Validators.checkMessage(msg, this.defaultMQProducer);
二、获取topic信息
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
后面会根据获取到的topic的队列,路由等信息发送消息。
三、发送消息
发送消息的代码是在一个for循环里面,最多可以重试发送3次,发送消息的代码是:
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);
在这个sendKernelImpl方法中,先是获取到broker的地址。
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
然后调用钩子的发送消息前的方法
this.executeSendMessageHookBefore(context);
发送消息的代码如下:
switch (communicationMode) {
case ASYNC:
Message tmpMessage = msg;
if (msgBodyCompressed) {
//If msg body was compressed, msgbody should be reset using prevBody.
//Clone new message using commpressed message body and recover origin massage.
//Fix bug:https://github.com/apache/rocketmq-externals/issues/66
tmpMessage = MessageAccessor.cloneMessage(msg);
msg.setBody(prevBody);
}
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr,
mq.getBrokerName(),
tmpMessage,
requestHeader,
timeout,
communicationMode,
sendCallback,
topicPublishInfo,
this.mQClientFactory,
this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
context,
this);
break;
case ONEWAY:
case SYNC:
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr,
mq.getBrokerName(),
msg,
requestHeader,
timeout,
communicationMode,
context,
this);
break;
default:
assert false;
break;
}
有三种发送消息的方式,sync,async,oneway,默认是使用sync。在发送消息前,还设置了消息头,此处不细说。
最后进入了NettyRemotingClient的invokeSync()方法。
通过如下方式发送了消息:
@Override
public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
final Channel channel = this.getAndCreateChannel(addr);
if (channel != null && channel.isActive()) {
try {
if (this.rpcHook != null) {
this.rpcHook.doBeforeRequest(addr, request);
}
RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis);
if (this.rpcHook != null) {
this.rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
}