Flink 支持了非常多的数据 Join 方式,主要包括以下三种:
细分 Flink SQL 支持的 Join:
关于 Join 的场景就太多太多了,在离线数仓开发中,Join 是最常用的算子之一了。
比如:
很多离线数仓的小伙伴会说,Join 这玩意非常简单啊,Hive SQL 简简单单的写个关联 SQL 就行啊。
是的,在批式计算中,Join 的左右表都是 “全集”,所以在全集上面做关联操作是非常简单的,比如目前离线中的技术方案有 sort-merge、hash join 等,这些方案都非常成熟了。
但是,在流式计算中,左右表的数据都是无界的,而且是实时到来的。这就会引起流式计算中的 2 个问题 + 大数据中的 2 个核心问题(我们以 A left join B 举例):
流式计算中的 2 个问题:
从上面两个问题也可以得出大数据中的 2 个核心问题:
SELECT *
FROM Orders
INNER JOIN Product
ON Orders.product_id = Product.id
案例:
-- 曝光日志数据
CREATE TABLE show_log(
log_id BIGINT,
show_params STRING
) WITH (
'connector' = 'datagen',
'rows-per-second' = '2',
'fields.show_params.length' = '1',
'fields.log_id.min' = '1',
'fields.log_id.max' = '100'
);
-- 点击日志数据
CREATE TABLE click_log(
log_id BIGINT,
click_params STRING
)
WITH (
'connector' = 'datagen',
'rows-per-second' = '2',
'fields.click_params.length' = '1',
'fields.log_id.min' = '1',
'fields.log_id.max' = '10'
);
CREATE TABLE sink_showclick_result (
s_id BIGINT,
s_params STRING,
c_id BIGINT,
c_params STRING
) WITH (
'connector' = 'print'
);
-- 流的 INNER JOIN,条件为 log_id
INSERT INTO sink_showclick_result
SELECT
show_log.log_id as s_id,
show_log.show_params as s_params,
click_log.log_id as c_id,
click_log.click_params as c_params
FROM show_log
INNER JOIN click_log ON show_log.log_id = click_log.log_id;
SELECT *
FROM Orders
LEFT JOIN Product
ON Orders.product_id = Product.id
SELECT *
FROM Orders
RIGHT JOIN Product
ON Orders.product_id = Product.id
SELECT *
FROM Orders
FULL OUTER JOIN Product
ON Orders.product_id = Product.id
案例:
-- 流的 LEFT JOIN,条件为 log_id
INSERT INTO sink_showclick_result
SELECT
show_log.log_id as s_id,
show_log.show_params as s_params,
click_log.log_id as c_id,
click_log.click_params as c_params
FROM show_log
LEFT JOIN click_log ON show_log.log_id = click_log.log_id;
+I[85, a, null, null]
+I[58, 0, null, null]
+I[59, 7, null, null]
+I[10, d, 10, 7]
+I[10, d, 10, 8]
+I[10, d, 10, e]
+I[10, d, 10, c]
实时 Regular Join 可以不是 等值 join。等值 join 和 非等值 join 区别在于,等值 join 数据 shuffle 策略是 Hash,会按照 Join on 中的等值条件作为 id 发往对应的下游;非等值 join 数据 shuffle 策略是 Global,所有数据发往一个并发,按照非等值条件进行关联。
Join 的流程是左流新来一条数据之后,会和右流中符合条件的所有数据做 Join,然后输出。
流的上游是无限的数据,所以要做到关联的话,Flink 会将两条流的所有数据都存储在 State 中,所以 Flink 任务的 State 会无限增大,因此你需要为 State 配置合适的 TTL,以防止 State 过大。
Interval Joins datastream api 实现
双流驱动

