版权声明:本文为博主原创文章,未经博主允许不得转载,转载需在醒目位置注明出处。 https://blog.csdn.net/sinat_28771747/article/details/85158432
我看的源码是RocketMQ的3.2.6版本,该版本是RocketMQ被apache组织孵化之前的版本了,但是不影响源码阅读,阅读源码主要是想学一下大牛们的开发思想
namesrv的项目结构:
NamesrvStartup类作为namesrv的启动入口,主要作用是加载配置文件,环境检查,调用NamesrvController类启动server服务
NamesrvController作为namesrv服务控制类,作为namesrv服务的控制核心,其主要方法如下:
public boolean initialize() {
// 加载KV配置
this.kvConfigManager.load();
// 初始化通信层
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
// 初始化线程池
this.remotingExecutor =
Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(),
new ThreadFactoryImpl("RemotingExecutorThread_"));
this.registerProcessor();
// 增加定时任务--扫描过期的broker
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);
//打印所有的config
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.kvConfigManager.printAllPeriodically();
}
}, 1, 10, TimeUnit.MINUTES);
// this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
//
// @Override
// public void run() {
// NamesrvController.this.routeInfoManager.printAllPeriodically();
// }
// }, 1, 5, TimeUnit.MINUTES);
return true;
}
/**
* 启动namesrv服务端
* @throws Exception
* @author: yangcheng
*/
public void start() throws Exception {
this.remotingServer.start();
}
RouteInfoManager.java类,原注释是“运行过程中的路由信息,数据只在内存中因此宕机后数据消失;但是Broker会定期推送最新数据”,其主要属性如下(类中其他方法主要作用就是对这些属性的增删查改):
/**
* topic与该topic相关连的所有queue--queue中包含了brokerName值
*/
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
/**
* broker的信息缓存
*/
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
/**
* 集群名称与集群中所有brokername的关系缓存
*/
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
/**
* 激活状态的Broker
*/
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;