kafka版本声明
- 使用的是
kafka 0.10.0.1
版本
修改offset(偏移量)
-
在使用
consumer
消费的时候,每个topic的每个partition(分区)会为该消费组维护一个offset(偏移量)
,offset(偏移量)
是在kafka
服务端__consumer_offsets
这个Topic
下存储的,我们修改这个offset(偏移量)
到我们想重新消费的位置即可以重新消费,standalone consumer(指consumer.assign()而非consumer.subscribe()的消费者)
每次都从最开始的地方消费所有数据,因为是独立的consumer
而非consumerGruop
,所以集群环境下,启动多个,会导致每个consumer
实例都会从最开始的地方(重新消费的offset
)消费所有数据,每个consumer
实例消费的数据都一样 -
修改步骤
-
第一步:创建一个名称为
testTopic
的Topic
# bin/kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 5 --topic testTopic
-
第二步:生产数据
static Properties props = new Properties(); @BeforeClass public static void testBefore() { props.put("bootstrap.servers", "127.0.0.1:9092"); props.put("client.id", "DemoProducer"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); } @Test public void testSync() { KafkaProducer<Integer, String> producer = new KafkaProducer<>(props); String key = "test1"; String topic = "testTopic"; //同一个key的消息放到同一个分区,不指定key则均衡分布,消息分区的选择是在客户端进行的 for (int i = 0; i < 100; i++) { try { String messageStr = "hello world " + i; ProducerRecord producerRecord = new ProducerRecord(topic, key, messageStr); Future<RecordMetadata> future = producer.send(producerRecord); List<PartitionInfo> partitionInfos = producer.partitionsFor(topic); for (PartitionInfo partitionInfo : partitionInfos) { System.out.println(partitionInfo); } //同步调用 RecordMetadata recordMetadata = future.get(); System.out.println(ToStringBuilder.reflectionToString(recordMetadata)); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } }
-
第三步:消费数据
static Properties props = new Properties(); @BeforeClass public static void testBefore() { props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "testgroup"); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); } @Test public void testConsumerSync() { props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(props); String topic = "testTopic"; consumer.subscribe(Collections.singletonList(topic)); while (true) { //如果缓冲区中没有数据会阻塞 ConsumerRecords<Integer, String> records = consumer.poll(1000); for (ConsumerRecord<Integer, String> record : records) { System.out.println("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset()); } consumer.commitSync(); } }
-
第四步:查看
offset
,从下面可以看出指定的key = "test1"
的消息被分配到testTopic-2
分区下,并且当前的消费offset
是100
bash-4.3# bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 -topic testTopic --time -1 testTopic:2:100 testTopic:4:0 testTopic:1:0 testTopic:3:0 testTopic:0:0
-
第五步(第一种方式):修改偏移量,新版本的
kafka(0.9版本及以上)
不再保存偏移量到zookeeper
中,而是保存在Kafka
的一个内部Topic
中__consumer_offsets
,该Topic
默认有50个Partition
,每个Partition
有3个副本,分区数量由参数offsets.topic.num.partitions
设置。通过groupId
的哈希值和该参数取模的方式来确定某个消费者组已消费的offset
保存到__consumer_offsets
主题的哪个分区中。static Properties props = new Properties(); @BeforeClass public static void testBefore() { props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "testgroup"); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); } @Test public void testConsumerModifyOffset() { props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10); //不要使用同一个组名 props.put(ConsumerConfig.GROUP_ID_CONFIG, "testgroup2"); KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(props); String topic = "testTopic"; //指定从offset==0开始消费 int offset = 0; int partition = 2; TopicPartition topicPartition = new TopicPartition(topic, partition); /** * 1.standalone consumer:指consumer.assign()而非consumer.subscribe()的消费者 * consumer group:指consumer.subscribe的消费者 * consumer.assign()和consumer.subscribe()不能同时使用 * 2. 当用户系统中同时出现了standalone consumer和consumer group,并且它们的group id相同时, * 此时standalone consumer手动提交位移时就会立刻抛出CommitFailedException。所以不要让 * standalone consumer和consumer.subscribe()的groupId一样,这里指定的是testgroup2 */ consumer.assign(Arrays.asList(topicPartition)); consumer.seek(new TopicPartition(topic, partition), offset); // consumer.subscribe(Collections.singletonList(topic)); while (true) { //如果缓冲区中没有数据会阻塞 ConsumerRecords<Integer, String> records = consumer.poll(1); for (ConsumerRecord<Integer, String> record : records) { System.out.println("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset()); } consumer.commitSync(); } }
-
第五步(第二种方式):
/** * consumer.subscribe()指定偏移量 */ @Test public void testConsumerModifyOffsetForSubscribe() { //不要使用同一个组名 props.put(ConsumerConfig.GROUP_ID_CONFIG, "test_group_ee"); KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(props); String topic = "testTopic"; consumer.subscribe(Collections.singletonList(topic)); consumer.poll(0); //从所有分区的所有偏移量开始消费 int offset = 10; for (TopicPartition partition : consumer.assignment()) { consumer.seek(partition, offset); } /* //从特定分区的特定偏移量开始消费 int partition = 0; TopicPartition topicPartition = new TopicPartition(topic, partition); consumer.seek(topicPartition, offset); */ while (true) { //如果缓冲区中没有数据会阻塞 ConsumerRecords<Integer, String> records = consumer.poll(1); for (ConsumerRecord<Integer, String> record : records) { System.out.println("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset()); } try { consumer.commitSync(); } catch (CommitFailedException e) { //只要没有发生不可恢复的错误,commitSync方法会一直尝试直至提交成功。如果提交失败,我们也只能把异常记录到错误日志里 e.printStackTrace(); } } }
-
通过使用不同的groupId
-
使用不同的
groupId
,必须指定auto.offset.reset=earliest
。如果新的groupid
消费的offset
已经保存在__consumer_offsets
这个Topic
下,则并不会从开始消费所有数据,而是从最新的offset
开始消费.扫描二维码关注公众号,回复: 3717719 查看本文章@Test public void testConsumerSync() { props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); //新的groupId props.put(ConsumerConfig.GROUP_ID_CONFIG, "testgroup3"); KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(props); String topic = "testTopic"; consumer.subscribe(Collections.singletonList(topic)); while (true) { //如果缓冲区中没有数据会阻塞 ConsumerRecords<Integer, String> records = consumer.poll(1000); for (ConsumerRecord<Integer, String> record : records) { System.out.println("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset()); } consumer.commitSync(); } }