何为路由中心?
在通常的理解中,提到消息队列,最简单的模型如下图:
很好理解,生产者只要知道消息队列的网络地址,然后发送消息到这个网络地址上。但是实际往往不是这么简单,因为消息队列一般是集群模式,再如下图,4个实例组成集群:
假如现在继续采用生产者/消费者直接记录4个实例网络地址的方式,也是可以实现的。随着业务的发展,需要对队列进行水平扩展,要将4台实例临时扩展到6台,那么生产者和消费者是不知道新增的两个实例网络地址的,这时只能重启,记录两个新实例的网络地址再重启。这是不是非常麻烦?针对这个问题可以思考下:
- 对于集群模式的服务,有没有更好更灵活的方式去记录路由信息?
- 当集群实例数量增加或者减少时,怎么让其他服务可以知道?并且不用做任何操作。
这时如果我们新增一个路由中心系统,负责两件事:
- 所有服务可以去路由中心上报自己的网络地址。
- 所有服务可以去路由中心获取其他服务的网络地址。
这样我们系统有多少的业务系统,我们只要知道路由中心的地址就可以了。
那么路由中心怎么感知实例数量的变化呢?最朴素的手段就是心跳,路由中心定期向所有上报的系统发送心跳包,如果心跳包没有响应则说明实例已经停止运行了,在路由记录中剔除该服务地址就可以了。
其实这两个步骤都有专门名称:
- 服务注册:将某个服务调用信息记录到一个公共组件中
- 服务发现:当服务新增/减少,要能被其他调用者自动发现。
市面上已有的代表性产品有:zookeeper、consul、etcd。
RocketMQ中的Namesrv
图中展示了Namesrv的3个功能,也是核心的3个功能:
- 注册broker
- 获取路由信息
- 定时心跳检测
接下来看看rocketmq的源码是如何实现这3个功能的
路由元信息
namesrv作为路由信息的管理者,本质上可以这么对namesrv进行理解:保存路由信息的数据结构 + 操作数据结构的算法,路由元信息便是第一部分,所以先看看namesrv保存路由信息的数据结构是什么样子?
在namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
类下,包含了所有的路由元信息
topicQueueTable
/**
* 保存Topic和队列的信息,也叫真正的路由信息。
* 一个Topic的queue可能分布在不同的broker中,也有可能分布在同一个broker中。
* key:topic . val:QueueData
*/
private final HashMap<String, List<QueueData>> topicQueueTable;
复制代码
brokerAddrTable
/**
* Broker基础信息,broker名称和broker信息的对应信息
* key:broker name . val:broker data
*/
private final HashMap<String, BrokerData> brokerAddrTable;
复制代码
clusterAddrTable
/**
* Broker集群信息,集群中所有Broker名称
* key: cluster name . val:set<broker name>
*/
private final HashMap<String, Set<String>> clusterAddrTable;
复制代码
brokerLiveTable
/**
* Broker状态信息,NameServer每次收心跳包都会替换该信息
* key: broker addr . val:broker live info
*/
private final HashMap<String, BrokerLiveInfo> brokerLiveTable;
复制代码
filterServerTable
/**
* 用于类模式消息过滤
* key:broker addr . val:filter server
*/
private final HashMap<String, List<String>> filterServerTable;
复制代码
知道了路由元信息的数据结构,再看看路由信息是如何保存到这些map中的以及如何查询的。
服务注册
注册broker和获取路由信息是namesrv提供的API,位于namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
类下的processRequest()
,根据请求的code判断请求的类型。下面包含了namesrv所有的对外接口API。
switch (request.getCode()) {
case RequestCode.PUT_KV_CONFIG:
// 添加配置
return this.putKVConfig(ctx, request);
case RequestCode.GET_KV_CONFIG:
// 获取配置
return this.getKVConfig(ctx, request);
case RequestCode.DELETE_KV_CONFIG:
// 删除配置
return this.deleteKVConfig(ctx, request);
case RequestCode.QUERY_DATA_VERSION:
return queryBrokerTopicConfig(ctx, request);
case RequestCode.REGISTER_BROKER:
// 注册broker
Version brokerVersion = MQVersion.value2Version(request.getVersion());
if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
// 3.0.11之后
return this.registerBrokerWithFilterServer(ctx, request);
} else {
// 3.0.11之前
return this.registerBroker(ctx, request);
}
case RequestCode.UNREGISTER_BROKER:
// 注销broker
return this.unregisterBroker(ctx, request);
case RequestCode.GET_ROUTEINFO_BY_TOPIC:
// 根据topic获取路由信息
return this.getRouteInfoByTopic(ctx, request);
case RequestCode.GET_BROKER_CLUSTER_INFO:
// 获取broker集群信息
return this.getBrokerClusterInfo(ctx, request);
case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
// 删除Broker的写权限
return this.wipeWritePermOfBroker(ctx, request);
case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
// 获取全部Topic信息
return getAllTopicListFromNameserver(ctx, request);
case RequestCode.DELETE_TOPIC_IN_NAMESRV:
// 删除Topic信息
return deleteTopicInNamesrv(ctx, request);
case RequestCode.GET_KVLIST_BY_NAMESPACE:
// 获取kv列表
return this.getKVListByNamespace(ctx, request);
case RequestCode.GET_TOPICS_BY_CLUSTER:
// 获取topic根据集群
return this.getTopicsByCluster(ctx, request);
..
default:
break;
}
复制代码
其中服务注册对应的是code是REGISTER_BROKER
,下面看到根据版本的不同分为registerBrokerWithFilterServer()
和registerBroker()
,我使用的是v4.9.1,所以看看registerBrokerWithFilterServer()
的实现
public RemotingCommand registerBrokerWithFilterServer(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
...省略...
// 注册的方法
RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(...);
...省略...
}
复制代码
核心的注册逻辑在registerBroker()
中 ,继续看下registerBroker()
public RegisterBrokerResult registerBroker(final String clusterName, final String brokerAddr, final String brokerName, final long brokerId, final String haServerAddr, final TopicConfigSerializeWrapper topicConfigWrapper, final List<String> filterServerList, final Channel channel) {
RegisterBrokerResult result = new RegisterBrokerResult();
try {
try {
this.lock.writeLock().lockInterruptibly();
// step1
// 根据集群名称获取 broker name 集合
Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
if (null == brokerNames) {
brokerNames = new HashSet<>();
this.clusterAddrTable.put(clusterName, brokerNames);
}
// 添加broker name 到集合中
brokerNames.add(brokerName);
// 是否第一次注册
boolean registerFirst = false;
// step2
// 如果 broker table 里没有数据,则表示broker为第一次注册
BrokerData brokerData = this.brokerAddrTable.get(brokerName);
if (null == brokerData) {
registerFirst = true;
brokerData = new BrokerData(clusterName, brokerName, new HashMap<>());
this.brokerAddrTable.put(brokerName, brokerData);
}
Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
//Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>
//The same IP:PORT must only have one record in brokerAddrTable
brokerAddrsMap.entrySet().removeIf(item -> null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey());
String oldAddr = brokerAddrsMap.put(brokerId, brokerAddr);
registerFirst = registerFirst || (null == oldAddr);
// step3
// 如果 topic 信息不为空,而且是 master
if (null != topicConfigWrapper && MixAll.MASTER_ID == brokerId) {
// 如果是第一次注册,或者 broker topic 信息发生变动过
if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion()) || registerFirst) {
// 获取 topic info 列表
ConcurrentMap<String, TopicConfig> tcTable = topicConfigWrapper.getTopicConfigTable();
if (tcTable != null) {
for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
this.createAndUpdateQueueData(brokerName, entry.getValue());
}
}
}
}
// step4
BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr, new BrokerLiveInfo(System.currentTimeMillis(), topicConfigWrapper.getDataVersion(), channel, haServerAddr));
if (null == prevBrokerLiveInfo) {
log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
}
// step5
if (filterServerList != null) {
if (filterServerList.isEmpty()) {
this.filterServerTable.remove(brokerAddr);
} else {
this.filterServerTable.put(brokerAddr, filterServerList);
}
}
// 如果broker的身份是slave
if (MixAll.MASTER_ID != brokerId) {
String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
if (masterAddr != null) {
BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
if (brokerLiveInfo != null) {
// 在返回结果中,携带master的地址,主从同步步骤需要。
result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
result.setMasterAddr(masterAddr);
}
}
}
} finally {
this.lock.writeLock().unlock();
}
} catch (Exception e) {
log.error("registerBroker Exception", e);
}
return result;
}
复制代码
代码注释中使用step1 ~ step5对5个不同阶段进行标记,这5个阶段的作用可以这么理解
路由剔除
namesrv要删除一个broker的路由信息的只有一个:这个broker目前不能正常提供服务了。这也可以分为两种情况
- broker正常停止。
- broker发生异常。
Broker 正常停止
broker在停止的时候会给namesrv发送一个unregisterBroker
类型的消息,发送代码部分在broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
类的unregisterBrokerAll()
,namesrv处理这个消息的入口上文服务注册部分提到过,代码的逻辑很直白,就是从topicQueueTable
、brokerAddrTable
、brokerLiveTable
、filterServerTable
删除与该Broker相关的信息。
Broker 异常
broker异常的时候,肯定是无法主动向namesrv发送注销消息的,所以需要namesrv主动去发现哪些异常的broker。按照这个思路,可以想到两种实现手段:
方案一: namesrv启动一个定时任务,然后向所有broker发送心跳请求,无法响应的broker可以怀疑是发生了异常了。
方案二: 记录broker向namesrv最后发送心跳消息的时间,namesrv启动定时任务,判断每个最后心跳时间与当前的时间是否超时,超时说明broker异常导致无法发送心跳包
RocketMQ的设计思路和方案二类似。
启动定时任务
namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController#initailize()
public boolean initialize() {
//...略
this.scheduledExecutorService.scheduleAtFixedRate(NamesrvController.this.routeInfoManager::scanNotActiveBroker, 5, 10, TimeUnit.SECONDS);
//... 略
}
复制代码
namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager#scanNotActiveBroker()
public void scanNotActiveBroker() {
// 从存活列表里获取broker数据
Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, BrokerLiveInfo> next = it.next();
long last = next.getValue().getLastUpdateTimestamp();
// 判断消息发送间隔
if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
RemotingUtil.closeChannel(next.getValue().getChannel());
// 移除该broker信息
it.remove();
log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
}
}
}
复制代码