• Flink SQL Over Aggregation Explained


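    **Definition (supported in both Batch and Streaming):** Over aggregation is a special kind of sliding-window aggregate function. It is easiest to understand when compared with window aggregation.

    Window aggregation: a column that is not in GROUP BY cannot be selected directly, because each group collapses into a single output row.

    Over aggregation: keeps the original columns, because it produces one output row for every input row.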
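The contrast above can be made concrete with a minimal Python sketch (a model of the semantics, not Flink code) over a few hypothetical order rows. The GROUP BY-style function returns one row per product and has no place to keep `order_id`, while the OVER-style function returns one row per input row. For simplicity the OVER sketch assumes a running-total frame (every earlier row of the same product plus the current row), which is only one of the possible frames.

```python
from collections import defaultdict

# Hypothetical sample rows: (order_id, product, amount), already in time order
orders = [(1, "A", 10), (2, "A", 20), (3, "B", 5)]

def group_by_sum(rows):
    """GROUP BY style: one output row per product; order_id cannot be kept."""
    totals = defaultdict(int)
    for _order_id, product, amount in rows:
        totals[product] += amount
    return dict(totals)

def over_running_sum(rows):
    """OVER style: one output row per input row, original columns kept.
    Assumed frame: every earlier row of the same product plus the current row."""
    running = defaultdict(int)
    out = []
    for order_id, product, amount in rows:
        running[product] += amount
        out.append((order_id, product, amount, running[product]))
    return out

print(group_by_sum(orders))      # {'A': 30, 'B': 5}
print(over_running_sum(orders))  # [(1, 'A', 10, 10), (2, 'A', 20, 30), (3, 'B', 5, 5)]
```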

    Note: in production, Over aggregation is used relatively rarely.

    **Use case:** computing an aggregate over the most recent sliding range of data.

    **Example:** query the total order amount of each product over the most recent hour:

    SELECT order_id,
      order_time,
      amount,
      SUM(amount) OVER (
        PARTITION BY product
        ORDER BY order_time
        RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
      ) AS one_hour_prod_amount_sum
    FROM Orders
    

    The Over aggregation syntax is as follows:

    SELECT
     agg_func(agg_col) OVER (
     [PARTITION BY col1[, col2, ...]]
     ORDER BY time_col
     range_definition),
     ...
    FROM ...
    

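    ORDER BY: must be a time attribute column (event time or processing time); for streaming queries Flink currently supports only ascending order on it.

    PARTITION BY: defines the granularity of the aggregation; in the example above, rows are aggregated per product.

    range_definition: defines which rows fall into the aggregation frame. Flink offers two ways to specify it: by time interval (RANGE) and by row count (ROWS).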
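How the three parts of the OVER clause cooperate can be modeled with a minimal Python sketch. It is a semantic model under simplifying assumptions, not Flink's implementation: rows are sorted by the time column up front, the frame always ends at the current row (as in every example in this article), and the frame is passed in as a hypothetical `frame` function that picks rows from the partition's history.

```python
from collections import defaultdict
from typing import Callable

def over_aggregate(rows: list, partition_by: str, order_by: str,
                   frame: Callable[[list], list],
                   agg: Callable[[list], int], col: str) -> list:
    """Model of agg(col) OVER (PARTITION BY ... ORDER BY ... range_definition)."""
    history = defaultdict(list)  # PARTITION BY: one row history per key
    out = []
    # ORDER BY: rows are processed in ascending order of the time column
    for row in sorted(rows, key=lambda r: r[order_by]):
        part = history[row[partition_by]]
        part.append(row)
        # range_definition: pick the rows of this partition that are aggregated
        window = frame(part)
        # One output row per input row, original columns preserved
        out.append({**row, "agg": agg([r[col] for r in window])})
    return out

# Hypothetical rows; the frame models ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
rows = [
    {"product": 1, "t": 1, "amount": 5},
    {"product": 2, "t": 2, "amount": 1},
    {"product": 1, "t": 3, "amount": 2},
    {"product": 1, "t": 4, "amount": 4},
    {"product": 1, "t": 5, "amount": 3},
]
result = over_aggregate(rows, "product", "t", frame=lambda part: part[-3:],
                        agg=sum, col="amount")
print([r["agg"] for r in result])  # [5, 1, 7, 11, 9]
```

Swapping the `frame` function is all it takes to move between a row-count frame and a time-interval frame; the partitioning and ordering logic stays the same.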

    1. Time-interval aggregation

    **Example:** for each product, output the sum of `amount` over the data of the most recent hour.

    CREATE TABLE source_table (
     order_id BIGINT,
     product BIGINT,
     amount BIGINT,
     order_time as cast(CURRENT_TIMESTAMP as TIMESTAMP(3)),
     WATERMARK FOR order_time AS order_time - INTERVAL '0.001' SECOND
    ) WITH (
     'connector' = 'datagen',
     'rows-per-second' = '1',
     'fields.order_id.min' = '1',
     'fields.order_id.max' = '2',
     'fields.amount.min' = '1',
     'fields.amount.max' = '10',
     'fields.product.min' = '1',
     'fields.product.max' = '2'
    );
    
    CREATE TABLE sink_table (
     product BIGINT,
     order_time TIMESTAMP(3),
     amount BIGINT,
     one_hour_prod_amount_sum BIGINT
    ) WITH (
     'connector' = 'print'
    );
    
    INSERT INTO sink_table
    SELECT product,
    	order_time,
      amount,
     SUM(amount) OVER (
     	PARTITION BY product
     	ORDER BY order_time
     	-- Frame: rows of the same product from the last 1 hour up to the current row
     	RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
     ) AS one_hour_prod_amount_sum
    FROM source_table
    

    Output:

    +I[2, 2021-12-24T22:08:26.583, 7, 73]
    +I[2, 2021-12-24T22:08:27.583, 7, 80]
    +I[2, 2021-12-24T22:08:28.583, 4, 84]
    +I[2, 2021-12-24T22:08:29.584, 7, 91]
    +I[2, 2021-12-24T22:08:30.583, 8, 99]
    +I[1, 2021-12-24T22:08:31.583, 9, 138]
    +I[2, 2021-12-24T22:08:32.584, 6, 105]
    +I[1, 2021-12-24T22:08:33.584, 7, 145]
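The RANGE frame can be checked on a smaller scale with a minimal Python sketch of `RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW`. The sample rows and the function name `range_frame_sum` are hypothetical; the sketch assumes the lower bound is inclusive and simply scans all rows of the same product whose timestamp lies in `[current - 1 hour, current]`, which is fine for illustration but not how Flink maintains state.

```python
from datetime import datetime, timedelta

# Hypothetical sample rows: (product, order_time, amount)
orders = [
    (1, datetime(2021, 12, 24, 10, 0), 5),
    (1, datetime(2021, 12, 24, 10, 30), 3),
    (2, datetime(2021, 12, 24, 10, 45), 7),
    (1, datetime(2021, 12, 24, 11, 0), 2),
    (1, datetime(2021, 12, 24, 11, 31), 4),
]

def range_frame_sum(rows, interval=timedelta(hours=1)):
    """SUM(amount) OVER (PARTITION BY product ORDER BY order_time
    RANGE BETWEEN <interval> PRECEDING AND CURRENT ROW)."""
    out = []
    for product, ts, amount in sorted(rows, key=lambda r: r[1]):
        # Frame: rows of the same product with ts - interval <= order_time <= ts
        total = sum(a for p, t, a in rows
                    if p == product and ts - interval <= t <= ts)
        out.append((product, ts, amount, total))
    return out

print([r[3] for r in range_frame_sum(orders)])  # [5, 8, 7, 10, 6]
```

The 11:31 row no longer sees the 10:30 order because it is more than one hour old, so its sum drops to 6 even though it arrives last.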
    
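    2. Row-count aggregation

    **Example:** for each product, output the sum of `amount` over the current row and the 5 rows before it (at most 6 rows in total).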

    CREATE TABLE source_table (
     order_id BIGINT,
     product BIGINT,
     amount BIGINT,
     order_time as cast(CURRENT_TIMESTAMP as TIMESTAMP(3)),
     WATERMARK FOR order_time AS order_time - INTERVAL '0.001' SECOND
    ) WITH (
     'connector' = 'datagen',
     'rows-per-second' = '1',
     'fields.order_id.min' = '1',
     'fields.order_id.max' = '2',
     'fields.amount.min' = '1',
     'fields.amount.max' = '2',
     'fields.product.min' = '1',
     'fields.product.max' = '2'
    );
    
    CREATE TABLE sink_table (
     product BIGINT,
     order_time TIMESTAMP(3),
     amount BIGINT,
     last_6_rows_prod_amount_sum BIGINT
    ) WITH (
     'connector' = 'print'
    );
    
    INSERT INTO sink_table
    SELECT product,
      order_time,
      amount,
      SUM(amount) OVER (
        PARTITION BY product
        ORDER BY order_time
        -- Frame: the current row plus the 5 preceding rows of the same product (at most 6 rows)
        ROWS BETWEEN 5 PRECEDING AND CURRENT ROW
      ) AS last_6_rows_prod_amount_sum
    FROM source_table
    

    Output:

    +I[2, 2021-12-24T22:18:19.147, 1, 9]
    +I[1, 2021-12-24T22:18:20.147, 2, 11]
    +I[1, 2021-12-24T22:18:21.147, 2, 12]
    +I[1, 2021-12-24T22:18:22.147, 2, 12]
    +I[1, 2021-12-24T22:18:23.148, 2, 12]
    +I[1, 2021-12-24T22:18:24.147, 1, 11]
    +I[1, 2021-12-24T22:18:25.146, 1, 10]
    +I[1, 2021-12-24T22:18:26.147, 1, 9]
    +I[2, 2021-12-24T22:18:27.145, 2, 11]
    +I[2, 2021-12-24T22:18:28.148, 1, 10]
    +I[2, 2021-12-24T22:18:29.145, 2, 10]
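The size of a `ROWS BETWEEN 5 PRECEDING AND CURRENT ROW` frame can be illustrated with a minimal Python sketch that keeps one bounded `deque` per product. The function name `rows_frame_sum` and the sample input are hypothetical; the point is that the frame holds the current row plus up to 5 earlier rows, i.e. at most 6 rows.

```python
from collections import defaultdict, deque

def rows_frame_sum(rows, preceding=5):
    """SUM(amount) OVER (PARTITION BY product ORDER BY order_time
    ROWS BETWEEN <preceding> PRECEDING AND CURRENT ROW).

    The frame holds the current row plus up to `preceding` earlier rows of
    the same product, i.e. at most preceding + 1 rows.
    """
    frames = defaultdict(lambda: deque(maxlen=preceding + 1))
    out = []
    for product, ts, amount in sorted(rows, key=lambda r: r[1]):
        frame = frames[product]
        frame.append(amount)  # a full deque drops its oldest entry automatically
        out.append((product, ts, amount, sum(frame)))
    return out

# Hypothetical input: 8 orders of product 1, one per second, each with amount 2
orders = [(1, t, 2) for t in range(8)]
print([r[3] for r in rows_frame_sum(orders)])  # [2, 4, 6, 8, 10, 12, 12, 12]
```

The sum levels off at 12 = 6 * 2, which is consistent with the output above reaching 12 even though `amount` is generated in the range 1 to 2.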
    

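    When one SELECT computes several aggregates over the same window, the window can be defined once in a WINDOW clause and referenced by name (for streaming queries Flink currently requires all OVER aggregates in one SELECT to use an identical window):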

    SELECT order_id,
    	order_time,
      amount,
     SUM(amount) OVER w AS sum_amount,
     AVG(amount) OVER w AS avg_amount
    FROM Orders
    -- Define the Over window once with the WINDOW clause below
    WINDOW w AS (
     PARTITION BY product
     ORDER BY order_time
     RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW)
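The benefit of a named window can be sketched in Python: the frame `w` is built once per row and both aggregates read from it. This is a hypothetical model with made-up sample rows, not Flink's execution plan. It also computes AVG as a Python float; the result type Flink returns depends on the column type, so an integer column may not produce the same fractional value.

```python
from datetime import datetime, timedelta

# Hypothetical sample rows: (order_id, product, order_time, amount)
orders = [
    (1, "A", datetime(2021, 12, 24, 10, 0), 4),
    (2, "A", datetime(2021, 12, 24, 10, 20), 8),
    (3, "B", datetime(2021, 12, 24, 10, 40), 6),
    (4, "A", datetime(2021, 12, 24, 11, 10), 2),
]

def sum_and_avg_over_w(rows, interval=timedelta(hours=1)):
    """SUM(amount) OVER w and AVG(amount) OVER w, where w is
    PARTITION BY product ORDER BY order_time
    RANGE BETWEEN <interval> PRECEDING AND CURRENT ROW."""
    out = []
    for order_id, product, ts, amount in sorted(rows, key=lambda r: r[2]):
        # The frame of w is built once and shared by both aggregates
        frame = [a for _, p, t, a in rows if p == product and ts - interval <= t <= ts]
        out.append((order_id, ts, amount, sum(frame), sum(frame) / len(frame)))
    return out

for order_id, ts, amount, sum_amount, avg_amount in sum_and_avg_over_w(orders):
    print(order_id, ts.time(), amount, sum_amount, avg_amount)
```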
    
  • Original article: https://blog.csdn.net/m0_50186249/article/details/134207187