• Hudi Query Types / Views Summary


    Preface

    The earlier article on incrementally querying Hudi tables with Hive mentioned that a Hudi table has a read optimized view and a real-time view. I had not fully grasped them at the time, so this post studies and summarizes them on their own. The Chinese version of the Hudi documentation calls them "views", but the English term is "query types".

    Query types

    Hudi supports the following three query types (views):

    • Snapshot Queries (real-time view): queries see the latest snapshot of the table as of a given commit or compaction action. For merge-on-read (MOR) tables, this exposes near-real-time data (a few minutes of latency) by merging the base files (e.g. parquet) and delta files (e.g. avro) of the latest file slice on the fly. For copy-on-write (COW) tables, it provides a drop-in replacement for existing parquet tables while adding upsert/delete and other write-side features.

    • Incremental Queries (incremental view, i.e. what the previous article covered): queries only see data written to the table since a given commit/compaction. This effectively provides a change stream to enable incremental data pipelines.

    • Read Optimized Queries (read optimized view): queries see the latest snapshot of the table as of a given commit/compaction action, but expose only the base/columnar files of the latest file slices, guaranteeing the same columnar query performance as a non-Hudi columnar table. (A hedged Spark read sketch covering all three query types follows this list.)
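    To make the three query types concrete, here is a minimal Spark read sketch (not from the original article). The option keys hoodie.datasource.query.type and hoodie.datasource.read.begin.instanttime come from the Hudi Spark datasource; basePath, the SparkSession spark and the begin instant time are assumptions, so adjust them to your table and Hudi version.

    // Minimal sketch: issuing the three query types through the Spark datasource
    val basePath = "/tmp/test_hudi_table_df_mor"   // assumed table path

    // Snapshot query (the default): the latest merged view of the table
    val snapshotDF = spark.read.format("hudi").load(basePath)

    // Read optimized query: only the base (parquet) files, meaningful for MOR tables
    val roDF = spark.read.format("hudi")
      .option("hoodie.datasource.query.type", "read_optimized")
      .load(basePath)

    // Incremental query: only records committed after the given instant time
    val incDF = spark.read.format("hudi")
      .option("hoodie.datasource.query.type", "incremental")
      .option("hoodie.datasource.read.begin.instanttime", "20220629000000")
      .load(basePath)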

    Table types

    Table Type    | Supported Query types
    Copy On Write | Snapshot Queries + Incremental Queries
    Merge On Read | Snapshot Queries + Incremental Queries + Read Optimized Queries

    In other words, the read optimized view only exists for MOR tables, which was also mentioned in the previous article. This time we will analyze, at the source-code level, how the two table types differ and how this is implemented.
    Note that the Chinese documentation on the official site gets this point wrong, so don't be misled. It is presumably based on an old version, and since nobody maintains or contributes to the Chinese docs, nobody has fixed it. I will try to submit a PR to fix it when I have time. (Screenshot of the error omitted.)

    Update 2022.06.30: PR submitted, https://github.com/apache/hudi/pull/6008

    Source code

    Let's briefly analyze, at the source-code level, how the two table types differ when syncing to Hive. Hudi's utility class for syncing Hive metadata is HiveSyncTool. Before diving in, here is a simple example of using HiveSyncTool to sync metadata. Spark is used here because it has an API for obtaining the hadoopConf, which keeps the code short; the same thing can also be done in pure Java.

    // Imports assumed for a Hudi 0.9/0.10-era dependency; adjust package paths to your Hudi version
    import java.util
    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.hive.conf.HiveConf
    import org.apache.hudi.common.table.HoodieTableMetaClient
    import org.apache.hudi.common.util.StringUtils
    import org.apache.hudi.hive.ddl.HiveSyncMode
    import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncTool, MultiPartKeysValueExtractor}

    // `spark` is an existing SparkSession; `pathStr`, `hiveDatabaseName` and `tableName` are defined elsewhere
    val basePath = new Path(pathStr)
    val fs = basePath.getFileSystem(spark.sessionState.newHadoopConf())
    val hiveConf: HiveConf = new HiveConf()
    hiveConf.addResource(fs.getConf)
    // Read the table config of the existing Hudi table to obtain its record key and partition fields
    val tableMetaClient = HoodieTableMetaClient.builder.setConf(fs.getConf).setBasePath(pathStr).build
    val recordKeyFields = tableMetaClient.getTableConfig.getRecordKeyFields
    var keys = ""
    if (recordKeyFields.isPresent) {
      keys = recordKeyFields.get().mkString(",")
    }
    var partitionPathFields: util.List[String] = null
    val partitionFields = tableMetaClient.getTableConfig.getPartitionFields
    if (partitionFields.isPresent) {
      import scala.collection.JavaConverters._
      partitionPathFields = partitionFields.get().toList.asJava
    }
    val hiveSyncConfig = getHiveSyncConfig(pathStr, hiveDatabaseName, tableName, partitionPathFields, keys)
    new HiveSyncTool(hiveSyncConfig, hiveConf, fs).syncHoodieTable()

    def getHiveSyncConfig(basePath: String, dbName: String, tableName: String,
                          partitionPathFields: util.List[String] = null, keys: String = null): HiveSyncConfig = {
      val hiveSyncConfig: HiveSyncConfig = new HiveSyncConfig
      hiveSyncConfig.syncMode = HiveSyncMode.HMS.name
      hiveSyncConfig.createManagedTable = true
      hiveSyncConfig.databaseName = dbName
      hiveSyncConfig.tableName = tableName
      hiveSyncConfig.basePath = basePath
      hiveSyncConfig.partitionValueExtractorClass = classOf[MultiPartKeysValueExtractor].getName
      if (partitionPathFields != null && !partitionPathFields.isEmpty) hiveSyncConfig.partitionFields = partitionPathFields
      if (!StringUtils.isNullOrEmpty(keys)) hiveSyncConfig.serdeProperties = "primaryKey = " + keys // Spark SQL needs this property to identify the primary key fields when updating the table
      hiveSyncConfig
    }
    

    Here tableMetaClient is used to obtain the table's primary key and partition fields, since the Hudi table files must already exist by the time the metadata is synced. Of course, if you already know the primary key and partition fields you can specify them yourself; fetching them automatically is just more convenient.
    Essentially the code builds the sync config, constructs the sync utility class HiveSyncTool, and then calls syncHoodieTable to sync the metadata and create the Hive tables.

    Now let's look at the source. When a HiveSyncTool is constructed, it branches on the table type. For a COW table, this.snapshotTableName = cfg.tableName, i.e. the real-time view equals the table name, and the read optimized view is empty. For a MOR table, the real-time view is tableName_rt, and the read optimized view defaults to tableName_ro; when skipROSuffix=true is configured, it equals the table name.
    Notice that with skipROSuffix=true the MOR table's read optimized view takes the bare table name, while for a COW table the bare table name is the real-time view, which feels a bit contradictory. It is probably because the MOR read optimized view and the COW real-time view are both queried through HoodieParquetInputFormat; see the source analysis below. (A hedged sketch of how to enable skipROSuffix follows the constructor source below.)

      private static final Logger LOG = LogManager.getLogger(HiveSyncTool.class);
      public static final String SUFFIX_SNAPSHOT_TABLE = "_rt";
      public static final String SUFFIX_READ_OPTIMIZED_TABLE = "_ro";
    
      protected final HiveSyncConfig cfg;
      protected HoodieHiveClient hoodieHiveClient = null;
      protected String snapshotTableName = null;
      protected Option<String> roTableName = null;
    
      public HiveSyncTool(HiveSyncConfig cfg, HiveConf configuration, FileSystem fs) {
        super(configuration.getAllProperties(), fs);
    
        try {
          this.hoodieHiveClient = new HoodieHiveClient(cfg, configuration, fs);
        } catch (RuntimeException e) {
          if (cfg.ignoreExceptions) {
            LOG.error("Got runtime exception when hive syncing, but continuing as ignoreExceptions config is set ", e);
          } else {
            throw new HoodieHiveSyncException("Got runtime exception when hive syncing", e);
          }
        }
    
        this.cfg = cfg;
        // Set partitionFields to empty, when the NonPartitionedExtractor is used
        if (NonPartitionedExtractor.class.getName().equals(cfg.partitionValueExtractorClass)) {
          LOG.warn("Set partitionFields to empty, since the NonPartitionedExtractor is used");
          cfg.partitionFields = new ArrayList<>();
        }
        if (hoodieHiveClient != null) {
         switch (hoodieHiveClient.getTableType()) {
            case COPY_ON_WRITE:
              // Snapshot query / real-time view equals the table name
              this.snapshotTableName = cfg.tableName;
              // Read optimized query / read optimized view is empty
              this.roTableName = Option.empty();
              break;
            case MERGE_ON_READ:
              // Snapshot query / real-time view is tableName + SUFFIX_SNAPSHOT_TABLE, i.e. tableName_rt
              this.snapshotTableName = cfg.tableName + SUFFIX_SNAPSHOT_TABLE;
              // Read optimized query / read optimized view: skipROSuffix defaults to false, so by default it is tableName_ro
              // When skipROSuffix=true is configured, it equals the table name
              this.roTableName = cfg.skipROSuffix ? Option.of(cfg.tableName) :
                  Option.of(cfg.tableName + SUFFIX_READ_OPTIMIZED_TABLE);
              break;
            default:
              LOG.error("Unknown table type " + hoodieHiveClient.getTableType());
              throw new InvalidTableException(hoodieHiveClient.getBasePath());
          }
        }
      }
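
    As a side note, skipROSuffix is just a flag on the sync config. Below is a hedged sketch of the two common ways to set it; the Spark option key hoodie.datasource.hive_sync.skip_ro_suffix is my assumption for a 0.9/0.10-era Hudi, so verify it against your version.

    // Option 1: on the HiveSyncConfig built in the earlier example
    hiveSyncConfig.skipROSuffix = true  // the MOR read optimized view is then synced under the bare table name

    // Option 2: as a Spark datasource write option when Hive sync is enabled (assumed key)
    // .option("hoodie.datasource.hive_sync.skip_ro_suffix", "true")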
    

    Next, let's look at how the two implementation classes mentioned in the previous article, HoodieParquetInputFormat and HoodieParquetRealtimeInputFormat, come into play.

      @Override
      public void syncHoodieTable() {
        try {
          if (hoodieHiveClient != null) {
            doSync();
          }
        } catch (RuntimeException re) {
          throw new HoodieException("Got runtime exception when hive syncing " + cfg.tableName, re);
        } finally {
          if (hoodieHiveClient != null) {
            hoodieHiveClient.close();
          }
        }
      }
      protected void doSync() {
        switch (hoodieHiveClient.getTableType()) {
          case COPY_ON_WRITE:
            // A COW table only has snapshotTableName, i.e. the real-time view; queries are served by `HoodieParquetInputFormat`
            syncHoodieTable(snapshotTableName, false, false);
            break;
          case MERGE_ON_READ:
            // sync a RO table for MOR
            // The MOR table's read optimized view, suffixed with `_ro` (short for READ_OPTIMIZED); queries are served by `HoodieParquetInputFormat`
            syncHoodieTable(roTableName.get(), false, true);
            // sync a RT table for MOR
            // The MOR table's real-time view, suffixed with `_rt` (short for REAL_TIME); queries are served by `HoodieParquetRealtimeInputFormat`
            syncHoodieTable(snapshotTableName, true, false);
            break;
          default:
            LOG.error("Unknown table type " + hoodieHiveClient.getTableType());
            throw new InvalidTableException(hoodieHiveClient.getBasePath());
        }
      }
    

    As you can see, the two table types differ in that: 1. COW syncs metadata for only one table, the real-time view, while MOR syncs metadata for two tables, the read optimized view and the real-time view; 2. Besides the table names, the parameters passed differ, and that determines which implementation class serves queries.

    Since this article is not primarily about the Hive metadata sync source code, only the key parts are pasted here; I will write up the Hive metadata sync source code separately in a future article.

      protected void syncHoodieTable(String tableName, boolean useRealtimeInputFormat,
                                     boolean readAsOptimized) {
        // ... (other sync logic omitted)
        syncSchema(tableName, tableExists, useRealtimeInputFormat, readAsOptimized, schema);
      }

      private void syncSchema(String tableName, boolean tableExists, boolean useRealTimeInputFormat,
                              boolean readAsOptimized, MessageType schema) {
        // ... (other sync logic omitted)
        Map<String, String> sparkSerdeProperties = getSparkSerdeProperties(readAsOptimized);

        String inputFormatClassName = HoodieInputFormatUtils.getInputFormatClassName(baseFileFormat, useRealTimeInputFormat);
        // ...
      }

      public static String getInputFormatClassName(HoodieFileFormat baseFileFormat, boolean realtime) {
        switch (baseFileFormat) {
          case PARQUET:
            if (realtime) {
              return HoodieParquetRealtimeInputFormat.class.getName();
            } else {
              return HoodieParquetInputFormat.class.getName();
            }
          case HFILE:
            if (realtime) {
              return HoodieHFileRealtimeInputFormat.class.getName();
            } else {
              return HoodieHFileInputFormat.class.getName();
            }
          case ORC:
            return OrcInputFormat.class.getName();
          default:
            throw new HoodieIOException("Hoodie InputFormat not implemented for base file format " + baseFileFormat);
        }
      }
    

    As you can see, when the base file format is PARQUET, the inputFormat implementation is HoodieParquetRealtimeInputFormat when useRealtimeInputFormat is true, and HoodieParquetInputFormat when it is false. The other parameter, readAsOptimized (whether to read as optimized), is used by Spark SQL at read time to decide whether the table is a real-time view or a read optimized view. The relevant source:

    // When syncing metadata and creating the table, the property `hoodie.query.as.ro.table=true/false` is added
    sparkSerdeProperties.put(ConfigUtils.IS_QUERY_AS_RO_TABLE, String.valueOf(readAsOptimized));
    
    When Spark reads the Hive table, it uses this property to decide the query type, in the class `org.apache.hudi.DataSourceOptionsHelper`:
    
      def parametersWithReadDefaults(parameters: Map[String, String]): Map[String, String] = {
        // First check if the ConfigUtils.IS_QUERY_AS_RO_TABLE has set by HiveSyncTool,
        // or else use query type from QUERY_TYPE.
        val queryType = parameters.get(ConfigUtils.IS_QUERY_AS_RO_TABLE)
          .map(is => if (is.toBoolean) QUERY_TYPE_READ_OPTIMIZED_OPT_VAL else QUERY_TYPE_SNAPSHOT_OPT_VAL)
          .getOrElse(parameters.getOrElse(QUERY_TYPE.key, QUERY_TYPE.defaultValue()))
    
        Map(
          QUERY_TYPE.key -> queryType
        ) ++ translateConfigurations(parameters)
      }
    

    In the create-table statement this shows up as:

    | WITH SERDEPROPERTIES (                             |
    |   'hoodie.query.as.ro.table'='false',              |
    

    And the inputFormat part:

    | STORED AS INPUTFORMAT                              |
    |   'org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat'  |
    

    The full create-table statements appear in the examples below.

    Examples

    DF

    Here we verify things with the program from the earlier post "Apache Hudi getting-started summary" that writes to Hudi and syncs the table to Hive.

    COW table

    Since the previous article already showed the COW table's create-table statement, it is copied here directly:

    +----------------------------------------------------+
    |                   createtab_stmt                   |
    +----------------------------------------------------+
    | CREATE TABLE `test_hudi_table_1`(                  |
    |   `_hoodie_commit_time` string,                    |
    |   `_hoodie_commit_seqno` string,                   |
    |   `_hoodie_record_key` string,                     |
    |   `_hoodie_partition_path` string,                 |
    |   `_hoodie_file_name` string,                      |
    |   `id` int,                                        |
    |   `name` string,                                   |
    |   `value` int,                                     |
    |   `ts` int)                                        |
    | PARTITIONED BY (                                   |
    |   `dt` string)                                     |
    | ROW FORMAT SERDE                                   |
    |   'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'  |
    | WITH SERDEPROPERTIES (                             |
    |   'hoodie.query.as.ro.table'='false',              |
    |   'path'='/tmp/test_hudi_table_1',                  |
    |   'primaryKey'='id')                               |
    | STORED AS INPUTFORMAT                              |
    |   'org.apache.hudi.hadoop.HoodieParquetInputFormat'  |
    | OUTPUTFORMAT                                       |
    |   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' |
    | LOCATION                                           |
    |   'hdfs://cluster1/tmp/test_hudi_table_1'           |
    | TBLPROPERTIES (                                    |
    |   'last_commit_time_sync'='20220512101500',        |
    |   'spark.sql.sources.provider'='hudi',             |
    |   'spark.sql.sources.schema.numPartCols'='1',      |
    |   'spark.sql.sources.schema.numParts'='1',         |
    |   'spark.sql.sources.schema.part.0'='{"type":"struct","fields":[{"name":"_hoodie_commit_time","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_commit_seqno","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_record_key","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_partition_path","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_file_name","type":"string","nullable":true,"metadata":{}},{"name":"id","type":"integer","nullable":false,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"value","type":"integer","nullable":false,"metadata":{}},{"name":"ts","type":"integer","nullable":false,"metadata":{}},{"name":"dt","type":"string","nullable":true,"metadata":{}}]}',  |
    |   'spark.sql.sources.schema.partCol.0'='dt',       |
    |   'transient_lastDdlTime'='1652320902')            |
    +----------------------------------------------------+
    
    

    We can see 'hoodie.query.as.ro.table'='false': the COW table's view is the real-time view, and the inputFormat is org.apache.hudi.hadoop.HoodieParquetInputFormat.

    MOR table

    We add a table type option, option(TABLE_TYPE.key(), MOR_TABLE_TYPE_OPT_VAL), to the earlier save2HudiSyncHiveWithPrimaryKey method, and change the database and table names (a hedged sketch of what the full write call might look like follows the variable block below):

        val databaseName = "test"
        val tableName1 = "test_hudi_table_df_mor"
        val primaryKey = "id"
        val preCombineField = "ts"
        val partitionField = "dt"
        val tablePath1 = "/tmp/test_hudi_table_df_mor"
    
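    For readers without the earlier article at hand, here is a hedged sketch of what such a DataFrame write with Hive sync might look like. It is not the original save2HudiSyncHiveWithPrimaryKey code; the DataSourceWriteOptions constants and the df DataFrame are assumptions for a 0.9/0.10-era Hudi, so adjust them to your version.

    import org.apache.hudi.DataSourceWriteOptions._
    import org.apache.hudi.hive.MultiPartKeysValueExtractor
    import org.apache.spark.sql.SaveMode

    // `df` is a DataFrame with columns id, name, value, ts, dt (assumption)
    df.write.format("hudi")
      .option(TABLE_TYPE.key(), MOR_TABLE_TYPE_OPT_VAL)          // MOR instead of the default COW
      .option(RECORDKEY_FIELD.key(), primaryKey)
      .option(PRECOMBINE_FIELD.key(), preCombineField)
      .option(PARTITIONPATH_FIELD.key(), partitionField)
      .option("hoodie.table.name", tableName1)
      .option(HIVE_SYNC_ENABLED.key(), "true")                   // triggers HiveSyncTool after the write
      .option(HIVE_DATABASE.key(), databaseName)
      .option(HIVE_TABLE.key(), tableName1)
      .option(HIVE_PARTITION_FIELDS.key(), partitionField)
      .option(HIVE_PARTITION_EXTRACTOR_CLASS.key(), classOf[MultiPartKeysValueExtractor].getName)
      .mode(SaveMode.Append)
      .save(tablePath1)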

    After the Hive sync succeeds, show tables reveals that two tables were created, test_hudi_table_df_mor_ro and test_hudi_table_df_mor_rt. From the source analysis above we know that _ro is the read optimized table and _rt is the real-time table. Let's look at their create-table statements:

    +----------------------------------------------------+
    |                   createtab_stmt                   |
    +----------------------------------------------------+
    | CREATE TABLE `test_hudi_table_df_mor_ro`(          |
    |   `_hoodie_commit_time` string,                    |
    |   `_hoodie_commit_seqno` string,                   |
    |   `_hoodie_record_key` string,                     |
    |   `_hoodie_partition_path` string,                 |
    |   `_hoodie_file_name` string,                      |
    |   `id` int,                                        |
    |   `name` string,                                   |
    |   `value` int,                                     |
    |   `ts` int)                                        |
    | PARTITIONED BY (                                   |
    |   `dt` string)                                     |
    | ROW FORMAT SERDE                                   |
    |   'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'  |
    | WITH SERDEPROPERTIES (                             |
    |   'hoodie.query.as.ro.table'='true',               |
    |   'path'='/tmp/test_hudi_table_df_mor',            |
    |   'primaryKey'='id')                               |
    | STORED AS INPUTFORMAT                              |
    |   'org.apache.hudi.hadoop.HoodieParquetInputFormat'  |
    | OUTPUTFORMAT                                       |
    |   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' |
    | LOCATION                                           |
    |   'hdfs://cluster1/tmp/test_hudi_table_df_mor'     |
    | TBLPROPERTIES (                                    |
    |   'last_commit_time_sync'='20220629145934',        |
    |   'spark.sql.sources.provider'='hudi',             |
    |   'spark.sql.sources.schema.numPartCols'='1',      |
    |   'spark.sql.sources.schema.numParts'='1',         |
    |   'spark.sql.sources.schema.part.0'='{"type":"struct","fields":[{"name":"_hoodie_commit_time","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_commit_seqno","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_record_key","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_partition_path","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_file_name","type":"string","nullable":true,"metadata":{}},{"name":"id","type":"integer","nullable":false,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"value","type":"integer","nullable":false,"metadata":{}},{"name":"ts","type":"integer","nullable":false,"metadata":{}},{"name":"dt","type":"string","nullable":true,"metadata":{}}]}',  |
    |   'spark.sql.sources.schema.partCol.0'='dt',       |
    |   'transient_lastDdlTime'='1656486059')            |
    +----------------------------------------------------+
    
    +----------------------------------------------------+
    |                   createtab_stmt                   |
    +----------------------------------------------------+
    | CREATE TABLE `test_hudi_table_df_mor_rt`(          |
    |   `_hoodie_commit_time` string,                    |
    |   `_hoodie_commit_seqno` string,                   |
    |   `_hoodie_record_key` string,                     |
    |   `_hoodie_partition_path` string,                 |
    |   `_hoodie_file_name` string,                      |
    |   `id` int,                                        |
    |   `name` string,                                   |
    |   `value` int,                                     |
    |   `ts` int)                                        |
    | PARTITIONED BY (                                   |
    |   `dt` string)                                     |
    | ROW FORMAT SERDE                                   |
    |   'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'  |
    | WITH SERDEPROPERTIES (                             |
    |   'hoodie.query.as.ro.table'='false',              |
    |   'path'='/tmp/test_hudi_table_df_mor',            |
    |   'primaryKey'='id')                               |
    | STORED AS INPUTFORMAT                              |
    |   'org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat'  |
    | OUTPUTFORMAT                                       |
    |   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' |
    | LOCATION                                           |
    |   'hdfs://cluster1/tmp/test_hudi_table_df_mor'     |
    | TBLPROPERTIES (                                    |
    |   'last_commit_time_sync'='20220629145934',        |
    |   'spark.sql.sources.provider'='hudi',             |
    |   'spark.sql.sources.schema.numPartCols'='1',      |
    |   'spark.sql.sources.schema.numParts'='1',         |
    |   'spark.sql.sources.schema.part.0'='{"type":"struct","fields":[{"name":"_hoodie_commit_time","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_commit_seqno","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_record_key","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_partition_path","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_file_name","type":"string","nullable":true,"metadata":{}},{"name":"id","type":"integer","nullable":false,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"value","type":"integer","nullable":false,"metadata":{}},{"name":"ts","type":"integer","nullable":false,"metadata":{}},{"name":"dt","type":"string","nullable":true,"metadata":{}}]}',  |
    |   'spark.sql.sources.schema.partCol.0'='dt',       |
    |   'transient_lastDdlTime'='1656486059')            |
    +----------------------------------------------------+
    

    We can see that _ro and _rt differ in two things: hoodie.query.as.ro.table and the INPUTFORMAT. For Hive queries only the INPUTFORMAT matters; hoodie.query.as.ro.table is what Spark uses at query time to decide whether the table is read optimized. Since this MOR table has only been written once, there are only parquet files and no incremental .log files, so both tables currently return the same query results. The difference between the two is demonstrated with Spark SQL later.
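
    A minimal sketch of how a downstream Spark job might read the two synced Hive tables. Since spark.sql.sources.provider=hudi is set in TBLPROPERTIES, Spark resolves them through the Hudi datasource, where hoodie.query.as.ro.table from the serde properties picks the query type as shown in DataSourceOptionsHelper above (table names follow this example; treat the exact resolution path as my assumption).

    // Read optimized view: hoodie.query.as.ro.table=true, so Spark runs a read_optimized query
    val roDF = spark.table("test.test_hudi_table_df_mor_ro")

    // Real-time view: hoodie.query.as.ro.table=false, so Spark falls back to a snapshot query
    val rtDF = spark.table("test.test_hudi_table_df_mor_rt")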

    Spark SQL

    For creating Hudi tables with Spark SQL, see the earlier post "Hudi Spark SQL summary" if you are unfamiliar with it. The reason for bringing up Spark SQL table creation again is that I found it differs slightly from writing data with a DataFrame and then syncing the table.

    COW table

    create table test_hudi_table_cow (
      id int,
      name string,
      price double,
      ts long,
      dt string
    ) using hudi
     partitioned by (dt)
     options (
      primaryKey = 'id',
      preCombineField = 'ts',
      type = 'cow'
     );
    

    After the table is created, look at its create-table statement in Hive:

    show create table test_hudi_table_cow;
    
    +----------------------------------------------------+
    |                   createtab_stmt                   |
    +----------------------------------------------------+
    | CREATE TABLE `test_hudi_table_cow`(                |
    |   `_hoodie_commit_time` string,                    |
    |   `_hoodie_commit_seqno` string,                   |
    |   `_hoodie_record_key` string,                     |
    |   `_hoodie_partition_path` string,                 |
    |   `_hoodie_file_name` string,                      |
    |   `id` int,                                        |
    |   `name` string,                                   |
    |   `price` double,                                  |
    |   `ts` bigint)                                     |
    | PARTITIONED BY (                                   |
    |   `dt` string)                                     |
    | ROW FORMAT SERDE                                   |
    |   'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'  |
    | WITH SERDEPROPERTIES (                             |
    |   'path'='hdfs://cluster1/warehouse/tablespace/managed/hive/test.db/test_hudi_table_cow',  |
    |   'preCombineField'='ts',                          |
    |   'primaryKey'='id',                               |
    |   'type'='cow')                                    |
    | STORED AS INPUTFORMAT                              |
    |   'org.apache.hudi.hadoop.HoodieParquetInputFormat'  |
    | OUTPUTFORMAT                                       |
    |   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' |
    | LOCATION                                           |
    |   'hdfs://cluster1/warehouse/tablespace/managed/hive/test.db/test_hudi_table_cow' |
    | TBLPROPERTIES (                                    |
    |   'last_commit_time_sync'='20220628152846',        |
    |   'spark.sql.create.version'='2.4.5',              |
    |   'spark.sql.sources.provider'='hudi',             |
    |   'spark.sql.sources.schema.numPartCols'='1',      |
    |   'spark.sql.sources.schema.numParts'='1',         |
    |   'spark.sql.sources.schema.part.0'='{"type":"struct","fields":[{"name":"_hoodie_commit_time","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_commit_seqno","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_record_key","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_partition_path","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_file_name","type":"string","nullable":true,"metadata":{}},{"name":"id","type":"integer","nullable":true,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"price","type":"double","nullable":true,"metadata":{}},{"name":"ts","type":"long","nullable":true,"metadata":{}},{"name":"dt","type":"string","nullable":true,"metadata":{}}]}',  |
    |   'spark.sql.sources.schema.partCol.0'='dt',       |
    |   'transient_lastDdlTime'='1656401195')            |
    +----------------------------------------------------+
    

    Notice that the table created by Spark SQL has no hoodie.query.as.ro.table. Looking at the source (mentioned above), at query time Spark does:

    val queryType = parameters.get(ConfigUtils.IS_QUERY_AS_RO_TABLE)
        .map(is => if (is.toBoolean) QUERY_TYPE_READ_OPTIMIZED_OPT_VAL else QUERY_TYPE_SNAPSHOT_OPT_VAL)
        .getOrElse(parameters.getOrElse(QUERY_TYPE.key, QUERY_TYPE.defaultValue()))
    

    The default value of QUERY_TYPE is QUERY_TYPE_SNAPSHOT_OPT_VAL, i.e. a snapshot query, so for a COW table (whose normal read is the snapshot view) the default is fine. QUERY_TYPE has three values: QUERY_TYPE_SNAPSHOT_OPT_VAL, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL and QUERY_TYPE_INCREMENTAL_OPT_VAL, corresponding to snapshot (real-time) queries, read optimized queries and incremental queries respectively. How to issue these queries with Spark is not covered here.

    MOR table

    create table test_hudi_table_mor (
      id int,
      name string,
      price double,
      ts long,
      dt string
    ) using hudi
     partitioned by (dt)
     options (
      primaryKey = 'id',
      preCombineField = 'ts',
      type = 'mor'
     );
    

    After creating the MOR table with Spark SQL, show tables shows only test_hudi_table_mor, without the corresponding _rt and _ro tables. The reason is that Spark SQL's create table does not go through the Hive sync utility HiveSyncTool at all; Spark SQL has its own table-creation logic, and HiveSyncTool is only invoked when data is written. That is why, as mentioned above, tables created by Spark SQL and by a DataFrame write plus sync differ slightly. Next, let's insert one record and look at the result.

    insert into test_hudi_table_mor values (1,'hudi',10,100,'2021-05-05');
    

    Two more tables now exist. Because these two tables were created by HiveSyncTool after the insert, they look the same as the ones created by the DataFrame write with sync in our program; the difference is managed (internal) versus external tables. In Spark SQL's logic, if the table path does not equal database path + table name, the table is created as external, which is reasonable. The tables created by our DataFrame program are managed because we set the managed-table option in the sync config, so dropping either of them deletes the data; with Spark SQL there is simply one extra managed table, test_hudi_table_mor, and we can drop that table to delete the data.

    +----------------------------------------------------+
    |                   createtab_stmt                   |
    +----------------------------------------------------+
    | CREATE EXTERNAL TABLE `test_hudi_table_mor_ro`(    |
    |   `_hoodie_commit_time` string COMMENT '',         |
    |   `_hoodie_commit_seqno` string COMMENT '',        |
    |   `_hoodie_record_key` string COMMENT '',          |
    |   `_hoodie_partition_path` string COMMENT '',      |
    |   `_hoodie_file_name` string COMMENT '',           |
    |   `id` int COMMENT '',                             |
    |   `name` string COMMENT '',                        |
    |   `price` double COMMENT '',                       |
    |   `ts` bigint COMMENT '')                          |
    | PARTITIONED BY (                                   |
    |   `dt` string COMMENT '')                          |
    | ROW FORMAT SERDE                                   |
    |   'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'  |
    | WITH SERDEPROPERTIES (                             |
    |   'hoodie.query.as.ro.table'='true',               |
    |   'path'='hdfs://cluster1/warehouse/tablespace/managed/hive/test.db/test_hudi_table_mor')  |
    | STORED AS INPUTFORMAT                              |
    |   'org.apache.hudi.hadoop.HoodieParquetInputFormat'  |
    | OUTPUTFORMAT                                       |
    |   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' |
    | LOCATION                                           |
    |   'hdfs://cluster1/warehouse/tablespace/managed/hive/test.db/test_hudi_table_mor' |
    | TBLPROPERTIES (                                    |
    |   'last_commit_time_sync'='20220629153816',        |
    |   'spark.sql.sources.provider'='hudi',             |
    |   'spark.sql.sources.schema.numPartCols'='1',      |
    |   'spark.sql.sources.schema.numParts'='1',         |
    |   'spark.sql.sources.schema.part.0'='{"type":"struct","fields":[{"name":"_hoodie_commit_time","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_commit_seqno","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_record_key","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_partition_path","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_file_name","type":"string","nullable":true,"metadata":{}},{"name":"id","type":"integer","nullable":true,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"price","type":"double","nullable":true,"metadata":{}},{"name":"ts","type":"long","nullable":true,"metadata":{}},{"name":"dt","type":"string","nullable":true,"metadata":{}}]}',  |
    |   'spark.sql.sources.schema.partCol.0'='dt',       |
    |   'transient_lastDdlTime'='1656488248')            |
    +----------------------------------------------------+
    
    +----------------------------------------------------+
    |                   createtab_stmt                   |
    +----------------------------------------------------+
    | CREATE EXTERNAL TABLE `test_hudi_table_mor_rt`(    |
    |   `_hoodie_commit_time` string COMMENT '',         |
    |   `_hoodie_commit_seqno` string COMMENT '',        |
    |   `_hoodie_record_key` string COMMENT '',          |
    |   `_hoodie_partition_path` string COMMENT '',      |
    |   `_hoodie_file_name` string COMMENT '',           |
    |   `id` int COMMENT '',                             |
    |   `name` string COMMENT '',                        |
    |   `price` double COMMENT '',                       |
    |   `ts` bigint COMMENT '')                          |
    | PARTITIONED BY (                                   |
    |   `dt` string COMMENT '')                          |
    | ROW FORMAT SERDE                                   |
    |   'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'  |
    | WITH SERDEPROPERTIES (                             |
    |   'hoodie.query.as.ro.table'='false',              |
    |   'path'='hdfs://cluster1/warehouse/tablespace/managed/hive/test.db/test_hudi_table_mor')  |
    | STORED AS INPUTFORMAT                              |
    |   'org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat'  |
    | OUTPUTFORMAT                                       |
    |   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' |
    | LOCATION                                           |
    |   'hdfs://cluster1/warehouse/tablespace/managed/hive/test.db/test_hudi_table_mor' |
    | TBLPROPERTIES (                                    |
    |   'last_commit_time_sync'='20220629153816',        |
    |   'spark.sql.sources.provider'='hudi',             |
    |   'spark.sql.sources.schema.numPartCols'='1',      |
    |   'spark.sql.sources.schema.numParts'='1',         |
    |   'spark.sql.sources.schema.part.0'='{"type":"struct","fields":[{"name":"_hoodie_commit_time","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_commit_seqno","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_record_key","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_partition_path","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_file_name","type":"string","nullable":true,"metadata":{}},{"name":"id","type":"integer","nullable":true,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"price","type":"double","nullable":true,"metadata":{}},{"name":"ts","type":"long","nullable":true,"metadata":{}},{"name":"dt","type":"string","nullable":true,"metadata":{}}]}',  |
    |   'spark.sql.sources.schema.partCol.0'='dt',       |
    |   'transient_lastDdlTime'='1656488248')            |
    +----------------------------------------------------+
    
    
    

    Now let's insert one more record and update one record, in order to generate a log file and see how the two tables differ.

    insert into test_hudi_table_mor values (2,'hudi',11,110,'2021-05-05');
    update test_hudi_table_mor set name='hudi_update' where id =1;
    
    select * from test_hudi_table_mor_ro;
    +----------------------+-----------------------+---------------------+-------------------------+----------------------------------------------------+-----+-------+--------+------+-------------+
    | _hoodie_commit_time  | _hoodie_commit_seqno  | _hoodie_record_key  | _hoodie_partition_path  |                 _hoodie_file_name                  | id  | name  | price  |  ts  |     dt      |
    +----------------------+-----------------------+---------------------+-------------------------+----------------------------------------------------+-----+-------+--------+------+-------------+
    | 20220629153718       | 20220629153718_0_1    | id:1                | dt=2021-05-05           | bc415cdb-2b21-4d09-a3f6-a779357aa819-0_0-125-7240_20220629153718.parquet | 1   | hudi  | 10.0   | 100  | 2021-05-05  |
    | 20220629153803       | 20220629153803_0_2    | id:2                | dt=2021-05-05           | bc415cdb-2b21-4d09-a3f6-a779357aa819-0_0-154-8848_20220629153803.parquet | 2   | hudi  | 11.0   | 110  | 2021-05-05  |
    +----------------------+-----------------------+---------------------+-------------------------+----------------------------------------------------+-----+-------+--------+------+-------------+
    
    
    select * from test_hudi_table_mor_rt;
    
    +----------------------+-----------------------+---------------------+-------------------------+----------------------------------------------------+-----+--------------+--------+------+-------------+
    | _hoodie_commit_time  | _hoodie_commit_seqno  | _hoodie_record_key  | _hoodie_partition_path  |                 _hoodie_file_name                  | id  |     name     | price  |  ts  |     dt      |
    +----------------------+-----------------------+---------------------+-------------------------+----------------------------------------------------+-----+--------------+--------+------+-------------+
    | 20220629153816       | 20220629153816_0_1    | id:1                | dt=2021-05-05           | bc415cdb-2b21-4d09-a3f6-a779357aa819-0             | 1   | hudi_update  | 10.0   | 100  | 2021-05-05  |
    | 20220629153803       | 20220629153803_0_2    | id:2                | dt=2021-05-05           | bc415cdb-2b21-4d09-a3f6-a779357aa819-0_0-154-8848_20220629153803.parquet | 2   | hudi         | 11.0   | 110  | 2021-05-05  |
    +----------------------+-----------------------+---------------------+-------------------------+----------------------------------------------------+-----+--------------+--------+------+-------------+
    

    We can see that _ro returns the newly inserted record but not the updated value of the other record, while _rt returns the latest data for both. Let's look at the storage files after the insert and update:

    hadoop fs -ls hdfs://cluster1/warehouse/tablespace/managed/hive/test.db/test_hudi_table_mor/dt=2021-05-05
    Found 4 items
    -rw-rw----+  3 spark hadoop        975 2022-06-29 15:38 hdfs://cluster1/warehouse/tablespace/managed/hive/test.db/test_hudi_table_mor/dt=2021-05-05/.bc415cdb-2b21-4d09-a3f6-a779357aa819-0_20220629153803.log.1_0-186-10660
    -rw-rw----+  3 spark hadoop         93 2022-06-29 15:37 hdfs://cluster1/warehouse/tablespace/managed/hive/test.db/test_hudi_table_mor/dt=2021-05-05/.hoodie_partition_metadata
    -rw-rw----+  3 spark hadoop     435283 2022-06-29 15:37 hdfs://cluster1/warehouse/tablespace/managed/hive/test.db/test_hudi_table_mor/dt=2021-05-05/bc415cdb-2b21-4d09-a3f6-a779357aa819-0_0-125-7240_20220629153718.parquet
    -rw-rw----+  3 spark hadoop     434991 2022-06-29 15:38 hdfs://cluster1/warehouse/tablespace/managed/hive/test.db/test_hudi_table_mor/dt=2021-05-05/bc415cdb-2b21-4d09-a3f6-a779357aa819-0_0-154-8848_20220629153803.parquet
    

    We find that the insert produced a new parquet file while the update produced a .log file. That is why the _ro table also returns the newly inserted data: _ro can only read data from the parquet (base) files, whereas the _rt table merges the base files (e.g. parquet) and delta files (e.g. avro) of the latest file slice on the fly to provide a near-real-time dataset (with a few minutes of latency). The MOR write logic (when delta files are written) and compaction logic (when delta files are merged into parquet) are not covered in depth here; I will summarize them separately later.
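
    Tying this back to the _ro view: once compaction merges the .log files into new parquet base files, _ro will also see the updated record. Below is a minimal hedged sketch of enabling inline compaction on the MOR DataFrame write; hoodie.compact.inline and hoodie.compact.inline.max.delta.commits are standard Hudi write configs, while df and the other values repeat the assumed sketch from earlier.

    import org.apache.spark.sql.SaveMode

    // Same assumed MOR write as before, with inline compaction enabled
    df.write.format("hudi")
      .option("hoodie.datasource.write.table.type", "MERGE_ON_READ")
      .option("hoodie.datasource.write.recordkey.field", "id")
      .option("hoodie.datasource.write.precombine.field", "ts")
      .option("hoodie.datasource.write.partitionpath.field", "dt")
      .option("hoodie.table.name", "test_hudi_table_df_mor")
      .option("hoodie.compact.inline", "true")                    // compact as part of the write
      .option("hoodie.compact.inline.max.delta.commits", "2")     // after every 2 delta commits
      .mode(SaveMode.Append)
      .save("/tmp/test_hudi_table_df_mor")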

  • Original article: https://blog.csdn.net/dkl12/article/details/125607475