Ways to assign custom timestamps and watermarks
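1) Implement the AssignerWithPeriodicWatermarks interface
Watermarks are generated periodically: Flink calls getCurrentWatermark() every autoWatermarkInterval milliseconds (set via ExecutionConfig.setAutoWatermarkInterval) and emits the result if it is non-null and greater than the previous watermark.
2) Implement the AssignerWithPunctuatedWatermarks interface
Watermarks are generated based on properties of the arriving data, for example as soon as a special element is encountered in the stream.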
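Flink also ships two ready-made periodic assigners: AscendingTimestampExtractor (for timestamps that only increase) and BoundedOutOfOrdernessTimestampExtractor (for events that may arrive out of order up to a fixed bound).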
Reference: https://blog.csdn.net/shenxiaoming77/article/details/70598869
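To make the difference between the two built-in extractors concrete, here is a plain-Java sketch (no Flink dependency) of the watermark arithmetic each one applies, based on my reading of Flink 1.x: the ascending extractor emits the latest timestamp minus 1 ms, and the bounded one emits the largest timestamp seen minus the configured bound. The class and method names are hypothetical; this is not Flink's source.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical plain-Java model of the watermark rules of the two built-in periodic extractors.
// It only reproduces the arithmetic; it is not Flink code.
class BuiltInWatermarkRules {

    // AscendingTimestampExtractor-style rule: watermark = latest timestamp - 1 ms.
    static long ascendingWatermark(List<Long> timestamps) {
        long current = Long.MIN_VALUE; // nothing seen yet
        for (long ts : timestamps) {
            current = ts; // assumes timestamps never decrease
        }
        return current == Long.MIN_VALUE ? Long.MIN_VALUE : current - 1;
    }

    // BoundedOutOfOrdernessTimestampExtractor-style rule: watermark = max timestamp - bound.
    static long boundedWatermark(List<Long> timestamps, long maxOutOfOrderness) {
        long max = Long.MIN_VALUE + maxOutOfOrderness; // start value chosen so max - bound cannot underflow
        for (long ts : timestamps) {
            max = Math.max(max, ts);
        }
        return max - maxOutOfOrderness;
    }

    public static void main(String[] args) {
        System.out.println(ascendingWatermark(Arrays.asList(1000L, 2000L, 3000L)));       // 2999
        System.out.println(boundedWatermark(Arrays.asList(1000L, 5000L, 3000L), 2000L)); // 3000
    }
}
```

The "minus 1 ms" in the ascending case reflects that an element with exactly the latest timestamp could still arrive; the bounded case trades latency for tolerance of out-of-order data.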
Periodic Watermarks
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

// MyEvent is a user-defined event type (not shown) with a getCreationTime() accessor
public class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<MyEvent> {

    private final long maxOutOfOrderness = 3500; // 3.5 seconds

    private long currentMaxTimestamp;

    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        long timestamp = element.getCreationTime();
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }

    @Override
    public Watermark getCurrentWatermark() {
        // return the watermark as current highest timestamp minus the out-of-orderness bound
        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
    }
}
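The following plain-Java simulation (no Flink; the class name is hypothetical) replays BoundedOutOfOrdernessGenerator's arithmetic on a small out-of-order stream. One simplification to keep in mind: it recomputes the watermark after every event, whereas Flink only polls getCurrentWatermark() every autoWatermarkInterval milliseconds, so real watermarks may lag further behind.

```java
// Plain-Java simulation (no Flink) of BoundedOutOfOrdernessGenerator's arithmetic.
// Simplification: the watermark is recomputed after every event instead of on a periodic timer.
class BoundedOutOfOrdernessSim {
    private final long maxOutOfOrderness;
    private long currentMaxTimestamp; // defaults to 0, like the field in the generator

    BoundedOutOfOrdernessSim(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    // Plays the role of extractTimestamp(): remember the highest event time seen so far.
    long onEvent(long timestamp) {
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }

    // Plays the role of getCurrentWatermark(): highest timestamp minus the bound.
    long currentWatermark() {
        return currentMaxTimestamp - maxOutOfOrderness;
    }

    // A watermark t promises no more elements with timestamp <= t, so such elements are late.
    boolean isLate(long timestamp) {
        return timestamp <= currentWatermark();
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSim sim = new BoundedOutOfOrdernessSim(3500);
        long[] events = {10_000, 12_000, 11_000, 15_000, 9_000};
        for (long ts : events) {
            boolean late = sim.isLate(ts); // checked against the watermark before this event
            sim.onEvent(ts);
            System.out.println("event " + ts + (late ? " (late)" : "") + " -> watermark " + sim.currentWatermark());
        }
    }
}
```

Here 11000 arrives out of order but within the 3.5 s bound, so it is not late; 9000 is 6 s behind the maximum and falls below the watermark of 11500.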
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public class TimeLagWatermarkGenerator implements AssignerWithPeriodicWatermarks<MyEvent> {

    private final long maxTimeLag = 5000; // 5 seconds

    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        return element.getCreationTime();
    }

    @Override
    public Watermark getCurrentWatermark() {
        // return the watermark as current time minus the maximum time lag
        return new Watermark(System.currentTimeMillis() - maxTimeLag);
    }
}
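The first generator derives the watermark from the maximum event time seen so far (minus the out-of-orderness bound); the second derives it from the current processing time (the machine's wall clock minus a fixed lag), independent of the events' own timestamps.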
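A side-by-side sketch of the two strategies, in plain Java without Flink. To keep it deterministic, the wall clock is injected as a java.util.function.LongSupplier instead of calling System.currentTimeMillis(); the class name is hypothetical.

```java
import java.util.function.LongSupplier;

// Hypothetical side-by-side model (no Flink) of the two periodic generators above.
// The wall clock is injected as a LongSupplier so the processing-time variant is deterministic.
class PeriodicWatermarkComparison {

    // Like BoundedOutOfOrdernessGenerator: the watermark follows the data.
    static long eventTimeWatermark(long[] timestamps, long maxOutOfOrderness) {
        long max = 0; // same starting value as the generator's uninitialized field
        for (long ts : timestamps) {
            max = Math.max(max, ts);
        }
        return max - maxOutOfOrderness;
    }

    // Like TimeLagWatermarkGenerator: the watermark follows the clock and ignores the timestamps.
    static long timeLagWatermark(LongSupplier clock, long maxTimeLag) {
        return clock.getAsLong() - maxTimeLag;
    }

    public static void main(String[] args) {
        long[] timestamps = {1_000, 4_000, 2_000};
        System.out.println(eventTimeWatermark(timestamps, 3_500));   // 500
        System.out.println(timeLagWatermark(() -> 100_000L, 5_000)); // 95000
        // If the source goes quiet for 100 s, only the processing-time watermark keeps moving.
        System.out.println(eventTimeWatermark(timestamps, 3_500));   // still 500
        System.out.println(timeLagWatermark(() -> 200_000L, 5_000)); // 195000
    }
}
```

The processing-time variant keeps advancing even when no data arrives, so any event delayed in transit by more than maxTimeLag is treated as late; the event-time variant stalls instead until newer data shows up.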
Punctuated Watermarks
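import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public class PunctuatedAssigner implements AssignerWithPunctuatedWatermarks<MyEvent> {

    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        return element.getCreationTime();
    }

    @Override
    public Watermark checkAndGetNextWatermark(MyEvent lastElement, long extractedTimestamp) {
        // emit a watermark only when the element carries a marker; null means no watermark
        return lastElement.hasWatermarkMarker() ? new Watermark(extractedTimestamp) : null;
    }
}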
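Here the watermark is triggered not by time but by specific events: a watermark is emitted only when certain special events or messages arrive, which is why the method is called checkAndGetNextWatermark. It is called for every element right after extractTimestamp(); returning null means no watermark is emitted for that element.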
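A plain-Java model (no Flink) of PunctuatedAssigner. The Event class is a hypothetical stand-in for MyEvent. As far as I know, Flink also ignores a returned watermark that is not larger than the previous one, since watermarks may only move forward, so the sketch applies that filter too.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model (no Flink) of PunctuatedAssigner: watermarks come only from marker events.
class PunctuatedSim {

    // Hypothetical stand-in for MyEvent: a creation time plus a "carries a watermark" flag.
    static final class Event {
        final long creationTime;
        final boolean watermarkMarker;

        Event(long creationTime, boolean watermarkMarker) {
            this.creationTime = creationTime;
            this.watermarkMarker = watermarkMarker;
        }
    }

    // Mirrors checkAndGetNextWatermark(): null means "no watermark for this element".
    static Long checkAndGetNextWatermark(Event lastElement, long extractedTimestamp) {
        return lastElement.watermarkMarker ? Long.valueOf(extractedTimestamp) : null;
    }

    // Feeds a stream through the assigner and collects the watermarks that would be forwarded.
    static List<Long> emittedWatermarks(List<Event> stream) {
        List<Long> out = new ArrayList<>();
        long lastEmitted = Long.MIN_VALUE;
        for (Event e : stream) {
            long ts = e.creationTime;                  // extractTimestamp()
            Long wm = checkAndGetNextWatermark(e, ts); // called right after, for every element
            if (wm != null && wm > lastEmitted) {      // watermarks must only move forward
                out.add(wm);
                lastEmitted = wm;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> stream = new ArrayList<>();
        stream.add(new Event(1_000, false));
        stream.add(new Event(2_000, true));
        stream.add(new Event(1_500, true)); // marker, but older than the last watermark
        stream.add(new Event(4_000, true));
        System.out.println(emittedWatermarks(stream)); // [2000, 4000]
    }
}
```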
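import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WindowWaterMark {
    public static void main(String[] args) throws Exception {
        String hostName = "localhost";
        int port = 8001;
        // set up the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(2);
        // ask the assigner for a new periodic watermark every 9 seconds
        env.getConfig().setAutoWatermarkInterval(9000);
        // get input data
        DataStream<String> text = env.socketTextStream(hostName, port);
        DataStream<Tuple3<String, Long, Integer>> counts = text
                .filter(new FilterClass())
                .map(new LineSplitter())
                // with parallelism 2 there are two assigner instances; the window operator takes
                // the minimum watermark of its inputs, so both instances must receive data
                .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple3<String, Long, Integer>>() {
                    private long currentMaxTimestamp = 0L;
                    private final long maxOutOfOrderness = 10000L; // 10 seconds

                    @Override
                    public long extractTimestamp(Tuple3<String, Long, Integer> element, long previousElementTimestamp) {
                        long timestamp = element.f1;
                        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
                        System.out.println("get timestamp is " + timestamp + " currentMaxTimestamp " + currentMaxTimestamp);
                        return timestamp;
                    }

                    @Override
                    public Watermark getCurrentWatermark() {
                        System.out.println("wall clock is " + System.currentTimeMillis() + " new watermark " + (currentMaxTimestamp - maxOutOfOrderness));
                        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
                    }
                })
                .keyBy(0)
                .timeWindow(Time.seconds(20))
                // .allowedLateness(Time.seconds(10))
                .sum(2);
        counts.print();
        // execute program
        env.execute("Java WordCount from SocketTextStream Example");
    }

    // Hypothetical helpers: the original source does not show FilterClass or LineSplitter.
    // Assumed input line format: "key,timestampMillis".
    public static class FilterClass implements FilterFunction<String> {
        @Override
        public boolean filter(String value) {
            return value != null && value.matches("[^,]+,\\s*\\d+\\s*");
        }
    }

    public static class LineSplitter implements MapFunction<String, Tuple3<String, Long, Integer>> {
        @Override
        public Tuple3<String, Long, Integer> map(String value) {
            String[] parts = value.split(",");
            return new Tuple3<>(parts[0].trim(), Long.parseLong(parts[1].trim()), 1);
        }
    }
}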
Reference: http://www.bubuko.com/infodetail-2342940.html
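Because setAutoWatermarkInterval(9000) is used above, the watermark only advances on 9-second ticks of processing time, not on every element. The plain-Java model below (no Flink; class name and the arrival-time representation are hypothetical) replays that polling with the 10 s bound, forwarding a watermark only when it is strictly larger than the last one. It also shows why the program prints "new watermark -10000" before any data arrives: currentMaxTimestamp starts at 0.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model (no Flink) of periodic watermark emission with setAutoWatermarkInterval(9000)
// and the 10 s bound from WindowWaterMark. Times are simulated milliseconds.
class AutoWatermarkIntervalSim {
    static final long INTERVAL = 9_000;
    static final long MAX_OUT_OF_ORDERNESS = 10_000;

    // arrivals[i] = {processingTimeOfArrival, eventTimestamp}, sorted by arrival time.
    // Returns the watermarks forwarded at the ticks 9000, 18000, ... up to endTime.
    static List<Long> emitted(long[][] arrivals, long endTime) {
        List<Long> out = new ArrayList<>();
        long currentMaxTimestamp = 0; // same start value as in WindowWaterMark
        long lastEmitted = Long.MIN_VALUE;
        int next = 0;
        for (long tick = INTERVAL; tick <= endTime; tick += INTERVAL) {
            // extractTimestamp() has run for every element that arrived before this tick
            while (next < arrivals.length && arrivals[next][0] < tick) {
                currentMaxTimestamp = Math.max(currentMaxTimestamp, arrivals[next][1]);
                next++;
            }
            long wm = currentMaxTimestamp - MAX_OUT_OF_ORDERNESS; // getCurrentWatermark()
            if (wm > lastEmitted) { // only strictly increasing watermarks are forwarded
                out.add(wm);
                lastEmitted = wm;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long[][] arrivals = {{1_000, 30_000}, {5_000, 42_000}, {12_000, 35_000}, {20_000, 55_000}};
        System.out.println(emitted(arrivals, 27_000)); // [32000, 45000]
    }
}
```

At the 18 s tick nothing is forwarded because the only new element (35000) does not raise the maximum.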
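A window fires only when both of the following conditions hold:
1. watermark >= window_end_time (strictly, Flink's event-time trigger fires once the watermark reaches window_end_time - 1 ms, the last timestamp the window can contain)
2. at least one element exists in [window_start_time, window_end_time)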
Reference: https://blog.csdn.net/lmalds/article/details/52704170
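To tie the two firing conditions to the WindowWaterMark settings (20 s tumbling windows aligned to the epoch, 10 s out-of-orderness, no allowed lateness), here is a plain-Java replay (no Flink; class name hypothetical). It uses the trigger convention that a window fires once the watermark reaches window_end - 1, and drops an element whose window is already past the watermark. Simplification: the watermark is recomputed after every element rather than on the 9-second timer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Plain-Java model (no Flink) of the two firing conditions for 20 s tumbling event-time windows
// with a 10 s out-of-orderness bound. Not Flink's trigger code.
class WindowFiringSim {
    static final long SIZE = 20_000;                 // timeWindow(Time.seconds(20))
    static final long MAX_OUT_OF_ORDERNESS = 10_000; // bound used in WindowWaterMark

    // Replays event timestamps in arrival order; returns window starts in the order they fire.
    static List<Long> fire(long[] arrivals) {
        TreeMap<Long, Integer> pending = new TreeMap<>(); // window start -> element count
        List<Long> fired = new ArrayList<>();
        long maxTs = 0;                   // same start value as in WindowWaterMark
        long watermark = Long.MIN_VALUE;  // no watermark yet
        for (long ts : arrivals) {
            long start = ts - Math.floorMod(ts, SIZE); // epoch-aligned window [start, start + SIZE)
            if (start + SIZE - 1 <= watermark) {
                continue; // window already past the watermark: element is late and dropped
            }
            pending.merge(start, 1, Integer::sum); // condition 2: the window now holds data
            maxTs = Math.max(maxTs, ts);
            watermark = maxTs - MAX_OUT_OF_ORDERNESS;
            // condition 1: watermark >= window_end - 1; only windows holding data are in `pending`
            while (!pending.isEmpty() && pending.firstKey() + SIZE - 1 <= watermark) {
                fired.add(pending.pollFirstEntry().getKey());
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        long[] arrivals = {5_000, 25_000, 12_000, 31_000, 8_000, 45_000, 50_000, 100_000};
        System.out.println(fire(arrivals)); // [0, 20000, 40000]
    }
}
```

Window [0, 20000) fires when 31000 pushes the watermark to 21000; the later element 8000 is then dropped as late. Windows [60000, 80000) and [80000, 100000) never fire even though the watermark passes them, because condition 2 fails: no element falls into them.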