文章将讲解一下用的最多的 Connector —— Kafka,带大家利用 Kafka Connector 读取 Kafka 数据,做一些计算操作后然后又通过 Kafka Connector 写入到 kafka 消息队列去。
环境准备
此处略过,准备kafka环境
添加依赖,暂时用的是kafka0.10版本
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.10_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
其他依赖还有
<!--flink java-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!--log-->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.7</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
<scope>runtime</scope>
</dependency>
<!--alibaba fastjson-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.51</version>
</dependency>
暂时先用的KakfaTool这个工具连接的kafka,界面如下
只需要填写zk连接就能连上了,连接上点击Data再点击+号按钮就能手动发消息了。
Flink 如何消费 Kafka 数据?
public class Main {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties props = new Properties();
props.put("bootstrap.servers", "xxx.xxxx.xxxx.xxxx:9092");
props.put("group.id", "metric-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //key 反序列化
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");//value 反序列化
props.put("auto.offset.reset", "latest"); //从最新的数据开始消费
DataStreamSource<String> dataStreamSource = env.addSource(new FlinkKafkaConsumer011<>(
"metric", //kafka topic
new SimpleStringSchema(), // String 序列化
props)).setParallelism(1);
dataStreamSource.print(); //把从 kafka 读取到的数据打印在控制台
env.execute("Flink-kafka");
}
}
程序启动,用kafkaTool手打发消息后,控制台就能打印出来你发的消息。
使用 FlinkKafkaConsumer010 时传入了三个参数:
- Kafka topic:这个代表了 Flink 要消费的是 Kafka 哪个 Topic,如果你要同时消费多个 Topic 的话,那么你可以传入一个 Topic List 进去,另外也支持正则表达式匹配 Topic
- 序列化:上面代码我们使用的是 SimpleStringSchema,也可以自定一个Schema,实现DeserializationSchema和SerializationSchema,重写方法,主要是里面的序列化和反序列化的方法
- 配置属性:将 Kafka 等的一些配置传入
Flink 如何将计算后的数据发到 Kafka?
代码如下:
public class Main {
public static void main(String[] args) throws Exception{
final ParameterTool parameterTool = ExecutionEnvUtil.createParameterTool(args);
StreamExecutionEnvironment env = ExecutionEnvUtil.prepare(parameterTool);
DataStreamSource<Metrics> data = KafkaConfigUtil.buildSource(env);
data.addSink(new FlinkKafkaProducer010<Metrics>(
parameterTool.get("kafka.sink.brokers"),
parameterTool.get("kafka.sink.topic"),
new MetricSchema()
)).name("flink-connectors-kafka")
.setParallelism(parameterTool.getInt("stream.sink.parallelism"));
env.execute("flink sink to kafka");
}
}
其他
-
数据 Sink 到下游的 Kafka,可你能会关心数据的分区策略,在 Flink 中自带了一种就是
FlinkFixedPartitioner,它使用的是 round-robin 策略进行下发到下游 Kafka Topic
的分区上的,当然也提供了 FlinkKafkaPartitioner 接口供你去实现自定义的分区策略。 -
如何消费多个topic的问题:
//单个 Topic public FlinkKafkaConsumer010(String topic, DeserializationSchema<T> valueDeserializer, Properties props) { this(Collections.singletonList(topic), valueDeserializer, props); } //多个 Topic public FlinkKafkaConsumer010(List<String> topics, DeserializationSchema<T> deserializer, Properties props) { this(topics, new KafkaDeserializationSchemaWrapper<>(deserializer), props); } //正则表达式 Topic public FlinkKafkaConsumer010(Pattern subscriptionPattern, DeserializationSchema<T> valueDeserializer, Properties props) { this(subscriptionPattern, new KafkaDeserializationSchemaWrapper<>(valueDeserializer), props); }
-
想要获取数据的元数据信息:
在消费 Kafka 数据的时候,有时候想获取到数据是从哪个 Topic、哪个分区里面过来的,这条数据的 offset 值是多少。这些元数据信息在有的场景真的需要,那么这种场景下该如何获取呢?其实在获取数据进行反序列化的时候使用 KafkaDeserializationSchema 就行。//单个 Topic public FlinkKafkaConsumer010(String topic, KafkaDeserializationSchema<T> deserializer, Properties props) { this(Collections.singletonList(topic), deserializer, props); } //多个 Topic public FlinkKafkaConsumer010(List<String> topics, KafkaDeserializationSchema<T> deserializer, Properties props) { super(topics, deserializer, props); } //正则表达式 Topic public FlinkKafkaConsumer010(Pattern subscriptionPattern, KafkaDeserializationSchema<T> deserializer, Properties props) { super(subscriptionPattern, deserializer, props); }
-
Kafka 消费 Offset 的选择
FlinkKafkaConsumer011 myConsumer = new FlinkKafkaConsumer0111<>(…);
consumer.setStartFromEarliest(); //从最早的数据开始消费
consumer.setStartFromLatest(); //从最新的数据开始消费
consumer.setStartFromTimestamp(…); //从根据指定的时间戳(ms)处开始消费。
consumer.setStartFromGroupOffsets(); //默认从提交的 offset 开始消费
1.当然了,如果你开启了checkpoint,并且设置了自动提交,就由ck去管理kafka的提交了,会在flink做checkpoint的时候一起提交这一阶段的所有offset;如果作业是从 Checkpoint 或者 Savepoint 还原的,那么上面这些配置无效,作业会根据状态中存储的 Offset 为准,然后开始消费。
2.如果想自己管理offset也可以,设置手动提交,讲offset信息保存在redis或其他数据库中,下次直接从redis出读取offset即可