This post supplements the previous article, Hudi Spark源码学习总结-spark.read.format(“hudi”).load. Since that article was already fairly long, I'm writing this follow-up separately; if you haven't read it yet, it's worth reading first. There we saw that resolveBaseFileOnlyRelation returns a HadoopFsRelation — so what happens if it returns a BaseFileOnlyRelation instead?
Besides HadoopFsRelation and BaseFileOnlyRelation there are also IncrementalRelation, MergeOnReadSnapshotRelation, MergeOnReadIncrementalRelation and HoodieBootstrapRelation. The reason for looking at BaseFileOnlyRelation specifically is that the commit history shows HadoopFsRelation was the original default, the custom BaseFileOnlyRelation was added later, and the code has since fallen back to HadoopFsRelation again. The corresponding PRs explain why.
The PR that added BaseFileOnlyRelation: [HUDI-3338] custom relation instead of HadoopFsRelation. The motivation: HadoopFsRelation takes the values of partition fields from the physical partition path. Unlike ordinary tables, however, Hudi persists the partition values inside the Parquet files, and in some cases the value encoded in the partition path differs from the value of the partition field. BaseFileOnlyViewRelation was therefore implemented so that Hudi could manage its own relation.
The PR that fell back to HadoopFsRelation: [HUDI-3902] Fallback to HadoopFsRelation in cases non-involving Schema Evolution. The motivation: several of Spark's optimization rules (and some other handling) are keyed on HadoopFsRelation, so those optimizations were not applied while we relied on the custom relation. In the comments of the [HUDI-3338] PR a committer pointed out that query performance with BaseFileOnlyRelation dropped to roughly half of what it was with HadoopFsRelation (about 2x slower), because Spark only optimizes scans in a few specific places — one of them being the FileSourceStrategy we covered in the previous article. Although [HUDI-3902] falls back to HadoopFsRelation, it does not appear to address the partition-path-value problem, so another PR followed:
PR [HUDI-3204] Fixing partition-values being derived from partition-path instead of source columns adds a custom FileFormat, Spark24HoodieParquetFileFormat, which overrides ParquetFileFormat's buildReaderWithPartitionValues method. The key point is the new shouldAppendPartitionValues parameter, which decides whether the values parsed from the partition path are appended as the values of the partition fields:
if (shouldAppendPartitionValues) {
  // Append the values parsed from the partition path as the partition-column values
  vectorizedReader.initBatch(partitionSchema, file.partitionValues)
} else {
  // Rely on the partition values persisted in the Parquet file itself
  vectorizedReader.initBatch(StructType(Nil), InternalRow.empty)
}
Now let's follow the downstream handling of BaseFileOnlyRelation. To make debugging easier, we simply change resolveBaseFileOnlyRelation so that it returns the BaseFileOnlyRelation directly.
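Roughly what that debugging tweak looks like in DefaultSource#resolveBaseFileOnlyRelation — this is a sketch based on the Hudi 0.11/0.12 code, and the exact signature may differ slightly between versions:

private def resolveBaseFileOnlyRelation(sqlContext: SQLContext,
                                        globPaths: Seq[Path],
                                        userSchema: Option[StructType],
                                        metaClient: HoodieTableMetaClient,
                                        optParams: Map[String, String]): BaseRelation = {
  val baseRelation = new BaseFileOnlyRelation(sqlContext, metaClient, optParams, userSchema, globPaths)

  // The original implementation only returns the custom relation when schema-on-read
  // (schema evolution) is required, and otherwise falls back via baseRelation.toHadoopFsRelation.
  // For debugging we skip the fallback and always hand Spark the custom relation.
  baseRelation
}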
First, let's look at what qe.executedPlan returns and compare it with the HadoopFsRelation case, so that we only need to analyze the differences:
CollectLimitExec(21,WholeStageCodegenExec(ProjectExec(projectList,RowDataSourceScanExec())))
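For reference, here is one way to reproduce this inspection in spark-shell (basePath and the variable names are illustrative; the CollectLimitExec(21, ...) wrapper comes from df.show(), which internally fetches the first numRows + 1 = 21 rows):

val df = spark.read.format("hudi").load(basePath)
// Planning happens lazily; accessing executedPlan triggers it and lets us find the scan node
df.queryExecution.executedPlan.foreach {
  case scan: org.apache.spark.sql.execution.RowDataSourceScanExec =>
    println(scan.relation.getClass.getSimpleName + " / " + scan.rdd.getClass.getSimpleName)
  case _ =>
}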
The RowDataSourceScanExec's relation is a BaseFileOnlyRelation and its rdd is a Spark2HoodieFileScanRDD, both Hudi-specific. Compared with the HadoopFsRelation plan, the FileSourceScanExec has been replaced by a RowDataSourceScanExec; everything else is the same, so what we need to analyze is where this RowDataSourceScanExec is produced.
It matches the second case in DataSourceStrategy, because BaseFileOnlyRelation is a subclass of PrunedFilteredScan. Take a look at its definition:
class BaseFileOnlyRelation(sqlContext: SQLContext,
                           metaClient: HoodieTableMetaClient,
                           optParams: Map[String, String],
                           userSchema: Option[StructType],
                           globPaths: Seq[Path])
  extends HoodieBaseRelation(sqlContext, metaClient, optParams, userSchema) with SparkAdapterSupport {

abstract class HoodieBaseRelation(val sqlContext: SQLContext,
                                  val metaClient: HoodieTableMetaClient,
                                  val optParams: Map[String, String],
                                  schemaSpec: Option[StructType])
  extends BaseRelation
    with FileRelation
    with PrunedFilteredScan
    with Logging
    with SparkAdapterSupport {
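A quick way to confirm the subtype relationship from spark-shell (assuming the Hudi Spark bundle is on the classpath):

import org.apache.hudi.BaseFileOnlyRelation
import org.apache.spark.sql.sources.PrunedFilteredScan
// BaseFileOnlyRelation mixes in PrunedFilteredScan via HoodieBaseRelation, so this prints true
println(classOf[PrunedFilteredScan].isAssignableFrom(classOf[BaseFileOnlyRelation]))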
case class DataSourceStrategy(conf: SQLConf) extends Strategy with Logging with CastSupport {
  import DataSourceStrategy._

  def apply(plan: LogicalPlan): Seq[execution.SparkPlan] = plan match {
    case PhysicalOperation(projects, filters, l @ LogicalRelation(t: CatalystScan, _, _, _)) =>
      pruneFilterProjectRaw(
        l,
        projects,
        filters,
        (requestedColumns, allPredicates, _) =>
          toCatalystRDD(l, requestedColumns, t.buildScan(requestedColumns, allPredicates))) :: Nil

    // BaseFileOnlyRelation extends PrunedFilteredScan, so it matches this second case
    case PhysicalOperation(projects, filters,
                           l @ LogicalRelation(t: PrunedFilteredScan, _, _, _)) =>
      pruneFilterProject(
        l,
        projects,
        filters,
        (a, f) => toCatalystRDD(l, a, t.buildScan(a.map(_.name).toArray, f))) :: Nil

    case PhysicalOperation(projects, filters, l @ LogicalRelation(t: PrunedScan, _, _, _)) =>
      pruneFilterProject(
        l,
        projects,
        filters,
        (a, _) => toCatalystRDD(l, a, t.buildScan(a.map(_.name).toArray))) :: Nil

    ......
  private def pruneFilterProject(
      relation: LogicalRelation,
      projects: Seq[NamedExpression],
      filterPredicates: Seq[Expression],
      scanBuilder: (Seq[Attribute], Array[Filter]) => RDD[InternalRow]) = {
    pruneFilterProjectRaw(
      relation,
      projects,
      filterPredicates,
      (requestedColumns, _, pushedFilters) => {
        scanBuilder(requestedColumns, pushedFilters.toArray)
      })
  }

  private def pruneFilterProjectRaw(
      relation: LogicalRelation,
      projects: Seq[NamedExpression],
      filterPredicates: Seq[Expression],
      scanBuilder: (Seq[Attribute], Seq[Expression], Seq[Filter]) => RDD[InternalRow]): SparkPlan = {

    val projectSet = AttributeSet(projects.flatMap(_.references))
    val filterSet = AttributeSet(filterPredicates.flatMap(_.references))

    val candidatePredicates = filterPredicates.map { _ transform {
      case a: AttributeReference => relation.attributeMap(a) // Match original case of attributes.
    }}

    val (unhandledPredicates, pushedFilters, handledFilters) =
      selectFilters(relation.relation, candidatePredicates)

    // Combines all Catalyst filter `Expression`s that are either not convertible to data source
    // `Filter`s or cannot be handled by `relation`.
    val filterCondition = unhandledPredicates.reduceLeftOption(expressions.And)

    if (projects.map(_.toAttribute) == projects &&
        projectSet.size == projects.size &&
        filterSet.subsetOf(projectSet)) {
      // When it is possible to just use column pruning to get the right projection and
      // when the columns of this projection are enough to evaluate all filter conditions,
      // just do a scan followed by a filter, with no extra project.
      val requestedColumns = projects
        // Safe due to if above.
        .asInstanceOf[Seq[Attribute]]
        // Match original case of attributes.
        .map(relation.attributeMap)

      val scan = RowDataSourceScanExec(
        relation.output,
        requestedColumns.map(relation.output.indexOf),
        pushedFilters.toSet,
        handledFilters,
        scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
        relation.relation,
        relation.catalogTable.map(_.identifier))
      filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan)
    } else {
      // A set of column attributes that are only referenced by pushed down filters. We can
      // eliminate them from requested columns.
      val handledSet = {
        val handledPredicates = filterPredicates.filterNot(unhandledPredicates.contains)
        val unhandledSet = AttributeSet(unhandledPredicates.flatMap(_.references))
        AttributeSet(handledPredicates.flatMap(_.references)) --
          (projectSet ++ unhandledSet).map(relation.attributeMap)
      }
      // Don't request columns that are only referenced by pushed filters.
      val requestedColumns =
        (projectSet ++ filterSet -- handledSet).map(relation.attributeMap).toSeq

      val scan = RowDataSourceScanExec(
        relation.output,
        requestedColumns.map(relation.output.indexOf),
        pushedFilters.toSet,
        handledFilters,
        scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
        relation.relation,
        relation.catalogTable.map(_.identifier))
      execution.ProjectExec(
        projects, filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan))
    }
  }
In this method we can see that a RowDataSourceScanExec is constructed and returned. Its relation parameter is the BaseFileOnlyRelation, and its rdd is the RDD[Row] returned by the buildScan method that HoodieBaseRelation overrides — a Spark2HoodieFileScanRDD. This is the core logic for reading Hudi table data:
  override final def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
    // NOTE: PLEASE READ CAREFULLY BEFORE MAKING CHANGES
    //
    // In case list of requested columns doesn't contain the Primary Key one, we
    // have to add it explicitly so that
    //    - Merging could be performed correctly
    //    - In case 0 columns are to be fetched (for ex, when doing {@code count()} on Spark's [[Dataset]],
    //      Spark still fetches all the rows to execute the query correctly
    //
    // *Appending* additional columns to the ones requested by the caller is not a problem, as those
    // will be "projected out" by the caller's projection;
    //
    // (!!!) IT'S CRITICAL TO AVOID REORDERING OF THE REQUESTED COLUMNS AS THIS WILL BREAK THE UPSTREAM
    //       PROJECTION
    val targetColumns: Array[String] = appendMandatoryColumns(requiredColumns)

    // NOTE: We explicitly fallback to default table's Avro schema to make sure we avoid unnecessary Catalyst > Avro
    //       schema conversion, which is lossy in nature (for ex, it doesn't preserve original Avro type-names) and
    //       could have an effect on subsequent de-/serializing records in some exotic scenarios (when Avro unions
    //       w/ more than 2 types are involved)
    val sourceSchema = optimizerPrunedDataSchema.map(convertToAvroSchema).getOrElse(tableAvroSchema)
    val (requiredAvroSchema, requiredStructSchema, requiredInternalSchema) =
      projectSchema(Either.cond(internalSchemaOpt.isDefined, internalSchemaOpt.get, sourceSchema), targetColumns)

    val filterExpressions = convertToExpressions(filters)
    val (partitionFilters, dataFilters) = filterExpressions.partition(isPartitionPredicate)

    val fileSplits = collectFileSplits(partitionFilters, dataFilters)

    val tableAvroSchemaStr = tableAvroSchema.toString

    val tableSchema = HoodieTableSchema(tableStructSchema, tableAvroSchemaStr, internalSchemaOpt)
    val requiredSchema = HoodieTableSchema(requiredStructSchema, requiredAvroSchema.toString, Some(requiredInternalSchema))

    if (fileSplits.isEmpty) {
      sparkSession.sparkContext.emptyRDD
    } else {
      val rdd = composeRDD(fileSplits, tableSchema, requiredSchema, targetColumns, filters)

      // Here we rely on a type erasure, to workaround inherited API restriction and pass [[RDD[InternalRow]]]
      // back as [[RDD[Row]]]
      // Please check [[needConversion]] scala-doc for more details
      rdd.asInstanceOf[RDD[Row]]
    }
  }
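For context, appendMandatoryColumns at the top of buildScan only ever appends columns (such as the record-key field) that the caller did not request; it never reorders them. Roughly, as a sketch — the actual helper lives in HoodieBaseRelation and its field names may differ across versions:

  // Sketch only, not the verbatim Hudi implementation; mandatoryColumns stands for the
  // relation's list of required fields (e.g. the record-key field).
  protected def appendMandatoryColumns(requestedColumns: Array[String]): Array[String] = {
    val missing = mandatoryColumns.filter(col => !requestedColumns.contains(col))
    requestedColumns ++ missing
  }

buildScan then hands the resulting file splits to composeRDD, which BaseFileOnlyRelation implements as follows: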
  protected override def composeRDD(fileSplits: Seq[HoodieBaseFileSplit],
                                    tableSchema: HoodieTableSchema,
                                    requiredSchema: HoodieTableSchema,
                                    requestedColumns: Array[String],
                                    filters: Array[Filter]): RDD[InternalRow] = {
    val (partitionSchema, dataSchema, requiredDataSchema) =
      tryPrunePartitionColumns(tableSchema, requiredSchema)

    val baseFileReader = createBaseFileReader(
      spark = sparkSession,
      partitionSchema = partitionSchema,
      dataSchema = dataSchema,
      requiredDataSchema = requiredDataSchema,
      filters = filters,
      options = optParams,
      // NOTE: We have to fork the Hadoop Config here as Spark will be modifying it
      //       to configure Parquet reader appropriately
      hadoopConf = embedInternalSchema(new Configuration(conf), requiredSchema.internalSchema)
    )

    // NOTE: In some case schema of the reader's output (reader's schema) might not match the schema expected by the caller.
    //       This could occur for ex, when requested schema contains partition columns which might not be persisted w/in the
    //       data file, but instead would be parsed from the partition path. In that case output of the file-reader will have
    //       different ordering of the fields than the original required schema (for more details please check out
    //       [[ParquetFileFormat]] impl). In that case we have to project the rows from the file-reader's schema
    //       back into the one expected by the caller
    val projectedReader = projectReader(baseFileReader, requiredSchema.structTypeSchema)

    // SPARK-37273 FileScanRDD constructor changed in SPARK 3.3
    sparkAdapter.createHoodieFileScanRDD(sparkSession, projectedReader.apply, fileSplits.map(_.filePartition), requiredSchema.structTypeSchema)
      .asInstanceOf[HoodieUnsafeRDD]
  }
As we covered in the previous article, the execution path eventually calls RowDataSourceScanExec's inputRDDs:
  override def inputRDDs(): Seq[RDD[InternalRow]] = {
    rdd :: Nil
  }
which simply returns rdd — the Spark2HoodieFileScanRDD produced by HoodieBaseRelation.buildScan.
This article summarized why Hudi switched between HadoopFsRelation and BaseFileOnlyRelation, as well as how a query against a Hudi table proceeds when BaseFileOnlyRelation is used — namely, that the scan is implemented through buildScan.
While studying Spark DataSource through the article https://shzhangji.com/cnblogs/2018/12/09/spark-datasource-api-v2/, I learned that Spark DataSource V1 obtains the data source's RDD by calling buildScan, which is why I wanted to see whether BaseFileOnlyRelation's buildScan is also used at query time and under what circumstances. I am still not sure exactly how it relates to Spark DataSource V1; I will come back to that comparison when I study and summarize the Spark DataSource source code.
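To make the V1 contract concrete, here is a minimal, self-contained sketch of a PrunedFilteredScan relation (all names are illustrative and unrelated to Hudi). Spark's DataSourceStrategy calls its buildScan with the pruned columns and the pushed-down filters and wraps the returned RDD[Row] in a RowDataSourceScanExec — exactly the path we traced above. Registering it with spark.read.format(...) would additionally require a RelationProvider, which is omitted here.

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

class ToyRelation(override val sqlContext: SQLContext)
  extends BaseRelation with PrunedFilteredScan {

  override def schema: StructType =
    StructType(StructField("id", IntegerType) :: StructField("name", StringType) :: Nil)

  // DataSourceStrategy hands us the pruned columns and the filters it could push down;
  // the relation decides how much of that to honor and returns an RDD[Row].
  override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
    val data = Seq((1, "a"), (2, "b"))
    sqlContext.sparkContext.parallelize(data).map { case (id, name) =>
      // Emit only the requested columns, preserving the requested order
      Row.fromSeq(requiredColumns.map {
        case "id"   => id
        case "name" => name
      })
    }
  }
}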