• Flink系列文档-(YY12)-窗口计算


    1 窗口基本概念

    1.1 概述

    窗口,就是把无界的数据流,依据一定规则划分成一段一段的有界数据流来计算;

    既然划分成有界数据段,通常都是为了"聚合";

    Keyedwindow重要特性:任何一个窗口,都绑定在自己所属的key上;不同key的数据肯定不会划分到相同窗口中去! 

    1.2 窗口分类

    滚动窗口 

    滑动窗口 

    会话窗口

     没有固定的窗口长度,也没有固定的滑动步长,而是根据数据流中前后两个事件的时间间隔是否超出阈值(session gap)来划分;

    1.3 窗口函数模板

      KeyedWindows

    stream

           .keyBy(...)               <-  keyed versus non-keyed windows

           .window(...)              <-  required: "assigner"

          [.trigger(...)]            <-  optional: "trigger" (else default trigger)

          [.evictor(...)]            <-  optional: "evictor" (else no evictor)

          [.allowedLateness(...)]    <-  optional: "lateness" (else zero)

          [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)

           .reduce/aggregate/apply()      <-  required: "function"

          [.getSideOutput(...)]      <-  optional: "output tag"

      NonKeyedWindows

    stream

           .windowAll(...)           <-  required: "assigner"

          [.trigger(...)]            <-  optional: "trigger" (else default trigger)

          [.evictor(...)]            <-  optional: "evictor" (else no evictor)

          [.allowedLateness(...)]    <-  optional: "lateness" (else zero)

          [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)

           .reduce/aggregate/apply()      <-  required: "function"

          [.getSideOutput(...)]      <-  optional: "output tag"

    代码模板示例 

    1. package com.blok2;
    2. import org.apache.flink.api.common.RuntimeExecutionMode;
    3. import org.apache.flink.api.java.functions.KeySelector;
    4. import org.apache.flink.configuration.Configuration;
    5. import org.apache.flink.streaming.api.datastream.AllWindowedStream;
    6. import org.apache.flink.streaming.api.datastream.DataStreamSource;
    7. import org.apache.flink.streaming.api.datastream.KeyedStream;
    8. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    9. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    10. import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction;
    11. import org.apache.flink.streaming.api.windowing.assigners.*;
    12. import org.apache.flink.streaming.api.windowing.time.Time;
    13. import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
    14. import org.apache.flink.table.api.Tumble;
    15. /**
    16. * @Date: 22.12.4
    17. * @Author: Hang.Nian.YY
    18. * @qq: 598196583
    19. * @Tips: 学大数据 ,到多易教育
    20. * @Description:
    21. * 窗口分配API代码模板示例
    22. */
    23. public class _04_Windows {
    24. public static void main(String[] args) throws Exception {
    25. Configuration conf = new Configuration();
    26. conf.setInteger("rest.port", 9999);
    27. StreamExecutionEnvironment see = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
    28. see.setParallelism(1);
    29. see.setRuntimeMode(RuntimeExecutionMode.BATCH);
    30. // 加载数据源
    31. DataStreamSource ds = see.readTextFile("data/dayao.txt");
    32. SingleOutputStreamOperator beans = ds.map(line -> {
    33. String[] arr = line.split(",");
    34. User user = new User(Integer.parseInt(arr[0]), arr[1], arr[2], Integer.parseInt(arr[3]));
    35. return user;
    36. }).returns(User.class);
    37. // ============= keyBy前
    38. // 1 指定时间语义 滚动窗口
    39. beans.windowAll(TumblingEventTimeWindows.of(Time.seconds(1)));
    40. beans.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(1)));
    41. // 2 指定时间语义 滑动窗口
    42. // 参数一 size 参数2 步进
    43. beans.windowAll(SlidingProcessingTimeWindows.of(Time.seconds(2), Time.seconds(1)));
    44. beans.windowAll(SlidingEventTimeWindows.of(Time.seconds(2), Time.seconds(1)));
    45. // 滚动计数
    46. beans.countWindowAll(100);
    47. // 滑动计数
    48. beans.countWindowAll(100, 20);
    49. //处理算子 ,keyBy
    50. KeyedStream keyed = beans.keyBy(new KeySelector() {
    51. @Override
    52. public String getKey(User user) throws Exception {
    53. return user.getCity();
    54. }
    55. });
    56. // ============= keyBy后
    57. // 处理时间 滚动
    58. // 处理时间 滑动
    59. // 事件时间 滚动
    60. // 事件时间 滑动
    61. keyed.window(TumblingProcessingTimeWindows.of(Time.seconds(2))) ;
    62. keyed.window(TumblingEventTimeWindows.of(Time.seconds(2))) ;
    63. keyed.window(SlidingEventTimeWindows.of(Time.seconds(10) , Time.seconds(2)));// 窗口大小 步进
    64. keyed.window(SlidingProcessingTimeWindows.of(Time.seconds(10) , Time.seconds(2))) ;
    65. keyed.countWindow(10) ; // 计数滚动
    66. keyed.countWindow(10 , 2) ; // 计数滑动
    67. // 开窗口以后 触发窗口内的数据的聚合操作
    68. // beans.print() ;
    69. see.execute();
    70. }
    71. }

    2 窗口聚合计算

  • 相关阅读:
    11.FrugalGPT
    桌面运维命令
    WinRAR广告屏蔽办法
    NLP(6)--Diffusion Model
    ChatGPT 之 PPT 大师
    [Shell详解-4]:echo命令、printf命令
    「设计模式」六大原则之五:依赖倒置原则小结
    2024全国水科技大会暨新能源及电子行业废水论坛(十一)
    最小生成树算法
    async/await 贴脸输出,这次你总该明白了
  • 原文地址:https://blog.csdn.net/qq_37933018/article/details/128170883