• Flink Windows


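    Introduction: stream processing is a style of data processing designed for unbounded data sets, that is, data that keeps growing and is in principle infinite. A window is the mechanism that cuts such an unbounded stream into finite chunks so they can be processed. Flink offers two kinds of windows: 1. time windows, 2. count windows.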

    I. Time Windows

    Depending on how elements are assigned to windows, time windows fall into three categories: tumbling windows, sliding windows and session windows.

    1.1 Tumbling Windows

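    Introduction: the stream is sliced into windows of a fixed length (in time).
    Characteristics: aligned in time, fixed window length, no overlap; each element belongs to exactly one window.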

    package com.xx.window;
    
    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.datastream.WindowedStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    
    /**
     * @author aqi
     * @since 2023/8/30 15:46
     */
    @Slf4j
    public class WindowReduceDemo {
    
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            // Read lines of the form "id,value" from a socket (push data with nc -lp 7777 on Windows, nc -lk 7777 on Linux)
            SingleOutputStreamOperator<Demo> sensorDS = env
                    .socketTextStream("127.0.0.1", 7777)
                    .map(new DemoMapFunction());
    
            // Tumbling window: fixed length of 10 seconds, one result per key every 10 seconds
            WindowedStream<Demo, String, TimeWindow> sensorWS = sensorDS
                    .keyBy(Demo::getId)
                    .window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
    
            // Aggregate: sum the values per key (other window functions can be used here as well)
            SingleOutputStreamOperator<Demo> reduce = sensorWS.reduce(
                    (value1, value2) -> new Demo(value1.getId(), value1.getValue() + value2.getValue())
            );
            // Print the results
            reduce.print();
            // Submit the job; nothing runs until execute() is called
            env.execute();
        }
    }
    
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    class Demo {
    
        private String id;
    
        private Long value;
    }
    
    class DemoMapFunction implements MapFunction<String, Demo> {
    
        @Override
        public Demo map(String value) {
            String[] datas = value.split(",");
            return new Demo(datas[0], Long.valueOf(datas[1]));
        }
    }
    
    
    

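    For a processing-time tumbling window, the window an element lands in depends only on the operator's current time and the window size. The sketch below reproduces that arithmetic in plain Java, so it runs without a Flink cluster. The class name TumblingWindowSketch is hypothetical. The formula follows my reading of Flink's TimeWindow.getWindowStartWithOffset rather than being copied from it. With a zero offset, windows are aligned to the epoch, which is why 10-second windows start at :00, :10, :20 and so on.

```java
// Plain-Java sketch (no Flink dependency) of how a timestamp is mapped to a tumbling window.
// The formula follows my reading of Flink's TimeWindow.getWindowStartWithOffset; treat it as an illustration.
class TumblingWindowSketch {

    // Start of the window [start, start + size) that contains `timestamp`.
    // Adding `size` before taking the remainder keeps the result correct for negative timestamps.
    static long windowStart(long timestamp, long offset, long size) {
        return timestamp - (timestamp - offset + size) % size;
    }

    public static void main(String[] args) {
        long size = 10_000L; // 10-second windows, as in the demo above
        for (long ts : new long[] {3_000L, 9_999L, 10_000L, 12_345L}) {
            long start = windowStart(ts, 0L, size);
            System.out.println(ts + " -> [" + start + ", " + (start + size) + ")");
        }
    }
}
```

    Running main prints 3000 -> [0, 10000), 9999 -> [0, 10000), 10000 -> [10000, 20000) and 12345 -> [10000, 20000). The window end is exclusive, so an element stamped exactly 10000 already belongs to the next window.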
    1.2 Sliding Windows

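    Introduction: a sliding window is a more general form of the tumbling (fixed) window. It is defined by a fixed window length and a slide interval, and a new window starts every slide interval.
    Characteristics: aligned in time, fixed window length. Windows overlap when the slide is smaller than the window length, so one element can belong to several windows; a tumbling window is the special case where slide = length.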

    package com.xx.window;
    
    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.datastream.WindowedStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    
    /**
     * @author aqi
     * @since 2023/8/30 15:46
     */
    @Slf4j
    public class WindowReduceDemo {
    
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            // Read lines of the form "id,value" from a socket (push data with nc -lp 7777 on Windows, nc -lk 7777 on Linux)
            SingleOutputStreamOperator<Demo> sensorDS = env
                    .socketTextStream("127.0.0.1", 7777)
                    .map(new DemoMapFunction());
    
            // Tumbling window: fixed length of 10 seconds, one result per key every 10 seconds
    //        WindowedStream sensorWS = sensorDS
    //                .keyBy(Demo::getId)
    //                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
    
            // Sliding window: every 2 seconds, aggregate the data of the past 10 seconds
            WindowedStream<Demo, String, TimeWindow> sensorWS = sensorDS
                    .keyBy(Demo::getId)
                    .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(2)));
    
            // Aggregate: sum the values per key (other window functions can be used here as well)
            SingleOutputStreamOperator<Demo> reduce = sensorWS.reduce(
                    (value1, value2) -> new Demo(value1.getId(), value1.getValue() + value2.getValue())
            );
            // Print the results
            reduce.print();
            // Submit the job; nothing runs until execute() is called
            env.execute();
        }
    }
    
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    class Demo {
    
        private String id;
    
        private Long value;
    }
    
    class DemoMapFunction implements MapFunction<String, Demo> {
    
        @Override
        public Demo map(String value) {
            String[] datas = value.split(",");
            return new Demo(datas[0], Long.valueOf(datas[1]));
        }
    }
    
    
    

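    The overlap described above can be made concrete. The plain-Java sketch below lists every sliding window that contains a given timestamp. It assumes a zero offset and mirrors the loop I understand Flink's sliding assigners to use: take the latest window start at or before the timestamp, then step back by the slide. The class SlidingWindowSketch is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch (no Flink dependency) of sliding-window assignment, assuming a zero offset.
class SlidingWindowSketch {

    // Returns every window {start, end} (end exclusive) that contains `timestamp`, latest window first.
    static List<long[]> assignWindows(long timestamp, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        // Latest window start at or before the timestamp: same arithmetic as a tumbling window of length `slide`
        long lastStart = timestamp - (timestamp + slide) % slide;
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        // Window length 10 s, slide 2 s, as in the demo above
        for (long[] w : assignWindows(12_000L, 10_000L, 2_000L)) {
            System.out.println("[" + w[0] + ", " + w[1] + ")");
        }
    }
}
```

    An element at 12000 ends up in size / slide = 5 windows, from [12000, 22000) down to [4000, 14000). A slide that is small relative to the window length therefore multiplies the per-element work and state.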
    1.3 Session Windows

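    Introduction: a session window groups a sequence of events and is closed by a timeout gap of a specified length. If no new data arrives for a key within the gap, the current session ends, and the next element starts a new window.
    Characteristics: not aligned in time; the window length is not fixed and depends on the data.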

    package com.xx.window;
    
    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.datastream.WindowedStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    
    /**
     * @author aqi
     * @since 2023/8/30 15:46
     */
    @Slf4j
    public class WindowReduceDemo {
    
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            // Read lines of the form "id,value" from a socket (push data with nc -lp 7777 on Windows, nc -lk 7777 on Linux)
            SingleOutputStreamOperator<Demo> sensorDS = env
                    .socketTextStream("127.0.0.1", 7777)
                    .map(new DemoMapFunction());
    
            // Tumbling window: fixed length of 10 seconds, one result per key every 10 seconds
    //        WindowedStream sensorWS = sensorDS
    //                .keyBy(Demo::getId)
    //                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
    
    
            // Sliding window: every 2 seconds, aggregate the data of the past 10 seconds
    //        WindowedStream sensorWS = sensorDS
    //                .keyBy(Demo::getId)
    //                .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(2)));
    
    
            // Session window: a key's session closes after 5 seconds without new data
            WindowedStream<Demo, String, TimeWindow> sensorWS = sensorDS
                    .keyBy(Demo::getId)
                    .window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)));
    
    
            // Aggregate: sum the values per key (other window functions can be used here as well)
            SingleOutputStreamOperator<Demo> reduce = sensorWS.reduce(
                    (value1, value2) -> new Demo(value1.getId(), value1.getValue() + value2.getValue())
            );
            // Print the results
            reduce.print();
            // Submit the job; nothing runs until execute() is called
            env.execute();
        }
    }
    
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    class Demo {
    
        private String id;
    
        private Long value;
    }
    
    class DemoMapFunction implements MapFunction<String, Demo> {
    
        @Override
        public Demo map(String value) {
            String[] datas = value.split(",");
            return new Demo(datas[0], Long.valueOf(datas[1]));
        }
    }
    
    
    

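    The "timeout gap" idea can be sketched in plain Java for a single key. Each event first gets its own window [ts, ts + gap), and windows that overlap are merged into one session. The assumption that windows which merely touch are also merged is my reading of Flink's TimeWindow intersection check; treat edge-case behavior as an assumption. The class SessionWindowSketch is hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java sketch (no Flink dependency) of how session windows are formed for one key.
class SessionWindowSketch {

    // Each event first gets its own window [ts, ts + gap); windows that overlap or touch
    // are merged, so a session only closes after `gap` milliseconds without new events.
    static List<long[]> sessions(long[] timestamps, long gap) {
        long[] sorted = timestamps.clone();
        Arrays.sort(sorted);
        List<long[]> result = new ArrayList<>();
        long[] current = null;
        for (long ts : sorted) {
            if (current != null && ts <= current[1]) {
                current[1] = Math.max(current[1], ts + gap); // extend the open session
            } else {
                current = new long[] {ts, ts + gap};       // gap exceeded: start a new session
                result.add(current);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Gap of 5 seconds, as in the demo above
        for (long[] s : sessions(new long[] {1_000L, 3_000L, 12_000L, 14_000L}, 5_000L)) {
            System.out.println("[" + s[0] + ", " + s[1] + ")");
        }
    }
}
```

    The events at 1000 and 3000 fall into one session [1000, 8000). The 9-second silence before 12000 exceeds the gap, so 12000 and 14000 form a second session [12000, 19000). This is why session windows have neither a fixed length nor aligned boundaries.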
    1.4 Summary

    Tumbling window: TumblingProcessingTimeWindows.of(Time.seconds(10))
    Sliding window: SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(2))
    Session window: ProcessingTimeSessionWindows.withGap(Time.seconds(5))
    All three assigners above work on processing time.

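    II. Count Windows

    Count windows are used much like time windows, except that a window is defined by a number of elements instead of a time span, and the count is kept separately for each key. Flink provides tumbling and sliding count windows through countWindow().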

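    2.1 Tumbling Windows

    Window length = 5 elements (counted per key)

    // sensorKs is a KeyedStream, e.g. sensorDS.keyBy(Demo::getId); count windows are based on GlobalWindow
    WindowedStream<Demo, String, GlobalWindow> sensorWS = sensorKs.countWindow(5);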
    

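    To show what countWindow(5) means per key, here is a plain-Java simulation. It uses the "id,value" input format of the socket demo and emits a per-key sum each time that key has collected `size` elements. The class TumblingCountWindowSketch is hypothetical. My understanding is that a key whose buffer never fills up produces no output, and the sketch behaves the same way.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch (no Flink dependency) of a keyed tumbling count window.
class TumblingCountWindowSketch {

    // Emits "id=sum" each time a key has collected `size` elements, then clears that key's buffer.
    static List<String> run(List<String> lines, int size) {
        Map<String, List<Long>> buffers = new HashMap<>();
        List<String> emitted = new ArrayList<>();
        for (String line : lines) {
            String[] parts = line.split(",");
            String id = parts[0].trim();
            long value = Long.parseLong(parts[1].trim());
            List<Long> buffer = buffers.computeIfAbsent(id, k -> new ArrayList<>());
            buffer.add(value);
            if (buffer.size() == size) {
                long sum = 0;
                for (long v : buffer) {
                    sum += v;
                }
                emitted.add(id + "=" + sum);
                buffer.clear(); // tumbling: the key's next window starts empty
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Window length of 2 elements keeps the example short
        System.out.println(run(Arrays.asList("s1,1", "s1,2", "s2,10", "s1,3", "s2,20"), 2));
    }
}
```

    This prints [s1=3, s2=30]. The trailing s1,3 stays buffered because key s1 has not yet collected a second element.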
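    2.2 Sliding Windows

    Window length = 5 elements, slide = 2 elements: every time a key receives 2 new elements, the most recent (up to) 5 elements of that key are evaluated

    WindowedStream<Demo, String, GlobalWindow> sensorWS = sensorKs.countWindow(5, 2);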
    

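    The firing pattern of a sliding count window for a single key can be simulated in plain Java. The sketch assumes, based on my reading of the Flink source, that countWindow(size, slide) fires every `slide` elements and keeps only the last `size` elements before computing. Early firings can therefore see fewer than `size` elements. The class SlidingCountWindowSketch is hypothetical, and it uses size 3 and slide 2 to keep the output short.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Plain-Java sketch (no Flink dependency) of a sliding count window for one key.
class SlidingCountWindowSketch {

    // Fires every `slide` elements and sums the most recent `size` elements.
    static List<Long> run(long[] values, int size, int slide) {
        Deque<Long> buffer = new ArrayDeque<>();
        List<Long> emitted = new ArrayList<>();
        int sinceLastFire = 0;
        for (long v : values) {
            buffer.addLast(v);
            if (++sinceLastFire == slide) {
                sinceLastFire = 0;
                while (buffer.size() > size) {
                    buffer.removeFirst(); // evict the oldest elements before computing
                }
                long sum = 0;
                for (long x : buffer) {
                    sum += x;
                }
                emitted.add(sum);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(run(new long[] {1, 2, 3, 4, 5, 6}, 3, 2));
    }
}
```

    This prints [3, 9, 15]: the first firing only has 1 and 2, the second sums 2, 3, 4, and the third sums 4, 5, 6.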
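    2.3 Session Windows

    Flink has no built-in count-based session window: a session is defined by a gap of inactivity in time, so this category only applies to time windows.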

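    III. Window Functions (how window results are computed)

    3.1 Incremental Aggregation

    Each element is folded into an intermediate result as soon as it arrives, and the result is emitted when the window fires, so only the intermediate result has to be kept in state.
    Functions: reduce and aggregate (plus built-in aggregations such as sum, min and max). process, and the older apply with a WindowFunction, are not incremental.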

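    The plain-Java sketch below illustrates incremental aggregation without Flink. IncrementalAgg is a hypothetical interface that mirrors the four methods of Flink's AggregateFunction (createAccumulator, add, getResult, merge). It computes an average, which also shows why aggregate is more general than reduce: the input (Long), accumulator ({sum, count}) and output (Double) types all differ. AverageAgg and IncrementalWindowSketch are likewise hypothetical names.

```java
import java.util.Arrays;

// Hypothetical stand-in mirroring the four methods of Flink's AggregateFunction.
interface IncrementalAgg<IN, ACC, OUT> {
    ACC createAccumulator();
    ACC add(IN value, ACC acc);
    OUT getResult(ACC acc);
    ACC merge(ACC a, ACC b);
}

// Average of Long values; the accumulator is {sum, count}, so state stays constant-size per window.
class AverageAgg implements IncrementalAgg<Long, long[], Double> {
    public long[] createAccumulator() { return new long[] {0L, 0L}; }
    public long[] add(Long value, long[] acc) { acc[0] += value; acc[1]++; return acc; }
    // Returns 0.0 for an empty window (a choice made for this sketch)
    public Double getResult(long[] acc) { return acc[1] == 0 ? 0.0 : (double) acc[0] / acc[1]; }
    public long[] merge(long[] a, long[] b) { return new long[] {a[0] + b[0], a[1] + b[1]}; }
}

class IncrementalWindowSketch {
    // Folds each element into the accumulator as it "arrives" and only calls getResult when the
    // window "fires"; no element is kept after add().
    static <IN, ACC, OUT> OUT runWindow(IncrementalAgg<IN, ACC, OUT> agg, Iterable<IN> arriving) {
        ACC acc = agg.createAccumulator();
        for (IN value : arriving) {
            acc = agg.add(value, acc);
        }
        return agg.getResult(acc);
    }

    public static void main(String[] args) {
        System.out.println(runWindow(new AverageAgg(), Arrays.asList(1L, 2L, 6L)));
    }
}
```

    This prints 3.0. However many elements arrive, the window only ever holds two longs.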
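    3.2 Full-Window Functions

    Elements are not computed on arrival. They are buffered, and the computation runs once over all of them when the window fires. The function can also access window metadata (such as the start and end time) and other context information, which makes it very flexible, at the cost of keeping every element of the window in state.
    Function: process (ProcessWindowFunction)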

    package com.xx.window;
    
    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.commons.lang3.time.DateFormatUtils;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.datastream.WindowedStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;
    
    /**
     * @author aqi
     * @since 2023/8/30 15:46
     */
    @Slf4j
    public class WindowReduceDemo {
    
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            // Read lines of the form "id,value" from a socket (push data with nc -lp 7777 on Windows, nc -lk 7777 on Linux)
            SingleOutputStreamOperator<Demo> sensorDS = env
                    .socketTextStream("127.0.0.1", 7777)
                    .map(new DemoMapFunction());
    
            // Tumbling window: fixed length of 10 seconds, one result per key every 10 seconds
            WindowedStream<Demo, String, TimeWindow> sensorWS = sensorDS
                    .keyBy(Demo::getId)
                    .window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
    
            SingleOutputStreamOperator<String> process = sensorWS.process(new ProcessWindowFunction<Demo, String, String, TimeWindow>() {
    
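                /**
                 * Full-window computation: called once when the window fires, processing all elements of the window together
                 * @param s the key of this window (the value returned by keyBy)
                 * @param context context, gives access to the window (start/end) and other runtime information
                 * @param elements all elements buffered in this window
                 * @param out collector used to emit results
                 */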
                @Override
                public void process(String s, ProcessWindowFunction<Demo, String, String, TimeWindow>.Context context, Iterable<Demo> elements, Collector<String> out) {
                    long start = context.window().getStart();
                    long end = context.window().getEnd();
    
                    String startWindow = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss");
                    String endWindow = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss");
    
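                    // Count by iterating: Iterable.spliterator().estimateSize() is only an estimate and may return Long.MAX_VALUE
                    long count = 0;
                    for (Demo ignored : elements) {
                        count++;
                    }
    
                    out.collect("key=" + s + " window [" + startWindow + ", " + endWindow + ") contains " + count + " element(s) ===> " + elements);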
                }
            });
    
            // Print the results
            process.print();
            // Submit the job; nothing runs until execute() is called
            env.execute();
        }
    }
    
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    class Demo {
    
        private String id;
    
        private Long value;
    }
    
    class DemoMapFunction implements MapFunction<String, Demo> {
    
        @Override
        public Demo map(String value) {
            String[] datas = value.split(",");
            return new Demo(datas[0], Long.valueOf(datas[1]));
        }
    }
    
    
    

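    A full-window function is needed when the result depends on all elements at once and cannot be kept in a small accumulator. A median is a typical case. The plain-Java sketch below shows the kind of computation a process function would run when the window fires. FullWindowSketch is a hypothetical name. In a real job, Flink keeps the elements in window state until the window fires.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Plain-Java sketch (no Flink dependency) of a full-window computation.
class FullWindowSketch {

    // Called once when the window fires, with every buffered element.
    // A median needs all values at once, unlike a sum or an average.
    static double median(Iterable<Long> elements) {
        List<Long> values = new ArrayList<>();
        for (Long v : elements) {
            values.add(v); // copying by iteration works for any Iterable
        }
        if (values.isEmpty()) {
            throw new IllegalArgumentException("empty window");
        }
        Collections.sort(values);
        int mid = values.size() / 2;
        return values.size() % 2 == 1
                ? values.get(mid)
                : (values.get(mid - 1) + values.get(mid)) / 2.0;
    }

    public static void main(String[] args) {
        System.out.println(median(Arrays.asList(5L, 1L, 3L)));     // odd count
        System.out.println(median(Arrays.asList(4L, 1L, 3L, 2L))); // even count
    }
}
```

    This prints 3.0 and 2.5.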
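    3.3 Combining Incremental and Full-Window Functions

    Passing both functions to aggregate() gets the benefits of each. The AggregateFunction pre-aggregates every element as it arrives, so only the accumulator is kept in state. When the window fires, its single result is handed to the ProcessWindowFunction, which can add window metadata such as the start and end time.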

    package com.xx.window;
    
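    // WaterSensor (with getId() and getVc()) and WaterSensorMapFunction are the author's own classes, not shown in this post
    import com.xx.entity.WaterSensor;
    import com.xx.functions.WaterSensorMapFunction;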
    import org.apache.commons.lang3.time.DateFormatUtils;
    import org.apache.flink.api.common.functions.AggregateFunction;
    import org.apache.flink.streaming.api.datastream.KeyedStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.datastream.WindowedStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;
    
    /**
     * @author aqi
     * @since 2023/8/30 15:46
     */
    public class WindowAggregateAndProcessDemo {
    
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            SingleOutputStreamOperator<WaterSensor> sensorDS = env
                    .socketTextStream("127.0.0.1", 7777)
                    .map(new WaterSensorMapFunction());
    
    
            KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(WaterSensor::getId);
    
            WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
    
            SingleOutputStreamOperator<String> result = sensorWS.aggregate(
                    // Type parameters: input type, accumulator type (the intermediate result kept in state), output type
                    new AggregateFunction<WaterSensor, Integer, String>() {
                        @Override
                        public Integer createAccumulator() {
                            System.out.println("Creating accumulator");
                            // Start from 0; returning null would require a null check in add() and break getResult()
                            return 0;
                        }
    
                        @Override
                        public Integer add(WaterSensor value, Integer accumulator) {
                            Integer add = value.getVc() + accumulator;
                            System.out.println("add() called, running total: " + add);
                            return add;
                        }
    
                        @Override
                        public String getResult(Integer accumulator) {
                            System.out.println("getResult() called");
                            return accumulator.toString();
                        }
    
                        @Override
                        public Integer merge(Integer a, Integer b) {
                            // Only invoked for merging windows (e.g. session windows), but it must still return the combined value
                            System.out.println("merge() called");
                            return a + b;
                        }
                    }, new ProcessWindowFunction<String, String, String, TimeWindow>() {
                        @Override
                        public void process(String s, ProcessWindowFunction<String, String, String, TimeWindow>.Context context, Iterable<String> elements, Collector<String> out) throws Exception {
                            long start = context.window().getStart();
                            long end = context.window().getEnd();
    
                            String startWindow = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss");
                            String endWindow = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss");
    
                            // Combined with aggregate(), elements holds only the single pre-aggregated result
                            long count = 0;
                            for (String ignored : elements) {
                                count++;
                            }
    
                            out.collect("key=" + s + " window [" + startWindow + ", " + endWindow + ") received " + count + " pre-aggregated result(s) ===> " + elements);
                        }
                    });
    
            result.print();
            env.execute();
        }
    }
    
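    The division of labor in the combined job can be reduced to two plain-Java steps. First, an incremental fold produces one value per window. Second, a window-level step receives that single value together with the window bounds. CombinedWindowSketch and its methods are hypothetical names, and the window bounds are passed in by hand here rather than coming from a Flink Context.

```java
import java.util.Collections;

// Plain-Java sketch (no Flink dependency) of incremental aggregation followed by a full-window step.
class CombinedWindowSketch {

    // Incremental part: each arriving value is folded into a running sum;
    // only this single Integer would need to be kept as window state.
    static int preAggregate(int[] arriving) {
        int acc = 0;             // like createAccumulator()
        for (int vc : arriving) {
            acc += vc;           // like add()
        }
        return acc;              // like getResult()
    }

    // Full-window part: called once per window with the pre-aggregated result(s)
    // plus window metadata; here the Iterable holds exactly one value.
    static String describe(String key, long start, long end, Iterable<Integer> preAggregated) {
        int count = 0;
        Integer sum = null;
        for (Integer v : preAggregated) {
            sum = v;
            count++;
        }
        return "key=" + key + " window=[" + start + ", " + end + ") results=" + count + " sum=" + sum;
    }

    public static void main(String[] args) {
        int sum = preAggregate(new int[] {1, 2, 3});
        System.out.println(describe("s1", 0L, 10_000L, Collections.singletonList(sum)));
    }
}
```

    This prints key=s1 window=[0, 10000) results=1 sum=6. The "results=1" matches what the ProcessWindowFunction in the job above sees: one pre-aggregated value per window, not the raw elements.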
    
  • Original article: https://blog.csdn.net/progammer10086/article/details/132581847