1. In-order (ascending) timestamps
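If a stream's timestamps arrive in ascending order, simply extract each element's event-time timestamp and use it as the watermark. No element with an earlier timestamp can arrive afterwards.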
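```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;

DataStream<MyEvent> stream = ...;

// Timestamps arrive in ascending order, so each element's creation time
// can be used directly as the basis for the watermark.
DataStream<MyEvent> withTimestampsAndWatermarks =
    stream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<MyEvent>() {
        @Override
        public long extractAscendingTimestamp(MyEvent element) {
            return element.getCreationTime();
        }
    });
```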
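```scala
import org.apache.flink.streaming.api.scala._

val stream: DataStream[MyEvent] = ...

// Shorthand for ascending timestamps: pass the extraction function directly.
val withTimestampsAndWatermarks = stream.assignAscendingTimestamps(_.getCreationTime)
```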
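The plain-Java simulation below shows what the ascending strategy does without running a Flink job. It is a sketch under our own assumptions and is not Flink's `AscendingTimestampExtractor`. The class and its methods (`AscendingWatermarkSketch`, `onEvent`, `run`) are hypothetical. The sketch recomputes the watermark after every element and only counts an element that breaks the ascending order. A watermark of t asserts that no further element with a timestamp at or below t will arrive. For that reason, the sketch stays 1 ms behind the latest timestamp, so later elements with the same timestamp are not treated as late.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java simulation (no Flink dependency) of watermark
// generation for timestamps that arrive in ascending (non-decreasing) order.
final class AscendingWatermarkSketch {

    // Latest event-time timestamp seen; Long.MIN_VALUE means "nothing seen yet".
    private long currentTimestamp = Long.MIN_VALUE;
    // Number of elements that broke the ascending-order assumption.
    private int violations = 0;

    // Feed one element's extracted timestamp and return the watermark after it.
    long onEvent(long timestamp) {
        if (timestamp >= currentTimestamp) {
            currentTimestamp = timestamp;
        } else {
            // Out-of-order element: count it, but never move the watermark back.
            violations++;
        }
        return currentWatermark();
    }

    // Watermark(t) promises that no element with timestamp <= t will follow,
    // so stay 1 ms behind the latest timestamp to allow equal timestamps.
    long currentWatermark() {
        return currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - 1;
    }

    int violations() {
        return violations;
    }

    // Replay timestamps in arrival order and collect the watermark after each one.
    static List<Long> run(long... timestamps) {
        AscendingWatermarkSketch sketch = new AscendingWatermarkSketch();
        List<Long> watermarks = new ArrayList<>();
        for (long ts : timestamps) {
            watermarks.add(sketch.onEvent(ts));
        }
        return watermarks;
    }
}
```

For example, `AscendingWatermarkSketch.run(1000L, 2000L, 2000L, 3500L)` returns `[999, 1999, 1999, 3499]`. The repeated 2000 is accepted because the watermark after the first 2000 is still only 1999.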
2. Bounded out-of-orderness
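Set an upper bound on lateness. Every event takes some time to arrive, and these delays vary widely, so some events are delayed more than others. A simple approach is to assume that no delay exceeds a fixed maximum. The watermark then trails the largest timestamp seen so far by that maximum (`Time.seconds(10)` in the examples that follow).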
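```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

DataStream<MyEvent> stream = ...;

// Assume no event arrives with a timestamp more than 10 seconds behind the
// largest timestamp already seen; the watermark trails that maximum by 10 s.
DataStream<MyEvent> withTimestampsAndWatermarks =
    stream.assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.seconds(10)) {
            @Override
            public long extractTimestamp(MyEvent element) {
                return element.getCreationTime();
            }
        });
```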
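```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time

val stream: DataStream[MyEvent] = ...

// BoundedOutOfOrdernessTimestampExtractor is an abstract class, so extractTimestamp
// is implemented in an anonymous subclass rather than passed as a second argument list.
val withTimestampsAndWatermarks = stream.assignTimestampsAndWatermarks(
  new BoundedOutOfOrdernessTimestampExtractor[MyEvent](Time.seconds(10)) {
    override def extractTimestamp(element: MyEvent): Long = element.getCreationTime
  })
```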
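The hypothetical plain-Java simulation below shows how a fixed bound plays out on a concrete arrival order. `BoundedOutOfOrdernessSketch`, `watermarks` and `lateEvents` are our own names, not Flink classes or methods. The watermark is the largest timestamp seen so far minus the bound, and it never moves backwards. For simplicity, the sketch recomputes it after every element, whereas Flink's periodic assigners are polled at a configured interval. An element counts as late when its timestamp is at or below the watermark already in effect when it arrives.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java simulation (no Flink dependency) of a bounded
// out-of-orderness watermark: max timestamp seen so far minus a fixed bound.
final class BoundedOutOfOrdernessSketch {

    private final long maxOutOfOrdernessMillis;
    // Largest timestamp seen so far; Long.MIN_VALUE means "nothing seen yet".
    private long currentMaxTimestamp = Long.MIN_VALUE;
    // Last watermark handed out; watermarks must never go backwards.
    private long lastWatermark = Long.MIN_VALUE;

    BoundedOutOfOrdernessSketch(long maxOutOfOrdernessMillis) {
        if (maxOutOfOrdernessMillis < 0) {
            throw new IllegalArgumentException("bound must be non-negative");
        }
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
    }

    // Feed one element's timestamp and return the watermark after it.
    long onEvent(long timestamp) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, timestamp);
        return currentWatermark();
    }

    // Candidate watermark = max timestamp - bound (clamped to avoid underflow);
    // taking the max with the last value keeps the watermark monotonic.
    long currentWatermark() {
        long candidate = currentMaxTimestamp < Long.MIN_VALUE + maxOutOfOrdernessMillis
                ? Long.MIN_VALUE
                : currentMaxTimestamp - maxOutOfOrdernessMillis;
        lastWatermark = Math.max(lastWatermark, candidate);
        return lastWatermark;
    }

    // Replay timestamps in arrival order; collect the watermark after each one.
    static List<Long> watermarks(long maxOutOfOrdernessMillis, long... timestamps) {
        BoundedOutOfOrdernessSketch sketch = new BoundedOutOfOrdernessSketch(maxOutOfOrdernessMillis);
        List<Long> result = new ArrayList<>();
        for (long ts : timestamps) {
            result.add(sketch.onEvent(ts));
        }
        return result;
    }

    // Replay timestamps; return those at or below the watermark already in
    // effect when they arrived (i.e. later than the assumed bound allows).
    static List<Long> lateEvents(long maxOutOfOrdernessMillis, long... timestamps) {
        BoundedOutOfOrdernessSketch sketch = new BoundedOutOfOrdernessSketch(maxOutOfOrdernessMillis);
        List<Long> late = new ArrayList<>();
        for (long ts : timestamps) {
            if (ts <= sketch.currentWatermark()) {
                late.add(ts);
            }
            sketch.onEvent(ts);
        }
        return late;
    }
}
```

Take a 10-second bound and the arrival order 1000, 15000, 8000, 3000, 20000 (milliseconds). `watermarks` returns `[-9000, 5000, 5000, 5000, 10000]` and `lateEvents` returns `[3000]`. Event 8000 is only 7 s behind the maximum and is still accepted. Event 3000 is 12 s behind, which exceeds the assumed bound.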
References
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/event_timestamp_extractors.html