• flink的AggregateFunction,merge方法作用范围


    背景

    AggregateFunction接口是我们经常用的窗口聚合函数,其中有一个merge方法,我们一般情况下也是实现了的,但是你知道吗,其实这个方法只有在你使用会话窗口需要进行窗口合并的时候才需要实现

    AggregateFunction.merge方法调用时机

    AggregateFunction.merge方法其实只有在使用会话窗口进行窗口合并的时候才会用到,如下所示
    在这里插入图片描述

    对应的源码首先查看WindowOperator.processElement方法对要合并的窗口的状态进行合并

    public void processElement(StreamRecord<IN> element) throws Exception {
            final Collection<W> elementWindows =
                    windowAssigner.assignWindows(
                            element.getValue(), element.getTimestamp(), windowAssignerContext);
     
            // if element is handled by none of assigned elementWindows
            boolean isSkippedElement = true;
     
            final K key = this.<K>getKeyedStateBackend().getCurrentKey();
     
            if (windowAssigner instanceof MergingWindowAssigner) {
                MergingWindowSet<W> mergingWindows = getMergingWindowSet();
     
                for (W window : elementWindows) {
     
                    // adding the new window might result in a merge, in that case the actualWindow
                    // is the merged window and we work with that. If we don't merge then
                    // actualWindow == window
                    W actualWindow =
                            mergingWindows.addWindow(
                                    window,
                                    new MergingWindowSet.MergeFunction<W>() {
                                        @Override
                                        public void merge(
                                                W mergeResult,
                                                Collection<W> mergedWindows,
                                                W stateWindowResult,
                                                Collection<W> mergedStateWindows)
                                                throws Exception {
     
                                            triggerContext.key = key;
                                            triggerContext.window = mergeResult;
     
                                            triggerContext.onMerge(mergedWindows);
     
                                            for (W m : mergedWindows) {
                                                triggerContext.window = m;
                                                triggerContext.clear();
                                                deleteCleanupTimer(m);
                                            }
     
                                            // 合并窗口的状态
                                            windowMergingState.mergeNamespaces(
                                                    stateWindowResult, mergedStateWindows);
                                        }
                                    });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46

    继续查看AbstractHeapMergingState.mergeNamespaces方法,

    public void mergeNamespaces(N target, Collection<N> sources) throws Exception {
        if (sources == null || sources.isEmpty()) {
            return; // nothing to do
        }
     
        final StateTable<K, N, SV> map = stateTable;
     
        SV merged = null;
     
        // merge the sources
        for (N source : sources) {
     
            // get and remove the next source per namespace/key
            SV sourceState = map.removeAndGetOld(source);
     
            if (merged != null && sourceState != null) {
                //此处合并状态并调用AggregateFunction.merge方法
                merged = mergeState(merged, sourceState);
            } else if (merged == null) {
                merged = sourceState;
            }
        }
     
        // merge into the target, if needed
        if (merged != null) {
            map.transform(target, merged, mergeTransformation);
        }
    }
     
    //真正调用AggregateFunction.merge方法合并自定义的状态
    @Override
    protected ACC mergeState(ACC a, ACC b) {
        return aggregateTransformation.aggFunction.merge(a, b);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    这样AggregateFunction.merge的调用过程就清楚了,实际应用中,我们只需要在使用会话窗口时才需要实现这个方法,其他的基于时间窗口的方式不需要实现这个方法,当然实现了也不会有错

  • 相关阅读:
    强化学习调度环境:析取图和离散事件仿真
    spring6-资源操作:Resources
    【树莓派不吃灰】搭建sqlite3数据库环境
    【2022河南萌新联赛第(四)场:郑州轻工业大学】【部分思路题解+代码解析】
    SolidWorks如何绘制环形波纹垫片
    Qt应用开发(基础篇)——按钮基类 QAbstractButton
    数据结构与算法(三) 深度优先搜索
    Windows RDP远程桌面优化
    流媒体直播播放协议:HLS、RTMP、HTTP-FLV
    mysql数据库开放对外访问
  • 原文地址:https://blog.csdn.net/lixia0417mul2/article/details/134277215