• 【大数据】Flink SQL 语法篇(四):Group 聚合、Over 聚合


    Flink SQL 语法篇》系列,共包含以下 10 篇文章:

    😊 如果您觉得这篇文章有用 ✔️ 的话,请给博主一个一键三连 🚀🚀🚀 吧 (点赞 🧡、关注 💛、收藏 💚)!!!您的支持 💖💖💖 将激励 🔥 博主输出更多优质内容!!!

    1.Group 聚合

    1.1 基础概念

    Group 聚合定义(支持 Batch / Streaming 任务):Flink 也支持 Group 聚合。Group 聚合和上面介绍到的窗口聚合的不同之处,就在于 Group 聚合是按照数据的类别进行分组,比如年龄、性别,是横向的;而窗口聚合是在时间粒度上对数据进行分组,是纵向的。如下图所示,就展示出了其区别。其中 按颜色分 key(横向)就是 Group 聚合按窗口划分(纵向)就是 窗口聚合

    在这里插入图片描述

    1.2 窗口聚合和 Group 聚合

    应用场景:一般用于对数据进行分组,然后后续使用聚合函数进行 countsum 等聚合操作。

    那么这时候,小伙伴萌就会问到,我其实可以把窗口聚合的写法也转换为 Group 聚合,只需要把 Group 聚合的 Group By key 换成时间就行,那这两个聚合的区别到底在哪?

    首先来举一个例子看看怎么将 窗口聚合 转换为 Group 聚合。假如一个窗口聚合是按照 1 1 1 分钟的粒度进行聚合,如下 滚动窗口 SQL

    -- 数据源表
    CREATE TABLE source_table (
        -- 维度数据
        dim STRING,
        -- 用户 id
        user_id BIGINT,
        -- 用户
        price BIGINT,
        -- 事件时间戳
        row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
        -- watermark 设置
        WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
    ) WITH (
      'connector' = 'datagen',
      'rows-per-second' = '10',
      'fields.dim.length' = '1',
      'fields.user_id.min' = '1',
      'fields.user_id.max' = '100000',
      'fields.price.min' = '1',
      'fields.price.max' = '100000'
    )
    
    -- 数据汇表
    CREATE TABLE sink_table (
        dim STRING,
        pv BIGINT,
        sum_price BIGINT,
        max_price BIGINT,
        min_price BIGINT,
        uv BIGINT,
        window_start bigint
    ) WITH (
      'connector' = 'print'
    )
    
    -- 数据处理逻辑
    insert into sink_table
    select dim,
        count(*) as pv,
        sum(price) as sum_price,
        max(price) as max_price,
        min(price) as min_price,
        -- 计算 uv 数
        count(distinct user_id) as uv,
        UNIX_TIMESTAMP(CAST(tumble_start(row_time, interval '1' minute) AS STRING)) * 1000  as window_start
    from source_table
    group by
        dim,
        -- 按照 Flink SQL tumble 窗口写法划分窗口
        tumble(row_time, interval '1' minute)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50

    转换为 Group 聚合 的写法如下:

    -- 数据源表
    CREATE TABLE source_table (
        -- 维度数据
        dim STRING,
        -- 用户 id
        user_id BIGINT,
        -- 用户
        price BIGINT,
        -- 事件时间戳
        row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
        -- watermark 设置
        WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
    ) WITH (
      'connector' = 'datagen',
      'rows-per-second' = '10',
      'fields.dim.length' = '1',
      'fields.user_id.min' = '1',
      'fields.user_id.max' = '100000',
      'fields.price.min' = '1',
      'fields.price.max' = '100000'
    );
    
    -- 数据汇表
    CREATE TABLE sink_table (
        dim STRING,
        pv BIGINT,
        sum_price BIGINT,
        max_price BIGINT,
        min_price BIGINT,
        uv BIGINT,
        window_start bigint
    ) WITH (
      'connector' = 'print'
    );
    
    -- 数据处理逻辑
    insert into sink_table
    select dim,
        count(*) as pv,
        sum(price) as sum_price,
        max(price) as max_price,
        min(price) as min_price,
        -- 计算 uv 数
        count(distinct user_id) as uv,
        cast((UNIX_TIMESTAMP(CAST(row_time AS STRING))) / 60 as bigint) as window_start
    from source_table
    group by
        dim,
        -- 将秒级别时间戳 / 60 转化为 1min
        cast((UNIX_TIMESTAMP(CAST(row_time AS STRING))) / 60 as bigint)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50

    确实没错,上面这个转换是一点问题都没有的。

    但是窗口聚合和 Group by 聚合的差异在于:

    • 本质区别窗口聚合是具有时间语义的,其本质是想实现窗口结束输出结果之后,后续有迟到的数据也不会对原有的结果发生更改了,即输出结果值是定值(不考虑 allowLateness)。而 Group by 聚合是没有时间语义的,不管数据迟到多长时间,只要数据来了,就把上一次的输出的结果数据撤回,然后把计算好的新的结果数据发出。
    • 运行层面:窗口聚合是和 时间 绑定的,窗口聚合其中窗口的计算结果触发都是由 时间(Watermark)推动的。Group by 聚合完全由 数据 推动触发计算,新来一条数据去根据这条数据进行计算出结果发出;由此可见两者的实现方式也大为不同。

    1.3 SQL 语义

    SQL 语义这里也拿离线和实时做对比,Order 为 Kafka,target_table 为 Kafka,这个 SQL 生成的实时任务,在执行时,会生成三个算子。

    • 数据源算子From Order):数据源算子一直运行,实时的从 Order Kafka 中一条一条的读取数据,然后一条一条发送给下游的 Group 聚合算子,向下游发送数据的 shuffle 策略是根据 group by 中的 key 进行发送,相同的 key 发到同一个 SubTask(并发) 中。
    • Group 聚合算子group by key + sum / count / max / min):接收到上游算子发的一条一条的数据,去状态 state 中找这个 key 之前的 sum / count / max / min 结果。如果有结果 oldResult,拿出来和当前的数据进行 sum / count / max / min 计算出这个 key 的新结果 newResult,并将新结果 [key, newResult] 更新到 state 中,在向下游发送新计算的结果之前,先发一条撤回上次结果的消息 -[key, oldResult],然后再将新结果发往下游 +[key, newResult];如果 state 中没有当前 key 的结果,则直接使用当前这条数据计算 sum / max / min 结果 newResult,并将新结果 [key, newResult] 更新到 state 中,当前是第一次往下游发,则不需要先发回撤消息,直接发送 +[key, newResult]
    • 数据汇算子INSERT INTO target_table):接收到上游发的一条一条的数据,写入到 target_table Kafka 中这个实时任务也是 24 24 24 小时一直在运行的,所有的算子在同一时刻都是处于 running 状态的。

    1.4 Group 聚合支持 Grouping sets、Rollup、Cube

    Group 聚合也支持 Grouping setsRollupCube。举一个 Grouping sets 的案例:

    SELECT 
        supplier_id
        , rating
        , product_id
        , COUNT(*)
    FROM (VALUES
        ('supplier1', 'product1', 4),
        ('supplier1', 'product2', 3),
        ('supplier2', 'product3', 3),
        ('supplier2', 'product4', 4))
    AS Products(supplier_id, product_id, rating)
    GROUP BY GROUPING SET (
        ( supplier_id, product_id, rating ),
        ( supplier_id, product_id         ),
        ( supplier_id,             rating ),
        ( supplier_id                     ),
        (              product_id, rating ),
        (              product_id         ),
        (                          rating ),
        (                                 )
    )
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    2.Over 聚合

    Over 聚合定义(支持 Batch / Streaming):可以理解为是一种特殊的滑动窗口聚合函数。

    那这里我们拿 Over 聚合窗口聚合 做一个对比,其之间的最大不同之处在于:

    • 窗口聚合:不在 group by 中的字段,不能直接在 select 中拿到。
    • Over 聚合:能够保留原始字段。

    注意:其实在生产环境中,Over 聚合的使用场景还是比较少的。在 Hive 中也有相同的聚合,但是小伙伴萌可以想想你在离线数仓经常使用嘛?

    • 应用场景:计算最近一段滑动窗口的聚合结果数据。
    • 实际案例:查询每个产品最近一小时订单的金额总和。
    SELECT order_id, order_time, amount,
      SUM(amount) OVER (
        PARTITION BY product
        ORDER BY order_time
        RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
      ) AS one_hour_prod_amount_sum
    FROM Orders
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • Over 聚合的语法总结如下:
    SELECT
      agg_func(agg_col) OVER (
        [PARTITION BY col1[, col2, ...]]
        ORDER BY time_col
        range_definition),
      ...
    FROM ...
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • ORDER BY:必须是时间戳列(事件时间、处理时间)。
    • PARTITION BY:标识了聚合窗口的聚合粒度,如上述案例是按照 product 进行聚合。
    • range_definition:这个标识聚合窗口的聚合数据范围,在 Flink 中有两种指定数据范围的方式。第一种为 按照行数聚合,第二种为 按照时间区间聚合。如下案例所示。

    2.1 时间区间聚合

    按照时间区间聚合就是时间区间的一个滑动窗口,比如下面案例 1 1 1 小时的区间,最新输出的一条数据的 sum 聚合结果就是最近一小时数据的 amount 之和。

    CREATE TABLE source_table (
        order_id BIGINT,
        product BIGINT,
        amount BIGINT,
        order_time as cast(CURRENT_TIMESTAMP as TIMESTAMP(3)),
        WATERMARK FOR order_time AS order_time - INTERVAL '0.001' SECOND
    ) WITH (
      'connector' = 'datagen',
      'rows-per-second' = '1',
      'fields.order_id.min' = '1',
      'fields.order_id.max' = '2',
      'fields.amount.min' = '1',
      'fields.amount.max' = '10',
      'fields.product.min' = '1',
      'fields.product.max' = '2'
    );
    
    CREATE TABLE sink_table (
        product BIGINT,
        order_time TIMESTAMP(3),
        amount BIGINT,
        one_hour_prod_amount_sum BIGINT
    ) WITH (
      'connector' = 'print'
    );
    
    INSERT INTO sink_table
    SELECT product, order_time, amount,
      SUM(amount) OVER (
        PARTITION BY product
        ORDER BY order_time
        -- 标识统计范围是一个 product 的最近 1 小时的数据
        RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
      ) AS one_hour_prod_amount_sum
    FROM source_table
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    2.2 行数聚合

    按照行数聚合就是数据行数的一个滑动窗口,比如下面案例,最新输出的一条数据的 sum 聚合结果就是最近 5 5 5 行数据的 amount 之和。

    CREATE TABLE source_table (
        order_id BIGINT,
        product BIGINT,
        amount BIGINT,
        order_time as cast(CURRENT_TIMESTAMP as TIMESTAMP(3)),
        WATERMARK FOR order_time AS order_time - INTERVAL '0.001' SECOND
    ) WITH (
      'connector' = 'datagen',
      'rows-per-second' = '1',
      'fields.order_id.min' = '1',
      'fields.order_id.max' = '2',
      'fields.amount.min' = '1',
      'fields.amount.max' = '2',
      'fields.product.min' = '1',
      'fields.product.max' = '2'
    );
    
    CREATE TABLE sink_table (
        product BIGINT,
        order_time TIMESTAMP(3),
        amount BIGINT,
        one_hour_prod_amount_sum BIGINT
    ) WITH (
      'connector' = 'print'
    );
    
    INSERT INTO sink_table
    SELECT product, order_time, amount,
      SUM(amount) OVER (
        PARTITION BY product
        ORDER BY order_time
        -- 标识统计范围是一个 product 的最近 5 行数据
        ROWS BETWEEN 5 PRECEDING AND CURRENT ROW
      ) AS one_hour_prod_amount_sum
    FROM source_table
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    预跑结果如下:

    +I[2, 2021-12-24T22:18:19.147, 1, 9]
    +I[1, 2021-12-24T22:18:20.147, 2, 11]
    +I[1, 2021-12-24T22:18:21.147, 2, 12]
    +I[1, 2021-12-24T22:18:22.147, 2, 12]
    +I[1, 2021-12-24T22:18:23.148, 2, 12]
    +I[1, 2021-12-24T22:18:24.147, 1, 11]
    +I[1, 2021-12-24T22:18:25.146, 1, 10]
    +I[1, 2021-12-24T22:18:26.147, 1, 9]
    +I[2, 2021-12-24T22:18:27.145, 2, 11]
    +I[2, 2021-12-24T22:18:28.148, 1, 10]
    +I[2, 2021-12-24T22:18:29.145, 2, 10]
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    当然,如果你在一个 SELECT 中有多个聚合窗口的聚合方式,Flink SQL 支持了一种简化写法,如下案例:

    SELECT order_id, order_time, amount,
      SUM(amount) OVER w AS sum_amount,
      AVG(amount) OVER w AS avg_amount
    FROM Orders
    -- 使用下面子句,定义 Over Window
    WINDOW w AS (
      PARTITION BY product
      ORDER BY order_time
      RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
  • 相关阅读:
    SpringBoot教程二自定义实现的参数校验器,可注解通用所有模块
    Python 有哪些好的学习资料或者博客?
    Python面向对象特性——多继承(概念、语法、代码演练、使用注意事项)
    【华为OD机试真题 python】图像物体的边界 【2022 Q4 | 200分】
    云原生之容器化:Docker三剑客之Docker Compose
    C# 设计模式之代理模式
    reactive对比ref
    【OpenCV】- 模板匹配(浩瀚星空只为寻找那一抹明月)
    利用随机森林对特征重要性进行评估(公式原理)
    浅谈智能化能源管理系统平台在企业中的应用
  • 原文地址:https://blog.csdn.net/be_racle/article/details/136286801