参考
RocketMQ的autoCreateTopicEnable配置
RocketMQ 解决 No route info of this topic 异常步骤
RocketMQ-Console安装及RocketMQ命令行管理工具介绍
标题
版权声明: https://blog.csdn.net/ph3636/article/details/79528638
1.使用RocketMQ进行发消息时,必须要指定topic,对于topic的设置有一个开关autoCreateTopicEnable,一般在开发测试环境中会使用默认设置autoCreateTopicEnable = true,但是这样就会导致topic的设置不容易规范管理,没有统一的审核等等,所以在正式环境中会在Broker启动时设置参数autoCreateTopicEnable = false。这样当需要增加topic时就需要在web管理界面上添加即可。
2.No route info of this topic异常是如何产生的?
当autoCreateTopicEnable=false时,DefaultMQProducerImpl.sendDefaultImpl,当发消息的时候肯定先要获取关于topic的一些信息,比如有几个消息队列,是不时有序topic,有这个topic的Broker列表等,当获取不到正确的信息时,就会抛出异常
private SendResult sendDefaultImpl(
Message msg,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
this.makeSureStateOK();
Validators.checkMessage(msg, this.defaultMQProducer);
final long invokeID = random.nextLong();
long beginTimestampFirst = System.currentTimeMillis();
long beginTimestampPrev = beginTimestampFirst;
long endTimestamp = beginTimestampFirst;
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
省略好多代码。。。
}
List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();
if (null == nsList || nsList.isEmpty()) {
throw new MQClientException(
"No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION);
}
throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
}
3.先从topicPublishInfoTable缓存中获取
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
}
}
否则从NameServer中获取,注意这个isDefault=false,defaultMQProducer=null
public boolean updateTopicRouteInfoFromNameServer(final String topic) {
return updateTopicRouteInfoFromNameServer(topic, false, null);
}
获取消息对应的topic信息,发请求 RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, requestHeader);但是因为没有任何一个Broker有关于这个topic的信息,所以namesrv就会返回topic不存在,处理请求的代码在DefaultRequestProcessor的
case RequestCode.GET_ROUTEINTO_BY_TOPIC: return this.getRouteInfoByTopic(ctx, request);也就是回应码ResponseCode.TOPIC_NOT_EXIST,然后抛出异常 throw new MQClientException(response.getCode(), response.getRemark());被捕获之后退出返回false。
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
DefaultMQProducer defaultMQProducer) {
try {
if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
TopicRouteData topicRouteData;
if (isDefault && defaultMQProducer != null) {
topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
1000 * 3);
if (topicRouteData != null) {
for (QueueData data : topicRouteData.getQueueDatas()) {
int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
data.setReadQueueNums(queueNums);
data.setWriteQueueNums(queueNums);
}
}
} else {
topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
}
} catch (Exception e) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) && !topic.equals(MixAll.DEFAULT_TOPIC)) {
log.warn("updateTopicRouteInfoFromNameServer Exception", e);
}
} finally {
this.lockNamesrv.unlock();
}
} else {
log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms", LOCK_TIMEOUT_MILLIS);
}
} catch (InterruptedException e) {
log.warn("updateTopicRouteInfoFromNameServer Exception", e);
}
return false;
}
因为if条件不满足,所以获取默认的topic信息,注意isDefault=true,defaultMQProducer=defaultMQProducer
if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
return topicPublishInfo;
} else {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
return topicPublishInfo;
}
默认的topic为"TBW102",这个时候如果namesrv中如果还是没有这个topic的信息的话,就会抛出异常No route info of this topic。
3.autoCreateTopicEnable=true的作用。
当Broker启动时,TopicConfigManager初始化,这里会判断该标识,创建TBW102topic,并且在后续的心跳中把信息更新到namesrv中,这样在发消息的时候就不会抛出不存在的异常。
// MixAll.DEFAULT_TOPIC
if (this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) {
String topic = MixAll.DEFAULT_TOPIC;
TopicConfig topicConfig = new TopicConfig(topic);
this.systemTopicList.add(topic);
topicConfig.setReadQueueNums(this.brokerController.getBrokerConfig()
.getDefaultTopicQueueNums());
topicConfig.setWriteQueueNums(this.brokerController.getBrokerConfig()
.getDefaultTopicQueueNums());
int perm = PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE;
topicConfig.setPerm(perm);
this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
}
当从namesrv查出Topic相关的信息时,在topicRouteData2TopicPublishInfo设置消息队列数量 info.getMessageQueueList().add(mq);,调用updateTopicPublishInfo方法更新缓存topicPublishInfoTable
// Update Pub info
{
TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
publishInfo.setHaveTopicRouterInfo(true);
Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, MQProducerInner> entry = it.next();
MQProducerInner impl = entry.getValue();
if (impl != null) {
impl.updateTopicPublishInfo(topic, publishInfo);
}
}
}
然后if (topicPublishInfo != null && topicPublishInfo.ok()) 这个条件就会符合,那个异常就不会抛出。
4.当autoCreateTopicEnable=false时,界面设置topic
MQAdminStartup启动时会设置很多命令,创建topic的类UpdateTopicSubCommand(),设置相应的信息,最后调用defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig);发消息RequestCode.UPDATE_AND_CREATE_TOPIC,AdminBrokerProcessor处理消息 case RequestCode.UPDATE_AND_CREATE_TOPIC: return this.updateAndCreateTopic(ctx, request);同步给其他Broker
this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig);
this.brokerController.registerBrokerAll(false, true);
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/chenaima1314/article/details/79403113
rocketmq运行时提示 No route info of this topic 异常产生的原因可能是
①Broker禁止自动创建Topic,且用户没有通过手工方式创建Topic
②Broker没有正确连接到Name Server
③Producer没有正确连接到Name Server
首先解决①这种情况,启动顺序要先启动nameserver,再启动broker,启动broker时加上autoCreateTopicEnable=true
例如 nohup sh mqbroker -n localhost:9876 autoCreateTopicEnable=true &
启动没有异常检查下nameserver中是否成功注册了broker,有两种方式
第一种、看broker的日志 如果出现形如
2018-02-28 16:21:35 INFO BrokerControllerScheduledThread1 - register broker to name server 192.168.192.129:9876 OK
2018-02-28 16:22:05 INFO BrokerControllerScheduledThread1 - register broker to name server 192.168.192.129:9876 OK
证明已经连接到nameserver上
第二种、 在bin目录下执行命令sh mqadmin clusterList -n localhost:9876 如果看到
#Cluster Name #Broker Name #BID #Addr #Version #InTPS(LOAD) #OutTPS(LOAD) #PCWait(ms) #Hour #SPACE
DefaultCluster DEFAULT_BROKER 0 192.168.192.129:10911 V4_2_0_SNAPSHOT 0.00(0,0ms) 0.00(0,0ms) 0 422168.55 -1.0000
也是证明已经连接到nameserver上。
如果按前两步检查没有问题,但启动还是报错,那么剩下的可能原因是producer无法连接到nameserver,很可能是防火墙的原因 ,要检验猜测只需要关闭防火墙,命令为systemctl stop firewalld.service
然后再次验证,应该已经可以使用了。
解决方案
1.1. 控制台使用
RocketMQ提供有控制台及一系列控制台命令,用于管理员对主题,集群,broker等信息的管理;
l 登录控制台:
首先进入RocketMQ工程,进入/RocketMQ/bin
在该目录下有个mqadmin脚本
l 查看帮助:
在mqadmin下可以查看有哪些命令
shmqadmin
l 查看具体命令的使用
sh mqadmin help 命令名称
例如,查看updateTopic的使用
sh mqadmin helpupdateTopic
1.2. 详细命令
1.2.1. 创建Topic
指令
updateTopic
类路径
com.alibaba.rocketmq.tools.command.topic.UpdateTopicSubCommand
参数
是否必填
说明
-b
如果 -c为空,则必填
broker地址,表示topic建在该broker
-c
如果 -b为空,则必填
cluster名称,表示topic建在该集群(集群可通过clusterList查询)
-h
否
打印帮助
-n
是
nameserve服务地址列表,格式ip:port;ip:port;...
-p
否
指定新topic的权限限制( W|R|WR )
-r
否
可读队列数(默认为8)
-w
否
可写队列数(默认为8)
-t
是
topic名称(名称只能使用字符 ^[a-zA-Z0-9_-]+$ )
举例
在集群DefaultCluster上创建主题ZTEExample,nameserve地址为10.45.47.168:9876
sh mqadmin updateTopic –n 10.45.47.168 –c DefaultCluster –t ZTEExample
1.2.2. 删除Topic
指令
deleteTopic
类路径
com.alibaba.rocketmq.tools.command.topic.DeleteTopicSubCommand
参数
是否必填
说明
-c
是
cluster名称,表示删除某集群下的某个topic (集群可通过clusterList查询)
-h
否
打印帮助
-n
是
nameserve服务地址列表,格式ip:port;ip:port;…
-t
是
topic名称(名称只能使用字符 ^[a-zA-Z0-9_-]+$ )
举例
在集群DefaultCluster上删除主题ZTEExample,nameserve地址为10.45.47.168:9876
sh mqadmin deleteTopic –n 10.45.47.168:9876 –c DefaultCluster –t ZTEExample
1.2.3. 创建(修订)订阅组
指令
updateSubGroup
类路径
com.alibaba.rocketmq.tools.command.consumer.UpdateSubGroupSubCommand
参数
是否必填
说明
-b
如果 –c为空,则必填
broker地址,表示订阅组建在该broker
-c
如果 –b为空,则必填
cluster名称,表示topic建在该集群(集群可通过clusterList查询)
-d
否
是否容许广播方式消费
-g
是
订阅组名
-i
否
从哪个broker开始消费
-m
否
是否容许从队列的最小位置开始消费,默认会设置为false
-q
否
消费失败的消息放到一个重试队列,每个订阅组配置几个重试队列
-r
否
重试消费最大次数,超过则投递到死信队列,不再投递,并报警
-s
否
消费功能是否开启
-w
否
发现消息堆积后,将Consumer的消费请求重定向到另外一台Slave机器
-h
否
打印帮助
-n
是
nameserve服务地址列表,格式ip:port;ip:port;...
举例
1.2.4. 删除订阅组配置
指令
deleteSubGroup
类路径
com.alibaba.rocketmq.tools.command.consumer.DeleteSubscriptionGroupCommand
参数
是否必填
说明
-b
如果 –c为空,则必填
broker地址,表示订阅组建在该broker
-c
如果 –b为空,则必填
cluster名称,表示topic建在该集群(集群可通过clusterList查询)
-g
是
订阅组名
-h
否
打印帮助
-n
是
nameserve服务地址列表,格式ip:port;ip:port;...
举例
1.2.5. 更新Broker配置文件
指令
updateBrokerConfig
类路径
com.alibaba.rocketmq.tools.command.broker.UpdateBrokerConfigSubCommand
参数
是否必填
说明
-b
如果 –c为空,则必填
broker地址,表示订阅组建在该broker
-c
如果 –b为空,则必填
cluster名称,表示topic建在该集群(集群可通过clusterList查询)
-k
是
key值
-v
否
value值
-h
否
打印帮助
-n
是
nameserve服务地址列表,格式ip:port;ip:port;...
举例
1.2.6. 查看Topic列表信息
指令
topicList
类路径
com.alibaba.rocketmq.tools.command.broker.UpdateBrokerConfigSubCommand
参数
是否必填
说明
-h
否
打印帮助
-n
是
nameserve服务地址列表,格式ip:port;ip:port;...
举例
nameserve地址为10.45.47.168:9876
sh mqadmin topicList –n 10.45.47.168:9876
打印内容:
BenchmarkTest
%RETRY%simple-consumer-test
SELF_TEST_TOPIC
ZTEExample
注释:上述头三个主题是RocketMQ默认预先创建
1.2.7. 查看Topic路由信息
指令
topicRoute
类路径
com.alibaba.rocketmq.tools.command.topic.TopicRouteSubCommand
参数
是否必填
说明
-t
是
topic名称
-h
否
打印帮助
-n
是
nameserve服务地址列表,格式ip:port;ip:port;...
举例
查看主题ZTEExample的路由,nameserve地址为10.45.47.168:9876
sh mqadmin topicRoute –n 10.45.47.168:9876 –t ZTEExample_Crm
打印内容:
{
"brokerDatas":[{
"brokerAddrs":{0:"10.45.47.168:10911"
},
"brokerName":"crmdb"
}],
"queueDatas":[{
"brokerName":"crmdb",
"perm":6,
"readQueueNums":8,
"writeQueueNums":8
}]
}
1.2.8. 查看Topic统计信息
指令
topicStats
类路径
com.alibaba.rocketmq.tools.command.topic.TopicStatsSubCommand
参数
是否必填
说明
-t
是
topic名称
-h
否
打印帮助
-n
是
nameserve服务地址列表,格式ip:port;ip:port;...
举例
查看主题ZTEExample的统计信息,nameserve地址为10.45.47.168:9876
sh mqadmin topicStats –n 10.45.47.168:9876 –t ZTEExample
打印内容:(统计信息里包括有offset、最后更新时间)
#Broker Name #QID #Min Offset #Max Offset #Last Updated
crmdb 0 0 1 2014-02-10 11:37:44,977
crmdb 1 0 0
crmdb 2 0 0
crmdb 3 0 0
crmdb 4 0 0
crmdb 5 0 0
crmdb 6 0 0
crmdb 7 0 0
1.2.9. 查看Broker统计信息
指令
brokerStats
类路径
com.alibaba.rocketmq.tools.command.broker.BrokerStatsSubCommand
参数
是否必填
说明
-b
是
broker地址
-h
否
打印帮助
-n
是
nameserve服务地址列表,格式ip:port;ip:port;...
举例
查看broker(crmdb)的统计信息,broker地址为10.45.47.168:10911,nameserve地址为10.45.47.168:9876
sh mqadmin brokerStats –n 10.45.47.168:9876 –b 10.45.47.168:10911
打印内容:
bootTimestamp : 1392003367470
brokerVersion : 29
brokerVersionDesc : V3_0_7
commitLogDiskRatio : 0.32690830974763857
commitLogMaxOffset : 217
commitLogMinOffset : 0
consumeQueueDiskRatio : 0.32690830974763857
dispatchMaxBuffer : 1
getFoundTps : 0.0 0.0 0.0
getMessageEntireTimeMax : 4
getMissTps : 0.0 0.0 0.0
getTotalTps : 0.0 0.0 0.0
getTransferedTps : 0.0 0.0 0.0
msgGetTotalTodayMorning : 0
msgGetTotalTodayNow : 1
msgGetTotalYesterdayMorning : 0
msgPutTotalTodayMorning : 0
msgPutTotalTodayNow : 1
msgPutTotalYesterdayMorning : 0
putMessageAverageSize : 217.0
putMessageDistributeTime :
0(0.0%)
1(100.0%)
0(0.0%)
0(0.0%)
0(0.0%)
0(0.0%)
0(0.0%)
putMessageEntireTimeMax : 6
putMessageSizeTotal : 217
putMessageTimesTotal : 1
putTps : 0.0 0.0 0.0
runtime : [ 0 days, 3 hours, 21 minutes, 1 seconds ]
sendThreadPoolQueueCapacity : 100000
sendThreadPoolQueueSize : 0
1.2.10. 根据消息ID查询消息
指令
queryMsgById
类路径
com.alibaba.rocketmq.tools.command.message.QueryMsgByIdSubCommand
参数
是否必填
说明
-i
是
msgId
-h
否
打印帮助
-n
是
nameserve服务地址列表,格式ip:port;ip:port;...
举例
查询msgId= 0A2D2FA800002A9F0000000000000000的消息,nameserve地址为10.45.47.168:9876
sh mqadmin queryMsgById –n 10.45.47.168:9876 –i 0A2D2FA800002A9F0000000000000000
打印内容:
Topic: ZTEExample
Tags: [SimpleTest]
Keys: [SimpleTest-1]
Queue ID: 0
Queue Offset: 0
CommitLog Offset: 0
Born Timestamp: 2014-02-26 14:49:10,875
Store Timestamp: 2014-02-26 14:48:44,840
Born Host: 10.45.46.229:4231
Store Host: 10.45.47.168:10911
System Flag: 0
Properties: {TAGS=SimpleTest, KEYS=SimpleTest-1, WAIT=true}
Message Body Path: /tmp/rocketmq/msgbodys/0A2D2FA800002A9F0000000000000000
1.2.11. 根据消息Key查询消息
指令
queryMsgByKey
类路径
com.alibaba.rocketmq.tools.command.message.QueryMsgByKeySubCommand
参数
是否必填
说明
-f
否
被查询消息的截止时间
-k
是
msgKey
-t
是
Topic名称
-h
否
打印帮助
-n
是
nameserve服务地址列表,格式ip:port;ip:port;...
举例
查询Topic= ZTEExample下key= SimpleTest-1的消息,nameserve地址为10.45.47.168:9876
sh mqadmin queryMsgByKey -n 10.45.47.168:9876 -t ZTEExample -k SimpleTest-1
打印内容:
#Message ID #QID #Offset
0A2D2FA800002A9F0000000000000000 0 0
1.2.12. 根据Offset查询消息
指令
queryMsgByOffset
类路径
com.alibaba.rocketmq.tools.command.message.QueryMsgByOffsetSubCommand
参数
是否必填
说明
-b
是
Broker名称,表示订阅组建在该broker(这里需要注意填写的是broker的名称,不是broker的地址,broker名称可以在clusterList查到)
-i
是
query队列id
-o
是
offset值
-t
是
topic名称
-h
否
打印帮助
-n
是
nameserve服务地址列表,格式ip:port;ip:port;...
举例
查询brokerName=crm-168,Topic= ZTEExample的第1个队列下offset=0的消息,nameserve地址为10.45.47.168:9876
sh mqadmin queryMsgByOffset -n 10.45.47.168:9876 -b crm-168 -i 0 -t ZTEExample -o 0
打印内容:
Topic: ZTEExample
Tags: [SimpleTest]
Keys: [SimpleTest-1]
Queue ID: 0
Queue Offset: 0
CommitLog Offset: 0
Born Timestamp: 2014-02-26 14:49:10,875
Store Timestamp: 2014-02-26 14:48:44,840
Born Host: 10.45.46.229:4231
Store Host: 10.45.47.168:10911
System Flag: 0
Properties: {TAGS=SimpleTest, KEYS=SimpleTest-1, WAIT=true}
Message Body Path: /tmp/rocketmq/msgbodys/0A2D2FA800002A9F0000000000000000
1.2.13. 查询Producer的网络连接
l 该命令只打印当前与cluster连接的producer网络连接信息
指令
producerConnection
类路径
com.alibaba.rocketmq.tools.command.connection.ProducerConnectionSubCommand
参数
是否必填
说明
-g
是
生产者所属组名
-t
是
topic名称
-h
否
打印帮助
-n
是
nameserve服务地址列表,格式ip:port;ip:port;...
举例
查询当前属于group(生产者组)=simple-producer-test的生产者到topic=ZTEExample的网络连接,nameserve地址为10.45.47.168:9876
sh mqadmin producerConnection -n 10.45.47.168:9876 -g simple-producer-test -t ZTEExample
打印内容:
0001 10.45.46.229@simple-producer-test-99f09de2a20a4b6284bb949b452bee0c 10.45.46.229:4332 JAVA V3_0_7
1.2.14. 查询Consumer的网络连接
l 该命令只打印当前与cluster连接的consumer网络连接信息
指令
consumerConnection
类路径
com.alibaba.rocketmq.tools.command.connection.ConsumerConnectionSubCommand
参数
是否必填
说明
-g
是
消费者所属组名
-h
否
打印帮助
-n
是
nameserve服务地址列表,格式ip:port;ip:port;...
举例
查询当前属于group(消费者组)=simple-consumer-test的消费者的网络连接,nameserve地址为10.45.47.168:9876
sh mqadmin consumerConnection -n 10.45.47.168:9876 -g simple-consumer-test
打印内容:
001 10.45.46.229@simple-consumer-test-7babbb6021b040d29978494b16d559ae 10.45.46.229:4355 JAVA V3_0_7
Below is subscription:
001 Topic: ZTEExample SubExpression: *
ConsumeType: CONSUME_ACTIVELY
MessageModel: CLUSTERING
ConsumeFromWhere: CONSUME_FROM_LAST_OFFSET
1.2.15. 查看订阅组消费状态
指令
consumerProgress
类路径
com.alibaba.rocketmq.tools.command.consumer.ConsumerProgressSubCommand
参数
是否必填
说明
-g
是
消费者所属组名
-h
否
打印帮助
-n
是
nameserve服务地址列表,格式ip:port;ip:port;...
举例
查询当前属于group(消费者组)=simple-consumer-test的订阅状态,nameserve地址为10.45.47.168:9876
sh mqadmin consumerProgress -n 10.45.47.168:9876 -g simple-consumer-test
打印内容:
#Topic #Broker Name #QID #Broker Offset #Consumer Offset #Diff
ZTEExample crm-168 0 2 2 0
ZTEExample crm-168 1 0 0 0
ZTEExample crm-168 2 0 0 0
ZTEExample crm-168 3 0 0 0
ZTEExample crm-168 4 0 0 0
ZTEExample crm-168 5 0 0 0
ZTEExample crm-168 6 0 0 0
ZTEExample crm-168 7 0 0 0
Consume TPS: 0
Diff Total: 0
1.2.16. 查看集群消息
指令
clusterList
类路径
com.alibaba.rocketmq.tools.command.cluster.ClusterListSubCommand
参数
是否必填
说明
-m
否
打印更多信息 (增加打印出如下信息 #InTotalYest, #OutTotalYest, #InTotalToday ,#OutTotalToday)
-h
否
打印帮助
-n
是
nameserve服务地址列表,格式ip:port;ip:port;...
举例
查询当前集群状态,nameserve地址为10.45.47.168:9876
sh mqadmin clusterList -n 10.45.47.168:9876
打印内容:
#Cluster Name #Broker Name #BID #Addr #Version #InTPS #OutTPS
CRM crm-168 0 10.45.47.168:10911 V3_0_7 0.00 0.00
CRM crm-181 0 10.45.47.181:10911 V3_0_7 0.00 0.00
sh mqadmin clusterList -n 10.45.47.168:9876 –m
#Cluster Name #Broker Name #InTotalYest #OutTotalYest #InTotalToday #OutTotalToday
CRM crm-168 0 0 2 3
CRM crm-181 0 0 0 0
1.2.17. 添加(更新)KV配置信息
指令
updateKvConfig
类路径
com.alibaba.rocketmq.tools.command.namesrv.UpdateKvConfigCommand
参数
是否必填
说明
-k
是
key值
-v
是
value值
-s
是
Namespace值
-h
否
打印帮助
-n
是
nameserve服务地址列表,格式ip:port;ip:port;...
举例
1.2.18. 删除KV配置信息
指令
deleteKvConfig
类路径
com.alibaba.rocketmq.tools.command.namesrv.DeleteKvConfigCommand
参数
是否必填
说明
-k
是
key值
-s
是
Namespace值
-h
否
打印帮助
-n
是
nameserve服务地址列表,格式ip:port;ip:port;...
举例
1.2.19. 添加(更新)Project group配置信息
指令
updateProjectGroup
类路径
com.alibaba.rocketmq.tools.command.namesrv.UpdateProjectGroupCommand
参数
是否必填
说明
-i
是
服务器ip
-p
是
project group名
-h
否
打印帮助
-n
是
nameserve服务地址列表,格式ip:port;ip:port;...
举例
1.2.20. 删除Projectgroup配置信息
指令
deleteProjectGroup
类路径
com.alibaba.rocketmq.tools.command.namesrv.DeleteProjectGroupCommand
参数
是否必填
说明
-i
是
服务器ip
-p
是
project group名
-h
否
打印帮助
-n
是
nameserve服务地址列表,格式ip:port;ip:port;...
举例
1.2.21. 取得Projectgroup配置信息
指令
getProjectGroup
类路径
com.alibaba.rocketmq.tools.command.namesrv.GetProjectGroupCommand
参数
是否必填
说明
-i
是
服务器ip
-p
是
project group名
-h
否
打印帮助
-n
是
nameserve服务地址列表,格式ip:port;ip:port;...
举例
1.2.22. 设置消费进度
l 根据时间来设置消费进度,设置之前要关闭这个订阅组的所有consumer,设置完再启动,方可生效
指令
resetOffsetByTime
类路径
com.alibaba.rocketmq.tools.command.offset.ResetOffsetByTimeSubCommand
参数
是否必填
说明
-f
否
通过时间戳强制回滚(true|false),默认为true
-s
是
时间戳
(currentTimeMillis|yyyy-MM-dd#HH:mm:ss:SSS)
-g
是
消费者所属组名
-t
是
topic名称
-h
否
打印帮助
-n
是
nameserve服务地址列表,格式ip:port;ip:port;...
举例
1.2.23. 清除特定Broker权限
指令
wipeWritePerm
类路径
com.alibaba.rocketmq.tools.command.namesrv.WipeWritePermSubCommand
参数
是否必填
说明
-b
是
broker地址
-h
否
打印帮助
-n
是
nameserve服务地址列表,格式ip:port;ip:port;...
举例
1.2.24. 获取Consumer消费进度
l 该命令只打印当前与cluster连接的consumer的消费进度
指令
getConsumerStatus
类路径
com.alibaba.rocketmq.tools.command.offset.GetConsumerStatusCommand
参数
是否必填
说明
-g
是
消费者所属组名
-t
是
查询主题
-i
否
Consumer客户端ip
-h
否
打印帮助
-n
是
nameserve服务地址列表,格式ip:port;ip:port;...
举例
查询属于group(消费者组)=simple-consumer-test的消费者在Topic=ZTEExample上的消费状态,nameserve地址为10.45.47.168:9876
sh mqadmin getConsumerStatus -n 10.45.47.168:9876 -g simple-consumer-test -t ZTEExample
get consumer status from client. group=simple-consumer-test, topic=ZTEExample, originClientId=
#clientId #brokerName #queueId #offset
10.45.46.229@simple-consumer-test-3f89fb692e874640 crm-168 4 0
10.45.46.229@simple-consumer-test-3f89fb692e874640 crm-168 0 2
10.45.46.229@simple-consumer-test-3f89fb692e874640 crm-168 3 0
10.45.46.229@simple-consumer-test-3f89fb692e874640 crm-168 1 0
10.45.46.229@simple-consumer-test-3f89fb692e874640 crm-168 5 0
10.45.46.229@simple-consumer-test-3f89fb692e874640 crm-168 6 0
10.45.46.229@simple-consumer-test-3f89fb692e874640 crm-168 7 0
10.45.46.229@simple-consumer-test-3f89fb692e874640 crm-168 2 0