创建一个 topic,分区数为 1
mqadmin.cmd updateTopic -n localhost:9876 -c DefaultCluster -t TopicTest -r 1 -w 1
// 查看 topic 分区信息
mqadmin.cmd topicStatus -n localhost:9876 -t TopicTest
修改分区数为 2
mqadmin.cmd updateTopic -n localhost:9876 -c DefaultCluster -t TopicTest -r 2 -w 2
// 查看 topic 分区信息
mqadmin.cmd topicStatus -n localhost:9876 -t TopicTest
topic 增加了一个分区,client 怎么感知?
1. broker 执行命令,改变 topic 元数据,然后定时上报 topic 信息给 nameserver
2. client 定时从 nameserver 获取 topic 的元数据信息
3. 如果 topic 的分区发生了变化,client 会发现并记录新的分区信息
// MQClientInstance#updateTopicRouteInfoFromNameServer // DefaultMQPushConsumerImpl#updateTopicSubscribeInfo public void updateTopicSubscribeInfo(String topic, Set<MessageQueue> info) { Map<String, SubscriptionData> subTable = this.getSubscriptionInner(); if (subTable != null) { if (subTable.containsKey(topic)) { this.rebalanceImpl.topicSubscribeInfoTable.put(topic, info); } } }
同时 rebalance 一直在进行
// org.apache.rocketmq.client.impl.consumer.RebalanceService#run public void run() { log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { this.waitForRunning(waitInterval); this.mqClientFactory.doRebalance(); } log.info(this.getServiceName() + " service end"); }
按照当前 topic 的分区和消费者信息,分配分区,然后比较新分配到的分区和正在处理的分区