目录
什么是 DataStream
Datastream API的名称来自一个特殊的DataStream类,该类用于表示Flink程序中的数据集合。可以将它们视为可以包含重复项的不可变数据集合。这些数据可以是有界的,也可以是无界的,用于处理它们的API是相同的。
注意:Flink 1.9版本后,流式API和批式API逐步进行合并,统一用流式API表示。
什么能被转化为流
Flink 的 Java 和 Scala DataStream API 可以将任何可序列化的对象转化为流。Flink 自带的序列化器有:
- 基本类型,即 String、Long、Integer、Boolean、Array
- 复合类型:Tuples、POJOs 和 Scala case classes
而且 Flink 会交给 Kryo 序列化其他类型。也可以将其他序列化器和 Flink 一起使用。特别是对Avro有良好支持。
对于 Java,Flink 自带有 Tuple0 到 Tuple25 类型
Tuple2<String, Integer> person = Tuple2.of("Fred", 35); String name = person.f0; Integer age = person.f1; |
如果满足以下条件,Flink 将数据类型识别为 POJO 类型。并允许“按名称”字段引用:
- 该类是公有且独立的(没有非静态内部类)
- 该类有公有的无参构造函数
- 类(及父类)中所有的所有不被 static、transient 修饰的属性要么是公有的(且不被 final 修饰),要么是包含公有的 getter 和 setter 方法,这些方法遵循 Java bean 命名规范。
流式Flink程序的开发流程
- 获取 一个 StreamExecutionEnvironment
- 用数据源函数加载或创建原始数据
- 指定transformations 处理数据
- 指定sink算子保存数据
- 触发execution执行程序
import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import java.io.File; public class DataStream01 { public static void main(String [] args) throws Exception { //1.获取 一个 StreamExecutionEnvironment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); String dir=System.getProperty("user.dir")+File.separator; //2.用数据源函数加载或创建原始数据 DataStream<String> text = env.socketTextStream("192.168.23.210",9000); //3.指定transformations 处理数据 DataStream<Integer> parsed = text.map(new MapFunction<String, Integer>() { @Override public Integer map(String value) { return Integer.parseInt(value); } }); //4.指定sink算子保存数据 parsed.writeAsText(dir+"src"+File.separator+"outdir"); //5.触发execution执行程序 env.execute(); } } |
DataStream的数据源
DataStrem API 预定义了以下几种读取数据流的API,可以通过StreamExecutionEnvironment直接读取。
- readTextFile(path)-逐行读取文本文件,即符合TextInputFormat规范的文件,并将其作为字符串返回
- readFile(fileInputFormat,path)-按照指定的文件输入格式读取(一次)文件
- readFile(fileInputFormat,path,watchType,interval,pathFilter,typeInfo) 这是前面两个方法在内部调用的方法。
- readFile(fileInputFormat,filePath,watchType, interval, typeOutInformation) 和前一种readFile一样,只是自定义读入后的格式。
Flink将文件读取过程分为两个子任务,即目录监视和数据读取。每个子任务都由一个单独的实体实现。监视由单个任务实现,而读取由多个并行运行的任务执行。后者的并行性等于作业并行性。单个监视任务的作用是扫描目录(根据watchType定期或仅扫描一次),找到要处理的文件,将其拆分为多个分片,并将这些拆分分配给下游的实时数据订阅者。每个分片只能由一个订阅者读取,而下游的一个订阅者可以逐个读取多个分片。
- socketTextStream:从套接字读取。元素可以用分隔符分隔。
- fromCollection(Iterator,Class):从迭代器创建数据流。该类指定迭代器返回的元素的数据类型。
- fromElements(T…):从给定的对象序列创建数据流。所有对象的类型必须相同。
- fromParallelCollection(SplittableIterator,Class):从迭代器并行创建数据流。该类指定迭代器返回的元素的数据类型。
- 自定义
addSource 附加一个新的源函数。例如,要从apachekafka读取,可以使用addSource(new flinkkafcaumer010<>(…))
迭代数据流
迭代(Iterations)就是利用每次的结果进行循环计算。迭代流程序实现了一个迭代功能,并将其嵌入到IterativeStream中。 由于DataStream没有边界,因此没有最大迭代次数。因此需要使用split或filter指定流的哪一部分反馈到迭代流,以及哪一部分向下游转发。
步骤:
- 指定StreamExecutionEnvironment
- 加载数据源
- 转换为IterationStream
- 定义迭代的transforation
- 定义Iteration结束条件filter
- 把结束条件的filter应用在Iteration中
import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.IterativeStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class IterationDataStream01 { public static void main(String [] args) throws Exception { StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<Long> someIntegers = env.generateSequence(0, 2); someIntegers.print(); IterativeStream<Long> iteration = someIntegers.iterate(); DataStream<Long> minusOne = iteration.map(new MapFunction<Long, Long>() { @Override public Long map(Long value) throws Exception { return value - 1 ; } });
//minusOne.print(); DataStream<Long> stillGreaterThanZero = minusOne.filter(new FilterFunction<Long>() { @Override public boolean filter(Long value) throws Exception { return (value > 0); } }); stillGreaterThanZero.print();
DataStream ds= iteration.closeWith(stillGreaterThanZero);
DataStream<Long> lessThanZero = minusOne.filter(new FilterFunction<Long>() { @Override public boolean filter(Long value) throws Exception { return (value <= 0); } }); env.execute(); } } |
配置运行时参数
StreamExecutionEnvironment包含ExecutionConfig对象,该配置允许为运行时设置特定于作业的配置值。
StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment(); ExecutionConfig executionConfig=env.getConfig(); executionConfig.setAutoWatermarkInterval(1); |
getParallelism()
/ setParallelism(int parallelism)
获取/设置平行度
常用的配置函数:
getMaxParallelism()
/setMaxParallelism(int parallelism)
获取/设置
最大平行度getNumberOfExecutionRetries()
/setNumberOfExecutionRetries(int numberOfExecutionRetries)
获取/设置失败重试次数
getExecutionRetryDelay()
/setExecutionRetryDelay(long executionRetryDelay)
获取/设置失败重试
getGlobalJobParameters()
/setGlobalJobParameters()
获取/设置作业的全局参数