本文已参与[新人创作礼]活动,一路开启掘金创作之路。
基于rocketmq-4.9.0 版本分析rocketmq
前面我们分析了RocketMQ
的NameServer
和Broker
的启动流程,接下来我们看下作为客户端的消息生产者是如何启动的。
先看下客户端发送消息的代码:
/**
* @author qiuguan
* @version SyncProducer.java, v 0.1 2022/03/25 18:03:24 qiuguan Exp $
* 同步发送消息
*
* <p>
* public enum SendStatus {
* SEND_OK,
* // 刷盘超时。当Broker设置的刷盘策略为同步刷盘时才可能出 现这种异常状态。异步刷盘不会出现
* FLUSH_DISK_TIMEOUT,
* // Slave同步超时。当Broker集群设置的Master-Slave的复 制方式为同步复制时才可能出现这种异常状态。异步复制不会出现
* FLUSH_SLAVE_TIMEOUT,
* // 没有可用的Slave。当Broker集群设置为Master-Slave的 复制方式为同步复制时才可能出现这种异常状态。异步复制不会出现
* SLAVE_NOT_AVAILABLE,
* }
*
* </p>
*/
public class SyncProducer {
public static void main(String[] args) throws Exception {
//TODO:实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("rocketmq-test-group");
//TODO:设置NameServer的地址
producer.setNamesrvAddr("127.0.0.1:9876");
//TODO:启动Producer实例
producer.start();
//发送消息到broker
for (int i = 0; i < 1; i++) {
// 创建消息,并指定Topic,Tag和消息体
Message msg = new Message("my-rockemq-topic",
"*",
("Hello RocketMQ, producer is qiuguan " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// 为消息指定key
msg.setKeys("mq-" + i);
SendResult sendResult = producer.send(msg);
// 通过sendResult返回消息是否成功送达
System.out.printf("%s%s%n", sendResult, i);
}
// 如果不再发送消息,关闭Producer实例。
producer.shutdown();
}
}
复制代码
生产者对象有很多参数可以配置,我这里都使用了默认的参数值,如果想尝试其他参数设置,可以参考官方文档
从代码中不难看出,生产者发送消息只需要3个步骤:创建生产者客户端对象
,启动
,发送消息
。 那么我们就按照这三个步骤逐步分析下它是如何工作的。
1.创建生产者客户端对象
//TODO:实例化消息生产者Producer,指定生产者组名称
DefaultMQProducer producer = new DefaultMQProducer("rocketmq-test-group");
复制代码
其实这个类它就是一个外观类,我们看下它的构造器方法:
public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook) {
this.namespace = namespace;
this.producerGroup = producerGroup;
//TODO:它才是真正工作的类
defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
}
复制代码
所以,它主要是创建了 DefaultMQProducerImpl
对象,那么我们再看下它的构造器方法:
个人觉得名字取的属实不咋滴,乍一看,好像是接口和实现类的关系,但并不是。
public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer, RPCHook rpcHook) {
this.defaultMQProducer = defaultMQProducer;
this.rpcHook = rpcHook;
//TODO:创建一个异步发送的队列
this.asyncSenderThreadPoolQueue = new LinkedBlockingQueue<Runnable>(50000);
//TODO:创建一个异步发送的线程池
this.defaultAsyncSenderExecutor = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(),
Runtime.getRuntime().availableProcessors(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.asyncSenderThreadPoolQueue,
new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "AsyncSenderExecutor_" + this.threadIndex.incrementAndGet());
}
});
}
复制代码
从构造器中我们发现,它主要就是在做两件事:
- 创建一个异步发送的队列:
asyncSenderThreadPoolQueue
- 创建一个异步发送的线程池:
defaultAsyncSenderExecutor
然后是给Producer设置NameServer的地址,因为NameServer它是一个注册中心,保存着Broker信息和Topic路由信息。
2.生产者(Producer)启动
public void start() throws MQClientException {
//TODO:设置生产者组
this.setProducerGroup(withNamespace(this.producerGroup));
//TODO:启动,是通过这个对象来启动的
this.defaultMQProducerImpl.start();
//TODO:追踪消息轨迹,可以通过 DefaultMQProducer 对象的多个参数的构造器来开启,默认是关闭
if (null != traceDispatcher) {
try {
traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
} catch (MQClientException e) {
log.warn("trace dispatcher start failed ", e);
}
}
}
复制代码
核心就是调用DefaultMQProducerImpl.start()
方法来启动客户端:
内容比较多,我会添加一些注释
public void start(final boolean startFactory) throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
//TODO:检查groupName是否为空,是否超过最大字符限制等
this.checkConfig();
//TODO:将instanceName属性更改为PID
if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
this.defaultMQProducer.changeInstanceNameToPID();
}
//TODO:创建客户端对象
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
//TODO:注册生产者信息到本地
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
//TODO:设置topic路由表信息,不过这里的topic是“TBW102”
this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
if (startFactory) {
//TODO:启动生产者客户端
mQClientFactory.start();
}
log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
this.defaultMQProducer.isSendMessageWithVIPChannel());
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The producer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
//TODO:发送心跳到broker
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
//TODO:......
}
复制代码
2.1 检查配置
- 检查 producer的
groupName
是否为空 - 检查 producer的
groupName
长度是否超过255字符 - 检查 producer的
groupName
是否包含特殊字符 - 检查 producer的
groupName
是否等于默认的DEFAULT_PRODUCER
,如果相等也会抛出异常
2.2 创建客户端实例
public class MQClientInstance {
//TODO:生产者表,producer启动时创建一个新的MQClientInstance实例对象,将生产者信息注册到这里。生产者实例对象中消费者信息是空
private final ConcurrentMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>();
//TODO:消费者表,consumer启动时创建一个新的MQClientInstance实例对象,将消费者信息注册到这里。消费者实例对象中生产者信息是空
private final ConcurrentMap<String/* group */, MQConsumerInner> consumerTable = new ConcurrentHashMap<String, MQConsumerInner>();
//TODO:topic路由信息,producer和consumer都会使用
private final ConcurrentMap<String/* Topic */, TopicRouteData> topicRouteTable = new ConcurrentHashMap<String, TopicRouteData>();
//TODO:broker信息,producer和consumer都会用到
private final ConcurrentMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> brokerAddrTable =
new ConcurrentHashMap<String, HashMap<Long, String>>();
//TODO......
/**
* TODO:构造器
*/
public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) {
//TODO:客户端处理器,比如在集群消费模式下,有新的消费者加入,则通知消费者客户端重平衡,主要是给消费者用的,这里可以忽略
this.clientRemotingProcessor = new ClientRemotingProcessor(this);
//TODO:它的内部会创建netty客户端对象(NettyRemotingClient),用于和broker通信
this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig);
//TODO.......
//TODO:拉取消息的服务,和消费者相关,我们这里启动的是生产者实例,和消费者无关,忽略
this.pullMessageService = new PullMessageService(this);
//TODO:重平衡服务,和消费者相关,我们这里启动的是生产者实例,和消费者无关,忽略
this.rebalanceService = new RebalanceService(this);
//TODO:other......
}
}
复制代码
客户端实例是
MQClientInstance
类, 生产者和消费者都作为客户端,所以它保存了producer和consumer相关的所有信息,后面我们在分析消费者启动时,还会看到根据这个类创建消费者实例。但是我们这里是生产者,所以消费者相关的内容可以忽略。
2.3 将生产者注册到本地
就是将生产者信息注册到
MQClientInstance
对象中的producerTable
表中
public boolean registerProducer(final String group, final DefaultMQProducerImpl producer) {
if (null == group || null == producer) {
return false;
}
MQProducerInner prev = this.producerTable.putIfAbsent(group, producer);
if (prev != null) {
log.warn("the producer group[{}] exist already.", group);
return false;
}
return true;
}
复制代码
2.4 设置topic路由表信息
this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
复制代码
这里先有个印象就行,它放置的topic=“TBW102”,等后面分析producer发送消息时还会看到。
2.5 启动客户端实例
public void start() throws MQClientException {
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
//TODO:我们一般都为生产者指定nameserver地址,所以这里为false
if (null == this.clientConfig.getNamesrvAddr()) {
this.mQClientAPIImpl.fetchNameServerAddr();
}
//TODO:启动netty客户端
this.mQClientAPIImpl.start();
//TODO:启动定时任务
this.startScheduledTask();
//TODO:启动拉取消息的服务,这个是和消费者相关的,我们这里是producer,忽略
this.pullMessageService.start();
//TODO:启动重平衡服务,它和消费者相关,忽略
this.rebalanceService.start();
// Start push service
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [{}] start OK", this.clientId);
this.serviceState = ServiceState.RUNNING;
break;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
default:
break;
}
}
}
复制代码
前面我们说了,
MQClientInstance
是客户端实例,生产者和消费者都会通过它创建对象,我们这里创建的是生产者对象,所以关于消费者的内容,我们直接忽略即可。后面分析消费者时,还会看到这些逻辑。
2.5.1 启动Netty客户端
this.mQClientAPIImpl.start();
复制代码
前面在创建客户端实例MQClientInstance
对象时,其内部会创建MQClientAPIImpl
对象,而他的内部会创建netty客户端对象NettyRemotingClient
。
public void start() {
//TODO:NettyRemotingClient
this.remotingClient.start();
}
复制代码
我们看下NettyRemotingClient
的启动逻辑:
@Override
public void start() {
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
nettyClientConfig.getClientWorkerThreads(),
new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());
}
});
Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
.option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
.option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
if (nettyClientConfig.isUseTLS()) {
if (null != sslContext) {
pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
log.info("Prepend SSL handler");
} else {
log.warn("Connections are insecure as SSLContext is null!");
}
}
pipeline.addLast(
defaultEventExecutorGroup,
new NettyEncoder(),
new NettyDecoder(),
new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
new NettyConnectManageHandler(),
new NettyClientHandler());
}
});
//TODO:......
}
复制代码
有没有感觉很熟悉?没错,启动Broker时也会启动Netty客户端(因为Broker对于NameServer来说就是客户端),同样的类,同样的配方;所以,这里就不在赘述了。可以看下Broker启动-启动Netty客户端文章
2.5.2 启动各种定时任务
- 定时拉取NameServer地址
- 定时更新topic路由信息,从nameServer获取broker信息,topic信息等写到本地Map中
- 定时发送心跳
- 等等很多定时任务,这里就不展开描述了。
2.6 发送心跳到Broker
前面我们在分析Broker启动时知道,Broker会定时向NameServer发起心跳,那么我们现在来看下,Producer是如何向Broker发送心跳的?
说明一下,消费者发送心跳也是通过这个方法,如果是生产者,则准备生产者的心跳数据,如果是消费者,则准备消费者的心跳数据。
private void sendHeartbeatToAllBroker() {
//TODO:准备心跳数据
final HeartbeatData heartbeatData = this.prepareHeartbeatData();
//TODO:.......
if (!this.brokerAddrTable.isEmpty()) {
//TODO:brokerAddTable中保存着broker的数据,通过定时任务从nameserver拉取下来的
Iterator<Entry<String, HashMap<Long, String>>> it = this.brokerAddrTable.entrySet().iterator();
//TODO:.........
//TODO:发送心跳到broker,addr就是broker地址
int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000);
}
}
复制代码
2.6.1 准备心跳数据包
我们先看下心跳数据的结构:
public class HeartbeatData extends RemotingSerializable {
private String clientID;
//TODO:保存生产者心跳数据
private Set<ProducerData> producerDataSet = new HashSet<ProducerData>();
//TODO:保存消费者心跳数据
private Set<ConsumerData> consumerDataSet = new HashSet<ConsumerData>();
}
复制代码
准备生产者心跳数据:
private HeartbeatData prepareHeartbeatData() {
HeartbeatData heartbeatData = new HeartbeatData();
// clientID = clientIP@PID#System.nanoTime()
heartbeatData.setClientID(this.clientId);
//TODO:消费者心跳数据,因为我们这里创建的是生产者实例,所以消费者数据为空,跳过
for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
MQConsumerInner impl = entry.getValue();
if (impl != null) {
ConsumerData consumerData = new ConsumerData();
consumerData.setGroupName(impl.groupName());
consumerData.setConsumeType(impl.consumeType());
consumerData.setMessageModel(impl.messageModel());
consumerData.setConsumeFromWhere(impl.consumeFromWhere());
consumerData.getSubscriptionDataSet().addAll(impl.subscriptions());
consumerData.setUnitMode(impl.isUnitMode());
heartbeatData.getConsumerDataSet().add(consumerData);
}
}
//TODO:保存生产者心跳数据
for (Map.Entry<String/* group */, MQProducerInner> entry : this.producerTable.entrySet()) {
MQProducerInner impl = entry.getValue();
if (impl != null) {
ProducerData producerData = new ProducerData();
producerData.setGroupName(entry.getKey());
//TODO:设置消费者组
heartbeatData.getProducerDataSet().add(producerData);
}
}
return heartbeatData;
}
复制代码
2.6.2 获取broker地址
在步骤2.5.2中,会启动很多定时任务,其中:
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.updateTopicRouteInfoFromNameServer();
} catch (Exception e) {
log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
}
}
}, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
复制代码
它会拉取broker信息,topic路由信息等到本地Map中。其中向broker发送心跳,则会使用broker的地址。
2.6.3 发送心跳到broker
public int sendHearbeat(
final String addr,
final HeartbeatData heartbeatData,
final long timeoutMillis
) throws RemotingException, MQBrokerException, InterruptedException {
//TODO:构建远程命令,code=RequestCode.HEART_BEAT
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT, null);
request.setLanguage(clientConfig.getLanguage());
//TODO:设置心跳数据
request.setBody(heartbeatData.encode());
//TODO:通过netty客户端向broker发送请求,addr:broker地址
RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
return response.getVersion();
}
default:
break;
}
throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
}
复制代码
- 构建远程
RemotingCommand
命令,code=RequestCode.HEART_BEAT
- 设置心跳数据
- 发送到broker
这里我们又回到了Broker了,如果对Broker还熟悉的,可以看下Broker启动文章。
我们知道,在Broker启动时,会注册很多处理器来处理客户端的请求,根据code来区分不同的业务场景。
/**
* ClientManageProcessor
*/
ClientManageProcessor clientProcessor = new ClientManageProcessor(this);
//TODO:remotingServer用来处理远程请求
this.remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);
复制代码
所以我们看下处理器的逻辑:
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
throws RemotingCommandException {
switch (request.getCode()) {
//TODO:接收客户端心跳指令,保存客户端信息
case RequestCode.HEART_BEAT:
return this.heartBeat(ctx, request);
case RequestCode.UNREGISTER_CLIENT:
return this.unregisterClient(ctx, request);
case RequestCode.CHECK_CLIENT_CONFIG:
return this.checkClientConfig(ctx, request);
default:
break;
}
return null;
}
复制代码
我们在继续看心跳逻辑:
public RemotingCommand heartBeat(ChannelHandlerContext ctx, RemotingCommand request) {
RemotingCommand response = RemotingCommand.createResponseCommand(null);
//TODO:解析出心跳数据
HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class);
//TODO:构建一个客户端Channel信息
ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
ctx.channel(),
//TODO:消费者组下的每个消费者clientId都不同,用来区分同一个消费者组下的不同消费者
//TODO:生产者组下的每个生产者clientId都不同,用来区分同一个生产者组下的不同生产者
heartbeatData.getClientID(),
request.getLanguage(),
request.getVersion()
);
//TODO:消费者心跳,为empty跳过,因为我们这里是生产者心跳,
for (ConsumerData data : heartbeatData.getConsumerDataSet()) {
//TODO:获取订阅组配置并持久化
SubscriptionGroupConfig subscriptionGroupConfig =
this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(
data.getGroupName());
boolean isNotifyConsumerIdsChangedEnable = true;
if (null != subscriptionGroupConfig) {
isNotifyConsumerIdsChangedEnable = subscriptionGroupConfig.isNotifyConsumerIdsChangedEnable();
int topicSysFlag = 0;
if (data.isUnitMode()) {
topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
}
String newTopic = MixAll.getRetryTopic(data.getGroupName());
this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
newTopic,
subscriptionGroupConfig.getRetryQueueNums(),
PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
}
//TODO:注册消费者
boolean changed = this.brokerController.getConsumerManager().registerConsumer(
data.getGroupName(),
clientChannelInfo,
data.getConsumeType(),
data.getMessageModel(),
data.getConsumeFromWhere(),
data.getSubscriptionDataSet(),
isNotifyConsumerIdsChangedEnable
);
if (changed) {
log.info("registerConsumer info changed {} {}",
data.toString(),
RemotingHelper.parseChannelRemoteAddr(ctx.channel())
);
}
}
//TODO:...生产者心跳逻辑!
for (ProducerData data : heartbeatData.getProducerDataSet()) {
this.brokerController.getProducerManager().registerProducer(data.getGroupName(),
clientChannelInfo);
}
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
复制代码
注册生产者信息:
public synchronized void registerProducer(final String group, final ClientChannelInfo clientChannelInfo) {
ClientChannelInfo clientChannelInfoFound = null;
//TODO:根据producer 的 groupName从 groupChannelTable 获取数据
ConcurrentHashMap<Channel, ClientChannelInfo> channelTable = this.groupChannelTable.get(group);
if (null == channelTable) {
channelTable = new ConcurrentHashMap<>();
//TODO:第一次心跳,则将生产者组放入进来
this.groupChannelTable.put(group, channelTable);
}
clientChannelInfoFound = channelTable.get(clientChannelInfo.getChannel());
if (null == clientChannelInfoFound) {
channelTable.put(clientChannelInfo.getChannel(), clientChannelInfo);
//TODO:放置生产者组下的每一个生产者,用clientId作为key
clientChannelTable.put(clientChannelInfo.getClientId(), clientChannelInfo.getChannel());
log.info("new producer connected, group: {} channel: {}", group,
clientChannelInfo.toString());
}
if (clientChannelInfoFound != null) {
clientChannelInfoFound.setLastUpdateTimestamp(System.currentTimeMillis());
}
}
复制代码
将生产者信息放入
ProducerManager
对象的groupChannelTable
属性中,则本次心跳结束。
至此,Producer启动过程就算是结束了。
3.总结
本文从源码角度讲述了生产者是如何启动的,简单总结下:
- 创建生产者客户端实例
MQClientInstance
,其内部会创建netty客户端NettyRemotingClient
对象 - 启动客户端实例(
MQClientInstance#start()
),其内部会启动很多定时任务,同时也会启动netty客户端,等待和broker通信 - 通过上面的netty客户端向Broker发送心跳,Broker保存Producer的信息
限于作者个人水平,文中难免有错误之处,欢迎指正! 勿喷,感谢