• 【API篇】八、Flink窗口函数


    //窗口操作
    stream.keyBy(<key selector>)
           .window(<window assigner>)
           .aggregate(<window function>)
    
    
    • 1
    • 2
    • 3
    • 4
    • 5

    上一节的窗口分配器,指明了窗口类型,知道了数据属于哪个窗口并收集。而窗口函数,则是定义如何对这些数据做计算操作。

    在这里插入图片描述

    • 增量聚合来一条数据,计算一条数据,窗口触发的时候输出计算结果
    • 全窗口函数数据来了不计算,存起来,窗口触发的时候,计算并输出计算结果

    1、增量聚合之ReduceFunction

    public class WindowReduceDemo {
    
        public static void main(String[] args) throws Exception {
    
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            
            env.setParallelism(1);
    
            env.socketTextStream("node01", 9527)
               .map(new WaterSensorMapFunction())
               .keyBy(r -> r.getId())
               // 设置滚动事件时间窗口
               .window(TumblingProcessingTimeWindows.of(Time.seconds(30)))
               .reduce(new ReduceFunction<WaterSensor>() {
    
                   @Override
                   public WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception {
                       System.out.println("调用reduce方法,value1=:"+value1 + ",value2=:"+value2);
                       return new WaterSensor(value1.getId(), value2.getTs(), value1.getVc()+value2.getVc());
                   }
               })
               .print();
    
            env.execute();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    运行,输入数据,查看控制台:

    在这里插入图片描述

    2、增量聚合之AggregateFunction

    上面使用ReduceFunction的限制是,输入数据的类型、聚合中间状态的类型、输出结果的类型必须一致,AggregateFunction则没有这个限制。AggregateFunction接口有四个方法:

    • createAccumulator:创建一个累加器,这就是为聚合创建了一个初始状态,每个聚合任务只会调用一次。
    • add:将输入的元素添加到累加器中。
    • getResult:从累加器中提取聚合的输出结果。
    • merge:合并两个累加器,并将合并后的状态作为一个累加器返回

    AggregateFunction的工作原理是:首先调用createAccumulator()为任务初始化一个状态(累加器);而后每来一个数据就调用一次add()方法,对数据进行聚合,得到的结果保存在状态中;等到了窗口需要输出时,再调用getResult()方法得到计算结果

    public class WindowAggregateDemo {
    
        public static void main(String[] args) throws Exception {
    
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            
            env.setParallelism(1);
    
            SingleOutputStreamOperator<WaterSensor> sensorDS = env
                    .socketTextStream("node01", 9527)
                    .map(new WaterSensorMapFunction());    //自定义的实现类,String转自定义对象WaterSensor
    
    
            KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId());
    
            // 1. 窗口分配器
            WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
    
            SingleOutputStreamOperator<String> aggregate = sensorWS
                    .aggregate(
                            new AggregateFunction<WaterSensor, Integer, String>() {
                                @Override
                                public Integer createAccumulator() {
                                    System.out.println("创建累加器");
                                    return 0;
                                }
    							
    							//value即输入的数据,accumulator即之前的计算结果
                                @Override
                                public Integer add(WaterSensor value, Integer accumulator) {
                                    System.out.println("调用add方法,value="+value);
                                    return accumulator + value.getVc();
                                }
    
                                @Override
                                public String getResult(Integer accumulator) {
                                    System.out.println("调用getResult方法");
                                    return accumulator.toString();
                                }
    
                                @Override
                                public Integer merge(Integer a, Integer b) {
                                    System.out.println("调用merge方法");
                                    return null;
                                }
                            }
                    );
            
            aggregate.print();
    
            env.execute();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54

    运行,输入数据,查看控制台:

    在这里插入图片描述

    3、全窗口函数full window functions

    全窗口函数,即数据来了不计算,存起来,窗口触发的时候,计算并输出计算结果Flink全窗口函数有两种,第一种为apply方法下的:

    stream
        .keyBy(<key selector>)
        .window(<window assigner>)
        .apply(new MyWindowFunction());
    
    • 1
    • 2
    • 3
    • 4

    传入一个WindowFunction的实现类,该方法已被第二种ProcessWindowFunction全覆盖,因而逐渐弃用。ProcessWindowFunction除了可以拿到窗口中的所有数据之外,还可以获取到一个“上下文对象”(Context),通过这个上下文对象,可以获取窗口对象、窗口处理时间、事件时间水位线

    public class WindowProcessDemo {
        public static void main(String[] args) throws Exception {
    
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            
            env.setParallelism(1);
    
            SingleOutputStreamOperator<WaterSensor> sensorDS = env
                    .socketTextStream("node01", 9527)
                    .map(new WaterSensorMapFunction());
    
            KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId());
    
            // 1. 窗口分配器
            WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
    
            SingleOutputStreamOperator<String> process = sensorWS
                    .process(
                            new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
                            	/**
                            	* 全窗口函数计算逻辑,窗口结束时触发才调用一次
                            	* s 分组的key
                            	* context 上下文对象
                            	* elements 窗口内存的所有数据
                            	* out 采集器对象
                            	*/
                                @Override
                                public void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                                    long count = elements.spliterator().estimateSize();
                                    long windowStartTs = context.window().getStart();
                                    long windowEndTs = context.window().getEnd();
                                    String windowStart = DateFormatUtils.format(windowStartTs, "yyyy-MM-dd HH:mm:ss.SSS");
                                    String windowEnd = DateFormatUtils.format(windowEndTs, "yyyy-MM-dd HH:mm:ss.SSS");
    
                                    out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());
                                }
                            }
                    );
    
            process.print();
    
            env.execute();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45

    效果:

    在这里插入图片描述

    在这里插入图片描述

    4、增量聚合函数搭配全窗口函数

    可以看出,增量和全窗口各有好处:

    • 增量聚合下,来一条计算一条,只存储中间计算结果,占用空间少
    • 全窗口函数则是可以通过上下文对象来实现灵活的功能

    像同时拥有两者的优点,可以调用aggregate方法的另一个重载方法:

    在这里插入图片描述

    // ReduceFunction与WindowFunction结合
    public <R> SingleOutputStreamOperator<R> reduce(
            ReduceFunction<T> reduceFunction,WindowFunction<TRKW> function) 
    
    // ReduceFunction与ProcessWindowFunction结合
    public <R> SingleOutputStreamOperator<R> reduce(
            ReduceFunction<T> reduceFunction,ProcessWindowFunction<TRKW> function)
    
    // AggregateFunction与WindowFunction结合
    public <ACCVR> SingleOutputStreamOperator<R> aggregate(
            AggregateFunction<TACCV> aggFunction,WindowFunction<VRKW> windowFunction)
    
    // AggregateFunction与ProcessWindowFunction结合
    public <ACCVR> SingleOutputStreamOperator<R> aggregate(
            AggregateFunction<TACCV> aggFunction,
            ProcessWindowFunction<VRKW> windowFunction)
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    此时:

    • 基于第一个参数,即增量聚合函数,来处理数据,来一条聚合一条
    • 窗口触发后,调用第二个参数的处理逻辑,此时,把增量聚合的结果(只有一条数据)再传递给全窗口函数,也就是说全窗口的Iterable<> elements,长度为1,注意全窗口不再缓存所有数据
    • 经过全窗口,执行处理和包装,再输出
    public class WindowAggregateAndProcessDemo {
    
        public static void main(String[] args) throws Exception {
        
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            
            env.setParallelism(1);
            
            SingleOutputStreamOperator<WaterSensor> sensorDS = env
                    .socketTextStream("node01", 9527)
                    .map(new WaterSensorMapFunction());
    
            KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId());
    
            // 1. 窗口分配器
            WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
    
    		//sensorWS.reduce()   //也可以传两个
    
            SingleOutputStreamOperator<String> result = sensorWS.aggregate(
                    new MyAgg(),
                    new MyProcess()
            );
    
            result.print();
    
            env.execute();
        }
        
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    public  class MyAgg implements AggregateFunction<WaterSensor, Integer, String>{
    
            @Override
            public Integer createAccumulator() {
                System.out.println("创建累加器");
                return 0;
            }
    
    
            @Override
            public Integer add(WaterSensor value, Integer accumulator) {
                System.out.println("调用add方法,value="+value);
                return accumulator + value.getVc();
            }
    
            @Override
            public String getResult(Integer accumulator) {
                System.out.println("调用getResult方法");
                return accumulator.toString();
            }
    
            @Override
            public Integer merge(Integer a, Integer b) {
                System.out.println("调用merge方法");
                return null;
            }
        }
    
    // 全窗口函数的输入类型 = 增量聚合函数的输出类型
    public  class MyProcess extends ProcessWindowFunction<String,String,String,TimeWindow>{
    
        @Override
        public void process(String s, Context context, Iterable<String> elements, Collector<String> out) throws Exception {
            long startTs = context.window().getStart();
            long endTs = context.window().getEnd();
            String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");
            String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");
    
            long count = elements.spliterator().estimateSize();
    
            out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());
    
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44

    注意,二者搭配时,根据前面分析,可以知道,必有:增量聚合函数的输出类型 = 全窗口函数的输入类型

    5、会话窗口动态获取间隔值

    到此,窗口API需要的窗口分配器(见上一篇)和窗口函数都已整理完。上面demo中用的窗口分配器都是滚动窗口,但应该有以下这些:

    • 时间滚动窗口
    • 时间滑动窗口
    • 时间会话窗口
    • 计数滚动窗口
    • 计数滑动窗口

    这里再记录下时间会话窗口+动态获取会话间隔:

    public class WindowSessionDemo {
    
        public static void main(String[] args) throws Exception {
    
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            
            env.setParallelism(1);
    
            SingleOutputStreamOperator<WaterSensor> sensorDS = env
                    .socketTextStream("node01", 9527)
                    .map(new WaterSensorMapFunction());
                    
            KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId());
    
            // 1. 窗口分配器
            WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(ProcessingTimeSessionWindows.withDynamicGap(t -> t.getTs() * 1000L));
    
            SingleOutputStreamOperator<String> process = sensorWS
                    .process(
                            new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
               
                                @Override
                                public void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                                    long count = elements.spliterator().estimateSize();
                                    long windowStartTs = context.window().getStart();
                                    long windowEndTs = context.window().getEnd();
                                    String windowStart = DateFormatUtils.format(windowStartTs, "yyyy-MM-dd HH:mm:ss.SSS");
                                    String windowEnd = DateFormatUtils.format(windowEndTs, "yyyy-MM-dd HH:mm:ss.SSS");
    
                                    out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());
                                }
                            }
                    );
    
            process.print();
    
            env.execute();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39

    来一条数据,根据这条数据获取一个值做为会话间隔,到达这个间隔前,下条数据到来了,则会话间隔又成了另一个值,动态的。运行:

    在这里插入图片描述

    可以看到,会话间隔动态获取,到达间隔时下条数据还没来,则结束本窗户,窗口口结束时触发才调用一次process,和分析的一致。最后补充一点,展开demo代码里的Lambda表达式,其实是一个抓取会话间隔的方法,定义了会话窗口间隔的获取逻辑。

    在这里插入图片描述

    再贴个计数滑动窗口:

    在这里插入图片描述

    6、触发器和移除器

    触发器主要是用来控制窗口什么时候触发计算,即什么时候执行窗口函数

    //基于WindowedStream调用.trigger()方法,就可以传入一个自定义的窗口触发器(Trigger)
    stream.keyBy(...)
           .window(...)
           .trigger(new MyTrigger())
    
    
    • 1
    • 2
    • 3
    • 4
    • 5

    移除器主要用来定义移除某些数据的逻辑

    基于WindowedStream调用.evictor()方法,就可以传入一个自定义的移除器(Evictor)。Evictor是一个接口,不同的窗口类型都有各自预实现的移除器。

    stream.keyBy(...)
           .window(...)
           .evictor(new MyEvictor())
    
    
    • 1
    • 2
    • 3
    • 4

    Flink提供的几个窗口,比如滑动、滚动等,都有对触发器和移除器的默认实现,不用自定义。

    7、补充

    窗口的划分:

    • 窗口开始时间start是窗口长度的整数倍,向下取整

    在这里插入图片描述
    在这里插入图片描述

    • 窗口结束时间是start+窗口长度

    在这里插入图片描述
    在这里插入图片描述

    • 窗口是左闭右开,因为属于本窗口的最大时间戳为end-1

    在这里插入图片描述

    • 窗口的生命周期,创建是属于本窗口的第一条数据来的时候,现new的,放入一个singleton单例的集合中
    • 窗口的销毁是时间的进展 >= 窗口的最大时间戳(end-1ms) + 允许迟到的时间(默认0)
    • 窗口什么时候触发输出:当时间进展 >= 窗口的最大时间戳(end -1ms)
  • 相关阅读:
    如何利用Python实现熵权法?
    BSAD检验比特币泡沫生成的时间点
    nodejs中对es6语法规范讲解
    基于JavaGUI实现的学生点名系统
    Perl 中的循环结构
    Web APIs(正则表达式)
    react基础知识3
    【Spring】 Spring中的IoC(控制反转)
    一文拿捏Spring事务之、ACID、隔离级别、失效场景
    KubeVela 1.3 发布:开箱即用的可视化应用交付平台,引入插件生态、权限认证、版本化等企业级新特性
  • 原文地址:https://blog.csdn.net/llg___/article/details/133999511