• Flink Catalog解读


    01 引言

    我们知道 FlinkTable(表)、View(视图)、Function(函数/算子)、Database(数据库)的概念,这都类似于我们平常使用的关系型数据库里面的概念。

    相对于关系型数据库的这些概念,Flink 里还有一个 Catalog(目录) 的概念,本文来讲解下。

    在这里插入图片描述

    02 Catalog

    2.1 Catalog概述

    数据处理最关键的方面之一是管理元数据

    • 元数据可以是临时的,例如在Flink中临时表、或者通过 TableEnvironment 注册的 UDF
    • 元数据也可以是持久化的,例如 Hive Metastore 中的元数据。

    Catalog在Flink中提供了一个统一的API,用于管理元数据,并使其可以从 Table API 和 SQL 查询语句中来访问Catalog 提供了元数据信息,例如数据库、表、分区、视图以及数据库或其他外部系统中存储的函数和信息。

    2.2 Catalog分类

    Catalog目前分为以下几类:

    分类描述缺陷
    GenericInMemoryCatalog基于内存实现的 Catalog所有元数据只在 session 的生命周期内可用
    JdbcCatalog可以将 Flink 通过 JDBC 协议连接到关系数据库JDBC Catalog只实现了PostgresCatalog
    HiveCatalog作为原生 Flink 元数据的持久化存储,以及作为读写现有 Hive 元数据的接口Hive Metastore 以小写形式存储所有元数据对象名称。而 GenericInMemoryCatalog 区分大小写。
    自定义 Catalog通过实现 Catalog 接口来开发自定义 Catalog-

    2.3 Catalog API

    2.3.1 数据库操作

    // create database
    catalog.createDatabase("mydb", new CatalogDatabaseImpl(...), false);
    
    // drop database
    catalog.dropDatabase("mydb", false);
    
    // alter database
    catalog.alterDatabase("mydb", new CatalogDatabaseImpl(...), false);
    
    // get database
    catalog.getDatabase("mydb");
    
    // check if a database exist
    catalog.databaseExists("mydb");
    
    // list databases in a catalog
    catalog.listDatabases("mycatalog");
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    2.3.2 表操作

    // create table
    catalog.createTable(new ObjectPath("mydb", "mytable"), new CatalogTableImpl(...), false);
    
    // drop table
    catalog.dropTable(new ObjectPath("mydb", "mytable"), false);
    
    // alter table
    catalog.alterTable(new ObjectPath("mydb", "mytable"), new CatalogTableImpl(...), false);
    
    // rename table
    catalog.renameTable(new ObjectPath("mydb", "mytable"), "my_new_table");
    
    // get table
    catalog.getTable("mytable");
    
    // check if a table exist or not
    catalog.tableExists("mytable");
    
    // list tables in a database
    catalog.listTables("mydb");
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    2.3.3 视图操作

    // create view
    catalog.createTable(new ObjectPath("mydb", "myview"), new CatalogViewImpl(...), false);
    
    // drop view
    catalog.dropTable(new ObjectPath("mydb", "myview"), false);
    
    // alter view
    catalog.alterTable(new ObjectPath("mydb", "mytable"), new CatalogViewImpl(...), false);
    
    // rename view
    catalog.renameTable(new ObjectPath("mydb", "myview"), "my_new_view", false);
    
    // get view
    catalog.getTable("myview");
    
    // check if a view exist or not
    catalog.tableExists("mytable");
    
    // list views in a database
    catalog.listViews("mydb");
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    2.3.4 分区操作

    // create view
    catalog.createPartition(
        new ObjectPath("mydb", "mytable"),
        new CatalogPartitionSpec(...),
        new CatalogPartitionImpl(...),
        false);
    
    // drop partition
    catalog.dropPartition(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...), false);
    
    // alter partition
    catalog.alterPartition(
        new ObjectPath("mydb", "mytable"),
        new CatalogPartitionSpec(...),
        new CatalogPartitionImpl(...),
        false);
    
    // get partition
    catalog.getPartition(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...));
    
    // check if a partition exist or not
    catalog.partitionExists(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...));
    
    // list partitions of a table
    catalog.listPartitions(new ObjectPath("mydb", "mytable"));
    
    // list partitions of a table under a give partition spec
    catalog.listPartitions(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...));
    
    // list partitions of a table by expression filter
    catalog.listPartitions(new ObjectPath("mydb", "mytable"), Arrays.asList(epr1, ...));
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    2.3.5 函数操作

    catalog.createFunction(new ObjectPath("mydb", "myfunc"), new CatalogFunctionImpl(...), false);
    
    // drop function
    catalog.dropFunction(new ObjectPath("mydb", "myfunc"), false);
    
    // alter function
    catalog.alterFunction(new ObjectPath("mydb", "myfunc"), new CatalogFunctionImpl(...), false);
    
    // get function
    catalog.getFunction("myfunc");
    
    // check if a function exist or not
    catalog.functionExists("myfunc");
    
    // list functions in a database
    catalog.listFunctions("mydb");
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    2.4 Catalog 示例(SQL Client的方式)

    ① 首先需要注册Catalog:用户可以访问默认创建的内存 Catalog default_catalog,这个 Catalog 默认拥有一个默认数据库 default_database。 用户也可以注册其他的 Catalog 到现有的 Flink 会话中,创建方式如下(可以使用Flink里面的Factory工厂模式动态加载):

    tableEnv.registerCatalog(new CustomCatalog("myCatalog"));
    
    • 1

    ② 指定使用的内容:Flink 始终在当前的 Catalog 和数据库中寻找表、视图和 UDF,代码如下:

    Flink SQL> USE CATALOG myCatalog;
    Flink SQL> USE myDB;
    
    • 1
    • 2

    也可以通过提供全限定名 catalog.database.object 来访问不在当前 Catalog 中的元数据信息,代码如下:

    Flink SQL> SELECT * FROM not_the_current_catalog.not_the_current_db.my_table;
    
    • 1

    ③ 其它常规命令

    -- 列出可用的 Catalog
    Flink SQL> show catalogs;
    
    -- 列出可用的数据库 
    Flink SQL> show databases;
    
    -- 列出可用的表
    Flink SQL> show tables;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    03 文末

    本文主要讲解了Flink Catalog的概念以及用法,如果大家有兴趣可以进一步去官网查看相关的文档,这里我列出相关比较核心的文档:

    接下来我的计划是编写 “如何自定义Catalog” ,以及Catalog的应用场景(有兴趣可先阅读《Ververica Platform-阿里巴巴全新Flink企业版揭秘》)相关的博客,谢谢大家的阅读,希望能帮助到大家,本文完!

  • 相关阅读:
    【塔防】1,游戏架构
    升压芯片很简单(三),FSB628升压芯片大串讲
    K8S篇之k8s常用操作指令
    Unity3D,阿里云服务器,平台配置
    51单片机仿真软件 Proteus 8 Pro 安装步骤
    请求报错:javax.net.ssl.SSLHandshakeException: No appropriate protocol
    有六家机器视觉公司今年11月份初放假到明年春节后,除夕不放假看住企业不跑路,不倒闭,明年大家日子会越来越甜
    调整数组顺序使奇数位于偶数前面(二)
    Spring Data MongoDB中查询指定返回特定字段
    float 浮动
  • 原文地址:https://blog.csdn.net/qq_20042935/article/details/125925410