1、日志
可在 http://coolaf.com/tool/unix 查询当前的 Unix 时间戳（单位为秒）。
Flink 的事件时间以毫秒为单位，所以要在秒级时间戳后面补上 000。每行格式为：<key> <毫秒时间戳>，两者用空格分隔。
000002 1569813325000
000002 1569813325000
000002 1569813345000
000002 1569813345000
000002 1569813368000
000002 1569813368000
000002 1569813443000
000002 1569813491000
000002 1569813588000
000002 1569813748000
2、代码
import java.util.Properties
import java.util.regex.Pattern
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment, WindowedStream}
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.api.scala._
import scala.collection.mutable
/**
 * Demo of event-time tumbling windows with bounded out-of-orderness watermarks.
 *
 * Reads lines of the form `<key> <epochMillis>` from a socket and uses the second
 * field as the event timestamp. It keys the stream by the first field. For every
 * 60-second tumbling window, it collects the distinct timestamps seen in that
 * window into a set and prints it.
 *
 * Usage: `StreamingApiWaterMark [host] [port]` (defaults: 13.11.32.192 1111).
 */
object StreamingApiWaterMark {

  /** Socket host used when no host argument is given. */
  private val DefaultHost = "13.11.32.192"

  /** Socket port used when no port argument is given. */
  private val DefaultPort = 1111

  /** Allowed lateness (ms) of an event behind the largest timestamp seen so far. */
  private val MaxOutOfOrdernessMs = 1000L

  /**
   * Parses one input line into `(key, eventTimeMillis, 1)`.
   *
   * Fields are separated by any run of whitespace. Returns `None` in three cases:
   * a blank line, fewer than two fields, or a non-numeric timestamp. This way a
   * single bad line typed into the socket does not fail the whole job.
   */
  private def parseLine(line: String): Option[(String, Long, Int)] =
    line.trim.split("\\s+") match {
      case Array(key, ts, _*) => scala.util.Try(ts.toLong).toOption.map(t => (key, t, 1))
      case _                  => None
    }

  def main(args: Array[String]): Unit = {
    val host = if (args.length > 0) args(0) else DefaultHost
    val port = if (args.length > 1) args(1).toInt else DefaultPort

    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    environment.setParallelism(1)

    val dstream: DataStream[String] = environment.socketTextStream(host, port)

    // Malformed lines are dropped here instead of throwing inside the operator.
    val textWithTsDstream: DataStream[(String, Long, Int)] =
      dstream.flatMap(line => parseLine(line).toList)

    // Watermark = largest event time seen so far - MaxOutOfOrdernessMs.
    val textWithEventTimeDstream: DataStream[(String, Long, Int)] =
      textWithTsDstream.assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor[(String, Long, Int)](Time.milliseconds(MaxOutOfOrdernessMs)) {
          override def extractTimestamp(element: (String, Long, Int)): Long = element._2
        })

    // Key by the first field (the code), using a typed key selector instead of a positional index.
    val textKeyStream: KeyedStream[(String, Long, Int), String] = textWithEventTimeDstream.keyBy(_._1)
    textKeyStream.print("textkey:")

    val windowStream: WindowedStream[(String, Long, Int), String, TimeWindow] =
      textKeyStream.window(TumblingEventTimeWindows.of(Time.seconds(60)))

    // Collect the distinct event timestamps of each window into a set.
    val groupDstream: DataStream[mutable.HashSet[Long]] =
      windowStream.fold(new mutable.HashSet[Long]()) { case (set, (_, ts, _)) => set += ts }

    groupDstream.print("window::::").setParallelism(1)
    environment.execute()
  }
}
3、结果
textkey:> (000002,1569813325000,1)
textkey:> (000002,1569813325000,1)
textkey:> (000002,1569813345000,1)
textkey:> (000002,1569813345000,1)
textkey:> (000002,1569813368000,1)
textkey:> (000002,1569813368000,1)
textkey:> (000002,1569813443000,1)
textkey:> (000002,1569813491000,1)
textkey:> (000002,1569813588000,1)
textkey:> (000002,1569813748000,1)
window::::> Set(1569813443000, 1569813491000, 1569813345000, 1569813325000, 1569813368000, 1569813588000)
4、总结
每 60s 一个滚动窗口，窗口边界按 epoch 对齐到 60s 的整数倍（例如 [1569813300000, 1569813360000)）。Watermark 等于当前所有已到达数据中的 maxEventTime 减去允许的延迟时长。
本案例中每行的第二个字段是事件时间戳（毫秒），允许的乱序延迟为 1s（Time.milliseconds(1000)）。最后在每个窗口内用 fold 把时间戳收集到 HashSet 中（相同的时间戳只保留一个）。
window的触发要符合以下几个条件:
1、watermark 时间 >= window_end_time - 1（即 window.maxTimestamp()，这是 Flink EventTimeTrigger 的触发判断条件）
2、在[window_start_time,window_end_time)中有数据存在