• Flink SQL --维表join



    本文基于flink-1.13.6

    一、维表 join 介绍

    维表是数仓中的一个概念,维表中的维度属性是观察数据的角度,在建设离线数仓的时候,通常是将维表与事实表进行关联构建星型模型。在实时数仓中,同样也有维表与事实表的概念,其中事实表通常存储在kafka中,维表通常存储在外部设备中(比如MySQL,HBase)。对于每条流式数据,可以关联一个外部维表数据源,为实时计算提供数据关联查询。维表可能是会不断变化的,在维表JOIN时,需指明这条记录关联维表快照的时刻。需要注意是,目前Flink SQL的维表JOIN仅支持对当前时刻维表快照的关联(处理时间语义),而不支持事实表rowtime所对应的的维表快照(事件时间语义)

    二、Temporal Table Join

    使用语法

    SELECT column-names
    FROM table1  [AS <alias1>]
    [LEFT] JOIN table2 FOR SYSTEM_TIME AS OF table1.proctime [AS <alias2>]
    ON table1.column-name1 = table2.key-name1
    
    • 1
    • 2
    • 3
    • 4

    注意:目前,仅支持INNER JOIN与LEFT JOIN。在join的时候需要使用 FOR SYSTEM_TIME AS OF ,其中table1.proctime表示table1的proctime处理时间属性(计算列)。使用FOR SYSTEM_TIME AS OF table1.proctime表示当左边表的记录与右边的维表join时,只匹配当前处理时间维表所对应的的快照数据。

    样例

    SELECT
      o.amout, o.currency, r.rate, o.amount * r.rate
    FROM
      Orders AS o
      JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
      ON r.currency = o.currency
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    使用说明
    仅支持Blink planner
    仅支持SQL,目前不支持Table API
    目前不支持基于事件时间(event time)的temporal table join
    维表可能会不断变化,JOIN行为发生后,维表中的数据发生了变化(新增、更新或删除),则已关联的维表数据不会被同步变化
    维表和维表不能进行JOIN
    维表必须指定主键。维表JOIN时,ON的条件必须包含所有主键的等值条件

    三、维表Join案例

    3.1、背景

    Kafka中有一份用户行为数据,包括pv,buy,cart,fav行为;MySQL中有一份省份区域的维表数据。现将两种表进行JOIN,统计每个区域的购买行为数量。

    3.2、实践

    3.2.1、维表存储在MySQL中

    -- mysql
    CREATE TABLE `dim_province` (
      `province_id` bigint(20) DEFAULT NULL,
      `province_name` varchar(50) DEFAULT NULL,
      `region_name` varchar(50) DEFAULT NULL
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
    
    
    insert into dim_province (province_id, province_name, region_name) 
    values 
    (1, "山东", "华东"),
    (2, "广东", "华南"),
    (3, "河南", "华中"),
    (4, "北京", "华北"),
    (5, "新疆", "西北");
    
    -- flinksql  维度表
    CREATE TABLE dim_province (
        province_id BIGINT,  -- 省份id
        province_name  VARCHAR, -- 省份名称
     region_name VARCHAR -- 区域名称
    ) WITH (
        'connector.type' = 'jdbc',
        'connector.url' = 'jdbc:mysql://chb1:3306/chb_test?useUnicode=true&characterEncoding=UTF-8',
        'connector.table' = 'dim_province',
        'connector.username' = 'root',
        'connector.password' = '123456',
        'connector.lookup.cache.max-rows' = '5000',
        'connector.lookup.cache.ttl' = '10min'
    );
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    注意: 加上useUnicode=true&characterEncoding=UTF-8,否则 flinksql 写到 mysql 产生乱码

    3.2.2、事实数据存在 kafka

    事实表存储在kafka中,数据为用户点击行为,格式为csv,具体数据样例如下:

    1,1002,10002,fav,2022-10-27 16:25:00,2
    1,1004,10002,cart,2022-10-27 16:25:01,3
    6,1004,10004,pv,2022-10-27 16:25:01,3
    3,1002,10001,cart,2022-10-27 16:25:01,1
    4,1001,10004,fav,2022-10-27 16:25:01,4
    
    • 1
    • 2
    • 3
    • 4
    • 5

    创建kafka数据源表,如下:

    CREATE TABLE user_behavior (
        user_id VARCHAR,
        item_id VARCHAR,
        category_id VARCHAR,
        behavior VARCHAR,
        `ts` timestamp(3),
    	 province_id INT,   -- 用户所在的省份id
    	`proctime` as PROCTIME(),   -- 处理时间列
        WATERMARK FOR ts as ts - INTERVAL '5' SECOND  -- 在ts上定义watermark,ts成为事件时间列
    ) WITH (
        'connector' = 'kafka', -- 使用 kafka connector
        'topic' = 'user_behavior',  -- kafka topic
        'scan.startup.mode' = 'latest-offset', -- 从起始 offset 开始读取
    	'properties.bootstrap.servers' = 'chb1:9092',
    	'properties.group.id' = 'testGroup',
    	'format' = 'csv'
    );
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    3.2.3、创建MySQL的结果表,表示区域销量

    -- mysql
     CREATE TABLE top_region (
        region_name varchar(50),  -- 区域名称
        buy_cnt BIGINT  -- 销量
    )ENGINE=InnoDB DEFAULT CHARSET=utf8;
    
    -- flinksql
    CREATE TABLE region_sales_sink (
        region_name STRING,  -- 区域名称
        buy_cnt BIGINT,  -- 销量
    	proctime as PROCTIME()
    ) WITH (
    
        'connector.type' = 'jdbc',
        'connector.url' = 'jdbc:mysql://chb1:3306/chb_test?useUnicode=true&characterEncoding=UTF-8',
        'connector.table' = 'top_region', -- MySQL中的待插入数据的表
        'connector.username' = 'root',
        'connector.password' = '123456',
        'connector.write.flush.interval' = '1s'
    );
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    3.2.4、用户行为数据与省份维表数据 join

    CREATE VIEW user_behavior_detail AS
    SELECT
      u.user_id, 
      u.item_id,
      u.category_id,
      u.behavior,  
      p.province_name,
      p.region_name
    FROM user_behavior AS u LEFT JOIN dim_province FOR SYSTEM_TIME AS OF u.proctime AS p
    ON u.province_id = p.province_id;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    3.2.5、计算区域的销量,并将计算结果写入MySQL

    -- 结果
    INSERT INTO region_sales_sink
    SELECT 
      region_name,
      COUNT(*) buy_cnt
    FROM user_behavior_detail
    WHERE behavior = 'buy'
    GROUP BY region_name;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    参考:
    Flink SQL— CREATE语句
    Flink Temporal Join Versioned Table Demo

  • 相关阅读:
    day-06 多进程服务器端 -- 进程间通信
    [附源码]Python计算机毕业设计Django企业售后服务管理系统
    【优化调度】基于NSGAII算法的车辆充电调度策略研究含Matlab代码
    堆排序算法
    C语言 const详解
    A股风格因子看板 (2023.11 第11期)
    Blazor实战——Known框架增删改查导
    Ajax跨域与封装
    MFC Windows 程序设计[127]之菜单初体验
    冥想第九百七十七天
  • 原文地址:https://blog.csdn.net/wuxintdrh/article/details/127555641