FlinkKafkaConsumer构造参数解析
介绍
FlinkKafkaConsumer的构造方法有7种:
但是通用的参数有:
- topic/topics
- valueDeserializationSchema/keyedDeserializationSchema
- consumerProperties
#1、topic/topics:用来传入一个或者多个topic信息
#2、valueDeserializationSchema/keyedDeserializationSchema:用来将二进制的value/k&v反序列化成java/scala中的对象
#3、consumerProperties:用来配置KafkaConsumerConfig
Topic
通常用来在FlinkKafkaConsumer的构造方法中传入一个或者多个kafka的主题topic
java.util.Collections.singletonList(topic:String) 指定topics
DeserializationSchema
作用 -> 将kafka中的二进制数据的value反序列化成Java/Scala中的能识别的对象结构
//这是一个java中的接口
public interface DeserializationSchema<T> extends Serializable, ResultTypeQueryable<T>
常用的接口实现类有以下3种:
TypeInformationSerializationSchema //这是比较常用的
AbstractDeserializationSchema //这是一个抽象的
JsonDeserializationSchema //这个已经在后续版本中弃用了,用 JsonKeyValueDeserializationSchema代替了
SimpleStringSchema //这是对于String类型的反序列化结构
SimpleStringSchema使用
new FlinkKafkaConsumer[String](
"flink_topic",
new SimpleStringSchema,
new Properties()
)
TypeInformationSerializationSchema使用
new FlinkKafkaConsumer[String](
Collections.singletonList("flink_topic"), //将单个元素添加到一个topic-List中
new TypeInformationSerializationSchema[String](
//TypeInformation 通常通过BasicTypeInfo.STRING_TYPE_INFO\INTGER_TYPE_INFO.....获取
//TypeSerializer 通常通过TypeSerializer的实现类进行声明
BasicTypeInfo.STRING_TYPE_INFO, new ValueSerializer[String](classOf[String])
),
new Properties()
)
//-------------------------------------------------------------------------------------------------
new TypeInformationSerializationSchema[String](BasicTypeInfo.STRING_TYPE_INFO,new StringSerializer)
KeyedDeserializationSchema
作用 -> 将kafka中的二进制数据的key & value反序列化成Java/Scala中的能识别的对象结构
该类的实现子类有以下几种:
KeyedDeserializationSchemaWrapper //开箱即用的kv反序列化结构
JSONKeyValueDeserializationSchema //用于将json二进制数据反序列化的结构
TypeInformationKeyValueSerializationSchema //比较灵活的,也比较常用的实现类
KeyedDeserializationSchemaWrapper使用
new FlinkKafkaConsumer[String](
Collections.singletonList("flink_topic"),
//KeyedDeserializationSchemaWrapper构造器中需要传入一个DeserializationSchema实例
new KeyedDeserializationSchemaWrapper[String](new SimpleStringSchema()),
new Properties()
)
TypeInformationKeyValueSerializationSchema使用
val config: ExecutionConfig = new ExecutionConfig
new FlinkKafkaConsumer[String,String](
Collections.singletonList("flink_topic"),
new TypeInformationKeyValueSerializationSchema(
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO,
config
),
new Properties()
)
//TODO可以声明一个工具类,来获取FlinkKafkaConsumer的对象