• Flink SQL: Top-N and Window Top-N


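    1. Top-N

    A Top-N query asks for the N smallest or largest values ordered by one or more columns. Both the smallest-value set and the largest-value set count as Top-N queries. They are useful when you only need the bottom N or top N records of a batch or streaming table under some condition, and the result set can feed further analysis.

    Flink expresses a Top-N query as the combination of an OVER window clause and a filter condition. Through the PARTITION BY clause of the OVER window, Flink also supports per-group Top-N, for example the five products with the highest real-time sales in each category. Top-N queries are supported in SQL on both batch and streaming tables.

    Top-N statement structure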

    SELECT [column_list]
    FROM (
       SELECT [column_list],
         ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
           ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
       FROM table_name)
    WHERE rownum <= N [AND conditions]
    

    Note: the above pattern must be followed exactly, otherwise the optimizer won’t be able to translate the query.

    The Top-N query is result-updating. Flink SQL sorts the input data stream according to the order key, so if the top N records change, the changed records are sent downstream as retraction/update records. It is recommended to use a storage system that supports updates as the sink of a Top-N query. In addition, if the top N records need to be stored in external storage, the result table should have the same unique key as the Top-N query.

    Example
    Source data

    create view pv_log as 
    select category_id, item_id, count(1) as pv from user_log group by category_id, item_id;
      
    SELECT *
    FROM (
       SELECT *,
         ROW_NUMBER() OVER (PARTITION BY category_id ORDER BY pv DESC) AS rownum
       FROM pv_log)
    WHERE rownum <= 3;
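
    The queries in this example assume that a user_log table already exists. A minimal sketch of such a source for local experiments, assuming the datagen connector: only the column names come from the queries in this article, while the column types, value ranges, and watermark delay are hypothetical choices.

    ```sql
    -- Hypothetical source table; the schema and options are assumptions for illustration.
    CREATE TABLE user_log (
      user_id     BIGINT,
      item_id     BIGINT,
      category_id BIGINT,
      ts          TIMESTAMP(3),
      -- Event-time attribute; required if the table is used with a windowing TVF.
      WATERMARK FOR ts AS ts - INTERVAL '1' SECOND
    ) WITH (
      'connector' = 'datagen',
      'rows-per-second' = '10',
      -- Small value ranges so that items and categories repeat and page views accumulate.
      'fields.user_id.min' = '1',
      'fields.user_id.max' = '100',
      'fields.item_id.min' = '1',
      'fields.item_id.max' = '20',
      'fields.category_id.min' = '1',
      'fields.category_id.max' = '5'
    );
    ```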
    

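    1.1 No Ranking Output Optimization

    As described above, the rownum field is written to the result table as part of the unique key, which can cause a large number of records to be written. For example, when the record ranked 9 (say product-1001) is updated and its rank rises to 1, every record ranked 1 through 9 is output to the result table as an update message. If the result table receives too much data, it becomes the bottleneck of the SQL job.

    The optimization is to omit the rownum field from the outer SELECT clause of the Top-N query. This is reasonable because the number of top N records is usually small, so consumers can quickly sort the records themselves. Without the rownum field, only the changed record (product-1001) in the example above has to be sent downstream, which greatly reduces IO on the result table.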

    -- Omit the rownum field from the output; keep item_id so each row stays identifiable
    SELECT category_id, item_id, pv
    FROM (
       SELECT *,
         ROW_NUMBER() OVER (PARTITION BY category_id ORDER BY pv DESC) AS rownum
       FROM pv_log)
    WHERE rownum <= 3;
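
    When rownum is left out of the output, the result table should be keyed on the unique key of the Top-N input. For pv_log that key is (category_id, item_id), because the view groups by those two columns. A minimal sketch of writing the result to an updatable sink, assuming the JDBC connector: the table name top_items, the connection URL, and the column types are hypothetical, and the types must match those of user_log.

    ```sql
    -- Hypothetical upsert sink; name, URL, and column types are assumptions.
    CREATE TABLE top_items (
      category_id BIGINT,
      item_id     BIGINT,
      pv          BIGINT,
      -- Same unique key as the Top-N query once rownum is omitted.
      PRIMARY KEY (category_id, item_id) NOT ENFORCED
    ) WITH (
      'connector' = 'jdbc',
      'url' = 'jdbc:mysql://localhost:3306/demo',
      'table-name' = 'top_items'
    );

    -- Write the top 3 items per category without the ranking column.
    INSERT INTO top_items
    SELECT category_id, item_id, pv
    FROM (
       SELECT *,
         ROW_NUMBER() OVER (PARTITION BY category_id ORDER BY pv DESC) AS rownum
       FROM pv_log)
    WHERE rownum <= 3;
    ```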
    

    2. Window Top-N

    Window Top-N is a special kind of Top-N query that ranks rows within each window.
    Syntax:

    SELECT [column_list]
    FROM (
       SELECT [column_list],
         ROW_NUMBER() OVER (PARTITION BY window_start, window_end [, col_key1...]
           ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
       FROM table_name) -- relation applied windowing TVF
    WHERE rownum <= N [AND conditions]
    

    Example

    SELECT *
      FROM (
        SELECT *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY pv DESC) as rownum
        FROM (
          SELECT window_start, window_end, category_id, count(1) as pv, count(distinct user_id) as uv
          FROM TABLE(
            TUMBLE(TABLE user_log, DESCRIPTOR(ts), INTERVAL '10' SECONDS))
          GROUP BY window_start, window_end, category_id
        )
      ) WHERE rownum <= 3;
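
    The syntax also allows additional partition keys after window_start and window_end. A sketch of that variant, assuming the same user_log table with an item_id column: the top 3 items by page views within each category, for every 10-second tumbling window.

    ```sql
    -- Variant with an extra partition key (category_id); assumes user_log has item_id.
    SELECT *
      FROM (
        SELECT *, ROW_NUMBER() OVER (
            PARTITION BY window_start, window_end, category_id
            ORDER BY pv DESC) AS rownum
        FROM (
          -- Window aggregation first: page views per item, category, and window.
          SELECT window_start, window_end, category_id, item_id, count(1) AS pv
          FROM TABLE(
            TUMBLE(TABLE user_log, DESCRIPTOR(ts), INTERVAL '10' SECONDS))
          GROUP BY window_start, window_end, category_id, item_id
        )
      ) WHERE rownum <= 3;
    ```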
      
    

    2.1 Limitations

    Currently, Flink only supports Window Top-N that directly follows a Window Aggregation. Window Top-N directly after a windowing TVF will be supported in the near future.

  • Original article: https://blog.csdn.net/wuxintdrh/article/details/127584803