• Flink SQL--- Over Aggregation


    一、Over 聚合介绍

    Over 聚合定义(支持 Batch\Streaming):可以理解为是一种特殊的滑动窗口聚合函数。那这里我们拿 Over 聚合​窗口聚合 做一个对比,其之间的最大不同之处在于:

    • 窗口聚合:不在 group by 中的字段,不能直接在 select 中拿到;
    • Over 聚合:能够保留原始字段.在生产环境中,Over 聚合的使用场景还是比较少的。

    GROUP BY是针对每个group产生一个aggregate value,而OVER则是对每一个row产生一个aggregate value

    Over 聚合的语法总结如下:

    SELECT
      agg_func(agg_col) OVER (
        [PARTITION BY col1[, col2, ...]]
        ORDER BY time_col
        range_definition),
      ...
    FROM ...
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    其中:

    • ORDER BY:必须是时间戳列(事件时间、处理时间)
    • PARTITION BY:标识了聚合窗口的聚合粒度,如上述案例是按照 product 进行聚合
    • range_definition:这个标识聚合窗口的聚合数据范围,在 Flink 中有两种指定数据范围的方式。
      • 第一种为按照行数聚合​,
      • 第二种为按照时间区间聚合。

    可以在一个SELECT子句中定义多个OVER窗口聚合。但是,对于流查询,由于当前的限制,所有聚合的OVER窗口必须相同。

    二、案例

    数据源

    -- 源数据表
    CREATE TABLE orders (
        order_id BIGINT,
        product BIGINT,
        amount BIGINT,
        order_time as cast(CURRENT_TIMESTAMP as TIMESTAMP(3)),
        WATERMARK FOR order_time AS order_time - INTERVAL '0.001' SECOND
    ) WITH (
      'connector' = 'datagen',
      'rows-per-second' = '1',
      'fields.order_id.min' = '1',
      'fields.order_id.max' = '2',
      'fields.amount.min' = '1',
      'fields.amount.max' = '10',
      'fields.product.min' = '1',
      'fields.product.max' = '2'
    );
    
    -- 结果表
    CREATE TABLE sink_table (
        product BIGINT,
        order_time TIMESTAMP(3),
        amount BIGINT,
        one_hour_prod_amount_sum BIGINT
    ) WITH (
      'connector' = 'print'
    );
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    2.1、行数聚合

    按照行数聚合就是数据行数的一个滑动窗口,比如下面案例,最新输出的一条数据的 sum 聚合结果就是最近 5 行数据的 amount 之和。

    INSERT INTO sink_table
    SELECT product, order_time, amount,
      SUM(amount) OVER (
        PARTITION BY product
        ORDER BY order_time
        -- 标识统计范围是一个 product 的最近 5 行数据
        ROWS BETWEEN 5 PRECEDING AND CURRENT ROW
      ) AS one_hour_prod_amount_sum
    FROM orders;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    flink web控制台查看结果
    在这里插入图片描述

    2.2、时间聚合

    按照时间区间聚合就是时间区间的一个滑动窗口,比如下面案例 1 小时的区间,最新输出的一条数据的 sum 聚合结果就是最近一小时数据的 amount 之和。

    INSERT INTO sink_table
    SELECT product, order_time, amount,
      SUM(amount) OVER (
        PARTITION BY product
        ORDER BY order_time
        -- 标识统计范围是一个 product 的最近 1 小时的数据
        RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
      ) AS one_hour_prod_amount_sum
    FROM orders;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    2.3、在一个 SELECT 中有多个聚合窗口的聚合方式

    Flink SQL 支持了一种简化写法,如下案例:

    SELECT order_id, order_time, amount,
      SUM(amount) OVER w AS sum_amount,
      AVG(amount) OVER w AS avg_amount
    FROM orders
    -- 使用下面子句,定义 Over Window
    WINDOW w AS (
      PARTITION BY product
      ORDER BY order_time
      RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    参考:
    hive开窗函数-- over()
    https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/dev/table/sql/queries/over-agg/

  • 相关阅读:
    【Error: error:0308010C:digital envelope routines::unsupported】
    性能测试知多少---性能分析与调优的原理
    Mysql配置参数
    opencv车牌识别<二>
    hutool 工具类提高编码效率
    M0007 四则运算
    哪些行业可以做ISO认证?
    基于SSH开发HR(人力资源管理系统)简单工作流程系统 课程设计 大作业 毕业设计
    5G之ULCL
    Single View Point Omnidirectional Camera Calibration from Planar Grids
  • 原文地址:https://blog.csdn.net/wuxintdrh/article/details/127586878