DataStream API编程指南之Windows编程(十)
窗口编程是处理无限流的核心。Windows将流分割为有限大小的“桶”,我们可以在“桶”上面应用计算。本文档重点介绍如何在Flink中执行窗口操作,以及开发人员如何从其提供的功能中最大限度地获益。
下面展示的是Flink window编程的一般结构。第一个片段引用的基于key的流,而第二个片段是不带key的流。我们可以看出来,唯一的不同就是带key的流调用keyBy(…),而不带key的流将window(...)
换成 windowAll(...)
。这也将作为页面其余部分的路线图。
(1)Keyed Windows
stream
.keyBy(...) <- keyed versus non-keyed windows
.window(...) <- required: "assigner"
[.trigger(...)] <- optional: "trigger" (else default trigger)
[.evictor(...)] <- optional: "evictor" (else no evictor)
[.allowedLateness(...)] <- optional: "lateness" (else zero)
[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data)
.reduce/aggregate/fold/apply() <- required: "function"
[.getSideOutput(...)] <- optional: "output tag"
(2)Non-Keyed Windows
stream
.windowAll(...) <- required: "assigner"
[.trigger(...)] <- optional: "trigger" (else default trigger)
[.evictor(...)] <- optional: "evictor" (else no evictor)
[.allowedLateness(...)] <- optional: "lateness" (else zero)
[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data)
.reduce/aggregate/fold/apply() <- required: "function"
[.getSideOutput(...)] <- optional: "output tag"
在上面的代码片段中,方括号[…]是可选的。Flink允许我们以多种不同的方式定制窗口函数的逻辑,以便它最大程度满足自己的需要。
1、窗口生命周期
简而言之,当属于该窗口的第一个元素到达时,就会创建一个窗口,并且当时间(Event Time或者Processing time)到达(结束时间戳 + 用户指定的允许时延) (see Allowed Lateness)时,窗口就会被完全删除。Flink确保删除的只是基于时间的窗口而不是其他类型的窗口,例如,global window(see Window Assigners)。
例如,基于Event Time的窗口策略是每5分钟创建一个non-overlapping (或者 tumbling)窗口,且允许1分钟时延。那么,Flink将会为12:00~12:05之间的时间段创建一个新窗口,在第一个消息到来且timestamp刚好落在这个区间的时候。而当水位线到达12:06的时候,这个窗口将会被移除。
此外,每一窗口都有一个触发器Trigger (see Triggers) 和一个function(ProcessWindowFunction
, ReduceFunction
, AggregateFunction
or FoldFunction
) (see Window Functions)。函数将包含计算窗口内容的方法,而触发器则指定了在什么条件下窗口可以应用function。触发策略可能是像“当窗口元素个数超过4”,或者“水位线到达窗口的结束位置”。触发器还可以决定在窗口开始创建到结束时间内的任何时间清除窗口的内容。在这种情况下,清除仅仅指删除窗口中的元素,而不是窗口的元数据。这也就意味着新的数据仍然可以被添加到窗口中。
除了上面介绍的内容,你也可以指定一个Evictor
(see Evictors) ,它能够在触发器触发之后以及在函数应用之前或者之后从窗口中删除元素。
在下文中,我们将对上述每个组件进行更详细的介绍。
2、Keyed vs Non-Keyed Windows
首先,我们要做的事情就是指定我们的stream是否是带key的。这个需要在窗口被定义之前完成。使用keyBy(…)将无限的流切分成逻辑的带key流。如果keyBy(…)没有被调用,那么流就是不带key的。
在带key的流的情形下,任何输入事件的属性可以被当作key来使用。拥有一个keyed stream将允许多个任务并行执行窗口计算,这是因为每个keyed stream都可以独立于其他任务进行处理。所有具有相同key的元素会被发送到相同的并行任务中。
在non-keyed streams中,你的初始stream将不会被分成多个逻辑流,并且所有的窗口逻辑将会由单个任务执行,即,Non-Keyed窗口的并行度为1。
3、窗口分配器(Window Assigners)
窗口分配器:定义如何将数据分配给窗口。
在指定stream是否是带key之后,下一步就是要定义一个窗口分配器。这个窗口分配器定义了元素如何被分配到窗口中。这个是通过在window(…)(带key的流)和windowAll()(不带key的流)的调用中指定窗口分配器来完成的。
一个窗口分配器负责将每一个进来的元素分配到一个或者多个窗口。Flink带有预定义的窗口分配器用于处理最常见的用例,即 tumbling windows, sliding windows, session windows and global windows。我们也可以通过继承WindowAssigner类实现自定义的窗口分配器。所有内置的窗口分配器(除global windows)都是基于时间分配元素到窗口,而这个时间可以是processing time或者event time。可以查看event time这部分的内容,学习processing time或和vent time两种时间的不同,以及时间戳和水位线是如何生成的。
基于时间的窗口有一个开始时间戳(包含)和一个结束时间戳(不包含),[开始时间戳,结束时间戳)共同描述了窗口的大小。在代码中,Flink在处理基于时间的窗口时使用TimeWindow,该窗口具有查询开始和结束时间戳的方法,还有一个额外的方法maxTimestamp(),这个方法返回给定窗口允许的最大时间戳。
接下来,我们展示Flink的预定义窗口分配器是如何工作的,以及在DataStream程序中如何进行使用。接下来的图片可视化每一种窗口分配器的工作机制。紫色的圆圈代表stream中的元素,它们被一些key划分(在本例中,key是user1、user2和user3)。x轴则表示时间进展。
内置窗口分配器:
-
tumbling windows
, 滚动窗口 -
sliding windows
, 滑动窗口 -
session windows
,会话窗口 -
global windows
,全局窗口
Note:一个元素有可能被分配到多个窗口。
(1)tumbling windows, 滚动窗口
滚动窗口将每个元素分配给一个指定大小的窗口。滚动窗口有固定大小并且窗口是不会重叠的。例如,你可以指定一个5分钟的滚动窗口,当前窗口将会被评估,并每5分钟就会开始一个新的窗口。
滚动窗口会将无限数据流按照指定的时间间隔给切分成固定大小的窗口。
例子:
object WindowAppScala {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment
.getExecutionEnvironment
val data: DataStream[String] = env
.socketTextStream("localhost", 9999)
data.flatMap(_.split(","))
.map((_, 1))
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1)
.print()
.setParallelism(1)
env.execute("window app")
}
}
可以通过Time.milliseconds(x), Time.seconds(x), Time.minutes(x)等方式指定时间间隔。
如上例所示,滚动窗口分配器还可以采用可选的offset参数,该参数可以用于更改窗口的对齐方式。例如,如果滚动窗口没有offset,那么你将能够获取的窗口有1:00:00.000 - 1:59:59.999, 2:00:00.000 - 2:59:59.999等窗口。如果你想要改变窗口的对齐方式,只需要指定一个offset参数即可。如果有15分钟的offset,那么,将会获得1:15:00.000 - 2:14:59.999, 2:15:00.000 - 3:14:59.999等窗口。对于偏移参数的重要使用是将窗口调整为非UTC-0时区。例如,在中国你需要指定Time.hours(-8)的偏移参数。
(2)sliding windows, 滑动窗口
滑动窗口分配器将元素分配到固定大小的窗口。和滚动窗口分配器很像,窗口大小可以通过窗口大小参数进行配置。另一个窗口滑动参数用于控制滑动窗口启动的频率,也就是多久启动一个滑动窗口。因此,如果滑动窗口小于窗口大小,窗口可能会重叠。在这种情况下,一个元素会被分配给多个窗口。
例如,你有一个窗口大小是10分钟,而5分钟滑动一次。在这种情况下,你每隔5分钟获得一个窗口,这个窗口包含了最后10分钟内到达的时间,如下图所示。
Note: 窗口大小固定,但是窗口会进行滑动。每隔固定时间窗口进行滑动,例如窗口大小是10分钟,而每隔5分钟产生一个新的窗口,那么部分数据会分配给多个窗口。
窗口大小 > 滑动时间,那么就会重叠;
窗口时间 < 滑动时间,不会有重叠部分。
使用实例:
object SlidingWindowsAppScala {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment
.getExecutionEnvironment
val data: DataStream[String] = env
.socketTextStream("localhost", 9999)
data.flatMap(_.split(","))
.map((_, 1))
.keyBy(0)
.timeWindow(Time.seconds(2), Time.seconds(2))
.sum(1)
.print()
.setParallelism(1)
env.execute("slide window app")
}
}
不同之处在于:添加一个滑动时间。
timeWindow(Time.seconds(2), Time.seconds(2))
时间间隔可以通过如下方式进行指定,Time.milliseconds(x), Time.seconds(x), Time.minutes(x)等。
如上一个示例所示,滑动窗口分配器还接受一个可选的offset(偏移)参数,可用于更改窗口的对齐方式。例如,如果没有30分钟的滑动窗口,最终生成的窗口是1:00:00.000 - 1:59:59.999, 1:30:00.000 - 2:29:59.999等。如果你想要改变生成的窗口,只需要给定一个偏移参数。如果你给的滑动窗口偏移参数是15分钟,那么,就会得到1:15:00.000 - 2:14:59.999, 1:45:00.000 - 2:44:59.999等窗口。对于偏移参数的重要使用是将窗口调整为非UTC-0时区。例如,在中国你需要指定Time.hours(-8)的偏移参数。
(3)session windows, 会话窗口
会话窗口分配器通过"Session"进行分组。会话窗口不会重叠且没有固定的开始和结束时间,这与滚动窗口和滑动窗口不同。当会话窗口在一段确定的时间内不接受元素时,窗口就会被关闭,也就是发生不活动间隙的时候。一个会话窗口分配器可以通过静态会话间隙或者会话间隙提取函数进行配置,它们定义了不活动的时间有多长。当超过这个时间间隔时,当前的会话窗口就会被关闭,并且后续的元素将会被分配到新的会话窗口。
下面通过一个例子介绍如何使用会话窗口。
val input: DataStream[T] = ...
// event-time session windows with static gap
input
.keyBy(<key selector>)
.window(EventTimeSessionWindows.withGap(Time.minutes(10)))
.<windowed transformation>(<window function>)
// event-time session windows with dynamic gap
input
.keyBy(<key selector>)
.window(EventTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[String] {
override def extract(element: String): Long = {
// determine and return session gap
}
}))
.<windowed transformation>(<window function>)
// processing-time session windows with static gap
input
.keyBy(<key selector>)
.window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
.<windowed transformation>(<window function>)
// processing-time session windows with dynamic gap
input
.keyBy(<key selector>)
.window(DynamicProcessingTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[String] {
override def extract(element: String): Long = {
// determine and return session gap
}
}))
.<windowed transformation>(<window function>)
可以通过Time.milliseconds(x), Time.seconds(x), Time.minutes(x)等方式指定静态的时间间隔。
通过实现SessionWindowTimeGapExtractor 接口指定动态的时间间隔。
注意:因为会话窗口没有固定的开始时间和结束时间,所以它有不同于滚动窗口和滑动窗口的评估方式。在内部,会话窗口操作符会对于每一个到达的记录创建一个新的窗口,如果窗口之间的间隔小于定义的时间间隔,则将窗口进行合并。为了能够合并窗口,会话窗口操作符需要一个合并触发器和一个合并窗口函数,例如ReduceFunction, AggregateFunction, or ProcessWindowFunction (FoldFunction 不能够合并)。
(4)global windows,全局窗口
全局窗口分配器将所有相同key的元素分配到相同的全局窗口。这个窗口模式仅在你指定自定义触发器时才有用。否则,将不会执行任何计算,因为全局窗口没有自然的终点让我们能够处理聚合的数据。
下面的代码片段展示如何使用一个全局窗口。
val input: DataStream[T] = ...
input
.keyBy(<key selector>)
.window(GlobalWindows.create())
.<windowed transformation>(<window function>)
4、Window Functions
在定义了窗口分配器之后,我们需要指定在每个窗口上执行的计算。这是窗口函数的任务,它用于在系统确定一个窗口已准备好进行处理时处理每个(可能是基于键的)窗口的元素。
窗口函数可以是ReduceFunction, AggregateFunction, FoldFunction or ProcessWindowFunction中的一种。前两个可以更有效地执行,这是因为对于到达窗口的元素,Flink可以增量聚合。一个ProcessWindowFunction可以获得窗口中所有元素的迭代器,和元素所属窗口的额外元信息。
使用ProcessWindowFunction进行窗口转换不能像其他情况一样有效地执行,这是因为Flink在调用函数之前,必须在内部缓存窗口中的所有元素。
可以通过将ProcessWindowFunction与ReduceFunction,AggregateFunction或FoldFunction组合使用来获得窗口元素的增量聚合以及ProcessWindowFunction接收的其他窗口元数据,从而缓解这种情况。 我们将看看这些变体的每个示例。
(1)Reducfunction
ReduceFunction指定如何组合输入中的两个元素来生成相同类型的输出元素。Flink使用ReduceFunction递增地聚合窗口的元素。
reduce不是等待窗口所有的数据进行一次性处理,而是数据两两处理。
object ReduceFunctionApp {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment
.getExecutionEnvironment
val data: DataStream[String] = env
.socketTextStream("localhost", 9999)
data.flatMap(_.split(","))
.map(x => (1, x.toInt))
.keyBy(0) // 因为key都是1,所以所有的元素都到一个task去执行
.timeWindow(Time.seconds(5))
.reduce((v1, v2) => {
// 不是等待窗口所有的数据进行一次性处理,而是数据两两处理
println(v1 + "..." + v2)
(v1._1, v1._2 + v2._2)
})
.print()
.setParallelism(1)
env.execute("slide window app")
}
}
(2)AggregateFunction
AggregateFunction是ReduceFunction的一般化的版本,它有三个类型:输入类型、累加器类型,输出类型。输入类型时输入流中的元素的类型,并且AggregateFunction有一个将输入元素添加到累加器的方法。这个接口也包含创建初始累加器的方法,该方法能够合并两个累加器和从累加其中提取输出(或者输出类型)。我们将在下面的例子中看到它的工作原理。
和ReduceFunction一样,Flink会增量聚合到达窗口的元素。一个AggregateFunction 的定义和使用方法如下所示:
/**
* The accumulator is used to keep a running sum and a count. The [getResult] method
* computes the average.
*/
class AverageAggregate extends AggregateFunction[(String, Long), (Long, Long), Double] {
override def createAccumulator() = (0L, 0L)
override def add(value: (String, Long), accumulator: (Long, Long)) =
(accumulator._1 + value._2, accumulator._2 + 1L)
override def getResult(accumulator: (Long, Long)) = accumulator._1 / accumulator._2
override def merge(a: (Long, Long), b: (Long, Long)) =
(a._1 + b._1, a._2 + b._2)
}
val input: DataStream[(String, Long)] = ...
input
.keyBy(<key selector>)
.window(<window assigner>)
.aggregate(new AverageAggregate)
上面的例子功能是计算了窗口中元素的第二个字段的平均值。
(3)FoldFunction
FoldFunction指定窗口的输入元素如何与输出类型的元素组合。对于添加到窗口的每个元素和当前输出值,将递增地调用FoldFunction。第一个元素与输出类型的预定义初始值组合在一起。
FoldFunction的定义与使用如下所示:
val input: DataStream[(String, Long)] = ...
input
.keyBy(<key selector>)
.window(<window assigner>)
.fold("") {
(acc, v) => acc + v._2 }
上面例子功能是所有输入Long的值拼接成字符串,初始的字符串是空串。
注意:fold()函数不能用于会话窗口或者其他可以合并的窗口。
(4)ProcessWindowFunction
ProcessWindowFunction获取到窗口中的所有元素,并且能够通过context对象获取到时间和状态信息,相比于其他window function来说,ProcessWindowFunction更加灵活。但是,这是以性能和资源消耗为代价的,因为元素不能增量地聚合,而是需要在内部进行缓冲,直到窗口准备好处理为止。
一个窗口的所有数据只执行一次process函数。
这种方式可以做一个窗口的数据排序。
abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window] extends Function {
/**
* Evaluates the window and outputs none or several elements.
*
* @param key The key for which this window is evaluated.
* @param context The context in which the window is being evaluated.
* @param elements The elements in the window being evaluated.
* @param out A collector for emitting elements.
* @throws Exception The function may throw exceptions to fail the program and trigger recovery.
*/
def process(
key: KEY,
context: Context,
elements: Iterable[IN],
out: Collector[OUT])
/**
* The context holding window metadata
*/
abstract class Context {
/**
* Returns the window that is being evaluated.
*/
def window: W
/**
* Returns the current processing time.
*/
def currentProcessingTime: Long
/**
* Returns the current event-time watermark.
*/
def currentWatermark: Long
/**
* State accessor for per-key and per-window state.
*/
def windowState: KeyedStateStore
/**
* State accessor for per-key global state.
*/
def globalState: KeyedStateStore
}
}
ProcessWindowFunction可以这样定义和使用:
val input: DataStream[(String, Long)] = ...
input
.keyBy(_._1)
.timeWindow(Time.minutes(5))
.process(new MyProcessWindowFunction())
/* ... */
class MyProcessWindowFunction extends ProcessWindowFunction[(String, Long), String, String, TimeWindow] {
def process(key: String, context: Context, input: Iterable[(String, Long)], out: Collector[String]) = {
var count = 0L
for (in <- input) {
count = count + 1
}
out.collect(s"Window ${context.window} count: $count")
}
}
上面的例子的功能是统计一个窗口中元素的个数。此外,窗口函数可以添加窗口的信息到输出中。
注意:使用ProcessWindowFunction 处理简单的聚合(例如,个数统计)是非常低效的。下一节将展示如何将ReduceFunction或AggregateFunction与ProcessWindowFunction结合使用,以获得增量聚合和ProcessWindowFunction的添加信息。
(5)ProcessWindowFunction with Incremental Aggregation
ProcessWindowFunction 能够和ReduceFunction、AggregateFunction、FoldFunction 结合使用,以实现增量聚合到达窗口的元素。当窗口关闭的时候,ProcessWindowFunction将会返回聚合结果。这允许它在访问processwindow函数的额外窗口元信息的同时增量地计算窗口。
注意:你还可以使用遗留的WindowFunction代替ProcessWindowFunction进行增量窗口聚合。
Incremental Window Aggregation with ReduceFunction
一下示例显示了如何将增量地ReduceFunction与ProcessWinddowFunction结合使用,用以返回窗口中最小地事件以及窗口地开始时间。
val input: DataStream[SensorReading] = ...
input
.keyBy(<key selector>)
.timeWindow(<duration>)
.reduce(
(r1: SensorReading, r2: SensorReading) => {
if (r1.value > r2.value) r2 else r1 },
( key: String,
context: ProcessWindowFunction[_, _, _, TimeWindow]#Context,
minReadings: Iterable[SensorReading],
out: Collector[(Long, SensorReading)] ) =>
{
val min = minReadings.iterator.next()
out.collect((context.window.getStart, min))
}
)