• 【SQL篇】一、Flink动态表与流的关系以及DDL语法



    在这里插入图片描述

    1、启动SQL客户端

    启动Flink(基于yarn-session模式为例):

    /opt/module/flink-1.17.0/bin/yarn-session.sh -d
    
    • 1

    再启动sql的客户端:

    /opt/module/flink-1.17.0/bin/sql-client.sh embedded -s yarn-session
    
    • 1

    简单看下:

    show databases;
    
    • 1

    2、SQL客户端常用配置

    设置结果的显示模式,默认table,还可以设置为tableau、changelog

    SET sql-client.execution.result-mode=changelog;
    
    • 1

    设置执行环境,默认streaming,也可以设置batch

    SET execution.runtime-mode=streaming;
    
    • 1

    设置默认并行度:

    SET parallelism.default=1;
    
    • 1

    设置状态TTL:

    SET table.exec.state.ttl=1000;
    
    • 1

    通过SQL文件初始化,可以发现,exit退出客户端时,刚创建的库表都被清空了,这个SQL初始化文件就是在启动客户端时你想执行的SQL语句

    # 创建SQL文件
    
    vim conf/sql-client-init.sql
    
    SET sql-client.execution.result-mode=tableau;
    CREATE DATABASE mydatabase;
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    # 启动时,-i指定SQL文件
    
    /opt/module/flink-1.17.0/bin/sql-client.sh embedded -s yarn-session -i conf/sql-client-init.sql
    
    • 1
    • 2
    • 3

    3、动态表和持续查询

    和MySQL等关系型表不同的是,无限流下,会有源源不断的数据过来进入表中,即动态表,来一条数据,往表中插入一条数据。对应的,想获取最新结果就要写条SQL去不间断的查询,即持续查询(每次数据到来都会触发查询操作),持续查询的结果也是一个动态表。

    关系型表流处理的动态表
    处理的数据对象字段元组的有界集合字段元组的无限序列
    查询时对数据的访问可以访问到完整的数据输入无法访问到所有数据,必须“持续”等待流式输入
    查询终止条件生成固定大小的结果集后终止永不停止, 根据持续收到的数据不断更新查询结果

    在这里插入图片描述

    如图,持续查询的流程为:

    • 流(stream)被转换为动态表(dynamic table)
    • 对动态表进行持续查询(continuous query),生成新的动态表
    • 生成的动态表被转换成流

    如此,就通过执行SQL实现了对数据流的处理。

    4、将流转为动态表

    把流看作一张表,来一条数据,insert一次,比如有个记录用户点击事件的无限流:

    在这里插入图片描述

    5、用SQL持续查询

    代码中定义一条查询SQL:

    Table urlCountTable = tableEnv.sqlQuery("SELECT user, COUNT(url) as cnt FROM EventTable GROUP BY user");
    
    • 1

    此时,结果表(动态),可能是简单的insert,如Bob这条数据,也可能是对旧数据的更新update,如Alice,这就是更新查询。 此时,结果表转DataStream调用toChangelogStream()方法。
    在这里插入图片描述

    修改查询SQL,使用TUMBLE加一个开窗,每个窗口触发时,输出结果,此时对结果表就只有insert追加数据,没有update,即追加查询。

    在这里插入图片描述

    6、动态表转为流

    仅追加(Append-only)流
    
    • 1

    动态表仅仅通过insert来修改,转为流时,对应一个仅追加的流,流中的每条数据,就是动态表的每行数据。

    撤回(Retract)流
    
    • 1

    流中有添加消息add和撤回消息retract两种,对应表中:

    • insert就是add消息
    • delete就是retract
    • update就是被改行的retract+新结果的add

    在这里插入图片描述

    更新插入(Upsert)流
    
    • 1

    流中有更新插入消息upsert和删除消息delete两种,对应表中:

    • update和insert是upsert消息
    • delete为delete消息

    在这里插入图片描述

    最后,注意,在代码里将动态表转换为DataStream时,只支持仅追加(append-only)和撤回(retract)流两种。

    7、时间属性

    在表中加个时间字段,数据类型为TimeStamp,分为事件时间和处理时间。事件时间通过watermark语句来定义:

    CREATE TABLE EventTable(
      user STRING,
      url STRING,
      ts TIMESTAMP(3),
      WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
    ) WITH (
      ...
    );
    
    # TIMESTAMP后的3为精确度
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    以上,ts字段为事件时间属性,且基于ts设置5s的水位线延迟,注意,延迟秒数5必须加单引号。时间戳类型需要转为秒或者毫秒时,可:

    # ...
    ts BIGINT,
    time_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
    # ...
    3即精确到毫秒
    
    • 1
    • 2
    • 3
    • 4
    • 5

    定义处理时间属性用procTime函数:

    CREATE TABLE EventTable(
      user STRING,
      url STRING,
      ts AS PROCTIME()
    ) WITH (
      ...
    );
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    8、DDL-数据库相关

    建库:

    CREATE DATABASE [IF NOT EXISTS] [catalog_name.]db_name
      [COMMENT database_comment]
      WITH (key1=val1, key2=val2, ...)
    
    
    • 1
    • 2
    • 3
    • 4

    查询所有库:

    SHOW DATABASES
    
    • 1

    查当前库:

    SHOW CURRENT DATABASE
    
    • 1

    修改库的某些属性:

    ALTER DATABASE [catalog_name.]db_name SET (key1=val1, key2=val2, ...)
    
    • 1

    删库:

    DROP DATABASE [IF EXISTS] [catalog_name.]db_name [ (RESTRICT | CASCADE) ]
    
    • 1

    注意,

    • RESTRICT:删除非空数据库会触发异常。默认启用
    • CASCADE:删除非空数据库也会删除所有相关的表和函数,慎用

    切换当前库:

    USE database_name;
    
    • 1

    9、DDL-表相关

    建表:

    CREATE TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name
      # 字段
      (
        { <physical_column_definition> | <metadata_column_definition> | <computed_column_definition> }[ , ...n]
        # 定义watermark
        [ <watermark_definition> ]
        [ <table_constraint> ][ , ...n]
      )   
      # 注释  
      [COMMENT table_comment]
      # 分区
      [PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
      # Flink特色
      WITH (key1=val1, key2=val2, ...)
      [ LIKE source_table [( <like_options> )] | AS select_query ]
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    关于表中的字段:physical_column就是常规列。metadata_column是元数据列,可访问到数据源本身的一些元数据,必须加METADATA关键字标识,如:读取数据写入Kafka时,Kafka引擎给数据打上的时间戳标记:

    CREATE TABLE MyTable (
      `user_id` BIGINT,
      `name` STRING,
      `record_time` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'  # !!!
    ) WITH (
      'connector' = 'kafka'
      ...
    );
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    自定义的列名称和 Connector 中定义 metadata 字段的名称一样时,后面的FROM省略:

    CREATE TABLE MyTable (
    `user_id` BIGINT,
    `name` STRING,
    `timestamp` TIMESTAMP_LTZ(3) METADATA  # !!!
    ) WITH (
    'connector' = 'kafka'
    ...
    );
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    自定义列的数据类型和 Connector 中定义的 metadata 字段的数据类型不一致时,会自动强转,因此这两个类型必须可以强转:

    CREATE TABLE MyTable (
    `user_id` BIGINT,
    `name` STRING,
    -- 将时间戳强转为 BIGINT
    `timestamp` BIGINT METADATA
    ) WITH (
    'connector' = 'kafka'
    ...
    );
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    默认metadata_column列可读可写,加VIRTUAL表示只读:

    CREATE TABLE MyTable (
      `timestamp` BIGINT METADATA, 
      `offset` BIGINT METADATA VIRTUAL,  # !!!!
      `user_id` BIGINT,
      `name` STRING,
    ) WITH (
      'connector' = 'kafka'
      ...
    );
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    computed_column即计算列,把几列的计算结果做为新列,这在关系型SQL中一般在查询语句中完成,而不存成一个新列。

    CREATE TABLE MyTable (
      `user_id` BIGINT,
      `price` DOUBLE,
      `quantity` DOUBLE,
      `cost` AS price * quanitity   # !!!
    ) WITH (
      'connector' = 'kafka'
      ...
    );
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    主键的定义,只支持 not enforced:

    CREATE TABLE MyTable (
    `user_id` BIGINT,
    `name` STRING,
    PARYMARY KEY(user_id) not enforced   # !!!
    ) WITH (
    'connector' = 'kafka'
    ...
    );
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    with子句,用于指定这个表相关的外部系统的相关配置,如Kafka:

    CREATE TABLE KafkaTable (
    `user_id` BIGINT,
    `name` STRING,
    `ts` TIMESTAMP(3) METADATA FROM 'timestamp'
    ) WITH (
    'connector' = 'kafka',
    'topic' = 'user_behavior',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'testGroup',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'csv'
    )
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    like子句,即在现有表的基础上,创建另一种表:

    CREATE TABLE Orders (
        `user` BIGINT,
        product STRING,
        order_time TIMESTAMP(3)
    ) WITH ( 
        'connector' = 'kafka',
        'scan.startup.mode' = 'earliest-offset'
    );
    
    CREATE TABLE Orders_with_watermark (
        -- Add watermark definition
        WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND 
    ) WITH (
        -- Overwrite the startup-mode
        'scan.startup.mode' = 'latest-offset'
    )
    LIKE Orders;  # !!!!
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    举例:新表中的value字段加偏引号是因为value和关键字冲突了

    CREATE TABLE test(
        id INT, 
        ts BIGINT, 
        vc INT
    ) WITH (
    'connector' = 'print'
    );
    
    CREATE TABLE test1 (
        `value` STRING
    )
    LIKE test;
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    create-table-as-select,即CTAS语句,通过查询结果创建表:

    CREATE TABLE my_ctas_table
    WITH (
        'connector' = 'kafka',
        ...
    )
    AS SELECT id, name, age FROM source_table WHERE mod(id, 10) = 0;
    
    # 注意此时不能自己来定义列
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    查所有表:

    SHOW TABLES [ ( FROM | IN ) [catalog_name.]database_name ] [ [NOT] LIKE <sql_like_pattern> ]
    
    • 1

    查某张表信息:

    { DESCRIBE | DESC } [catalog_name.][db_name.]table_name
    
    • 1

    修改表名:

    ALTER TABLE [catalog_name.][db_name.]table_name RENAME TO new_table_name
    
    • 1

    修改表属性:

    ALTER TABLE [catalog_name.][db_name.]table_name SET (key1=val1, key2=val2, ...)
    
    • 1

    删表:

    DROP [TEMPORARY] TABLE [IF EXISTS] [catalog_name.][db_name.]table_name
    
    • 1
  • 相关阅读:
    JAVA中如何写注释
    一篇博客学会一个Point 1 | 条件概率 Conditional probability
    一阶滤波器(一阶巴特沃斯滤波器)
    【0day】用友CRM系统目录遍历漏洞学习
    LLMs之HFKR:HFKR(基于大语言模型实现异构知识融合的推荐算法)的简介、原理、性能、实现步骤、案例应用之详细攻略
    GitHub操作
    深度学习基础:超分辨率网络整理之EDVR网络
    顺序的分数 Ordered Fractions
    安卓开发笔记——ListView加载性能优化ViewHolder
    聊一聊对一个 C# 商业程序的反反调试
  • 原文地址:https://blog.csdn.net/llg___/article/details/134240821