实现注解式注入kafkaTemplate 生产者和消费者,简化配置文件
目录
消费者工厂
/** * 消费者工厂 */ @EnableKafka @Configuration public class KafkaConsumerFactory { @Autowired private ApplicationContext context; /** * 获取消费者工厂 */ public ConsumerFactory<String, String> consumerFactory(String kafkaBroker) { // 消费者配置信息 Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBroker); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, Boolean.TRUE); props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return new DefaultKafkaConsumerFactory<>(props); } /** * 容器配置 * * @param groupId 组名 * @param clazz 消费者监听器 * @param topicName topicName * @return 容器配置 */ public ContainerProperties containerProperties(String groupId, Class clazz, String topicName) { ContainerProperties containerProperties = new ContainerProperties(topicName); containerProperties.setAckMode(AbstractMessageListenerContainer.AckMode.RECORD); containerProperties.setGroupId(groupId); containerProperties.setMessageListener(context.getBean(clazz)); return containerProperties; } /** * 获取消费容器实例 * * @param kafkaBroker kafka server * @param groupId 组名 * @param clazz 消费者监听器 * @param topicName topicName * @param threadCount 消费线程数 * @return 消息监听容器 */ public ThreadMessageListenerContainer<String, String> kafkaListenerContainer( String kafkaBroker, String groupId, Class clazz, String topicName, int threadCount) { ThreadMessageListenerContainer<String, String> container = new ThreadMessageListenerContainer<>( consumerFactory(kafkaBroker), containerProperties(groupId, clazz, topicName)); container.setConcurrency(threadCount); container.getContainerProperties().setPollTimeout(3000); return container; } }
生产者工厂
/** * 生产者工厂 */ @EnableKafka @Configuration public class KafkaProducerFactory { @Autowired private ApplicationContext context; /** * 获取生产者工厂 */ public ProducerFactory<String, String> producerFactory(String kafkaBroker) { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBroker); props.put(ProducerConfig.RETRIES_CONFIG, 1); props.put(ProducerConfig.ACKS_CONFIG, "all"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return new DefaultKafkaProducerFactory<>(props); } /** * 注册生产者实例 */ public KafkaTemplate<String, String> kafkaTemplate(String kafkaBroker, String topicName, Class clazz) { KafkaTemplate<String, String> template = new KafkaTemplate<>(producerFactory(kafkaBroker), Boolean.FALSE); template.setDefaultTopic(topicName); template.setProducerListener((ProducerListener<String, String>) context.getBean(clazz)); return template; } }
初始化监听器、实例
/** * kafka 初始化 */ @Component public class KafkaInit { /** * kafka server */ @Value("${kafka.servers}") private String kafkaBroker; /** * 组名 */ @Value("${kafka.group}") private String groupId; /** * topicName */ @Value("${kafka.topic}") private String topicName; /** * 消费者工厂 */ @Autowired private KafkaConsumerFactory kafkaConsumerFactory; /** * 生产者工厂 */ @Autowired private KafkaProducerFactory kafkaProducerFactory; /** * 在服务器加载Servlet的时候运行,并且只会被服务器调用一次 */ @PostConstruct public void consumer() { kafkaConsumerFactory.kafkaListenerContainer(kafkaBroker, groupId, TestConsumerListener.class, topicName, 6) .startContainer(); // 加载消费者listener } /** * 获取生产者实例 */ @Bean("testSender") public KafkaTemplate<String, String> testSender() { return kafkaProducerFactory.kafkaTemplate(kafkaBroker, topicName, DefaultProducerListener.class); } }
用于手动控制容器加载
/** * 继承消息监听容器 */ public class ThreadMessageListenerContainer<K, V> extends ConcurrentMessageListenerContainer<K, V> { public ThreadMessageListenerContainer(ConsumerFactory<K, V> consumerFactory, ContainerProperties containerProperties) { super(consumerFactory, containerProperties); } public void startContainer() { super.doStart(); } }
生产者监听器
/** * 默认生产者监听器 */ @Component public class DefaultProducerListener extends ProducerListenerAdapter { /** * 发送消息成功后调用 */ @Override public void onSuccess(String topic, Integer partition, Object key, Object value, RecordMetadata recordMetadata) { super.onSuccess(topic, partition, key, value, recordMetadata); System.out.println("消息发送成功!"); } /** * 发送消息错误后调用 */ @Override public void onError(String topic, Integer partition, Object key, Object value, Exception exception) { super.onError(topic, partition, key, value, exception); System.out.println("消息发送失败!"); } /** * 是否开启发送监听 * * @return true开启,false关闭 */ @Override public boolean isInterestedInSuccess() { return true; } }
消费者监听器
/** * 消费者监听器 */ @Component public class TestConsumerListener implements MessageListener<String, String> { /** * 消费消息 * * @param record 消息 */ @Override public void onMessage(ConsumerRecord<String, String> record) { System.out.println(record); } }
消息发送测试
/** * 消息发送 */ @Component public class TestProducerSender { /** * 转账队列发送 */ @Autowired @Qualifier("testSender") private KafkaTemplate kafkaTemplate; /** * 消息发送测试 */ public void sendMessage() { kafkaTemplate.sendDefault("message test"); } }