DataStream简介
DataStream是flink实时流处理的基本数据模型,DataSet是flink批处理的数据模型。本文主要介绍DataStream,在flink的实时流处理中,所有的流对象都会继承DataStrem这个类。DataStream在实际转换(算子)处理中也会被处理成下面的五个流对象,这几个流对象除了拥有共同的方法外还有自己独有的方法,下面将一一介绍 DataSteam 及其子类的所有API该如何使用。
DataStream的API
Map
- 消费一个元素并产出一个元素
- 参数 MapFunction
- 返回DataStream
dataStream.map(new MapFunction<Integer, Integer>() {
@Override
public Integer map(Integer value) throws Exception {
return 2 * value;
}
});
FlatMap
- 消费一个元素并产生零到多个元素
- 参数 FlatMapFunction
- 返回 DataStream
- 例子:
dataStream.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out)
throws Exception {
for(String word: value.split(" ")){
out.collect(word);
}
}
});
Filter
- 根据FliterFunction返回的布尔值来判断是否保留元素,true为保留,false则丢弃
- 参数 FilterFunction
- 返回DataStream
- 例子:
dataStream.filter(new FilterFunction<Integer>() {
@Override
public boolean filter(Integer value) throws Exception {
return value != 0;
}
});
KeyBy
dataStream.keyBy("someKey") // Key by field "someKey"
dataStream.keyBy(0) // Key by the first element of a Tuple
- 根据指定的Key将元素发送到不同的分区,相同的Key会被分到一个分区(这里分区指的就是下游算子多个并行的节点的其中一个)。keyBy()是通过哈希来分区的。
- 只能使用KeyedState(Flink做备份和容错的状态)
- 参数 String,tuple的索引,覆盖了hashCode方法的POJO,不能使数组
- 返回KeyedStream
WindowAll
dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data
- 将元素按照某种特性聚集在一起(如时间:滑动窗口,翻转窗口,会话窗口,又如出现次数:计数窗口)
- 参数 WindowAssigner
- 返回 AllWindowedStream
Union
dataStream.union(otherStream1, otherStream2, ...);
- 将两个或多个datastream合并,创造一个新的流包含这些datastream的所有元素
- 参数DataStream(一个或多个)
- 返回UnionStream
Join
dataStream.join(otherStream)
.where(<key selector>).equalTo(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.apply (new JoinFunction () {...});
- 将两个DataStream按照key和window join在一起
- 参数:1. KeySelector1 2. KeySelector2 3. DataStream 4. WindowAssigner 5. JoinFunction/FlatJoinFunction
- 返回DataStream
- 例子:
- Transformation:1. 调用join方法后生成JoinedStream,JoinedStream保存了两个input 2. 调用where方法生成一个内部类Where对象,注入KeySelector1 3. 调用equalTo生成内部类EqualTo对象,注入KeySelector2 4. 调用window升成内部静态类WithWindow,并且注入WindowAssigner(在该对象中还可以注入Trigger和Evictor 5. 最后调用apply方法将(Flat)JoinFunction注入并且用一个(Flat)JoinCoGroupFunction封装起来,而在这一步会将所有注入的对象用在coGroup上。详情见下一个Window CoGroup的解析。
- Runtime: 与Window CoGroup相同,详情见下一个WIndow CoGroup解析
Window CoGroup
dataStream.coGroup(otherStream)
.where(0).equalTo(1)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.apply (new CoGroupFunction () {...});
- 根据Key和window将两个DataStream的元素聚集在两个集合中,根据CoGroupFunction来处理这两个集合,并产出结果
- 参数 1. DataStream 2. KeySelector1 3. KeySelector2 4. WindowAssigner 5. CoGroupFunction
- 返回DataStream
-
Transformation:生成一个TaggedUnion类型和unionKeySelector,里面分别包含了两个流的元素类型和两个流的KeySelector。将两个流通过map分别输出为类型是TaggedUnion的两个流(map详情见StreamMap),再Union在一起(详情见Union),再使用合并过后的流和unionKeySelector生成一个KeyedStream(详情见KeyBy),最后使用KeyedStream的window方法并传入WindowAssigner生成WindowedStream,并apply CoGroupFunction来处理(详情见WindowedStream Apply方法)。总体来说,Flink对这个方法做了很多内部的转换,最后生成了两个StreamMapTransformation,一个PartitionTransformation和一个包含了WindowOperator的OneInputTransformation。
CoGroupTransformation
-
Runtime:参考每个Transformation对应的Runtime情况
Connect
DataStream<Integer> someStream = //...
DataStream<String> otherStream = //...
ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream);
- 将两个DataStream连接在一起,使得他们之间可以共享状态
- 参数 DataStream
- 返回ConnectedStreams
- Transformation:在这一步会生成一个包含了两个DataStream的ConnectedStreams对象,不会有Transformation产生。详情见后续ConnectedStreams的API详解。
Split
SplitStream<Integer> split = someDataStream.split(new OutputSelector<Integer>() {
@Override
public Iterable<String> select(Integer value) {
List<String> output = new ArrayList<String>();
if (value % 2 == 0) {
output.add("even");
}
else {
output.add("odd");
}
return output;
}
});
- 按照一个规则将一个流的元素产出到两个或多个支流(每个元素可以发送到不止一个支流)
- 参数 OutputSelector
- 返回 SplitStream
Iterate
IterativeStream<Long> iteration = initialStream.iterate();
DataStream<Long> iterationBody = iteration.map (/*do something*/);
DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>(){
@Override
public boolean filter(Integer value) throws Exception {
return value > 0;
}
});
iteration.closeWith(feedback);
DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>(){
@Override
public boolean filter(Integer value) throws Exception {
return value <= 0;
}
});
- 通过将一个算子的输出重定向到某个输入Operator上来创个一个循环。非常适合用来持续更新一个模型。
- 过程 DataStream → IterativeStream → DataStream
ExtractTimestamps
stream.assignTimestamps (new TimeStampExtractor() {...});
- 从元素中提取timestamp来用作事件时间(EventTime)。
- 参数 TimeStampExtractor
- 返回 DataStream
Project
DataStream<Tuple3<Integer, Double, String>> in = // [...]
DataStream<Tuple2<String, Integer>> out = in.project(2,0);
- 如果元素是Tuple,直接通过index提取出Tuple中的字段组成新的Tuple,并产出结果
- 参数 Tuple中的index(int, 一个或多个)
- 返回 DataStream
Custom partitioning
dataStream.partitionCustom(partitioner, "someKey");
dataStream.partitionCustom(partitioner, 0);
通过用户定义的流分区器(Partitioner)将每个元素传输到指定的subtask
- 参数 Partitioner, Tuple索引/POJO属性名/KeySelector
- 返回 DataStream
Transformation:partitionCustom类似于KeyBy,不过partitioner是由自己定制并且输出的不是KeyedStream。首先会通过KeySelector和用户实现的Partitioner生成一个CustomPartitionerWrapper(StreamPartitioner),再讲它注入到PartitionTransformation。
Random partitioning
dataStream.shuffle();
- 将元素按照均匀分布打散到下游
- 返回 DataStream
Rebalancing (Round-robin partitioning)
dataStream.rebalance();
- 通过轮询调度(Round-robin)将元素均匀的分配到下游
- 返回 DataStream
- 例子
Rescaling
dataStream.rescale();
- 通过轮询调度将元素从上游的task一个子集发送到下游task的一个子集
- 返回 DataStream
- 原理:第一个task并行度为2,第二个task并行度为6,第三个task并行度为2。从第一个task到第二个task,Src的子集Src1 和 Map的子集Map1,2,3对应起来,Src1会以轮询调度的方式分别向Map1,2,3发送记录。从第二个task到第三个task,Map的子集1,2,3对应Sink的子集1,这三个流的元素只会发送到Sink1。
假设我们每个TaskManager有三个Slot,并且我们开了SlotSharingGroup,那么通过rescale,所有的数据传输都在一个TaskManager内,不需要通过网络。
Broadcasting
dataStream.broadcast();
- 将元素广播到每个分区
- 返回DataStream
KeyedStream的API
Reduce
keyedStream.reduce(new ReduceFunction<Integer>() {
@Override
public Integer reduce(Integer value1, Integer value2)
throws Exception {
return value1 + value2;
}
});
根据ReduceFunction将元素与上一个reduce后的结果合并,产出合并之后的结果。
- 参数 ReduceFunction
- 返回 DataStream
- 例子:
Fold
DataStream<String> result =
keyedStream.fold("start", new FoldFunction<Integer, String>() {
@Override
public String fold(String current, Integer value) {
return current + "-" + value;
}
});
- 根据FoldFunction和初始值,将元素与上一个fold过后的结果合并,产出合并之后的结果。
- 参数 FoldFunction
- 返回 DataStream
Aggregations
keyedStream.sum(0);
keyedStream.sum("key");
keyedStream.min(0);
keyedStream.min("key");
keyedStream.max(0);
keyedStream.max("key");
keyedStream.minBy(0);
keyedStream.minBy("key");
keyedStream.maxBy(0);
keyedStream.maxBy("key");
- Flink实现的一系列聚合方法,具体作用由方法名就可以得知
- 返回 DataStream
- 例子:
- Transformation:StreamGroupedReduce里注入了Flink内置的Aggregation方法实现,同Reduce
- Transformation:同Reduce
Window
dataStream.window(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data
- 窗口将同一个key的元素按照某种特性聚集在一起(如时间:滑动窗口,翻转窗口,会话窗口,又如出现次数:计数窗口)
- 返回WindowedStream
- 参数WindowAssigner
- 例子:
- Transformation: 生成一个WindowedStream,不产生Transformation,详情见WindowedStream详解
- Runtime:详情见WindowedStream
-
Interval Join
// this will join the two streams so that // key1 == key2 && leftTs - 2 < rightTs < leftTs + 2 keyedStream.intervalJoin(otherKeyedStream) .between(Time.milliseconds(-2), Time.milliseconds(2)) // lower and upper bound .upperBoundExclusive(true) // optional .lowerBoundExclusive(true) // optional .process(new IntervalJoinFunction() {...});
- 给定一个时间间隔,将两个流中的元素按照key来做join
- 满足条件e1.timestamp + lowerBound <= e2.timestamp <= e1.timestamp + upperBound
- 参数 1. KeyedStream 2. Time: LowerBound and UpperBound 3. boolean(optional) 4. boolean(optional) 5. IntervalJoinFunction
- 返回DataStream
- 例子:
WindowedStream的API
-
Apply
- 使用WindowFunction对window重的元素做处理(例如聚合操作)并产出结果
- 参数 WindowFunction
- 返回 DataStream
- 例子:
windowedStream.apply (new WindowFunction<Tuple2<String,Integer>, Integer, Tuple, Window>() { public void apply (Tuple tuple, Window window, Iterable<Tuple2<String, Integer>> values, Collector<Integer> out) throws Exception { int sum = 0; for (value t: values) { sum += t.f1; } out.collect (new Integer(sum)); } });
Reduce
windowedStream.reduce (new ReduceFunction<Tuple2<String,Integer>>() {
public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
return new Tuple2<String,Integer>(value1.f0, value1.f1 + value2.f1);
}
});
- 根据ReduceFunction将窗口中的元素按照key和window合并,并产出结果
- 参数 ReduceFunction
- 返回DataStream
windowedStream.sum(0);
windowedStream.sum("key");
windowedStream.min(0);
windowedStream.min("key");
windowedStream.max(0);
windowedStream.max("key");
windowedStream.minBy(0);
windowedStream.minBy("key");
windowedStream.maxBy(0);
windowedStream.maxBy("key");
- Flink实现的一系列聚合方法,具体作用由方法名就可以得知,需要注意的是他们被分别作用在按key和window分割过后的元素集合上
- 返回 DataStream
AllWindowedStream的API
-
Apply
// applying an AllWindowFunction on non-keyed window stream allWindowedStream.apply (new AllWindowFunction<Tuple2<String,Integer>, Integer, Window>() { public void apply (Window window, Iterable<Tuple2<String, Integer>> values, Collector<Integer> out) throws Exception { int sum = 0; for (value t: values) { sum += t.f1; } out.collect (new Integer(sum)); } });
- 使用WindowFunction对window重的元素做处理(例如聚合操作)并产出结果
- 与WindowedStream的区别在于是否有key
- 参数 WindowFunction
- 返回 DataStream
Transformation:AllWindowedStream.apply()与WindowedStream.apply()基本是一致的,只是没有KeySelector
Runtime:通WindowedStream.apply()
ConnectedStreams的API
CoMap, CoFlatMap
connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() {
@Override
public Boolean map1(Integer value) {
return true;
}
@Override
public Boolean map2(String value) {
return false;
}
});
connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() {
@Override
public void flatMap1(Integer value, Collector<String> out) {
out.collect(value.toString());
}
@Override
public void flatMap2(String value, Collector<String> out) {
for (String word: value.split(" ")) {
out.collect(word);
}
}
});
- 同时对两个流进行Map或FlatMap操作
- 参数 CoMapFunction, CoFlatMapFunction
- 返回 DataStream
Transformation:ConnectedStream并不会产生Transformation,只会保存两个Input DataStream,从inputs中的DataStream获取父Transformation,并生成一个CoStream(Flat)Map算子。KeySelector依赖于父Transformation注入(如果是PartitionTransformation的话)。
SplitStream的API
Select
SplitStream<Integer> split;
DataStream<Integer> even = split.select("even");
DataStream<Integer> odd = split.select("odd");
DataStream<Integer> all = split.select("even","odd");
- 根据SplitStream中OutputSelector设定的规则获取一个或多个DataStream
- 参数 OutputNames
- 返回 DataStream
扫一扫加入大数据技术交流群,了解更多大数据技术,还有免费资料等你哦
扫一扫加入大数据技术交流群,了解更多大数据技术,还有免费资料等你哦
扫一扫加入大数据技术交流群,了解更多大数据技术,还有免费资料等你哦