• FlinkSQL -- joins----flink-1.13.6


    前言

    Flink join 分类

    Flink 支持了非常多的数据 Join 方式,主要包括以下三种:

    • 动态表(流)与动态表(流)的 Join。
    • 动态表(流)与外部维表(比如 Redis)的 Join。
    • 动态表字段的列转行(一种特殊的 Join)。

    细分 Flink SQL 支持的 Join:

    • Regular Join:流与流的 Join,包括 Inner Equal Join、Outer Equal Join。
    • Interval Join:流与流的 Join,两条流一段时间区间内的 Join。
    • Temporal Join:流与流的 Join,包括事件时间,处理时间的 Temporal Join,类似于离线中的快照 Join。
    • Lookup Join:流与 外部维表的 Join。
    • Array Expansion:表字段的列转行,类似于 Hive 的 explode 数据炸开的列转行。
    • Table Function:自定义函数的表字段的列转行,支持 Inner Join 和 Left Outer Join。

    Join 的应用场景

    关于 Join 的场景就太多太多了,在离线数仓开发中,Join 是最常用的算子之一了。

    比如:

    • 几乎所有公司的 APP 都会涉及到的曝光关联点击;两条流数据之间的维度拼接;将表打宽等等
    • 电商场景中的退单的订单关联下单的订单分析退单的单的特点等

    流式计算中的两大问题

    很多离线数仓的小伙伴会说,Join 这玩意非常简单啊,Hive SQL 简简单单的写个关联 SQL 就行啊。

    是的,在批式计算中,Join 的左右表都是 “全集”,所以在全集上面做关联操作是非常简单的,比如目前离线中的技术方案有 sort-merge、hash join 等,这些方案都非常成熟了。

    但是,在流式计算中,左右表的数据都是无界的,而且是实时到来的。这就会引起流式计算中的 2 个问题 + 大数据中的 2 个核心问题(我们以 A left join B 举例):

    流式计算中的 2 个问题:

    • 流式数据到达计算引擎的时间不一定:比如 A 流的数据先到了,A 流不知道 B 流对应同 key 的数据什么时候到,没法关联(数据质量问题)
    • 流式数据不知何时、下发怎样的数据:A 流的数据到达后,如果 B 流的数据永远不到,那么 A 流的数据在什么时候以及是否要填充一个 null 值下发下去(数据时效问题)

    从上面两个问题也可以得出大数据中的 2 个核心问题:

    • 数据质量问题
    • 数据时效性问题

    一、Regular Joins

    1.1、INNER Equi-JOIN

    SELECT *
    FROM Orders
    INNER JOIN Product
    ON Orders.product_id = Product.id
    
    • 1
    • 2
    • 3
    • 4

    案例:

     -- 曝光日志数据
    CREATE TABLE show_log(
    log_id BIGINT,
    show_params STRING
    ) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '2',
    'fields.show_params.length' = '1',
    'fields.log_id.min' = '1',
     'fields.log_id.max' = '100'
     );
     
    -- 点击日志数据
     CREATE TABLE click_log(
     log_id BIGINT,
     click_params STRING
     )
     WITH (
     'connector' = 'datagen',
     'rows-per-second' = '2',
     'fields.click_params.length' = '1',
     'fields.log_id.min' = '1',
     'fields.log_id.max' = '10'
     );
     
     CREATE TABLE sink_showclick_result (
     s_id BIGINT,
     s_params STRING,
     c_id BIGINT,
     c_params STRING
     ) WITH (
     'connector' = 'print'
     );
     
     
     -- 流的 INNER JOIN,条件为 log_id
     INSERT INTO sink_showclick_result
     SELECT
     show_log.log_id as s_id,
     show_log.show_params as s_params,
     click_log.log_id as c_id,
     click_log.click_params as c_params
     FROM show_log
     INNER JOIN click_log ON show_log.log_id = click_log.log_id;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44

    1.2、OUTER Equi-JOIN

    SELECT *
    FROM Orders
    LEFT JOIN Product
    ON Orders.product_id = Product.id
    
    SELECT *
    FROM Orders
    RIGHT JOIN Product
    ON Orders.product_id = Product.id
    
    SELECT *
    FROM Orders
    FULL OUTER JOIN Product
    ON Orders.product_id = Product.id
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    案例:

      
     -- 流的 LEFT JOIN,条件为 log_id
     INSERT INTO sink_showclick_result
     SELECT
     show_log.log_id as s_id,
     show_log.show_params as s_params,
     click_log.log_id as c_id,
     click_log.click_params as c_params
     FROM show_log
     LEFT JOIN click_log ON show_log.log_id = click_log.log_id;
    
    +I[85, a, null, null]
    +I[58, 0, null, null]
    +I[59, 7, null, null]
    +I[10, d, 10, 7]
    +I[10, d, 10, 8]
    +I[10, d, 10, e]
    +I[10, d, 10, c]
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    1.3、关于 Regular Join 的注意事项:

    实时 Regular Join 可以不是 等值 join。等值 join 和 非等值 join 区别在于,等值 join 数据 shuffle 策略是 Hash,会按照 Join on 中的等值条件作为 id 发往对应的下游;非等值 join 数据 shuffle 策略是 Global,所有数据发往一个并发,按照非等值条件进行关联。

    Join 的流程是左流新来一条数据之后,会和右流中符合条件的所有数据做 Join,然后输出。

    流的上游是无限的数据,所以要做到关联的话,Flink 会将两条流的所有数据都存储在 State 中,所以 Flink 任务的 State 会无限增大,因此你需要为 State 配置合适的 TTL,以防止 State 过大。

    二、Interval Join (时间区间 Join)

    Interval Joins datastream api 实现
    双流驱动

    在这里插入图片描述

    我们看到,流A的每一个元素,都会和流B的一定时间范围的元素进行JOIN。
    其中,上界和下界可以是负数,也可以是整数。Interval join目前只支持事件时间

    2.1、应用场景

    为什么有 Regular Join 还要 Interval Join 呢?刚刚的案例也讲了,Regular Join 会产生回撤流,但是在实时数仓中一般写入的 sink 都是类似于 Kafka 这样的消息队列,然后后面接 clickhouse 等引擎,这些引擎又不具备处理回撤流的能力。所以博主理解 Interval Join 就是用于消灭回撤流的。

    2.2、案例

     -- 曝光日志数据
    CREATE TABLE show_log(
    log_id BIGINT,
    show_params STRING,
    row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
    WATERMARK FOR row_time AS row_time
    ) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '1',
    'fields.show_params.length' = '1',
    'fields.log_id.min' = '1',
     'fields.log_id.max' = '10'
     );
     
    -- 点击日志数据
     CREATE TABLE click_log(
     log_id BIGINT,
     click_params STRING,
     row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
     WATERMARK FOR row_time AS row_time
     )
     WITH (
     'connector' = 'datagen',
     'rows-per-second' = '1',
     'fields.click_params.length' = '1',
     'fields.log_id.min' = '1',
     'fields.log_id.max' = '10'
     );
     
     CREATE TABLE sink_showclick_result (
     s_id BIGINT,
     s_params STRING,
     c_id BIGINT,
     c_params STRING
     ) WITH (
     'connector' = 'print'
     );
     
     
     -- 流的 JOIN,条件为 log_id
     INSERT INTO sink_showclick_result
     SELECT
     show_log.log_id as s_id,
     show_log.show_params as s_params,
     click_log.log_id as c_id,
     click_log.click_params as c_params
     FROM show_log
     JOIN click_log ON show_log.log_id = click_log.log_id
     AND show_log.row_time BETWEEN click_log.row_time - INTERVAL '4' HOUR AND click_log.row_time;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49

    注意:
    与regular join相比较,interval join只支持带有时间属性的append-only流。由于时间属性是单调递增的,Flink可以在不影响结果正确性的情况下从其状态中删除旧值。

    2.3、总结

    • 支持 INNER JOIN, LEFT JOIN, RIGHT JOIN, FULL OUTER JOIN
    • 左右流都会触发结果更新
    • State 自动清理,根据时间区间保留数据
    • 输出流保留时间属性
    • 时间界限之前的状态是可以被释放的,数据行Row在时间轴上不断推进水位线向前。
    • Interval join需要至少一个 equi-join 谓词和一个限制了双方时间的 join 条件。
    • 与regular join相比较,interval join只支持带有时间属性的append-only流。

    三、Temporal Joins (快照join)

    单流驱动

    3.0、Temporal Join 定义(支持 Batch\Streaming):

    Temporal Join 在离线的概念中其实是没有类似的 Join 概念的,但是离线中常常会维护一种表叫做 拉链快照表,使用一个明细表去 join 这个 拉链快照表 的 join 方式就叫做 Temporal Join。而 Flink SQL 中也有对应的概念,表叫做 Versioned Table,使用一个明细表去 join 这个 Versioned Table 的 join 操作就叫做 Temporal JoinTemporal Join 中,Versioned Table 其实就是对同一条 key(在 DDL 中以 primary key 标记同一个 key)的历史版本(根据时间划分版本)做一个维护,当有明细表 Join 这个表时,可以根据明细表中的时间版本选择 Versioned Table 对应时间区间内的快照数据进行 join。

    flink-1.12版本中的解释更加好理解

    时态表 Join 意味着对任意表(左输入/探针侧)去关联一个时态表(右输入/构建侧)的版本,时态表可以是一张跟踪所有变更记录的表(例如数据库表的 changelog,包含多个表快照),也可以是物化所有变更之后的表(例如数据库表,只有最新表快照)。
    Flink 使用了 SQL:2011 标准引入的时态表 Join 语法,时态表 Join 的语法如下:

    SELECT [column_list]
    FROM table1 [AS <alias1>]
    [LEFT] JOIN table2 FOR SYSTEM_TIME AS OF table1.{ proctime | rowtime } [AS <alias2>]
    ON table1.column-name1 = table2.column-name1
    
    • 1
    • 2
    • 3
    • 4

    3.1、Event Time Temporal Join

    3.1.1、介绍

    基于事件时间的时态表 join 使用(左侧输入/探针侧) 的 事件时间 去关联(右侧输入/构建侧) 版本表 对应的版本。 基于事件时间的时态表 join 仅支持关版本表或版本视图,版本表或版本视图只能是一个 changelog 流。 但是,Flink 支持将 append-only 流转换成 changelog 流,因此版本表也可以来自一个 append-only 流。 查看声明版本视图 获取更多的信息关于如何声明一张来自 append-only 流的版本表。

    将事件时间作为时间属性时,可将 过去 时间属性与时态表一起使用。这允许对两个表中在相同时间点的记录执行 Join 操作。 与基于处理时间的时态 Join 相比,时态表不仅将构建侧记录的最新版本(是否最新由所定义的主键所决定)保存在 state 中,同时也会存储自上一个 watermarks 以来的所有版本(按时间区分)。

    3.1.2、案例

    例如,在探针侧表新插入一条事件时间时间为 12:30:00 的记录,它将和构建侧表时间点为 12:30:00 的版本根据时态表的概念进行 Join 运算。 因此,新插入的记录仅与时间戳小于等于 12:30:00 的记录进行 Join 计算(由主键决定哪些时间点的数据将参与计算)。

    通过定义事件时间,watermarks 允许 Join 运算不断向前滚动,丢弃不再需要的构建侧快照。因为不再需要时间戳更低或相等的记录。

    下面的例子展示了订单流关联产品表这个场景举例,orders 表包含了来自 Kafka 的实时订单流,product_changelog 表来自数据库表 products 的 changelog , 产品的价格在数据库表 products 中是随时间实时变化的。

    SELECT * FROM product_changelog;
    
    (changelog kind)  update_time product_name price
    ================= =========== ============ ===== 
    +(INSERT)         00:01:00    scooter      11.11
    +(INSERT)         00:02:00    basketball   23.11
    -(UPDATE_BEFORE)  12:00:00    scooter      11.11
    +(UPDATE_AFTER)   12:00:00    scooter      12.99  <= 产品 `scooter``12:00:00` 时涨价到了 `12.99`
    -(UPDATE_BEFORE)  12:00:00    basketball   23.11 
    +(UPDATE_AFTER)   12:00:00    basketball   19.99  <= 产品 `basketball``12:00:00` 时降价到了 `19.99`
    -(DELETE)         18:00:00    scooter      12.99  <= 产品 `scooter``18:00:00` 从数据库表中删除
    
    如果我们想输出 product_changelog 表在 10:00:00 对应的版本,表的内容如下所示:
    update_time  product_id product_name price
    ===========  ========== ============ ===== 
    00:01:00     p_001      scooter      11.11
    00:02:00     p_002      basketball   23.11
    
    如果我们想输出 product_changelog 表在 13:00:00 对应的版本,表的内容如下所示:
    update_time  product_id product_name price
    ===========  ========== ============ ===== 
    12:00:00     p_001      scooter      12.99
    12:00:00     p_002      basketball   19.99
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    通过基于事件时间的时态表 join, 我们可以 join 上版本表中的不同版本:

    CREATE TABLE orders (
      order_id STRING,
      product_id STRING,
      order_time TIMESTAMP(3),
      WATERMARK FOR order_time AS order_time  -- defines the necessary event time
    ) WITH (
    ...
    );
    
    -- 设置会话的时间区间, changelog 里的数据库操作时间是以 epoch 开始的毫秒数存储的,
    -- 在从毫秒转化为时间戳时,Flink SQL 会使用会话的时间区间
    -- 因此,请根据 changelog 中的数据库操作时间设置合适的时间区间
    SET table.local-time-zone=UTC;
    
    -- 声明一张版本表
    CREATE TABLE product_changelog (
      product_id STRING,
      product_name STRING,
      product_price DECIMAL(10, 4),
      update_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL, -- 注意:自动从毫秒数转为时间戳
      PRIMARY KEY(product_id) NOT ENFORCED,      -- (1) defines the primary key constraint
      WATERMARK FOR update_time AS update_time   -- (2) defines the event time by watermark                               
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'products',
      'scan.startup.mode' = 'earliest-offset',
      'properties.bootstrap.servers' = 'localhost:9092',
      'value.format' = 'debezium-json'
    );
    
    -- 基于事件时间的时态表 Join
    SELECT
      order_id,
      order_time,
      product_name,
      product_time,
      price
    FROM orders AS O
    LEFT JOIN product_changelog FOR SYSTEM_TIME AS OF O.order_time AS P
    ON O.product_id = P.product_id;
    
    order_id order_time product_name product_time price
    ======== ========== ============ ============ =====
    o_001    00:01:00   scooter      00:01:00     11.11
    o_002    00:03:00   basketball   00:02:00     23.11
    o_003    12:00:00   scooter      12:00:00     12.99
    o_004    12:00:00   basketball   12:00:00     19.99
    o_005    18:00:00   NULL         NULL         NULL
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48

    基于事件时间的时态表 Join 通常用在通过 changelog 丰富流上数据的场景。

    注意: 基于事件时间的时态表 Join 是通过左右两侧的 watermark 触发,请确保为 join 两侧的表设置了合适的 watermark。

    注意: 基于事件时间的时态表 Join 的 join key 必须包含时态表的主键,例如:表 product_changelog 的主键 P.product_id 必须包含在 join 条件 O.product_id = P.product_id 中。

    3.2、Processing Time Temporal Join

    基于处理时间的时态表 join 使用任意表 (左侧输入/探针侧) 的 处理时间 去关联 (右侧输入/构建侧) 版本表(Versioned table) 的最新版本. 基于处理时间的时态表 join 当前只支持关联 版本表或版本视图,且支持 版本表或版本视图当前只能是 append-only 流。

    如果将处理时间作为时间属性,过去 时间属性将无法与时态表一起使用。根据定义,处理时间总会是当前时间戳。 因此,关联时态表的调用将始终返回底层表的最新已知版本,并且底层表中的任何更新也将立即覆盖当前值

    可以将处理时间的时态 Join 视作简单的 HashMap ,HashMap 中存储来自构建侧的所有记录。 当来自构建侧的新插入的记录与旧值具有相同的 Key 时,旧值会被覆盖。 探针侧的每条记录将总会根据 HashMap 的最新/当前状态来计算。

    接下来的示例展示了订单流 Orders 该如何与实时变化的汇率表 Lates 进行基于处理时间的 时态 Join 操作,LatestRates 总是表示 HBase 表 Rates 的最新内容。

    1、LatestRates

    10:15> SELECT * FROM LatestRates;
    
    currency   rate
    ======== ======
    US Dollar   102
    Euro        114
    Yen           1
    
    10:30> SELECT * FROM LatestRates;
    
    currency   rate
    ======== ======
    US Dollar   102
    Euro        114
    Yen           1
    
    10:52> SELECT * FROM LatestRates;
    
    currency   rate
    ======== ======
    US Dollar   102
    Euro        116     <==== changed from 114 to 116
    Yen           1
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    2、订单表

    SELECT * FROM Orders;
    
    amount currency
    ====== =========
         2 Euro             <== arrived at time 10:15
         1 US Dollar        <== arrived at time 10:30
         2 Euro             <== arrived at time 10:52
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    3、汇率转换

    amount currency     rate   amount*rate
    ====== ========= ======= ============
         2 Euro          114          228    <== arrived at time 10:15
         1 US Dollar     102          102    <== arrived at time 10:30
         2 Euro          116          232    <== arrived at time 10:52
    
    • 1
    • 2
    • 3
    • 4
    • 5
    SELECT
      o.amout, o.currency, r.rate, o.amount * r.rate
    FROM
      Orders AS o
      JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r -- proctime 表示基于处理时间的join
      ON r.currency = o.currency
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    维表join案例 https://chbxw.blog.csdn.net/article/details/127555641

    四、Lookup Join (维表join) – flink-1.13

    https://zhuanlan.zhihu.com/p/214053444
    https://www.cnblogs.com/Springmoon-venn/p/14862865.html

    4.1、 Lookup Join 定义(支持 Batch\Streaming)

    Lookup Join 其实就是 维表 Join,比如拿离线数仓来说,常常会有用户画像,设备画像等数据,而对应到实时数仓场景中,这种实时获取外部缓存的 Join 就叫做维表 Join。

    4.2、 应用场景

    我们既然已经有了上面介绍的 Regular Join,Interval Join 等,为啥还需要一种 Lookup Join?因为上面说的这几种 Join 都是流与流之间的 Join,而 Lookup Join 是流与 Redis,Mysql,HBase 这种存储介质的 Join。Lookup 的意思就是实时查找,而实时的画像数据一般都是存储在 Redis,Mysql,HBase 中,这就是 Lookup Join 的由来

    4.3、案例

    接下来我们讲一个小例子,首先定义一下stream source,我们使用flink 1.11提供的datagen来生成数据。

    我们来模拟生成用户的数据,这里只生成的用户的id,范围在1-100之间。

    CREATE TABLE datagen (
     userid int,
     proctime as PROCTIME()
    ) WITH (
     'connector' = 'datagen',
     'rows-per-second'='2',
     'fields.userid.kind'='random',
     'fields.userid.min'='1',
     'fields.userid.max'='10'
    )
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    然后再创建一个mysql维表信息:

    -- flink sql
    CREATE TABLE dim_mysql (
      id int,
      name STRING,
      PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
       'connector' = 'jdbc',
       'url' = 'jdbc:mysql://chb1:3306/chb_test',
       'table-name' = 'userinfo',
       'username' = 'root',
       'password' = '123456'
    )
    
    -- mysql
    CREATE TABLE userinfo (
      id int,
      name varchar(30),
      age  int,
      PRIMARY KEY (id)
    )
    insert into userinfo (id, name) values (1, 'chb', 20), (2, 'ling',18), (3, 'zhangsan', 33), (4, 'lisi',44);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    最后执行sql查询,流表关联维表:(注意 使用 Processing Time Temporal Join 的格式)

    SELECT * FROM datagen 
    LEFT JOIN dim_mysql FOR SYSTEM_TIME AS OF datagen.proctime  
    ON datagen.userid = dim_mysql.id;
    
    userid	time 			     userid 	name		age
     9 2022-11-01 16:22:49.669   (NULL)      (NULL)		(NULL)
     3 2022-11-01 16:22:49.670        3      zhangsan	33
     4 2022-11-01 16:22:50.669        4      lisi		44	
     2 2022-11-01 16:22:50.670        2      ling		18
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    4.4、Temporal Join 与 Lookup Join的区别 ?

    五、Array Expansion (列传行)

    CREATE TABLE show_log (
        log_id BIGINT,
        show_params as  ARRAY['a', 'b', 'c'] 
    ) WITH (
      'connector' = 'datagen',
      'rows-per-second' = '1',
      'fields.log_id.min' = '1',
      'fields.log_id.max' = '2'
    );
    
    SELECT
        log_id,
        show_param
    FROM show_log
    -- array 炸开语法
    CROSS JOIN UNNEST(show_params) AS t (show_param)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    六、Table Function

    应用场景(支持 Batch\Streaming):这个其实和 Array Expansion 功能类似,但是 Table Function 本质上是个 UDTF 函数,和离线 Hive SQL 一样,我们可以自定义 UDTF 去决定列转行的逻辑。

    ⭐ Table Function 使用分类:

    • ⭐Inner Join Table Function:如果 UDTF 返回结果为空,则相当于 1 行转为 0 行,这行数据直接被丢弃
    • ⭐ Left Join Table Function:如果 UDTF 返回结果为空,折行数据不会被丢弃,只会在结果中填充 null 值

    6.1、INNER JOIN

    SELECT order_id, res
    FROM Orders,
    LATERAL TABLE(table_func(order_id)) t(res)
    
    • 1
    • 2
    • 3

    6.2、LEFT OUTER JOIN

    SELECT order_id, res
    FROM Orders
    LEFT OUTER JOIN LATERAL TABLE(table_func(order_id)) t(res)
      ON TRUE
    
    • 1
    • 2
    • 3
    • 4

    参考:
    https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/dev/table/sql/queries/joins
    Flink 对线面试官(五):2w 字详述双流 Join 3 种解决方案 + 2 种优化方案
    Flink SQL 知其所以然(二十六):2w 字详述 Join 操作
    Flink Temporal Join Versioned Table Demo

  • 相关阅读:
    电脑重装系统后Win11扬声器无插座信息如何解决?
    [Rust笔记] 代码写明 Rust 中的泛型型变
    Vue 正计时器组件
    【leetcode】【周赛】第 301 场
    Swift 中的动态成员查找
    智能手术机器人起源及应用(一)
    剑指 Offer 53 - II. 0~n-1中缺失的数字
    Triples of Cows
    Docker数据卷和网络管理 下
    D - Magic Gems
  • 原文地址:https://blog.csdn.net/wuxintdrh/article/details/127587815