Copyright notice: this is an original article by the blogger; please credit the source when reposting. https://blog.csdn.net/u010597819/article/details/90105522
Problem analysis
RemotingConnectException
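Problem description
The following remote connection exception is thrown when the service restarts, and only intermittently. The consumer wrapped by our client library is closed inside a JVM shutdown hook.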
2019-05-09 14:47:00,071 WARN [Thread-16] RocketmqRemoting:createChannel:464 {} createChannel: connect remote host[brokerHost:10611] failed, DefaultChannelPromise@3721dbe4(failure: java.nio.channels.ClosedChannelException)
java.nio.channels.ClosedChannelException: null
2019-05-09 14:47:00,072 ERROR [Thread-16] RocketmqClient:: {} unregister client exception from broker: brokerHost:10611
org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to <brokerHost:10611> failed
at org.apache.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:357) ~[rocketmq-remoting-4.0.0-incubating.jar!/:4.0.0-incubating]
at org.apache.rocketmq.client.impl.MQClientAPIImpl.unregisterClient(MQClientAPIImpl.java:864) ~[rocketmq-client-4.0.0-incubating.jar!/:4.0.0-incubating]
at org.apache.rocketmq.client.impl.factory.MQClientInstance.unregisterClient(MQClientInstance.java:843) ~[rocketmq-client-4.0.0-incubating.jar!/:4.0.0-incubating]
at org.apache.rocketmq.client.impl.factory.MQClientInstance.unregisterClientWithLock(MQClientInstance.java:817) ~[rocketmq-client-4.0.0-incubating.jar!/:4.0.0-incubating]
at org.apache.rocketmq.client.impl.factory.MQClientInstance.unregisterConsumer(MQClientInstance.java:810) ~[rocketmq-client-4.0.0-incubating.jar!/:4.0.0-incubating]
at org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl.shutdown(DefaultMQPushConsumerImpl.java:525) ~[rocketmq-client-4.0.0-incubating.jar!/:4.0.0-incubating]
at org.apache.rocketmq.client.consumer.DefaultMQPushConsumer.shutdown(DefaultMQPushConsumer.java:464) ~[rocketmq-client-4.0.0-incubating.jar!/:4.0.0-incubating]
at com......rocketmq.client.AbstractRocketMQConsumer.destroy(AbstractRocketMQConsumer.java:249) ~[universe-rocketmq-client-1.1.0-SNAPSHOT.jar!/:?]
at com......rocketmq.client.AbstractRocketMQConsumer$1.run(AbstractRocketMQConsumer.java:177) ~[universe-rocketmq-client-1.1.0-SNAPSHOT.jar!/:?]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_77]
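- Following the stack trace into the code, the destroy path calls the shutdown method of DefaultMQPushConsumerImpl
- The failure occurs inside mQClientFactory's unregisterConsumer method
- The mQClientFactory factory calls unregisterClientWithLock to close the connection
- Unregistering the client calls the unregisterClient method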
if (addr != null) {
    try {
        this.mQClientAPIImpl.unregisterClient(addr, this.clientId, producerGroup, consumerGroup, 3000);
        log.info("unregister client[Producer: {} Consumer: {}] from broker[{} {} {}] success", producerGroup, consumerGroup, brokerName, entry1.getKey(), addr);
    } catch (RemotingException | InterruptedException | MQBrokerException e) {
        log.error("unregister client exception from broker: " + addr, e);
    }
}
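- The unregister method calls unregisterClient on mQClientAPIImpl, the client API. The exception we caught is a connection exception, but nothing at this level throws that type, so we keep digging
- remotingClient sends the unregister request synchronously and throws MQBrokerException if the response indicates failure. Next we look at the synchronous invocation logic
- NettyRemotingClient's synchronous invocation throws RemotingConnectException when the Channel is null or not active
- Next, look at how the channel is obtained. If no cached channel exists, a new one is created directly. So was there one in the cache? The first line of the log at the top of this article says a channel was being created, which means the channel lookup came back empty (a client that has already been shut down has had its channelTables cache cleared). So it is likely that shutdown had already been called on this client; the analysis of the third exception below confirms this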
@Override
public void shutdown() {
    ...
    // The cache is cleared when the client shuts down
    this.channelTables.clear();
    ...
}
private Channel createChannel(final String addr) throws InterruptedException {
    ChannelWrapper cw = this.channelTables.get(addr);
    ...
    if (cw != null) {
        ChannelFuture channelFuture = cw.getChannelFuture();
        if (channelFuture.awaitUninterruptibly(this.nettyClientConfig.getConnectTimeoutMillis())) {
            if (cw.isOK()) {
                log.info("createChannel: connect remote host[{}] success, {}", addr, channelFuture.toString());
                return cw.getChannel();
            } else {
                // Connection failed: log the cause and its stack trace
                log.warn("createChannel: connect remote host[" + addr + "] failed, " + channelFuture.toString(), channelFuture.cause());
            }
        } else {
            log.warn("createChannel: connect remote host[{}] timeout {}ms, {}", addr, this.nettyClientConfig.getConnectTimeoutMillis(),
                channelFuture.toString());
        }
    }
    // Return null; the caller throws the connection exception if the channel is null or not active
    return null;
}
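We can see that the connection failed with ClosedChannelException as the cause. After logging the failure the method returns null, which is why the later null-or-inactive check throws the connection exception.
Why did the connection fail?
The stack trace already prints the cause: ClosedChannelException: null. That matches the analysis above: shutdown had already been called somewhere else, closing the channel.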
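The chain of reasoning above (cache cleared by shutdown, channel creation fails, caller throws a connection exception) can be sketched with a toy model. This is not RocketMQ code: ToyRemotingClient, its ConnectException, and the boolean "channel" are hypothetical stand-ins chosen only to make the control flow visible.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Toy stand-in for a remoting client; NOT the RocketMQ implementation.
class ToyRemotingClient {
    // Hypothetical exception playing the role of RemotingConnectException.
    static class ConnectException extends Exception {
        ConnectException(String addr) {
            super("connect to <" + addr + "> failed");
        }
    }

    // addr -> "channel is active" flag, standing in for channelTables.
    private final Map<String, Boolean> channelTable = new ConcurrentHashMap<>();
    private volatile boolean closed = false;

    // Returns the cached channel, or tries to create one; null if creation fails.
    private Boolean getOrCreateChannel(String addr) {
        Boolean active = channelTable.get(addr);
        if (active != null) {
            return active;
        }
        if (closed) {
            // After shutdown the connect attempt cannot succeed.
            return null;
        }
        channelTable.put(addr, Boolean.TRUE);
        return Boolean.TRUE;
    }

    // Shutdown clears the cache, as the article describes.
    void shutdown() {
        closed = true;
        channelTable.clear();
    }

    // Throws when the channel is null or not active, otherwise "sends" the request.
    String invokeSync(String addr, String request) throws ConnectException {
        Boolean active = getOrCreateChannel(addr);
        if (active == null || !active) {
            throw new ConnectException(addr);
        }
        return "ok:" + request;
    }
}
```

In this model a call to invokeSync succeeds before shutdown() and throws ConnectException afterwards, which mirrors the order of events inferred from the log.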
Exception stacks caused by the same kind of problem
RemotingSendRequestException
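The cause is almost identical to the previous exception; only the place and moment of failure differ slightly. The previous exception was raised when no usable channel could be obtained for the unregister request, whereas this one is raised while the request is being sent. The underlying cause is the same ClosedChannelException, so the analysis is not repeated here.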
org.apache.rocketmq.remoting.exception.RemotingSendRequestException: send request to <172.24.1.32:10611> failed
at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract.invokeSyncImpl(NettyRemotingAbstract.java:294)
at org.apache.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:338)
at org.apache.rocketmq.client.impl.MQClientAPIImpl.unregisterClient(MQClientAPIImpl.java:864)
at org.apache.rocketmq.client.impl.factory.MQClientInstance.unregisterClient(MQClientInstance.java:843)
at org.apache.rocketmq.client.impl.factory.MQClientInstance.unregisterClientWithLock(MQClientInstance.java:817)
at org.apache.rocketmq.client.impl.factory.MQClientInstance.unregisterConsumer(MQClientInstance.java:810)
at org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl.shutdown(DefaultMQPushConsumerImpl.java:525)
at org.apache.rocketmq.client.consumer.DefaultMQPushConsumer.shutdown(DefaultMQPushConsumer.java:464)
at com......rocketmq.client.AbstractRocketMQConsumer.destroy(AbstractRocketMQConsumer.java:249)
at com......rocketmq.client.AbstractRocketMQConsumer$1.run(AbstractRocketMQConsumer.java:177)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.nio.channels.ClosedChannelException
MQClientException
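Problem description
This stack was also produced during a restart, but unlike the ones above it was raised while Spring was invoking the bean's destroy method. The message clearly points to an invalid service state. The earlier stacks came from the same abstract class but were raised inside the shutdown hook. That makes the problem fairly clear: both the shutdown hook and the destroy method call shutdown on the same consumer, which produces this exception.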
org.apache.rocketmq.client.exception.MQClientException: The consumer service state not OK, SHUTDOWN_ALREADY
See http://rocketmq.apache.org/docs/faq/ for further details.
at org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl.makeSureStateOK(DefaultMQPushConsumerImpl.java:426)
at org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl.persistConsumerOffset(DefaultMQPushConsumerImpl.java:932)
at org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl.shutdown(DefaultMQPushConsumerImpl.java:524)
at org.apache.rocketmq.client.consumer.DefaultMQPushConsumer.shutdown(DefaultMQPushConsumer.java:464)
at com......rocketmq.client.AbstractRocketMQConsumer.destroy(AbstractRocketMQConsumer.java:249)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.beans.factory.support.DisposableBeanAdapter.invokeCustomDestroyMethod(DisposableBeanAdapter.java:364)
at org.springframework.beans.factory.support.DisposableBeanAdapter.destroy(DisposableBeanAdapter.java:287)
at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.destroyBean(DefaultSingletonBeanRegistry.java:578)
at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.destroySingleton(DefaultSingletonBeanRegistry.java:554)
at org.springframework.beans.factory.support.DefaultListableBeanFactory.destroySingleton(DefaultListableBeanFactory.java:961)
at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.destroySingletons(DefaultSingletonBeanRegistry.java:523)
at org.springframework.beans.factory.support.DefaultListableBeanFactory.destroySingletons(DefaultListableBeanFactory.java:968)
at org.springframework.context.support.AbstractApplicationContext.destroyBeans(AbstractApplicationContext.java:1033)
at org.springframework.context.support.AbstractApplicationContext.doClose(AbstractApplicationContext.java:1009)
at org.springframework.context.support.AbstractApplicationContext$2.run(AbstractApplicationContext.java:928)
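Why is it intermittent rather than reproducible every time?
Because the state check in shutdown is not thread-safe
- If a concurrent call reads the state as SHUTDOWN_ALREADY, it returns normally
- If a concurrent call reads the state as RUNNING rather than SHUTDOWN_ALREADY, it runs the shutdown logic a second time, which produces the various restart-time exceptions we saw. So these exceptions can only appear when the calls overlap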
public void shutdown() {
    switch (this.serviceState) {
        case CREATE_JUST:
            break;
        case RUNNING:
            this.consumeMessageService.shutdown();
            this.persistConsumerOffset();
            this.mQClientFactory.unregisterConsumer(this.defaultMQPushConsumer.getConsumerGroup());
            this.mQClientFactory.shutdown();
            log.info("the consumer [{}] shutdown OK", this.defaultMQPushConsumer.getConsumerGroup());
            this.rebalanceImpl.destroy();
            this.serviceState = ServiceState.SHUTDOWN_ALREADY;
            break;
        case SHUTDOWN_ALREADY:
            break;
        default:
            break;
    }
}
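The check-then-act race in that switch can be reproduced deterministically in a small model. The sketch below is not RocketMQ code: ToyConsumer, RaceDemo, and the CyclicBarrier are hypothetical, and the barrier exists only to force both threads to read the state before either one writes it, which in production happens only by unlucky timing.

```java
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

// Toy model of a consumer whose shutdown reads the state, then acts on it.
class ToyConsumer {
    enum State { RUNNING, SHUTDOWN_ALREADY }

    private volatile State state = State.RUNNING;
    // Counts how many times the teardown body actually ran.
    final AtomicInteger teardownRuns = new AtomicInteger();
    // Test hook (an assumption of this sketch) that forces the bad interleaving.
    private final CyclicBarrier bothHaveRead;

    ToyConsumer(CyclicBarrier bothHaveRead) {
        this.bothHaveRead = bothHaveRead;
    }

    void shutdown() throws Exception {
        State seen = state;              // check
        bothHaveRead.await();            // both callers have now read RUNNING
        if (seen == State.RUNNING) {
            teardownRuns.incrementAndGet();   // act: teardown runs
            state = State.SHUTDOWN_ALREADY;
        }
    }
}

class RaceDemo {
    // Runs two concurrent shutdown calls and reports how many teardowns happened.
    static int run() throws Exception {
        ToyConsumer consumer = new ToyConsumer(new CyclicBarrier(2));
        Runnable call = () -> {
            try {
                consumer.shutdown();
            } catch (Exception e) {
                throw new IllegalStateException(e);
            }
        };
        Thread hook = new Thread(call, "shutdown-hook");
        Thread spring = new Thread(call, "spring-destroy");
        hook.start();
        spring.start();
        hook.join();
        spring.join();
        return consumer.teardownRuns.get();
    }
}
```

Because neither thread can pass the barrier until both have read the state, RaceDemo.run() reports two teardowns, which corresponds to the double shutdown seen in the stacks above.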
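Summary
The cause is confirmed: the client wrapper destroys the consumer in a shutdown hook, and at the same time Spring calls the destroy method of the client's listener bean, which calls shutdown on the consumer again. As a result, the exceptions listed above occur intermittently on restart.
How do we fix it?
- The most direct fix is to never call shutdown on the same consumer more than once concurrently
- That said, if DefaultMQPushConsumerImpl's shutdown method took a lock to make it thread-safe, I think that would be friendlier to users. It could cost some performance where shutdown is called frequently, but that is a trade-off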
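One way to apply the first fix on the caller's side, without touching RocketMQ itself, is to route every shutdown path through a guard that lets only the first caller proceed. The following is a minimal sketch under that assumption; OnceOnlyShutdown is a hypothetical helper, not part of RocketMQ or of the client wrapper described in this article.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical guard: runs the wrapped shutdown action at most once,
// no matter how many threads call shutdown().
class OnceOnlyShutdown {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final Runnable delegate;

    OnceOnlyShutdown(Runnable delegate) {
        this.delegate = delegate;
    }

    // Returns true if this call performed the shutdown, false if it was already claimed.
    boolean shutdown() {
        // compareAndSet is atomic, so exactly one caller wins.
        if (!closed.compareAndSet(false, true)) {
            return false;
        }
        delegate.run();
        return true;
    }
}
```

In the wrapper, both the shutdown hook and the Spring destroy method would call the same guard instance, constructed for example as new OnceOnlyShutdown(consumer::shutdown). Note that a losing caller returns immediately while the winner may still be tearing down; if the second caller must wait for completion, a synchronized method or a lock would be needed instead.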