官网博客中: Apache Flink中的端到端精确一次处理概述 对Flink 端到端精确一次处理和两段提交的原理,有详尽的描述
这里要写的是,关于 Flink kafka 端到端精确一次的测试
之前就大概测试过相应内容,应该是测试失败了的,只得到了至少一次的结果(之前的关注点不在这个上面,下面会说明为什么只得到 至少一次 的结果)。
这一次是要做Flink HA 相关的配置,有个重要的点就是任务在异常恢复的时候,是否能保持精确一次,这个关乎线上的数据和我们代码的写法(如果Flink 不能保证精确一次,就需要在代码里添加对应的内容)。
测试的前提当然是,开启了 checkpoint,也设置了checkpoint mode 设为精确一次:
val rock = new RocksDBStateBackend(Common.CHECK_POINT_DATA_DIR) env.setStateBackend(rock.getCheckpointBackend) // checkpoint interval 10 minute env.enableCheckpointing(2 * 60 * 1000) env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
同时 Kafka 的生产者也必须开启精确一次的语义: FlinkKafkaProducer 没有过期的公有构造方法,都需要制定 Kafka 生产者的一致性语义:
public FlinkKafkaProducer(String defaultTopic, KafkaSerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaProducer.Semantic semantic) public FlinkKafkaProducer(String defaultTopic, KafkaSerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaProducer.Semantic semantic, int kafkaProducersPoolSize)
特别吐槽下,1.10 版本 KafkaSerializationSchema 没有提供对应的实现类,让我这种菜鸟很尴尬
看下我的生产者写法(很挫)
Common.getProp.setProperty("transaction.timeout.ms", 1000*60*2+"") val sink = new FlinkKafkaProducer[String](topic+"_out" // 没用了 , new MyKafkaSerializationSchema[String](topic + "_out") // 指到这里了 , Common.getProp , FlinkKafkaProducer.Semantic.EXACTLY_ONCE)
MyKafkaSerializationSchema 的实现如下:
public class MyKafkaSerializationSchema<T> implements KafkaSerializationSchema<T>, KafkaContextAware<T> { private String topic; public MyKafkaSerializationSchema(String topic) { this.topic = topic; } @Override public ProducerRecord<byte[], byte[]> serialize(T element, @Nullable Long timestamp) { return new ProducerRecord<>(topic, element.toString().getBytes()); } @Override public String getTargetTopic(T element) { return null; } }
由于ProducerRecord 必须要指定 topic,但是又获取不到 FlinkKafkaProducer 中指定的topic,就先这样写了
测试代码就简单了:
val topic = "simple_string_kafka" val source = new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), Common.getProp) # 开启精确一次语义会报错,必须在kakfa 的 prop 中指定事务过期的时间 Common.getProp.setProperty("transaction.timeout.ms", 1000*60*2+"") val sink = new FlinkKafkaProducer[String](topic+"_out" // 没用 , new MyKafkaSerializationSchema[String](topic + "_out") // 指到这里了 , Common.getProp , FlinkKafkaProducer.Semantic.EXACTLY_ONCE) env.addSource(source) .name("source") .disableChaining() .map(str => { str }) .disableChaining() .name("map1") .map(s => { s }) .addSink(sink) .name("sink")
代码就这样的,然后就是打包上传服务器。
--------未完继续
欢迎关注Flink菜鸟公众号,会不定期更新Flink(开发技术)相关的推文