1.概述
在fink中,窗口的划分可以是通过时间,也可以是通过数量来进行划分,比如timeWindow,countWidow
等,时间系统在窗口的应用主要是来注册窗口触发时间点的定时器,来决定窗口什么时候开始执行窗口函数。
关于窗口的操作,那么WindowOperator
这个类就是核心类,这个类里描述了关于窗口的操作的一些服务,他的继承结果如下:
从上图我们可以看到,他继承了AbstractUdfStreamOperator
,实现了Triggerable,OneInputStreamOperator
接口。
在WindowOperator
这个类中,初始化方法时open
,在这个初始化中,它注册了一个时间服务,这个服务可以注册定时器,在窗口中注册窗口触发的定时器。
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator#open
internalTimerService =
getInternalTimerService("window-timers", windowSerializer, this);
在open
方法中初始化了internalTimerService
,那么具体的注册流程是在WindowOperator
这个类中的processElement
方法中,在这个方法的最后部分,不管注册的是process time
还是event time
时间窗口,都会调用registerCleanupTimer
这个方法,
protected void registerCleanupTimer(W window) {
// 首先会计算出窗口的触发时间
long cleanupTime = cleanupTime(window);
if (cleanupTime == Long.MAX_VALUE) {
// don't set a GC timer for "end of time"
return;
}
//按照时间类型进行注册 triggerContext表示的是WindowOperator.Context对象
//在注册相应类型触发器时,实际上会调用在WindowOperator 在open 中初始化的InternalTimerService
// 来完成相应的注册定时器
if (windowAssigner.isEventTime()) {
triggerContext.registerEventTimeTimer(cleanupTime);
} else {
triggerContext.registerProcessingTimeTimer(cleanupTime);
}
}
从上面的代码中,我们可以看到,方法的最上面是根据时间来进行判断,是否可以进行注册,最后根据时间的类型,进行注册相应的定时器。
triggerContext
表示的是WindowOperator.Context
对象,在注册相应类型触发器时,实际上会调用在WindowOperator
在open 中初始化的InternalTimerService
来完成相应的注册定时器
@Override
public void registerProcessingTimeTimer(long time) {
internalTimerService.registerProcessingTimeTimer(window, time);
}
@Override
public void registerEventTimeTimer(long time) {
internalTimerService.registerEventTimeTimer(window, time);
}
定时器的触发操作最后会调用Triggerable
的onProcessingTime
或者onEventTime
方法,而在WindowOperator.open
方法中,初始的InternalTimerService
传入的Triggerable
对象正好是this,也就是WindowOperator
对象,也就是说在窗口中定时器的触发会调用WindowOperator
的onEventTime
或者onProcessingTime
方法,在这些方法里面会执行窗口函数触发逻辑判断、窗口函数操作与状态清除的工作。