一、基础知识
Spring 通过发布事件的方式,可以通知观察者(即事件监听器)消费者的一些行为,消费者相关的事件如下所示:
-
AsyncConsumerStartedEvent:An event that is published whenever a new consumer is started.
-
AsyncConsumerStoppedEvent:An event that is published whenever a consumer is stopped (and not restarted).
-
AsyncConsumerRestartedEvent:An event that is published whenever a consumer is restarted.
-
ListenerContainerConsumerFailedEvent:Published when a listener consumer fails.
基于事件机制,可以通过监听事件ListenerContainerConsumerFailedEvent,当有消费者发生致命错误时,重新创建消费者消费消息,还可以将异常信息通知负责人。
二、问题现象
正常情况下 RabbitMQ 中队列是有消费者进行监听消费的,但 MQ 重装后有些队列居然无消费者服务来监听队列?????????类似于下图的情况:
在这说明一下啊,监听 queue.test 队列的消费者服务是启动的,没有挂!!!!为什么会是 0 呢????
三、问题产生原因
当消费者服务只是监听 queue.test
时,代码:@RabbitListener(queues = "queue.test")
,生产者服务未启动(queue.test 未创建),这时的消费者服务先启动了就会发生上述问题。出现问题了,身为攻城狮就应该去解决啊,所以我在本地搭了一个 RabbitMQ,专门解决这个问题。怎么搭的,大家可以百度去,一大把的教程,本人是建议去 RabbitMQ 官网去下载 rpm 包安装。传送门。
四、解决办法
产生这个问题的原因是因为撸代码一时爽,一直撸一直爽。想想消费者监听一个队列是多么容易啊,用注解 @RabbitListener
加上要监听的 queue name 就完事了,结果是酿成了上面的惨案,导致又得攻城狮祭天了。
其实啊,在这我们差了一件事,就是消费者其实也是需要写代码进行 exchange 和 queue 进行 binding 的,目的就是防止上述问题的产生,消费者远先于生产者启动,为什么我要加个 “远” 字呢,主要是因为消费者发现没有要监听的 queue 时,默认会进行三次重试监听 queue,三次都失败后就无法重试了(三次时间很短,默认10s一次),所以我在这用的 “远”。绑定代码如下:
/**
* @author ouyang
* @version 1.0
* @date 2019/9/18 17:34
**/
@ConfigurationProperties(value = "test")
@Component
@Configuration
public class TestRabbitMqConfig {
@Bean("queue.task")
public Queue createQueue() {
return new Queue(this.getQueueName(),true);
}
@Bean("exchange.task")
public TopicExchange createMessageExchange() {
return new TopicExchange(getTopicExchangeName());
}
@Bean("binding.task")
public Binding binding(@Qualifier("queue.task") Queue queue,
@Qualifier("exchange.task") TopicExchange messageExchange) {
return BindingBuilder.bind(queue).to(messageExchange).with("#");
}
private String queueName;
private String topicExchangeName;
public String getTopicExchangeName() {
return topicExchangeName;
}
public void setTopicExchangeName(String topicExchangeName) {
this.topicExchangeName = topicExchangeName;
}
public String getQueueName() {
return queueName;
}
public void setQueueName(String queueName) {
this.queueName = queueName;
}
}
假设我们现在的消费者服务将监听的 queue 绑定了 exchange,也监听了 queue,不要想着这就太平了,你仔细想想上面的话,还是可能存在不能监听的情况的,比如,手贱不小心将 queue 删除了,监听过程中出现了异常,这时我们是可以使用 ListenerContainerConsumerFailedEvent
,通过实现该接口可以处理异常信息,当遇到监听异常时,我们可以停止监听,然后重新监听队列,当然,还有很多很多操作,比如,监听异常后邮件、短信等通知相关人员。
简单的示例代码:
/**
* @author ouyang
* @version 1.0
* @date 2020/2/19 20:46
**/
@Component
public class ListenerContainerConsumerFailedEventListener implements ApplicationListener<ListenerContainerConsumerFailedEvent> {
private final Logger logger = LoggerFactory.getLogger(ListenerContainerConsumerFailedEventListener.class);
@Override
public void onApplicationEvent(ListenerContainerConsumerFailedEvent event) {
logger.error("listener queue failed:{}", event);
if(event.isFatal()) {
logger.error("reason:{}, 错误:{}", event.getReason(), event.getThrowable());
SimpleMessageListenerContainer container = (SimpleMessageListenerContainer) event.getSource();
String queueNames = Arrays.toString(container.getQueueNames());
//重启
try {
// 暂停10s
Thread.sleep(10000);
try {
container.stop();
} catch (Exception e) {
logger.error("stop listener queue {} failed!", queueNames);
}
Assert.state(!container.isRunning(), "listener queue: " + container + " is running!");
container.start();
logger.info("restat listener queue {} successed !", queueNames);
} catch (Exception e) {
logger.error("restat listener queue {} failed!", queueNames, e);
}
}
}
}
做到这里,基本上不会出现问题了。