• 【入门Flink】- 08Flink时间语义和窗口概念


    Flink-Windows

    是将无限数据切割成有限的“数据块”进行处理,这就是所谓的“窗口”(Window)。

    注意:Flink 中窗口并不是静态准备好的,而是动态创建——当有落在这个窗口区间范围的数据达到时,才创建对应的窗口。【事件驱动,没有数据到达永远都不会创建窗口】

    1)窗口分类

    (1)按照驱动类型分

    (1)时间窗口

    时间窗口以时间点来定义窗口的开始(start)和结束(end),截取出的就是某一时间段的数据。

    (2)计数窗口

    计数窗口基于元素的个数截取数据,到达固定的个数时就触发计算并关闭窗口。

    (2)按照窗口分配数据的规则分类

    根据分配数据的规则,窗口的具体实现可以分为 4 类:滚动窗口(Tumbling Window)、滑动窗口(Sliding Window)、会话窗口(Session Window),以及全局窗口(Global Window)。

    (1)滚动窗口(Tumbling Windows)

    滚动窗口有固定的大小,是一种对数据进行“均匀切片”的划分方式。窗口之间没有重叠,也不会有间隔,是
    “首尾相接”的状态。这是最简单的窗口形式,每个数据都会被分配到一个窗口,而且只会属于一个窗口

    滚动窗口应用非常广泛,可以对每个时间段做聚合统计,很多BI分析指标都可以用它来实现。

    (2)滑动窗口(Sliding Windows)

    滑动窗口的大小也是固定的。但是窗口之间并不是首尾相接的,而是可以“错开”一定的位置。定义滑动窗口的参数有两个:除去窗口大小(window size)之外,还有一个“滑动步长”(window slide),它其实就代表了窗口计算的频率。窗口在结束时间触发计算输出结果,那么滑动步长就代表了计算频率

    滚动窗口也可以看作是一种特殊的滑动窗口一一窗口大小等于滑动步长(size=slide)
    滑动窗口适合计算结果更新频率非常高的场景。

    (3)会话窗口(Session Windows)

    会话窗口,是基于“会话”(session)来来对数据进行分组的。会话窗口只能基于时间来定义。
    会话窗口中,最重要的参数就是会话的超时时间,也就是两个会话窗口之间的最小距离。如果相邻两个数据到
    来的时间间隔(gap)小于指定的大小(size),那说明还在保持会话,它们就属于同一个窗口;如果gap大于size,
    那么新来的数据就应该属于新的会话窗口,而前一个窗口就应该关闭了。

    会话窗口之间一定是不会重叠的,而且会留有至少为size的间隔(session)

    在一些类似保持会话的场景下,可以使用会话窗口来进行数据的处理统计。

    (4)全局窗口(Global Windows)

    “全局窗口”,这种窗口全局有效,会把相同key的所有数据都分配到同一个窗口中。这种窗口没有结束的时侯, 默认是不会做触发计算的,如果希望它能对数据进行计算处理,还需要自定义“触发器”(Trigger)。

    2)窗口 API

    (1)按键分区(Keyed)和非按键分区(Non-Keyed)

    (1)按键分区窗口(Keyed Windows)

    经过按键分区 keyBy 操作后,数据流会按照 key 被分为多条逻辑流(logical streams),这就是 KeyedStream。

    stream.keyBy(...)
    .window(...)
    
    • 1
    • 2

    (2)非按键分区(Non-Keyed Windows)

    如果没有进行 keyBy,那么原始的 DataStream 就不会分成多条逻辑流。这时窗口逻辑只能在一个任务(task)上执行,就相当于并行度变成了 1。

    stream.windowAll(...)
    
    • 1

    注意:对于非按键分区的窗口操作,手动调大窗口算子的并行度也是无效的,windowAll本身就是一个非并行的操作。

    (2)窗口分配器(Window Assigners)和窗口函数(WindowFunctions)

    stream.keyBy(<key selector>)
    .window(<window assigner>)
    .aggregate(<window function>)
    
    • 1
    • 2
    • 3

    窗口分配器

    (1)时间窗口

    滚动处理时间窗口

    stream.keyBy(...)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
        .aggregate(...)
    
    • 1
    • 2
    • 3

    .of()还有一个重载方法,可以传入两个 Time 类型的参数:size 和offset。第一个参数当然还是窗口大小,第二个参数则表示窗口起始点的偏移量。

    滑动处理时间窗口

    stream.keyBy(...)
    .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .aggregate(...)
    
    • 1
    • 2
    • 3

    滑动窗口同样可以追加第三个参数,用于指定窗口起始点的偏移量,用法与滚动窗口完全一致。

    处理时间会话窗口

    stream.keyBy(...)
    .window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
    .aggregate(...)
    
    • 1
    • 2
    • 3

    还可以调用 withDynamicGap()方法定义 session gap 的动态提取逻辑。

    滚动事件时间窗口

    stream.keyBy(...)
    .window(TumblingEventTimeWindows.of(Time.seconds(5))).aggregate(...)
    
    • 1
    • 2

    滑动事件时间窗口

    stream.keyBy(...)
    .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .aggregate(...)
    
    • 1
    • 2
    • 3

    事件时间会话窗口

    stream.keyBy(...)
    .window(EventTimeSessionWindows.withGap(Time.seconds(10))).aggregate(...)
    
    • 1
    • 2

    (2)计数窗口

    滚动计数窗口

    stream.keyBy(...)
    .countWindow(10)
    
    • 1
    • 2

    滑动计数窗口

    stream.keyBy(...)
    .countWindow(10, 3)
    
    • 1
    • 2

    全局窗口

    stream.keyBy(...)
    .window(GlobalWindows.create());
    
    • 1
    • 2

    注意:使用全局窗口,必须自行定义触发器才能实现窗口计算,否则起不到任何作用。

    窗口函数

    (1)增量聚合函数(ReduceFunction / AggregateFunction)

    归约函数(ReduceFunction)

    类似Reduce算子,只不过固定时间才会输出

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            DataStreamSource<String> stream = env.socketTextStream("124.222.253.33", 7777);
    
            stream.map(new WaterSensorMapFunction())
                    .keyBy(WaterSensor::getId)
                    // 设置滚动事件时间窗口
                    .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                    .reduce(new ReduceFunction<WaterSensor>() {
                        @Override
                        public WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception {
                            System.out.println("调用reduce 方法,之前的结果:" + value1 + ",现在来的数据:" + value2);
                            return new WaterSensor(value1.getId(), System.currentTimeMillis(), value1.getVc() + value2.getVc());
                        }
                    })
                    .print();
            env.execute();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    聚合函数(AggregateFunction)

    ReduceFunction 可以解决大多数归约聚合的问题,但是这个接口有一个限制,就是聚合状态的类型、输出结果的类型都必须和输入数据类型一样

    image-20231109192227819

    有三种类型:输入类型(IN)、累加器类型(ACC)和输出类型(OUT)。

    输入类型IN 就是输入流中元素的数据类型;累加器类型 ACC 是进行聚合的中间状态类型;而输出类型OUT是最终计算结果的类型。

    接口中有四个方法:

    • createAccumulator():创建一个累加器,为聚合创建了一个初始状态,每个聚合任务只会调用一次。
    • add():将输入的元素添加到累加器中。
    • getResult():从累加器中提取聚合的输出结果。
    • merge():合并两个累加器,并将合并后的状态作为一个累加器返回。

    AggregateFunction 的工作原理:首先调用createAccumulator()为任务初始化一个状态(累加器);而后每来一个数据就调用一次 add()方法,对数据进行聚合,得到的结果保存在状态中;等到了窗口需要输出时,再调用 getResult()方法得到计算结果。很明显,与 ReduceFunction 相同,AggregateFunction 也是增量式的聚合;而由于输入、中间状态、输出的类型可以不同,使得应用更加灵活方便。

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("124.222.253.33", 7777)
                    .map(new WaterSensorMapFunction());
            KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(WaterSensor::getId);
    
            WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS
                    .window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
            SingleOutputStreamOperator<String> aggregate = sensorWS.aggregate(
                    new AggregateFunction<WaterSensor, Integer, String>() {
                        @Override
                        public Integer createAccumulator() {
                            System.out.println("创建累加器");
                            return 0;
                        }
    
                        @Override
                        public Integer add(WaterSensor value, Integer accumulator) {
                            System.out.println(" 调用add方法,value=" + value);
                            return accumulator + value.getVc();
                        }
    
                        @Override
                        public String getResult(Integer accumulator) {
                            System.out.println("调用getResult方法");
                            return accumulator.toString();
                        }
    
                        @Override
                        public Integer merge(Integer a, Integer b) {
                            System.out.println("调用merge方法");
                            return null;
                        }
                    }
            );
            aggregate.print();
            env.execute();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    (2)全窗口函数(full window functions)

    基于全部的数据计算

    全窗口函数有两种:WindowFunction ProcessWindowFunction

    窗口函数(WindowFunction)

    基于 WindowedStream 调用.apply()方法,传入一个 WindowFunction 的实现类。

    stream
    .keyBy(<key selector>)
    .window(<window assigner>)
    .apply(new MyWindowFunction());
    
    • 1
    • 2
    • 3
    • 4

    该类中可以获取到包含窗口所有数据的可迭代集合(Iterable),还可以拿到窗口(Window)本身的信息。

    不过 WindowFunction 能提供的上下文信息较少,也没有更高级的功能。事实上,它的作用可以被 ProcessWindowFunction 全覆盖,所以之后可能会逐渐弃用

    处理窗口函数(ProcessWindowFunction)

    ProcessWindowFunction 还可以获取到一个“上下文对象”(Context)。上下文对象非常强大,不仅能够获取窗口信息,还可以访问当前的时间和状态信息。

    时间就包括了处理时间(processing time)和事件时间水位线(event time watermark)。

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("124.222.253.33", 7777)
                    .map(new WaterSensorMapFunction());
            KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(WaterSensor::getId);
            WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
            SingleOutputStreamOperator<String> process = sensorWS.process(
                    new ProcessWindowFunction<WaterSensor,
                            String, String, TimeWindow>() {
                        @Override
                        public void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                            long count = elements.spliterator().estimateSize();
                            long windowStartTs = context.window().getStart();
                            long windowEndTs = context.window().getEnd();
                            String windowStart = DateFormatUtils.format(windowStartTs, "yyyy-MM-dd HH:mm:ss.SSS");
                            String windowEnd = DateFormatUtils.format(windowEndTs, "yyyy-MM-dd HH:mm:ss.SSS");
                            out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ") 包含 " + count + " 条数据===>" + elements);
                        }
                    }
            );
            process.print();
            env.execute();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    增量聚合和全窗口函数结合使用

    // ReduceFunction 与 WindowFunction 结合
    public <R> SingleOutputStreamOperator<R> reduce(
    ReduceFunction<T> reduceFunction,WindowFunction<TRKW>function)
    // ReduceFunction 与 ProcessWindowFunction 结合
    public <R> SingleOutputStreamOperator<R> reduce(
    ReduceFunction<T> reduceFunction,ProcessWindowFunction<TRKW> function)
        
    // AggregateFunction 与 WindowFunction 结合
    public <ACCVR> SingleOutputStreamOperator<R> aggregate(AggregateFunction<TACCV> aggFunction,WindowFunction<VRKW> windowFunction)
    // AggregateFunction 与 ProcessWindowFunction 结合
    public <ACCVR> SingleOutputStreamOperator<R> aggregate(AggregateFunction<TACCV> aggFunction,
    ProcessWindowFunction<VRKW>     
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    结合使用

    public class WindowAggregateAndProcessDemo {
    
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("124.222.253.33", 7777)
                    .map(new WaterSensorMapFunction());
    
            KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId());
            WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));// 2. 窗口函数:
        /*
          增量聚合 Aggregate + 全窗口 process
          1、增量聚合函数处理数据: 来一条计算一条
          2、窗口触发时, 增量聚合的结果(只有一条)传递给全窗口函数
          3、经过全窗口函数的处理包装后,输出
    
          结合两者的优点:
          1、增量聚合: 来一条计算一条,存储中间的计算结果,占用的空间少
          2、全窗口函数: 可以通过 上下文 实现灵活的功能
        */
            // sensorWS.reduce() //也可以传两个
            SingleOutputStreamOperator<String> result = sensorWS.aggregate(
                    new MyAgg(),
                    new MyProcess()
            );
            result.print();
            env.execute();
        }
    
        public static class MyAgg implements AggregateFunction
    
                <WaterSensor, Integer, String> {
            @Override
            public Integer createAccumulator() {
                System.out.println("创建累加器");
                return 0;
            }
    
            @Override
            public Integer add(WaterSensor value, Integer accumulator) {
                System.out.println("调用 add 方法,value=" + value);
                return accumulator + value.getVc();
            }
    
            @Override
            public String getResult(Integer accumulator) {
                System.out.println("调用 getResult 方法");
                return accumulator.toString();
            }
    
            @Override
            public Integer merge(Integer a, Integer b) {
                System.out.println("调用 merge 方法");
                return null;
            }
        }
    
        // 全窗口函数的输入类型 = 增量聚合函数的输出类型
        public static class MyProcess extends ProcessWindowFunction<String, String, String, TimeWindow> {
            @Override
    
            public void process(String s, Context context, Iterable<String> elements, Collector<String> out) throws Exception {
                long startTs = context.window().getStart();
                long endTs = context.window().getEnd();
                String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");
                String windowEnd = DateFormatUtils.format(endTs, "yyyyMM-dd HH:mm:ss.SSS");
                long count = elements.spliterator().estimateSize();
                out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71

    Flink-Time

    • Event Time:事件时间,一个是数据产生的时间(时间戳Timestamp)
    • Processing time:处理时间,数据真正被处理的时间

    image-20231108081425604

    事件时间在实际应用中更为广泛,从Flink 1.12版本开始,Flink已经将事件时间作为默认的时间语义

  • 相关阅读:
    Robust Lane Detection from Continuous Driving
    SpringBoot手动获取实例
    Blazor前后端框架Known-V1.2.15
    RabbitMQ二、RabbitMQ的六种模式
    2022年度新星,4款一见倾心的黑马软件,让你找不到拒绝的理由
    webassembly003 ggml GGML Tensor Library part-4 实现在浏览器端训练神经网络
    Java|学习|abstract ,接口 Interface , Object
    KBPC1010-ASEMI液压升降装置方案整流桥10A 1000V
    计算机网络一:因特网
    【LeetCode每日一题】——141.环形链表
  • 原文地址:https://blog.csdn.net/qq_43417581/article/details/134342227