• ProcessWindowFunction 结合自定义触发器的陷阱


    背景:

    flink中常见的需求如下:统计某个页面一天内的点击率,每10秒输出一次,我们如果采用ProcessWindowFunction 结合自定义触发器如何实现呢?如果这样实现问题是什么呢?

    ProcessWindowFunction 结合自定义触发器实现统计点击率

    关键代码:
    在这里插入图片描述
    在这里插入图片描述
    完整代码参见:

    package wikiedits.func;
    
    
    import java.text.SimpleDateFormat;
    import java.util.Date;
    
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger;
    import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;
    
    import wikiedits.func.model.KeyCount;
    
    
    
    public class ProcessWindowFunctionAndTiggerDemo {
    
        public static void main(String[] args) throws Exception {
    
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // 使用处理时间
            env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
            env.enableCheckpointing(60000, CheckpointingMode.EXACTLY_ONCE);
            env.setStateBackend(new FsStateBackend("file:///D:/tmp/flink/checkpoint/windowtrigger"));
    
            // 并行度为1
            env.setParallelism(1);
            // 设置数据源,一共三个元素
            DataStream<Tuple2<String, Integer>> dataStream = env.addSource(new SourceFunction<Tuple2<String, Integer>>() {
                @Override
                public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
                    int xxxNum = 0;
                    int yyyNum = 0;
                    for (int i = 1; i < Integer.MAX_VALUE; i++) {
                        // 只有XXX和YYY两种name
                        String name = (0 == i % 2) ? "XXX" : "YYY";
                        // 更新aaa和bbb元素的总数
                        if (0 == i % 2) {
                            xxxNum++;
                        } else {
                            yyyNum++;
                        }
                        // 使用当前时间作为时间戳
                        long timeStamp = System.currentTimeMillis();
                        // 将数据和时间戳打印出来,用来验证数据
                        if(xxxNum % 2000==0){
                            System.out.println(String.format("source,%s, %s,    XXX total : %d,    YYY total : %d\n", name,
                                    time(timeStamp), xxxNum, yyyNum));
                        }
                        // 发射一个元素,并且戴上了时间戳
                        ctx.collectWithTimestamp(new Tuple2<String, Integer>(name, 1), timeStamp);
                        // 每发射一次就延时1秒
                        Thread.sleep(1);
                    }
                }
    
                @Override
                public void cancel() {}
            });
    
            // 将数据用5秒的滚动窗口做划分,再用ProcessWindowFunction
            SingleOutputStreamOperator<String> mainDataStream = dataStream
                    // 以Tuple2的f0字段作为key,本例中实际上key只有aaa和bbb两种
                    .keyBy(value -> value.f0)
                    // 5秒一次的滚动窗口
                    .timeWindow(Time.minutes(5))
                    // 10s触发一次计算,更新统计结果
                    .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(10)))
                    // 统计每个key当前窗口内的元素数量,然后把key、数量、窗口起止时间整理成字符串发送给下游算子
                    .process(new ProcessWindowFunction<Tuple2<String, Integer>, String, String, TimeWindow>() {
                        // 自定义状态
                        private ValueState<KeyCount> state;
    
                        @Override
                        public void open(Configuration parameters) throws Exception {
                            // 初始化状态,name是myState
                            state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", KeyCount.class));
                        }
    
                        public void clear(Context context) {
                            ValueState<KeyCount> contextWindowValueState = context.windowState().getState(new ValueStateDescriptor<>("myWindowState", KeyCount.class));
                            contextWindowValueState.clear();
                        }
    
                        @Override
                        public void process(String s, Context context, Iterable<Tuple2<String, Integer>> iterable,
                                Collector<String> collector) throws Exception {
                            // 从backend取得当前单词的myState状态
                            KeyCount current = state.value();
                            // 如果myState还从未没有赋值过,就在此初始化
                            if (current == null) {
                                current = new KeyCount();
                                current.key = s;
                                current.count = 0;
                            }
                            int count = 0;
                            // iterable可以访问该key当前窗口内的所有数据,
                            // 这里简单处理,只统计了元素数量
                            for (Tuple2<String, Integer> tuple2 : iterable) {
                                count++;
                            }
                            // 更新当前key的元素总数
                            current.count += count;
                            // 更新状态到backend
                            state.update(current);
    
                            ValueState<KeyCount> contextWindowValueState = context.windowState().getState(new ValueStateDescriptor<>("myWindowState", KeyCount.class));
                            KeyCount windowValue = contextWindowValueState.value();
                            if (windowValue == null) {
                                windowValue = new KeyCount();
                                windowValue.key = s;
                                windowValue.count = 0;
                            }
                            windowValue.count += count;
                            contextWindowValueState.update(windowValue);
    
                            // 将当前key及其窗口的元素数量,还有窗口的起止时间整理成字符串
                            String value = String.format("window, %s, %s - %s, %d, windowStateCount :%d,   total : %d",
                                    // 当前key
                                    s,
                                    // 当前窗口的起始时间
                                    time(context.window().getStart()),
                                    // 当前窗口的结束时间
                                    time(context.window().getEnd()),
                                    // 当前key在当前窗口内元素总数
                                    count,
                                    // 当前key所在窗口的总数
                                    contextWindowValueState.value().count,
                                    // 当前key出现的总数
                                    current.count);
    
                            // 发射到下游算子
                            collector.collect(value);
                        }
                    });
    
            // 打印结果,通过分析打印信息,检查ProcessWindowFunction中可以处理所有key的整个窗口的数据
            mainDataStream.print();
    
            env.execute("processfunction demo : processwindowfunction");
    
        }
    
    
    
        public static String time(long timeStamp) {
            return new SimpleDateFormat("yyyy-MM-dd hh:mm:ss").format(new Date(timeStamp));
        }
    
    
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166

    这里采用ProcessWindowFunction 结合ContinuousProcessingTimeTrigger的方式确实可以实现统计至今为止某个页面点击率的目的,不过这其中需要注意点的点是:
    每隔10s触发public void process(String s, Context context, Iterable> iterable, Collector collector)方法时,iterable对象是包含了一天的窗口内收到的所有消息,也就是当前触发时iterable集合是前10s触发时iterable集合的超集,包含前10s触发时的所有的消息集合。
    到这里所引起的问题也自然而然的出来了:对于ProcessWindowFunction 实现而言,flink内部是通过ListState的形式保存窗口内收到的所有消息的,注意这里flink内部会使用ListState保存每一条分配到以天为单位的窗口内的消息,这会导致状态膨胀,想一下,一天内所有的消息都会当成状态保存起来,这对于状态后端的压力是有多大!这些保存在ListState中的消息只有在窗口结束后才会清理:具体参见WindowOperator.clearAllState,那有解决方案吗?使用Agg/Reduce处理函数替ProcessWindowFunction作为处理函数可以实现吗?请看下一篇文章

    参考文章:
    https://www.cnblogs.com/Springmoon-venn/p/13667023.html

  • 相关阅读:
    直接选择排序
    机器学习库Scikit-Learn
    Newsmy储能电源与您相约九州汽车生态博览
    R语言读文件“-“变成“.“
    【git】git使用教程
    Linux设备树OF操作函数
    小程序商城免费搭建之java商城 电子商务Spring Cloud+Spring Boot+二次开发+mybatis+MQ+VR全景+b2b2c
    python/C++二分查找库函数(lower_bound() 、upper_bound,bisect_left,bisect_right)
    sudo -u root whoami && ****无法用root权限执行后续命令的解决办法
    机器学习---支持向量机的初步理解
  • 原文地址:https://blog.csdn.net/lixia0417mul2/article/details/132724677