• FlinkSql详解


    动态表

    和动态表对应的是静态表——常规的数据库中的表或批处理中的表等,其在查询时数据不再变化。动态表是随时间变化的,即使是在查询的时候。怎么理解了?流上的数据是源源不断的,一条数据的到来会触发一次查询,这次查询在执行时还有下一条数据到来,对表本身数据是在变化的,所以称为动态表。

    连续查询 

            物化视图其实就是⼀条 SQL 查询,就像常规的虚拟视图 VIEW ⼀样。但与虚拟视图不同是,物化视图会缓存查询的结果,因此在请求访问视图时不需要对查询进⾏重新计算,可以直接获取物化视图的结果,⼩伙伴萌可以认为 物化视图其实就是把结果缓存了下来。
            映射到我们的流任务中,输⼊、处理逻辑、输出这⼀套流程也是⼀个物化视图的概念。相⽐批处理来说,流处理中,我们的数据源表的数据是源源不断的。那么从输⼊、处理、输出的整个物化视图的维护流程也必须是实时的。
            因此我们就需要引⼊⼀种 实时视图维护(Eager View Maintenance 的技术去做到:⼀旦更新了物化视图的数 据源表就⽴即更新视图的结果,从⽽保证输出的结果也是最新的。
    这种 实时视图维护(Eager View Maintenance 的技术就叫做 连续查询
    注意:
    1. ⭐ 连续查询(Continuous Query) 不断的消费动态输⼊表的的数据,不断的更新动态结果表的数
    据。
    2. ⭐ 连续查询(Continuous Query) 的产出的结果 = 批处理模式在输⼊表的上执⾏的相同查询的结 果。相同的 SQL,对应于同⼀个输⼊数据,虽然执⾏⽅式不同,但是流处理和批处理的结果是永远都 会相同的。

    Flink SQL 中的执⾏流程总共包含了几个步骤

    1. ⭐ 第⼀步:获取数据

    将数据输⼊流转换为 SQL 中的动态输⼊表。这⾥的转化其实就是指将输⼊流映射(绑定)为⼀ 个动态输⼊表。上图虽然分开画了,但是可以理解为⼀个东⻄。
    1. CREATE TEMPORARY TABLE kafka_user_path (
    2. `user` string,
    3. `url` string
    4. `cTime` BIGINT,
    5. `my_time` TIMESTAMP(3) METADATA FROM 'timestamp',
    6. `my_date` AS CAST(`my_time` AS DATE)
    7. ) WITH (
    8. 'connector' = 'kafka',
    9. 'properties.bootstrap.servers' = '****',
    10. 'properties.group.id' = '***',
    11. 'topic' = '***',
    12. 'scan.startup.mode' = 'earliest-offset',
    13. 'format' = 'json'
    14. );

    2. ⭐ 第⼆步:处理数据

    在动态输⼊表上执⾏⼀个连续查询,然后⽣成⼀个新的动态结果表。
    (1)普通聚合函数
    1. CREATE TEMPORARY VIEW view_user_path AS
    2. SELECT user
    3. ,count(url) as cnt
    4. FROM kafka_user_path
    5. group by user;

    当查询开始,clicks 表(左侧)是空的。
    1. ⭐ 当第⼀⾏数据[Mary,./home] 输⼊后,会计算结果 [Mary, 1] 插⼊ 结果表
    2. ⭐ 当第⼆⾏数据[Bob, ./cart] 输⼊后,会计算结果 [Bob, 1],并插⼊ 结果表。
    3. ⭐ 当第三⾏数据[Mary, ./prod?id=1]输⼊后,会计算出[Mary, 2](user 为 Mary 的数据总共来过两条,所以为 2),并 更新(update )结果表 ,[Mary, 1] 更新成 [Mary, 2]。
    4. ⭐ 最后,当第四⾏数据[Liz, ./home] 输⼊后,会计算结果 [Liz, 1],并插⼊结果表。
    (2)窗口聚合函数
    1. select user
    2. ,tumble_end(cTime, interval '1' hours) as endT
    3. ,count(url) as cnt
    4. from kafka_user_path
    5. group by user
    6. ,tumble(cTime, interval '1' hours)

     我们的滚动窗⼝的步⻓为1⼩时,即时间粒度上⾯的分组为1⼩时。其中时间戳在 12:00:00 - 12:59:59 之间有四条数据。13:00:00 - 13:59:59 有三条数据。14:00:00 - 14:59:59 之间有四条数据。

    1. ⭐ 当 12:00:00 - 12:59:59 数据输⼊之后,1 ⼩时的窗⼝,连续查询(Continuous Query)计算的结果如右图所示,将 [Mary, 3],[Bob, 1] 插⼊(insert)结果表 。

    2. ⭐ 当 13:00:00 - 13:59:59 数据输⼊之后,1 ⼩时的窗⼝,连续查询(Continuous Query)计算的结果如右图所示,将 [Bob, 1],[Liz, 2] 插⼊(insert)结果表 。

    3. ⭐ 当 14:00:00 - 14:59:59 数据输⼊之后,1 ⼩时的窗⼝,连续查询(Continuous Query)计算的结果如右图所示,将 [Mary, 1],[Bob, 2],[Liz, 1] 插⼊(insert)结果表 。

    ⽽这个查询只有 插⼊(insert)结果表 这个⾏为。

    3. ⭐ 第三步:⽣成的动态结果表被转换回数据输出流。

            连续查询(Continuous Query)的输出结果表是⼀个 changelog。其可以像普通数据库表⼀样通过 INSERT、UPDATE 和 DELETE 来不断修改。它可能是⼀个只有⼀⾏、不断更新 changelog 表,也可能是⼀个 insert-only 的 changelog 表。

    1. CREATE TEMPORARY TABLE `mysql_user_path` (
    2. `user` string COMMENT '用户',
    3. `endT` string COMMENT '时间窗口',
    4. `cnt` BIGINT COMMENT 'url个数',
    5. PRIMARY KEY (`user`,`endT`,`cnt`) NOT ENFORCED
    6. ) WITH (
    7. 'connector' = 'rds',
    8. 'userName' = '****',
    9. 'tableName' = '***',
    10. 'batchSize' = '4096',
    11. 'url' = '*****'
    12. );
    13. BEGIN STATEMENT SET;
    14. insert into mysql_user_path select user,endT,cnt from view_user_path;
    15. END;
    注释:像这样 在输出结果表中设置了 PRIMARY KEY,那么当PRIMARY KEY相同的时候在输出的结果表中就会发生update的操作。

    FlinkSql中输出流的分类

            动态表可像传统表一样被INSERT、UPDATE、DELETE修改。可能只有一行的表被持续更新;或者是没有UPDATE、DELETE更改的只插入表。当将动态表转化为流或将其写入外部系统,这些更改(修改)需要被编码,Flink的Table API & SQL支持三种方式编码动态表上的更改(修改)。

    Append-only流:仅使用INSERT更改进行修改的动态表可通过发出插入的行来转化为流。

    Retract流:Retract流包含两种类型消息(add消息和retract消息),通过将动态表的INSERT更改作为add消息、将DELETE更改作为retract消息、将UPDATE更改分解为旧记录的retract消息和新记录的add消息。

    Upsert流:Upsert流包含两种类型消息(upset消息和delete消息),动态表转化为upsert流需要有主键(可复合),具有主键的动态表通过将INSERT、UPDATE更改编码为upset消息,将DELETE更改编码为delete消息。upset流与retract流主要区别是UPDATE更改使用单一消息(主键)进行编码,因此效率更高,但要求外部系统支持主键

  • 相关阅读:
    【计网】传输层
    Unity 6 是下一个 LTS 版本即将发布
    把yes/no取消掉
    如何在Web前端实现CAD图文字全文搜索功能之技术分享
    软件工程与计算总结(九)软件体系结构基础
    逐步理解深度信念网络
    Windows10系统安装VMWare
    Python 3速查表
    关于Flask高级_Session有效期设置方法
    Java宝典-抽象类和接口
  • 原文地址:https://blog.csdn.net/qq_42456324/article/details/127412633