• 17.Table Api基础概念讲解


    概念

    Table API 和 SQL 集成在一个联合 API 中。该 API 的核心概念是Table用作查询的输入和输出。
    Table Api支持需要maven如下:官网 Maven介绍

    
      org.apache.flink
      flink-table-api-java-bridge_2.11
      1.13.6
      provided
    
    下面两个是 再本地IDEA上跑Table Api需要添加的依赖
    
      org.apache.flink
      flink-table-planner-blink_2.11
      1.13.6
      provided
    
    
      org.apache.flink
      flink-streaming-scala_2.11
      1.13.6
      provided
    
    
    运行的实时记得注释掉:provided,
    打包的时候记得加上provided
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    1.Table API 和 DataStream Api关系

    先说结论: DataStream API 是传统的流数据,Table Api是对DataStream
    Api大部分功能的抽象,所谓抽象就是在DataStream Api的基础上又包装了一层,但是注意并未把DataStream
    Api所有的功能都进行了抽象化,有一些近底层逻辑还是需要用DataStream Api.(比如算子状态的管理)

    Both Table API and DataStream API are equally important when it comes to defining a data processing pipeline.
    DataStream API在相对低级的命令式编程API中提供流处理的原语(即时间、状态和数据流管理)。Table API抽象了许多内部控件,并提供了结构化和声明式API。(结构化也就意味着可以用sql来进行查询操作,明显更方便)

    所以说Table API and DataStream API
    二者不能分开来看待,它们关系密切。只不过各自的侧重点不同,DataStream Api 是传统的流处理模式,一些复杂的功能都有,
    而Table Api在DataStream Api的基础上进行了进一步的抽象化, 使得DataStream 流数据拥有了元数据信息,
    而这些元数据信息是Table Api可以用sql查询的基础。 因此DataStream Api 和Table
    Api是可以互相转化的,而在实际开发中二者往往结合使用。 如果用到一些复杂的功能往往切换到DataStream
    Api处理,而对接对外数据库往往用Table Api 因为代码量更少且更清晰。当然Table Api能实现的功能DataStream
    Api都能实现, DataStream Api的算计基本上可以实现所有的功能。

    1.1 问题1:既然DataStream Api已经很完善了,那么为什么还要Table Api呢?

    1.为了好用效率也高。 同样的需求DataStream Api需要四十行代码,
    那有可能Table Api一个sql就搞定了。3转换为Table
    2.照顾到快速开发,我们可能只需要学sql友好的Table Api就行了

    1.2 什么时候用DataStream Api,什么时候用Table Api?

    你想用谁用谁就是了,哪那么多问题哈哈。 不过一般来说在连接mysql, Hive的时候一般用Table Api因为方便代码易于理解, 一些sql语法支持的逻辑用Table Api就行。 总之优先用Table Api. 不过涉及到水位线,时间时间什么的还是要用DataStream Api,因为Table Api不支持。 而且二者是可以无缝转换的,需要怎么用就在代码中直接转化即可。 举个例子,连接mysql 用Table Api, 连接之后设置水位线窗口的时候 转化成DataStream Api去设置, 设置好水位线之后再转化为Table Api去查询。

    2.DataStream Api和Table Api流的转化

    主要涉及到两个方法:fromDataStream 和toDataStream

    
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;
    
    public class Convert {
        public static void main(String[] args) throws Exception {
    
            // Table Api 环境和DataStreamApi环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    
            // create a DataStream Api支持的 流数据
            DataStream dataStream = env.fromElements("Alice", "Bob", "John");
    
            // 将DataStream  Api支持的流数据 转成 Table Api支持的流数据
            Table inputTable = tableEnv.fromDataStream(dataStream);
    
            // register the Table object as a view and query it 用table对象inputTable 建立一个临时表InputTableTemp
            tableEnv.createTemporaryView("InputTableTemp", inputTable);
            //查询临时表InputTableTemp
            Table resultTable = tableEnv.sqlQuery("SELECT UPPER(f0) FROM InputTableTemp");
    
            // 将Table Api支持的流数据 转换成 DataStream  Api支持的流数据
            DataStream resultStream = tableEnv.toDataStream(resultTable);
            
            resultStream.print();
            env.execute();
            
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    2.Table 环境的两种创建方式

    2.1 第一种:直接创建

    EnvironmentSettings settings = EnvironmentSettings
        .newInstance()
        .inStreamingMode()
        //.inBatchMode()
        .build();
    
    TableEnvironment tEnv = TableEnvironment.create(settings);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    2.2 第二种:基于流环境创建

    这种创建很常见,意在建立DataStream API和 Table API 的连接,两种数据结构可以互相转换交互。

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
    
    
    • 1
    • 2
    • 3

    2.3 TableEnvironment 介绍

    TableEnvironment是表API和SQL集成的入口点,它主要负责:

    1. 注册桥接的catlogs
    2. 在内部catlog注册table
    3. 加载可插拔模块
    4. 执行 SQL 查询
    5. 注册用户定义的(标量、表或聚合)函数
    6. DataStream和 Table的流转换

    2.4 临时表和永久表的概念

    表可以是临时的,并且与单个 Flink 会话的生命周期相关联,也可以是永久的独立于会话而存在。
    临时表也称为视图,
    永久表一个对应的重要概念是:Catalog 提供了元数据,用于将外部系统的数据和flink的table做嵌合,使得flink可以无障的 将mysql/hive等外部数据库的数据映射成flink的Table。 总之:Catalog ,访问存储在数据库或其他外部系统中的数据所需的函数和信息。
    更多callog参看:catlog
    临时表的创建:

    Table projTable = tableEnv.from("X").select(...);
    
    //把projTable 注册为临时表
    tableEnv.createTemporaryView("projectedTable", projTable);
    
    
    注意: Table对象与VIEW关系数据库系统中的 VIEW类似,
    即定义 的查询Table未优化,但在另一个查询引用已注册的 时,
    将被内联Table。
    如果多个查询引用同一个已注册的Table,则会为每个引用的查询内联并执行多次,
    即注册的结果Table不会被共享。
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    2.catlog创建表格

    A TableEnvironment maintains a map of catalogs of tables which are created with an identifier。每个**标识符(identifier)**由 3 部分组成:catalog name, database name and object name。如果未指定catlog或databases,则将使用当前默认值(请参阅表标识符扩展部分中的示例)。
    table可以是永久表,或者是临时表。
    注意:每一个TableEnvironment 都有一个唯一的标识符, 此标识符代表了当前的环境, 标识符在当前TableEnvironment 中创建了catlogs<=>Tables的映射。标识符由:catalog name, database name and object name组成。

    3.shadowing()

    可以使用与现有永久表相同的**标识符(identifier)**注册临时表。临时表会影响永久表,只要临时表存在,就无法访​​问永久表。所有具有该标识符的查询都将针对临时表执行。适合用于测试。

    4.connect tables

    这是一个十分重要的中能,顾名思义就是提供了Flink Table到外部存储系统的连接。然后生成flink的Table对象, flink对不同的外部系统都提供了连接器。(比如文件系统,hdfs系统,kafka系统,mysql系统)
    此外连接器不是Table Api专用的, DataStream Api和DataSet Api也有自己的连接器,此处不讲解连接器,现在只需要理解连接器的概念即可,下面是一个Flink Table Api对应的文件系统连接器语法:

     tableEnv.executeSql("""CREATE TABLE input (community_name STRING,
     address STRING,
     old_people_num INT,
     live_alone_num INT,
     empty_nester_num INT) 
     WITH ('connector' = 'filesystem','path' = 'D:\\flink\\',
     'format'='csv','csv.field-delimiter'=',')""");    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    需要注意两点

    1.即便是对同样的外部系统(比如mysql),DataStream Api ,DataSet Api,Table Api三大模块连接器的语法也都不一样。 所以读者需要知道
    连接器是基于三大模块分别实现的, 故而语法存在区别。
    2.另外三大模块提供的连接器也不是完全一致的,有些连接器可能只存在于某个模块,而在另外的模块不存在,具体支持和语法还需要参考官网。连接器

    5.extend table identifiers(标识符拓展)

    table总是用由三部分组成的identifier注册,包括catlog、database和table name。 用户可以将这个知识作为命名空间来理解。catlot->databases->Table
    用户可以修改:catlot或者databases达到切换命名空间的目的,然后基于Table对象创建临时表。 默认情况下不指定的时候,系统会默认设置catlog和database.

    TableEnvironment tEnv = ...;
    tEnv.useCatalog("custom_catalog");
    tEnv.useDatabase("custom_database");
    
    Table table = ...;
    
    // register the view named 'exampleView' in the catalog named 'custom_catalog'
    // in the database named 'custom_database' 
    tableEnv.createTemporaryView("exampleView", table);
    
    // register the view named 'exampleView' in the catalog named 'custom_catalog'
    // in the database named 'other_database' 
    tableEnv.createTemporaryView("other_database.exampleView", table);
    
    // register the view named 'example.View' in the catalog named 'custom_catalog'
    // in the database named 'custom_database' 
    tableEnv.createTemporaryView("`example.View`", table);
    
    // register the view named 'exampleView' in the catalog named 'other_catalog'
    // in the database named 'other_database' 
    tableEnv.createTemporaryView("other_catalog.other_database.exampleView", table);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    6.Table Api 于Sql的关系

    其实这两个都是属于Table中的知识点, 二者其实没啥区别,在flink知识点中属于同一个模块。二者都在TableEnv 中调用,区别就是写法不一样,Table Api是用方法调用的方式查询数据,而Sql Api是直接写sql字符串进行查询。
    格式:

    //Table Api写法
    1.Table resultTable = tableEnv.sqlQuery("select * from mytable where `name`='鸡蛋'");
    //Sql Api写法
    2.Table resultTable = tableEnv.from("mytable").select($("*")).filter($("name").isEqual("牛奶"));
    
    • 1
    • 2
    • 3
    • 4

    7.table API 程序transform 和execute

    正常来说,中间的算子并不会立即触发执行,程序的执行分为翻译和执行两步骤

    Table Api 和Sql Api在以下情况下才会执行:

    1. TableEnvironment.executeSql()叫做。此方法用于执行给定的语句,一旦调用此方法,就会立即翻译 sql 查询。
    2. Table.executeInsert()叫做。该方法用于将表格内容插入给定的接收器路径,一旦调用该方法,就会立即翻译表格 API。
    3. Table.execute()叫做。该方法用于将表格内容采集到本地客户端,调用该方法后立即翻译表格API。//此处的execute()和StreamExecutionEnvironment.execute()不是一个东西此处的execute是table数据的输出打印,类似于spark的show方法
    4. StatementSet.execute()叫做。A Table(通过 发送到接收器StatementSet.addInsert())或
      INSERT 语句(通过
      指定StatementSet.addInsertSql())将StatementSet首先被缓冲。它们被翻译一次StatementSet.execute()被调用。所有接收器都将优化为一个
      DAG。
    5. ATable在转换为 a 时被翻译DataStream(请参阅与 DataStream 集成)。翻译后,它是一个常规的
      DataStream 程序,并在StreamExecutionEnvironment.execute()被调用时执行。//注意DataStream
      程序的触发需要流环境对象调用execute(),而Table Api和DataSet环境的执行不需要调用execute()

    8.流转换方法:fromDataStre介绍

    1. fromDataStream(DataStream) 返回值为Table对象:将仅插入更改和任意类型的流解释为表。默认情况下不传播事件时间和水印。
    2. fromDataStream(DataStream, Schema)返回值为Table对象:将仅插入更改和任意类型的流解释为表。可选模式允许丰富列数据类型可在Schema添加时间属性、水印策略、列名,主键等。
    3. createTemporaryView(String, DataStream)无返回值:创建视图临时表。不传播事件时间和水印
    4. createTemporaryView(String, DataStream, Schema)无返回值:创建视图临时表。可传播事件时间和水印(Schema可以定义事件时间和水位线)

    9.流转换:toDataStream

    1.toDataStream(DataStream):将table转换为DataStream。默认流数据用Row对象封装. 。水印也被传播。
    2.toDataStream(DataStream, Class)Table中的流数据转成对应的Class类的数据。

    10.学会用printSchem和tableEnv.execute().print()

    通过观察打印出来的schema,我们可以检测是否是可以正常使用sql的table对象

    
    public class Prodect{
        private String name;//产品名称
        private Integer count;//产品销售数量
        private Double price;//产品的价格
    
        public Prodect(String name, Integer count, Double price) {
            this.name = name;
            this.count = count;
            this.price = price;
        }
    
        public Prodect() {
        }
    
        public String getName() {
            return name;
        }
    
        public void setName(String name) {
            this.name = name;
        }
    
        public Integer getCount() {
            return count;
        }
    
        public void setCount(Integer count) {
            this.count = count;
        }
    
        public Double getPrice() {
            return price;
        }
    
        public void setPrice(Double price) {
            this.price = price;
        }
    
        @Override
        public String toString() {
            return "Prodect{" +
                    "name='" + name + '\'' +
                    ", count=" + count +
                    ", price=" + price +
                    '}';
        }
    }
    
    public class SchemeDemo {
        public static void main(String[] args) {
    
            // Table Api 环境和DataStreamApi环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
            // create a DataStream Api支持的 流数据
    
            DataStream ds2 = env.fromElements(
                    new Prodect("鸡蛋",1,2.1),
                    new Prodect("鸡蛋",3,2.3),
                    new Prodect("鸡蛋",2,2.2),
                    new Prodect("牛奶",1,2.6),
                    new Prodect("牛奶",2,2.9),
                    new Prodect("牛奶",3,2.8));
    
    
            Table table2 = tableEnv.fromDataStream(ds2).as("name","count","price");
            table2.printSchema();
            table2.execute().print();
    
    
    
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75

    结果:
    (
    name STRING,
    count INT,
    price DOUBLE
    )//打印出这种结果就是正常的,意思是此table有三个字段,name,count,price

    ±—±-------------------------------±------------±-------------------------------+
    | op | name | count | price |
    ±—±-------------------------------±------------±-------------------------------+
    | +I | 鸡蛋 | 1 | 2.1 |
    | +I | 鸡蛋 | 3 | 2.3 |
    | +I | 鸡蛋 | 2 | 2.2 |
    | +I | 牛奶 | 1 | 2.6 |
    | +I | 牛奶 | 2 | 2.9 |
    | +I | 牛奶 | 3 | 2.8 |
    ±—±-------------------------------±------------±-------------------------------+

  • 相关阅读:
    有被惊艳到!阿里达摩院面向开发者公布的Java全体系成长路线,从P5-P8职级全系
    【微服务 | 学成在线】项目易错重难点分析(媒资管理模块篇·上)
    安装、测试和训练OpenPCDet:一篇详尽的指南
    spring boot 整合多数据源
    Google Earth Engine(GEE)——一个简单的sentinel-2影像函数下载APP
    Java集合类--List集合,Set集合,Map集合
    SQL Server Query Store Settings (查询存储设置)
    国自然中标越来越难,怎样才能赢在起跑线上?
    ShellBrowser WPF 组件 6.3.1 Crack
    【Mysql】日期函数 、group_concat函数
  • 原文地址:https://blog.csdn.net/qq_36066039/article/details/125874177