• Flink入门系列06-window


    窗口(window)概念

    窗口,就是把无界的数据流,依据一定的规则划分成一段一段的有界数据流。既然划分为有界数据流,通常都是为了”聚合“。
    在这里插入图片描述
    Keyedwindow 重要特性:任何一个窗口,都绑定在自己所属的 key 上,不同 key 的数据肯定不会划分到相同的窗口中去。

    窗口类型

    • 滚动窗口
    • 滑动窗口
    • 会话窗口:没有固定的窗口长度,也没有固定的滑动步长,而是根据数据流中前后两个事件的时间间隔是否超出阈值来划分。

    窗口计算 API 模板

    • KeyedWindows

      stream
      	.keyBy(...) 
      	.window(...)    // required
      	[.trigger(...)] // optional
      	[.evictor(...)] // optional
      	[.allowedLateness(...)]    // optional
      	[.sideOutputLateData(...)] // optional
      	.reduce/aggregate/apply()  // required
      	[.getSideOutput(...)] // optional
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
    • NonKeyedWindows

      stream
      	.windowAll(...)    // required
      	[.trigger(...)]    // optional
      	[.evictor(...)]    // optional
      	[.allowedLateness(...)]    // optional
      	[.sideOutputLateData(...)] // optional
      	.reduce/aggregate/apply()  // required
      	[.getSideOutput(...)] // optional
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8

    窗口计算 API 示例

    • 全局窗口

      // 全局 计数滚动窗口
      beanStream.countWindowAll(10)  // 10条数据一个窗口
              .apply(new AllWindowFunction<EventBean2, String, GlobalWindow>() {
                  @Override
                  public void apply(GlobalWindow window, Iterable<EventBean2> values, Collector<String> out) throws Exception {
      
                  }
              });
      
      
      // 全局 计数滑动窗口
      beanStream.countWindowAll(10, 2); // 窗口长度为10条数据,滑动步长为2条数据
      /*.apply()*/
      
      
      // 全局 事件时间滚动窗口
      beanStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(30))) // 窗口长度为30s的滚动窗口
              .apply(new AllWindowFunction<EventBean2, String, TimeWindow>() {
                  @Override
                  public void apply(TimeWindow window, Iterable<EventBean2> values, Collector<String> out) throws Exception {
      
                  }
              });
      
      
      // 全局 事件时间滑动窗口
      beanStream.windowAll(SlidingEventTimeWindows.of(Time.seconds(30), Time.seconds(10))); // 窗口长度:30s,滑动步长:10s
      /*.apply()*/
      
      // 全局  事件时间会话窗口
      beanStream.windowAll(EventTimeSessionWindows.withGap(Time.seconds(30)));  // 前后两个事件的间隙超过30s就划分窗口
      /*.apply()*/
      
      // 全局  处理时间滚动窗口
      beanStream.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(30)));
      
      
      // 全局  处理时间滑动窗口
      beanStream.windowAll(SlidingProcessingTimeWindows.of(Time.seconds(30), Time.seconds(10)));
      
      // 全局  处理间会话窗口
      beanStream.windowAll(ProcessingTimeSessionWindows.withGap(Time.seconds(30)));
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
    • Keyed 窗口

      KeyedStream<EventBean2, Long> keyedStream = beanStream.keyBy(EventBean2::getGuid);
      
      // Keyed 计数滚动窗口
      keyedStream.countWindow(10);
      
      
      // Keyed 计数滑动窗口
      keyedStream.countWindow(10, 2);
      
      
      // Keyed 事件时间滚动窗口
      keyedStream.window(TumblingEventTimeWindows.of(Time.seconds(30)));
      
      
      // Keyed 事件时间滑动窗口
      keyedStream.window(SlidingEventTimeWindows.of(Time.seconds(30), Time.seconds(10)));
      
      
      // Keyed  事件时间会话窗口
      keyedStream.window(EventTimeSessionWindows.withGap(Time.seconds(30)));
      
      
      // Keyed  处理时间滚动窗口
      keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(30)));
      
      
      // Keyed  处理时间滑动窗口
      keyedStream.window(SlidingProcessingTimeWindows.of(Time.seconds(30), Time.seconds(10)));
      
      
      // Keyed  处理时间会话窗口
      keyedStream.window(ProcessingTimeSessionWindows.withGap(Time.seconds(30)));
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32

    窗口聚合算子

    窗口聚合算子整体分为两类

    • 增量聚合算子,一次取一条数据,用聚合函数对中间累加器更新;窗口触发时,取累加器输出结果。
      如 min、max、minBy、maxBy、sum、reduce、aggregate
    • 全量聚合算子,数据”攒“在状态容器中,窗口触发时,把整个窗口的数据交给聚合函数。
      如 apply、process

    注意点

    1. window 和 windowAll 区别
      在 keyby 后数据分流,window是把不同的key分开聚合成窗口,而 windowall 则把所有的 key 都聚合起来,所以 windowall 的并行度只能为1,而 window 可以有多个并行度。

    2. 全量聚合算子 process 和 apply 区别
      apply 只能用于 window 之后

  • 相关阅读:
    LVS+Keepalived
    Seata
    ffmpeg录屏命令
    计算机毕设(附源码)JAVA-SSM京津冀畅游网设计
    MySQL修改密码&修改密码策略
    使用 Learner Lab - 使用 AWS Lambda 将图片写入 S3
    scrapy框架流程
    计算机毕业设计Java智慧书籍的网站(源码+系统+mysql数据库+lw文档)
    驱动人生深度扫描功能上线!使用感怎么样?
    LeetCode、3无重复最长子序列
  • 原文地址:https://blog.csdn.net/qq_17310871/article/details/126543741