• Window Trigger的设计与实现


    Window Trigger决定了Window何时触发WindowFunction的计算逻辑,因为StreamRecord经过WindowAssigner的分配后,会累积到窗口状态中。然后将这条StreamRecord交给Trigger,通过Trigger#onElement()判断是否满足触发条件。一旦满足Trigger的触发条件,就会取出Window状态中保存的所有StreamRecord,基于指定的WindowFunction进行计算,最后将Window计算结果发送给下游。

    以EventTimeTrigger为例,当一条StreamRecord经过WindowAssigner分配窗口后,立即被保存到窗口状态中。然后这条StreamRecord会被传递给EventTimeTrigger,由onElement()方法判断是否满足Trigger触发条件。如果当前Watermark >= 当前Window的max timestamp(Window的EndTime - 1ms),说明已经满足触发条件,因此返回“FIRE事件”;否则,说明当前Watermark并未达到Trigger触发条件,那就注册一个基于EventTime的Timer(定时为max timestamp,等Timer触发之时,也就是当前Window Trigger触发之时),并返回“CONTINUE事件”。

    需要注意的是,这里注册Timer用到的TimerService是Flink系统内部专用的InternalTimerService。

    /**
     * EventTimeTrigger#onElement()方法:判断是否满足Trigger的触发条件,并返回对应的事件
     */
    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        // 如果Window中的“max timestamp” <= 当前Watermark,返回“FIRE事件”,也就是立即触发
        // max timestamp = 当前Window的EndTime - 1ms
        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            // if the watermark is already past the window fire immediately
            return TriggerResult.FIRE;
        } else {
            // 如果不满足Trigger触发条件,那就注册基于EventTime的Timer,定时为当前Window中的“max timestamp”
            ctx.registerEventTimeTimer(window.maxTimestamp());
            // 返回“CONTINUE事件”,表示:未达到Trigger触发条件,Window需要继续“累积”StreamRecord到内部的窗口状态中
            return TriggerResult.CONTINUE;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    当WindowOperator中的Watermark更新时,InternalTimeServiceManager会调用advanceWatermark()方法通知所有的InternalTimerService实例

    // 当StreamOperator中的Watermark更新时,InternalTimeServiceManager会调用该方法通知所有的InternalTimerService实例
    public void advanceWatermark(Watermark watermark) throws Exception {
        // 遍历存储InternalTimerService实例的Map
        for (InternalTimerServiceImpl<?, ?> service : timerServices.values()) {
            // 调用每个InternalTimerService实例它自己的advanceWatermark()方法,更新Watermark
            service.advanceWatermark(watermark.getTimestamp());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    InternalTimerServiceImpl会借助Triggerable接口,将“Watermark变更”的消息传递给StreamOperator,当InternalTimerService管理的Timer触发时,会调用Triggerable#onEventTime()方法

    // 当Watermark更新时,InternalTimeServiceManager会通知每个InternalTimerService实例
    public void advanceWatermark(long time) throws Exception {
        currentWatermark = time;
    
        InternalTimer<K, N> timer;
    
        // 从存储"EventTime类型的Timer"的队列中,取出队头
        while ((timer = eventTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
            // 删除队头
            eventTimeTimersQueue.poll();
            keyContext.setCurrentKey(timer.getKey());
            // Triggerable是可以被InternalTimerService实例调用的“通知接口”,众多StreamOperator会实现该接口。
            // 这里可以简单粗暴的认为triggerTarget就是StreamOperator,也就是WindowOperator
            triggerTarget.onEventTime(timer);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    Triggerable是可以被InternalTimerService实例调用的“通知接口”,众多StreamOperator都会实现这个接口,包括WindowOperator。

    @Internal
    public interface Triggerable<K, N> {
    
       void onEventTime(InternalTimer<K, N> timer) throws Exception;
    
       void onProcessingTime(InternalTimer<K, N> timer) throws Exception;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    于是,InternalTimerService实例就“借助”Triggerable接口,将“Watermark变更”的消息传递给了WindowOperator。WindowOperator实现的Triggerable接口的onEventTime()方法,对于更新的Watermark的处理逻辑如下:

    /**
     * InternalTimerService实例管理着Timer,当Watermark更新时,InternalTimeServiceManager会遍历每个InternalTimerServiceImpl实例,
     * 并进一步通过Triggerable接口,通知StreamOperator
     */
    @Override
    public void onEventTime(InternalTimer<K, W> timer) throws Exception {
        
        忽略一些代码块...
            
        // triggerContext就是WindowOperator内部的Context,这里间接调用的Trigger#onEventTime()方法。
        // 处理逻辑:将Timer中的timestamp取出来,交给Trigger判断“当前的Watermark是否已经满足了Window Trigger的触发条件”。
        // 根据触发器Trigger的判断结果,决定将Window中的数据是交给自定义函数xxxFunction的process回调函数,还是抛弃
        TriggerResult triggerResult = triggerContext.onEventTime(timer.getTimestamp());
    
        // 根据Trigger触发结果,决定将Window中的数据是交给自定义函数xxxFunction的process回调函数,还是clear抛弃
        if (triggerResult.isFire()) {
            ACC contents = windowState.get();
            if (contents != null) {
                // 将Window中的数据通过自定义函数xxxFunction发送出去
                emitWindowContents(triggerContext.window, contents);
            }
        }
    
        if (triggerResult.isPurge()) {
            windowState.clear();
        }
    
        if (windowAssigner.isEventTime() && isCleanupTime(triggerContext.window, timer.getTimestamp())) {
            clearAllState(triggerContext.window, windowState, mergingWindows);
        }
    
        if (mergingWindows != null) {
            // need to make sure to update the merging state in state
            mergingWindows.persist();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36

    triggerContext就是WindowOperator内部的Context,这里间接调用的Trigger#onEventTime()方法:

    /**
     * 每当Watermark更新,就会由InternalTimeServiceManager通知每个InternalTimerServiceImpl实例,
     * InternalTimerServiceImpl会通过Triggerable接口定义的方法,通知WindowOperator
     */
    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        // 前面判断不满足Trigger触发条件而注册的Timer,定时就是Window的max timestamp。
        // 如果time等于Window的max timestamp,说明Watermark已经达到Window触发条件,直接返回“FIRE事件”;
        return time == window.maxTimestamp() ?
            TriggerResult.FIRE :
        TriggerResult.CONTINUE;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    最后,根据Trigger的判定结果,决定这个Window里的数据到底该咋处理…

    整体流程总结:

    • ① Window会“累积”StreamRecord,并将其保存到窗口状态中。

    • ② 将当前的StreamRecord交给Trigger,判断是否满足触发器的触发时机。当前Watermark >=(当前Window的EndTime - 1ms),说明已经达到触发条件,于是返回“FIRE事件”

    • ③ 前一步判断不满足Trigger触发条件,就向InternalTimerService注册内部专用的Timer,定时为:max timestamp,也就是当前Window的EndTime - 1ms。然后返回“CONTINUE事件”

    • ④ 当WindowOperator中的Watermark更新时,InternalTimeServiceManager会通过InternalTimerServiceImpl实例,借助WindowOperator实现的“通知接口”Triggerable,将更新的Watermark通知给WindowOperator。

    • ⑤ WindowOperator收到“Watermark变更”的消息后,拿着timestamp再一次的让Trigger判断:此时是否已经满足了触发器条件

    • ⑥ 如果这个timestamp正好等于当初Timer设置的定时时长(当前Window的EndTime - 1ms),说明此时由于Watermark更新,推动着Window里的数据正好可以触发Trigger,就返回“FIRE事件”;否则,返回“CONTINUE事件”

    • ⑦ 根据Trigger的判断结果,决定Window中的数据的“去留问题”

  • 相关阅读:
    Find the Maximum - 题解【思维,贪心】
    预算有限?如何挑选经济适用的ERP系统?
    ES6简介
    正则表达式
    如何使用 arrayList.removeAll(Collection<?> c)?
    为什么有的考生在提前批面试中会“沟通不好”?
    图像配准之图像重采样
    【iOS开发】(六)react Native 路由嵌套传参与框架原理(完)20240423
    Redis从入门到放弃(5):事务
    kali的三层镜像是什么意思
  • 原文地址:https://blog.csdn.net/qq_36299025/article/details/127614817