Hudi Spark Source Code Study Notes: spark.read.format("hudi").load (Part 2)


    Preface

    This post supplements the previous article, Hudi Spark Source Code Study Notes: spark.read.format("hudi").load. Since that article was already quite long, this follow-up is written separately; if you have not read it yet, it is worth reading first. In the previous article we saw that resolveBaseFileOnlyRelation returns a HadoopFsRelation. So what happens if it returns a BaseFileOnlyRelation instead?

    Background

    Besides HadoopFsRelation and BaseFileOnlyRelation, there are also IncrementalRelation, MergeOnReadSnapshotRelation, MergeOnReadIncrementalRelation, and HoodieBootstrapRelation. The reason for looking specifically at BaseFileOnlyRelation is that the commit history shows HadoopFsRelation was the default at first, then the custom BaseFileOnlyRelation was added, and now the code has fallen back to HadoopFsRelation again. The corresponding PRs explain why:

    The PR that added BaseFileOnlyRelation: [HUDI-3338] custom relation instead of HadoopFsRelation. The reason: HadoopFsRelation uses the value parsed from the actual partition path as the value of the partition field, but unlike ordinary tables, Hudi persists the partition value in the Parquet file itself, and in some cases the value in the partition path differs from the value of the partition column. BaseFileOnlyViewRelation was therefore implemented so that Hudi can manage its own relation.
    The PR that fell back to HadoopFsRelation: [HUDI-3902] Fallback to HadoopFsRelation in cases non-involving Schema Evolution. The reason: some of Spark's optimization rules (and some other handling) key off the use of HadoopFsRelation, so those optimizations are not applied when we rely on a custom relation. In the discussion under the [HUDI-3338] PR, a committer pointed out that query performance with BaseFileOnlyRelation was roughly half of what it had been with HadoopFsRelation (about 2x slower), because Spark only applies certain query optimizations in a few specific places, one of which is the FileSourceStrategy covered in the previous article. Although [HUDI-3902] falls back to HadoopFsRelation, it does not appear to address the partition-path value problem, so another PR was filed afterwards:
    [HUDI-3204] Fixing partition-values being derived from partition-path instead of source columns. This PR adds a custom FileFormat, Spark24HoodieParquetFileFormat, which overrides ParquetFileFormat's buildReaderWithPartitionValues method. The key point is the added shouldAppendPartitionValues parameter, which decides whether the value from the partition path is appended as the value of the partition field.

            if (shouldAppendPartitionValues) {
              vectorizedReader.initBatch(partitionSchema, file.partitionValues)
            } else {
              vectorizedReader.initBatch(StructType(Nil), InternalRow.empty)
            }
    
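    To make the PR's motivation concrete, here is a hypothetical sketch (the table path, column name, and values below are made up for illustration, not taken from the PR): the value persisted in the Parquet file can differ from the value encoded in the partition path, so deriving the partition column from the path would return a different value than the one Hudi wrote.

    // Hypothetical example: suppose the partition column `dt` was written as "2022-08-01",
    // while the key generator encoded the partition path as ".../2022/08/01". A plain
    // HadoopFsRelation derives `dt` from the partition path, whereas Hudi has the real value
    // persisted inside the Parquet file; with shouldAppendPartitionValues = false the value
    // read from the file is used instead of the path-derived one.
    import org.apache.spark.sql.SparkSession

    object PartitionValueDemo extends App {
      val spark = SparkSession.builder().appName("partition-value-demo").master("local[*]").getOrCreate()

      spark.read.format("hudi")
        .load("/tmp/hudi_partition_demo")   // placeholder path to such a table
        .select("dt")                       // expected to show the persisted "2022-08-01"
        .show(false)
    }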

    BaseFileOnlyRelation

    Now let's look at the code path that follows when a BaseFileOnlyRelation is returned. To make debugging easier, we simply change the return value of resolveBaseFileOnlyRelation to the BaseFileOnlyRelation itself.
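    A minimal sketch of that debugging tweak is shown below (treat it as an assumption: the exact parameter order of resolveBaseFileOnlyRelation in DefaultSource.scala and its original fallback condition vary across Hudi versions). The idea is simply to skip the fallback to HadoopFsRelation described in [HUDI-3902] and always return the custom relation:

    // Sketch of a local patch to DefaultSource.scala, for debugging only; the signature
    // and the fallback condition described in the comment are assumptions, not verbatim source.
    private def resolveBaseFileOnlyRelation(sqlContext: SQLContext,
                                            globPaths: Seq[Path],
                                            userSchema: Option[StructType],
                                            metaClient: HoodieTableMetaClient,
                                            optParams: Map[String, String]): BaseRelation = {
      val baseRelation = new BaseFileOnlyRelation(sqlContext, metaClient, optParams, userSchema, globPaths)

      // Original logic (roughly): return baseRelation only when schema-on-read / schema
      // evolution is required, otherwise fall back to baseRelation.toHadoopFsRelation.
      // For debugging we always return the custom relation so planning goes through
      // DataSourceStrategy instead of FileSourceStrategy.
      baseRelation
    }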

    qe.executedPlan

    Let's first look at what qe.executedPlan returns and compare it with the HadoopFsRelation case, so that we only need to analyze the differences.

    CollectLimitExec(21,WholeStageCodegenExec(ProjectExec(projectList,RowDataSourceScanExec())))
    

    Here the RowDataSourceScanExec's relation is a BaseFileOnlyRelation and its rdd is a Spark2HoodieFileScanRDD, both of which are Hudi-specific. Compared with the HadoopFsRelation case, the former FileSourceScanExec has become a RowDataSourceScanExec; everything else is the same. So what we need to figure out is where this RowDataSourceScanExec gets created.
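    For context, the plan above comes from tracing Dataset.show() as in the previous article: show() fetches numRows + 1 = 21 rows, which is where CollectLimitExec(21, ...) comes from. Below is a quick sketch for reproducing a comparable plan yourself (basePath is a placeholder; the exact operators can vary slightly by Spark version):

    // Inspect the physical plan of a limited read; the leaf here should be a
    // RowDataSourceScanExec over BaseFileOnlyRelation, versus a FileSourceScanExec
    // over HadoopFsRelation in the previous article.
    val df = spark.read.format("hudi").load(basePath)   // basePath: placeholder table path
    println(df.limit(21).queryExecution.executedPlan)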

    DataSourceStrategy

    It matches the second case in DataSourceStrategy, because BaseFileOnlyRelation is a subclass of PrunedFilteredScan. Take a look at its definition, followed by the relevant cases in DataSourceStrategy.apply:

    class BaseFileOnlyRelation(sqlContext: SQLContext,
                               metaClient: HoodieTableMetaClient,
                               optParams: Map[String, String],
                               userSchema: Option[StructType],
                               globPaths: Seq[Path])
      extends HoodieBaseRelation(sqlContext, metaClient, optParams, userSchema) with SparkAdapterSupport {
    
    abstract class HoodieBaseRelation(val sqlContext: SQLContext,
                                      val metaClient: HoodieTableMetaClient,
                                      val optParams: Map[String, String],
                                      schemaSpec: Option[StructType])
      extends BaseRelation
        with FileRelation
        with PrunedFilteredScan
        with Logging
        with SparkAdapterSupport {
    
    case class DataSourceStrategy(conf: SQLConf) extends Strategy with Logging with CastSupport {
      import DataSourceStrategy._
    
      def apply(plan: LogicalPlan): Seq[execution.SparkPlan] = plan match {
        case PhysicalOperation(projects, filters, l @ LogicalRelation(t: CatalystScan, _, _, _)) =>
          pruneFilterProjectRaw(
            l,
            projects,
            filters,
            (requestedColumns, allPredicates, _) =>
              toCatalystRDD(l, requestedColumns, t.buildScan(requestedColumns, allPredicates))) :: Nil
    
        case PhysicalOperation(projects, filters,
                               l @ LogicalRelation(t: PrunedFilteredScan, _, _, _)) =>
          pruneFilterProject(
            l,
            projects,
            filters,
            (a, f) => toCatalystRDD(l, a, t.buildScan(a.map(_.name).toArray, f))) :: Nil
    
        case PhysicalOperation(projects, filters, l @ LogicalRelation(t: PrunedScan, _, _, _)) =>
          pruneFilterProject(
            l,
            projects,
            filters,
            (a, _) => toCatalystRDD(l, a, t.buildScan(a.map(_.name).toArray))) :: Nil
            ......
    

    pruneFilterProject

      private def pruneFilterProject(
          relation: LogicalRelation,
          projects: Seq[NamedExpression],
          filterPredicates: Seq[Expression],
          scanBuilder: (Seq[Attribute], Array[Filter]) => RDD[InternalRow]) = {
        pruneFilterProjectRaw(
          relation,
          projects,
          filterPredicates,
          (requestedColumns, _, pushedFilters) => {
            scanBuilder(requestedColumns, pushedFilters.toArray)
          })
      }
    

    pruneFilterProjectRaw

      private def pruneFilterProjectRaw(
        relation: LogicalRelation,
        projects: Seq[NamedExpression],
        filterPredicates: Seq[Expression],
        scanBuilder: (Seq[Attribute], Seq[Expression], Seq[Filter]) => RDD[InternalRow]): SparkPlan = {
    
        val projectSet = AttributeSet(projects.flatMap(_.references))
        val filterSet = AttributeSet(filterPredicates.flatMap(_.references))
    
        val candidatePredicates = filterPredicates.map { _ transform {
          case a: AttributeReference => relation.attributeMap(a) // Match original case of attributes.
        }}
    
        val (unhandledPredicates, pushedFilters, handledFilters) =
          selectFilters(relation.relation, candidatePredicates)
    
        // Combines all Catalyst filter `Expression`s that are either not convertible to data source
        // `Filter`s or cannot be handled by `relation`.
        val filterCondition = unhandledPredicates.reduceLeftOption(expressions.And)
    
        if (projects.map(_.toAttribute) == projects &&
            projectSet.size == projects.size &&
            filterSet.subsetOf(projectSet)) {
          // When it is possible to just use column pruning to get the right projection and
          // when the columns of this projection are enough to evaluate all filter conditions,
          // just do a scan followed by a filter, with no extra project.
          val requestedColumns = projects
            // Safe due to if above.
            .asInstanceOf[Seq[Attribute]]
            // Match original case of attributes.
            .map(relation.attributeMap)
    
          val scan = RowDataSourceScanExec(
            relation.output,
            requestedColumns.map(relation.output.indexOf),
            pushedFilters.toSet,
            handledFilters,
            scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
            relation.relation,
            relation.catalogTable.map(_.identifier))
          filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan)
        } else {
          // A set of column attributes that are only referenced by pushed down filters.  We can
          // eliminate them from requested columns.
          val handledSet = {
            val handledPredicates = filterPredicates.filterNot(unhandledPredicates.contains)
            val unhandledSet = AttributeSet(unhandledPredicates.flatMap(_.references))
            AttributeSet(handledPredicates.flatMap(_.references)) --
              (projectSet ++ unhandledSet).map(relation.attributeMap)
          }
          // Don't request columns that are only referenced by pushed filters.
          val requestedColumns =
            (projectSet ++ filterSet -- handledSet).map(relation.attributeMap).toSeq
    
          val scan = RowDataSourceScanExec(
            relation.output,
            requestedColumns.map(relation.output.indexOf),
            pushedFilters.toSet,
            handledFilters,
            scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
            relation.relation,
            relation.catalogTable.map(_.identifier))
          execution.ProjectExec(
            projects, filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan))
        }
      }
    

    In this method we see that a RowDataSourceScanExec is constructed and returned. Its relation parameter is the BaseFileOnlyRelation, and its rdd is the RDD[Row] returned by the buildScan method that HoodieBaseRelation overrides, namely a Spark2HoodieFileScanRDD. This is exactly the core logic for reading data from a Hudi table.

    buildScan

    override final def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
        // NOTE: PLEAS READ CAREFULLY BEFORE MAKING CHANGES
        //
        //       In case list of requested columns doesn't contain the Primary Key one, we
        //       have to add it explicitly so that
        //          - Merging could be performed correctly
        //          - In case 0 columns are to be fetched (for ex, when doing {@code count()} on Spark's [[Dataset]],
        //            Spark still fetches all the rows to execute the query correctly
        //
        //       *Appending* additional columns to the ones requested by the caller is not a problem, as those
        //       will be "projected out" by the caller's projection;
        //
        // (!!!) IT'S CRITICAL TO AVOID REORDERING OF THE REQUESTED COLUMNS AS THIS WILL BREAK THE UPSTREAM
        //       PROJECTION
        val targetColumns: Array[String] = appendMandatoryColumns(requiredColumns)
        // NOTE: We explicitly fallback to default table's Avro schema to make sure we avoid unnecessary Catalyst > Avro
        //       schema conversion, which is lossy in nature (for ex, it doesn't preserve original Avro type-names) and
        //       could have an effect on subsequent de-/serializing records in some exotic scenarios (when Avro unions
        //       w/ more than 2 types are involved)
        val sourceSchema = optimizerPrunedDataSchema.map(convertToAvroSchema).getOrElse(tableAvroSchema)
        val (requiredAvroSchema, requiredStructSchema, requiredInternalSchema) =
          projectSchema(Either.cond(internalSchemaOpt.isDefined, internalSchemaOpt.get, sourceSchema), targetColumns)
    
        val filterExpressions = convertToExpressions(filters)
        val (partitionFilters, dataFilters) = filterExpressions.partition(isPartitionPredicate)
    
        val fileSplits = collectFileSplits(partitionFilters, dataFilters)
    
        val tableAvroSchemaStr = tableAvroSchema.toString
    
        val tableSchema = HoodieTableSchema(tableStructSchema, tableAvroSchemaStr, internalSchemaOpt)
        val requiredSchema = HoodieTableSchema(requiredStructSchema, requiredAvroSchema.toString, Some(requiredInternalSchema))
    
        if (fileSplits.isEmpty) {
          sparkSession.sparkContext.emptyRDD
        } else {
          val rdd = composeRDD(fileSplits, tableSchema, requiredSchema, targetColumns, filters)
    
          // Here we rely on a type erasure, to workaround inherited API restriction and pass [[RDD[InternalRow]]] back as [[RDD[Row]]]
          // Please check [[needConversion]] scala-doc for more details
          rdd.asInstanceOf[RDD[Row]]
        }
      }
      protected override def composeRDD(fileSplits: Seq[HoodieBaseFileSplit],
                                        tableSchema: HoodieTableSchema,
                                        requiredSchema: HoodieTableSchema,
                                        requestedColumns: Array[String],
                                        filters: Array[Filter]): RDD[InternalRow] = {
        val (partitionSchema, dataSchema, requiredDataSchema) =
          tryPrunePartitionColumns(tableSchema, requiredSchema)
    
        val baseFileReader = createBaseFileReader(
          spark = sparkSession,
          partitionSchema = partitionSchema,
          dataSchema = dataSchema,
          requiredDataSchema = requiredDataSchema,
          filters = filters,
          options = optParams,
          // NOTE: We have to fork the Hadoop Config here as Spark will be modifying it
          //       to configure Parquet reader appropriately
          hadoopConf = embedInternalSchema(new Configuration(conf), requiredSchema.internalSchema)
        )
    
        // NOTE: In some case schema of the reader's output (reader's schema) might not match the schema expected by the caller.
        //       This could occur for ex, when requested schema contains partition columns which might not be persisted w/in the
        //       data file, but instead would be parsed from the partition path. In that case output of the file-reader will have
        //       different ordering of the fields than the original required schema (for more details please check out
        //       [[ParquetFileFormat]] impl). In that case we have to project the rows from the file-reader's schema
        //       back into the one expected by the caller
        val projectedReader = projectReader(baseFileReader, requiredSchema.structTypeSchema)
    
        // SPARK-37273 FileScanRDD constructor changed in SPARK 3.3
        sparkAdapter.createHoodieFileScanRDD(sparkSession, projectedReader.apply, fileSplits.map(_.filePartition), requiredSchema.structTypeSchema)
          .asInstanceOf[HoodieUnsafeRDD]
      }  
    

    collectFromPlan

    As covered in the previous article, this method eventually calls RowDataSourceScanExec's inputRDDs:

    RowDataSourceScanExec.inputRDDs

      override def inputRDDs(): Seq[RDD[InternalRow]] = {
        rdd :: Nil
      }
    

    It simply returns rdd, which is the Spark2HoodieFileScanRDD produced by HoodieBaseRelation.buildScan.
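    Note that passing the result straight through only works because the relation reports that its rows need no conversion: per the type-erasure comment in buildScan above, the "RDD[Row]" actually carries InternalRows, and DataSourceStrategy's toCatalystRDD skips the Row-to-InternalRow conversion when needConversion is false. A one-line sketch of that hook (my paraphrase of what HoodieBaseRelation must declare, not a verbatim quote):

    // BaseRelation.needConversion (Spark DSv1 API): returning false tells Spark that the RDD
    // produced by buildScan already contains InternalRow instances, so toCatalystRDD just
    // casts it instead of re-encoding each row.
    override def needConversion: Boolean = false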

    Summary

    This post summarized why Hudi has switched between HadoopFsRelation and BaseFileOnlyRelation, and walked through the logic of querying Hudi when BaseFileOnlyRelation is used: with BaseFileOnlyRelation the query is implemented through buildScan.

    While studying the Spark DataSource API through https://shzhangji.com/cnblogs/2018/12/09/spark-datasource-api-v2/, I learned that Spark DataSource V1 obtains the data source's RDD by calling buildScan, so I wanted to check whether BaseFileOnlyRelation's buildScan is also used at query time and under what circumstances. I am still not sure exactly how it relates to Spark DataSource V1; I will come back and compare when I study and summarize the Spark DataSource source code.
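    As a point of reference, here is a minimal, self-contained sketch of that DataSource V1 contract (illustrative code, not Hudi's): a BaseRelation that mixes in PrunedFilteredScan exposes exactly the buildScan(requiredColumns, filters) hook that DataSourceStrategy calls and whose result ends up inside a RowDataSourceScanExec; this is the same path BaseFileOnlyRelation takes above.

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{Row, SQLContext}
    import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    // A toy DSv1 relation: DataSourceStrategy prunes columns, pushes down convertible
    // filters, calls buildScan, and wraps the returned RDD[Row] in a RowDataSourceScanExec.
    class DemoRelation(override val sqlContext: SQLContext) extends BaseRelation with PrunedFilteredScan {

      override def schema: StructType =
        StructType(StructField("id", StringType) :: StructField("name", StringType) :: Nil)

      override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
        // In-memory "table"; a real source would read files here (Hudi builds a
        // HoodieFileScanRDD instead).
        val data = Seq(Map("id" -> "1", "name" -> "a"), Map("id" -> "2", "name" -> "b"))
        // Honor column pruning by emitting only the requested columns in the requested order;
        // filters are not evaluated here, so Spark re-applies them on top of the scan.
        sqlContext.sparkContext.parallelize(data.map(row => Row.fromSeq(requiredColumns.toSeq.map(row))))
      }
    }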

  • Original article: https://blog.csdn.net/dkl12/article/details/126346978