• Flink 中的 Window (窗口)


    一、什么是Window?

    官方对Window的介绍:

    Windows are at the heart of processing infinite streams. Windows split the stream into “buckets” of finite size, over which we can apply computations.         

    窗口是处理无界流的核心所在。窗口可以将数据流装入大小有限的“桶”中,再对每个“桶”加以计算处理。 

            通俗点说,就是按固定时间或长度将无限数据流切分成不同的窗口,我们就可以对窗口内截取的数据使用一些计算函数进行处理,从而得到一定范围内的统计结果。

    二、基本结构

            Flink 窗口在 keyed streams 和 non-keyed streams 上使用的基本结构,只有一点区别:keyed streams调用 keyBy(...)后再调用 window(...) , 而 non-keyed streams 只用直接调用 windowAll(...)。具体如下:

    1、Keyed Windows

    2、Non-Keyed Windows

    分析:

            使用 keyed stream 允许你的窗口计算由多个 task 并行(原始流会被分割为多个逻辑上的流),因为每个逻辑上的 keyed stream 都可以被单独处理。 属于同一个 key 的元素会被发送到同一个 task。

            但是对于 non-keyed stream,原始流不会被分割为多个逻辑上的流, 所以所有的窗口计算会被同一个 task 完成,也就是并行度为 1,会影响性能。

    三、Window分类

            Flink提供了多种窗口来满足大部分的使用场景。如:滚动窗口( Tumbling Window)、滑动窗口(Sliding Window)、会话窗口( Session Window)和 全局窗口(Global Windows)。

    1、滚动窗口(Tumbling Windows)

            滚动窗口是将数据分配到指定窗口中,滚动窗口的大小是固定的,且各自范围之间不重叠。可以在细分为滚动时间窗口滚动计数窗口

            使用场景:适用于按照指定的周期来统计指标。

            例如:指定滚动窗口的大小为 5 秒钟,那么每 5 秒钟就会有一个窗口被计算,并且创建一个新的窗口。

    代码使用:

    1. DataStream input = ...;
    2. // tumbling event-time windows
    3. input
    4. .keyBy()
    5. .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    6. .();
    7. // tumbling processing-time windows
    8. input
    9. .keyBy()
    10. .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    11. .();
    12. // daily tumbling event-time windows offset by -8 hours.
    13. input
    14. .keyBy()
    15. .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
    16. .();

    时间间隔可以用 Time.milliseconds(x)Time.seconds(x)Time.minutes(x) 等来指定。

    2、滑动窗口(Sliding Windows)

            滑动窗口将数据分配到大小固定且允许相互重叠的桶中,这意味着每个数据有可能同时属于多个桶。窗口大小可以通过 window size 参数设置。 滑动窗口需要一个额外的滑动距离(window slide)参数来控制生成新窗口的频率。可以在细分为时间滑动窗口计数滑动窗口

            使用场景是:根据指定的统计周期来计算指定窗口时间大小的指标。

            例如:每隔5秒钟,计算一次前10秒的数据(窗口大小为10,滑动距离为5,每5s得到一个新的窗口, 里面包含之前 10s到达的数据)。
            窗口大小 > 滑动距离时,窗口之间有重叠,前2个窗口是下图的window1和window2。
            窗口大小 = 滑动距离时,也就是滚动窗口了,前2个窗口是下图的window1和window3。
            窗口大小 < 滑动距离时,窗口之间不会重叠,前2个窗口是下图的window1和window4,这种设置会遗漏数据。

    代码使用:

    1. DataStream input = ...;
    2. // 滑动 event-time 窗口
    3. input
    4. .keyBy()
    5. .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    6. .();
    7. // 滑动 processing-time 窗口
    8. input
    9. .keyBy()
    10. .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    11. .();
    12. // 滑动 processing-time 窗口,偏移量为 -8 小时
    13. input
    14. .keyBy()
    15. .window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
    16. .();

    3、会话窗口(Session Windows)

            会话窗口把数据按活跃的会话分组。 与滚动窗口滑动窗口不同,会话窗口不会相互重叠,且没有固定的开始或结束时间。 会话窗口在一段时间没有收到数据之后会关闭(即在一段不活跃的间隔之后)。 会话窗口可以设置固定的会话间隔(session gap)定义多长时间算作不活跃。 当超出了不活跃的时间段,当前的会话就会关闭,并且将接下来的数据分发到新的会话窗口。

            会话窗口在一些常见的真实场景中非常有用,这些场景既不适合用滚动窗口,也不适用滑动窗口。

    代码使用:

    1. DataStream<T> input = ...;
    2. // 设置了固定间隔的 event-time 会话窗口
    3. input
    4. .keyBy(<key selector>)
    5. .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
    6. .<windowed transformation>(<window function>);
    7. // 设置了动态间隔的 event-time 会话窗口
    8. input
    9. .keyBy(<key selector>)
    10. .window(EventTimeSessionWindows.withDynamicGap((element) -> {
    11. // 决定并返回会话间隔
    12. }))
    13. .<windowed transformation>(<window function>);
    14. // 设置了固定间隔的 processing-time session 窗口
    15. input
    16. .keyBy(<key selector>)
    17. .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
    18. .<windowed transformation>(<window function>);
    19. // 设置了动态间隔的 processing-time 会话窗口
    20. input
    21. .keyBy(<key selector>)
    22. .window(ProcessingTimeSessionWindows.withDynamicGap((element) -> {
    23. // 决定并返回会话间隔
    24. }))
    25. .<windowed transformation>(<window function>);

    4、全局窗口(Global Windows)

            全局窗口是将拥有相同 key 的所有数据分发到一个全局窗口。 窗口需要你指定了自定义的 trigger 时才有用。 否则,计算不会发生,因为全局窗口没有天然的终点去触发其中积累的数据。

    代码使用:

    1. DataStream input = ...;
    2. input
    3. .keyBy()
    4. .window(GlobalWindows.create())
    5. .();

    四、例子

    1、滚动窗口例子

    需求:每隔5s统计一次,统计最近5s内单词出现的频次

    1. import org.apache.flink.api.common.functions.FlatMapFunction;
    2. import org.apache.flink.api.java.tuple.Tuple2;
    3. import org.apache.flink.streaming.api.datastream.DataStream;
    4. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    5. import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
    6. import org.apache.flink.streaming.api.windowing.time.Time;
    7. public class TestTumblingTimeWindows{
    8. public static void main(String[] args) throws Exception {
    9. //1.创建流环境
    10. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    11. //2、获取数据
    12. DataStream source = env.socketTextStream("node1", 9000);
    13. DataStream> windowCounts = source
    14. .flatMap((FlatMapFunction>) (value, out) -> {
    15. for (String word : value.split("\\s")) {
    16. out.collect(Tuple2.of(word, 1));
    17. }
    18. })
    19. //.keyBy(0) //过时
    20. .keyBy(t -> t.f0)
    21. //滚动窗口
    22. .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    23. .sum(1);
    24. windowCounts.print();
    25. env.execute("TestTumblingTimeWindows");
    26. }
    27. }

     2、滑动窗口例子

    需求:每隔5s统计一次,统计最近10s内单词出现的频次

    1. import org.apache.flink.api.common.functions.FlatMapFunction;
    2. import org.apache.flink.api.java.tuple.Tuple2;
    3. import org.apache.flink.streaming.api.datastream.DataStream;
    4. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    5. import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
    6. import org.apache.flink.streaming.api.windowing.time.Time;
    7. public class TestSlidingTimeWindows{
    8. public static void main(String[] args) throws Exception {
    9. //1.创建流环境
    10. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    11. //2、获取数据
    12. DataStream source = env.socketTextStream("node1", 9000);
    13. DataStream> windowCounts = source
    14. .flatMap((FlatMapFunction>) (value, out) -> {
    15. for (String word : value.split("\\s")) {
    16. out.collect(Tuple2.of(word, 1));
    17. }
    18. })
    19. //.keyBy(0) //过时
    20. .keyBy(t -> t.f0)
    21. //滑动窗口
    22. .window(SlidingProcessingTimeWindows.of(Time.seconds(10),Time.seconds(5)))
    23. .sum(1);
    24. windowCounts.print();
    25. env.execute("TestSlidingTimeWindows");
    26. }
    27. }

  • 相关阅读:
    【JavaWeb】第五章 jQuery(下篇)
    使用gateway对用户认证(用于确定用户是否登录)
    嵌入式中一篇搞定Cmake使用教程
    java基于微信小程序的学习打卡系统 uniapp 小程序
    ASR6500S系列LoRa SIP模块集成了RF前端和LoRa无线电收发器SX1262系列
    SpringBoot 刷新上下文4--处理ComponentScan
    软件包管理—源码包管理—源码包安装过程
    【node】nodemailer配置163、qq等邮件服务指南
    计算机网络学习心得
    关键词推广-关键词推广软件
  • 原文地址:https://blog.csdn.net/icanlove/article/details/126349532