基于rocketmq-4.9.0 版本分析rocketmq
1.Broker是什么?
参考官方文档
Broker主要负责消息的存储、投递和查询以及服务高可用保证,为了实现这些功能,Broker包含了以下几个重要子模块.
- Remoting Module:整个Broker的实体,负责处理来自Client端的请求。
- Client Manager:负责管理客户端(Producer/Consumer)和维护Consumer的Topic订阅信息。
- Store Service:提供方便简单的API接口处理消息存储到物理硬盘和查询功能。
- HA Service:高可用服务,提供Master Broker 和 Slave Broker之间的数据同步功能。
- Index Service:根据特定的Message key对投递到Broker的消息进行索引服务,以提供消息的快速查询。
2.Broker 启动入口
public class BrokerStartup {
//TODO:...省略部分代码.....
public static void main(String[] args) {
//TODO:创建BrokerController对象
start(createBrokerController(args));
}
public static BrokerController start(BrokerController controller) {
try {
//TODO:启动
controller.start();
return controller;
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}
return null;
}
复制代码
知道了入口,那接下来我们就分析下它是如何启动的,启动的过程中都做了什么?从上面的部分代码中,不难看出,它首先就是创建NamesrvController
对象,然后调用start()
方法启动,那我们就一步一步走进去看看细节.
前面我们分析NameServer启动时,它是先创建一个
NamesrvController
对象,然后调用start()
方法启动。不过相比NameServer启动,Broker的启动将会复杂一些
3.创建 BrokerController
对象过程
化繁为简,只留一些比较重要的代码
public static BrokerController createBrokerController(String[] args) {
//TODO:....省略部分代码.......
try {
//TODO:.......
//TODO:创建配置对象
final BrokerConfig brokerConfig = new BrokerConfig();
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
final NettyClientConfig nettyClientConfig = new NettyClientConfig();
//TODO:设置broker和producer,consumer通信的端口,固定是10911
nettyServerConfig.setListenPort(10911);
final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
//TODO:.....
//TODO:读取broker.conf配置文件
if (commandLine.hasOption('c')) {
String file = commandLine.getOptionValue('c');
if (file != null) {
configFile = file;
InputStream in = new BufferedInputStream(new FileInputStream(file));
properties = new Properties();
properties.load(in);
//TODO:将配置文件内容利用反射设置到对应的配置类中
properties2SystemEnv(properties);
MixAll.properties2Object(properties, brokerConfig);
MixAll.properties2Object(properties, nettyServerConfig);
MixAll.properties2Object(properties, nettyClientConfig);
MixAll.properties2Object(properties, messageStoreConfig);
BrokerPathConfigHelper.setBrokerConfigPath(file);
in.close();
}
}
MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig);
//TODO:检查是否配置了ROCKETMQ_HOM
if (null == brokerConfig.getRocketmqHome()) {
System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation", MixAll.ROCKETMQ_HOME_ENV);
System.exit(-2);
}
//TODO:......
switch (messageStoreConfig.getBrokerRole()) {
case ASYNC_MASTER:
case SYNC_MASTER:
//TODO:master机器的id设置为0
brokerConfig.setBrokerId(MixAll.MASTER_ID);
break;
case SLAVE:
//TODO:slave机器的id > 0
if (brokerConfig.getBrokerId() <= 0) {
System.out.printf("Slave's brokerId must be > 0");
System.exit(-3);
}
break;
default:
break;
}
//TODO:.......
//TODO:设置master和slave通信的端口,是10912
messageStoreConfig.setHaListenPort(nettyServerConfig.getListenPort() + 1);
//TODO:创建BrokerController对象
final BrokerController controller = new BrokerController(
brokerConfig,
nettyServerConfig,
nettyClientConfig,
messageStoreConfig);
// remember all configs to prevent discard
controller.getConfiguration().registerConfig(properties);
//TODO:初始化,如果初始化失败,则退出,broker启动失败
boolean initResult = controller.initialize();
if (!initResult) {
controller.shutdown();
System.exit(-3);
}
//TODO:.........
return controller;
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}
return null;
}
复制代码
3.1 创建配置对象
final BrokerConfig brokerConfig = new BrokerConfig();
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
final NettyClientConfig nettyClientConfig = new NettyClientConfig();
nettyServerConfig.setListenPort(10911);
final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
复制代码
创建Broker配置对象,Netty 客户端和服务端配置对象,以及存储对象的配置
1.BrokerConfig: 存储了Broker的配置,里面有非常多的内容,简单看下
public class BrokerConfig {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
//TODO:ROCKETMQ_HOME 路径
private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
//TODO:namesrvAddr的地址(ip:port)
private String namesrvAddr = System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, System.getenv(MixAll.NAMESRV_ADDR_ENV));
//TODO:brokerip
private String brokerIP1 = RemotingUtil.getLocalAddress();
private String brokerIP2 = RemotingUtil.getLocalAddress();
//TODO:brokerName,默认取localhostname,我配置的是broker-a
private String brokerName = localHostName();
//TODO:集群的默认名称
private String brokerClusterName = "DefaultCluster";
//TODO:brokerId, master=0, slave > 0
private long brokerId = MixAll.MASTER_ID;
private int brokerPermission = PermName.PERM_READ | PermName.PERM_WRITE;
//TODO:默认的queue数量
private int defaultTopicQueueNums = 8;
//TODO:自动创建topic是否开启,默认是开启。当生产者发送消息指定了一个broker没有的topic时,会自动创建并存储该topic
private boolean autoCreateTopicEnable = true;
//TODO:.....还有很多
}
复制代码
2.NettyServerConfig 和 NettyClientConfig:保存Netty相关的配置。并将broker
和producer,consumer
通信的端口设置固定的10911
3.MessageStoreConfig:设置消息存储的一些配置,简单看下
public class MessageStoreConfig {
//TODO:消息存储的根路径,默认是 $user.home/store/
//TODO:比如,我在broker.conf配置的是:/Usrs/qiuguan/config/ROCKETMQ_HOME/store/
private String storePathRootDir = System.getProperty("user.home") + File.separator + "store";
//TODO:存储commitlog(真正的消息)的路径,默认是 $user.home/store/commitlog
private String storePathCommitLog = System.getProperty("user.home") + File.separator + "store"
+ File.separator + "commitlog";
//TODO:每一个存储消息的文件默认是1G
private int mappedFileSizeCommitLog = 1024 * 1024 * 1024;
//TODO:每一个consumequeue默认存储30个索引单元
private int mappedFileSizeConsumeQueue = 300000 * ConsumeQueue.CQ_STORE_UNIT_SIZE;
//TODO:默认凌晨4点删除过期文件
private String deleteWhen = "04";
//TODO:文件默认保留3天,超过3天就是过期,然后就可以删除了
private int fileReservedTime = 72;
//TODO:每个消息的最大限制为4M
private int maxMessageSize = 1024 * 1024 * 4;
//TODO........还有很多配置
}
复制代码
3.2 读取配置文件
读取broker启动时指定的broker.conf配置文件,将配置信息映射到上面的配置类中
还有一些其他配置
- 比如设置master和slave通信的端口为10912(就是broker和客户端通信端口 + 1)
- 以及设置brokerId(如果是master,则brokerid=0 ; slave的brokerid > 0)
3.3 创建 BrokerController
对象
将上面创建的4个配置类对象通过构造参数传递到
BrokerController
类中。
public BrokerController(
final BrokerConfig brokerConfig,
final NettyServerConfig nettyServerConfig,
final NettyClientConfig nettyClientConfig,
final MessageStoreConfig messageStoreConfig
) {
//TODO:通过构造器传递进来的配置信息对象
this.brokerConfig = brokerConfig;
this.nettyServerConfig = nettyServerConfig;
this.nettyClientConfig = nettyClientConfig;
this.messageStoreConfig = messageStoreConfig;
//TODO:创建consuemr offset 管理对象
this.consumerOffsetManager = new ConsumerOffsetManager(this);
//TODO:创建topic配置管理对象
this.topicConfigManager = new TopicConfigManager(this);
//TODO:创建拉取消息的处理器
this.pullMessageProcessor = new PullMessageProcessor(this);
this.pullRequestHoldService = new PullRequestHoldService(this);
//TODO:创建消息达到监听器
this.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService);
//TODO:创建客户端id有变动的监听器(主要是重平衡时工作)
this.consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this);
//TODO:consuemr管理器对象
this.consumerManager = new ConsumerManager(this.consumerIdsChangeListener);
//TODO:consumer filter 管理器对象(消息过滤时会用)
this.consumerFilterManager = new ConsumerFilterManager(this);
//TODO: producer 管理器对象
this.producerManager = new ProducerManager();
this.clientHousekeepingService = new ClientHousekeepingService(this);
//TODO:broker向客户端发送消息时会用,比如向客户端发起重平衡
this.broker2Client = new Broker2Client(this);
//TODO:订阅信息组管理器对象
this.subscriptionGroupManager = new SubscriptionGroupManager(this);
//TODO:broker向NameServer发消息时会用
this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);
this.filterServerManager = new FilterServerManager(this);
//TODO:.....
//TODO:还有各种队列
this.sendThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getSendThreadPoolQueueCapacity());
this.pullThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getPullThreadPoolQueueCapacity());
//TODO........
}
复制代码
不难发现,创建
BrokerController
对象时,其内部也会构建很多的对象,我们简单说几个, 其他的有个印象就行,后面看到再说。
BrokerOuterAPI
: broker向NameServer发消息时会用。其内部会创建NettyRemotingClient
对象,向NameServer发起请求底层用的就是它。后面我们还会看到一个叫NettyRemotingServer
对象,它是broker接收客户端请求的。Broker2Client
: broker向客户端发起请求时会工作。比如向消费者发起重平衡。PullMessageProcessor
: 拉取消息的处理器。当消费者消费时,从broker拉取消息会使用它。
4.初始化 BrokerController
对象过程
初始化过程做了非常多的事情,我们还是只保留我们关注的部分
public boolean initialize() throws CloneNotSupportedException {
//TODO:加载配置,就是将文件中的topic配置信息加载到Map中
boolean result = this.topicConfigManager.load();
result = result && this.consumerOffsetManager.load();
result = result && this.subscriptionGroupManager.load();
result = result && this.consumerFilterManager.load();
if (result) {
try {
this.messageStore =
new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
this.brokerConfig);
//TODO:.....省略部分代码......
} catch (IOException e) {
result = false;
log.error("Failed to initialize", e);
}
}
result = result && this.messageStore.load();
if (result) {
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();
fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);
//TODO:创建各种线程池,我只保留2个
//TODO:发送消息的线程池(生产者发送消息到broker后,会通过这个线程池来处理消息)
this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getSendMessageThreadPoolNums(),
this.brokerConfig.getSendMessageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.sendThreadPoolQueue,
new ThreadFactoryImpl("SendMessageThread_"));
//TODO:拉取消息的线程池
this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getPullMessageThreadPoolNums(),
this.brokerConfig.getPullMessageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.pullThreadPoolQueue,
new ThreadFactoryImpl("PullMessageThread_"));
//TODO:注册处理器
this.registerProcessor();
//TODO:很多定时任务,我只保留一个
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
//TODO:持久化consumer offset 到文件中
BrokerController.this.consumerOffsetManager.persist();
} catch (Throwable e) {
log.error("schedule persist consumerOffset error.", e);
}
}
}, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
//TODO:....省略诸多代码........
}
return result;
}
复制代码
4.1 加载配置
public boolean initialize() throws CloneNotSupportedException {
//TODO:加载配置,就是将文件中的topic配置信息加载到Map中
boolean result = this.topicConfigManager.load();
result = result && this.consumerOffsetManager.load();
result = result && this.subscriptionGroupManager.load();
result = result && this.consumerFilterManager.load();
//TODO:........
}
复制代码
我们就以加载topic配置信息为例进行简单分析
TopicConfigManager
对象根据根据文件路径读取出文件的内容,解析成JSON字符串
默认存储路径:$user.home/store/config/topics.json
- 将
JSON
字符串转换成Java对象TopicConfigSerializeWrapper
JSON内容:
- 将JSON内容保存到
TopicConfigManager
对象的topic配置表中
public class TopicConfigManager extends ConfigManager {
//TODO:
private final ConcurrentMap<String/*topic*/, TopicConfig> topicConfigTable =
new ConcurrentHashMap<String, TopicConfig>(1024);
}
复制代码
同理:
ConsumerOffsetManager
将读取$user.home/store/config/consumerOffset.json
文件的内容映射到其内部的offset表中
public class ConsumerOffsetManager extends ConfigManager {
//TODO: key = topic@group
//TODO: value = Map[key = queueId, value = offset]
private ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> offsetTable =
new ConcurrentHashMap<String, ConcurrentMap<Integer, Long>>(512);
}
复制代码
SubscriptionGroupManager
将读取$user.home/store/config/ subscriptionGroup.json
文件的内容映射到其内部的订阅组表中
public class SubscriptionGroupManager extends ConfigManager {
//TODO:key=consumerGroupName,value=订阅组信息
private final ConcurrentMap<String, SubscriptionGroupConfig> subscriptionGroupTable =
new ConcurrentHashMap<String, SubscriptionGroupConfig>(1024);
}
复制代码
ConsumerFilterManager
将读取$user.home/store/config/consumerFilter.json
文件的内容映射到其内部的Map中。消费者订阅消息时如果是根据SQL92过滤的,则将topic,sql等信息保存到这里。
4.2 创建消息存储对象
this.messageStore =
new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
this.brokerConfig);
复制代码
创建默认的消息存储对象DefaultMessageStore
public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
//TODO:通过构造器传递一些配置对象
this.messageArrivingListener = messageArrivingListener;
this.brokerConfig = brokerConfig;
this.messageStoreConfig = messageStoreConfig;
this.brokerStatsManager = brokerStatsManager;
//TODO:创建分配MappedFile的服务对象
this.allocateMappedFileService = new AllocateMappedFileService(this);
if (messageStoreConfig.isEnableDLegerCommitLog()) {
this.commitLog = new DLedgerCommitLog(this);
} else {
//TODO:创建commitlog对象
this.commitLog = new CommitLog(this);
}
this.consumeQueueTable = new ConcurrentHashMap<>(32);
//TODO:consumequeue刷盘服务
this.flushConsumeQueueService = new FlushConsumeQueueService();
this.cleanCommitLogService = new CleanCommitLogService();
this.cleanConsumeQueueService = new CleanConsumeQueueService();
this.storeStatsService = new StoreStatsService();
//TODO:创建IndexFile服务,根据消息key构建索引
this.indexService = new IndexService(this);
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
//TODO:创建高可用服务,用来做数据同步
this.haService = new HAService(this);
} else {
this.haService = null;
}
//TODO:构建消息分发服务,就是用来构建消息的consumequeue索引和IndexFile索引
this.reputMessageService = new ReputMessageService();
//TODO:构建延迟消息服务
this.scheduleMessageService = new ScheduleMessageService(this);
this.transientStorePool = new TransientStorePool(messageStoreConfig);
if (messageStoreConfig.isTransientStorePoolEnable()) {
this.transientStorePool.init();
}
//TODO:启动异步任务,"预热"MappedFile
this.allocateMappedFileService.start();
//TODO:启动IndexFile异步任务
this.indexService.start();
//TODO:设置消息分发的组件,分别是构建consumequeue索引和IndexFile索引的
this.dispatcherList = new LinkedList<>();
this.dispatcherList.addLast(new CommitLogDispatcherBuildConsumeQueue());
this.dispatcherList.addLast(new CommitLogDispatcherBuildIndex());
//TODO........
}
复制代码
不难发现,构建
DefaultMessageStore
对象时,其内部也创建了非常多的对象。
AllocateMappedFileService
: 它是用来做MappedFile文件预热的IndexService
: 如果消息设置了key,则它会根据key来构建索引ReputMessageService
: 消息分发服务,当消息写入成功后,则它会去创建consumequeue索引和IndexFile索引ScheduleMessageService
: 延迟消息的服务对象CommitLog
,可以简单理解为读取message的。在创建它的时候,它的内部还会创建一个叫的MappedFileQueue
对象, 而MappedFileQueue
对象内部维护了很多的MappedFile
,而MappedFile
可以理解为实际存储消息文件的映射。
同时CommitLog内部还会创建刷盘服务;如果是异步刷盘,则创建
FlushRealTimeService
对象;如果是同步刷盘,则创建GroupCommitService
对象。
为了便于理解Commitlog,MappedFileQueue, MappedFile
的对应关系,以及逻辑与物理的映射关系,我从网上找到一张图,可以清晰直观的表现出他们之间的关系。
不难发现,MappedFile就是文件的映射,但不仅仅映射消息文件,还映射索引文件。
4.3 消息存储对象load()
它的核心内容主要是三个对象的load
// load Commit Log
result = result && this.commitLog.load();
// load Consume Queue
result = result && this.loadConsumeQueue();
//load indexFile
this.indexService.load(lastExitOK);
复制代码
4.3.1 加载 commitlog 信息
它做的大致就是读取 $user.home/store/commitlog/
目录下的所有消息文件MappedFile, 将其根据文件名升序排序后,记录每个MappedFile文件的写入位置,然后将其放入CommitLog
对象中的MappedFileQueue
对象的mappedFiles
集合中
try {
MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize);
//TODO:这里暂时先记录文件已经写满,后看还会调用其他方法来确定实际的写入位置
mappedFile.setWrotePosition(this.mappedFileSize);
mappedFile.setFlushedPosition(this.mappedFileSize);
mappedFile.setCommittedPosition(this.mappedFileSize);
this.mappedFiles.add(mappedFile);
log.info("load " + file.getPath() + " OK");
} catch (IOException e) {
log.error("load file " + file + " error", e);
return false;
}
复制代码
4.3.2 加载 consumequeue 信息
它做的大致就是读取 $user.home/store/consumequeue/
目录下的所有目录(每个目录名都是一个topic),然后遍历所有的目录(topic),然后再继续遍历topic下的所有目录(这个目录名是queueid),然后创建ConsumeQueue
对象,然后保存到DefaultMessageStore
对象的consumeQueueTable
表中。
ConsumeQueue logic = new ConsumeQueue(
topic,
queueId,
//TODO:路径是 $user.home/store/consumequeue
StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
//TODO:600w, 每个consumequeue存储30个索引单元,每个20byte
this.getMessageStoreConfig().getMappedFileSizeConsumeQueue(),
this);
复制代码
注意:创建
ConsuemeQueue
对象时,其内部也会创建MappedFileQueue
对象,MappedFileQueue
内部保存了很多的MappedFile
对象。这和创建CommitLog
对象时时一样的过程。
存储到consumeQueueTable
表中
public class DefaultMessageStore implements MessageStore {
//TODO: 一个queueid 对应一个 ConsumeqQueue, ConsumeQueue内部有很多文件MappedFile
private final ConcurrentMap<String/* topic */, ConcurrentMap<Integer/* queueId */, ConsumeQueue>> consumeQueueTable;
}
复制代码
然后调用ConsumeQueue
的load()
方法,执行和CommitLog#load()
一模一样的逻辑。
4.3 创建Netty服务端对象和线程池
4.3.1 创建远程Netty服务端对象
//TODO:创建netty服务端对象
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
复制代码
这个对象有没有似曾相识的感觉?没错,我们在看NameServer启动过程中时也会创建这个对象。
我们继续看下NettyRemotingServer
的构造器
public NettyRemotingServer(final NettyServerConfig nettyServerConfig,
final ChannelEventListener channelEventListener) {
super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
//TODO:创建ServerBootstrap对象,它是netty的服务端启动类
this.serverBootstrap = new ServerBootstrap();
this.nettyServerConfig = nettyServerConfig;
this.channelEventListener = channelEventListener;
int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();
if (publicThreadNums <= 0) {
publicThreadNums = 4;
}
//TODO:创建一个线程池
this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet());
}
});
//TODO:这里有一个usePoll的判断,为了减少篇幅,我移除了;无论是否为true,主要都是创建下面两个对象
{
//TODO:负责监听TCP网络连接请求
this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyNIOBoss_%d", this.threadIndex.incrementAndGet()));
}
});
//TODO: 在eventLoopGroupBoss接收到连接请求时,负责将建立好连接的socket注册到selector中去
this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
private int threadTotal = nettyServerConfig.getServerSelectorThreads();
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
}
});
}
//TODO:....省略代码......
}
复制代码
总结一下创建NettyRemotingServer
对象时都做了什么
- 创建netty服务端启动类
ServerBootstrap
,这个后面还会看到,用来启动netty服务 - 创建一个线程池
publicExecutor
,他的作用就是一个默认线程池。在后面broker注册处理器时,如果没有指定线程池,那么就会使用这个默认的线程池。
在NameServer启动时,我们说过,它并不会使用这个线程池,因为只有在注册处理器并且没有指定线程池时才会使用
publicExecutor
,而NameServer使用的是注册默认处理器方法,注册一个DefaultRequestProcessor
对象,不会使用这个默认的线程池。
eventLoopGroupBoss
和eventLoopGroupSelector
线程组:它俩分别是netty用来处理连接事件与读写事件的线程了,eventLoopGroupBoss
对应的是netty的boss
线程组,eventLoopGroupSelector
对应的是worker
线程组
4.3.2 创建线程池
//TODO:创建发送消息的线程池,当producer发送消息到broker,会经过这个线程池来处理
this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getSendMessageThreadPoolNums(),
this.brokerConfig.getSendMessageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.sendThreadPoolQueue,
new ThreadFactoryImpl("SendMessageThread_"));
//TODO:创建拉取消息的线程池,当consumer拉取消息时,会用到这个线程池
this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getPullMessageThreadPoolNums(),
this.brokerConfig.getPullMessageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.pullThreadPoolQueue,
new ThreadFactoryImpl("PullMessageThread_"));
//TODO:.....很多其他线程池.........
复制代码
4.3.3 注册处理器
public void registerProcessor() {
//TODO:发送消息处理器,当producer发送消息到broker时会使用这个处理器处理消息
SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
sendProcessor.registerSendMessageHook(sendMessageHookList);
sendProcessor.registerConsumeMessageHook(consumeMessageHookList);
//TODO:注册处理器
this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
//TODO:当消费者消费失败,会将消息重新发回到broker,使用的就是这个处理器,只是code不一样而已
this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
//TODO:拉取消息的处理器,在 new BrokerController对象时,内部会创建这个处理器对象
this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);
this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);
//TODO:.....注册其他处理器.......
}
复制代码
我们看下注册方法:
public interface RemotingServer extends RemotingService {
//TODO:Broker注册处理器基本上使用的都是这个方法
void registerProcessor(final int requestCode, final NettyRequestProcessor processor,
final ExecutorService executor);
//TODO:NameServer启动注册处理器使用的是这个方法
void registerDefaultProcessor(final NettyRequestProcessor processor, final ExecutorService executor);
}
复制代码
registerProcessor(...)
: Broker注册处理器基本上使用的都是这个方法registerDefaultProcessor(..)
: NameServer启动注册处理器使用的是这个方法
我们在看下注册是干什么?
注册就是将处理器写到
NettyRemotingServer
对象的processorTable
表中。这样就可以不同的业务code,获取不同的处理器。
@Override
public void registerProcessor(int requestCode, NettyRequestProcessor processor, ExecutorService executor) {
ExecutorService executorThis = executor;
if (null == executor) {
//TODO:如果没有指定线程池,则使用这个默认的线程池,不过基本上不会使用默认的线程池,因为4.3.2步骤中,都会创建对应的线程池
executorThis = this.publicExecutor;
}
//TODO:创建Pair对象
Pair<NettyRequestProcessor, ExecutorService> pair = new Pair<>(processor, executorThis);
//TODO:写到处理器表中,key=code, value=Pair对象
this.processorTable.put(requestCode, pair);
}
复制代码
我们在看下Pair
的内部结构,因为Broker接收请求时还会看到它
public class Pair<T1, T2> {
private T1 object1;
private T2 object2;
public Pair(T1 object1, T2 object2) {
this.object1 = object1;
this.object2 = object2;
}
}
复制代码
object1=处理器,object2=线程池
4.3.4 启动定时任务
这里会启动很多定时任务,我就举其中持久化 comsumer offset的为例:
//TODO: 每隔5s持久化一次consumer的offset
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.consumerOffsetManager.persist();
} catch (Throwable e) {
log.error("schedule persist consumerOffset error.", e);
}
}
}, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
复制代码
每隔5s持久化一次consumer的offset到默认的存储路径:
$user.home/store/config/consumerOffset.json
文件中
5.启动 BrokerController
对象过程
public void start() throws Exception {
//TODO:启动DefaultMessageStore
if (this.messageStore != null) {
this.messageStore.start();
}
//TODO:启动 NettyRemotingServer
if (this.remotingServer != null) {
this.remotingServer.start();
}
//TODO:.......
//TODO:启动 NettyRemotingClient
if (this.brokerOuterAPI != null) {
this.brokerOuterAPI.start();
}
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
startProcessorByHa(messageStoreConfig.getBrokerRole());
handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
//TODO:注册Broker到NameServer
this.registerBrokerAll(true, false, true);
}
//TODO:每隔30s定时发送一次心跳
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
} catch (Throwable e) {
log.error("registerBrokerAll Exception", e);
}
}
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
//TODO:........
}
复制代码
5.1 启动消息存储对象
DefaultMessageStroe#start()
if (this.messageStore != null) {
this.messageStore.start();
}
复制代码
我们看它的内部实现:
public void start() throws Exception {
//TODO:....省略部分代码.......
//TODO:启动消息分发服务
this.reputMessageService.start();
//TODO:省略部分代码........
//TODO:恢复commitlog中的consumequeue offset
//TODO:HashMap<String/* topic-queueid */, Long/* offset */> topicQueueTable
this.recoverTopicQueueTable();
}
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
//TODO:启动高可用服务
this.haService.start();
//TODO:如果是Master机器,启动延迟消息服务
this.handleScheduleMessageService(messageStoreConfig.getBrokerRole());
}
//TODO:启动consumequeue的刷盘服务
this.flushConsumeQueueService.start();
//TODO:启动commitlog刷盘服务
this.commitLog.start();
//TODO:启动采样服务
this.storeStatsService.start();
//TODO:创建 abort文件,关闭broker时删除。(存储路径默认:$user.home/store/abort
this.createTempFile();
//TODO:添加定时任务,比如到期删除过期文件,比如检查磁盘使用请情况
this.addScheduleTask();
this.shutdown = false;
}
复制代码
简单总结一下都做了什么?
- 启动
消息分发
服务,等待构建consumequeue和indexFile索引 - 恢复
CommitLog
对象内部维护的consumequeue offset
表的数据内容 - 如果是Master机器,则启动
延迟消息服务
- 启动
consumequeue
的刷盘服务 - 启动
CommitLog
的刷盘服务 - 启动各种
定时任务
,比如定期删除过期文件,比如检查磁盘使用情况
5.2 启动Netty服务端
NettyRemotingServer#start()
public void start() {
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
nettyServerConfig.getServerWorkerThreads(),
new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
}
});
//TODO:创建一些对象
prepareSharableHandlers();
ServerBootstrap childHandler =
//TODO:首先设置 bossGroup和workerGroup
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
//TODO:判断使用 epoll 还是 nio
.channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
//TODO:一些其他选项
.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
//TODO:绑定ip + 端口(broker这里绑定的是10911,nameServer绑定是9876)
.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
//TODO:绑定handler
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
.addLast(defaultEventExecutorGroup,
encoder,
new NettyDecoder(),
//TODO:处理心跳的handler
new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
//TODO:处理连接的handler
connectionManageHandler,
//TODO:处理读写请求的handler,在上面prepareSharableHandlers()方法中创建的对象
serverHandler
);
}
});
try {
//TODO: 启动netty服务端
ChannelFuture sync = this.serverBootstrap.bind().sync();
InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
this.port = addr.getPort();
} catch (InterruptedException e1) {
throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
}
//TODO:......省略部分代码........
}
复制代码
前面我们看NameServer启动时,看到它也创建了这个对象,唯一的区别就是配置参数不同。比如,broker绑定的端口是10911, nameserver绑定的端口是9876。
不难发现,这个方法就是真正的在启动netty服务端了,简单总结下关注的点
- 设置bossGroup和workerGroup线程组。这是netty中两个执行任务的循环线程组,bossGroup负责处理
accept事件
,workerGroup负责处理read/write事件
. - 决策使用epoll还是nio channel
- 绑定ip和端口
- 添加handler。如果
Channel
是出现了连接/读/写
等事件,这些事件会经过Pipeline
上的ChannelHandler
上进行流转
HandshakeHandler
: 负责处理握手操作NettyEncoder
:负责处理报文编码和解码IdleStateHandler
:负责处理心跳NettyConnectManageHandler
: 负责处理连接- NettyServerHandler:负责处理读写请求。这个
ChannelHandler
就是用来处理producer和consumer请求的。
- 服务端启动
5.3 启动Netty客户端
if (this.brokerOuterAPI != null) {
this.brokerOuterAPI.start();
}
复制代码
其内部就是NettyRemotingClient
对象的启动。
严格来说,这里并没有启动客户端,只是先构建好客户端需要的必须参数。等具体调用时在与服务端建立连接。(比如,在向NameServer注册时,等调用具体的注册方法时在于服务端(nameserver)建立连接)。
public void start() {
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
nettyClientConfig.getClientWorkerThreads(),
new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());
}
});
//TODO:设置works线程组,设置NIO Channel
Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
.option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
.option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
if (nettyClientConfig.isUseTLS()) {
if (null != sslContext) {
pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
log.info("Prepend SSL handler");
} else {
log.warn("Connections are insecure as SSLContext is null!");
}
}
pipeline.addLast(
defaultEventExecutorGroup,
//TODO:添加处理器
new NettyEncoder(),
new NettyDecoder(),
new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
new NettyConnectManageHandler(),
new NettyClientHandler());
}
});
//TODO:.....省略部分代码.......
}
复制代码
我们关注一下处理器即可:
NettyEncoder/NettyDecoder
: 报文编码解码处理器IdleStateHandler
: 心跳处理器NettyConnectManageHandler
: 负责连接的处理器NettyClientHandler
: 负责处理读写请求的处理器。
Broker为什么还要启动客户端呢?
因为Broker对于NameServer来说,它向NameServer发起注册请求(心跳,路由信息查询等)都是作为一个客户端的角色; 而Broker对于Producer/Consumer来说,它又是一个服务端的角色,接收生产者和消费者的请求。
5.4 将Broker注册到NameServer
Broker向NameServer发起注册,就是通过
BrokerOuterAPI
对象发起调用,通过其内部的NettyRemotingClient
对象发起注册请求。
不过关于Broker注册的具体逻辑,我这里就不展开说了,因为前面NameServer处理请求的文章中,我们已经讲的很明白了,所以如果不清楚的,可以直接看文章好了。
至此,Broker的启动就算了成功了。
6.总结
本文从源码的角度讲述了Broker的启动流程。简单总结下
- 读取broker配置文件,将配置文件内容映射到配置类中,然后创建
BrokerController
对象。 - 然后调用
BrokerController
对象的初始化
方法,加载持久化到配置文件中的配置信息到Map
中 - 创建netty服务端对象
NettyRemotingServer
- 启动各种定时任务,比如持久化comsumer offset的任务等等
- 然后启动消息分发,commitlog刷盘,comsumequeue刷盘等诸多服务
- 启动netty服务端,至此Broker就启动成功,可以接收和发送请求了。
限于作者个人水平,文中难免有错误之处,欢迎指正! 勿喷,感谢