• Flink 窗口函数


    一、Window 概述

    Flink 流式计算是一种被设计用于处理无限数据集的数据处理引擎,而无限数据集是指一种不断增长的本质上无限的数据集,而 window 是一种切割无线数据为有限块进行处理的手段。

    二、Window 分类

    Window 可以分为两类:

    • CountWindow(计数窗口):按照指定的数据条数生成一个Window,与时间无关;
    • TimeWindow(事件窗口):按照时间生成 Window;

    对于TimeWindow ,可以根据窗口实现原理的不同分成三类:

    • 滚动窗口(Tumbling Window)
    • 滑动窗口(Sliding Window)
    • 会话窗口(Session Window)

    2.1、滚动窗口(Tumbling Window)

    将数据依据固定的窗口长度对数据进行切片;

    特点:时间对其、窗口长度固定、没有重叠;
    在这里插入图片描述
    适用场景:适合做 BI 统计(每个时间段的聚合计算)。

    2.2、滑动窗口(Sliding Window)

    滑动窗口是固定的窗口的更广义的一种形式,滑动窗口由固定的窗口长度和滑动间隔组成;

    特点:时间对齐、窗口长度固定,可以重叠;
    在这里插入图片描述
    适用场景:对最近一个时间段内的统计(求某接口最近 5min 的失败率来决定是否要报警)。

    2.3、会话窗口(Session Window)

    由一系列事件组合一个指定时间长度的 timeout 间隙组成,类似于 web 应用的 session,也就是一段时间没有接收到新数据就会生成新的窗口。

    特点:时间无法对齐;

    session 窗口分配器通过 session 活动来对元素进行分组,session 窗口跟滚动窗口和滑动窗口相比,不会有重叠和固定的开始时间和结束时间的情况,相反,当它在一个固定的时间周期内不在收到元素,即非活动间隔产生,那么这个窗口就会关闭。一个 session 窗口通过一个 session 间隔来配置,这个 session间隔定义了非活跃周期的长度,当这个非活跃周期产生,那么当前的session将关闭并且后续的元素将被分配到新的 session 窗口中去。
    在这里插入图片描述

    三、Window API

    3.1、CountWindow

    3.1.1、滚动窗口

    默认的 CountWindow 是一个滚动窗口,只需要指定窗口大小即可,当元素数量达到窗口大小时,就会触发窗口的执行。

    val minTempPerWindow: DataStream[(String, Double)] = dataStream
    .map(r => (r.id, r.temperature))
    .keyBy(_._1)
    .countWindow(5)
    .reduce((r1, r2) => (r1._1, r1._2.max(r2._2)))
    

    3.1.2、滑动窗口

    滑动窗口和滚动窗口的函数名是完全一致的,只是在传参数时需要传入两个参数,一个是 window_size,一个是 sliding_size。
    下面代码中的 sliding_size 设置为了 2,也就是说,每收到两个相同 key 的数据就计算一次,每一次计算的 window 范围是 10 个元素。

    val keyedStream: KeyedStream[(String, Int), Tuple] = dataStream.map(r => (r.id, r.temperature)).keyBy(0)
    //每当某一个 key 的个数达到 2 的时候,触发计算,计算最近该 key 最近 10 个元素的内容
    val windowedStream: WindowedStream[(String, Int), Tuple, GlobalWindow] = keyedStream.countWindow(10,2)
    val sumDstream: DataStream[(String, Int)] = windowedStream.sum(1)
    

    3.2、TimeWindow

    3.1.1、滚动窗口

    Flink 默认的时间窗口根据 Processing Time 进行窗口的划分,将 Flink 获取到的数据根据进入 Flink 的时间划分到不同的窗口中。

    val minTempPerWindow = dataStream
    .map(r => (r.id, r.temperature))
    .keyBy(_._1)
    .timeWindow(Time.seconds(15))
    //或者指定TumblingEventTimeWindows
    //.window(TumblingEventTimeWindows.of(Time.seconds(15)))
    .reduce((r1, r2) => (r1._1, r1._2.min(r2._2)))
    

    3.1.2、滑动窗口

    滑动窗口和滚动窗口的函数名是完全一致的,只是在传参数时需要传入两个参数,一个是 window_size,一个是 sliding_size。
    下面代码中的 sliding_size 设置为了 5s,也就是说,每 5s 就计算输出结果一次,每一次计算的 window 范围是 15s 内的所有元素。

    val minTempPerWindow: DataStream[(String, Double)] = dataStream
    .map(r => (r.id, r.temperature))
    .keyBy(_._1)
    .timeWindow(Time.seconds(15), Time.seconds(5))
    //或者指定SlidingEventTimeWindows
    //.window(SlidingEventTimeWindows.of(Time.seconds(15),Time.seconds(5)
    .reduce((r1, r2) => (r1._1, r1._2.min(r2._2))))
    

    3.1.2、会话窗口

    val minTempPerWindow: DataStream[(String, Double)] = dataStream
    .map(r => (r.id, r.temperature))
    .keyBy(_._1)
    .window(EventTimeSessionWindows.withGap(Time.seconds(10)))
    .reduce((r1, r2) => (r1._1, r1._2.min(r2._2))))
    
  • 相关阅读:
    Vue项目打包为桌面应用
    Oracle/PLSQL: Lag Function
    【优化算法】基于matlab融合飞行机制的粒子群优化算法【含Matlab源码 1924期】
    目标级联分析法( Analytical Target Cascading , ATC )理论matlab程序
    JAVA毕设项目软考网络工程师在线练习平台(java+VUE+Mybatis+Maven+Mysql)
    idea快速搭建struts2框架
    【单片机毕业设计】【mcuclub-hj-012】基于单片机的空气质量(CO、有害混合气体)检测的设计
    @Resource和@Autowired
    我的十年程序员生涯--无锡之旅,开启岗前培训
    螺母加工工艺流程
  • 原文地址:https://blog.csdn.net/xfp1007907124/article/details/139748965