Because the job is written in SQL, my original plan for the Kafka sink was to use KafkaTableSink, but there were two problems. First, KafkaTableSink only accepts an append stream, while the Table produces a retract stream that also contains deletions (retractions), so it cannot be used directly; the retract stream must first go through a filter that drops the `false` (retraction) records. Second, look at the KafkaTableSink source:
public KafkaTableSink(
        String topic,
        Properties properties,
        FlinkKafkaPartitioner<Row> partitioner) {
    this.topic = Preconditions.checkNotNull(topic, "topic");
    this.properties = Preconditions.checkNotNull(properties, "properties");
    this.partitioner = Preconditions.checkNotNull(partitioner, "partitioner");
}
As you can see, KafkaTableSink requires three arguments: first, the topic name; second, the properties, i.e. the Kafka producer configuration; and third, a FlinkKafkaPartitioner. Its source looks like this:
public abstract class FlinkKafkaPartitioner<T> implements Serializable {

    private static final long serialVersionUID = -9086719227828020494L;

    public void open(int parallelInstanceId, int parallelInstances) {
        // overwrite this method if needed.
    }

    public abstract int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions);
}
The key method is partition, which defines how records are assigned to partitions, so the third constructor argument determines how messages are partitioned. In the end I did not use KafkaTableSink at all; instead I converted the Table into an ordinary DataStream and used the FlinkKafkaProducer011 sink.
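That Table-to-stream conversion with the retraction filter can be sketched as a small helper. This is a minimal sketch, not code from the original post: the class name RetractFilter and the method parameters are made up for illustration, and it assumes the Flink 1.x Java Table API.

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class RetractFilter {

    // toRetractStream tags every record with a Boolean flag:
    // f0 == true is an insert, f0 == false is a retraction.
    // We keep only the inserts so the result can feed an append-only sink.
    public static DataStream<Row> toAppendOnly(StreamTableEnvironment tEnv, Table table) {
        DataStream<Tuple2<Boolean, Row>> retract = tEnv.toRetractStream(table, Row.class);
        return retract
                .filter(t -> t.f0)   // drop retraction (false) records
                .map(t -> t.f1)      // strip the flag, keep the Row payload
                .returns(Row.class); // type hint needed because of lambda erasure
    }
}
```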
Looking at the FlinkKafkaProducer011 constructor: since I need to put custom content into the message key, I have to define my own KeyedSerializationSchema.
public FlinkKafkaProducer011(
        String defaultTopicId,
        KeyedSerializationSchema<IN> serializationSchema,
        Properties producerConfig,
        Optional<FlinkKafkaPartitioner<IN>> customPartitioner) {
    this(
        defaultTopicId,
        serializationSchema,
        producerConfig,
        customPartitioner,
        Semantic.AT_LEAST_ONCE,
        DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
}
By default, the KeyedSerializationSchema used is the KeyedSerializationSchemaWrapper class:
package org.apache.flink.streaming.util.serialization;

import org.apache.flink.api.common.serialization.SerializationSchema;

public class KeyedSerializationSchemaWrapper<T> implements KeyedSerializationSchema<T> {

    private static final long serialVersionUID = 1351665280744549933L;

    private final SerializationSchema<T> serializationSchema;

    public KeyedSerializationSchemaWrapper(SerializationSchema<T> serializationSchema) {
        this.serializationSchema = serializationSchema;
    }

    @Override
    public byte[] serializeKey(T element) {
        return null;
    }

    @Override
    public byte[] serializeValue(T element) {
        return serializationSchema.serialize(element);
    }

    @Override
    public String getTargetTopic(T element) {
        return null; // we are never overriding the topic
    }
}
As the source shows, KeyedSerializationSchemaWrapper simply returns null for the key, so any Kafka message sent through the default wrapper carries no key. To attach a custom key, implement the KeyedSerializationSchema interface directly; element is the data object from the stream.
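Such an implementation can be sketched as follows. This is only an illustration under assumptions not in the original post: records are plain Strings of the form "type|payload", and the class name TypeKeyedSchema and the key format are made up.

```java
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;

import java.nio.charset.StandardCharsets;

// Hypothetical schema: uses the text before '|' as the Kafka message key
// and the whole record as the value.
public class TypeKeyedSchema implements KeyedSerializationSchema<String> {

    @Override
    public byte[] serializeKey(String element) {
        int idx = element.indexOf('|');
        String key = idx >= 0 ? element.substring(0, idx) : element;
        return key.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public byte[] serializeValue(String element) {
        return element.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String getTargetTopic(String element) {
        return null; // fall back to the producer's default topic
    }
}
```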
The second point is customizing the partitioning logic. FlinkKafkaProducer011's default partitioner is FlinkFixedPartitioner:
package org.apache.flink.streaming.connectors.kafka.partitioner;

import org.apache.flink.util.Preconditions;

/**
 * A partitioner ensuring that each internal Flink partition ends up in one Kafka partition.
 *
 * <p>Note, one Kafka partition can contain multiple Flink partitions.
 *
 * <p>Cases:
 * # More Flink partitions than kafka partitions
 * <pre>
 *   Flink Sinks:    Kafka Partitions
 *     1    ----------------&gt;  1
 *     2    --------------/
 *     3    -------------/
 *     4    ------------/
 * </pre>
 * Some (or all) kafka partitions contain the output of more than one flink partition
 *
 * <p>Fewer Flink partitions than Kafka
 * <pre>
 *   Flink Sinks:    Kafka Partitions
 *     1    ----------------&gt;  1
 *     2    ----------------&gt;  2
 *     3
 *     4
 *     5
 * </pre>
 *
 * <p>Not all Kafka partitions contain data
 * To avoid such an unbalanced partitioning, use a round-robin kafka partitioner (note that this will
 * cause a lot of network connections between all the Flink instances and all the Kafka brokers).
 */
public class FlinkFixedPartitioner<T> extends FlinkKafkaPartitioner<T> {

    private int parallelInstanceId;

    @Override
    public void open(int parallelInstanceId, int parallelInstances) {
        Preconditions.checkArgument(parallelInstanceId >= 0, "Id of this subtask cannot be negative.");
        Preconditions.checkArgument(parallelInstances > 0, "Number of subtasks must be larger than 0.");
        this.parallelInstanceId = parallelInstanceId;
    }

    @Override
    public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        Preconditions.checkArgument(
            partitions != null && partitions.length > 0,
            "Partitions of the target topic is empty.");
        return partitions[parallelInstanceId % partitions.length];
    }
}
As the class-level comment explains, the default partition assignment depends only on the Flink subtask index and the number of Kafka partitions. Sometimes, however, we need messages of the same type to land in the same partition to preserve their ordering, and in that case we have to extend FlinkKafkaPartitioner ourselves.
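A minimal sketch of such a key-based partitioner follows. The class name KeyHashPartitioner and the fallback behavior for key-less records are assumptions for illustration, not from the original post; the key bytes are the ones produced by serializeKey of the KeyedSerializationSchema.

```java
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

import java.util.Arrays;

// Hypothetical partitioner: records with equal keys always map to the same
// Kafka partition, so per-key ordering is preserved.
public class KeyHashPartitioner<T> extends FlinkKafkaPartitioner<T> {

    @Override
    public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        if (key == null) {
            return partitions[0]; // no key: fall back to the first partition (an arbitrary choice)
        }
        // Mask the sign bit so the modulo result is never negative.
        int hash = Arrays.hashCode(key) & 0x7fffffff;
        return partitions[hash % partitions.length];
    }
}
```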
When using the Kafka sink, customizing KeyedSerializationSchema and FlinkKafkaPartitioner is enough to cover most of our needs.
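Wiring the pieces together might look like the sketch below. The broker address and topic name are placeholders, and the inline KeyedSerializationSchema is a trivial stand-in (it uses the whole record as both key and value) so the snippet stays self-contained; a real job would plug in its own schema and a custom partitioner via Optional.of(...).

```java
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.Properties;

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092"); // assumed broker address

// Trivial inline schema for illustration only.
KeyedSerializationSchema<String> schema = new KeyedSerializationSchema<String>() {
    @Override
    public byte[] serializeKey(String element) {
        return element.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public byte[] serializeValue(String element) {
        return element.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String getTargetTopic(String element) {
        return null; // use the default topic passed to the producer
    }
};

FlinkKafkaProducer011<String> sink = new FlinkKafkaProducer011<>(
        "my-topic",        // hypothetical topic name
        schema,
        props,
        Optional.empty()); // Optional.of(new MyPartitioner<>()) to add a custom partitioner
// stream.addSink(sink);  // attach to the filtered, append-only stream
```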
One complaint: Flink's KafkaSink is wrapped up so tightly that it does not even expose an interface.
Keep at it, Pikachu.