flink写数据到kafka exactly-once模式下找不到序列化类的问题
问题描述:
使用FlinkKafkaProducer011写数据到kafka集群时,在exactly-once模式下,在idea测试没问题,数据可以正常写入kafka,但是打成jar包到flink集群执行就报了如下错误。将模式改成AT_LEAST_ONCE的话,就可以正常部署到集群执行,在IDEA中也可以正常执行。
Caused by: org.apache.kafka.common.config.ConfigException: Invalid value org.apache.kafka.common.serialization.ByteArraySerializer for configuration key.serializer: Class org.apache.kafka.common.serialization.ByteArraySerializer could not be found.
at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:715) ~[?:?]
at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:460) ~[?:?]
at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:453) ~[?:?]
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:62) ~[?:?]
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:75) ~[?:?]
at org.apache.kafka.clients.producer.ProducerConfig.<init>(ProducerConfig.java:360) ~[?:?]
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:288) ~[?:?]
at org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:123) ~[?:?]
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.lambda$abortTransactions$3(FlinkKafkaProducer011.java:929) ~[?:?]
at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184) ~[?:1.8.0_231]
at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556) ~[?:1.8.0_231]
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) ~[?:1.8.0_231]
at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:291) ~[?:1.8.0_231]
at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731) ~[?:1.8.0_231]
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) ~[?:1.8.0_231]
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) ~[?:1.8.0_231]
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) ~[?:1.8.0_231]
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) ~[?:1.8.0_231]
kafka Properties配置如下:
properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
问题定位
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.lambda$abortTransactions$3(FlinkKafkaProducer011.java:929) ~[?:?]
找到FlinkKafkaProducer011 的代码,929行为:try (FlinkKafkaProducer<byte[], byte[]> kafkaProducer =:
private void abortTransactions(Set<String> transactionalIds) {
transactionalIds.parallelStream().forEach(transactionalId -> {
// 不要弄乱原始配置或任何其他属性
// 原始对象
// -> 自己创建一个内部 kafka 生产者,不依赖于 initTransactionalProducer()。
final Properties myConfig = new Properties();
myConfig.putAll(producerConfig);
initTransactionalProducerConfig(myConfig, transactionalId);
try (FlinkKafkaProducer<byte[], byte[]> kafkaProducer =
new FlinkKafkaProducer<>(myConfig)) {
// 调用 initTransactions 就足够了 - 这将中止任何拖延的事务
kafkaProducer.initTransactions();
}
});
}
源码显示abortTransactions方法用来处理消息的事务,确保由具有相同 transactional.id 的生产者的先前实例发起的任何事务都已完成。 如果前一个实例因正在进行的事务而失败,它将被中止。 如果最后一个事务已开始完成但尚未完成,则此方法等待其完成。可以看到,flink在处理失败的事务时,重新加载了一个FlinkKafkaProducer对象,很有可能是这个KafkaProducer对象的加载出了问题。kafka client中生产者对象的加载是利用的AppClassLoader,此时flink在创建FlinkKafkaProducer对象时,线程中的类加载器应该不是AppClassLoader,所以导致以上的问题。
查阅Flink社区相关邮件发现 链接:Class loader problem with FlinkKafkaProducer.Semantic.EXACTLY_ONCE and usrlib directory,
- 这个问题的直接原因应该是FlinkKafkaProducer在任何情况下都在abortTransactions中使用系统类加载器而不是用户类加载器。
- FlinkKafkaProducer:第 1098 行 transactionalIds.parallelStream().forEach(…)。 这将使用静态线程池,并且该线程池的上下文类加载器应该是系统。
这可能是Flink在excetly-once模式下的一个bug.