上面文章Hive增量查询Hudi表提到Hudi表有读优化视图和实时视图,其实当时并没有完全掌握,所以现在单独学习总结。Hudi官网文档中文称之为视图,其实英文为query types翻译过来为查询类型
Hudi 支持下面三种视图
Snapshot Queries 快照查询/实时视图 Queries see the latest snapshot of the table as of a given commit or compaction action. In case of merge on read table, it exposes near-real time data(few mins) by merging the base and delta files of the latest file slice on-the-fly. For copy on write table, it provides a drop-in replacement for existing parquet tables, while providing upsert/delete and other write side features. 在此视图上的查询可以查看给定提交或压缩操作时表的最新快照。对于读时合并表(MOR表) 该视图通过动态合并最新文件切片的基本文件(例如parquet)和增量文件(例如avro)来提供近实时数据集(几分钟的延迟)。对于写时复制表(COW表),它提供了现有parquet表的插入式替换,同时提供了插入/删除和其他写侧功能。
Incremental Queries 增量查询/增量视图,也就是上篇文章讲的增量查询 Queries only see new data written to the table, since a given commit/compaction. This effectively provides change streams to enable incremental data pipelines. 对该视图的查询只能看到从某个提交/压缩后写入数据集的新数据。该视图有效地提供了更改流,来支持增量数据管道。
Read Optimized Queries 读优化查询/读优化视图 : Queries see the latest snapshot of table as of a given commit/compaction action. Exposes only the base/columnar files in latest file slices and guarantees the same columnar query performance compared to a non-hudi columnar table. 在此视图上的查询将查看给定提交或压缩操作中数据集的最新快照。 该视图仅将最新文件切片中的基本/列文件暴露给查询,并保证与非Hudi列式数据集相比,具有相同的列式查询性能。
Table Type | Supported Query types |
---|---|
Copy On Write | Snapshot Queries + Incremental Queries |
Merge On Read | Snapshot Queries + Incremental Queries + Read Optimized Queries |
也就是读优化视图只有在MOR表中存在,这点在上篇文章中也提到过,这次会从源码层面分析两种表类型的区别以及如何实现的。
另外关于这一点官网中文文档写错了,大家注意别被误导,估计是因为旧版本,且中文文档没有人维护贡献,就没人贡献修改了~,稍后我有时间会尝试提个PR修复一下,错误截图:
2022.06.30更新:已提交PR https://github.com/apache/hudi/pull/6008
简单从源码层面分析同步Hive表时两种表类型的区别,Hudi同步Hive元数据的工具类为HiveSyncTool
,如何利用HiveSyncTool
同步元数据,先进行一个简单的示例,这里用Spark进行示例,因为Spark有获取hadoopConf的API,代码较少,方便示例,其实纯Java也是可以实现的
val basePath = new Path(pathStr)
val fs = basePath.getFileSystem(spark.sessionState.newHadoopConf())
val hiveConf: HiveConf = new HiveConf()
hiveConf.addResource(fs.getConf)
val tableMetaClient = HoodieTableMetaClient.builder.setConf(fs.getConf).setBasePath(pathStr).build
val recordKeyFields = tableMetaClient.getTableConfig.getRecordKeyFields
var keys = ""
if (recordKeyFields.isPresent) {
keys = recordKeyFields.get().mkString(",")
}
var partitionPathFields: util.List[String] = null
val partitionFields = tableMetaClient.getTableConfig.getPartitionFields
if (partitionFields.isPresent) {
import scala.collection.JavaConverters._
partitionPathFields = partitionFields.get().toList.asJava
}
val hiveSyncConfig = getHiveSyncConfig(pathStr, hiveDatabaseName, tableName, partitionPathFields, keys)
new HiveSyncTool(hiveSyncConfig, hiveConf, fs).syncHoodieTable()
def getHiveSyncConfig(basePath: String, dbName: String, tableName: String,
partitionPathFields: util.List[String] = null, keys: String = null): HiveSyncConfig = {
val hiveSyncConfig: HiveSyncConfig = new HiveSyncConfig
hiveSyncConfig.syncMode = HiveSyncMode.HMS.name
hiveSyncConfig.createManagedTable = true
hiveSyncConfig.databaseName = dbName
hiveSyncConfig.tableName = tableName
hiveSyncConfig.basePath = basePath
hiveSyncConfig.partitionValueExtractorClass = classOf[MultiPartKeysValueExtractor].getName
if (partitionPathFields != null && !partitionPathFields.isEmpty) hiveSyncConfig.partitionFields = partitionPathFields
if (!StringUtils.isNullOrEmpty(keys)) hiveSyncConfig.serdeProperties = "primaryKey = " + keys //Spark SQL 更新表时需要该属性确认主键字段
hiveSyncConfig
}
这里利用tableMetaClient来获取表的主键和分区字段,因为同步元数据时Hudi表文件肯定已经存在了,当然如果知道表的主键和分区字段也可以自己指定,这里自动获取会更方便一些。
其实主要是获取配置文件,构造同步工具类HiveSyncTool
,然后利用syncHoodieTable
同步元数据,建Hive表
接下来看一下源码,首先new HiveSyncTool
时,会根据表类型,当表类型为COW时,this.snapshotTableName = cfg.tableName
,snapshotTableName 也就是实时视图等于表名,而读优化视图为空,当为MOR表示,实时视图为tableName_rt
,而对于读优化视图,默认情况下为tableName_ro
,
当配置skipROSuffix=true时
,等于表名,这里可以发现当skipROSuffix=true时
,MOR表的读优化视图为表名而COW表的实时视图为表名,感觉这里有点矛盾,可能是因为MOR表的读优化视图和COW表的实时视图查询均由HoodieParquetInputFormat
实现,具体看后面的源码分析
private static final Logger LOG = LogManager.getLogger(HiveSyncTool.class);
public static final String SUFFIX_SNAPSHOT_TABLE = "_rt";
public static final String SUFFIX_READ_OPTIMIZED_TABLE = "_ro";
protected final HiveSyncConfig cfg;
protected HoodieHiveClient hoodieHiveClient = null;
protected String snapshotTableName = null;
protected Option<String> roTableName = null;
public HiveSyncTool(HiveSyncConfig cfg, HiveConf configuration, FileSystem fs) {
super(configuration.getAllProperties(), fs);
try {
this.hoodieHiveClient = new HoodieHiveClient(cfg, configuration, fs);
} catch (RuntimeException e) {
if (cfg.ignoreExceptions) {
LOG.error("Got runtime exception when hive syncing, but continuing as ignoreExceptions config is set ", e);
} else {
throw new HoodieHiveSyncException("Got runtime exception when hive syncing", e);
}
}
this.cfg = cfg;
// Set partitionFields to empty, when the NonPartitionedExtractor is used
if (NonPartitionedExtractor.class.getName().equals(cfg.partitionValueExtractorClass)) {
LOG.warn("Set partitionFields to empty, since the NonPartitionedExtractor is used");
cfg.partitionFields = new ArrayList<>();
}
if (hoodieHiveClient != null) {
switch (hoodieHiveClient.getTableType()) {
case COPY_ON_WRITE:
// 快照查询/实时视图等于表名
this.snapshotTableName = cfg.tableName;
// 读优化查询/读优化视图为空
this.roTableName = Option.empty();
break;
case MERGE_ON_READ:
// 快照查询/实时视图等于 表名+SUFFIX_SNAPSHOT_TABLE即 tableName_rt
this.snapshotTableName = cfg.tableName + SUFFIX_SNAPSHOT_TABLE;
// 读优化查询/读优化视图 skipROSuffix默认为false 默认情况下 tableName_ro
// 当配置skipROSuffix=true时,等于表名
this.roTableName = cfg.skipROSuffix ? Option.of(cfg.tableName) :
Option.of(cfg.tableName + SUFFIX_READ_OPTIMIZED_TABLE);
break;
default:
LOG.error("Unknown table type " + hoodieHiveClient.getTableType());
throw new InvalidTableException(hoodieHiveClient.getBasePath());
}
}
}
接下来再看一下,上篇文章中提到的两个视图的实现类HoodieParquetInputFormat
和HoodieParquetRealtimeInputFormat
@Override
public void syncHoodieTable() {
try {
if (hoodieHiveClient != null) {
doSync();
}
} catch (RuntimeException re) {
throw new HoodieException("Got runtime exception when hive syncing " + cfg.tableName, re);
} finally {
if (hoodieHiveClient != null) {
hoodieHiveClient.close();
}
}
}
protected void doSync() {
switch (hoodieHiveClient.getTableType()) {
case COPY_ON_WRITE:
// COW表只有snapshotTableName,也就是实时视图,查询时是由`HoodieParquetInputFormat`实现
syncHoodieTable(snapshotTableName, false, false);
break;
case MERGE_ON_READ:
// sync a RO table for MOR
// MOR 表的读优化视图,以`_RO`结尾,`READ_OPTIMIZED`的缩写,查询时由`HoodieParquetInputFormat`实现
syncHoodieTable(roTableName.get(), false, true);
// sync a RT table for MOR
// MOR 表的实时视图,以`_RT`结尾,`REAL_TIME`的缩写,查询时由`HoodieParquetRealtimeInputFormat`实现
syncHoodieTable(snapshotTableName, true, false);
break;
default:
LOG.error("Unknown table type " + hoodieHiveClient.getTableType());
throw new InvalidTableException(hoodieHiveClient.getBasePath());
}
}
可以看到,两个表的区别为:1、COW只同步1个表的元数据:实时视图,MOR表同步两个表的元数据,读优化视图和实时视图 2、除了表名外,参数也不一样,这也就决定了查询时用哪个实现类来实现
由于这篇文章不是主要讲解同步Hive元数据的源码,所以这里只贴主要实现部分,以后会单独总结一篇同步Hive元数据源码的文章。
protected void syncHoodieTable(String tableName, boolean useRealtimeInputFormat,
boolean readAsOptimized) {
syncSchema(tableName, tableExists, useRealtimeInputFormat, readAsOptimized, schema);
}
private void syncSchema(String tableName, boolean tableExists, boolean useRealTimeInputFormat,
boolean readAsOptimized, MessageType schema) {
Map<String, String> sparkSerdeProperties = getSparkSerdeProperties(readAsOptimized);
String inputFormatClassName = HoodieInputFormatUtils.getInputFormatClassName(baseFileFormat, useRealTimeInputFormat);
}
public static String getInputFormatClassName(HoodieFileFormat baseFileFormat, boolean realtime) {
switch (baseFileFormat) {
case PARQUET:
if (realtime) {
return HoodieParquetRealtimeInputFormat.class.getName();
} else {
return HoodieParquetInputFormat.class.getName();
}
case HFILE:
if (realtime) {
return HoodieHFileRealtimeInputFormat.class.getName();
} else {
return HoodieHFileInputFormat.class.getName();
}
case ORC:
return OrcInputFormat.class.getName();
default:
throw new HoodieIOException("Hoodie InputFormat not implemented for base file format " + baseFileFormat);
}
可以看到对于存储类型为PARQUET时,当useRealtimeInputFormat
为true时,那么inputFormat的实现类为HoodieParquetRealtimeInputFormat
,当为false时,实现类为HoodieParquetInputFormat
,至于另外一个参数readAsOptimized
,是否为读优化,这个参数是Spark SQL读取时用来判断该表为实时视图还是读优化视图,相关源码
// 同步元数据建表时添加参数:`hoodie.query.as.ro.table=true/false`
sparkSerdeProperties.put(ConfigUtils.IS_QUERY_AS_RO_TABLE, String.valueOf(readAsOptimized));
Spark读取Hive表时,用来判断,在类`org.apache.hudi.DataSourceOptionsHelper`
def parametersWithReadDefaults(parameters: Map[String, String]): Map[String, String] = {
// First check if the ConfigUtils.IS_QUERY_AS_RO_TABLE has set by HiveSyncTool,
// or else use query type from QUERY_TYPE.
val queryType = parameters.get(ConfigUtils.IS_QUERY_AS_RO_TABLE)
.map(is => if (is.toBoolean) QUERY_TYPE_READ_OPTIMIZED_OPT_VAL else QUERY_TYPE_SNAPSHOT_OPT_VAL)
.getOrElse(parameters.getOrElse(QUERY_TYPE.key, QUERY_TYPE.defaultValue()))
Map(
QUERY_TYPE.key -> queryType
) ++ translateConfigurations(parameters)
}
体现在建表语句里则为:
WITH SERDEPROPERTIES ( |
| 'hoodie.query.as.ro.table'='false',
inputFormat的语句:
STORED AS INPUTFORMAT |
| 'org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat'
完整的建表语句在后面的示例中
这里利用Apache Hudi 入门学习总结中写Hudi并同步到Hive表的程序来验证
由于之前的文章中已经有COW表的建表语句了,这里直接copy过来
+----------------------------------------------------+
| createtab_stmt |
+----------------------------------------------------+
| CREATE TABLE `test_hudi_table_1`( |
| `_hoodie_commit_time` string, |
| `_hoodie_commit_seqno` string, |
| `_hoodie_record_key` string, |
| `_hoodie_partition_path` string, |
| `_hoodie_file_name` string, |
| `id` int, |
| `name` string, |
| `value` int, |
| `ts` int) |
| PARTITIONED BY ( |
| `dt` string) |
| ROW FORMAT SERDE |
| 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' |
| WITH SERDEPROPERTIES ( |
| 'hoodie.query.as.ro.table'='false', |
| 'path'='/tmp/test_hudi_table_1', |
| 'primaryKey'='id') |
| STORED AS INPUTFORMAT |
| 'org.apache.hudi.hadoop.HoodieParquetInputFormat' |
| OUTPUTFORMAT |
| 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' |
| LOCATION |
| 'hdfs://cluster1/tmp/test_hudi_table_1' |
| TBLPROPERTIES ( |
| 'last_commit_time_sync'='20220512101500', |
| 'spark.sql.sources.provider'='hudi', |
| 'spark.sql.sources.schema.numPartCols'='1', |
| 'spark.sql.sources.schema.numParts'='1', |
| 'spark.sql.sources.schema.part.0'='{"type":"struct","fields":[{"name":"_hoodie_commit_time","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_commit_seqno","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_record_key","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_partition_path","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_file_name","type":"string","nullable":true,"metadata":{}},{"name":"id","type":"integer","nullable":false,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"value","type":"integer","nullable":false,"metadata":{}},{"name":"ts","type":"integer","nullable":false,"metadata":{}},{"name":"dt","type":"string","nullable":true,"metadata":{}}]}', |
| 'spark.sql.sources.schema.partCol.0'='dt', |
| 'transient_lastDdlTime'='1652320902') |
+----------------------------------------------------+
可以看到'hoodie.query.as.ro.table'='false'
,对于COW表的视图为实时视图,inputFormat为org.apache.hudi.hadoop.HoodieParquetInputFormat
我们将之前的save2HudiSyncHiveWithPrimaryKey
方法中加个表类型的参数option(TABLE_TYPE.key(), MOR_TABLE_TYPE_OPT_VAL)
,将表名库名修改一下:
val databaseName = "test"
val tableName1 = "test_hudi_table_df_mor"
val primaryKey = "id"
val preCombineField = "ts"
val partitionField = "dt"
val tablePath1 = "/tmp/test_hudi_table_df_mor"
同步Hive表成功后,show tables
,发现建了两张表test_hudi_table_df_mor_ro
和test_hudi_table_df_mor_rt
,通过上面的源码分析部分,我们知道_ro
为读优化表,_rt
为实时表,我们再看一下建表语句:
+----------------------------------------------------+
| createtab_stmt |
+----------------------------------------------------+
| CREATE TABLE `test_hudi_table_df_mor_ro`( |
| `_hoodie_commit_time` string, |
| `_hoodie_commit_seqno` string, |
| `_hoodie_record_key` string, |
| `_hoodie_partition_path` string, |
| `_hoodie_file_name` string, |
| `id` int, |
| `name` string, |
| `value` int, |
| `ts` int) |
| PARTITIONED BY ( |
| `dt` string) |
| ROW FORMAT SERDE |
| 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' |
| WITH SERDEPROPERTIES ( |
| 'hoodie.query.as.ro.table'='true', |
| 'path'='/tmp/test_hudi_table_df_mor', |
| 'primaryKey'='id') |
| STORED AS INPUTFORMAT |
| 'org.apache.hudi.hadoop.HoodieParquetInputFormat' |
| OUTPUTFORMAT |
| 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' |
| LOCATION |
| 'hdfs://cluster1/tmp/test_hudi_table_df_mor' |
| TBLPROPERTIES ( |
| 'last_commit_time_sync'='20220629145934', |
| 'spark.sql.sources.provider'='hudi', |
| 'spark.sql.sources.schema.numPartCols'='1', |
| 'spark.sql.sources.schema.numParts'='1', |
| 'spark.sql.sources.schema.part.0'='{"type":"struct","fields":[{"name":"_hoodie_commit_time","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_commit_seqno","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_record_key","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_partition_path","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_file_name","type":"string","nullable":true,"metadata":{}},{"name":"id","type":"integer","nullable":false,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"value","type":"integer","nullable":false,"metadata":{}},{"name":"ts","type":"integer","nullable":false,"metadata":{}},{"name":"dt","type":"string","nullable":true,"metadata":{}}]}', |
| 'spark.sql.sources.schema.partCol.0'='dt', |
| 'transient_lastDdlTime'='1656486059') |
+----------------------------------------------------+
+----------------------------------------------------+
| createtab_stmt |
+----------------------------------------------------+
| CREATE TABLE `test_hudi_table_df_mor_rt`( |
| `_hoodie_commit_time` string, |
| `_hoodie_commit_seqno` string, |
| `_hoodie_record_key` string, |
| `_hoodie_partition_path` string, |
| `_hoodie_file_name` string, |
| `id` int, |
| `name` string, |
| `value` int, |
| `ts` int) |
| PARTITIONED BY ( |
| `dt` string) |
| ROW FORMAT SERDE |
| 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' |
| WITH SERDEPROPERTIES ( |
| 'hoodie.query.as.ro.table'='false', |
| 'path'='/tmp/test_hudi_table_df_mor', |
| 'primaryKey'='id') |
| STORED AS INPUTFORMAT |
| 'org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat' |
| OUTPUTFORMAT |
| 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' |
| LOCATION |
| 'hdfs://cluster1/tmp/test_hudi_table_df_mor' |
| TBLPROPERTIES ( |
| 'last_commit_time_sync'='20220629145934', |
| 'spark.sql.sources.provider'='hudi', |
| 'spark.sql.sources.schema.numPartCols'='1', |
| 'spark.sql.sources.schema.numParts'='1', |
| 'spark.sql.sources.schema.part.0'='{"type":"struct","fields":[{"name":"_hoodie_commit_time","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_commit_seqno","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_record_key","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_partition_path","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_file_name","type":"string","nullable":true,"metadata":{}},{"name":"id","type":"integer","nullable":false,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"value","type":"integer","nullable":false,"metadata":{}},{"name":"ts","type":"integer","nullable":false,"metadata":{}},{"name":"dt","type":"string","nullable":true,"metadata":{}}]}', |
| 'spark.sql.sources.schema.partCol.0'='dt', |
| 'transient_lastDdlTime'='1656486059') |
+----------------------------------------------------+
可以看到_ro
和_rt
有两个区别,一个是hoodie.query.as.ro.table
,另外一个是INPUTFORMAT,对于Hive查询来说,只有INPUTFORMAT有用,hoodie.query.as.ro.table
是Spark查询时用来判断是否为读优化表的,因为MOR表只有一次写入,所以只有parquet文件,没有增量文件.log
,所以两个表查询出来的结构是一样的,后面用Spark SQL示例两者的区别
Hudi Spark SQL建表,不了解的可以参考:Hudi Spark SQL总结,之所以再提一下Spark SQL建表,是因为我发现他和DF写数据再同步建表有些许差别
create table test_hudi_table_cow (
id int,
name string,
price double,
ts long,
dt string
) using hudi
partitioned by (dt)
options (
primaryKey = 'id',
preCombineField = 'ts',
type = 'cow'
);
建表完成后,在Hive里查看Hive表的建表语句
show create table test_hudi_table_cow;
+----------------------------------------------------+
| createtab_stmt |
+----------------------------------------------------+
| CREATE TABLE `test_hudi_table_cow`( |
| `_hoodie_commit_time` string, |
| `_hoodie_commit_seqno` string, |
| `_hoodie_record_key` string, |
| `_hoodie_partition_path` string, |
| `_hoodie_file_name` string, |
| `id` int, |
| `name` string, |
| `price` double, |
| `ts` bigint) |
| PARTITIONED BY ( |
| `dt` string) |
| ROW FORMAT SERDE |
| 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' |
| WITH SERDEPROPERTIES ( |
| 'path'='hdfs://cluster1/warehouse/tablespace/managed/hive/test.db/test_hudi_table_cow', |
| 'preCombineField'='ts', |
| 'primaryKey'='id', |
| 'type'='cow') |
| STORED AS INPUTFORMAT |
| 'org.apache.hudi.hadoop.HoodieParquetInputFormat' |
| OUTPUTFORMAT |
| 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' |
| LOCATION |
| 'hdfs://cluster1/warehouse/tablespace/managed/hive/test.db/test_hudi_table_cow' |
| TBLPROPERTIES ( |
| 'last_commit_time_sync'='20220628152846', |
| 'spark.sql.create.version'='2.4.5', |
| 'spark.sql.sources.provider'='hudi', |
| 'spark.sql.sources.schema.numPartCols'='1', |
| 'spark.sql.sources.schema.numParts'='1', |
| 'spark.sql.sources.schema.part.0'='{"type":"struct","fields":[{"name":"_hoodie_commit_time","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_commit_seqno","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_record_key","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_partition_path","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_file_name","type":"string","nullable":true,"metadata":{}},{"name":"id","type":"integer","nullable":true,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"price","type":"double","nullable":true,"metadata":{}},{"name":"ts","type":"long","nullable":true,"metadata":{}},{"name":"dt","type":"string","nullable":true,"metadata":{}}]}', |
| 'spark.sql.sources.schema.partCol.0'='dt', |
| 'transient_lastDdlTime'='1656401195') |
+----------------------------------------------------+
我们发现,Spark SQL建的表中没有hoodie.query.as.ro.table
,我看了一下源码发现(上面有提到),Spark查询时
val queryType = parameters.get(ConfigUtils.IS_QUERY_AS_RO_TABLE)
.map(is => if (is.toBoolean) QUERY_TYPE_READ_OPTIMIZED_OPT_VAL else QUERY_TYPE_SNAPSHOT_OPT_VAL)
.getOrElse(parameters.getOrElse(QUERY_TYPE.key, QUERY_TYPE.defaultValue()))
QUERY_TYPE
的默认值为QUERY_TYPE_SNAPSHOT_OPT_VAL
,也就是快照查询,COW只有快照查询也就是默认值没有问题,QUERY_TYPE
有三种类型:QUERY_TYPE_SNAPSHOT_OPT_VAL
, QUERY_TYPE_READ_OPTIMIZED_OPT_VA
L, QUERY_TYPE_INCREMENTAL_OPT_VAL
,分别对应实时查询,读优化查询,增量查询,至于怎么利用Spark实现这些查询,这里不涉及
create table test_hudi_table_mor (
id int,
name string,
price double,
ts long,
dt string
) using hudi
partitioned by (dt)
options (
primaryKey = 'id',
preCombineField = 'ts',
type = 'mor'
);
我们用Spark创建MOR表后,show tables看一下发现只有test_hudi_table_mor表,没有对应的_rt
、_ro
表,其实SparkSQL建表的时候还没用到Hive同步工具类HiveSyncTool
,SparkSQL有自己的一套建表逻辑,而只有在写数据时才会用到HiveSyncTool
,这也就是上面讲到的SparkSQL和DF同步建出来的表有差异的原因,接下来我们插入一条数据,来看一下结果
insert into test_hudi_table_mor values (1,'hudi',10,100,'2021-05-05');
我们发现多了两张表,因为这两张表,是insert 数据然后利用同步工具类HiveSyncTool
创建的表,所以和程序中用DF写数据同步建的表是一样的,区别是内部表和外部表的区别,其实SparkSQL的逻辑如果表路径不等于库路径+表名,那么为外部表,这是合理的,而我们用DF建的表是因为我们程序中指定了内部表的参数,这样我们drop其中一张表就可以删掉数据,而用SparkSQL建的表,其实多了一张表内部表test_hudi_table_mor
,我们可以通过drop这张表来删除数据。
+----------------------------------------------------+
| createtab_stmt |
+----------------------------------------------------+
| CREATE EXTERNAL TABLE `test_hudi_table_mor_ro`( |
| `_hoodie_commit_time` string COMMENT '', |
| `_hoodie_commit_seqno` string COMMENT '', |
| `_hoodie_record_key` string COMMENT '', |
| `_hoodie_partition_path` string COMMENT '', |
| `_hoodie_file_name` string COMMENT '', |
| `id` int COMMENT '', |
| `name` string COMMENT '', |
| `price` double COMMENT '', |
| `ts` bigint COMMENT '') |
| PARTITIONED BY ( |
| `dt` string COMMENT '') |
| ROW FORMAT SERDE |
| 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' |
| WITH SERDEPROPERTIES ( |
| 'hoodie.query.as.ro.table'='true', |
| 'path'='hdfs://cluster1/warehouse/tablespace/managed/hive/test.db/test_hudi_table_mor') |
| STORED AS INPUTFORMAT |
| 'org.apache.hudi.hadoop.HoodieParquetInputFormat' |
| OUTPUTFORMAT |
| 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' |
| LOCATION |
| 'hdfs://cluster1/warehouse/tablespace/managed/hive/test.db/test_hudi_table_mor' |
| TBLPROPERTIES ( |
| 'last_commit_time_sync'='20220629153816', |
| 'spark.sql.sources.provider'='hudi', |
| 'spark.sql.sources.schema.numPartCols'='1', |
| 'spark.sql.sources.schema.numParts'='1', |
| 'spark.sql.sources.schema.part.0'='{"type":"struct","fields":[{"name":"_hoodie_commit_time","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_commit_seqno","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_record_key","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_partition_path","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_file_name","type":"string","nullable":true,"metadata":{}},{"name":"id","type":"integer","nullable":true,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"price","type":"double","nullable":true,"metadata":{}},{"name":"ts","type":"long","nullable":true,"metadata":{}},{"name":"dt","type":"string","nullable":true,"metadata":{}}]}', |
| 'spark.sql.sources.schema.partCol.0'='dt', |
| 'transient_lastDdlTime'='1656488248') |
+----------------------------------------------------+
+----------------------------------------------------+
| createtab_stmt |
+----------------------------------------------------+
| CREATE EXTERNAL TABLE `test_hudi_table_mor_rt`( |
| `_hoodie_commit_time` string COMMENT '', |
| `_hoodie_commit_seqno` string COMMENT '', |
| `_hoodie_record_key` string COMMENT '', |
| `_hoodie_partition_path` string COMMENT '', |
| `_hoodie_file_name` string COMMENT '', |
| `id` int COMMENT '', |
| `name` string COMMENT '', |
| `price` double COMMENT '', |
| `ts` bigint COMMENT '') |
| PARTITIONED BY ( |
| `dt` string COMMENT '') |
| ROW FORMAT SERDE |
| 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' |
| WITH SERDEPROPERTIES ( |
| 'hoodie.query.as.ro.table'='false', |
| 'path'='hdfs://cluster1/warehouse/tablespace/managed/hive/test.db/test_hudi_table_mor') |
| STORED AS INPUTFORMAT |
| 'org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat' |
| OUTPUTFORMAT |
| 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' |
| LOCATION |
| 'hdfs://cluster1/warehouse/tablespace/managed/hive/test.db/test_hudi_table_mor' |
| TBLPROPERTIES ( |
| 'last_commit_time_sync'='20220629153816', |
| 'spark.sql.sources.provider'='hudi', |
| 'spark.sql.sources.schema.numPartCols'='1', |
| 'spark.sql.sources.schema.numParts'='1', |
| 'spark.sql.sources.schema.part.0'='{"type":"struct","fields":[{"name":"_hoodie_commit_time","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_commit_seqno","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_record_key","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_partition_path","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_file_name","type":"string","nullable":true,"metadata":{}},{"name":"id","type":"integer","nullable":true,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"price","type":"double","nullable":true,"metadata":{}},{"name":"ts","type":"long","nullable":true,"metadata":{}},{"name":"dt","type":"string","nullable":true,"metadata":{}}]}', |
| 'spark.sql.sources.schema.partCol.0'='dt', |
| 'transient_lastDdlTime'='1656488248') |
+----------------------------------------------------+
我们再插入一条数据和更新一条数据,目的是为了生成log文件,来看两个表的不同
insert into test_hudi_table_mor values (2,'hudi',11,110,'2021-05-05');
update test_hudi_table_mor set name='hudi_update' where id =1;
select * from test_hudi_table_mor_ro;
+----------------------+-----------------------+---------------------+-------------------------+----------------------------------------------------+-----+-------+--------+------+-------------+
| _hoodie_commit_time | _hoodie_commit_seqno | _hoodie_record_key | _hoodie_partition_path | _hoodie_file_name | id | name | price | ts | dt |
+----------------------+-----------------------+---------------------+-------------------------+----------------------------------------------------+-----+-------+--------+------+-------------+
| 20220629153718 | 20220629153718_0_1 | id:1 | dt=2021-05-05 | bc415cdb-2b21-4d09-a3f6-a779357aa819-0_0-125-7240_20220629153718.parquet | 1 | hudi | 10.0 | 100 | 2021-05-05 |
| 20220629153803 | 20220629153803_0_2 | id:2 | dt=2021-05-05 | bc415cdb-2b21-4d09-a3f6-a779357aa819-0_0-154-8848_20220629153803.parquet | 2 | hudi | 11.0 | 110 | 2021-05-05 |
+----------------------+-----------------------+---------------------+-------------------------+----------------------------------------------------+-----+-------+--------+------+-------------+
select * from test_hudi_table_mor_rt;
+----------------------+-----------------------+---------------------+-------------------------+----------------------------------------------------+-----+--------------+--------+------+-------------+
| _hoodie_commit_time | _hoodie_commit_seqno | _hoodie_record_key | _hoodie_partition_path | _hoodie_file_name | id | name | price | ts | dt |
+----------------------+-----------------------+---------------------+-------------------------+----------------------------------------------------+-----+--------------+--------+------+-------------+
| 20220629153816 | 20220629153816_0_1 | id:1 | dt=2021-05-05 | bc415cdb-2b21-4d09-a3f6-a779357aa819-0 | 1 | hudi_update | 10.0 | 100 | 2021-05-05 |
| 20220629153803 | 20220629153803_0_2 | id:2 | dt=2021-05-05 | bc415cdb-2b21-4d09-a3f6-a779357aa819-0_0-154-8848_20220629153803.parquet | 2 | hudi | 11.0 | 110 | 2021-05-05 |
+----------------------+-----------------------+---------------------+-------------------------+----------------------------------------------------+-----+--------------+--------+------+-------------+
我们发现_ro
只能将新插入的查出来,而没有将更新的那条数据查出来,而_rt
是将最新的数据都查出来,我们再插入和更新时看一下存储文件
hadoop fs -ls hdfs://cluster1/warehouse/tablespace/managed/hive/test.db/test_hudi_table_mor/dt=2021-05-05
Found 4 items
-rw-rw----+ 3 spark hadoop 975 2022-06-29 15:38 hdfs://cluster1/warehouse/tablespace/managed/hive/test.db/test_hudi_table_mor/dt=2021-05-05/.bc415cdb-2b21-4d09-a3f6-a779357aa819-0_20220629153803.log.1_0-186-10660
-rw-rw----+ 3 spark hadoop 93 2022-06-29 15:37 hdfs://cluster1/warehouse/tablespace/managed/hive/test.db/test_hudi_table_mor/dt=2021-05-05/.hoodie_partition_metadata
-rw-rw----+ 3 spark hadoop 435283 2022-06-29 15:37 hdfs://cluster1/warehouse/tablespace/managed/hive/test.db/test_hudi_table_mor/dt=2021-05-05/bc415cdb-2b21-4d09-a3f6-a779357aa819-0_0-125-7240_20220629153718.parquet
-rw-rw----+ 3 spark hadoop 434991 2022-06-29 15:38 hdfs://cluster1/warehouse/tablespace/managed/hive/test.db/test_hudi_table_mor/dt=2021-05-05/bc415cdb-2b21-4d09-a3f6-a779357aa819-0_0-154-8848_20220629153803.parquet
发现,insert时是生成新的parquet文件,而更新时是生成.log文件,所以_ro
表将新插入的数据也出来了,因为_ro
只能查parquet文件(基本文件)中的数据,而_rt
表可以动态合并最新文件切片的基本文件(例如parquet)和增量文件(例如avro)来提供近实时数据集(几分钟的延迟),至于MOR表的写入逻辑(什么条件下写增量文件)和合并逻辑(什么情况下合并增量文件为parquet),这里不深入讲解,以后我会单独总结。