• 【理论知识:Window Aggregation】flink 窗口聚合功能概述:两种窗口聚合模式的使用例子、功能说明


    本文描述的是flink1.16.1版本下窗口聚合的语法

    窗口聚合定义在GROUP BY子句中定义,子句中包含TVF关系的window_start, window_end字段。
    语法如下:

    SELECT ...
    FROM <windowed_table> -- relation applied windowing TVF
    GROUP BY window_start, window_end, ...
    
    • 1
    • 2
    • 3

     

    Unlike other aggregations on continuous tables, window aggregation do not emit intermediate results but only a final result, the total aggregation at the end of the window. Moreover, window aggregations purge all intermediate state when no longer needed.

    窗口聚合不产生中间结果,只产生最终结果。此外,当不再需要中间状态时,窗口聚合会清除所有的中间状态。

     

    一. Windowing TVFs

    Flink supports TUMBLE, HOP and CUMULATE types of window aggregations. In streaming mode, the time attribute field of a window table-valued function must be on either event or processing time attributes. See Windowing TVF for more windowing functions information. In batch mode, the time attribute field of a window table-valued function must be an attribute of type TIMESTAMP or TIMESTAMP_LTZ.

    flink支持滚动(TUMBLE)、HOP、累加(CUMULATE)类型的窗口。流模式下,窗口表值函数的时间列的属性必须是事件时间或处理时间。

    批模式下,窗口表值函数的时间列的属性必须是TIMESTAMP或TIMESTAMP_LTZ。

     

    1. 三种类型聚合的例子

    如下表,展示了表的schema和表的数据,其中表中必须有时间属性的字段:

    Flink SQL> desc Bid;
    +-------------+------------------------+------+-----+--------+---------------------------------+
    |        name |                   type | null | key | extras |                       watermark |
    +-------------+------------------------+------+-----+--------+---------------------------------+
    |     bidtime | TIMESTAMP(3) *ROWTIME* | true |     |        | `bidtime` - INTERVAL '1' SECOND |
    |       price |         DECIMAL(10, 2) | true |     |        |                                 |
    |        item |                 STRING | true |     |        |                                 |
    | supplier_id |                 STRING | true |     |        |                                 |
    +-------------+------------------------+------+-----+--------+---------------------------------+
    
    SELECT * FROM Bid;
    +------------------+-------+------+-------------+
    |          bidtime | price | item | supplier_id |
    +------------------+-------+------+-------------+
    | 2020-04-15 08:05 | 4.00  | C    | supplier1   |
    | 2020-04-15 08:07 | 2.00  | A    | supplier1   |
    | 2020-04-15 08:09 | 5.00  | D    | supplier2   |
    | 2020-04-15 08:11 | 3.00  | B    | supplier2   |
    | 2020-04-15 08:13 | 1.00  | E    | supplier1   |
    | 2020-04-15 08:17 | 6.00  | F    | supplier2   
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    滚动窗口聚合例子:

    SELECT window_start, window_end, SUM(price)
      FROM TABLE(
        TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
      GROUP BY window_start, window_end;
    +------------------+------------------+-------+
    |     window_start |       window_end | price |
    +------------------+------------------+-------+
    | 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 |
    | 2020-04-15 08:10 | 2020-04-15 08:20 | 10.00 |
    +------------------+------------------+-------+
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    滚动窗口聚合例子:

     SELECT window_start, window_end, SUM(price)
      FROM TABLE(
        HOP(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES))
      GROUP BY window_start, window_end;
    +------------------+------------------+-------+
    |     window_start |       window_end | price |
    +------------------+------------------+-------+
    | 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 |
    | 2020-04-15 08:05 | 2020-04-15 08:15 | 15.00 |
    | 2020-04-15 08:10 | 2020-04-15 08:20 | 10.00 |
    | 2020-04-15 08:15 | 2020-04-15 08:25 | 6.00  |
    +------------------+------------------+-------+
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    聚合窗口例子:

    SELECT window_start, window_end, SUM(price)
      FROM TABLE(
        CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES))
      GROUP BY window_start, window_end;
    +------------------+------------------+-------+
    |     window_start |       window_end | price |
    +------------------+------------------+-------+
    | 2020-04-15 08:00 | 2020-04-15 08:06 | 4.00  |
    | 2020-04-15 08:00 | 2020-04-15 08:08 | 6.00  |
    | 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 |
    | 2020-04-15 08:10 | 2020-04-15 08:12 | 3.00  |
    | 2020-04-15 08:10 | 2020-04-15 08:14 | 4.00  |
    | 2020-04-15 08:10 | 2020-04-15 08:16 | 4.00  |
    | 2020-04-15 08:10 | 2020-04-15 08:18 | 10.00 |
    | 2020-04-15 08:10 | 2020-04-15 08:20 | 10.00 |
    +------------------+------------------+-------+
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    注意:为了更好的理解窗口行为,这里简化了时间戳的显示,不显示后面的零,例如,类型为timestamp(3),在Flink SQL Client中,2020-04-15 08:05应该显示为2020-04-15 08:05:00.000。

     

    2. GROUPING SETS子句语法 ing

    窗口聚合也支持GROUPING SETS语法。

    GROUPING SETS提供了更为复杂的分组操作。行按照GROUPING SETS单独分组,并为每个组计算聚合,就像简单的group by子句一样。
    具有GROUPING SETS的窗口聚合要求window_start和window_end列都必须在GROUP BY子句中,但不在GROUPING SETS子句中。

     SELECT window_start, window_end, supplier_id, SUM(price) as price
      FROM TABLE(
        TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
      GROUP BY window_start, window_end, GROUPING SETS ((supplier_id), ());
    +------------------+------------------+-------------+-------+
    |     window_start |       window_end | supplier_id | price |
    +------------------+------------------+-------------+-------+
    | 2020-04-15 08:00 | 2020-04-15 08:10 |      (NULL) | 11.00 |
    | 2020-04-15 08:00 | 2020-04-15 08:10 |   supplier2 |  5.00 |
    | 2020-04-15 08:00 | 2020-04-15 08:10 |   supplier1 |  6.00 |
    | 2020-04-15 08:10 | 2020-04-15 08:20 |      (NULL) | 10.00 |
    | 2020-04-15 08:10 | 2020-04-15 08:20 |   supplier2 |  9.00 |
    | 2020-04-15 08:10 | 2020-04-15 08:20 |   supplier1 |  1.00 |
    +------------------+------------------+-------------+-------+
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    在 GROUPING SETS中可以设置零个或多个列,或表达式,理解方式和GROUP BY子句中使用的方式相同。
    空分组意味着将所有行聚合为单个组,即使没有输入行也输出该组。ing

    References to the grouping columns or expressions are replaced by null values in result rows for grouping sets in which those columns do not appear.

    对于没有出现这些列的分组集,结果行中对分组列或表达式的引用将被替换为null值。ing

     

    2.1.ROLLUP

    2.2. CUBE

    ing
     

    3. Selecting Group Window Start and End Timestamps

    可以直接使用window_start,window_end字段,放到select语句中
     

    4. 级联窗口聚合(Cascading Window Aggregation)

    The window_start and window_end columns are regular timestamp columns, not time attributes. Thus they can’t be used as time attributes in subsequent time-based operations. In order to propagate time attributes, you need to additionally add window_time column into GROUP BY clause. The window_time is the third column produced by Windowing TVFs which is a time attribute of the assigned window. Adding window_time into GROUP BY clause makes window_time also to be group key that can be selected. Then following queries can use this column for subsequent time-based operations, such as cascading window aggregations and Window TopN.

    window_start和window_end列是常规的时间戳列,而不是时间属性。因此,它们不能在后续的基于时间的操作中用作时间属性。为了向下传递时间属性,需要在group by子句中添加window_time列。

    window_time是窗口tvf生成的第三列,它是所分配窗口的时间属性。将window_time添加到GROUP BY子句中,使window_time成为可以被选择(select 句子中)的组键。之后,下面的查询可以使用该列进行后续的基于时间的操作,例如级联窗口聚合和window TopN。

    -- 滚动5分钟对于每一个supplier_id列值
    CREATE VIEW window1 AS
    
    -- 注意:window_start,window_end 在select子句中可选。
    --However, if they appear in the clause, they need to be aliased to prevent name conflicting with the window start and window end of the outer Window TVF.
    -- 如果出现在select子句中,两个列需要设置别名,为了防止与其他window TVF的这两个列重名。
    
    SELECT window_start as window_5mintumble_start, window_end as window_5mintumble_end, window_time as rowtime, SUM(price) as partial_price
      FROM TABLE(
        TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))
      GROUP BY supplier_id, window_start, window_end, window_time;
    
    
    
    -- 第一个窗口滚动10分钟ing
    SELECT window_start, window_end, SUM(partial_price) as total_price
      FROM TABLE(
      -- rowtime:这里使用了上一个句子声明的时间列
          TUMBLE(TABLE window1, DESCRIPTOR(rowtime), INTERVAL '10' MINUTES))
      GROUP BY window_start, window_end;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

     

    二. Group Window Aggregation

    注意:

    Warning: Group Window Aggregation is deprecated. It’s encouraged to use Window TVF Aggregation which is more powerful and effective.
    Compared to Group Window Aggregation, Window TVF Aggregation have many advantages, including:
    Have all performance optimizations mentioned in Performance Tuning.
    Support standard GROUPING SETS syntax.
    Can apply Window TopN after window aggregation result.
    and so on.

    分组聚合已经不推荐使用。推荐使用window的TVF聚合,相比于分组窗口聚合,窗口TVF聚合有以下优势:

    • 支持提到的所有性能优化
    • 支持GROUPING SETS
    • 窗口聚合后可以实现window TopN

    组窗口聚合在Group BY子句中定义。与使用常规GROUP BY子句的查询一样,在各组内(window下)计算算出单个结果集。
     

    1. Group Window Functions

    TUMBLE(time_attr, interval) 
    
    • 1
    • 用于定义一个滚动窗口。
    • 滚动时间窗口将行(数据)分配到固定非重叠的连续窗口
    • 滚动时间窗口可以被定义在事件时间(流模式+批模式)和处理时间(流模式)
    HOP(time_attr, interval, interval)
    
    • 1

    Defines a hopping time window (called sliding window in the Table API). A hopping time window has a fixed duration (second interval parameter) and hops by a specified hop interval (first interval parameter). If the hop interval is smaller than the window size, hopping windows are overlapping. Thus, rows can be assigned to multiple windows. For example, a hopping window of 15 minutes size and 5 minute hop interval assigns each row to 3 different windows of 15 minute size, which are evaluated in an interval of 5 minutes. Hopping windows can be defined on event-time (stream + batch) or processing-time (stream).

    SESSION(time_attr, interval)
    
    • 1

    Defines a session time window. Session time windows do not have a fixed duration but their bounds are defined by a time interval of inactivity, i.e., a session window is closed if no event appears for a defined gap period. For example a session window with a 30 minute gap starts when a row is observed after 30 minutes inactivity (otherwise the row would be added to an existing window) and is closed if no row is added within 30 minutes. Session windows can work on event-time (stream + batch) or processing-time (stream).

     

    2. 时间属性的声明

    流模式下,time_attr参数必须引用一个有效的时间属性,该字段已经明确了处理时间或事件时间。那么,如何声明时间属性,跳转如下官网:

    how to define time attributes.

     

    3. Selecting Group Window Start and End Timestamps

    start timestampsend timestamps以及time attribute字段可以通过如下辅助函数实现输出(用于select语句中):

    1.返回窗口的下界(时间戳)

    TUMBLE_START(time_attr, interval)
    HOP_START(time_attr, interval, interval)
    SESSION_START(time_attr, interval)
    
    • 1
    • 2
    • 3

     
    2.返回独占上界的时间戳

    TUMBLE_END(time_attr, interval)
    HOP_END(time_attr, interval, interval)
    SESSION_END(time_attr, interval)
    
    • 1
    • 2
    • 3

    此时间戳不能用于后续 rowtime attribute(基于时间)的操作例如:interval joins 、 group window 、 over window aggregations

     

    3.返回包含上界的时间戳。

    TUMBLE_ROWTIME(time_attr, interval)
    HOP_ROWTIME(time_attr, interval, interval)
    SESSION_ROWTIME(time_attr, interval)
    
    • 1
    • 2
    • 3

    此时间戳可以用于后续 rowtime attribute(基于时间)的操作例如:interval joins 、 group window 、 over window aggregations

     
    4.返回proctime(处理时间)属性

    TUMBLE_PROCTIME(time_attr, interval)
    HOP_PROCTIME(time_attr, interval, interval)
    SESSION_PROCTIME(time_attr, interval)
    
    • 1
    • 2
    • 3

    此时间戳可以用于后续 rowtime attribute(基于时间)的操作例如:interval joins 、 group window 、 over window aggregations

    辅助函数的传参,必须和group by子句中保持一致,以下例子:

    CREATE TABLE Orders (
      user       BIGINT,
      product    STRING,
      amount     INT,
      order_time TIMESTAMP(3),
      -- 声明时间属性
      WATERMARK FOR order_time AS order_time - INTERVAL '1' MINUTE
    ) WITH (...);
    
    SELECT
      user,
      TUMBLE_START(order_time, INTERVAL '1' DAY) AS wStart,
      SUM(amount) FROM Orders
    GROUP BY
      TUMBLE(order_time, INTERVAL '1' DAY),
      user
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

     

    三. 小结

    1. flink1.16提供了两种窗口创建的模式,官网建议使用Windowing TVFs的方式去使用窗口功能
    2. 本文提供了两种窗口模式使用的例子
    3. Windowing TVFs提供了GROUPING SETS的能力,提供了更为复杂的分组操作; 级联窗口聚合模式实现时间属性的向下传递
    4. 流模式下,必须要声明时间字段,并声明为处理时间还是事件时间
    5. 本文讲了两种窗口模式下,select语句的Group Window Start and End Timestamps字段怎么获取。

     
    参考:

    https://nightlies.apache.org/flink/flink-docs-release-1.16/zh/docs/dev/table/sql/queries/window-agg/#group-window-aggregation

  • 相关阅读:
    生态板行业分析:中国市场消费量同比增长0.8%
    SAP UI5 FileUploader 的本地文件上传技术实现分享
    Redis分布式锁的实现、优化与Redlock算法探讨
    微信小程序中长按识别二维码
    生产部长修炼宝典⑤:基于管控分析模型的生产管控可视化分析看板建设
    外贸网站被谷歌收录的方法
    银河麒麟服务器系统中intel-x710网卡丢包问题
    如何使用 Nginx 创建临时和永久重定向
    小小装饰器大大用处
    昇思MindSpore开源社区算力使能,快来SIG申请你的专属算力
  • 原文地址:https://blog.csdn.net/hiliang521/article/details/134007610