• Flink中的时间和窗口操作


    本专栏案例代码和数据集链接: https://download.csdn.net/download/shangjg03/88477960

    1.窗口概念

    在大多数场景下,我们需要统计的数据流都是无界的,因此我们无法等待整个数据流终止后才进行统计。通常情况下,我们只需要对某个时间范围或者数量范围内的数据进行统计分析:如每隔五分钟统计一次过去一小时内所有商品的点击量;或者每发生1000次点击后,都去统计一下每个商品点击率的占比。在 Flink 中,我们使用窗口 (Window) 来实现这类功能。按照统计维度的不同,Flink 中的窗口可以分为 时间窗口 (Time Windows) 和 计数窗口 (Count Windows) 。

    2. 窗口类型

    2.1 flink支持两种划分窗口的方式(time和count)   

     如果根据时间划分窗口,那么它就是一个time-window    如果根据数据划分窗口,那么它就是一个count-window

    2.2 flink支持窗口的两个重要属性(size和interval)    

    如果size=interval,那么就会形成tumbling-window(无重叠数据)    

    如果size>interval,那么就会形成sliding-window(有重叠数据)    

    如果size

    2.3 通过组合可以得出四种基本窗口:

    `time-tumbling-window` 无重叠数据的时间窗口,设置方式举例:timeWindow(Time.seconds(5)) 

    `time-sliding-window`  有重叠数据的时间窗口,设置方式举例:timeWindow(Time.seconds(5), Time.seconds(3)) 

    `count-tumbling-window`无重叠数据的数量窗口,设置方式举例:

    countWindow(5)    

    `count-sliding-window` 有重叠数据的数量窗口,设置方式举例:

    countWindow(5,3)

    flink支持在stream上的通过key去区分多个窗口

    3. 时间窗口-Time Windows

    Time Windows 用于以时间为维度来进行数据聚合,具体分为以下四类:

    3.1 滚动窗口-Tumbling Windows

    滚动窗口 (Tumbling Windows) 是指彼此之间没有重叠的窗口。例如:每隔1小时统计过去1小时内的商品点击量,那么 1 天就只能分为 24 个窗口,每个窗口彼此之间是不存在重叠的,具体如下:

    d25c2c54bb5d4470afe5de17a62e5889.png

    这里我们以词频统计为例,给出一个具体的用例,代码如下:

    1. final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    2. // 接收socket上的数据输入
    3. DataStreamSource<String> streamSource = env.socketTextStream("hadoop001"9999"\n"3);
    4. streamSource.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
    5.     @Override
    6.     public void flatMap(String value, Collector<Tuple2<String, Long>> out) throws Exception {
    7.         String[] words = value.split("\t");
    8.         for (String word : words) {
    9.             out.collect(new Tuple2<>(word, 1L));
    10.         }
    11.     }
    12. }).keyBy(0).timeWindow(Time.seconds(3)).sum(1).print(); //每隔3秒统计一次每个单词出现的数量
    13. env.execute("Flink Streaming");

    测试结果如下:

    76af923dda5d440bb351dbebaf701fe5.png

    3.2 滑动窗-Sliding Windows

    用于滚动进行聚合分析,例如:每隔 6 分钟统计一次过去一小时内所有商品的点击量,那么统计窗口彼此之间就是存在重叠的,即 1天可以分为 240 个窗口。图示如下:

    63cee53a69aa4693b93f0d163279988d.png

    可以看到 window 1 - 4 这四个窗口彼此之间都存在着时间相等的重叠部分。想要实现滑动窗口,只需要在使用 timeWindow 方法时额外传递第二个参数作为滚动时间即可,具体如下:

    1. // 每隔3秒统计一次过去1分钟内的数据
    2. timeWindow(Time.minutes(1),Time.seconds(3))

    3.3 会话窗口-Session Windows

    当用户在进行持续浏览时,可能每时每刻都会有点击数据,例如在活动区间内,用户可能频繁的将某类商品加入和移除购物车,而你只想知道用户本次浏览最终的购物车情况,此时就可以在用户持有的会话结束后再进行统计。想要实现这类统计,可以通过 Session Windows 来进行实现。

    7bc82d68beea4f2d8e5947e81a74c2fa.png

    具体的实现代码如下:

    1. // 以处理时间为衡量标准,如果10秒内没有任何数据输入,就认为会话已经关闭,此时触发统计
    2. window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
    3. // 以事件时间为衡量标准    
    4. window(EventTimeSessionWindows.withGap(Time.seconds(10)))

    3.4 全局窗口-Global Windows

    最后一个窗口是全局窗口, 全局窗口会将所有 key 相同的元素分配到同一个窗口中,其通常配合触发器 (trigger) 进行使用。如果没有相应触发器,则计算将不会被执行。

    464d3acb2d5e4561a34154544ba358e8.png

    这里继续以上面词频统计的案例为例,示例代码如下:

    1. // 当单词累计出现的次数每达到10次时,则触发计算,计算整个窗口内该单词出现的总数
    2. window(GlobalWindows.create()).trigger(CountTrigger.of(10)).sum(1).print();

    4. 数量窗口-Count Windows

    Count Windows 用于以数量为维度来进行数据聚合,同样也分为滚动窗口和滑动窗口,实现方式也和时间窗口完全一致,只是调用的 API 不同,具体如下:

    1. // 滚动计数窗口,每1000次点击则计算一次
    2. countWindow(1000)
    3. // 滑动计数窗口,每10次点击发生后,则计算过去1000次点击的情况
    4. countWindow(1000,10)

    实际上计数窗口内部就是调用的我们上一部分介绍的全局窗口来实现的,其源码如下:

    1. public WindowedStream<T, KEY, GlobalWindow> countWindow(long size) {
    2. return window(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(size)));
    3. }
    4. public WindowedStream<T, KEY, GlobalWindow> countWindow(long size, long slide) {
    5. return window(GlobalWindows.create())
    6. .evictor(CountEvictor.of(size))
    7. .trigger(CountTrigger.of(slide));
    8. }

    5.案例分析

    189b462afbc04aecb0c7800e28ddb609.png

    5.1 Tumbling Time Window

    假如我们需要统计每一分钟中用户购买的商品的总数,需要将用户的行为事件按每一分钟进行切分,这种切分被成为滚动时间窗口(Tumbling Time Window)。翻滚窗口能将数据流切分成不重叠的窗口,每一个事件只能属于一个窗口。

    1. // 用户id和购买数量 stream
    2. val counts: DataStream[(Int, Int)] = ...
    3. val tumblingCnts: DataStream[(Int, Int)] = counts
    4. // 用userId分组
    5. .keyBy(0)
    6. // 1分钟的翻滚窗口宽度
    7. .timeWindow(Time.minutes(1))
    8. // 计算购买数量
    9. .sum(1)

    5.2  Sliding Time Window

    我们可以每30秒计算一次最近一分钟用户购买的商品总数。这种窗口我们称为滑动时间窗口(Sliding Time Window)。在滑窗中,一个元素可以对应多个窗口。通过使用 DataStream API,我们可以这样实现:

    1. val slidingCnts: DataStream[(Int, Int)] = buyCnts
    2. .keyBy(0)
    3. .timeWindow(Time.minutes(1), Time.seconds(30))
    4. .sum(1)

    5.3 Tumbling Count Window

    当我们想要每100个用户购买行为事件统计购买总数,那么每当窗口中填满100个元素了,就会对窗口进行计算,这种窗口我们称之为翻滚计数窗口(Tumbling Count Window),上图所示窗口大小为3个。通过使用 DataStream API,我们可以这样实现:

    1. // Stream of (userId, buyCnts)
    2. val buyCnts: DataStream[(Int, Int)] = ...
    3. val tumblingCnts: DataStream[(Int, Int)] = buyCnts
    4. // key stream by sensorId
    5. .keyBy(0)
    6. // tumbling count window of 100 elements size
    7. .countWindow(100)
    8. // compute the buyCnt sum 
    9. .sum(1)

    5.4 Session Window

    在这种用户交互事件流中,我们首先想到的是将事件聚合到会话窗口中(一段用户持续活跃的周期),由非活跃的间隙分隔开。如上图所示,就是需要计算每个用户在活跃期间总共购买的商品数量,如果用户30秒没有活动则视为会话断开(假设raw data stream是单个用户的购买行为流)。Session Window 的示例代码如下:

    1. // Stream of (userId, buyCnts)
    2. val buyCnts: DataStream[(Int, Int)] = ...
    3. val sessionCnts: DataStream[(Int, Int)] = vehicleCnts
    4. .keyBy(0)
    5. // session window based on a 30 seconds session gap interval 
    6. .window(ProcessingTimeSessionWindows.withGap(Time.seconds(30)))
    7. .sum(1)

    一般而言,window 是在无限的流上定义了一个有限的元素集合。这个集合可以是基于时间的,元素个数的,时间和个数结合的,会话间隙的,或者是自定义的。Flink 的 DataStream API 提供了简洁的算子来满足常用的窗口操作,同时提供了通用的窗口机制来允许用户自己定义窗口分配逻辑。

  • 相关阅读:
    http请求与响应,同步异步请求以及异步请求axios的配置
    React基础-React中发送Ajax请求以及Mock数据
    H4TCPE;H4ETTC;四[4-(4‘-羧基苯基)苯基]乙烯;AIE聚集诱导发光材料
    3. Apache HBase 为什么快?
    DataGridView可以点击列排序 Sort(使用BindingList改写)
    lazarus开发界面程序用线程显示进度条
    变身“毒”苹果?全球首个 DMP 漏洞,仅 A14、M1 等苹果芯片独有
    给自己写的前端初学docker文档
    设备指纹技术详解丨设备指纹知多少,看这场直播就够了!
    【ppt技巧】给ppt文件设置带有密码的只读模式
  • 原文地址:https://blog.csdn.net/shangjg03/article/details/133922519