尽管Kafka一般意义上都是建议顺序的消费数据,但难免会遇到回滚、重新处理等需求。甚至有些应用希望把kafka当做一个缓存来用,比如保留1天内的近时的数据记录,并支持各个消费者通过拖拽进度条的方式来查看。这个时候,当然就需要进行seek操作。
在librdkafka的新C语言API中,建议使用函数 rd_kafka_subscribe 在一个线程中处理多个 topic 的请求,并把以前的逐topic的seek函数 rd_kafka_seek 标记为“deprecated”,提示使用新的rd_kafka_seek_partitions来处理。 librdkafka 这种批量消费主题的策略,可以在重平衡时,用正则式匹配新创建的专题。但这种高级的功能,是对底层请求API进行了更深的封装,使得我们“摸到”真正的 topic:partition绑定的时机更难把握,搞不好就没有效果。
本文介绍最常见的三个seek的时机和方法,希望对初学者有所帮助。感谢我学弟的帮助,对Kafka的各类seek行为进行了深入的测试。
1. 消费前指定偏移
在消费前,创建专题分区描述表时,可以指定偏移。在官方的例子里,使用的是 rd_kafka_subscribe函数来登记topics,这样,是无需指定具体消费什么分区的。因此,用 RD_KAFKA_PARTITION_UA 作为分区的标记,参考下面代码:
rd_kafka_t *rk; /* Consumer instance handle */
rd_kafka_topic_partition_list_t *subscription; /* Subscribed topics */
//...
rd_kafka_poll_set_consumer(rk);
/* Convert the list of topics to a format suitable for librdkafka */
subscription = rd_kafka_topic_partition_list_new(topic_cnt);
for (i = 0; i < topic_cnt; i++)
{
rd_kafka_topic_partition_list_add(subscription, topics[i],
/* the partition is ignored by subscribe() */
RD_KAFKA_PARTITION_UA);
}
rd_kafka_subscribe(rk, subscription);
本方法,需要提前知道自己想消费的分区的ID,并显式指定,并用分配函数 rd_kafka_assign 取代 rd_kafka_subscribe,使得server能够立刻把确切的topic-partition绑定到这个消费者. 同时,还有必要通过rd_kafka_query_watermark_offsets提前获取offset的合理范围。代码如下:
rd_kafka_t *rk; /* Consumer instance handle */
rd_kafka_topic_partition_list_t *subscription; /* Subscribed topics */
//...
rd_kafka_poll_set_consumer(rk);
/* Convert the list of topics to a format suitable for librdkafka */
subscription = rd_kafka_topic_partition_list_new(topic_cnt);
for (i = 0; i < topic_cnt; i++)
{
int nPartID = my_get_part_id(topics[i]);
/*second place can seek a partion. query water mark and seek*/
int64_t low, high;
rd_kafka_query_watermark_offsets(rk,topics[i], 0, &low, &high, 5000);
rd_kafka_topic_partition_list_add(subscription, topics[i],
nPartID)->offset = (low+high)/2;
}
rd_kafka_assign(rk, subscription);
这种指定方式的特点:
- 可以立刻从offset消费
- 即使上次已经消费到末尾,也能马上回到特定的offset
- 在当前rk的生命周期内只能设置一次。设置多次可以用更宏观的生命周期包裹rk.
- 必须指明分区号。一般通过特定的策略人为指定。
2. 在 rebalance回调中分配
在重新平衡时,可以有机会触摸到当前的专题分区表。
void myrebalance(rd_kafka_t *rk,
rd_kafka_resp_err_t err,
rd_kafka_topic_partition_list_t *partitions,
void *opaque)
{
//first place can seek a partion, when using rd_kafka_subscribe
for (int i=0;i<partitions->cnt;++i)
{
if(!is_my_topic(partitions->elems[i].topic))
continue;
int64_t low, high;
rd_kafka_query_watermark_offsets(
rk,
partitions->elems[i].topic,
partitions->elems[i].partition, &low, &high, 5000);
partitions->elems[i].offset = (low + high)/2;
}
auto e = rd_kafka_assign(rk,partitions);
puts(rd_kafka_err2str(e));
}
int consumer(...)
{
//...
rd_kafka_t *rk; /* Consumer instance handle */
rd_kafka_topic_partition_list_t *subscription; /* Subscribed topics */
//设置回调!
rd_kafka_conf_set_rebalance_cb(conf,myrebalance);
rd_kafka_poll_set_consumer(rk);
/* Convert the list of topics to a format suitable for librdkafka */
subscription = rd_kafka_topic_partition_list_new(topic_cnt);
for (i = 0; i < topic_cnt; i++)
{
rd_kafka_topic_partition_list_add(subscription, topics[i],
/* the partition is ignored by subscribe() */
RD_KAFKA_PARTITION_UA);
}
rd_kafka_subscribe(rk, subscription);
}
这种指定方式的特点:
- 无法立刻从offset消费。只有等到服务方回调才能设置
- 即使上次已经消费到末尾,也能回到特定的offset,但要等待时机
- 在rk的生命周期内可以发挥多次作用。
- 无需指明分区号,分区号会携带给调用者。
3. 在消费时指定
第三种方法,就是在消费的过程中指定。因为即使 rd_kafka_subscribe 没有指定分区,在消费时,也会在元数据中携带确切的分区。
while (run) {
rd_kafka_message_t *rkm;
rkm = rd_kafka_consumer_poll(rk, 100);
if (!rkm)
continue; /* Timeout: */
if (needSeek)
{
rd_kafka_seek(rkm->rkt,rkm->partition,getNewOffset(),100);
}
//...
}
这种指定方式的特点:
- 在消费活跃时,可以立即设置offset,并在下一包回到offset
- 如果已经消费到末尾,则无法设置(运行不到这里)。
- 可以发挥多次作用(除非消费到末尾)。
- 无需指明分区号,分区号会携带给调用者。
4. 总结
可见,kafka seek的行为,是由它的原理决定的。
- 只有消费者已经assign到确切的partition,才能修改offset
- assign partition的行为可以是手动的(第一种),也可以是自动的(2、3)
- 当自动assign时,必须选取一个时机,获取当前的分区号,才能设置offset
- 最佳的方法是联合使用1、2、3这三种策略。