• 【Flink】窗口(Window)


    窗口理解

    窗口(Window)是处理无界流的关键所在。窗口可以将数据流装入大小有限的“桶”中,再对每个“桶”加以处理。 本文的重心将放在 Flink 如何进行窗口操作以及开发者如何尽可能地利用 Flink 所提供的功能。

    对窗口的正确理解
    我们将窗口理解为一个一个的水桶,数据流(stream)就像水流,每个数据都会分发到对应的桶中,当达到结束时间时,对每个桶中收集的数据进行计算处理
    在这里插入图片描述

    Flink中窗口并不是静态准备好的,而是动态创建——当有落在这个窗口区间范围的数据达到时,才创建对应的窗口

    窗口的分类

    按照驱动类型分

    时间窗口(Time Window)

    以时间来定义窗口的开始和结束,获取某一段时间内的数据(类比于我们的定时发车

    计数窗口(Count Window)

    计数窗口是基于元素的个数来获取窗口,达到固定个数时就计算并关闭窗口。(类比于我们的人齐才发车

    按照窗口分配数据的规则分类

    滚动窗口(Tumbling Window)

    窗口之间没有重叠,也不会有间隔的首尾相撞状态,这样,每个数据都会被分到一个窗口,而且只会属于一个窗口。
    滚动窗口的应用非常广泛,它可以对每个时间段做聚合统计,很多BI分析指标都可以用它来实现。
    在这里插入图片描述

    DataStream<T> input = ...;
    
    // 滚动 event-time 窗口
    input
        .keyBy(<key selector>)
        .window(TumblingEventTimeWindows.of(Time.seconds(5)))
        .<windowed transformation>(<window function>);
    
    // 滚动 processing-time 窗口
    input
        .keyBy(<key selector>)
        .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
        .<windowed transformation>(<window function>);
    
    // 长度为一天的滚动 event-time 窗口, 偏移量为 -8 小时。
    input
        .keyBy(<key selector>)
        .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
        .<windowed transformation>(<window function>);
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    滑动窗口(Sliding Windows)

    滑动窗口大小也是固定的,但是窗口之间并不是首尾相接的,而是重叠的。
    在这里插入图片描述

    DataStream<T> input = ...;
    
    // 滑动 event-time 窗口
    input
        .keyBy(<key selector>)
        .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
        .<windowed transformation>(<window function>);
    
    // 滑动 processing-time 窗口
    input
        .keyBy(<key selector>)
        .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
        .<windowed transformation>(<window function>);
    
    // 滑动 processing-time 窗口,偏移量为 -8 小时
    input
        .keyBy(<key selector>)
        .window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
        .<windowed transformation>(<window function>);
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    会话窗口(Session Windows)

    会话窗口,是基于“会话”(session)来对数据进行分组的,会话窗口只能基于时间来定义。
    在这里插入图片描述

    DataStream<T> input = ...;
    
    // 设置了固定间隔的 event-time 会话窗口
    input
        .keyBy(<key selector>)
        .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
        .<windowed transformation>(<window function>);
        
    // 设置了动态间隔的 event-time 会话窗口
    input
        .keyBy(<key selector>)
        .window(EventTimeSessionWindows.withDynamicGap((element) -> {
            // 决定并返回会话间隔
        }))
        .<windowed transformation>(<window function>);
    
    // 设置了固定间隔的 processing-time session 窗口
    input
        .keyBy(<key selector>)
        .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
        .<windowed transformation>(<window function>);
        
    // 设置了动态间隔的 processing-time 会话窗口
    input
        .keyBy(<key selector>)
        .window(ProcessingTimeSessionWindows.withDynamicGap((element) -> {
            // 决定并返回会话间隔
        }))
        .<windowed transformation>(<window function>);
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    全局窗口

    这种窗口对全局有效,会把相同的key的所有数据分配到同一个窗口中,这种窗口没有结束时间,默认不会触发计算,如果希望对数据进行处理,需要自定义“触发器”。
    在这里插入图片描述

    DataStream<T> input = ...;
    
    input
        .keyBy(<key selector>)
        .window(GlobalWindows.create())
        .<windowed transformation>(<window function>);
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    计数窗口

    计数窗口概念非常简单,本身底层是基于全局窗口(Global Window)实现的。Flink为我们提供了非常方便的接口:直接调用.countWindow()方法

    滚动计数窗口

    滚动计数窗口只需要传入一个长整型的参数size,表示窗口的大小。

    stream.keyBy(...)
           .countWindow(10)
    
    • 1
    • 2
    滑动计数窗口

    与滚动计数窗口类似,不过需要在.countWindow()调用时传入两个参数:size和slide,前者表示窗口大小,后者表示滑动步长。

    stream.keyBy(...)
           .countWindow(103)
    
    • 1
    • 2

    窗口函数(Window Functions)

    定义了 window assigner 之后,我们需要指定当窗口触发之后,我们如何计算每个窗口中的数据, 这就是 window function 的职责了
    窗口函数有三种:ReduceFunction、AggregateFunction 或 ProcessWindowFunction。

    ReduceFunction

    ReduceFunction 指定两条输入数据如何合并起来产生一条输出数据,输入和输出数据的类型必须相同。 Flink 使用 ReduceFunction 对窗口中的数据进行增量聚合。

    DataStream<Tuple2<String, Long>> input = ...;
    
    input
        .keyBy(<key selector>)
        .window(<window assigner>)
        .reduce(new ReduceFunction<Tuple2<String, Long>>() {
        //v1 和v2是 2个相同类型的输入参数
          public Tuple2<String, Long> reduce(Tuple2<String, Long> v1, Tuple2<String, Long> v2) {
            return new Tuple2<>(v1.f0, v1.f1 + v2.f1);
          }
        });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    AggregateFunction

    ReduceFunction 是 AggregateFunction 的特殊情况。 AggregateFunction 接收三个类型:输入数据的类型(IN)、累加器的类型(ACC)和输出数据的类型(OUT)。

    /**
     * The accumulator is used to keep a running sum and a count. The {@code getResult} method
     * computes the average.
     */
    private static class AverageAggregate
        implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {
      @Override
      public Tuple2<Long, Long> createAccumulator() {
        return new Tuple2<>(0L, 0L);
      }
    
      @Override
      public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
        return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);
      }
    
      @Override
      public Double getResult(Tuple2<Long, Long> accumulator) {
        return ((double) accumulator.f0) / accumulator.f1;
      }
    
      @Override
      public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
        return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
      }
    }
    
    DataStream<Tuple2<String, Long>> input = ...;
    
    input
        .keyBy(<key selector>)
        .window(<window assigner>)
        .aggregate(new AverageAggregate());
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    接口中有四个方法:

    • createAccumulator():创建一个累加器,这就是为聚合创建了一个初始状态,每个聚合任务只会调用一次。
    • add():将输入的元素添加到累加器中。
    • getResult():从累加器中提取聚合的输出结果。
    • merge():合并两个累加器,并将合并后的状态作为一个累加器返回。

    可以看到,AggregateFunction的工作原理是:首先调用createAccumulator()为任务初始化一个状态(累加器);而后每来一个数据就调用一次add()方法,对数据进行聚合,得到的结果保存在状态中;等到了窗口需要输出时,再调用getResult()方法得到计算结果。很明显,与ReduceFunction相同,AggregateFunction也是增量式的聚合;而由于输入、中间状态、输出的类型可以不同,使得应用更加灵活方便。

    ProcessWindowFunction

    ProcessWindowFunction 有能获取包含窗口内所有元素的 Iterable, 以及用来获取时间和状态信息的 Context 对象,比其他窗口函数更加灵活。 ProcessWindowFunction 的灵活性是以性能和资源消耗为代价的, 因为窗口中的数据无法被增量聚合,而需要在窗口触发前缓存所有数据

    public class WindowProcessDemo {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
    
            SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("127.0.0.1", 7777).map(new WaterSensorMapFunction());
            KeyedStream<WaterSensor, String> keyedStream = sensorDS.keyBy(WaterSensor::getId);
            WindowedStream<WaterSensor, String, TimeWindow> sensorWS = keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(5)));
            sensorWS.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
                @Override
                public void process(String key, Context context, Iterable<WaterSensor> elements, Collector<String> out) {
                    // 上下文可以拿到window对象,还有其他东西:侧输出流 等等
                    long startTs = context.window().getStart();
                    long endTs = context.window().getEnd();
                    String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");
                    String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");
    
                    long count = elements.spliterator().estimateSize();
    
                    out.collect("key=" + key + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements);
                }
            }).print();
            env.execute();
    
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    增量聚合和全窗口函数的结合使用

    在实际应用中,我们往往希望兼具这两者的优点,把它们结合在一起使用。
    我们之前在调用WindowedStream的.reduce()和.aggregate()方法时,只是简单地直接传入了一个ReduceFunction或AggregateFunction进行增量聚合。除此之外,其实还可以传入第二个参数:一个全窗口函数,可以是WindowFunction或者ProcessWindowFunction。

    // ReduceFunction与WindowFunction结合
    public <R> SingleOutputStreamOperator<R> reduce(
            ReduceFunction<T> reduceFunction,WindowFunction<TRKW> function) 
    
    // ReduceFunction与ProcessWindowFunction结合
    public <R> SingleOutputStreamOperator<R> reduce(
            ReduceFunction<T> reduceFunction,ProcessWindowFunction<TRKW> function)
    
    // AggregateFunction与WindowFunction结合
    public <ACCVR> SingleOutputStreamOperator<R> aggregate(
            AggregateFunction<TACCV> aggFunction,WindowFunction<VRKW> windowFunction)
    
    // AggregateFunction与ProcessWindowFunction结合
    public <ACCVR> SingleOutputStreamOperator<R> aggregate(
            AggregateFunction<TACCV> aggFunction,
            ProcessWindowFunction<VRKW> windowFunction)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
  • 相关阅读:
    【SSM -MyBatis篇03】MyBatis Generator(MBG)配置属性详解(基于MyBatis3) - 逆向生成 - 配置MBG模板
    多线程并发或线程安全问题如何解决
    Vue2组件间通讯
    Python环境安装、Pycharm开发工具安装(IDE)
    公关世界杂志公关世界杂志社公关世界编辑部2022年第14期目录
    记录一次hive表中 string字符串写入int字段引起的小bug
    vue2升级vue3:vue2 vue-i18n 升级到vue3搭配VueI18n v9
    PHP 行事准则:allow_url_fopen 与 allow_url_include
    Python实现基于物品的协同过滤推荐算法构建电影推荐系统
    java计算机毕业设计基于springboot+vue+elementUI的结婚婚庆婚纱拍摄管理系统(前后端分离)
  • 原文地址:https://blog.csdn.net/weixin_38996079/article/details/134494922