Flink SQL: Top-N and Window Top-N


    1. Top-N

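    A Top-N query asks for the N smallest or largest values ordered by one or more columns. Both the set of smallest values and the set of largest values count as Top-N queries. Top-N queries are useful when you need to show only the bottom N or top N records of a batch or streaming table according to some condition, and the result set can be used for further analysis.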

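    Flink expresses a Top-N query as a combination of an OVER window clause and a filter condition. With the PARTITION BY clause of the OVER window, Flink also supports per-group Top-N, for example, the five products with the highest real-time sales in each category. Top-N queries are supported in SQL on both batch and streaming tables.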
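To make the per-group semantics concrete, here is a minimal Python sketch (not Flink code) of what `ROW_NUMBER() OVER (PARTITION BY ... ORDER BY ...)` followed by `rownum <= N` computes on a batch of rows. The row shape `(category_id, item_id, pv)` and the tie-breaker on `item_id` are assumptions made for illustration; Flink itself does not promise any particular order among ties.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

Row = Tuple[str, str, int]  # hypothetical (category_id, item_id, pv)


def top_n_per_group(rows: List[Row], n: int) -> List[Tuple[str, str, int, int]]:
    """Batch equivalent of per-group Top-N: keep the n best rows of each category."""
    groups: Dict[str, List[Row]] = defaultdict(list)
    for row in rows:
        groups[row[0]].append(row)  # PARTITION BY category_id

    result = []
    for category in sorted(groups):
        # ORDER BY pv DESC; item_id breaks ties so the output is deterministic
        ranked = sorted(groups[category], key=lambda r: (-r[2], r[1]))
        for rownum, row in enumerate(ranked, start=1):  # ROW_NUMBER()
            if rownum > n:  # WHERE rownum <= N
                break
            result.append(row + (rownum,))
    return result


if __name__ == "__main__":
    sales = [("c1", "i1", 5), ("c1", "i2", 9), ("c1", "i3", 7),
             ("c2", "i4", 1), ("c2", "i5", 3)]
    for out in top_n_per_group(sales, 2):
        print(out)  # (category_id, item_id, pv, rownum)
```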

    Top-N statement structure:

    SELECT [column_list]
    FROM (
       SELECT [column_list],
         ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
           ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
       FROM table_name)
    WHERE rownum <= N [AND conditions]
    

    Note: the above pattern must be followed exactly, otherwise the optimizer won’t be able to translate the query.

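    The Top-N query is "result updating". Flink SQL sorts the input stream by the order key, so whenever the top N records change, the changed records are sent downstream as retraction/update records. It is therefore recommended to use a storage system that supports updates as the sink of a Top-N query. In addition, if the top N records are stored in external storage, the result table should have the same unique key as the Top-N query.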

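The "result updating" behavior can be sketched in Python as a toy operator that re-ranks after every input and emits a retraction (`-`) for each previously emitted row that is no longer valid and an addition (`+`) for each new row. This is a simplification under stated assumptions: real Flink changelogs use row kinds such as `-U`/`+U` and `-D`/`+I`, and the real operator is far more efficient than re-sorting everything on each record.

```python
from typing import Dict, List, Tuple

RankedRow = Tuple[int, str, int]  # (rownum, item_id, pv)


def top_n_changelog(updates: List[Tuple[str, int]], n: int) -> List[Tuple[str, RankedRow]]:
    """Toy streaming Top-N: emit '-' / '+' messages whenever the ranked top N changes."""
    latest: Dict[str, int] = {}      # current pv per item (the upstream aggregate)
    previous: List[RankedRow] = []   # top-N rows emitted so far
    log: List[Tuple[str, RankedRow]] = []
    for item, pv in updates:
        latest[item] = pv
        ranked = sorted(latest.items(), key=lambda kv: (-kv[1], kv[0]))[:n]
        current = [(rank, it, value) for rank, (it, value) in enumerate(ranked, start=1)]
        # retract rows that are no longer valid, then add the new ones
        log.extend(("-", row) for row in previous if row not in current)
        log.extend(("+", row) for row in current if row not in previous)
        previous = current
    return log


if __name__ == "__main__":
    for message in top_n_changelog([("a", 5), ("b", 3), ("c", 4), ("b", 6)], n=2):
        print(message)
```

In this run, the final update of `b` to 6 retracts both previously emitted rows, because every rownum in the top 2 shifts; this is exactly why a sink that supports updates is recommended.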

    Example
    Source data (page views per category and item, aggregated from user_log):

    create view pv_log as 
    select category_id, item_id, count(1) as pv from user_log group by category_id, item_id;
      
    SELECT *
    FROM (
       SELECT *,
         ROW_NUMBER() OVER (PARTITION BY category_id ORDER BY category_id, pv DESC) AS rownum
       FROM pv_log)
    WHERE rownum <= 3;
    
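Because the pattern is plain window-function SQL, the ranking logic of this example can be sanity-checked in a batch engine. The sketch below runs it with Python's built-in `sqlite3` (window functions need SQLite 3.25 or newer). The `user_log (user_id, item_id, category_id)` schema and its rows are hypothetical, since the post does not show the real table, and the outer `ORDER BY` is added only to make the batch output deterministic. It checks the ranking, not Flink's streaming update behavior.

```python
import sqlite3
from typing import List, Tuple


def run_top3(user_log: List[Tuple[int, int, int]]) -> List[tuple]:
    conn = sqlite3.connect(":memory:")
    try:
        # hypothetical schema; the real Flink user_log table is not shown in the post
        conn.execute("CREATE TABLE user_log (user_id INTEGER, item_id INTEGER, category_id INTEGER)")
        conn.executemany("INSERT INTO user_log VALUES (?, ?, ?)", user_log)
        conn.execute("""
            CREATE VIEW pv_log AS
            SELECT category_id, item_id, count(1) AS pv
            FROM user_log GROUP BY category_id, item_id
        """)
        return conn.execute("""
            SELECT * FROM (
              SELECT *, ROW_NUMBER() OVER (PARTITION BY category_id ORDER BY category_id, pv DESC) AS rownum
              FROM pv_log)
            WHERE rownum <= 3
            ORDER BY category_id, rownum
        """).fetchall()
    finally:
        conn.close()


# (category_id, item_id) -> number of page views; no ties, so the ranking is unambiguous
VIEWS = {(1, 10): 3, (1, 11): 1, (1, 12): 2, (1, 13): 4, (2, 20): 2, (2, 21): 1}
SAMPLE_LOG = [(k, item, cat) for (cat, item), count in VIEWS.items() for k in range(count)]

if __name__ == "__main__":
    for row in run_top3(SAMPLE_LOG):
        print(row)  # (category_id, item_id, pv, rownum)
```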
    • 9

    1.1 No Ranking Output Optimization

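    As described above, the rownum field is written to the result table as one field of the unique key, which can cause a large number of records to be written to the result table. For example, when the record ranked 9th (say, product-1001) is updated and its rank rises to 1, all records ranked 1 to 9 are emitted to the result table as update messages. If the result table receives too much data, it becomes the bottleneck of the SQL job.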

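    The optimization is to omit the rownum field from the outer SELECT clause of the Top-N query. This is reasonable because the number of top N records is usually small, so consumers can quickly sort the records themselves. Without the rownum field, only the changed record (product-1001) in the example above needs to be sent downstream, which can greatly reduce IO to the result table.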

    -- omit the rownum field from the output (item_id is kept so each row stays identifiable)
    SELECT category_id, item_id, pv
    FROM (
       SELECT *,
         ROW_NUMBER() OVER (PARTITION BY category_id ORDER BY category_id, pv DESC) AS rownum
       FROM pv_log)
    WHERE rownum <= 3;
    
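The IO saving can be checked with a small Python sketch that compares the rows a sink would have to upsert when product-1001 jumps from rank 9 to rank 1, with and without rownum in the output. The other item names and all pv values are invented for illustration, and only upserts are counted (deletes for rows leaving the top N are ignored).

```python
from typing import Dict, Set


def top_rows(pv: Dict[str, int], n: int, with_rownum: bool) -> Set[tuple]:
    """Rows the Top-N query would output, optionally including rownum."""
    ranked = sorted(pv.items(), key=lambda kv: (-kv[1], kv[0]))[:n]
    if with_rownum:
        return {(rank, item, count) for rank, (item, count) in enumerate(ranked, start=1)}
    return {(item, count) for item, count in ranked}


def upserts(before: Dict[str, int], after: Dict[str, int], n: int, with_rownum: bool) -> Set[tuple]:
    """Rows present after the update that were not already in the sink."""
    return top_rows(after, n, with_rownum) - top_rows(before, n, with_rownum)


before = {f"product-{i}": 100 - 10 * i for i in range(8)}  # hypothetical items at ranks 1..8
before["product-1001"] = 20                                 # rank 9
after = {**before, "product-1001": 150}                     # jumps to rank 1

print(len(upserts(before, after, 9, with_rownum=True)))   # 9: every rank shifts
print(upserts(before, after, 9, with_rownum=False))      # {('product-1001', 150)}
```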

    2. Window Top-N

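    Window Top-N is a special Top-N computed per window: it returns the N smallest or largest values for each window (and any other partition keys). Unlike regular Top-N, a streaming Window Top-N does not emit intermediate updates; it emits only the final top N records once the window ends.
    Syntax: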

    SELECT [column_list]
    FROM (
       SELECT [column_list],
         ROW_NUMBER() OVER (PARTITION BY window_start, window_end [, col_key1...]
           ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
       FROM table_name) -- relation applied windowing TVF
    WHERE rownum <= N [AND conditions]
    
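The main behavioral difference from regular Top-N, emitting once per window instead of continuously updating, can be sketched in Python. This toy version assigns events to 10-second tumbling windows by flooring the timestamp, counts pv per category, and ranks and emits a window only when a watermark reaches its `window_end`. Integer-second timestamps, the tuple format of stream elements, breaking ties by category name, and the simplified firing condition are all assumptions of the sketch; late-data handling is omitted.

```python
from collections import Counter, defaultdict
from typing import Dict, List, Tuple

WINDOW_SIZE = 10  # seconds, mirrors INTERVAL '10' SECONDS


def window_top_n(stream: List[tuple], n: int, size: int = WINDOW_SIZE) -> List[Tuple[int, int, str, int, int]]:
    """Toy Window Top-N: per-window pv counts, ranked and emitted only when the window closes."""
    open_windows: Dict[int, Counter] = defaultdict(Counter)  # window_start -> pv per category
    emitted: List[Tuple[int, int, str, int, int]] = []
    for element in stream:
        if element[0] == "event":
            _, ts, category = element
            window_start = ts - ts % size  # tumbling window assignment
            open_windows[window_start][category] += 1
        else:  # ("watermark", wm): event time has advanced to wm
            _, wm = element
            for window_start in sorted(open_windows):
                window_end = window_start + size
                if window_end > wm:
                    continue  # window still open: nothing is emitted for it yet
                ranked = sorted(open_windows[window_start].items(), key=lambda kv: (-kv[1], kv[0]))
                for rownum, (category, pv) in enumerate(ranked[:n], start=1):
                    emitted.append((window_start, window_end, category, pv, rownum))
                del open_windows[window_start]  # state is dropped once the final result is out
    return emitted


STREAM = [
    ("event", 1, "a"), ("event", 2, "b"), ("event", 3, "a"), ("event", 12, "b"),
    ("watermark", 5),   # no window has ended yet, so no output
    ("event", 7, "c"),
    ("watermark", 10),  # closes [0, 10)
    ("event", 15, "b"),
    ("watermark", 20),  # closes [10, 20)
]

if __name__ == "__main__":
    for row in window_top_n(STREAM, 2):
        print(row)  # (window_start, window_end, category, pv, rownum)
```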

    Example

    SELECT *
      FROM (
        SELECT *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY pv DESC) as rownum
        FROM (
          SELECT window_start, window_end, category_id, count(1) as pv, count(distinct user_id) as uv
          FROM TABLE(
            TUMBLE(TABLE user_log, DESCRIPTOR(ts), INTERVAL '10' SECONDS))
          GROUP BY window_start, window_end, category_id
        )
      ) WHERE rownum <= 3;
      
    
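SQLite has no `TUMBLE` TVF, but the ranking part of this example can still be checked in batch by deriving the tumbling window by hand as `window_start = (ts / 10) * 10` on integer-second timestamps. The sketch below uses Python's `sqlite3` with a hypothetical `user_log (user_id, category_id, ts)` table and made-up rows; it mirrors the shape of the query above but not Flink's watermark-driven emission.

```python
import sqlite3
from typing import List, Tuple

QUERY = """
SELECT * FROM (
  SELECT *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY pv DESC) AS rownum
  FROM (
    SELECT window_start, window_start + 10 AS window_end, category_id,
           count(1) AS pv, count(DISTINCT user_id) AS uv
    FROM (SELECT *, (ts / 10) * 10 AS window_start FROM user_log)  -- stand-in for TUMBLE(..., INTERVAL '10' SECONDS)
    GROUP BY window_start, category_id
  )
) WHERE rownum <= 3
ORDER BY window_start, rownum
"""


def run_window_top3(rows: List[Tuple[int, int, int]]) -> List[tuple]:
    conn = sqlite3.connect(":memory:")
    try:
        conn.execute("CREATE TABLE user_log (user_id INTEGER, category_id INTEGER, ts INTEGER)")
        conn.executemany("INSERT INTO user_log VALUES (?, ?, ?)", rows)
        return conn.execute(QUERY).fetchall()
    finally:
        conn.close()


# (user_id, category_id, ts in seconds); pv values within each window are distinct
SAMPLE = [
    (1, 1, 0), (1, 1, 1), (2, 1, 2), (3, 1, 3),  # window [0, 10): category 1, pv 4, uv 3
    (4, 3, 4), (5, 3, 5), (4, 3, 6),             # category 3, pv 3, uv 2
    (6, 2, 7), (7, 2, 8),                        # category 2, pv 2, uv 2
    (8, 4, 9),                                   # category 4, pv 1 (falls outside the top 3)
    (7, 2, 11), (7, 2, 12),                      # window [10, 20): category 2, pv 2, uv 1
    (9, 1, 19),                                  # category 1, pv 1, uv 1
]

if __name__ == "__main__":
    for row in run_window_top3(SAMPLE):
        print(row)  # (window_start, window_end, category_id, pv, uv, rownum)
```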

    2.1 Limitations

    Currently, Flink only supports Window Top-N that directly follows a Window Aggregation. Window Top-N directly after a windowing TVF will be supported in the near future.

  • Original article: https://blog.csdn.net/wuxintdrh/article/details/127584803