1.1 consumer参数详解
BOOTSTRAP_SERVERS_CONFIG kafka ip+port
REQUEST_TIMEOUT_MS_CONFIG 请求超时时间
ENABLE_AUTO_COMMIT_CONFIG 是否自动提交
AUTO_COMMIT_INTERVAL_MS_CONFIG 自动提交间隔时间
SESSION_TIMEOUT_MS_CONFIG session失效时间
KEY_DESERIALIZER_CLASS_CONFIG key序列化工具类
VALUE_DESERIALIZER_CLASS_CONFIG value序列化工具类
GROUP_ID_CONFIG 消费组名称(id)
MAX_POLL_RECORDS_CONFIG 每次拉去最大数量(要配合 factory.setBatchListener(true);使用)
AUTO_OFFSET_RESET_CONFIG 默认offset位置(当不存在offset时从哪里开始读取)
MAX_POLL_INTERVAL_MS_CONFIG 最大poll数据时间间隔
1.2、自动提交 ENABLE_AUTO_COMMIT_CONFIG 设置为true
AUTO_COMMIT_INTERVAL_MS_CONFIG 定义时间间隔,多久进行提交一次offset
1.3、自动提交 ENABLE_AUTO_COMMIT_CONFIG 设置为false
由于使用spring-kafka,当手动设定时候会使用spring的提交策略来提交offset
通过自定义策略
factory.getContainerProperties().setAckMode() 来定义使用的方式
AbstractMessageListenerContainer.AckMode 中定义了7中策略给我们使用
RECORD 每次拉去处理完成后进行提交
BATCH 每次拉去完成之后都进行提交
TIME 指定时间间隔提交
COUNT 拉去到一定数量进行提交
COUNT_TIME 指定时间或达到数量后进行提交
下面两种比较特殊(需要用户自己使用ack进行提交)
MANUAL(用户处理后手动提交)
MANUAL_IMMEDIATE(用户拉取后立即手动提交)