• Flink之Catalog


    Catalog

    概述

    Catalog提供了元数据信息,例如数据库、表、分区、视图以及数据库或其他外部系统中存储的函数和信息。

    数据处理最关键的方面之一是管理元数据。 元数据可以是临时的,例如临时表、或者通过TableEnvironment注册的 UDF。 元数据也可以是持久化的,例如Hive Metastore中的元数据。

    Catalog提供了一个统一的API,用于管理元数据,并使其可以从Table API和SQL查询语句中来访问。

    Catalog分类

    在Flink中,Catalog可以分为4类:GenericInMemoryCatalogJdbcCatalogHiveCatalog用户自定义Catalog

    1.GenericInMemoryCatalog

    GenericInMemoryCatalog是基于内存实现的 Catalog,所有元数据只在 session 的生命周期内可用。

    2.JdbcCatalog

    JdbcCatalog使得用户可以将Flink通过JDBC协议连接到关系数据库。Postgres Catalog和MySQL Catalog是目前 JDBC Catalog仅有的两种实现。

    3.HiveCatalog

    HiveCatalog有两个用途:作为原Flink元数据的持久化存储,以及作为读写现有Hive元数据的接口。

    Hive Metastore以小写形式存储所有元数据对象名称。而GenericInMemoryCatalog区分大小写。

    4.用户自定义Catalog

    Catalog是可扩展的,用户可以通过实现Catalog接口来开发自定义Catalog。 想要在SQL CLI中使用自定义 Catalog,用户除了需要实现自定义的Catalog 之外,还需要为这个Catalog实现对应的CatalogFactory接口。

    CatalogFactory定义了一组属性,用于SQL CLI启动时配置Catalog。 这组属性集将传递给发现服务,在该服务中,服务会尝试将属性关联到CatalogFactory并初始化相应的Catalog 实例。

    GenericInMemoryCatalog

    基于内存实现的Catalog,所有元数据只在session的生命周期(一个Flink任务运行生命周期内)内可用。默认自动创建名为default_catalog的内存Catalog,这个Catalog默认只有一个名为default_database的数据库。

    JdbcCatalog

    JdbcCatalog使得用户可以将 Flink 通过 JDBC 协议连接到关系数据库。Postgres Catalog和MySQL Catalog是目前仅有的两种JDBC Catalog实现,将元数据存储在数据库中。

    这里以JdbcCatalog-MySQL使用为例。

    注意:JdbcCatalog不支持建表,只是打通flink与mysql的连接,可以去读写mysql现有的库表。

    下载JAR包及使用

    下载:flink-connector-jdbc

    下载:mysql-connector-j

    上传JAR包到flink/lib

    cp ./flink-connector-jdbc-3.1.0-1.17.jar /usr/local/program/flink/lib
    
    cp ./mysql-connector-j-8.0.33.jar /usr/local/program/flink/lib
    
    • 1
    • 2
    • 3

    重启操作

    重启flink集群和sql-client

    bin/start-cluster.sh
    
    bin/sql-client.sh
    
    • 1
    • 2
    • 3

    创建Catalog

    JdbcCatalog支持以下选项:

    name:必需,Catalog名称
    
    default-database:连接到的默认数据库
    
    username: Postgres/MySQL帐户的用户名
    
    password:帐号密码
    
    base-url:数据库的jdbc url(不含数据库名)
    	Postgres Catalog:是"jdbc:postgresql://:<端口>"
    	MySQL Catalog:是"jdbc: mysql://:<端口>"
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    CREATE CATALOG jdbc_catalog WITH(
        'type' = 'jdbc',
        'default-database' = 'demo',
        'username' = 'root',
        'password' = '123456',
        'base-url' = 'jdbc:mysql://node01:3306'
    );
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    查看与使用Catalog

    查看Catalog

    Flink SQL> show catalogs;
    +-----------------+
    |    catalog name |
    +-----------------+
    | default_catalog |
    |    jdbc_catalog |
    +-----------------+
    2 rows in set
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    使用指定Catalog

    Flink SQL> use catalog jdbc_catalog;
    [INFO] Execute statement succeed.
    
    • 1
    • 2

    查看当前的CATALOG

    Flink SQL> SHOW CURRENT CATALOG;
    +----------------------+
    | current catalog name |
    +----------------------+
    |         jdbc_catalog |
    +----------------------+
    1 row in set
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    操作数据库表

    Flink SQL> show current database;
    +-----------------------+
    | current database name |
    +-----------------------+
    |                  demo |
    +-----------------------+
    1 row in set
    
    
    Flink SQL> show tables;
    +------------+
    | table name |
    +------------+
    |    tb_user |
    +------------+
    1 row in set
    
    Flink SQL> select * from tb_user;
    [INFO] Result retrieval cancelled.
    
    Flink SQL> insert into tb_user values(0,'java',20);
    [INFO] Submitting SQL update statement to the cluster...
    [INFO] SQL update statement has been successfully submitted to the cluster:
    Job ID: 9d78ec378ad635d291bd730ba86245d8
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    自动初始化catalog

    进入SQL客户端自动初始化catalo,创建vim sql-client-init.sql初始化脚本

    SET sql-client.execution.result-mode = 'tableau';
    
    CREATE CATALOG jdbc_catalog WITH(
        'type' = 'jdbc',
        'default-database' = 'demo',
        'username' = 'root',
        'password' = '123456',
        'base-url' = 'jdbc:mysql://node01:3306'
    );
    
    use catalog jdbc_catalog;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    进入客户端时指定初始化文件

    bin/sql-client.sh  -i ./sql-client-init.sql
    
    • 1

    再查看catalog

    Flink SQL> show catalogs;
    +-----------------+
    |    catalog name |
    +-----------------+
    | default_catalog |
    |    jdbc_catalog |
    +-----------------+
    2 rows in set
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    HiveCatalog

    HiveCatalog有两个用途:

    单纯作为 Flink元数据的持久化存储
    
    作为读写现有Hive元数据的接口
    
    • 1
    • 2
    • 3

    注意:Hive MetaStore以小写形式存储所有元数据对象名称。Hive Metastore以小写形式存储所有元对象名称,而 GenericInMemoryCatalog会区分大小写。

    下载JAR包及使用

    下载:flink-sql-connector-hive

    下载:mysql-connector-j

    上传jar包到flink的lib

    cp ./flink-sql-connector-hive-2.3.9_2.12-1.17.0.jar /usr/local/program/flink/lib/
    
    cp ./mysql-connector-j-8.0.33.jar /usr/local/program/flink/lib
    
    • 1
    • 2
    • 3

    重启操作

    重启flink集群和sql-client

    bin/start-cluster.sh
    
    bin/sql-client.sh
    
    • 1
    • 2
    • 3

    hive metastore服务

    启动外置的hive metastore服务

    Hive metastore必须作为独立服务运行,因此,在Hive的hive-site.xml中添加配置

      <property>
        <name>hive.metastore.uris</name>
        <value>thrift://node01:9083</value>
      </property>
    
    • 1
    • 2
    • 3
    • 4
    # 前台运行
    hive --service metastore
    
    # 后台运行
    hive --service metastore &
    
    • 1
    • 2
    • 3
    • 4
    • 5

    创建Catalog

    创建Catalog参数说明

    配置项必需默认值类型说明
    typeYes(none)StringCatalog类型,创建HiveCatalog时必须设置为’hive’
    nameYes(none)StringCatalog的唯一名称
    hive-conf-dirNo(none)String包含hive -site.xml的目录,需要Hadoop文件系统支持。如果没指定hdfs协议,则认为是本地文件系统。如果不指定该选项,则在类路径中搜索hive-site.xml
    default-databaseNodefaultStringHive Catalog使用的默认数据库
    hive-versionNo(none)StringHiveCatalog能够自动检测正在使用的Hive版本。建议不要指定Hive版本,除非自动检测失败
    hadoop-conf-dirNo(none)StringHadoop conf目录的路径。只支持本地文件系统路径。设置Hadoop conf的推荐方法是通过HADOOP_CONF_DIR环境变量。只有当环境变量不适合你时才使用该选项,例如,如果你想分别配置每个HiveCatalog
    CREATE CATALOG myhive WITH (
        'type' = 'hive',
        'default-database' = 'default',
        'hive-conf-dir' = '/usr/local/program/hive/conf'
    );
    
    • 1
    • 2
    • 3
    • 4
    • 5

    查看与使用Catalog

    查看Catalog

    Flink SQL> SHOW CATALOGS;
    +-----------------+
    |    catalog name |
    +-----------------+
    | default_catalog |
    |          myhive |
    +-----------------+
    2 rows in set
    
    
    --查看当前的CATALOG
    SHOW CURRENT CATALOG;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    使用指定Catalog

    Flink SQL> use catalog myhive;
    [INFO] Execute statement succeed.
    
    • 1
    • 2

    Flink与Hive中操作

    Flink中查看

    Flink SQL> SHOW DATABASES;
    +---------------+
    | database name |
    +---------------+
    |       default |
    +---------------+
    1 row in set
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    操作Hive

    # 创建数据库demo
    hive (default)> create database demo;
    
    # 切换数据库
    hive (default)> use demo;
    
    # 创建表tb_user
    hive (demo)> create table tb_user(id int,name string, age int);
    
    # 插入数据
    hive (demo)> insert into tb_user values(1,"test",22);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    Flink中再次查看

    Flink SQL> SHOW DATABASES;
    +---------------+
    | database name |
    +---------------+
    |       default |
    |          demo |
    +---------------+
    2 rows in set
    
    Flink SQL> use demo;
    [INFO] Execute statement succeed.
    
    Flink SQL> show tables;
    +------------+
    | table name |
    +------------+
    |    tb_user |
    +------------+
    
    
    Flink SQL> SET 'sql-client.execution.result-mode' = 'tableau';
    [INFO] Execute statement succeed.
    
    Flink SQL> select * from tb_user;2023-07-09 21:58:25,620 INFO  org.apache.hadoop.mapred.FileInputFormat                     [] - Total input files to process : 1
    
    +----+-------------+--------------------------------+-------------+
    | op |          id |                           name |         age |
    +----+-------------+--------------------------------+-------------+
    | +I |           1 |                           test |          22 |
    +----+-------------+--------------------------------+-------------+
    Received a total of 1 row
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    在Flink中插入

    Flink SQL> insert into tb_user values(2,'flink',22);
    [INFO] Submitting SQL update statement to the cluster...
    [INFO] SQL update statement has been successfully submitted to the cluster:
    Job ID: 9fe32af97cfb9e507ce84263cae65d23
    
    Flink SQL> select * from tb_user;2023-07-09 22:05:47,521 INFO  org.apache.hadoop.mapred.FileInputFormat                     [] - Total input files to process : 2
    
    +----+-------------+--------------------------------+-------------+
    | op |          id |                           name |         age |
    +----+-------------+--------------------------------+-------------+
    | +I |           1 |                           test |          22 |
    | +I |           2 |                          flink |          22 |
    +----+-------------+--------------------------------+-------------+
    Received a total of 2 rows
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    Hive中查询

    hive (demo)> select * from tb_user;
    
    • 1

    自动初始化catalog

    进入SQL客户端自动初始化catalog,创建vim sql-client-init.sql初始化脚本

    SET sql-client.execution.result-mode = 'tableau';
    
    CREATE CATALOG myhive WITH (
        'type' = 'hive',
        'default-database' = 'default',
        'hive-conf-dir' = '/usr/local/program/hive/conf'
    );
    
    use catalog myhive ;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    进入客户端时指定初始化文件

    bin/sql-client.sh  -i ./sql-client-init.sql
    
    • 1

    可以发现数据信息任然存在

    Flink SQL> use catalog myhive;
    [INFO] Execute statement succeed.
    
    Flink SQL> show databases;
    +---------------+
    | database name |
    +---------------+
    |       default |
    |          demo |
    +---------------+
    2 rows in set
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    用户自定义Catalog

    实现Catalog

    用户可以通过实现Catalog接口来开发自定义 Catalog

    public class CustomCatalog implements Catalog {
    
        public CustomCatalog(String catalogName, String defaultDatabase) {
            
        }
    
    
        @Override
        public void open() {
            // 实现 Catalog 打开的逻辑
        }
    
        @Override
        public void close() {
            // 实现 Catalog 关闭的逻辑
        }
    
        @Override
        public List listDatabases() {
            // 实现获取数据库列表的逻辑
            return null;
        }
    
        @Override
        public CatalogDatabase getDatabase(String databaseName) {
            // 实现获取指定数据库的逻辑
            return null;
        }
    
        @Override
        public boolean databaseExists(String databaseName) {
            // 实现检查数据库是否存在的逻辑
            return false;
        }
    
        @Override
        public void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists) {
            // 实现创建数据库的逻辑
        }
    
        @Override
        public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade) {
            // 实现删除数据库的逻辑
        }
    
        @Override
        public List listTables(String databaseName) {
            // 实现获取数据库中表的列表的逻辑
            return null;
        }
    
        @Override
        public CatalogBaseTable getTable(ObjectPath tablePath) {
            // 实现获取指定表的逻辑
            return null;
        }
    
        @Override
        public boolean tableExists(ObjectPath tablePath) {
            // 实现检查表是否存在的逻辑
            return false;
        }
    
        @Override
        public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists) {
            // 实现创建表的逻辑
        }
    
        @Override
        public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) {
            // 实现删除表的逻辑
        }
    
        @Override
        public List listFunctions(String dbName) {
            // 实现获取数据库中函数的逻辑
            return null;
        }
    
        // 其他方法的实现
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81

    使用Catalog

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    
            // 注册自定义 Catalog
            tableEnv.registerCatalog("my_catalog", new CustomCatalog("my_catalog", "default"));
    
            // 使用自定义 Catalog
            tableEnv.useCatalog("my_catalog");
    
            // 执行 SQL 查询或 Table API 操作
            tableEnv.sqlQuery("SELECT * FROM my_table").execute().print();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    Catalog API

    数据库操作

        public static void main(String[] args) throws Exception {
            // 创建一个基于内存的Catalog实例
            GenericInMemoryCatalog catalog = new GenericInMemoryCatalog("myCatalog");
            catalog.open();
    
    
            // 创建数据库
            Map, String> properties = new HashMap<>();
            properties.put("key", "value");
            CatalogDatabase database = new CatalogDatabaseImpl(properties, "create comment");
            catalog.createDatabase("mydb", database, false);
    
            // 列出Catalog中的所有数据库
            System.out.println("列出Catalog中的所有数据库 = " + catalog.listDatabases());
    
            // 获取数据库
            CatalogDatabase createDb = catalog.getDatabase("mydb");
            System.out.println("获取数据库,comment =  " + createDb.getComment() + " ,properties = " + createDb.getProperties());
    
            // 修改数据库
            Map, String> properties2 = new HashMap<>();
            properties2.put("key", "value1");
            catalog.alterDatabase("mydb", new CatalogDatabaseImpl(properties2, "alter comment"), false);
    
            // 获取数据库
            CatalogDatabase alterDb = catalog.getDatabase("mydb");
            System.out.println("获取数据库,comment =  " + alterDb.getComment() + " ,properties = " + alterDb.getProperties());
    
            // 检查数据库是否存在
            System.out.println("检查数据库是否存在 = " + catalog.databaseExists("mydb"));
    
            // 删除数据库
            catalog.dropDatabase("mydb", false);
    
            // 关闭 Catalog
            catalog.close();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    列出Catalog中的所有数据库 = [default, mydb]
    获取数据库,comment =  create comment ,properties = {key=value}
    获取数据库,comment =  alter comment ,properties = {key=value1}
    检查数据库是否存在 = true
    
    • 1
    • 2
    • 3
    • 4

    表操作

    // 创建表
    catalog.createTable(new ObjectPath("mydb", "mytable"), new CatalogTableImpl(...), false);
    
    // 删除表
    catalog.dropTable(new ObjectPath("mydb", "mytable"), false);
    
    // 修改表
    catalog.alterTable(new ObjectPath("mydb", "mytable"), new CatalogTableImpl(...), false);
    
    // 重命名表
    catalog.renameTable(new ObjectPath("mydb", "mytable"), "my_new_table");
    
    // 获取表
    catalog.getTable("mytable");
    
    // 检查表是否存在
    catalog.tableExists("mytable");
    
    // 列出数据库中的所有表
    catalog.listTables("mydb");
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    视图操作

    // 创建视图
    catalog.createTable(new ObjectPath("mydb", "myview"), new CatalogViewImpl(...), false);
    
    // 删除视图
    catalog.dropTable(new ObjectPath("mydb", "myview"), false);
    
    // 修改视图
    catalog.alterTable(new ObjectPath("mydb", "mytable"), new CatalogViewImpl(...), false);
    
    // 重命名视图
    catalog.renameTable(new ObjectPath("mydb", "myview"), "my_new_view", false);
    
    // 获取视图
    catalog.getTable("myview");
    
    // 检查视图是否存在
    catalog.tableExists("mytable");
    
    // 列出数据库中的所有视图
    catalog.listViews("mydb");
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    分区操作

    // 创建分区
    catalog.createPartition(
        new ObjectPath("mydb", "mytable"),
        new CatalogPartitionSpec(...),
        new CatalogPartitionImpl(...),
        false);
    
    // 删除分区
    catalog.dropPartition(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...), false);
    
    // 修改分区
    catalog.alterPartition(
        new ObjectPath("mydb", "mytable"),
        new CatalogPartitionSpec(...),
        new CatalogPartitionImpl(...),
        false);
    
    // 获取分区
    catalog.getPartition(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...));
    
    // 检查分区是否存在
    catalog.partitionExists(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...));
    
    // 列出表的所有分区
    catalog.listPartitions(new ObjectPath("mydb", "mytable"));
    
    // 根据给定的分区规范列出表的分区
    catalog.listPartitions(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...));
    
    // 根据表达式过滤器列出表的分区
    catalog.listPartitions(new ObjectPath("mydb", "mytable"), Arrays.asList(epr1, ...));
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    函数操作

    // 创建函数
    catalog.createFunction(new ObjectPath("mydb", "myfunc"), new CatalogFunctionImpl(...), false);
    
    // 删除函数
    catalog.dropFunction(new ObjectPath("mydb", "myfunc"), false);
    
    // 修改函数
    catalog.alterFunction(new ObjectPath("mydb", "myfunc"), new CatalogFunctionImpl(...), false);
    
    // 获取函数
    catalog.getFunction("myfunc");
    
    // 检查函数是否存在
    catalog.functionExists("myfunc");
    
    // 列出数据库中的所有函数
    catalog.listFunctions("mydb");
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
  • 相关阅读:
    Docker的简介
    LeetCode每日一题——1582. 二进制矩阵中的特殊位置
    《数据库》第1章 数据库系统概论
    AAOS CarMediaService 问题分析
    [资源推荐] 复旦大学张奇老师科研分享
    Big Data -- Postgres
    阿里巴巴开源限流组件Sentinel初探之集成Gateway
    代码管理工具知多少?来看看Git怎么用吧
    Softmax 回归 + 损失函数 + 图片分类数据集
    周志华机器学习(6):支持向量机
  • 原文地址:https://blog.csdn.net/qq_38628046/article/details/131692278