1. Why this matters: lost consumption / duplicate consumption
1 Auto-commit offsets:
1.1 With the auto-commit interval set to 1s: once the interval elapses, the offset (100) is committed, but processing is not yet finished (only up to record 80) when the consumer crashes. After a restart, consumption resumes from the committed offset (100), so records 81-100 are never processed: lost consumption.
1.2 With the auto-commit interval set to 3s: a batch finishes processing within 1s, then the consumer crashes. The interval has not elapsed, so the offset was never committed; after a restart, the already-processed records are consumed again: duplicate consumption. A sketch of this auto-commit configuration follows this list.
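For reference, a minimal sketch of the auto-commit configuration both scenarios assume; the broker address and group id are placeholders, not values from the original:

// Sketch: consumer with automatic offset commits (placeholder broker/group).
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);      // Kafka commits offsets in the background
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000); // the 1s/3s interval discussed above
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);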
2 Manual commit via the official API (suffers the same two problems: commit before processing finishes and you can lose data; process before committing and you can duplicate it)
2.1 Synchronous manual commit
// Synchronous commit: the current thread blocks until the commit succeeds before
// consuming any further data. Reliable but slow, so it is rarely used on its own.
// consumer.commitSync();
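As a minimal sketch (consumer construction omitted), synchronous commits slot into the poll loop like this:

// Process a batch, then block until its offsets are committed to Kafka.
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.println(record.key() + " ===> " + record.value());
    }
    consumer.commitSync(); // blocks; retried automatically on recoverable errors
}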
2.2 Asynchronous manual commit
// Asynchronous manual commit
consumer.commitAsync(new OffsetCommitCallback() {
    @Override
    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
        if (exception != null) {
            exception.printStackTrace();
        }
    }
});
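A common compromise, sketched here under the same setup, is to commit asynchronously inside the loop for throughput and fall back to one synchronous commit on shutdown, since commitAsync does not retry on failure:

// Async commits while running, one final blocking commit on close.
try {
    while (running) { // hypothetical shutdown flag
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        process(records);       // hypothetical processing method
        consumer.commitAsync(); // non-blocking, not retried on failure
    }
} finally {
    try {
        consumer.commitSync();  // blocking and retried: the final position is not lost
    } finally {
        consumer.close();
    }
}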
2. Custom offset storage
Design goals:
1 Handle offset reassignment when partitions are rebalanced
2 Make data processing and the offset commit transactional
Implementation steps:
1 Use a ConsumerRebalanceListener, overriding the callback that saves offsets (on revocation) and the one that restores them (on assignment)
2 Combine this with MySQL to get transactionality; the assumed table layout is sketched below
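The code writes five columns with replace into offset values(?,?,?,?,?), which implies a table roughly like the following; the exact DDL and the timestamp column name are assumptions, since the schema is not shown in the original:

-- Sketch of the assumed MySQL offset table; column names follow the queries in the code.
CREATE TABLE offset (
    consumer_group             VARCHAR(128) NOT NULL,
    sub_topic                  VARCHAR(128) NOT NULL,
    sub_topic_partition_id     INT          NOT NULL,
    sub_topic_partition_offset BIGINT       NOT NULL,
    create_time                VARCHAR(32),
    PRIMARY KEY (consumer_group, sub_topic, sub_topic_partition_id) -- REPLACE INTO keys on this
);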
3. Code implementation
package com.dream.bigdata.bi.es.kafka;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;

import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.*;

public class MyManualCommitOffsetKafkaConsumer {

    private static String group = "qdd";
    private static String topic = "qdd100";

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "113.143.100.155:9092,113.143.100.140:9092,113.143.100.148:9092");
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // offsets go to MySQL, never to Kafka
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, group);
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {

            // Called before a rebalance takes partitions away: persist the current
            // position (the offset of the next record to consume) for each partition.
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                for (TopicPartition partition : partitions) {
                    int sub_topic_partition_id = partition.partition();
                    long sub_topic_partition_offset = consumer.position(partition);
                    String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
                    DBUtils.update("replace into offset values(?,?,?,?,?)",
                            new Offset(group, topic, sub_topic_partition_id, sub_topic_partition_offset, date));
                }
            }

            // Called after a rebalance assigns partitions: restore each partition's
            // position from MySQL and seek there, overriding Kafka's own record.
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                for (TopicPartition partition : partitions) {
                    int sub_topic_partition_id = partition.partition();
                    long offset = DBUtils.queryOffset(
                            "select sub_topic_partition_offset from offset where consumer_group=? and sub_topic=? and sub_topic_partition_id=?",
                            group, topic, sub_topic_partition_id);
                    System.out.println("partition = " + partition + ", offset = " + offset);
                    consumer.seek(partition, offset);
                }
            }
        });

        while (true) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(100));
            consumedata(consumerRecords);
        }
    }

    // In a Spring-managed bean this method would carry
    // @Transactional(rollbackFor = Exception.class) so that record processing and the
    // offset writes commit or roll back together; in this plain main() the same
    // guarantee has to come from a manual JDBC transaction inside DBUtils.
    public static void consumedata(ConsumerRecords<String, String> consumerRecords) {
        List<Offset> offsets = new ArrayList<>();
        for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
            System.out.println(consumerRecord.key() + " ===> " + consumerRecord.value());
            String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
            // Store offset + 1 (the next record to consume) to match the semantics of
            // consumer.position() used in onPartitionsRevoked; otherwise seek() would
            // replay the last processed record after every rebalance.
            offsets.add(new Offset(group, topic, consumerRecord.partition(), consumerRecord.offset() + 1, date));
            System.out.println("|---------------------------------------------------------------\n" +
                    "|group\ttopic\tpartition\toffset\ttimestamp\n" +
                    "|" + group + "\t" + topic + "\t" + consumerRecord.partition() + "\t" + consumerRecord.offset() + "\t" + consumerRecord.timestamp() + "\n" +
                    "|---------------------------------------------------------------");
        }
        for (Offset offset : offsets) {
            DBUtils.update("replace into offset values(?,?,?,?,?)", offset);
        }
    }
}
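The Offset POJO and the DBUtils helper are used above but never shown. A minimal sketch of what they might look like follows (each class in its own file); the JDBC URL, credentials, and class details are assumptions chosen to match the calls above:

import java.sql.*;

// Sketch: value object whose field order matches replace into offset values(?,?,?,?,?).
public class Offset {
    private final String consumerGroup;
    private final String subTopic;
    private final int subTopicPartitionId;
    private final long subTopicPartitionOffset;
    private final String createTime;

    public Offset(String consumerGroup, String subTopic, int subTopicPartitionId,
                  long subTopicPartitionOffset, String createTime) {
        this.consumerGroup = consumerGroup;
        this.subTopic = subTopic;
        this.subTopicPartitionId = subTopicPartitionId;
        this.subTopicPartitionOffset = subTopicPartitionOffset;
        this.createTime = createTime;
    }

    public String getConsumerGroup() { return consumerGroup; }
    public String getSubTopic() { return subTopic; }
    public int getSubTopicPartitionId() { return subTopicPartitionId; }
    public long getSubTopicPartitionOffset() { return subTopicPartitionOffset; }
    public String getCreateTime() { return createTime; }
}

// Sketch: thin JDBC facade; the URL and credentials are placeholders, not real values.
public class DBUtils {
    private static final String URL = "jdbc:mysql://localhost:3306/kafka_offsets";
    private static final String USER = "root";
    private static final String PASSWORD = "secret";

    // Executes replace into offset values(?,?,?,?,?) with the five Offset fields.
    public static void update(String sql, Offset offset) {
        try (Connection conn = DriverManager.getConnection(URL, USER, PASSWORD);
             PreparedStatement ps = conn.prepareStatement(sql)) {
            ps.setString(1, offset.getConsumerGroup());
            ps.setString(2, offset.getSubTopic());
            ps.setInt(3, offset.getSubTopicPartitionId());
            ps.setLong(4, offset.getSubTopicPartitionOffset());
            ps.setString(5, offset.getCreateTime());
            ps.executeUpdate();
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }

    // Returns the saved offset for (group, topic, partition), or 0 on the first run
    // so that seek() starts from the beginning of the partition.
    public static long queryOffset(String sql, Object... params) {
        try (Connection conn = DriverManager.getConnection(URL, USER, PASSWORD);
             PreparedStatement ps = conn.prepareStatement(sql)) {
            for (int i = 0; i < params.length; i++) {
                ps.setObject(i + 1, params[i]);
            }
            try (ResultSet rs = ps.executeQuery()) {
                return rs.next() ? rs.getLong(1) : 0L;
            }
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }
}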