• 如何创建与引擎独立的Iceberg表


    本文首发微信公众号:码上观世界

    创建表是引擎的必备基本能力,引擎有很多,Hive、Spark、Flink、Trino等等,我们姑且只关注这些,创建的表按照是否跟引擎绑定,分为两大类:managed table和external table。以这里举例的引擎为例,它们都可以将表元数据维护在Hive Metastore中,对引擎来讲,这些表以external table的形式存在。

    在本文中,我们将话题限制在Hive、Spark、Flink、Trino如何创建Iceberg表,并且保证各引擎都可以无障碍互相访问。这样,我们的话题分为3个部分:

    • 不同引擎创建的Iceberg表为什么不兼容?

    • 如何屏蔽不同引擎创建Iceberg表的差异?

    • 创建独立引擎的Iceberg表的步骤

    不同引擎创建的Iceberg表为什么不兼容

    Hive、Spark、Flink、Trino创建的Iceberg表元数据都存储在HMS中,也就是复用了HMS的存储模型,主要涉及的表的相互关系如下:

    bb95c290d3c85728a335fc7941302be1.png

    以Hive创建的Iceberg表为例,来看看其存储内容:

    1. CREATE TABLE default.sample_hive_table_1(
    2. id bigint, name string
    3. )
    4. PARTITIONED BY(
    5. dept string
    6. )
    7. STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'

    首先在TBLS中存储了一条记录:记录了当前表的ID,名称,类型,数据库ID,序列化ID,拥有者等信息。在TABLE_PARAMS中存储了相关参数:

    8b20cc5d2fd728212ab8c3c01b33913c.png

    storage_handler记录了当前表需要用Iceberg handler来处理。Iceberg表的元数据信息,通过metadata_location和previous_metadata_location指定,前者代表最新变更记录,后者代表上一次的变更记录。

    在字段表COLUMNS_V2中记录了字段信息:

    09aa40d8cc596c0f5d01297c87e5600d.png

    由图可知,其并没有存储分区字段,分区信息存储在元数据文件中。这个表只保存表最新版本的字段信息。

    SDS表的内容:

    SD_ID


    INPUT_FORMAT

    OUTPUT_FORMAT

    LOCATION

    898


    org.apache.iceberg.mr.hive.HiveIcebergInputFormat

    org.apache.iceberg.mr.hive.HiveIcebergOutputFormat

    s3a://faas-ethan/warehouse/default/sample_hive_table_1

    SERDES和SERDES_PARAMS存储序列化相关的信息,重要的字段只有一个:SERDES.SLIB=org.apache.iceberg.mr.hive.HiveIcebergSerDe

    如果你用其他引擎来创建Iceberg表,会发现元数据存储上的几个差别:

    首先是序列化信息变成了:

    SERDES.SLIB=org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

    其次是SDS的存储格式变成了其他HDFS文件格式(以下为示例格式):

    SD_ID


    INPUT_FORMAT

    OUTPUT_FORMAT

    LOCATION

    898


    org.apache.hadoop.mapred.FileInputFormat

    org.apache.hadoop.mapred.FileOutputFormat

    s3a://faas-ethan/warehouse/default/sample_hive_table_1

    最后是没有Iceberg的storage_handler属性,由于这些格式跟Hive创建的Iceberg表的差异,导致Hive引擎无法识别其他引擎创建的表,而其他几种引擎之间是互通的,且能访问Hive 创建的Iceberg表,互通关系表现为:

    74ca2c4e04e7fc45fa68aa793da5299d.png

    其中红色箭头表示单项互通。

    如何屏蔽不同引擎创建Iceberg表的差异

    最简单的办法是直接修改元数据:

    1. INSERT INTO TABLE_PARAMS
    2. (TBL_ID, PARAM_KEY, PARAM_VALUE)
    3. VALUES(898, 'storage_handler', 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler');
    4. update SDS set INPUT_FORMAT =NULL ,OUTPUT_FORMAT =NULL where SD_ID =898
    5. update SERDES set slib='org.apache.iceberg.mr.hive.HiveIcebergSerDe' where serde_ID =898

    为了做到Iceberg表元数据的真正中立,也可以将SDS的INPUT_FORMAT 和 OUTPUT_FORMAT 字段都置NULL。

    这是通过事后修补的办法来做到的,能否在创建表之前就修改呢?并且不修改各引擎的实现代码。实际上,引擎创建Iceberg表,是通过iceberg API来实现的,交互逻辑大概是这个样子:

    623ad62251f2959fe6b2fd9cd4cc59ac.png

    那是不是有办法直接通过Iceberg API创建HMS表呢?of course!

    更进一步,我们将上述逻辑模型,抽象成如下结构,引擎变成了通用了Restful API,理想情况下,使用方只需要传入创建表的相关参数等信息即可:

    af6ffcb153ae82b35ef930d08abf42cf.png

    这样一来,不论是Hive表还是Iceberg表,都可以通过这个模型完成表的创建。接下来,我们看如何抽象这样的表模型。

    创建独立引擎的Iceberg表的步骤

    通过上面元数据的分析,可以将创建Iceberg表的元数据信息分为下面几类:

    表名称和类型信息,用一个复合字段来表示:

    1. "name": {
    2. "catalogName": "mycatalog",
    3. "databaseName": "test_iceberg_db",
    4. "tableName": "test_iceberg_table",
    5. "type": "TABLE"
    6. }

    域字段列表信息,用一个复合数组表示:

    1. "fields": [
    2. {
    3. "comment": "primary key",
    4. "defaultValue": 0,
    5. "isIndexKey": true,
    6. "isNullable": false,
    7. "isSortKey": true,
    8. "name": "id",
    9. "type": "int"
    10. },
    11. {
    12. "comment": "data value",
    13. "defaultValue": "",
    14. "isIndexKey": false,
    15. "isNullable": false,
    16. "isSortKey": false,
    17. "name": "data",
    18. "source_type": "string",
    19. "type": "string"
    20. }
    21. ]

    元数据信息,用一个复合结构表示:

    1. "metadata": {
    2. "table_type": "ICEBERG",
    3. "location": "s3a://faas-ethan/warehouse/test_iceberg_db/test_iceberg_table"
    4. }

    序列化信息,用一个复合结构表示:

    1. "serde": {
    2. "inputFormat": "org.apache.iceberg.mr.hive.HiveIcebergInputFormat",
    3. "outputFormat": "org.apache.iceberg.mr.hive.HiveIcebergOutputFormat",
    4. "parameters": {
    5. "k1":"v1"
    6. },
    7. "serializationLib": "string",
    8. "uri": "your_hive_table_locaton"
    9. }

    有了表的模型之后,就可以按照下面创建Hive表的创建模板来走了:

    1. List fields = new ArrayList();
    2. //创建Fields
    3. fields.add(new FieldSchema("colname", serdeConstants.STRING_TYPE_NAME, "comment"));
    4. //创建StorageDescriptor
    5. StorageDescriptor sd = new StorageDescriptor();
    6. sd.setCols(fields);
    7. sd.setSerdeInfo(new SerDeInfo());
    8. //创建Table对象
    9. Table tbl = new Table();
    10. tbl.setDbName(DB_NAME);
    11. tbl.setTableName(TBL_NAME);
    12. tbl.setSd(sd);
    13. //请求HMS持久化Table
    14. hiveMetaStoreClient.createTable(tbl);

    上面示例代码将创建Hive的模板流程划分为4个步骤:

    • 创建Fields

    • 创建StorageDescriptor

    • 创建Table对象

    • 请求HMS持久化Table

    总结

    本文讲述了不同引擎,主要是Hive、Spark、Flink和Trino,在创建Iceberg表上存在的兼容性问题及其产生的原因,然后给出了解决办法。最后,通过抽象一种创建跟引擎无关的Iceberg表的表示方法及其创建步骤。该方法的重要作用是能够实现通用的元数据管理。

  • 相关阅读:
    力扣:115.不同的子序列
    祥云杯2022 pwn - protocol
    王道 第四章网络层
    Redis 学习-上
    解决telnet不是内部或外部以及验证某个端口是否开放
    简单聊聊Https的来龙去脉
    前台查看日志功能
    Java 多线程:基础
    CNPM、NPM 和 Yarn:JavaScript 包管理器的比较
    3. Android逆向-基于Frida的工具Objection
  • 原文地址:https://blog.csdn.net/naisongwen/article/details/126434273