Window Trigger决定了Window何时触发WindowFunction的计算逻辑,因为StreamRecord经过WindowAssigner的分配后,会累积到窗口状态中。然后将这条StreamRecord交给Trigger,通过Trigger#onElement()判断是否满足触发条件。一旦满足Trigger的触发条件,就会取出Window状态中保存的所有StreamRecord,基于指定的WindowFunction进行计算,最后将Window计算结果发送给下游。
以EventTimeTrigger为例,当一条StreamRecord经过WindowAssigner分配窗口后,立即被保存到窗口状态中。然后这条StreamRecord会被传递给EventTimeTrigger,由onElement()方法判断是否满足Trigger触发条件。如果当前Watermark >= 当前Window的max timestamp(Window的EndTime - 1ms),说明已经满足触发条件,因此返回“FIRE事件”;否则,说明当前Watermark并未达到Trigger触发条件,那就注册一个基于EventTime的Timer(定时为max timestamp,等Timer触发之时,也就是当前Window Trigger触发之时),并返回“CONTINUE事件”。
需要注意的是,这里注册Timer用到的TimerService是Flink系统内部专用的InternalTimerService。
/**
* EventTimeTrigger#onElement()方法:判断是否满足Trigger的触发条件,并返回对应的事件
*/
@Override
public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
// 如果Window中的“max timestamp” <= 当前Watermark,返回“FIRE事件”,也就是立即触发
// max timestamp = 当前Window的EndTime - 1ms
if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
// if the watermark is already past the window fire immediately
return TriggerResult.FIRE;
} else {
// 如果不满足Trigger触发条件,那就注册基于EventTime的Timer,定时为当前Window中的“max timestamp”
ctx.registerEventTimeTimer(window.maxTimestamp());
// 返回“CONTINUE事件”,表示:未达到Trigger触发条件,Window需要继续“累积”StreamRecord到内部的窗口状态中
return TriggerResult.CONTINUE;
}
}
当WindowOperator中的Watermark更新时,InternalTimeServiceManager会调用advanceWatermark()方法通知所有的InternalTimerService实例
// 当StreamOperator中的Watermark更新时,InternalTimeServiceManager会调用该方法通知所有的InternalTimerService实例
public void advanceWatermark(Watermark watermark) throws Exception {
// 遍历存储InternalTimerService实例的Map
for (InternalTimerServiceImpl<?, ?> service : timerServices.values()) {
// 调用每个InternalTimerService实例它自己的advanceWatermark()方法,更新Watermark
service.advanceWatermark(watermark.getTimestamp());
}
}
InternalTimerServiceImpl会借助Triggerable接口,将“Watermark变更”的消息传递给StreamOperator,当InternalTimerService管理的Timer触发时,会调用Triggerable#onEventTime()方法
// 当Watermark更新时,InternalTimeServiceManager会通知每个InternalTimerService实例
public void advanceWatermark(long time) throws Exception {
currentWatermark = time;
InternalTimer<K, N> timer;
// 从存储"EventTime类型的Timer"的队列中,取出队头
while ((timer = eventTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
// 删除队头
eventTimeTimersQueue.poll();
keyContext.setCurrentKey(timer.getKey());
// Triggerable是可以被InternalTimerService实例调用的“通知接口”,众多StreamOperator会实现该接口。
// 这里可以简单粗暴的认为triggerTarget就是StreamOperator,也就是WindowOperator
triggerTarget.onEventTime(timer);
}
}
Triggerable是可以被InternalTimerService实例调用的“通知接口”,众多StreamOperator都会实现这个接口,包括WindowOperator。
@Internal
public interface Triggerable<K, N> {
void onEventTime(InternalTimer<K, N> timer) throws Exception;
void onProcessingTime(InternalTimer<K, N> timer) throws Exception;
}
于是,InternalTimerService实例就“借助”Triggerable接口,将“Watermark变更”的消息传递给了WindowOperator。WindowOperator实现的Triggerable接口的onEventTime()方法,对于更新的Watermark的处理逻辑如下:
/**
* InternalTimerService实例管理着Timer,当Watermark更新时,InternalTimeServiceManager会遍历每个InternalTimerServiceImpl实例,
* 并进一步通过Triggerable接口,通知StreamOperator
*/
@Override
public void onEventTime(InternalTimer<K, W> timer) throws Exception {
忽略一些代码块...
// triggerContext就是WindowOperator内部的Context,这里间接调用的Trigger#onEventTime()方法。
// 处理逻辑:将Timer中的timestamp取出来,交给Trigger判断“当前的Watermark是否已经满足了Window Trigger的触发条件”。
// 根据触发器Trigger的判断结果,决定将Window中的数据是交给自定义函数xxxFunction的process回调函数,还是抛弃
TriggerResult triggerResult = triggerContext.onEventTime(timer.getTimestamp());
// 根据Trigger触发结果,决定将Window中的数据是交给自定义函数xxxFunction的process回调函数,还是clear抛弃
if (triggerResult.isFire()) {
ACC contents = windowState.get();
if (contents != null) {
// 将Window中的数据通过自定义函数xxxFunction发送出去
emitWindowContents(triggerContext.window, contents);
}
}
if (triggerResult.isPurge()) {
windowState.clear();
}
if (windowAssigner.isEventTime() && isCleanupTime(triggerContext.window, timer.getTimestamp())) {
clearAllState(triggerContext.window, windowState, mergingWindows);
}
if (mergingWindows != null) {
// need to make sure to update the merging state in state
mergingWindows.persist();
}
}
triggerContext就是WindowOperator内部的Context,这里间接调用的Trigger#onEventTime()方法:
/**
* 每当Watermark更新,就会由InternalTimeServiceManager通知每个InternalTimerServiceImpl实例,
* InternalTimerServiceImpl会通过Triggerable接口定义的方法,通知WindowOperator
*/
@Override
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
// 前面判断不满足Trigger触发条件而注册的Timer,定时就是Window的max timestamp。
// 如果time等于Window的max timestamp,说明Watermark已经达到Window触发条件,直接返回“FIRE事件”;
return time == window.maxTimestamp() ?
TriggerResult.FIRE :
TriggerResult.CONTINUE;
}
最后,根据Trigger的判定结果,决定这个Window里的数据到底该咋处理…
整体流程总结:
① Window会“累积”StreamRecord,并将其保存到窗口状态中。
② 将当前的StreamRecord交给Trigger,判断是否满足触发器的触发时机。当前Watermark >=(当前Window的EndTime - 1ms),说明已经达到触发条件,于是返回“FIRE事件”
③ 前一步判断不满足Trigger触发条件,就向InternalTimerService注册内部专用的Timer,定时为:max timestamp,也就是当前Window的EndTime - 1ms。然后返回“CONTINUE事件”
④ 当WindowOperator中的Watermark更新时,InternalTimeServiceManager会通过InternalTimerServiceImpl实例,借助WindowOperator实现的“通知接口”Triggerable,将更新的Watermark通知给WindowOperator。
⑤ WindowOperator收到“Watermark变更”的消息后,拿着timestamp再一次的让Trigger判断:此时是否已经满足了触发器条件
⑥ 如果这个timestamp正好等于当初Timer设置的定时时长(当前Window的EndTime - 1ms),说明此时由于Watermark更新,推动着Window里的数据正好可以触发Trigger,就返回“FIRE事件”;否则,返回“CONTINUE事件”
⑦ 根据Trigger的判断结果,决定Window中的数据的“去留问题”