版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/yidan7063/article/details/85755687
这个是接着第一篇写的:主要是实现 直接上代码
1.实现KeyedSerializationSchema接口
public class DefinedSerializationTest implements KeyedSerializationSchema<Tuple2<String, String>> {
private static final long serialVersionUID = -6095174255432490089L;
@Override
public byte[] serializeKey(Tuple2<String, String> element) {
//序列化key
return element.f0.getBytes();
}
@Override
public byte[] serializeValue(Tuple2<String, String> element) {
//序列化value
return element.f1.getBytes();
}
@Override
public String getTargetTopic(Tuple2<String, String> element) {
//在发送的时候需要指定topic 这里可以直接返回null
return null;
}
}
这个类主要是定义如何序列化key和value
2.继承FlinkKafkaPartitioner
public class PartitionerTest extends FlinkKafkaPartitioner<Tuple2<String, String>> {
/**
* @param record 发送的数据
* @param key key 值
* @param value value值
* @param targetTopic 发送的topic
* @param partitions topic的分区数
* @return 发送到几号分区
*/
@Override
public int partition(Tuple2<String, String> record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
return 0;
}
}
3.写一个发送方法
public void kafkaSendTest(DataStream<Tuple2<String, String>> dataStream, String topic, FlinkKafkaPartitioner flinkKafkaPartitioner) {
dataStream.addSink(new FlinkKafkaProducer011<>(topic,
new DefinedSerializationTest(),
properties,
Optional.of(flinkKafkaPartitioner))).name("sinkToKafka");
}
这样就可以自己定义发送的key,value以及分区数
努力吧,皮卡丘