之前参考别的文章实现了spring整合kafka接收单条数据处理,由于高吞吐量,高并发的需要,需要批量取kafka数据,并批量存储。在网上搜索的资料,发现资料比较少,本人在原来单条数据处理的基础上用BatchAcknowledgingMessageListener接口实现了批量接收kafka数据的功能。
前言:环境配置说明,由于spring托管的方式不支持低版本的kafka数据的批量接收,所以在升级kafka为1.1.0时再做批量接收处理。
兼容性参考下图:
1.环境配置:
名称 | 版本号 |
|
|
|
|
|
|
|
|
|
1.7 |
2.修改的配置文件
2.1) consumerProperties不再自动提交接收:
<entry key="enable.auto.commit" value="false"/>
2.2)监听类改为实现BatchAcknowledgingMessageListener接口的监听类
<bean id="messageListernerConsumerService" class="com.consumer.KafkaBatchAcknowledgingMessage"/>
2.3)在实现类中重载方法onMessage()方法。
import org.springframework.kafka.listener.BatchAcknowledgingMessageListener;
import org.springframework.kafka.support.Acknowledgment;
import java.util.List;
public class KafkaBatchAcknowledgingMessage implements BatchAcknowledgingMessageListener {
@Override
public void onMessage(List list, Acknowledgment acknowledgment) {
System.out.println("批量接收的消息:"+list);
acknowledgment.acknowledge();
}
@Override
public void onMessage(Object o) {
}
}
3.重启项目测试批量接收数据。
在Kafka堆积数据的情况下,每次批量接收数据在500条左右,实现数据的批量接收,减少了线程数,提高了效率。
欢迎大家沟通交流。