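A Top-N query asks for the N smallest or largest values ordered by one or more columns. Both the smallest-value set and the largest-value set count as Top-N queries. Top-N queries are useful when you need to display only the N bottom-most or the N top-most records of a batch or streaming table under some condition. The result set can be used for further analysis.
Flink expresses a Top-N query with a combination of an OVER window clause and a filter condition. Through the PARTITION BY clause of the OVER window, Flink also supports per-group Top-N, for example the five products with the highest real-time sales in each category. Top-N queries are supported in SQL on both batch and streaming tables.
Top-N query syntax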
SELECT [column_list]
FROM (
   SELECT [column_list],
     ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
       ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
   FROM table_name)
WHERE rownum <= N [AND conditions]
Note: the above pattern must be followed exactly, otherwise the optimizer won’t be able to translate the query.
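As a concrete instance of the pattern, the sketch below ranks products by sales within each category and keeps the five best sellers; switching DESC to ASC would return the five lowest sellers instead. The table product_sales, its columns, and the datagen settings are assumptions made purely for illustration.

```sql
-- Hypothetical source table: the name, columns and connector options are assumptions.
CREATE TABLE product_sales (
  category   STRING,
  product_id BIGINT,
  sales      BIGINT
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '5',
  'fields.category.length' = '1'
);

-- Top 5 products by sales in each category.
SELECT category, product_id, sales, rownum
FROM (
   SELECT category, product_id, sales,
     ROW_NUMBER() OVER (PARTITION BY category ORDER BY sales DESC) AS rownum
   FROM product_sales)
WHERE rownum <= 5;
```

The outer query filters on rownum directly, as the pattern requires.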
A Top-N query is result-updating. Flink SQL sorts the input data stream by the order key, so whenever the top N records change, the changed records are sent downstream as retraction/update records. It is recommended to use a storage system that supports updates as the sink of a Top-N query. In addition, if the top N records need to be stored in external storage, the result table should have the same unique key as the Top-N query.
Example
Source data
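The examples read from a table named user_log whose definition is not shown. A minimal sketch of what it might look like, assuming a datagen source: only the columns user_id, item_id, category_id and ts are taken from the queries in this section, while the column types, the watermark delay and the connector options are assumptions.

```sql
-- Hypothetical definition of user_log: types, watermark delay and connector options are assumptions.
-- The watermark makes ts an event-time attribute, which the window example relies on via DESCRIPTOR(ts).
CREATE TABLE user_log (
  user_id     BIGINT,
  item_id     BIGINT,
  category_id BIGINT,
  ts          TIMESTAMP(3),
  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '10',
  'fields.user_id.min' = '1',
  'fields.user_id.max' = '1000',
  'fields.item_id.min' = '1',
  'fields.item_id.max' = '100',
  'fields.category_id.min' = '1',
  'fields.category_id.max' = '10'
);
```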
-- Page views per (category, item), aggregated from user_log
create view pv_log as
select category_id, item_id, count(1) as pv
from user_log
group by category_id, item_id;

-- Top 3 items by page views within each category
SELECT *
FROM (
   SELECT *,
     ROW_NUMBER() OVER (PARTITION BY category_id ORDER BY category_id, pv DESC) AS rownum
   FROM pv_log)
WHERE rownum <= 3;
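To persist this result, the sink has to accept updates and be keyed consistently with the query. The sketch below shows one plausible setup, assuming an upsert-kafka sink and reusing the pv_log view; the sink name, topic, broker address and BIGINT column types are placeholders, not something the original example specifies.

```sql
-- Hypothetical sink: table name, topic, broker address and column types are placeholders.
-- With rownum in the output, (category_id, rownum) identifies one row of the result.
CREATE TABLE top3_items (
  category_id BIGINT,
  item_id     BIGINT,
  pv          BIGINT,
  rownum      BIGINT,
  PRIMARY KEY (category_id, rownum) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'top3_items',
  'properties.bootstrap.servers' = 'localhost:9092',
  'key.format' = 'json',
  'value.format' = 'json'
);

INSERT INTO top3_items
SELECT category_id, item_id, pv, rownum
FROM (
   SELECT *,
     ROW_NUMBER() OVER (PARTITION BY category_id ORDER BY pv DESC) AS rownum
   FROM pv_log)
WHERE rownum <= 3;
```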
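As noted above, the rownum field is written to the result table as part of the unique key, which can cause a large number of records to be written. For example, when the record ranked 9 (say product-1001) is updated and its rank rises to 1, all records ranked 1 through 9 are emitted to the result table as update messages. If the result table receives too much data, it becomes the bottleneck of the SQL job.
The optimization is to omit the rownum field from the outer SELECT clause of the Top-N query. This is reasonable because the number of top N records is usually small, so consumers can quickly sort the records themselves. Without the rownum field, in the example above only the changed record (product-1001) needs to be sent downstream, which can greatly reduce the IO on the result table.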
-- Omit the rownum field from the output; item_id is kept so each row stays identifiable
SELECT category_id, item_id, pv
FROM (
   SELECT *,
     ROW_NUMBER() OVER (PARTITION BY category_id ORDER BY category_id, pv DESC) AS rownum
   FROM pv_log)
WHERE rownum <= 3;
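When rownum is omitted, the result is no longer keyed by rank but by the key of the input rows. A sketch of a matching sink, assuming the key is (category_id, item_id), the grouping key of pv_log; the sink name, topic, broker address and column types are again placeholders.

```sql
-- Hypothetical sink for the optimized query: names and connector options are placeholders.
-- The primary key follows the grouping key of pv_log, since rownum is not emitted.
CREATE TABLE top3_items_no_rank (
  category_id BIGINT,
  item_id     BIGINT,
  pv          BIGINT,
  PRIMARY KEY (category_id, item_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'top3_items_no_rank',
  'properties.bootstrap.servers' = 'localhost:9092',
  'key.format' = 'json',
  'value.format' = 'json'
);

INSERT INTO top3_items_no_rank
SELECT category_id, item_id, pv
FROM (
   SELECT *,
     ROW_NUMBER() OVER (PARTITION BY category_id ORDER BY pv DESC) AS rownum
   FROM pv_log)
WHERE rownum <= 3;
```

A consumer that needs the ranking can sort the at most three rows per category by pv itself.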
Window Top-N: a special Top-N based on windows
Syntax:
SELECT [column_list]
FROM (
   SELECT [column_list],
     ROW_NUMBER() OVER (PARTITION BY window_start, window_end [, col_key1...]
       ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
   FROM table_name) -- a relation to which a windowing TVF has been applied
WHERE rownum <= N [AND conditions]
Example
SELECT *
FROM (
   SELECT *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY pv DESC) as rownum
   FROM (
      SELECT window_start, window_end, category_id, count(1) as pv, count(distinct user_id) as uv
      FROM TABLE(
         TUMBLE(TABLE user_log, DESCRIPTOR(ts), INTERVAL '10' SECONDS))
      GROUP BY window_start, window_end, category_id
   )
) WHERE rownum <= 3;
Currently, Flink only supports Window Top-N that directly follows a Window Aggregation. Window Top-N directly after a windowing TVF will be supported in the near future.
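The optional extra partition keys in the Window Top-N syntax (col_key1...) allow a per-group ranking inside each window. The sketch below keeps the top 3 items per category in every 10-second tumbling window. It reuses the user_log table and its ts time attribute from the example above; the choice of item_id as the ranked entity is an assumption for illustration.

```sql
-- Top 3 items per category within each 10-second tumbling window.
-- The Top-N follows a window aggregation, as the note above requires.
SELECT window_start, window_end, category_id, item_id, pv
FROM (
   SELECT *,
     ROW_NUMBER() OVER (
       PARTITION BY window_start, window_end, category_id
       ORDER BY pv DESC) AS rownum
   FROM (
      SELECT window_start, window_end, category_id, item_id, COUNT(1) AS pv
      FROM TABLE(
         TUMBLE(TABLE user_log, DESCRIPTOR(ts), INTERVAL '10' SECONDS))
      GROUP BY window_start, window_end, category_id, item_id
   )
)
WHERE rownum <= 3;
```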