• FlinkSQL系列02-Table表对象和SQL表视图


    表概念

    表标识

    表标识由3部分组成:

    • catalog name (常用于标识不同的“源”, 比如 hive catalog, inner catalog 等)
    • database name(通常语义中的“库”)
    • table name(通常语义中的“表”)
    // 指定使用地表标识
    TableEnvironment tEnv = ...;
    tEnv.useCatalog("a_catalog");
    tEnv.useDatabase("db1");
    
    • 1
    • 2
    • 3
    • 4
    // 将 table 注册到指定标识中
    Table table = ...;// 注册在默认 catalog 的默认 database 中
    tableEnv.createTemporaryView("a_view", table);// 注册在默认 catalog 的指定 database 中
    tableEnv.createTemporaryView("db2.a_view", table);// 注册在指定 catalog 的指定 database 中
    tableEnv.createTemporaryView("x_catalog.db3.a_view", table);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    表和视图

    Flinksql 中的表,可以是 virtual 的(view 视图)和 regular 的(table 常规表)

    • table 描述了一个物理上的外部数据源, 如文件、数据库表、kafka 消息 topic
    • view 则基于表创建, 代表一个或多个表上的一段计算逻辑(就是对一段查询计划的逻辑封装)

    不管是 table 还是 view, 在 tableAPI 中得到的都是 Table 对象

    临时表和永久表

    • 临时表(视图)
      创建时带 temporary 关键字(crate temporary view,createtemporary table);
      schema 只维护在所属 flink session 运行时内存中;
      当所属的 flink session 结束后表信息将不复存在; 且该表无法在 flink session 间共享;
    • 永久表(视图)
      创建时不带 temporary 关键字 (create view , create table);
      schema 可记录在外部持久化的元数据管理器中(比如 hive 的 metastore) ;
      当所属 flink session 结束后, 该表信息不会丢失; 且在不同 flink session 中都可访问到该表的信息;

    区别:schema 信息是否被持久化存储

    表定义

    创建 table 表对象

    Table 对象获取方式解析:

    • 从已注册的表

      Table t1 = tenv.from("t1"); // 通过已经在 env 的 catalog 中注册的表名, 获得 Table 对象
      
      • 1
    • 从 TableDescriptor(连接器/format/schema/options)

      Table table = tenv.from(TableDescriptor
                  .forConnector("kafka")  // 指定连接器
                  .schema(Schema.newBuilder()  // 指定表结构
                          .column("id", DataTypes.INT())
                          .column("name", DataTypes.STRING())
                          .column("age", DataTypes.INT())
                          .column("gender", DataTypes.STRING())
                          .build())
                  .format("json")  // 指定数据源的数据格式
                  .option("topic", "doit30-3")  // 连接器及format格式的相关参数
                  .option("properties.bootstrap.servers", "doit01:9092")
                  .option("properties.group.id", "g2")
                  .option("scan.startup.mode", "earliest-offset")
                  .option("json.fail-on-missing-field", "false")
                  .option("json.ignore-parse-errors", "true")
                  .build());
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
    • 从 DataStream

      // 1 不指定schema,将流创建成Table对象,表的schema是默认的,往往不符合我们的要求
      Table table1 = tenv.fromDataStream(kfkStream);
      /*table1.execute().print();*/
      
      // 2 为了获得更理想的表结构,可以先把数据流中的数据转成javabean类型
      SingleOutputStreamOperator<Person> javaBeanStream = kfkStream.map(json -> JSON.parseObject(json, Person.class));
      Table table2 = tenv.fromDataStream(javaBeanStream);
      /*table2.execute().print();*/
      
      // 3 手动指定 schema定义,来将一个javabean流,转成Table对象
      Table table3 = tenv.fromDataStream(javaBeanStream,
             Schema.newBuilder()
                     .column("id",DataTypes.BIGINT())
                     .column("name",DataTypes.STRING())
                     .column("age",DataTypes.INT())
                     .column("gender",DataTypes.STRING())
                     .build());
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
    • 从 Table 对象上的查询 api 生成

      Table table = table3.select($("guid"), $("uuid"));
      
      • 1
    • 从测试数据

      Table table = tenv.fromValues(
                  DataTypes.ROW(
                          DataTypes.FIELD("id",DataTypes.INT()),
                          DataTypes.FIELD("name",DataTypes.STRING()),
                          DataTypes.FIELD("age",DataTypes.DOUBLE())
                  ),
                  Row.of(1, "zs", 18.2),
                  Row.of(2, "bb", 28.2),
                  Row.of(3, "cc", 16.2),
                  Row.of(4, "dd", 38.2)
          );
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11

    创建 sql 表视图

    • 从已存在的 datastream 注册

      tenv.createTemporaryView("t_person", javaBeanStream);
      tenv.executeSql("select gender,max(age) as max_age from t_person group by gender").print();
      
      • 1
      • 2
    • 从已存在的 Table 对象注册

      tenv.createTemporaryView("t1",table);
      
      • 1
    • 从 TableDescriptor(连接器) 注册

      tenv.createTable("table_a",  // 表名
              TableDescriptor.forConnector("filesystem")
                      .schema(Schema.newBuilder()
                              .column("id", DataTypes.INT())
                              .column("name", DataTypes.STRING())
                              .column("age", DataTypes.INT())
                              .column("gender", DataTypes.STRING())
                              .build())
                      .format("csv")
                      .option("path", "data/sqldemo/a.txt")
                      .option("csv.ignore-parse-errors", "true")
                      .build());
      
      
      tenv.executeSql("select * from table_a").print();
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
    • 执行 Sql 的 DDL 语句来注册

      tenv.executeSql(
      "CREATE TABLE age_info "
      + "( "
      + " id INT, "
      + " name string, "
      + " gender string, "
      + " age int, "
      + " PRIMARY KEY (id) NOT ENFORCED "
      + ") WITH ( "
      + " 'connector' = 'mysql-cdc', "
      + " 'hostname' = 'hdp01', "
      + " 'port' = '3306', "
      + " 'username' = 'root', "
      + " 'password' = '123456', "
      + " 'database-name' = 'abc', "
      + " 'table-name' = 'age_info' "
      + " ) ");
      tenv.executeSql("select * from age_info").print();
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18

    catalog

    什么是 catalog

    catalog 就是一个元数据空间,简单说就是记录、获取元数据(表定义信息)的实体;

    flinksql 在运行时,可以拥有多个 catalog,它们由 catalogManager 模块来注册、管理;

    1. 环境创建之初,就会初始化一个默认的元数据空间

      • 空间名称: default_catalog
      • 空间实现类: GenericInMemoryCatalog
    2. 用户还可以向环境中注册更多的 catalog,如下代码新增注册了一个 hivecatalog

      // 创建了一个 hive 元数据空间的实现对象
      HiveCatalog hiveCatalog = new HiveCatalog("hive", "default", "/hiveconf/dir");
      // 将 hive 元数据空间对象注册到 环境中
      tenv.registerCatalog("mycatalog",hiveCatalog);
      
      • 1
      • 2
      • 3
      • 4

    如何理解 hive catalog

    • flinksql 利用 hive catalog 来建表(查询、修改、删除表),本质上只是利用了 hive 的 metastore 服务;
    • 更具体来说,flinksql 只是把 flinksql 的表定义信息,按照 hive 元数据的形式,托管到 hive 的 metastore 中而已;
    • 当然,hive 中也能看到这些托管的表信息,但是,并不能利用它底层 mapreduce 或 spark 引擎来查询这些表;因为 mapreduce 或者 spark 引擎,并不能理解 flinksql 表定义中的信息,也无法为这些定义信息提供相应的组件去读取数据
  • 相关阅读:
    Linux:安装IDEA开发工具
    leetcode 148. Sort List 排序链表(中等)
    光耦合器继电器与传统继电器:哪种最适合您的项目?
    嵌入式开发:注释C代码的10个技巧
    WLAN射频资源调优技术
    C. Nice Garland
    Nexus3 部署备份与恢复
    RTX3090+win10+CUDA11.6+cudnn8.5.0+pytorch1.12.1 环境——个人配置经验
    顶部动态菜单栏的使用
    太阳能路灯的根本结构及作业原理
  • 原文地址:https://blog.csdn.net/qq_17310871/article/details/126560562