目录
概述
窗口是处理无界面流的核心。窗口将流分成有限大小的“桶”,我们可以在这些桶上进行计算。窗口主要有两种,一种基于时间(Time-based Window),一种基于数量(Count-based Window)。本文档重点介绍如何在Flink中执行窗口化。
窗口调用的一般结构如下所示。Keyed Windows表示带健值的流,而Non Keyed Windows表示不带健值的流。区别是带键的keyBy(…)调用window(),而不带键的流调用windowAll()。
窗口的生命周期
当属于某个窗口的第一笔数据到达时,就创建窗口;当时间超过结束时间戳加上用户指定的允许延迟时间时,窗口被删除。Flink保证只删除基于时间的窗口,而不是其他类型的窗口,如全局窗口。
每个窗口都有一个Trigger和函数(ProcessWindowFunction,ReduceFunction,AggregateFunction或FoldFunction)。该函数将包含要应用于窗口内容的计算,而触发器指定窗口被认为准备好应用该函数的条件。触发策略可能类似于“当窗口中的元素数量大于4”时,或“当水位线通过窗口结束时”。触发器还可以决定在创建和删除之间的任何时间清除窗口的内容。在这种情况下,清除仅涉及窗口中的元素,而不是窗口元数据。这意味着仍然可以将新数据添加到该窗口。
还可以指定一个Evictor,它可以在触发器触发后以及应用函数之前和/或之后从窗口中删除元素。
Keyed和Non-Keyed窗口
在定义窗口之前,要指定的第一件事是流是否需要Keyed,使用keyBy(...)将无界流分成逻辑的keyed stream。 如果未调用keyBy(...),则表示流不是keyed stream。对于Keyed流,可以将传入事件的任何属性用作key。 拥有Keyed stream将允许窗口计算由多个任务并行执行,因为每个逻辑Keyed流可以独立于其余任务进行处理。 相同Key的所有元素将被发送到同一个任务。在Non-Keyed流的情况下,原始流将不会被分成多个逻辑流,并且所有窗口逻辑将由单个任务执行,即并行性为1。
窗口分配器
决定了是否使用keyed之后,下一步就是指定windows assigner了。windows assigner定义了流中的数据怎么分配给window。对于keyed stream调用window(...),而 non-keyed streams则调用windowAll()。
Flink已经预实现了多个windows assigner,比如tumbling windows, sliding windows, session windows and global windows,用户也可以通过实现WindowAssigner 类自定义windows assigner。
窗口由两个两个时间戳:start timestamp (包含) 和1个end timestamp (不包含) 组成,表示窗口的大小。可以通过TimeWindow 获取窗口的开始时间戳、结束时间戳、窗口内的最大时间戳。
Tumbling Windows
Tumbling Windows即滚动窗口,分配每一个元素给一个窗口,窗口可以指定大小,窗口之间没有重叠,如下所示。
时间间隔可以通过Time.milliseconds(x), Time.seconds(x), Time.minutes(x)等指定;还可以offset 参数指定偏移量, 对于调整窗口以适应时区要求非常有帮助。
//基于事件时间的滚动窗口
WindowedStream ds2 = ds1.keyBy(0)
.window(TumblingEventTimeWindows.of(Time.seconds(5)));
//基于处理时间的滚动窗口
WindowedStream ds3 = ds1.keyBy(0)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)));
Sliding Windows
Sliding Windows 即滑动窗口,可以通过window size参数指定窗口大小,通过window slide 参数指定窗口滑动的大小。如果window slide小于window size时,窗口存在重叠,这是某些数据就会被分配到多个窗口。
滑动窗口同样可以指定offset 参数指定偏移。
//基于事件时间的滑动窗口
WindowedStream ds2 = ds1.keyBy(0)
.window(SlidingEventTimeWindows.of(Time.seconds(10),Time.seconds(5)));
//基于处理时间的滑动窗口
WindowedStream ds3 = ds1.keyBy(0)
.window(SlidingEventTimeWindows.of(Time.seconds(10),Time.seconds(5)));
Session Windows
Session Windows即会话窗口:窗口不会出现重叠、没有固定开始时间和结束时间。当一个窗口在大于一个时间间隔(Session gap)内没有收到数据时,会话结束,窗口也关闭。这个时间间隔可以是静态配置的,也可以session gap extractor 函数进行定义。
//基于事件时间的静态会话窗口
WindowedStream ds2 = ds1.keyBy(0)
.window(EventTimeSessionWindows.withGap(Time.seconds(10)));
//基于处理时间的静态会话窗口
WindowedStream ds3 = ds1.keyBy(0)
.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)));
//基于事件时间的动态会话窗口
WindowedStream ds2 = ds1.keyBy(0)
.window(EventTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor<Object>() {
@Override
public long extract(Object o) {
return 0;
}
}));
//基于处理时间的动态会话窗口
WindowedStream ds3 = ds1.keyBy(0)
.window(ProcessingTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor<Object>() {
@Override
public long extract(Object o) {
return 0;
}
}));
Global Windows
全局窗口:相同key的所有元素分配给一个独立的窗口,如果没有自定义触发器,则窗口不会执行任何计算,因此使用该窗口函数时需要知道触发器。该窗口没有一个自然的结束点。
窗口函数
窗口函数(window function)就是在每个窗口中执行计算的函数,用于处理每个窗口中的数据。当windows准备就绪,就触发窗口函数执行。
主要的窗口函有:ReduceFunction, AggregateFunction, FoldFunction or ProcessWindowFunction。ReduceFunction和AggregateFunction在每个数据达到时进行聚合,执行比较高效;ProcessWindowFunction 通过在把窗口的所有元素保存在一个Iterable 中,然后再触发ProcessWindowFunction, 因此本地需要进行所有数据的缓存,执行起来效率比较低,但是通过扩展该函数,可以灵活实现自定义功能。
ReduceFunction
使用reduce算子时,需要重写一个ReduceFunction。它接受两个相同类型的输入,生成一个输出,即两两合一地进行汇总操作,生成一个同类型的新元素。在窗口上进行reduce的原理与之类似,只不过多了一个窗口状态数据,这个状态数据的数据类型和输入的数据类型是一致的,是之前两两计算的中间结果数据。当数据流中的新元素流入后,ReduceFunction将中间结果和新流入数据两两合一,生成新的数据替换之前的状态数据。
AggregateFunction
AggregateFunction也是一种增量计算窗口函数,也只保存了一个中间状态数据,但AggregateFunction使用起来更复杂一些。它的源码定义:
public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable {
// 在一次新的aggregate发起时,创建一个新的Accumulator,Accumulator是我们所说的中间状态数据,简称ACC
// 这个函数一般在初始化时调用
ACC createAccumulator();
// 当一个新元素流入时,将新元素与状态数据ACC合并,返回状态数据ACC
ACC add(IN value, ACC accumulator);
// 将两个ACC合并
ACC merge(ACC a, ACC b);
// 将中间数据转成结果数据
OUT getResult(ACC accumulator);
}
输入类型是IN,输出类型是OUT,中间状态数据是ACC,这样复杂的设计主要是为了解决输入类型、中间状态和输出类型不一致的问题,同时ACC可以自定义,我们可以在ACC里构建我们想要的数据结构。比如我们要计算一个窗口内某个字段的平均值,那么ACC中要保存总和以及个数。
ProcessWindowFunction
ProcessWindowFunction要对窗口内的全量数据都缓存。在Flink所有API中,process算子以及其对应的函数是最底层的实现,使用这些函数能够访问一些更加底层的数据,比如,直接操作状态等。
/**
* IN 输入类型
* OUT 输出类型
* KEY keyBy中按照Key分组,Key的类型
* W 窗口的类型
*/
public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> extends AbstractRichFunction {
/**
* 对一个窗口内的元素进行处理,窗口内的元素缓存在Iterable<IN>,进行处理后输出到Collector<OUT>中
* 我们可以输出一到多个结果
*/
public abstract void process(KEY key, Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception;
/**
* 当窗口执行完毕被清理时,删除各类状态数据。
*/
public void clear(Context context) throws Exception {}
/**
* 一个窗口的上下文,包含窗口的一些元数据、状态数据等。
*/
public abstract class Context implements java.io.Serializable {
// 返回当前正在处理的Window
public abstract W window();
// 返回当前Process Time
public abstract long currentProcessingTime();
// 返回当前Event Time对应的Watermark
public abstract long currentWatermark();
// 返回某个Key下的某个Window的状态
public abstract KeyedStateStore windowState();
// 返回某个Key下的全局状态
public abstract KeyedStateStore globalState();
// 迟到数据发送到其他位置
public abstract <X> void output(OutputTag<X> outputTag, X value);
}
}
Triggers
当窗口准备就绪,触发器触发窗口函数处理数据,每个WindowAssigner 都有一个默认的的触发器,如果默认触发器不适应业务需要,用户可以通过trigger(...).指定。
触发器有5个方法用于对不同的事件进行响应:
- onElement():当数据被增加到窗口时触发
- onEventTime():首次注册基于事件时间的定时器时被触发
- onProcessingTime():首次注册基于事件时间的定时器时被触发
- onMerge():带状态的窗口合并时触发
- clear() :窗口计算结束后清理窗口时触发
前三个方法通过返回TriggerResult来决定如何对调用事件执行操作,这些操作有四种:
- CONTINUE: 什么都不做
- FIRE: 触发窗口函数执行计算
- PURGE: 清理窗口的数据内容
- FIRE_AND_PURGE: 触发窗口函数执行计算,计算结束后清理窗口内容。
窗口就绪后,触发会执行,默认触发器返回FIRE。基于事件时间的窗口默认的触发器是EventTimeTrigger,触发时间是watermarks,基于处理时间的窗口默认的触发器是ProcessingTimeTrigger,触发时间是processing time。
Evictors
Evictors即追逐者。在WindowAssigner 和 Trigger后通过evictor(...)调用,用于在窗口函数执行前或执行后、触发器调用后清理数据。
Evictors有两个方法:
- evictBefore: 窗口函数调用前执行
- evictAfter:窗口函数调用后执行
Allowed Lateness
基于事件时间(event-time)的窗口,可能会发生数据迟到的现象。例如watermark 就是在数据元素中打上窗口的结束时间戳来跟踪基于事件的数据是否已经就绪。
默认情况下,当watermark 超过窗口的结束时间时,迟到数据将被删除,但是Flink允许设置一个最大的允许数据迟到时间,在允许迟到时间的范围内,数据将不会被删除,当迟到的数据到达窗口时,立即触发窗口函数更新计算值。
在Flink底层,Flink保存window的状态直到允许迟到的时间之后才删除window和保存的状态数据。
//基于事件时间的滚动窗口
WindowedStream ds2 = ds1.keyBy(0)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.allowedLateness(Time.minutes(1));
迟到被删除的数据可以通过side output特性获取到另外一个数据流中,以便监控和跟踪。
DataStream<String> text = env.socketTextStream("192.168.23.210",9000);
DataStream ds1 = text.map(new MapFunction<String, Tuple2<String,Integer>>() {
@Override
public Tuple2<String,Integer> map(String s) throws Exception {
return Tuple2.of(s,1);
}
});
//定义一个OutputTag
OutputTag<Tuple2<String,Integer>> lateOutputTag = new OutputTag<Tuple2<String,Integer>>("late-data"){};
WindowedStream ds2 = ds1.keyBy(0)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.allowedLateness(Time.minutes(1))
.sideOutputLateData(lateOutputTag);
//获取迟到被删除的数据
DataStream<Tuple2<String,Integer>> lateStream = ds2.getSideOutput(lateOutputTag);
使用window计算结果
窗口函数的计算结果扔是DataStream,不保留任何关于窗口操作的信息,如果想在窗口计算后仍然保留一些窗口计算过程的信息,只能在ProcessWindowFunction在人工编码附加进去。但是与数据有关的timestamp 信息被保留下来,用于允许数据迟到的情况下处理后续迟到数据。
窗口计算结果中保留数据的时间戳信息对于下游要使用相同窗口大小的数据流的情况非常有用。如第一个数据流每1小时为一个窗口计算产品销售额,接着下一个数据流每1小时为一个窗口计算Top10销售额对应的产品。
考虑state的大小
Flink 的windows支持定义一段很长的时间,比如数日、数周甚至数月,对应保存状态的state存储量可能会非常大。因此需要考虑Flink集群的存储大小,以下3点有助于存储的优化:
(1)Flink为每个数据在每一个窗口都创建一个副本。对于滚动窗口,因为一个数据严格属于一个窗口,所以不窗口重叠的情况;对于滑动窗口,因为窗口可能存在重叠的情况,数据可能存在多份。因此不建议滑动窗口设置成为一天或一秒。
(2)ReduceFunction
, AggregateFunction
, 和FoldFunction
对减少存储量非常显著,因为每个窗口只保存一个状态值。
(3)Evictor 防止了预聚合,因为每一个窗口的数据都需要逐一经过Evictor。