前言
继上一篇关于suervisor启动流程的文章,发现一了关于kafkaRecordSupplier的一些疑问。本篇就单独拎出这个类来看一下做的什么事情。
首先KafkaRecordSupplier是干什么用的?它主要是操作kafka数据真正执行的地方,比如拉取kafka数据的poll函数在这个类里面实现的。
KafkaRecordSupplier继承了RecordSupplier, 而RecordSupplier只有kafka和Kinesis两个地方做了具体的实现,RecordSupplier是在正在运行的task中调用使用的。
详解
看一下这个类中有哪些方法和属性。
下面是KafkaRecordSupplier类中主要的属性,都是和连接操作kafka相关的:
KafkaConsumer<byte[], byte[]> consumer |
kafka consumer客户端,在构造函数中创建 |
Map<String, Object> consumerProperties |
kafka consumer客户端的参数配置,在构造函数中创建 |
String dataSource |
datasource |
下面是KafkaRecordSupplier类中的主要方法,都是和操作kafka相关的:
public void assign(Set<StreamPartition<Integer>> streamPartitions) |
告诉kafka从指定的topic-partations进行消费,不受group-id的限制 |
public void seek(StreamPartition<Integer> partition, Long sequenceNumber) |
用于指定partation的offset(druid是自己管理offsetde ) |
public List<OrderedPartitionableRecord<Integer, Long>> poll(long timeout) |
kafka客户端获取数据 |
从以上属性和方法可以看出KafkaRecordSupplier是干什么用的了。说白了就是kafka消费者的客户端。
疑问
正常理解的话,一个task任务只对应一个kafkaRecordSupplier对象的时候才能正常摄入数据才对,但是发现kafka-index会有三个地方创建了kafkaRecordSupplier,这就让我很迷惑了。下面就看看这个三个创建kafkaRecordSupplier对象的地方以及用法。
(1) kafka supervisor在创建supervisor线程之前创建了KafkaRecordSupplier对象,通过对该对象的分析,该对象只调用的assign()方法和seek()方法,并没有实际的获取数据的操作。该对象存在的作用是对topic进行初始化,和元数据库中的partations信息做一个对比并更新,比如对supervisor执行reset操作。
/**
* 此方法是在kafkaSupervisor类中的,在创建kafka supervisor线程之前调用的
*/
@Override
protected RecordSupplier<Integer, Long> setupRecordSupplier()
{
return new KafkaRecordSupplier(spec.getIoConfig().getConsumerProperties(), sortingMapper, emitter, spec.getDataSchema().getDataSource());
}
(2) kafka index task在创建task任务的时候创建了kafkaRecordSupplier对象,该对象主要是用于拉取kafka中的数据用的,执行poll()函数。
/**
* 在kafaIndexTask类中创建kafka的RecordSupplier
* getRecords()的时候需要传入这个对象
*/
@Override
protected KafkaRecordSupplier newTaskRecordSupplier()
{
ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
final Map<String, Object> props = new HashMap<>(((KafkaIndexTaskIOConfig) super.ioConfig).getConsumerProperties());
props.put("auto.offset.reset", "none");
props.put("key.deserializer", ByteArrayDeserializer.class.getName());
props.put("value.deserializer", ByteArrayDeserializer.class.getName());
return new KafkaRecordSupplier(props, configMapper, emitter, dataSource);
}
finally {
Thread.currentThread().setContextClassLoader(currCtxCl);
}
}
(3) 创建KafkaSamplerFirehose的时候创建了KafkaRecordSupplier对象,主要是用于通过firehose的方式获取数据的时候用到。不过在文档中没有找到关于type=firehose的时候关于配置kafa的描述。具体怎么使用还没有研究明白。猜测是在tranquility的时候使用的?
protected class KafkaSamplerFirehose extends SeekableStreamSamplerFirehose
{
private KafkaSamplerFirehose(InputRowParser parser)
{
super(parser);
}
/**
*
* @return
*/
@Override
protected RecordSupplier getRecordSupplier()
{
ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
final Map<String, Object> props = new HashMap<>(((KafkaSupervisorIOConfig) ioConfig).getConsumerProperties());
props.put("enable.auto.commit", "false");
props.put("auto.offset.reset", "none");
props.put("key.deserializer", ByteArrayDeserializer.class.getName());
props.put("value.deserializer", ByteArrayDeserializer.class.getName());
props.put("request.timeout.ms", Integer.toString(samplerConfig.getTimeoutMs()));
return new KafkaRecordSupplier(props, objectMapper, null, "");
}
finally {
Thread.currentThread().setContextClassLoader(currCtxCl);
}
}
}
总之,三个kafkaRecordSupplier对象各自负责自己用途,互补影响
END
以上,单独对KafkaRecordSupplier做了简单的描述,通过疑问其实可以对kafka的数据摄入有了更直观更深入的了解。如有描述不对的地方还请读者指教~