Trigger 的作用
英文单词 trigger 的意思是触发,作为名词是扳机的意思,例如枪支上的扳机就叫 trigger,所以也有开火的意思。Flink中,window操作需要伴随对窗口中的数据进行处理的逻辑,也就是窗口函数,而 Trigger 的作用就是决定何时触发窗口函数中的逻辑执行。
Trigger 抽象类
Flink中定义了Trigger抽象类,任何trigger必须继承Trigger类,并实现其中的onElement()
,onProcessingTime()
,onEventTime()
,clear()
等抽象方法,Flink官方提供了几种常用的trigger实现,同时,用户可以根据需求自定义trigger,以下是Trigger类的部分代码:
public abstract class Trigger<T, W extends Window> implements Serializable {
private static final long serialVersionUID = -4104633972991191369L;
/**
* Called for every element that gets added to a pane. The result of this will determine
* whether the pane is evaluated to emit results.
*
* @param element The element that arrived.
* @param timestamp The timestamp of the element that arrived.
* @param window The window to which the element is being added.
* @param ctx A context object that can be used to register timer callbacks.
*/
public abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception;
/**
* Called when a processing-time timer that was set using the trigger context fires.
*
* @param time The timestamp at which the timer fired.
* @param window The window for which the timer fired.
* @param ctx A context object that can be used to register timer callbacks.
*/
public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception;
/**
* Called when an event-time timer that was set using the trigger context fires.
*
* @param time The timestamp at which the timer fired.
* @param window The window for which the timer fired.
* @param ctx A context object that can be used to register timer callbacks.
*/
public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception;
/**
* Returns true if this trigger supports merging of trigger state and can therefore
* be used with a
* {@link org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner}.
*
* <p>If this returns {@code true} you must properly implement
* {@link #onMerge(Window, OnMergeContext)}
*/
public boolean canMerge() {
return false;
}
/**
* Called when several windows have been merged into one window by the
* {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}.
*
* @param window The new window that results from the merge.
* @param ctx A context object that can be used to register timer callbacks and access state.
*/
public void onMerge(W window, OnMergeContext ctx) throws Exception {
throw new UnsupportedOperationException("This trigger does not support merging.");
}
/**
* Clears any state that the trigger might still hold for the given window. This is called
* when a window is purged. Timers set using {@link TriggerContext#registerEventTimeTimer(long)}
* and {@link TriggerContext#registerProcessingTimeTimer(long)} should be deleted here as
* well as state acquired using {@link TriggerContext#getPartitionedState(StateDescriptor)}.
*/
public abstract void clear(W window, TriggerContext ctx) throws Exception;
}
onElement()
方法会在窗口中每进入一条数据的时候调用一次onProcessingTime()
方法会在一个ProcessingTime定时器触发的时候调用onEventTime()
方法会在一个EventTime定时器触发的时候调用clear()
方法会在窗口清除的时候调用