版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/Xw_Classmate/article/details/53264303
众所周知,由于Zookeeper并不适合大批量的频繁写入操作,新版Kafka已推荐将consumer的位移信息保存在Kafka内部的topic中,即__consumer_offsets topic,并且默认提供了kafka_consumer_groups.sh脚本供用户查看consumer信息。
参数解释:--time -1 表示从最新的时间的offset中得到数据条数
参数解释:
其实在第4步中在配置文件consumer.properties中配置了group.id为wv,所以这儿得到的是wv,如果没有配置,会随机产生一个console-consumer-***的group.ig
参数解释:
在本例中,对应的分区=Math.abs("wv".hashCode()) % 50 = 7,即__consumer_offsets的分区7保存了这个consumer group的位移信息,下面让我们验证一下。
输出结果如下:
对于spark streaming来说,其中DStream中,DStream的本质是RDD序列,读取kafka时,也就是KafkaRDD,通过读取KafkaRDD的getPartitions方法,可以发现,KafkaRDD 的 partition 数据与 Kafka topic 的某个 partition 的 o.fromOffset 至 o.untilOffset 数据是相对应的,也就是说 KafkaRDD 的 partition 与 Kafka partition 是一一对应的。
不过依然有很多用户希望了解__consumer_offsets topic内部到底保存了什么信息,特别是想查询某些consumer group的位移是如何在该topic中保存的。针对这些问题,本文将结合一个实例探讨如何使用kafka-simple-consumer-shell脚本来查询该内部topic。
1. 创建 topic “test”
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test --replication-factor 1 --partitions 3 #创建一个有3个partition、1个副本的 test topic
2. 使用kafka-console-producer.sh脚本生产消息
bin/kafka-console-producer.sh --broker-list localhost:9092,localhost:9093,localhost:9094 --topic test
3. 验证生产消息成功
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092,localhost:9093,localhost:9094 --topic test --time -1
test:2:2
test:1:2
test:0:3
参数解释:--time -1 表示从最新的时间的offset中得到数据条数
输出结果每个字段分别表示topic、partition、untilOffset
上面的输出结果表明总共生产了7条消息
4. 创建一个console consumer group
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --consumer.config config/consumer.properties --topic test --from-beginning --new-consumer
参数解释:
--from-beginning 如果consumer之前没有建立offset,则从producer最开始的数据读取。
5. 获取该consumer group的group id(后面需要根据该id查询它的位移信息)
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --list --new-consumer
wv
其实在第4步中在配置文件consumer.properties中配置了group.id为wv,所以这儿得到的是wv,如果没有配置,会随机产生一个console-consumer-***的group.ig
6. 查询__consumer_offsets topic所有内容
bin/kafka-console-consumer.sh --topic __consumer_offsets --zookeeper localhost:2181 --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties --from-beginning --delete-consumer-offsets
参数解释:
--delete-consumer-offsets由于之前wv这个group.id
注意:运行下面命令前先要在consumer.properties中设置exclude.internal.topics=false
7. 计算指定consumer group在__consumer_offsets topic中分区信息
这时候就用到了第5步获取的group.id(本例中是console-consumer-46965)。Kafka会使用下面公式计算该group位移保存在__consumer_offsets的哪个分区上:
Math.abs(groupID.hashCode()) % numPartitions
在本例中,对应的分区=Math.abs("wv".hashCode()) % 50 = 7,即__consumer_offsets的分区7保存了这个consumer group的位移信息,下面让我们验证一下。
8. 获取指定consumer group的位移信息
bin/kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 7 --broker-list localhost:9092,localhost:9093,localhost:9094 --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter"
输出结果如下:
[wv,test,0]::[OffsetMetadata[3,NO_METADATA],CommitTime 1479735556398,ExpirationTime 1479821956398]
[wv,test,1]::[OffsetMetadata[2,NO_METADATA],CommitTime 1479735556398,ExpirationTime 1479821956398]
[wv,test,2]::[OffsetMetadata[2,NO_METADATA],CommitTime 1479735556398,ExpirationTime 1479821956398]
上图可见,该consumer group果然保存在分区11上,且位移信息都是对的(这里的位移信息是已消费的位移,严格来说不是第3步中的位移。由于我的consumer已经消费完了所有的消息,所以这里的位移与第3步中的位移相同)。另外,可以看到__consumer_offsets topic的每一日志项的格式都是:[Group, Topic, Partition]::[OffsetMetadata[Offset, Metadata], CommitTime, ExpirationTime]
对于spark streaming来说,其中DStream中,DStream的本质是RDD序列,读取kafka时,也就是KafkaRDD,通过读取KafkaRDD的getPartitions方法,可以发现,KafkaRDD 的 partition 数据与 Kafka topic 的某个 partition 的 o.fromOffset 至 o.untilOffset 数据是相对应的,也就是说 KafkaRDD 的 partition 与 Kafka partition 是一一对应的。
参考:http://www.cnblogs.com/huxi2b/p/6061110.html