1、Flink中的时间语义
在Flink的流式处理中,会涉及到时间的不同概念,如下图所示:
Event Time: 是时间创建的时间。它通常由事件中的时间戳描述,例如采集的日志数据中,每一条日志都会记录自己的生成时间,Flink通过时间戳分配器访问事件时间戳
Ingestion Time:数据进入Flink的时间
Processing Time: 是每一个执行基于时间操作的算子的本地系统时间,与机器相关,默认的时间属性就是Processing Time
例如,一条日志进入Flink的时间为2017-11-12 10:00:00.123
,到达Window的系统时间为2017-11-12 10:00:01.234
,日志的内容如下:
2017-11-02 18:37:15.624 INFO Fail over to rm2
对于业务来说,要统计1min内的故障日志个数,哪个时间是最有意义的?—— eventTime,因为我们要根据日志的生成时间进行统计。
2、EventTime
在Flink的流式处理中,绝大部分的业务都会使用EventTime,一般只在EventTime无法使用时,才会被迫使用ProcessingTime或者IngestionTime。
默认情况下,Flink框架中处理的时间语义为ProcessingTime,如果要使用EventTime,那么需要引入EventTime的时间属性,引入方式如下所示:
1 import org.apache.flink.streaming.api.TimeCharacteristic 2 3 val env: StreamExecutionEnvironment = 4 StreamExecutionEnvironment.getExecutionEnvironment 5 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
3、Watermark
我们知道,流处理从事件产生,到流经source,再到operator,中间是有一个过程和时间的,虽然大部分情况下,流到operator的数据都是按照事件产生的时间顺序来的,但是也不排除由于网络、分布式等原因,导致乱序的产生,所谓乱序,就是指Flink接收到的事件的先后顺序不是严格按照事件的Event Time顺序排列的。
那么此时出现一个问题,一旦出现乱序,如果只根据eventTime决定window的运行,我们不能明确数据是否全部到位,但又不能无限期的等下去,此时必须要有个机制来保证一个特定的时间后,必须触发window去进行计算了,这个特别的机制,就是Watermark。
- Watermark是一种衡量Event TIme进展的机制
- Watermark是用于乱序事件的,而正确的处理乱序事件,通常用Watermark机制结合window来实现
- 数据流中的Watermark用于表示timestamp小于Watermark的数据,都已经到达了,因此,window的执行也是由Watermark触发的
- Watermark可以理解为一个延迟触发机制,我们可以设置Watermark的延时时长t,每次系统会校验已经到达的数据中最大的maxEventTime,然后认定EventTime 小于 maxEventTime -t 的所有数据都已经到达,如果有窗口的结束时间等于 maxEventTime -t,那么这个这个窗口被触发执行
窗口计算时间: 1. 窗口的划分:1min(60s) / window size(5s) = 12 [00 - 05) {05 - 10) 2. 数据被采集后,会放置在不同的窗口中: 02, 03, 07, 05 [00 - 05) <- 02, 03 [05 - 10) <- 07, 05 3. 触发窗口的计算 3.1 当时间语义到达窗口结束时间时,会自动触发窗口计算 : maxEventTime >= window end time 3.2 也可以使用watermark延迟计算 : maxEventTime - watermark >= window end time
有序流的Watermarker如下图所示:(Watermark设置为0)
乱序流的Watermarker如下图所示:(Watermark设置为2)
当Flink接收到数据时,会按照一定的规则去生成Watermark,这条Watermark就等于当前所有到达数据中的maxEventTime - 延迟时长,也就是说,Watermark是由数据携带的,一旦数据携带的Watermark比当前未触发的窗口的结束时间要晚,那么就会触发相应窗口的执行。
由于Watermark是由数据携带的,因此,如果运行过程中无法获取新的数据,那么没有被触发的窗口将永远都不被触发。
上图中,我们设置的允许最大延迟到达时间为2s,所以时间戳为7s的事件对应的Watermark是5s,时间戳为12s的事件的Watermark是10s,如果我们的窗口1是1s~5s,窗口2是6s~10s,那么时间戳为7s的事件到达时的Watermarker恰好触发窗口1,时间戳为12s的事件到达时的Watermark恰好触发窗口2。
Watermark 就是触发前一个窗口的“关窗时间”,一旦触发关窗那么以当前时刻为准在窗口范围内的所有数据都会收入窗口中
只要没有达到Watermark那么不管现实中的时间推进了多久都不会触发关窗
Watermark引入
1 object Watermark { 2 def main(args: Array[String]): Unit = { 3 4 // TODO Watermark 5 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment; 6 env.setParallelism(1) 7 8 // 默认情况下,Flink采用ProcessingTime进行时间窗口的计算 9 // TODO 但是也可以设置为EventTime 10 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) 11 val dataDS: DataStream[String] = env.socketTextStream("linux1", 9999) 12 val waterDS = dataDS.map( 13 data=>{ 14 val datas = data.split(",") 15 WaterSensor(datas(0), datas(1).toLong, datas(2).toInt) 16 } 17 ) 18 19 // 设定数据的事件时间已经定义Watermark 20 val markDS: DataStream[WaterSensor] = waterDS.assignTimestampsAndWatermarks( 21 // TODO 设定Watermark延迟5s进行窗口计算 22 new BoundedOutOfOrdernessTimestampExtractor[WaterSensor](Time.seconds(3)) { 23 // TODO 提取每条数据的事件时间(毫秒) 24 override def extractTimestamp(element: WaterSensor): Long = { 25 element.ts * 1000 26 } 27 } 28 ) 29 30 // TODO 划分窗口 31 markDS.keyBy(_.id).timeWindow(Time.seconds(5)) 32 .apply( 33 (s:String, window:TimeWindow, list:Iterable[WaterSensor], out:Collector[String]) => { 34 val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") 35 out.collect(s"window:[${window.getStart}-${window.getEnd}):{ ${list.mkString(",")} }") 36 } 37 ).print("data>>>>") 38 39 markDS.print("watermark") 40 41 env.execute() 42 } 43 }
Event Time的使用一定要指定数据源中的时间戳。否则程序无法知道事件的事件时间是什么(数据源里的数据没有时间戳的话,就只能使用Processing Time了)。
我们看到上面的例子中创建了一个看起来有点复杂的类,这个类实现的其实就是分配时间戳的接口。Flink暴露了TimestampAssigner接口供我们实现,使我们可以自定义如何从事件数据中抽取时间戳。
TimestampAssigner有两种类型
- AssignerWithPeriodicWatermarks
- AssignerWithPunctuatedWatermarks
AssignerWithPeriodicWatermarks
周期性的生成watermark:系统会周期性的将watermark插入到流中(水位线也是一种特殊的事件数据)。默认周期是200毫秒。可以使用 ExecutionConfig.setAutoWatermarkInterval()
方法进行设置。
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // 每隔5秒产生一个watermark env.getConfig.setAutoWatermarkInterval(5000)
产生watermark的逻辑:每隔5秒钟,Flink会调用AssignerWithPeriodicWatermarks
的getCurrentWatermark()
方法。如果方法返回一个时间戳大于之前水位的时间戳,新的watermark会被插入到流中。
这个检查保证了水位线是单调递增的。
如果方法返回的时间戳小于等于之前水位的时间戳,则不会产生新的watermark。
自定义一个周期性的时间戳抽取:
1 object Time_Watermark2 { 2 def main(args: Array[String]): Unit = { 3 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment; 4 env.setParallelism(1) 5 env.getConfig.setAutoWatermarkInterval(500) 6 // TODO 但是也可以设置为EventTime 7 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) 8 9 val dataDS: DataStream[String] = env.socketTextStream("linux1", 9999) 10 val waterDS = dataDS.map( 11 data=>{ 12 val datas = data.split(",") 13 WaterSensor(datas(0), datas(1).toLong, datas(2).toInt) 14 } 15 ) 16 17 // 设定数据的事件时间已经定义Watermark 18 val markDS: DataStream[WaterSensor] = waterDS.assignTimestampsAndWatermarks( 19 new MyPeriodicWatermarks(Time.seconds(5).getSize) 20 ) 21 22 // TODO 允许延迟数据进行处理 23 markDS.keyBy(_.id).timeWindow(Time.seconds(5)).allowedLateness(Time.seconds(3)) 24 .apply( 25 (s:String, window:TimeWindow, list:Iterable[WaterSensor], out:Collector[String]) => { 26 val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") 27 //out.collect(s"window:[${sdf.format(new Date(window.getStart))}-${sdf.format(new Date(window.getEnd))}]:{ ${es.mkString(",")} }") 28 out.collect(s"window:[${window.getStart}-${window.getEnd}):{ ${list.mkString(",")} }") 29 } 30 ).print("data>>>>") 31 32 markDS.print("watermark") 33 34 env.execute() 35 36 } 37 // 自定义数据抽取和设定watermark 38 // Watermark(时间点) = EventTime(10s) - 延迟时间(5) 39 class MyPeriodicWatermarks(t:Long) extends AssignerWithPeriodicWatermarks[WaterSensor] { 40 41 private var maxTS : Long = 0L 42 43 override def getCurrentWatermark: Watermark = { 44 new Watermark(maxTS - t) 45 } 46 47 // 抽取时间戳(EventTime) 48 override def extractTimestamp(element: WaterSensor, previousElementTimestamp: Long): Long = { 49 val ts = element.ts * 1000 50 maxTS = math.max(maxTS, ts) 51 ts 52 } 53 } 54 55 }
一种简单的特殊情况是,如果我们事先得知数据流的时间戳是单调递增的,也就是说没有乱序,那我们可以使用assignAscendingTimestamps,这个方法会直接使用数据的时间戳生成watermark。
val tsDS = mapDS.assignAscendingTimestamps(e => e._2)
而对于乱序数据流,如果我们能大致估算出数据流中的事件的最大延迟时间,可以使用如下代码:
mapDS.assignTimestampsAndWatermarks( new BoundedOutOfOrdernessTimestampExtractor[(String, Long, Int)](Time.seconds(5)){ override def extractTimestamp(element: (String, Long, Int)): Long = { element._2 * 1000 } })
AssignerWithPunctuatedWatermarks
间断式地生成watermark。和周期性生成的方式不同,这种方式不是固定时间的,而是可以根据需要对每条数据进行筛选和处理:
1 object Time_Watermark2 { 2 def main(args: Array[String]): Unit = { 3 4 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment; 5 env.setParallelism(1) 6 env.getConfig.setAutoWatermarkInterval(500) 7 // TODO 但是也可以设置为EventTime 8 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) 9 val dataDS: DataStream[String] = env.socketTextStream("linux1", 9999) 10 val waterDS = dataDS.map( 11 data=>{ 12 val datas = data.split(",") 13 WaterSensor(datas(0), datas(1).toLong, datas(2).toInt) 14 } 15 ) 16 17 // 设定数据的事件时间已经定义Watermark 18 val markDS: DataStream[WaterSensor] = waterDS.assignTimestampsAndWatermarks( 19 new MyPunctuatedWatermarks() 20 ) 21 22 // TODO 允许延迟数据进行处理 markDS.keyBy(_.id).timeWindow(Time.seconds(5)).allowedLateness(Time.seconds(3)) 23 .apply( 24 (s:String, window:TimeWindow, list:Iterable[WaterSensor], out:Collector[String]) => { 25 val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") 26 out.collect(s"window:[${window.getStart}-${window.getEnd}):{ ${list.mkString(",")} }") 27 } 28 ).print("data>>>>") 29 30 markDS.print("watermark") 31 32 env.execute() 33 34 } 35 // 自定义间歇性生成watermark 36 class MyPunctuatedWatermarks extends AssignerWithPunctuatedWatermarks[WaterSensor] { 37 override def checkAndGetNextWatermark(lastElement: WaterSensor, extractedTimestamp: Long): Watermark = { 38 if (lastElement.id == "sensor_1") { 39 new Watermark(lastElement.ts * 1000 - 5000) 40 } else { 41 new Watermark(lastElement.ts * 1000) 42 } 43 } 44 45 override def extractTimestamp(element: WaterSensor, previousElementTimestamp: Long): Long = { 46 element.ts * 1000 47 } 48 } 49 }
4、EventTime在Window中的使用
4.1 TumblingEventTimeWindows滚动窗口
1 object Window_Tumbling { 2 def main(args: Array[String]): Unit = { 3 4 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment; 5 env.setParallelism(1) 6 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) 7 8 val dataDS: DataStream[String] = env.socketTextStream("linux1", 9999) 9 val waterDS = dataDS.map( 10 data=>{ 11 val datas = data.split(",") 12 WaterSensor(datas(0), datas(1).toLong, datas(2).toInt) 13 } 14 ) 15 16 // 设定数据的事件时间已经定义Watermark 17 val markDS: DataStream[WaterSensor] = waterDS.assignTimestampsAndWatermarks( 18 // TODO 设定Watermark延迟5s进行窗口计算 19 new BoundedOutOfOrdernessTimestampExtractor[WaterSensor](Time.seconds(3)) { 20 // TODO 提取每条数据的事件时间(毫秒) 21 override def extractTimestamp(element: WaterSensor): Long = { 22 element.ts * 1000 23 } 24 } 25 ) 26 27 // TODO 允许延迟数据进行处理 28 markDS.keyBy(_.id) 29 //.timeWindow(Time.seconds(5)) 30 // timeWindow(size)方法其实就是TumblingEventTimeWindows.of的一种封装 31 .window(TumblingEventTimeWindows.of(Time.seconds(5))) 32 .apply( 33 (s:String, window:TimeWindow, list:Iterable[WaterSensor], out:Collector[String]) => { 34 val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") 35 out.collect(s"window:[${window.getStart}-${window.getEnd}):{ ${list.mkString(",")} }") 36 } 37 ).print("data>>>>") 38 39 markDS.print("watermark") 40 41 env.execute() 42 43 } 44 45 }
结果是按照Event Time的时间窗口计算得出的,而不是系统的处理时间
4.2 SlidingEventTimeWindows滑动窗口
1 object Window_Sliding { 2 def main(args: Array[String]): Unit = { 3 4 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment; 5 env.setParallelism(1) 6 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) 7 8 val dataDS: DataStream[String] = env.socketTextStream("linux1", 9999) 9 val waterDS = dataDS.map( 10 data=>{ 11 val datas = data.split(",") 12 WaterSensor(datas(0), datas(1).toLong, datas(2).toInt) 13 } 14 ) 15 16 // 设定数据的事件时间已经定义Watermark 17 val markDS: DataStream[WaterSensor] = waterDS.assignTimestampsAndWatermarks( 18 // TODO 设定Watermark延迟5s进行窗口计算 19 new BoundedOutOfOrdernessTimestampExtractor[WaterSensor](Time.seconds(3)) { 20 // TODO 提取每条数据的事件时间(毫秒) 21 override def extractTimestamp(element: WaterSensor): Long = { 22 element.ts * 1000 23 } 24 } 25 ) 26 27 // TODO 允许延迟数据进行处理 28 markDS.keyBy(_.id) 29 //.timeWindow(Time.seconds(6), Time.seconds(3)) 30 //.window(SlidingEventTimeWindows.of(Time.seconds(6), Time.seconds(3))) 31 .apply( 32 (s:String, window:TimeWindow, list:Iterable[WaterSensor], out:Collector[String]) => { 33 val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") 34 out.collect(s"window:[${window.getStart}-${window.getEnd}):{ ${list.mkString(",")} }") 35 } 36 ).print("data>>>>") 37 38 markDS.print("watermark") 39 40 env.execute() 41 42 } 43 }
4.3 EventTimeSessionWindows会话窗口
相邻两次数据的EventTime的时间差超过指定的时间间隔就会触发执行。如果加入Watermark, 会在符合窗口触发的情况下进行延迟。到达延迟水位再进行窗口触发。
1 object Window_Session { 2 def main(args: Array[String]): Unit = { 3 4 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment; 5 env.setParallelism(1) 6 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) 7 8 val dataDS: DataStream[String] = env.readTextFile("input/data1.log") 9 val waterDS = dataDS.map( 10 data=>{ 11 val datas = data.split(",") 12 WaterSensor(datas(0), datas(1).toLong, datas(2).toInt) 13 } 14 ) 15 16 // 设定数据的事件时间已经定义Watermark 17 val markDS: DataStream[WaterSensor] = waterDS.assignTimestampsAndWatermarks( 18 // TODO 设定Watermark延迟5s进行窗口计算 19 new BoundedOutOfOrdernessTimestampExtractor[WaterSensor](Time.seconds(0)) { 20 // TODO 提取每条数据的事件时间(毫秒) 21 override def extractTimestamp(element: WaterSensor): Long = { 22 element.ts * 1000 23 } 24 } 25 ) 26 27 // TODO 将指定时间间隔的数据进行分窗口计算。 28 markDS.keyBy(_.id) 29 .window(EventTimeSessionWindows.withGap(Time.seconds(2))) 30 .apply( 31 (s:String, window:TimeWindow, list:Iterable[WaterSensor], out:Collector[String]) => { 32 val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") 33 out.collect(s"window:[${window.getStart}-${window.getEnd}):{ ${list.mkString(",")} }") 34 } 35 ).print("data>>>>") 36 37 markDS.print("watermark") 38 39 env.execute() 40 } 41 }