我们看到,流A的每一个元素,都会和流B的一定时间范围的元素进行JOIN。
其中,上界和下界可以是负数,也可以是整数。Interval join目前只支持事件时间
为什么有 Regular Join 还要 Interval Join 呢?刚刚的案例也讲了,Regular Join 会产生回撤流,但是在实时数仓中一般写入的 sink 都是类似于 Kafka 这样的消息队列,然后后面接 clickhouse 等引擎,这些引擎又不具备处理回撤流的能力。所以博主理解 Interval Join 就是用于消灭回撤流的。
-- 曝光日志数据
CREATE TABLE show_log(
log_id BIGINT,
show_params STRING,
row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
WATERMARK FOR row_time AS row_time
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.show_params.length' = '1',
'fields.log_id.min' = '1',
'fields.log_id.max' = '10'
);
-- 点击日志数据
CREATE TABLE click_log(
log_id BIGINT,
click_params STRING,
row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
WATERMARK FOR row_time AS row_time
)
WITH (
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.click_params.length' = '1',
'fields.log_id.min' = '1',
'fields.log_id.max' = '10'
);
CREATE TABLE sink_showclick_result (
s_id BIGINT,
s_params STRING,
c_id BIGINT,
c_params STRING
) WITH (
'connector' = 'print'
);
-- 流的 JOIN,条件为 log_id
INSERT INTO sink_showclick_result
SELECT
show_log.log_id as s_id,
show_log.show_params as s_params,
click_log.log_id as c_id,
click_log.click_params as c_params
FROM show_log
JOIN click_log ON show_log.log_id = click_log.log_id
AND show_log.row_time BETWEEN click_log.row_time - INTERVAL '4' HOUR AND click_log.row_time;
注意:
与regular join相比较,interval join只支持带有时间属性的append-only流。由于时间属性是单调递增的,Flink可以在不影响结果正确性的情况下从其状态中删除旧值。
单流驱动
Temporal Join 在离线的概念中其实是没有类似的 Join 概念的,但是离线中常常会维护一种表叫做 拉链快照表,使用一个明细表去 join 这个 拉链快照表 的 join 方式就叫做 Temporal Join。而 Flink SQL 中也有对应的概念,表叫做 Versioned Table,使用一个明细表去 join 这个 Versioned Table 的 join 操作就叫做 Temporal Join。Temporal Join 中,Versioned Table 其实就是对同一条 key(在 DDL 中以 primary key 标记同一个 key)的历史版本(根据时间划分版本)做一个维护,当有明细表 Join 这个表时,可以根据明细表中的时间版本选择 Versioned Table 对应时间区间内的快照数据进行 join。
时态表 Join 意味着对任意表(左输入/探针侧)去关联一个时态表(右输入/构建侧)的版本,时态表可以是一张跟踪所有变更记录的表(例如数据库表的 changelog,包含多个表快照),也可以是物化所有变更之后的表(例如数据库表,只有最新表快照)。
Flink 使用了 SQL:2011 标准引入的时态表 Join 语法,时态表 Join 的语法如下:
SELECT [column_list]
FROM table1 [AS <alias1>]
[LEFT] JOIN table2 FOR SYSTEM_TIME AS OF table1.{ proctime | rowtime } [AS <alias2>]
ON table1.column-name1 = table2.column-name1
基于事件时间的时态表 join 使用(左侧输入/探针侧) 的 事件时间 去关联(右侧输入/构建侧) 版本表 对应的版本。 基于事件时间的时态表 join 仅支持关版本表或版本视图,版本表或版本视图只能是一个 changelog 流。 但是,Flink 支持将 append-only 流转换成 changelog 流,因此版本表也可以来自一个 append-only 流。 查看声明版本视图 获取更多的信息关于如何声明一张来自 append-only 流的版本表。
将事件时间作为时间属性时,可将 过去 时间属性与时态表一起使用。这允许对两个表中在相同时间点的记录执行 Join 操作。 与基于处理时间的时态 Join 相比,时态表不仅将构建侧记录的最新版本(是否最新由所定义的主键所决定)保存在 state 中,同时也会存储自上一个 watermarks 以来的所有版本(按时间区分)。
例如,在探针侧表新插入一条事件时间时间为 12:30:00 的记录,它将和构建侧表时间点为 12:30:00 的版本根据时态表的概念进行 Join 运算。 因此,新插入的记录仅与时间戳小于等于 12:30:00 的记录进行 Join 计算(由主键决定哪些时间点的数据将参与计算)。
通过定义事件时间,watermarks 允许 Join 运算不断向前滚动,丢弃不再需要的构建侧快照。因为不再需要时间戳更低或相等的记录。
下面的例子展示了订单流关联产品表这个场景举例,orders 表包含了来自 Kafka 的实时订单流,product_changelog 表来自数据库表 products 的 changelog , 产品的价格在数据库表 products 中是随时间实时变化的。
SELECT * FROM product_changelog;
(changelog kind) update_time product_name price
================= =========== ============ =====
+(INSERT) 00:01:00 scooter 11.11
+(INSERT) 00:02:00 basketball 23.11
-(UPDATE_BEFORE) 12:00:00 scooter 11.11
+(UPDATE_AFTER) 12:00:00 scooter 12.99 <= 产品 `scooter` 在 `12:00:00` 时涨价到了 `12.99`
-(UPDATE_BEFORE) 12:00:00 basketball 23.11
+(UPDATE_AFTER) 12:00:00 basketball 19.99 <= 产品 `basketball` 在 `12:00:00` 时降价到了 `19.99`
-(DELETE) 18:00:00 scooter 12.99 <= 产品 `scooter` 在 `18:00:00` 从数据库表中删除
如果我们想输出 product_changelog 表在 10:00:00 对应的版本,表的内容如下所示:
update_time product_id product_name price
=========== ========== ============ =====
00:01:00 p_001 scooter 11.11
00:02:00 p_002 basketball 23.11
如果我们想输出 product_changelog 表在 13:00:00 对应的版本,表的内容如下所示:
update_time product_id product_name price
=========== ========== ============ =====
12:00:00 p_001 scooter 12.99
12:00:00 p_002 basketball 19.99
通过基于事件时间的时态表 join, 我们可以 join 上版本表中的不同版本:
CREATE TABLE orders (
order_id STRING,
product_id STRING,
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time -- defines the necessary event time
) WITH (
...
);
-- 设置会话的时间区间, changelog 里的数据库操作时间是以 epoch 开始的毫秒数存储的,
-- 在从毫秒转化为时间戳时,Flink SQL 会使用会话的时间区间
-- 因此,请根据 changelog 中的数据库操作时间设置合适的时间区间
SET table.local-time-zone=UTC;
-- 声明一张版本表
CREATE TABLE product_changelog (
product_id STRING,
product_name STRING,
product_price DECIMAL(10, 4),
update_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL, -- 注意:自动从毫秒数转为时间戳
PRIMARY KEY(product_id) NOT ENFORCED, -- (1) defines the primary key constraint
WATERMARK FOR update_time AS update_time -- (2) defines the event time by watermark
) WITH (
'connector' = 'kafka',
'topic' = 'products',
'scan.startup.mode' = 'earliest-offset',
'properties.bootstrap.servers' = 'localhost:9092',
'value.format' = 'debezium-json'
);
-- 基于事件时间的时态表 Join
SELECT
order_id,
order_time,
product_name,
product_time,
price
FROM orders AS O
LEFT JOIN product_changelog FOR SYSTEM_TIME AS OF O.order_time AS P
ON O.product_id = P.product_id;
order_id order_time product_name product_time price
======== ========== ============ ============ =====
o_001 00:01:00 scooter 00:01:00 11.11
o_002 00:03:00 basketball 00:02:00 23.11
o_003 12:00:00 scooter 12:00:00 12.99
o_004 12:00:00 basketball 12:00:00 19.99
o_005 18:00:00 NULL NULL NULL
基于事件时间的时态表 Join 通常用在通过 changelog 丰富流上数据的场景。
注意: 基于事件时间的时态表 Join 是通过左右两侧的 watermark 触发,请确保为 join 两侧的表设置了合适的 watermark。
注意: 基于事件时间的时态表 Join 的 join key 必须包含时态表的主键,例如:表 product_changelog 的主键 P.product_id 必须包含在 join 条件 O.product_id = P.product_id 中。
基于处理时间的时态表 join 使用任意表 (左侧输入/探针侧) 的 处理时间 去关联 (右侧输入/构建侧) 版本表(Versioned table) 的最新版本. 基于处理时间的时态表 join 当前只支持关联 版本表或版本视图,且支持 版本表或版本视图当前只能是 append-only 流。
如果将处理时间作为时间属性,过去 时间属性将无法与时态表一起使用。根据定义,处理时间总会是当前时间戳。 因此,关联时态表的调用将始终返回底层表的最新已知版本,并且底层表中的任何更新也将立即覆盖当前值。
可以将处理时间的时态 Join 视作简单的 HashMap ,HashMap 中存储来自构建侧的所有记录。 当来自构建侧的新插入的记录与旧值具有相同的 Key 时,旧值会被覆盖。 探针侧的每条记录将总会根据 HashMap 的最新/当前状态来计算。
接下来的示例展示了订单流 Orders 该如何与实时变化的汇率表 Lates 进行基于处理时间的 时态 Join 操作,LatestRates 总是表示 HBase 表 Rates 的最新内容。
1、LatestRates
10:15> SELECT * FROM LatestRates;
currency rate
======== ======
US Dollar 102
Euro 114
Yen 1
10:30> SELECT * FROM LatestRates;
currency rate
======== ======
US Dollar 102
Euro 114
Yen 1
10:52> SELECT * FROM LatestRates;
currency rate
======== ======
US Dollar 102
Euro 116 <==== changed from 114 to 116
Yen 1
2、订单表
SELECT * FROM Orders;
amount currency
====== =========
2 Euro <== arrived at time 10:15
1 US Dollar <== arrived at time 10:30
2 Euro <== arrived at time 10:52
3、汇率转换
amount currency rate amount*rate
====== ========= ======= ============
2 Euro 114 228 <== arrived at time 10:15
1 US Dollar 102 102 <== arrived at time 10:30
2 Euro 116 232 <== arrived at time 10:52
SELECT
o.amout, o.currency, r.rate, o.amount * r.rate
FROM
Orders AS o
JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r -- proctime 表示基于处理时间的join
ON r.currency = o.currency
维表join案例 https://chbxw.blog.csdn.net/article/details/127555641
https://zhuanlan.zhihu.com/p/214053444
https://www.cnblogs.com/Springmoon-venn/p/14862865.html
Lookup Join 其实就是 维表 Join,比如拿离线数仓来说,常常会有用户画像,设备画像等数据,而对应到实时数仓场景中,这种实时获取外部缓存的 Join 就叫做维表 Join。
我们既然已经有了上面介绍的 Regular Join,Interval Join 等,为啥还需要一种 Lookup Join?因为上面说的这几种 Join 都是流与流之间的 Join,而 Lookup Join 是流与 Redis,Mysql,HBase 这种存储介质的 Join。Lookup 的意思就是实时查找,而实时的画像数据一般都是存储在 Redis,Mysql,HBase 中,这就是 Lookup Join 的由来
接下来我们讲一个小例子,首先定义一下stream source,我们使用flink 1.11提供的datagen来生成数据。
我们来模拟生成用户的数据,这里只生成的用户的id,范围在1-100之间。
CREATE TABLE datagen (
userid int,
proctime as PROCTIME()
) WITH (
'connector' = 'datagen',
'rows-per-second'='2',
'fields.userid.kind'='random',
'fields.userid.min'='1',
'fields.userid.max'='10'
)
然后再创建一个mysql维表信息:
-- flink sql
CREATE TABLE dim_mysql (
id int,
name STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://chb1:3306/chb_test',
'table-name' = 'userinfo',
'username' = 'root',
'password' = '123456'
)
-- mysql
CREATE TABLE userinfo (
id int,
name varchar(30),
age int,
PRIMARY KEY (id)
)
insert into userinfo (id, name) values (1, 'chb', 20), (2, 'ling',18), (3, 'zhangsan', 33), (4, 'lisi',44);
最后执行sql查询,流表关联维表:(注意 使用 Processing Time Temporal Join 的格式)
SELECT * FROM datagen
LEFT JOIN dim_mysql FOR SYSTEM_TIME AS OF datagen.proctime
ON datagen.userid = dim_mysql.id;
userid time userid name age
9 2022-11-01 16:22:49.669 (NULL) (NULL) (NULL)
3 2022-11-01 16:22:49.670 3 zhangsan 33
4 2022-11-01 16:22:50.669 4 lisi 44
2 2022-11-01 16:22:50.670 2 ling 18
CREATE TABLE show_log (
log_id BIGINT,
show_params as ARRAY['a', 'b', 'c']
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.log_id.min' = '1',
'fields.log_id.max' = '2'
);
SELECT
log_id,
show_param
FROM show_log
-- array 炸开语法
CROSS JOIN UNNEST(show_params) AS t (show_param)
应用场景(支持 Batch\Streaming):这个其实和 Array Expansion 功能类似,但是 Table Function 本质上是个 UDTF 函数,和离线 Hive SQL 一样,我们可以自定义 UDTF 去决定列转行的逻辑。
⭐ Table Function 使用分类:
SELECT order_id, res
FROM Orders,
LATERAL TABLE(table_func(order_id)) t(res)
SELECT order_id, res
FROM Orders
LEFT OUTER JOIN LATERAL TABLE(table_func(order_id)) t(res)
ON TRUE
参考:
https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/dev/table/sql/queries/joins
Flink 对线面试官(五):2w 字详述双流 Join 3 种解决方案 + 2 种优化方案
Flink SQL 知其所以然(二十六):2w 字详述 Join 操作
Flink Temporal Join Versioned Table Demo