一、watermark
flink接收到每条记录都会产生一个waterMark,计算公式为当前接收到的最大eventTime-延迟时间,窗口函数接收到消息满足条件会触发窗口操作,因此若无满足条件消息进入则窗口不会进行计算。
窗口计算触发条件:1、watermark时间>=window_end_time;2、在[window_start_time,window_end_time)中有数据。
//指定按照eventtime时间处理 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val stream = env.fromElements((1,"A"),(2,"B"),(3,"B"),(3000,"C")).assignTimestampsAndWatermarks( //指定间隔时间为1秒 new BoundedOutOfOrdernessTimestampExtractor[(Int, String)](Time.seconds(1)) { //指定字段作为eventtime override def extractTimestamp(element: (Int, String)): Long = { element._1.toLong } } ) val result = stream.keyBy(1).window( //eventtimewindow TumblingEventTimeWindows.of(Time.seconds(1)) ).sum(0)