0. 背景重现
最近搭建一个新项目,基于SpringBoot框架,使用Kafka做消息中间件。
使用@KafkaListener注解来创建一个消费者,实现对Kafka消息的消费。我计划的执行顺序是这样的:服务启动之后,创建Consumer实例,执行loadResourceConfig初始化方法,之后才开始消费Kafka的消息。
但是出现了一个问题:没有等loadResourceConfig方式执行完毕,@KafkaListener就开始消费消息了。
这显然不是我们期望的,下面是大概的代码:
@Component
public class Consumer{
@PostConstruct
private void loadResourceConfig () {//加载数据
// 加载资源配置
}
/**
* 接收数据处理
* @param record
*/
@KafkaListener(id = "device-data",topics = {"${DataTopic}"})
public void listen(ConsumerRecord<String, ?> record) {
Optional kafkaMessage = Optional.ofNullable(record.value());
Optional<String> kafkaKey = Optional.ofNullable(record.key());
if (kafkaKey.isPresent()) {
Object value = kafkaMessage.get();
String gatewayId = kafkaKey.get();
//使用 加载的资源信息对数据进行处理
}
}
}
1.原因分析
@KafkaListener这个注解所标注的方法并没有在IOC容器中注册为Bean,而是会被注册在KafkaListenerEndpointRegistry中,KafkaListenerEndpointRegistry在SpringIOC中已经被注册为Bean,具体可以看一下该类的源码,当然不是使用注解方式注册。
KafkaListenerEndpointRegistry注册完Kafka中的topic之后,就会自动启动监听容器,如此KafkaListener注解的方法就开始消费消息了。这个过程可能在自定义Bean创建完成之前执行。
知道了问题,以及原因,解决方法就比较简单了,我们只需要完成2点:
1.禁止KafkaListener自启动(AutoStartup)
2.手动启动单个Kafka的topic的监听
2.解决方法
@Component
public class Consumer{
@Autowired
KafkaManager kafkaManager;
@PostConstruct
private void loadResourceConfig () {//加载数据
// 加载资源配置
kafkaManager.startListener();//开启topic的监听
}
}
@Component
public class KafkaManager {
@Autowired
private KafkaListenerEndpointRegistry registry;
@Autowired
private ConsumerFactory consumerFactory;
@Bean
public ConcurrentKafkaListenerContainerFactory delayContainerFactory() {
ConcurrentKafkaListenerContainerFactory container = new ConcurrentKafkaListenerContainerFactory();
container.setConsumerFactory(consumerFactory);
//禁止自动启动
container.setAutoStartup(false);
return container;
}
/**
* 开启kafka监听
*/
public void startListener() {
if (!registry.getListenerContainer("device-data").isRunning()) {
registry.getListenerContainer("device-data").start();
}
registry.getListenerContainer("device-data").resume();
}
上面的代码做了几件事:
1.使用ConsumerFactory 构建Kafka监听容器工厂ConcurrentKafkaListenerContainerFactory
2.Kafka监听容器工厂注册为Bean
3.禁止Kafka监听容器自动启动
4.在loadResourceConfig方法加载完成资源之后,调用startListener方法,手动启动Kafka容器监听。注意registry.getListenerContainer(“device-data”)的参数,就是 @KafkaListener注解中的id参数。
5.startListener中我们先判断容器是否运行(isRunning),如果没有则调用start方法启动。 resume方法是恢复运行。这样写的目的是,即便startListener多次执行,也没有问题。