• StarRocks数据库FE——Catalog层


    ​仓外挂湖是指以 MPP 数据库为基础,使用可插拔架构,通过开放接口对接外部存储实现统一存储,在存储底层共享一份数据,计算、存储完全分离,实现从强管理到兼容开放存储和多引擎。实现方向为增加存储能力,提升查询引擎效率。总的来看,“仓外挂湖”路径本质是在仓的基础上增加湖的多类型存储等能力,需解决以下五大技术难点:

    统一元数据管理。打通不同数据系统,具备数据共享和跨库分析的能力,并支持互联互通、计算下推、协同计算,实现数据多平台之间透明流动。仓外挂湖路径目前主要是将对接外部存储如Hadoop、对象存储等的元数据进行采集,统一存储、管理。

    存储开放性。仓外挂湖路径的存储开放性主要表现在:存储介质兼容方面,将非数仓自身存储如 Hadoop、云对象存储等的数据纳入管理;数据格式方面,采用开放、标准化的数据格式,既包含 Hudi、Iceberg、Delta Lake 等开放格式,也包括 Parquet、ORC、CSV 等存储格式的支持。

    扩展查询引擎。仓外挂湖路径保留原 MPP 计算引擎计算能力的基础之上,主要是增加批处理和实时数据处理的能力。其中批处理方面是融合更轻量级、高效率的计算能力,而实时处理方面则是通过微批以及增量计算的方式,增强流的计算能力。

    存算分离。仓外挂湖需进行存算分离架构改造,而传统的MPP 存算耦合架构,不具备云原生能力。目前,仓外挂湖路径主要基于存算分离架构改造后的云原生 MPP 数据库实现。

    弹性伸缩。基于 K8S、Docker 等容器化技术对 MPP 体系的组件、服务进行容器化改造。目前该路径有实现计算层、存储层弹性伸缩,少量产品实现了根据业务负载自动弹性伸缩计算资源。

    本篇文章借助StarRocks FE的源码研究一下如何对接大数据系统的元数据。为了更简单地直接分析开放数据湖上的数据,StarRocks 提供了统一 Catalog 管理的能力,用户可以通过一键创建 Apache Hive/Apache Hudi/Apache Iceberg(以下简称 Hive/Hudi/Iceberg) 的 Catalog,轻松地分析湖上的所有数据,而无需逐个表进行 schema 建模。此外,通过统一的 Catalog,StarRocks 可以实现对湖上数据的统一管理。

    在这里插入图片描述

    Internal catalog: 内部数据目录,用于管理 StarRocks 所有内部数据。例如,执行 CREATE DATABASE 和 CREATE TABLE 语句创建的数据库和数据表都由 internal catalog 管理。每个 StarRocks 集群都有且只有一个 internal catalog 名为 default_catalog。
    External catalog: 外部数据目录,用于连接外部 metastore。在 StarRocks 中,您可以通过 external catalog 直接查询外部数据,无需进行数据导入或迁移。当前支持创建以下类型的 external catalog:Hive catalog:用于查询 Hive 数据;Iceberg catalog:用于查询 Iceberg 数据;Hudi catalog:用于查询 Hudi 数据;Delta Lake catalog:用于查询 Delta Lake 数据;JDBC catalog:用于查询 JDBC 数据源的数据。

    如上图所示,右上角的Catalog类就是上述Internal Catalog和External Catalog的父类,就是catalog在内存中的对象。这里使用一个CatalogMgr来管理集群中所有的Catalog对象:

    其中有一个字典对象catalogs,存放的是CatalogName到Catalog对象的映射,通过该catalog的名字查找catalogs就是获取对应的Catalog对象。

    其中还有一个对象connectorMgr其类型是ConnectorMgr,该对象引用的就是集群中所有外部元数据连接器的管理类。

    在这里插入图片描述
    CatalogMgr是内存中管理Catalog元数据对象的管理类,其包含private final Map catalogs = new HashMap<>();字典用于存放catalogName到Catalog元数据对象的映射,private final ConnectorMgr connectorMgr;外部元数据连接器管理类(非常重要),private final CatalogProcNode procNode = new CatalogProcNode();和相关函数向优化器提供对catalogs字典的二维访问。首先通过创建Catalog的函数说明CatalogMgr成员的作用。首先createCatalog函数传入形参CreateCatalogStmt,从其中提取catalog名、catalog类型、配置connector的Properties属性;createCatalog函数首先查找catalogs是否已经存在这样的catalogName,如果没有则需要调用connectorMgr.createConnector(new ConnectorContext(catalogName, type, properties))对这样的catalog创建connector(这里connector和catalog不需要关联【仅仅通过catalogName关联】,connector是存储在内存中,而catalog会持久化到磁盘上,恢复后connector需要再次新建);新建ExternalCatalog外部catalog对象,将其put到catalogs中去。这里的InternalCatalog目前在StarRocks中只有默认的一个,不能够动态创建,默认创建好。

        public void createCatalog(CreateCatalogStmt stmt) throws DdlException {
            createCatalog(stmt.getCatalogType(), stmt.getCatalogName(), stmt.getComment(), stmt.getProperties());
        }
    
        public void createCatalog(String type, String catalogName, String comment, Map<String, String> properties) throws DdlException {
            if (Strings.isNullOrEmpty(type)) { throw new DdlException("Missing properties 'type'"); }
    
            readLock();
            try { Preconditions.checkState(!catalogs.containsKey(catalogName), "Catalog '%s' already exists", catalogName);
            } finally {  readUnlock();
            }
    
            writeLock();
            try {
                Preconditions.checkState(!catalogs.containsKey(catalogName), "Catalog '%s' already exists", catalogName);
                Connector connector = connectorMgr.createConnector(new ConnectorContext(catalogName, type, properties));
                if (null == connector) {
                    LOG.error("connector create failed. catalog [{}] encounter unknown catalog type [{}]", catalogName, type);  throw new DdlException("connector create failed");
                }
                long id = isResourceMappingCatalog(catalogName) ? ConnectorTableId.CONNECTOR_ID_GENERATOR.getNextId().asInt() : GlobalStateMgr.getCurrentState().getNextId();
                Catalog catalog = new ExternalCatalog(id, catalogName, comment, properties);
                catalogs.put(catalogName, catalog);
                if (!isResourceMappingCatalog(catalogName)) {
                    GlobalStateMgr.getCurrentState().getEditLog().logCreateCatalog(catalog);
                }
            } finally {
                writeUnLock();
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    这里再看看加载Catalogs的流程,其实就是读取序列化后的catalog信息,然后调用replayCreateCatalog将其重放到CatalogMgr中去。

        public long loadCatalogs(DataInputStream dis, long checksum) throws IOException, DdlException {
            int catalogCount = 0;
            try {
                String s = Text.readString(dis);
                SerializeData data = GsonUtils.GSON.fromJson(s, SerializeData.class);
                if (data != null) {
                    if (data.catalogs != null) {
                        for (Catalog catalog : data.catalogs.values()) {
                            if (!isResourceMappingCatalog(catalog.getName())) {  replayCreateCatalog(catalog);
                            }
                        }
                        catalogCount = data.catalogs.size();
                    }
                }
                checksum ^= catalogCount;
                LOG.info("finished replaying CatalogMgr from image");
            } catch (EOFException e) {
                LOG.info("no CatalogMgr to replay.");
            }
            return checksum;
        }
        public void replayCreateCatalog(Catalog catalog) throws DdlException {
            String type = catalog.getType();
            String catalogName = catalog.getName();
            Map<String, String> config = catalog.getConfig();
            if (Strings.isNullOrEmpty(type)) { throw new DdlException("Missing properties 'type'");
            }
            // skip unsupport connector type
            if (!ConnectorType.isSupport(type)) {
                LOG.error("Replay catalog [{}] encounter unknown catalog type [{}], ignore it", catalogName, type);
                return;
            }
            readLock();
            try { Preconditions.checkState(!catalogs.containsKey(catalogName), "Catalog '%s' already exists", catalogName);
            } finally { readUnlock();
            }
            Connector connector = connectorMgr.createConnector(new ConnectorContext(catalogName, type, config));
            if (null == connector) {
                LOG.error("connector create failed. catalog [{}] encounter unknown catalog type [{}]", catalogName, type);
                throw new DdlException("connector create failed");
            }
            writeLock();
            try {
                catalogs.put(catalogName, catalog);
            } finally {
                writeUnLock();
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48

    如下图所示Catalog类中只保持了catalog id全局唯一,catalog名、catalog Porperties配置。
    在这里插入图片描述

  • 相关阅读:
    华为机试 - 堆栈中的剩余数字
    【LeetCode每日一题:809.情感丰富的文字~~~双指针+计数器】
    MyBatis
    大学生简单环保环境静态HTML网页设计作品 DIV布局环境介绍网页模板代码 DW学生环境网站制作成品下载 HTML5期末大作业
    OOM和JVM最详细介绍
    Deployments
    Linux 企业级夜莺监控分析工具远程访问
    JavaScript对象:我们真的需要模拟类吗?
    8. 微服务之消息队列RabbitMQ以及五种消息队列模式
    Window 安装多个版本的 java 并按需切换
  • 原文地址:https://blog.csdn.net/asmartkiller/article/details/132631130