Hudi Spark Source Code Study Notes: spark.read.format("hudi").load


    Preface

    For work reasons I have mostly queried Hudi through Hive, so I am fairly familiar with how Hive queries Hudi, but not with how Spark does it. So now I want to learn the rough logic of querying Hudi with Spark and figure out how it jumps from Spark's source code into Hudi's source code to execute the Hudi query. That way I understand both how Spark queries a table in general and how it queries Hudi in particular, which will also make it easier to locate and fix problems later when using Kyuubi Spark SQL.

    Versions

    Spark 2.4.4
    Hudi master 0.12.0-SNAPSHOT (latest code at the time of writing)
    (With Spark 3 you can use the plan change log to print which rules actually take effect; see the sketch below.)
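
    For reference, a minimal sketch of turning that log on (assuming Spark 3.1+, where these SQL configs exist; they are Spark settings, not Hudi ones):

      // Raise the log level used for plan-change messages so they show up with default logging
      spark.conf.set("spark.sql.planChangeLog.level", "warn")
      // Optionally restrict logging to specific rules (comma-separated fully-qualified rule names)
      spark.conf.set("spark.sql.planChangeLog.rules",
        "org.apache.spark.sql.catalyst.optimizer.PushDownPredicates")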

    Example code

    First write some Hudi data as in the previous article, then query it:

          import spark.implicits._
          val df = Seq((1, "a1", 10, 1000)).toDF("id", "name", "value", "ts")
          df.write.format("hudi")
            .option(HoodieWriteConfig.TBL_NAME.key, tableName)
            .option(TABLE_TYPE.key, COW_TABLE_TYPE_OPT_VAL)
            .option(RECORDKEY_FIELD.key, "id")
            .option(PRECOMBINE_FIELD.key, "ts")
            .option(PARTITIONPATH_FIELD.key, "")
            .option(KEYGENERATOR_CLASS_NAME.key, classOf[NonpartitionedKeyGenerator].getName)
            .option(HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key, "1")
            .option(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key, "1")
            .mode(SaveMode.Overwrite)
            .save(tmp.getCanonicalPath)
    
          spark.read.format("hudi").load(tmp.getCanonicalPath).show()
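
    The option constants above are shown without their imports; roughly, the snippet assumes something like the following (my reconstruction, not part of the original example):

          import org.apache.spark.sql.SaveMode
          import org.apache.hudi.DataSourceWriteOptions._        // TABLE_TYPE, RECORDKEY_FIELD, ...
          import org.apache.hudi.config.HoodieWriteConfig
          import org.apache.hudi.keygen.NonpartitionedKeyGenerator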
    

    spark.read

    Returns a DataFrameReader; the format and load methods discussed below are both defined in this class.

    def read: DataFrameReader = new DataFrameReader(self)
    

    format

    This sets source = "hudi", which is used later by load.

      def format(source: String): DataFrameReader = {
        this.source = source
        this
      }
    

    load

    The load(path) method first adds the path option and then calls load(Seq.empty). In load(paths) it first checks whether source equals hive; ours is hudi, so that branch is skipped. It then looks up the DataSource class for hudi via DataSource.lookupDataSource and checks whether that class is a subclass of DataSourceV2. As covered in the previous article, DataSource.lookupDataSource returns Spark2DefaultSource, which is not a DataSourceV2, so loadV1Source is executed. (How the short name "hudi" is resolved is sketched after the code below.)

      def load(path: String): DataFrame = {
        // force invocation of `load(...varargs...)`
        option(DataSourceOptions.PATH_KEY, path).load(Seq.empty: _*)
      }
    
      /**
       * Loads input in as a `DataFrame`, for data sources that support multiple paths.
       * Only works if the source is a HadoopFsRelationProvider.
       * @since 1.6.0
       */
      @scala.annotation.varargs
      def load(paths: String*): DataFrame = {
        // First check whether source equals hive
        if (source.toLowerCase(Locale.ROOT) == DDLUtils.HIVE_PROVIDER) {
          throw new AnalysisException("Hive data source can only be used with tables, you can not " +
            "read files of Hive data source directly.")
        }
    
        // Look up the DataSource class for hudi via DataSource.lookupDataSource
        val cls = DataSource.lookupDataSource(source, sparkSession.sessionState.conf)
        // Check whether DataSourceV2 is a superclass of cls
        if (classOf[DataSourceV2].isAssignableFrom(cls)) {
          val ds = cls.newInstance().asInstanceOf[DataSourceV2]
          if (ds.isInstanceOf[ReadSupport]) {
            val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
              ds = ds, conf = sparkSession.sessionState.conf)
            val pathsOption = {
              val objectMapper = new ObjectMapper()
              DataSourceOptions.PATHS_KEY -> objectMapper.writeValueAsString(paths.toArray)
            }
            Dataset.ofRows(sparkSession, DataSourceV2Relation.create(
              ds, sessionOptions ++ extraOptions.toMap + pathsOption,
              userSpecifiedSchema = userSpecifiedSchema))
          } else {
            loadV1Source(paths: _*)
          }
        } else {
          loadV1Source(paths: _*)
        }
      }
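
    As a side note, DataSource.lookupDataSource resolves the short name "hudi" by loading every DataSourceRegister implementation through java.util.ServiceLoader and picking the one whose shortName() matches. A minimal sketch of that extension point (hypothetical class name, shown only to illustrate the mechanism; Hudi's real registration lives in its own DefaultSource classes):

      import org.apache.spark.sql.SQLContext
      import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider}

      // Hypothetical V1 source registered under the short name "hudi-like"
      class HudiLikeSource extends DataSourceRegister with RelationProvider {
        override def shortName(): String = "hudi-like"

        override def createRelation(sqlContext: SQLContext,
                                    parameters: Map[String, String]): BaseRelation = {
          // Build and return a BaseRelation for the given options here
          ???
        }
      }

    Such a class would also have to be listed in META-INF/services/org.apache.spark.sql.sources.DataSourceRegister so that ServiceLoader can find it.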
    

    loadV1Source

    The core of loadV1Source is the call to sparkSession.baseRelationToDataFrame; let's first look at its argument, DataSource.resolveRelation.

      private def loadV1Source(paths: String*) = {
        // Code path for data source v1.
        sparkSession.baseRelationToDataFrame(
          DataSource.apply(
            sparkSession,
            paths = paths,
            userSpecifiedSchema = userSpecifiedSchema,
            className = source,
            options = extraOptions.toMap).resolveRelation())
      }
    

    DataSource.resolveRelation

    Likewise, providingClass.newInstance() here (providingClass again comes from DataSource.lookupDataSource) returns Spark2DefaultSource, whose parent class DefaultSource implements both SchemaRelationProvider and RelationProvider. Since userSpecifiedSchema is None, the second case matches, so relation = dataSource.createRelation(sparkSession.sqlContext, caseInsensitiveOptions). (Both provider interfaces are sketched after the code for reference.)

      /**
       * Create a resolved [[BaseRelation]] that can be used to read data from or write data into this
       * [[DataSource]]
       *
       * @param checkFilesExist Whether to confirm that the files exist when generating the
       *                        non-streaming file based datasource. StructuredStreaming jobs already
       *                        list file existence, and when generating incremental jobs, the batch
       *                        is considered as a non-streaming file based data source. Since we know
       *                        that files already exist, we don't need to check them again.
       */
      def resolveRelation(checkFilesExist: Boolean = true): BaseRelation = {
        val relation = (providingClass.newInstance(), userSpecifiedSchema) match {
          // TODO: Throw when too much is given.
          case (dataSource: SchemaRelationProvider, Some(schema)) =>
            dataSource.createRelation(sparkSession.sqlContext, caseInsensitiveOptions, schema)
          case (dataSource: RelationProvider, None) =>
            dataSource.createRelation(sparkSession.sqlContext, caseInsensitiveOptions)
          case (_: SchemaRelationProvider, None) =>
            throw new AnalysisException(s"A schema needs to be specified when using $className.")
          case (dataSource: RelationProvider, Some(schema)) =>
            val baseRelation =
              dataSource.createRelation(sparkSession.sqlContext, caseInsensitiveOptions)
            if (baseRelation.schema != schema) {
              throw new AnalysisException(s"$className does not allow user-specified schemas.")
            }
            baseRelation
    
          // We are reading from the results of a streaming query. Load files from the metadata log
          // instead of listing them using HDFS APIs.
          case (format: FileFormat, _)
              if FileStreamSink.hasMetadata(
                caseInsensitiveOptions.get("path").toSeq ++ paths,
                sparkSession.sessionState.newHadoopConf()) =>
            ......
    
        relation
      }
      
      // className = hudi
      lazy val providingClass: Class[_] =
        DataSource.lookupDataSource(className, sparkSession.sessionState.conf)
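
    For reference, the two provider interfaces matched above look roughly like this in Spark 2.4 (org.apache.spark.sql.sources, paraphrased with docs omitted):

      trait RelationProvider {
        // Used when no user-specified schema is given (our case)
        def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation
      }

      trait SchemaRelationProvider {
        // Used when the caller supplies a schema
        def createRelation(
            sqlContext: SQLContext,
            parameters: Map[String, String],
            schema: StructType): BaseRelation
      }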
    

    dataSource.createRelation

    This createRelation is implemented in DefaultSource, the parent class of Spark2DefaultSource. It first derives the table base path from the path options, then creates a HoodieTableMetaClient to obtain table configuration such as the table type, and finally decides which kind of Relation to return based on that configuration; here it returns resolveBaseFileOnlyRelation (see the note after the code on how the read options steer this match).
    (So createRelation is the entry point of Hudi's own query logic.)

      override def createRelation(sqlContext: SQLContext,
                                  parameters: Map[String, String]): BaseRelation = {
        createRelation(sqlContext, parameters, null)
      }
    
      override def createRelation(sqlContext: SQLContext,
                                  optParams: Map[String, String],
                                  schema: StructType): BaseRelation = {
        val path = optParams.get("path")
        val readPathsStr = optParams.get(DataSourceReadOptions.READ_PATHS.key)
    
        if (path.isEmpty && readPathsStr.isEmpty) {
          throw new HoodieException(s"'path' or '$READ_PATHS' or both must be specified.")
        }
    
        val readPaths = readPathsStr.map(p => p.split(",").toSeq).getOrElse(Seq())
        val allPaths = path.map(p => Seq(p)).getOrElse(Seq()) ++ readPaths
    
        val fs = FSUtils.getFs(allPaths.head, sqlContext.sparkContext.hadoopConfiguration)
    
        val globPaths = if (path.exists(_.contains("*")) || readPaths.nonEmpty) {
          HoodieSparkUtils.checkAndGlobPathIfNecessary(allPaths, fs)
        } else {
          Seq.empty
        }
    
        // Add default options for unspecified read options keys.
        val parameters = (if (globPaths.nonEmpty) {
          Map(
            "glob.paths" -> globPaths.mkString(",")
          )
        } else {
          Map()
        }) ++ DataSourceOptionsHelper.parametersWithReadDefaults(optParams)
    
        // Get the table base path
        val tablePath = if (globPaths.nonEmpty) {
          DataSourceUtils.getTablePath(fs, globPaths.toArray)
        } else {
          DataSourceUtils.getTablePath(fs, Array(new Path(path.get)))
        }
        log.info("Obtained hudi table path: " + tablePath)
    
        // Create the HoodieTableMetaClient
        val metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(tablePath).build()
        // Whether this is a bootstrapped table; false in this example
        val isBootstrappedTable = metaClient.getTableConfig.getBootstrapBasePath.isPresent
        // Table type; COPY_ON_WRITE in this example
        val tableType = metaClient.getTableType
        // Query type; snapshot in this example
        val queryType = parameters(QUERY_TYPE.key)
        // NOTE: In cases when Hive Metastore is used as catalog and the table is partitioned, schema in the HMS might contain
        //       Hive-specific partitioning columns created specifically for HMS to handle partitioning appropriately. In that
        //       case  we opt in to not be providing catalog's schema, and instead force Hudi relations to fetch the schema
        //       from the table itself
        val userSchema = if (isUsingHiveCatalog(sqlContext.sparkSession)) {
          None
        } else {
          Option(schema)
        }
    
        log.info(s"Is bootstrapped table => $isBootstrappedTable, tableType is: $tableType, queryType is: $queryType")
    
        // Check whether there is any completed commit, i.e. whether the table is empty
        if (metaClient.getCommitsTimeline.filterCompletedInstants.countInstants() == 0) { // empty table
          // Return an EmptyRelation
          new EmptyRelation(sqlContext, metaClient)
        } else {
          (tableType, queryType, isBootstrappedTable) match {
            case (COPY_ON_WRITE, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) |
                 (COPY_ON_WRITE, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, false) |
                 (MERGE_ON_READ, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, false) =>
              resolveBaseFileOnlyRelation(sqlContext, globPaths, userSchema, metaClient, parameters)
    
            case (COPY_ON_WRITE, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) =>
              new IncrementalRelation(sqlContext, parameters, userSchema, metaClient)
    
            case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) =>
              new MergeOnReadSnapshotRelation(sqlContext, parameters, userSchema, globPaths, metaClient)
    
            case (MERGE_ON_READ, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) =>
              new MergeOnReadIncrementalRelation(sqlContext, parameters, userSchema, metaClient)
    
            case (_, _, true) =>
              new HoodieBootstrapRelation(sqlContext, userSchema, globPaths, metaClient, parameters)
    
            case (_, _, _) =>
              throw new HoodieException(s"Invalid query type : $queryType for tableType: $tableType," +
                s"isBootstrappedTable: $isBootstrappedTable ")
          }
        }
      }
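
    For illustration, the read options steer which branch of the match above is taken; e.g. an incremental read of the same COW table would hit the IncrementalRelation branch. A hedged sketch (assumes import org.apache.hudi.DataSourceReadOptions; the begin-instant value is a placeholder, and option names may differ slightly across Hudi releases):

      spark.read.format("hudi")
        .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
        .option("hoodie.datasource.read.begin.instanttime", "000")  // placeholder begin instant
        .load(tmp.getCanonicalPath)
        .show()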
    

    resolveBaseFileOnlyRelation

      private def resolveBaseFileOnlyRelation(sqlContext: SQLContext,
                                              globPaths: Seq[Path],
                                              userSchema: Option[StructType],
                                              metaClient: HoodieTableMetaClient,
                                              optParams: Map[String, String]): BaseRelation = {
        val baseRelation = new BaseFileOnlyRelation(sqlContext, metaClient, optParams, userSchema, globPaths)
    
        // NOTE: We fallback to [[HadoopFsRelation]] in all of the cases except ones requiring usage of
        //       [[BaseFileOnlyRelation]] to function correctly. This is necessary to maintain performance parity w/
        //       vanilla Spark, since some of the Spark optimizations are predicated on the using of [[HadoopFsRelation]].
        //
        //       You can check out HUDI-3896 for more details
        if (baseRelation.hasSchemaOnRead) {
          baseRelation
        } else {
          baseRelation.toHadoopFsRelation
        }
      }
    
    class BaseFileOnlyRelation(sqlContext: SQLContext,
                               metaClient: HoodieTableMetaClient,
                               optParams: Map[String, String],
                               userSchema: Option[StructType],
                               globPaths: Seq[Path])
      extends HoodieBaseRelation(sqlContext, metaClient, optParams, userSchema) with SparkAdapterSupport {
        ......
    

    hasSchemaOnRead is defined in HoodieBaseRelation, the parent class of BaseFileOnlyRelation:

      def hasSchemaOnRead: Boolean = internalSchemaOpt.isDefined
    
      protected lazy val (tableAvroSchema: Schema, internalSchemaOpt: Option[InternalSchema]) = {
        val schemaResolver = new TableSchemaResolver(metaClient)
        val internalSchemaOpt = if (!isSchemaEvolutionEnabled) {
          None
        } else {
          Try(schemaResolver.getTableInternalSchemaFromCommitMetadata) match {
            case Success(internalSchemaOpt) => toScalaOption(internalSchemaOpt)
            case Failure(e) =>
              logWarning("Failed to fetch internal-schema from the table", e)
              None
          }
        }
    
        val avroSchema = internalSchemaOpt.map { is =>
          AvroInternalSchemaConverter.convert(is, "schema")
        } orElse {
          schemaSpec.map(convertToAvroSchema)
        } getOrElse {
          Try(schemaResolver.getTableAvroSchema) match {
            case Success(schema) => schema
            case Failure(e) =>
              logError("Failed to fetch schema from the table", e)
              throw new HoodieSchemaException("Failed to fetch schema from the table")
          }
        }
    
        (avroSchema, internalSchemaOpt)
      }
    
      private def isSchemaEvolutionEnabled = {
        // NOTE: Schema evolution could be configured both t/h optional parameters vehicle as well as
        //       t/h Spark Session configuration (for ex, for Spark SQL)
        optParams.getOrElse(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key,
          DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean ||
          sparkSession.conf.get(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key,
            DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean
      }    
    

    Since SCHEMA_EVOLUTION_ENABLED defaults to false, internalSchemaOpt is None and hasSchemaOnRead returns false, so resolveBaseFileOnlyRelation ends up calling baseRelation.toHadoopFsRelation:

    val SCHEMA_EVOLUTION_ENABLED: ConfigProperty[Boolean] = HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE
    
      public static final ConfigProperty<Boolean> SCHEMA_EVOLUTION_ENABLE = ConfigProperty
          .key("hoodie.schema.on.read.enable")
          .defaultValue(false)
          .withDocumentation("Enables support for Schema Evolution feature");
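
    Conversely, to keep the BaseFileOnlyRelation instead of falling back to HadoopFsRelation, schema-on-read would have to be enabled explicitly, e.g. (a sketch built from the config key shown above):

      spark.read.format("hudi")
        .option("hoodie.schema.on.read.enable", "true")
        .load(tmp.getCanonicalPath)
        .show()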
    

    baseRelation.toHadoopFsRelation

    This method returns a HadoopFsRelation whose location is the fileIndex, a HoodieFileIndex defined in the parent class HoodieBaseRelation, and whose fileFormat is Spark24HoodieParquetFileFormat.

      override protected val shouldExtractPartitionValuesFromPartitionPath: Boolean =
        internalSchemaOpt.isEmpty
    
     /**
       * NOTE: We have to fallback to [[HadoopFsRelation]] to make sure that all of the Spark optimizations could be
       *       equally applied to Hudi tables, since some of those are predicated on the usage of [[HadoopFsRelation]],
       *       and won't be applicable in case of us using our own custom relations (one of such optimizations is [[SchemaPruning]]
       *       rule; you can find more details in HUDI-3896)
       */
      def toHadoopFsRelation: HadoopFsRelation = {
        // globPaths is empty in our case
        if (globPaths.isEmpty) {
          // NOTE: There are currently 2 ways partition values could be fetched:
          //          - Source columns (producing the values used for physical partitioning) will be read
          //          from the data file
          //          - Values parsed from the actual partition path would be appended to the final dataset
          //
          //        In the former case, we don't need to provide the partition-schema to the relation,
          //        therefore we simply stub it w/ empty schema and use full table-schema as the one being
          //        read from the data file.
          //
          //        In the latter, we have to specify proper partition schema as well as "data"-schema, essentially
          //        being a table-schema with all partition columns stripped out
          // shouldExtractPartitionValuesFromPartitionPath is true here
          val (partitionSchema, dataSchema) = if (shouldExtractPartitionValuesFromPartitionPath) {
            (fileIndex.partitionSchema, fileIndex.dataSchema)
          } else {
            (StructType(Nil), tableStructSchema)
          }
          // Here location = fileIndex, and fileIndex is a HoodieFileIndex
          HadoopFsRelation(
            location = fileIndex,
            partitionSchema = partitionSchema,
            dataSchema = dataSchema,
            bucketSpec = None,
            fileFormat = fileFormat,
            optParams)(sparkSession)
        } else {
          val readPathsStr = optParams.get(DataSourceReadOptions.READ_PATHS.key)
          val extraReadPaths = readPathsStr.map(p => p.split(",").toSeq).getOrElse(Seq())
    
          // NOTE: Spark is able to infer partitioning values from partition path only when Hive-style partitioning
          //       scheme is used. Therefore, we fallback to reading the table as non-partitioned (specifying
          //       partitionColumns = Seq.empty) whenever Hive-style partitioning is not involved
          val partitionColumns: Seq[String] = if (tableConfig.getHiveStylePartitioningEnable.toBoolean) {
            this.partitionColumns
          } else {
            Seq.empty
          }
    
          DataSource.apply(
            sparkSession = sparkSession,
            paths = extraReadPaths,
            // Here we should specify the schema to the latest commit schema since
            // the table schema evolution.
            userSpecifiedSchema = userSchema.orElse(Some(tableStructSchema)),
            className = fileFormatClassName,
            options = optParams ++ Map(
              // Since we're reading the table as just collection of files we have to make sure
              // we only read the latest version of every Hudi's file-group, which might be compacted, clustered, etc.
              // while keeping previous versions of the files around as well.
              //
              // We rely on [[HoodieROTablePathFilter]], to do proper filtering to assure that
              "mapreduce.input.pathFilter.class" -> classOf[HoodieROTablePathFilter].getName,
    
              // We have to override [[EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH]] setting, since
              // the relation might have this setting overridden
              DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.key -> shouldExtractPartitionValuesFromPartitionPath.toString,
    
              // NOTE: We have to specify table's base-path explicitly, since we're requesting Spark to read it as a
              //       list of globbed paths which complicates partitioning discovery for Spark.
              //       Please check [[PartitioningAwareFileIndex#basePaths]] comment for more details.
              PartitioningAwareFileIndex.BASE_PATH_PARAM -> metaClient.getBasePathV2.toString
            ),
            partitionColumns = partitionColumns
          )
            .resolveRelation()
            .asInstanceOf[HadoopFsRelation]
        }
      }
    
      protected lazy val fileIndex: HoodieFileIndex =
        HoodieFileIndex(sparkSession, metaClient, Some(tableStructSchema), optParams,
          FileStatusCache.getOrCreate(sparkSession))
    

    HoodieFileIndex extends SparkHoodieTableFileIndex and implements the FileIndex interface from Spark SQL (listFiles and friends), which is used when listing the files of a Hudi table; this is exactly where Hudi's own query logic is implemented. Next let's see how its listFiles method is called and how the query results are returned. (The FileIndex contract is sketched right after the class declaration below.)

    case class HoodieFileIndex(spark: SparkSession,
                               metaClient: HoodieTableMetaClient,
                               schemaSpec: Option[StructType],
                               options: Map[String, String],
                               @transient fileStatusCache: FileStatusCache = NoopCache)
      extends SparkHoodieTableFileIndex(
        spark = spark,
        metaClient = metaClient,
        schemaSpec = schemaSpec,
        configProperties = getConfigProperties(spark, options),
        queryPaths = HoodieFileIndex.getQueryPaths(options),
        specifiedQueryInstant = options.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key).map(HoodieSqlCommonUtils.formatQueryInstant),
        fileStatusCache = fileStatusCache
      )
        with FileIndex {
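
    For context, the FileIndex contract it plugs into looks roughly like this in Spark 2.4 (org.apache.spark.sql.execution.datasources.FileIndex, paraphrased):

      trait FileIndex {
        def rootPaths: Seq[Path]
        // Returns the pruned file listing, filtered by partition and data filters where possible
        def listFiles(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory]
        def inputFiles: Array[String]
        def refresh(): Unit
        def sizeInBytes: Long
        def partitionSchema: StructType
      }

    The next snippet shows how fileFormat is resolved to Spark24HoodieParquetFileFormat through the SparkAdapter: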
    
      lazy val (fileFormat: FileFormat, fileFormatClassName: String) =
        metaClient.getTableConfig.getBaseFileFormat match {
          case HoodieFileFormat.ORC => (new OrcFileFormat, "orc")
          case HoodieFileFormat.PARQUET =>
            // We're delegating to Spark to append partition values to every row only in cases
            // when these corresponding partition-values are not persisted w/in the data file itself
            val parquetFileFormat = sparkAdapter.createHoodieParquetFileFormat(shouldExtractPartitionValuesFromPartitionPath).get
            (parquetFileFormat, HoodieParquetFileFormat.FILE_FORMAT_ID)
        }
    
      // Spark2Adapter
      override def createHoodieParquetFileFormat(appendPartitionValues: Boolean): Option[ParquetFileFormat] = {
        Some(new Spark24HoodieParquetFileFormat(appendPartitionValues))
      }
    

    baseRelationToDataFrame

    From the analysis above we know that resolveRelation returns HadoopFsRelation(HoodieFileIndex, partitionSchema, dataSchema, None, Spark24HoodieParquetFileFormat, optParams)(sparkSession). Next, baseRelationToDataFrame:

      def baseRelationToDataFrame(baseRelation: BaseRelation): DataFrame = {
        Dataset.ofRows(self, LogicalRelation(baseRelation))
      }
    
    object LogicalRelation {
      def apply(relation: BaseRelation, isStreaming: Boolean = false): LogicalRelation =
        LogicalRelation(relation, relation.schema.toAttributes, None, isStreaming)
    
      def apply(relation: BaseRelation, table: CatalogTable): LogicalRelation =
        LogicalRelation(relation, relation.schema.toAttributes, Some(table), false)
    }
    

    Dataset.ofRows was analyzed several times in earlier articles, so we focus on the essentials: here the logicalPlan argument is LogicalRelation(HadoopFsRelation(HoodieFileIndex, ...), relation.schema.toAttributes, None, false).

      def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan): DataFrame = {
        val qe = sparkSession.sessionState.executePlan(logicalPlan)
        qe.assertAnalyzed()
        new Dataset[Row](sparkSession, qe, RowEncoder(qe.analyzed.schema))
      }
    

    Dataset.show

    For a query, Dataset.ofRows above only triggers the analysis phase via qe.assertAnalyzed(), and the logs show no rules actually took effect, so let's focus on the subsequent show. It eventually calls showString, whose core logic is in getRows.

      def show(): Unit = show(20)
      def show(numRows: Int): Unit = show(numRows, truncate = true)
      def show(numRows: Int, truncate: Boolean): Unit = if (truncate) {
        println(showString(numRows, truncate = 20))
      } else {
        println(showString(numRows, truncate = 0))
      }
    
      private[sql] def showString(
          _numRows: Int,
          truncate: Int = 20,
          vertical: Boolean = false): String = {
        val numRows = _numRows.max(0).min(ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH - 1)
        // Get rows represented by Seq[Seq[String]], we may get one more line if it has more data.
        val tmpRows = getRows(numRows, truncate)
    
        val hasMoreData = tmpRows.length - 1 > numRows
        val rows = tmpRows.take(numRows + 1)
    
        val sb = new StringBuilder
        val numCols = schema.fieldNames.length
        // We set a minimum column width at '3'
        val minimumColWidth = 3
    
        if (!vertical) {
          // Initialise the width of each column to a minimum value
          val colWidths = Array.fill(numCols)(minimumColWidth)
    
          // Compute the width of each column
          for (row <- rows) {
            for ((cell, i) <- row.zipWithIndex) {
              colWidths(i) = math.max(colWidths(i), Utils.stringHalfWidth(cell))
            }
          }
    
          val paddedRows = rows.map { row =>
            row.zipWithIndex.map { case (cell, i) =>
              if (truncate > 0) {
                StringUtils.leftPad(cell, colWidths(i) - Utils.stringHalfWidth(cell) + cell.length)
              } else {
                StringUtils.rightPad(cell, colWidths(i) - Utils.stringHalfWidth(cell) + cell.length)
              }
            }
          }
    
          // Create SeparateLine
          val sep: String = colWidths.map("-" * _).addString(sb, "+", "+", "+\n").toString()
    
          // column names
          paddedRows.head.addString(sb, "|", "|", "|\n")
          sb.append(sep)
    
          // data
          paddedRows.tail.foreach(_.addString(sb, "|", "|", "|\n"))
          sb.append(sep)
        } else {
          // Extended display mode enabled
          val fieldNames = rows.head
          val dataRows = rows.tail
    
          // Compute the width of field name and data columns
          val fieldNameColWidth = fieldNames.foldLeft(minimumColWidth) { case (curMax, fieldName) =>
            math.max(curMax, Utils.stringHalfWidth(fieldName))
          }
          val dataColWidth = dataRows.foldLeft(minimumColWidth) { case (curMax, row) =>
            math.max(curMax, row.map(cell => Utils.stringHalfWidth(cell)).max)
          }
    
          dataRows.zipWithIndex.foreach { case (row, i) =>
            // "+ 5" in size means a character length except for padded names and data
            val rowHeader = StringUtils.rightPad(
              s"-RECORD $i", fieldNameColWidth + dataColWidth + 5, "-")
            sb.append(rowHeader).append("\n")
            row.zipWithIndex.map { case (cell, j) =>
              val fieldName = StringUtils.rightPad(fieldNames(j),
                fieldNameColWidth - Utils.stringHalfWidth(fieldNames(j)) + fieldNames(j).length)
              val data = StringUtils.rightPad(cell,
                dataColWidth - Utils.stringHalfWidth(cell) + cell.length)
              s" $fieldName | $data "
            }.addString(sb, "", "\n", "\n")
          }
        }
    
        // Print a footer
        if (vertical && rows.tail.isEmpty) {
          // In a vertical mode, print an empty row set explicitly
          sb.append("(0 rows)\n")
        } else if (hasMoreData) {
          // For Data that has more than "numRows" records
          val rowsString = if (numRows == 1) "row" else "rows"
          sb.append(s"only showing top $numRows $rowsString\n")
        }
    
        sb.toString()
      }
    
    

    getRows

    The core of getRows is newDf.select(castCols: _*).take(numRows + 1), which triggers planning.

      private[sql] def getRows(
          numRows: Int,
          truncate: Int): Seq[Seq[String]] = {
        val newDf = toDF()
        val castCols = newDf.logicalPlan.output.map { col =>
          // Since binary types in top-level schema fields have a specific format to print,
          // so we do not cast them to strings here.
          if (col.dataType == BinaryType) {
            Column(col)
          } else {
            Column(col).cast(StringType)
          }
        }
        val data = newDf.select(castCols: _*).take(numRows + 1)
    
        // For array values, replace Seq and Array with square brackets
        // For cells that are beyond `truncate` characters, replace it with the
        // first `truncate-3` and "..."
        schema.fieldNames.toSeq +: data.map { row =>
          row.toSeq.map { cell =>
            val str = cell match {
              case null => "null"
              case binary: Array[Byte] => binary.map("%02X".format(_)).mkString("[", " ", "]")
              case _ => cell.toString
            }
            if (truncate > 0 && str.length > truncate) {
              // do not show ellipses for strings shorter than 4 characters.
              if (truncate < 4) str.substring(0, truncate)
              else str.substring(0, truncate - 3) + "..."
            } else {
              str
            }
          }: Seq[String]
        }
      }
    

    df.select

    Look at select first: its logicalPlan is LogicalRelation(HadoopFsRelation(HoodieFileIndex, ...)), so the logicalPlan passed to withPlan is Project(cols.map(_.named), LogicalRelation(HadoopFsRelation(HoodieFileIndex, ...))).

      def select(cols: Column*): DataFrame = withPlan {
        Project(cols.map(_.named), logicalPlan)
      }
    
      @inline private def withPlan(logicalPlan: LogicalPlan): DataFrame = {
        Dataset.ofRows(sparkSession, logicalPlan)
      }
    

    take

    Next comes take:

      def take(n: Int): Array[T] = head(n)
      def head(n: Int): Array[T] = withAction("head", limit(n).queryExecution)(collectFromPlan)
    

    limit

    Look at its limit(n) argument first:

      def limit(n: Int): Dataset[T] = withTypedPlan {
        Limit(Literal(n), logicalPlan)
      }
    
    

    Literal(n).apply returns Literal(21, IntegerType) here, since show() defaults to numRows = 20 and take is called with numRows + 1 = 21.

    object Limit {
      def apply(limitExpr: Expression, child: LogicalPlan): UnaryNode = {
        GlobalLimit(limitExpr, LocalLimit(limitExpr, child))
      }
    
      def unapply(p: GlobalLimit): Option[(Expression, LogicalPlan)] = {
        p match {
          case GlobalLimit(le1, LocalLimit(le2, child)) if le1 == le2 => Some((le1, child))
          case _ => None
        }
      }
    }
    

    Limit.apply returns GlobalLimit(Literal(21, IntegerType), LocalLimit(Literal(21, IntegerType), Project(cols.map(_.named), LogicalRelation(HadoopFsRelation(HoodieFileIndex, ...))))), so limit returns Dataset(sparkSession, GlobalLimit(Literal(21, IntegerType), LocalLimit(Literal(21, IntegerType), Project(cols.map(_.named), LogicalRelation(HadoopFsRelation(HoodieFileIndex, ...)))))).

      @inline private def withTypedPlan[U : Encoder](logicalPlan: LogicalPlan): Dataset[U] = {
        Dataset(sparkSession, logicalPlan)
      }
    

    withAction

    withAction was also covered in earlier articles: computing executedPlan runs the full Spark SQL pipeline of parsing, analysis, optimization and planning, and the action (here collectFromPlan) is finally invoked inside withNewExecutionId. Let's first see which important rules and strategies take effect.

      private def withAction[U](name: String, qe: QueryExecution)(action: SparkPlan => U) = {
        try {
          qe.executedPlan.foreach { plan =>
            plan.resetMetrics()
          }
          val start = System.nanoTime()
          val result = SQLExecution.withNewExecutionId(sparkSession, qe) {
            action(qe.executedPlan)
          }
          val end = System.nanoTime()
          sparkSession.listenerManager.onSuccess(name, qe, end - start)
          result
        } catch {
          case e: Exception =>
            sparkSession.listenerManager.onFailure(name, qe, e)
            throw e
        }
      }
    

    The plan entry point

    A quick note on the entry point to planning, which I had not paid attention to before: optimizedPlan is wrapped in ReturnAnswer here (this matters for the analysis below).

      lazy val sparkPlan: SparkPlan = {
        SparkSession.setActiveSession(sparkSession)
        // TODO: We use next(), i.e. take the first plan returned by the planner, here for now,
        //       but we will implement to choose the best plan.
        planner.plan(ReturnAnswer(optimizedPlan)).next()
      }
    

    As the ReturnAnswer comment explains, for operations like take() and collect() the rules that match this node only take effect at the top of the logical query plan.

    /** 
     * When planning take() or collect() operations, this special node that is inserted at the top of
     * the logical plan before invoking the query planner.
     * Rules can pattern-match on this node in order to apply transformations that only take effect
     * at the top of the logical query plan.
     */
    case class ReturnAnswer(child: LogicalPlan) extends UnaryNode {
      override def output: Seq[Attribute] = child.output
    }
    

    strategies

    First, let's see which strategies there are:

    0 = {SparkStrategies$PythonEvals$@13376} 
    1 = {DataSourceV2Strategy$@13377} 
    2 = {FileSourceStrategy$@13378} 
    3 = {DataSourceStrategy@13388} 
    4 = {SparkStrategies$SpecialLimits$@13380} 
    5 = {SparkStrategies$Aggregation$@13381} 
    6 = {SparkStrategies$Window$@13382} 
    7 = {SparkStrategies$JoinSelection$@13383} 
    8 = {SparkStrategies$InMemoryScans$@13384} 
    9 = {SparkStrategies$BasicOperators$@13385} 
    

    planner.plan

    Now the plan method itself. I previously understood it only as iterating over the strategies and calling apply; looking closer, it first runs every strategy over the plan to collect candidates, then for each candidate calls collectPlaceholders to unwrap the sub-plans wrapped in PlanLater into placeholders; if placeholders is non-empty, plan is called recursively on those child logical plans.

      def plan(plan: LogicalPlan): Iterator[PhysicalPlan] = {
        // Obviously a lot to do here still...
    
        // Collect physical plan candidates.
        val candidates = strategies.iterator.flatMap(_(plan))
    
        // The candidates may contain placeholders marked as [[planLater]],
        // so try to replace them by their child plans.
        val plans = candidates.flatMap { candidate =>
          val placeholders = collectPlaceholders(candidate)
    
          if (placeholders.isEmpty) {
            // Take the candidate as is because it does not contain placeholders.
            Iterator(candidate)
          } else {
            // Plan the logical plan marked as [[planLater]] and replace the placeholders.
            placeholders.iterator.foldLeft(Iterator(candidate)) {
              case (candidatesWithPlaceholders, (placeholder, logicalPlan)) =>
                // Plan the logical plan for the placeholder.
                val childPlans = this.plan(logicalPlan)
    
                candidatesWithPlaceholders.flatMap { candidateWithPlaceholders =>
                  childPlans.map { childPlan =>
                    // Replace the placeholder by the child plan
                    candidateWithPlaceholders.transformUp {
                      case p if p.eq(placeholder) => childPlan
                    }
                  }
                }
            }
          }
        }
    
        val pruned = prunePlans(plans)
        assert(pruned.hasNext, s"No plan for $plan")
        pruned
      }
    
    

    SpecialLimits

    On the first pass, because the plan is wrapped in ReturnAnswer, the SpecialLimits strategy matches:

      /**
       * Plans special cases of limit operators.
       */
      object SpecialLimits extends Strategy {
        override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
          case ReturnAnswer(rootPlan) => rootPlan match {
            case Limit(IntegerLiteral(limit), Sort(order, true, child))
                if limit < conf.topKSortFallbackThreshold =>
              TakeOrderedAndProjectExec(limit, order, child.output, planLater(child)) :: Nil
            case Limit(IntegerLiteral(limit), Project(projectList, Sort(order, true, child)))
                if limit < conf.topKSortFallbackThreshold =>
              TakeOrderedAndProjectExec(limit, order, projectList, planLater(child)) :: Nil
            case Limit(IntegerLiteral(limit), child) =>
              CollectLimitExec(limit, planLater(child)) :: Nil
            case other => planLater(other) :: Nil
          }
          case Limit(IntegerLiteral(limit), Sort(order, true, child))
              if limit < conf.topKSortFallbackThreshold =>
            TakeOrderedAndProjectExec(limit, order, child.output, planLater(child)) :: Nil
          case Limit(IntegerLiteral(limit), Project(projectList, Sort(order, true, child)))
              if limit < conf.topKSortFallbackThreshold =>
            TakeOrderedAndProjectExec(limit, order, projectList, planLater(child)) :: Nil
          case _ => Nil
        }
      }
    

    When matching Limit, its unapply is invoked; it hits the third case, returning CollectLimitExec(21, PlanLater(Project(cols.map(_.named), LogicalRelation(HadoopFsRelation(HoodieFileIndex, ...))))).

      def unapply(p: GlobalLimit): Option[(Expression, LogicalPlan)] = {
        p match {
          case GlobalLimit(le1, LocalLimit(le2, child)) if le1 == le2 => Some((le1, child))
          case _ => None
        }
      }
    

    PlanLater simply wraps the plan; it is unwrapped in planner.plan via collectPlaceholders:

      override protected def collectPlaceholders(plan: SparkPlan): Seq[(SparkPlan, LogicalPlan)] = {
        plan.collect {
          case placeholder @ PlanLater(logicalPlan) => placeholder -> logicalPlan
        }
      }
    

    FileSourceStrategy

    Unwrapping PlanLater gives Project(cols.map(_.named), LogicalRelation(HadoopFsRelation(HoodieFileIndex, ...))). When FileSourceStrategy matches, it invokes PhysicalOperation's unapply, so the first case matches and returns ProjectExec(projectList, FileSourceScanExec(HadoopFsRelation(HoodieFileIndex, ...))). Since this is the child plan, adding the parent node back, the final result is CollectLimitExec(21, ProjectExec(projectList, FileSourceScanExec(HadoopFsRelation(HoodieFileIndex, ...)))).

    object PhysicalOperation extends PredicateHelper {
      type ReturnType = (Seq[NamedExpression], Seq[Expression], LogicalPlan)
    
      def unapply(plan: LogicalPlan): Option[ReturnType] = {
        val (fields, filters, child, _) = collectProjectsAndFilters(plan)
        Some((fields.getOrElse(child.output), filters, child))
      }
    
      private def collectProjectsAndFilters(plan: LogicalPlan):
          (Option[Seq[NamedExpression]], Seq[Expression], LogicalPlan, Map[Attribute, Expression]) =
        plan match {
          case Project(fields, child) if fields.forall(_.deterministic) =>
            val (_, filters, other, aliases) = collectProjectsAndFilters(child)
            val substitutedFields = fields.map(substitute(aliases)).asInstanceOf[Seq[NamedExpression]]
            (Some(substitutedFields), filters, other, collectAliases(substitutedFields))
    
          case Filter(condition, child) if condition.deterministic =>
            val (fields, filters, other, aliases) = collectProjectsAndFilters(child)
            val substitutedCondition = substitute(aliases)(condition)
            (fields, filters ++ splitConjunctivePredicates(substitutedCondition), other, aliases)
    
          case other =>
            (None, Nil, other, Map.empty)
        }  
    
      def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
        case PhysicalOperation(projects, filters,
          l @ LogicalRelation(fsRelation: HadoopFsRelation, _, table, _)) =>
          // Filters on this relation fall into four categories based on where we can use them to avoid
          // reading unneeded data:
          //  - partition keys only - used to prune directories to read
          //  - bucket keys only - optionally used to prune files to read
          //  - keys stored in the data only - optionally used to skip groups of data in files
          //  - filters that need to be evaluated again after the scan
          val filterSet = ExpressionSet(filters)
    
          // The attribute name of predicate could be different than the one in schema in case of
          // case insensitive, we should change them to match the one in schema, so we do not need to
          // worry about case sensitivity anymore.
          val normalizedFilters = filters.map { e =>
            e transform {
              case a: AttributeReference =>
                a.withName(l.output.find(_.semanticEquals(a)).get.name)
            }
          }
    
          val partitionColumns =
            l.resolve(
              fsRelation.partitionSchema, fsRelation.sparkSession.sessionState.analyzer.resolver)
          val partitionSet = AttributeSet(partitionColumns)
          val partitionKeyFilters =
            ExpressionSet(normalizedFilters
              .filterNot(SubqueryExpression.hasSubquery(_))
              .filter(_.references.subsetOf(partitionSet)))
    
          logInfo(s"Pruning directories with: ${partitionKeyFilters.mkString(",")}")
    
          val bucketSpec: Option[BucketSpec] = fsRelation.bucketSpec
          val bucketSet = if (shouldPruneBuckets(bucketSpec)) {
            genBucketSet(normalizedFilters, bucketSpec.get)
          } else {
            None
          }
    
          val dataColumns =
            l.resolve(fsRelation.dataSchema, fsRelation.sparkSession.sessionState.analyzer.resolver)
    
          // Partition keys are not available in the statistics of the files.
          val dataFilters = normalizedFilters.filter(_.references.intersect(partitionSet).isEmpty)
    
          // Predicates with both partition keys and attributes need to be evaluated after the scan.
          val afterScanFilters = filterSet -- partitionKeyFilters.filter(_.references.nonEmpty)
          logInfo(s"Post-Scan Filters: ${afterScanFilters.mkString(",")}")
    
          val filterAttributes = AttributeSet(afterScanFilters)
          val requiredExpressions: Seq[NamedExpression] = filterAttributes.toSeq ++ projects
          val requiredAttributes = AttributeSet(requiredExpressions)
    
          val readDataColumns =
            dataColumns
              .filter(requiredAttributes.contains)
              .filterNot(partitionColumns.contains)
          val outputSchema = readDataColumns.toStructType
          logInfo(s"Output Data Schema: ${outputSchema.simpleString(5)}")
    
          val outputAttributes = readDataColumns ++ partitionColumns
    
          val scan =
            FileSourceScanExec(
              fsRelation,
              outputAttributes,
              outputSchema,
              partitionKeyFilters.toSeq,
              bucketSet,
              dataFilters,
              table.map(_.identifier))
    
          val afterScanFilter = afterScanFilters.toSeq.reduceOption(expressions.And)
          val withFilter = afterScanFilter.map(execution.FilterExec(_, scan)).getOrElse(scan)
          val withProjections = if (projects == withFilter.output) {
            withFilter
          } else {
            execution.ProjectExec(projects, withFilter)
          }
    
          withProjections :: Nil
    
        case _ => Nil
      }
    
    

    prepareForExecution

    The sparkPlan returned above is CollectLimitExec(21, ProjectExec(projectList, FileSourceScanExec(HadoopFsRelation(HoodieFileIndex, ...)))); prepareForExecution then applies the preparation rules to it:

      lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)
    
      /** A sequence of rules that will be applied in order to the physical plan before execution. */
      protected def preparations: Seq[Rule[SparkPlan]] = Seq(
        PlanSubqueries(sparkSession),
        EnsureRequirements(sparkSession.sessionState.conf),
        CollapseCodegenStages(sparkSession.sessionState.conf),
        ReuseExchange(sparkSession.sessionState.conf),
        ReuseSubquery(sparkSession.sessionState.conf))
    
      protected def prepareForExecution(plan: SparkPlan): SparkPlan = {
        preparations.foldLeft(plan) { case (sp, rule) => rule.apply(sp) }
      }
    

    CollapseCodegenStages

      def apply(plan: SparkPlan): SparkPlan = {
        if (conf.wholeStageEnabled) { // defaults to true
          WholeStageCodegenId.resetPerQuery()
          insertWholeStageCodegen(plan)
        } else {
          plan
        }
      }
    

    CollectLimitExec first falls into the other case, so insertWholeStageCodegen recurses into its children. ProjectExec implements CodegenSupport, so it becomes WholeStageCodegenExec(ProjectExec(projectList, FileSourceScanExec(HadoopFsRelation(HoodieFileIndex, ...)))). Since ProjectExec is the child node, the final result is CollectLimitExec(21, WholeStageCodegenExec(ProjectExec(projectList, FileSourceScanExec(HadoopFsRelation(HoodieFileIndex, ...))))).

      private def insertWholeStageCodegen(plan: SparkPlan): SparkPlan = plan match {
        // For operators that will output domain object, do not insert WholeStageCodegen for it as
        // domain object can not be written into unsafe row.
        case plan if plan.output.length == 1 && plan.output.head.dataType.isInstanceOf[ObjectType] =>
          plan.withNewChildren(plan.children.map(insertWholeStageCodegen))
        case plan: CodegenSupport if supportCodegen(plan) =>
          WholeStageCodegenExec(insertInputAdapter(plan))(WholeStageCodegenId.getNextStageId())
        case other =>
          other.withNewChildren(other.children.map(insertWholeStageCodegen))
      }
    
    case class ProjectExec(projectList: Seq[NamedExpression], child: SparkPlan)
      extends UnaryExecNode with CodegenSupport {
    

    collectFromPlan

      private def collectFromPlan(plan: SparkPlan): Array[T] = {
        // This projection writes output to a `InternalRow`, which means applying this projection is not
        // thread-safe. Here we create the projection inside this method to make `Dataset` thread-safe.
        val objProj = GenerateSafeProjection.generate(deserializer :: Nil)
        plan.executeCollect().map { row =>
          // The row returned by SafeProjection is `SpecificInternalRow`, which ignore the data type
          // parameter of its `get` method, so it's safe to use null here.
          objProj(row).get(0, null).asInstanceOf[T]
        }
      }
    

    executeCollect

    Here the plan is CollectLimitExec, whose executeCollect calls child.executeTake; the child is WholeStageCodegenExec, which does not override executeTake, so the executeTake of its parent class SparkPlan is used.

    case class CollectLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode {
      override def output: Seq[Attribute] = child.output
      override def outputPartitioning: Partitioning = SinglePartition
      override def executeCollect(): Array[InternalRow] = child.executeTake(limit)
      private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)
      protected override def doExecute(): RDD[InternalRow] = {
        val locallyLimited = child.execute().mapPartitionsInternal(_.take(limit))
        val shuffled = new ShuffledRowRDD(
          ShuffleExchangeExec.prepareShuffleDependency(
            locallyLimited, child.output, SinglePartition, serializer))
        shuffled.mapPartitionsInternal(_.take(limit))
      }
    }
    

    executeTake

    The core of executeTake is getByteArrayRdd:

      /**
       * Runs this query returning the first `n` rows as an array.
       *
       * This is modeled after `RDD.take` but never runs any job locally on the driver.
       */
      def executeTake(n: Int): Array[InternalRow] = {
        if (n == 0) {
          return new Array[InternalRow](0)
        }
    
        val childRDD = getByteArrayRdd(n).map(_._2)
    
        val buf = new ArrayBuffer[InternalRow]
        val totalParts = childRDD.partitions.length
        var partsScanned = 0
        while (buf.size < n && partsScanned < totalParts) {
          // The number of partitions to try in this iteration. It is ok for this number to be
          // greater than totalParts because we actually cap it at totalParts in runJob.
          var numPartsToTry = 1L
          if (partsScanned > 0) {
            // If we didn't find any rows after the previous iteration, quadruple and retry.
            // Otherwise, interpolate the number of partitions we need to try, but overestimate
            // it by 50%. We also cap the estimation in the end.
            val limitScaleUpFactor = Math.max(sqlContext.conf.limitScaleUpFactor, 2)
            if (buf.isEmpty) {
              numPartsToTry = partsScanned * limitScaleUpFactor
            } else {
              val left = n - buf.size
              // As left > 0, numPartsToTry is always >= 1
              numPartsToTry = Math.ceil(1.5 * left * partsScanned / buf.size).toInt
              numPartsToTry = Math.min(numPartsToTry, partsScanned * limitScaleUpFactor)
            }
          }
    
          val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt)
          val sc = sqlContext.sparkContext
          val res = sc.runJob(childRDD,
            (it: Iterator[Array[Byte]]) => if (it.hasNext) it.next() else Array.empty[Byte], p)
    
          buf ++= res.flatMap(decodeUnsafeRows)
    
          partsScanned += p.size
        }
    
        if (buf.size > n) {
          buf.take(n).toArray
        } else {
          buf.toArray
        }
      }
    

    getByteArrayRdd

      private def getByteArrayRdd(n: Int = -1): RDD[(Long, Array[Byte])] = {
        execute().mapPartitionsInternal { iter =>
          var count = 0
          val buffer = new Array[Byte](4 << 10)  // 4K
          val codec = CompressionCodec.createCodec(SparkEnv.get.conf)
          val bos = new ByteArrayOutputStream()
          val out = new DataOutputStream(codec.compressedOutputStream(bos))
          // `iter.hasNext` may produce one row and buffer it, we should only call it when the limit is
          // not hit.
          while ((n < 0 || count < n) && iter.hasNext) {
            val row = iter.next().asInstanceOf[UnsafeRow]
            out.writeInt(row.getSizeInBytes)
            row.writeToStream(out, buffer)
            count += 1
          }
          out.writeInt(-1)
          out.flush()
          out.close()
          Iterator((count, bos.toByteArray))
        }
      }
    
      final def execute(): RDD[InternalRow] = executeQuery {
        if (isCanonicalizedPlan) {
          throw new IllegalStateException("A canonicalized plan is not supposed to be executed.")
        }
        doExecute()
      }
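
    So each executor ships back one compressed byte array per partition, framed as `[int size][row bytes]...` and terminated by a `-1` marker, which the driver decodes via `decodeUnsafeRows`. A minimal standalone sketch of that framing (plain `java.io`, no Spark types and no compression codec, with made-up string payloads standing in for `UnsafeRow`s):

      import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

      object RowFramingSketch {
        def main(args: Array[String]): Unit = {
          val rows = Seq("row-1", "row-22", "row-333").map(_.getBytes("UTF-8"))

          // "executor side": length-prefix every payload, then write the -1 end marker
          val bos = new ByteArrayOutputStream()
          val out = new DataOutputStream(bos)
          rows.foreach { bytes =>
            out.writeInt(bytes.length)
            out.write(bytes)
          }
          out.writeInt(-1)
          out.close()

          // "driver side": keep reading [size][payload] until the -1 marker shows up
          val in = new DataInputStream(new ByteArrayInputStream(bos.toByteArray))
          var size = in.readInt()
          while (size >= 0) {
            val buf = new Array[Byte](size)
            in.readFully(buf)
            println(new String(buf, "UTF-8"))
            size = in.readInt()
          }
        }
      }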
    

    doExecute

    `WholeStageCodegenExec.doExecute`: the key line is `child.asInstanceOf[CodegenSupport].inputRDDs()`, and the `child` here is `ProjectExec`.

      override def doExecute(): RDD[InternalRow] = {
        val (ctx, cleanedSource) = doCodeGen()
        // try to compile and fallback if it failed
        val (_, maxCodeSize) = try {
          CodeGenerator.compile(cleanedSource)
        } catch {
          case NonFatal(_) if !Utils.isTesting && sqlContext.conf.codegenFallback =>
            // We should already saw the error message
            logWarning(s"Whole-stage codegen disabled for plan (id=$codegenStageId):\n $treeString")
            return child.execute()
        }
    
        // Check if compiled code has a too large function
        if (maxCodeSize > sqlContext.conf.hugeMethodLimit) {
          logInfo(s"Found too long generated codes and JIT optimization might not work: " +
            s"the bytecode size ($maxCodeSize) is above the limit " +
            s"${sqlContext.conf.hugeMethodLimit}, and the whole-stage codegen was disabled " +
            s"for this plan (id=$codegenStageId). To avoid this, you can raise the limit " +
            s"`${SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.key}`:\n$treeString")
          child match {
            // The fallback solution of batch file source scan still uses WholeStageCodegenExec
            case f: FileSourceScanExec if f.supportsBatch => // do nothing
            case _ => return child.execute()
          }
        }
    
        val references = ctx.references.toArray
    
        val durationMs = longMetric("pipelineTime")
    
        val rdds = child.asInstanceOf[CodegenSupport].inputRDDs()
        assert(rdds.size <= 2, "Up to two input RDDs can be supported")
        if (rdds.length == 1) {
          rdds.head.mapPartitionsWithIndex { (index, iter) =>
            val (clazz, _) = CodeGenerator.compile(cleanedSource)
            val buffer = clazz.generate(references).asInstanceOf[BufferedRowIterator]
            buffer.init(index, Array(iter))
            new Iterator[InternalRow] {
              override def hasNext: Boolean = {
                val v = buffer.hasNext
                if (!v) durationMs += buffer.durationMs()
                v
              }
              override def next: InternalRow = buffer.next()
            }
          }
        } else {
          // Right now, we support up to two input RDDs.
          rdds.head.zipPartitions(rdds(1)) { (leftIter, rightIter) =>
            Iterator((leftIter, rightIter))
            // a small hack to obtain the correct partition index
          }.mapPartitionsWithIndex { (index, zippedIter) =>
            val (leftIter, rightIter) = zippedIter.next()
            val (clazz, _) = CodeGenerator.compile(cleanedSource)
            val buffer = clazz.generate(references).asInstanceOf[BufferedRowIterator]
            buffer.init(index, Array(leftIter, rightIter))
            new Iterator[InternalRow] {
              override def hasNext: Boolean = {
                val v = buffer.hasNext
                if (!v) durationMs += buffer.durationMs()
                v
              }
              override def next: InternalRow = buffer.next()
            }
          }
        }
      }
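
    If you want to see the Java source that `doCodeGen()` produced for this plan, Spark ships a debug helper for that; a small sketch (I believe `debugCodegen()` is available in Spark 2.4 after the import below, but verify it against your version; `tmp` is the table path from the example at the top of this post):

      // Sketch: print the whole-stage-codegen subtrees and their generated code for the Hudi scan.
      import org.apache.spark.sql.execution.debug._

      val df = spark.read.format("hudi").load(tmp.getCanonicalPath)
      df.debugCodegen()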
    

    inputRDDs

    `ProjectExec.inputRDDs` simply delegates to its `child`, which here is `FileSourceScanExec`.

      override def inputRDDs(): Seq[RDD[InternalRow]] = {
        child.asInstanceOf[CodegenSupport].inputRDDs()
      }
    

    FileSourceScanExec.inputRDD

    • relation: HadoopFsRelation
    • relation.fileFormat: Spark24HoodieParquetFileFormat
    • relation.bucketSpec: None

      private lazy val inputRDD: RDD[InternalRow] = {
        // Update metrics for taking effect in both code generation node and normal node.
        updateDriverMetrics()
        val readFile: (PartitionedFile) => Iterator[InternalRow] =
          relation.fileFormat.buildReaderWithPartitionValues(
            sparkSession = relation.sparkSession,
            dataSchema = relation.dataSchema,
            partitionSchema = relation.partitionSchema,
            requiredSchema = requiredSchema,
            filters = pushedDownFilters,
            options = relation.options,
            hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))
    
        relation.bucketSpec match {
          case Some(bucketing) if relation.sparkSession.sessionState.conf.bucketingEnabled =>
            createBucketedReadRDD(bucketing, readFile, selectedPartitions, relation)
          case _ =>
            createNonBucketedReadRDD(readFile, selectedPartitions, relation)
        }
      }
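
    One way to confirm at runtime which `FileFormat` and `FileIndex` actually back the scan (a small sketch against the example query; the classes printed are expected to match the bullets above):

      import org.apache.spark.sql.execution.FileSourceScanExec

      val df = spark.read.format("hudi").load(tmp.getCanonicalPath)
      df.queryExecution.executedPlan.foreach {
        case scan: FileSourceScanExec =>
          // Expected for a Hudi COW table: Spark24HoodieParquetFileFormat and HoodieFileIndex
          println(scan.relation.fileFormat.getClass.getName)
          println(scan.relation.location.getClass.getName)
        case _ => // ignore the other physical operators
      }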
    

    createNonBucketedReadRDD

      /**
       * Create an RDD for non-bucketed reads.
       * The bucketed variant of this function is [[createBucketedReadRDD]].
       *
       * @param readFile a function to read each (part of a) file.
       * @param selectedPartitions Hive-style partition that are part of the read.
       * @param fsRelation [[HadoopFsRelation]] associated with the read.
       */
      private def createNonBucketedReadRDD(
          readFile: (PartitionedFile) => Iterator[InternalRow],
          selectedPartitions: Array[PartitionDirectory],
          fsRelation: HadoopFsRelation): RDD[InternalRow] = {
        val defaultMaxSplitBytes =
          fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes
        val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
        val defaultParallelism = fsRelation.sparkSession.sparkContext.defaultParallelism
        val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
        val bytesPerCore = totalBytes / defaultParallelism
    
        val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
        logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
          s"open cost is considered as scanning $openCostInBytes bytes.")
    
        val splitFiles = selectedPartitions.flatMap { partition =>
          partition.files.flatMap { file =>
            val blockLocations = getBlockLocations(file)
            if (fsRelation.fileFormat.isSplitable(
                fsRelation.sparkSession, fsRelation.options, file.getPath)) {
              (0L until file.getLen by maxSplitBytes).map { offset =>
                val remaining = file.getLen - offset
                val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
                val hosts = getBlockHosts(blockLocations, offset, size)
                PartitionedFile(
                  partition.values, file.getPath.toUri.toString, offset, size, hosts)
              }
            } else {
              val hosts = getBlockHosts(blockLocations, 0, file.getLen)
              Seq(PartitionedFile(
                partition.values, file.getPath.toUri.toString, 0, file.getLen, hosts))
            }
          }
        }.sortBy(_.length)(implicitly[Ordering[Long]].reverse)
    
        val partitions = new ArrayBuffer[FilePartition]
        val currentFiles = new ArrayBuffer[PartitionedFile]
        var currentSize = 0L
    
        /** Close the current partition and move to the next. */
        def closePartition(): Unit = {
          if (currentFiles.nonEmpty) {
            val newPartition =
              FilePartition(
                partitions.size,
                currentFiles.toArray) // Copy to a new Array.
            partitions += newPartition
          }
          currentFiles.clear()
          currentSize = 0
        }
    
        // Assign files to partitions using "Next Fit Decreasing"
        splitFiles.foreach { file =>
          if (currentSize + file.length > maxSplitBytes) {
            closePartition()
          }
          // Add the given file to the current partition.
          currentSize += file.length + openCostInBytes
          currentFiles += file
        }
        closePartition()
    
        new FileScanRDD(fsRelation.sparkSession, readFile, partitions)
      }
    
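    A standalone sketch (no Spark; the file sizes are made-up examples and the config values are the documented defaults of `spark.sql.files.maxPartitionBytes` and `spark.sql.files.openCostInBytes`) of the two pieces of logic above: how `maxSplitBytes` is derived, and how the size-sorted splits are packed into partitions with "Next Fit Decreasing":

      import scala.collection.mutable.ArrayBuffer

      object SplitPackingSketch {
        def main(args: Array[String]): Unit = {
          val defaultMaxSplitBytes = 128L * 1024 * 1024   // spark.sql.files.maxPartitionBytes default
          val openCostInBytes      = 4L * 1024 * 1024     // spark.sql.files.openCostInBytes default
          val defaultParallelism   = 8
          val fileSizes            = Seq(300L, 40L, 10L, 5L).map(_ * 1024 * 1024)  // made-up files

          // Same formula as createNonBucketedReadRDD
          val totalBytes    = fileSizes.map(_ + openCostInBytes).sum
          val bytesPerCore  = totalBytes / defaultParallelism
          val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
          println(s"maxSplitBytes = $maxSplitBytes")

          // Split each file into chunks of at most maxSplitBytes, sort descending by size
          val splits = fileSizes.flatMap { len =>
            (0L until len by maxSplitBytes).map(offset => math.min(maxSplitBytes, len - offset))
          }.sorted(Ordering[Long].reverse)

          // "Next Fit Decreasing": close the current partition as soon as the next split
          // would push it past maxSplitBytes, counting openCostInBytes per added split
          val partitions   = ArrayBuffer.empty[Seq[Long]]
          val currentFiles = ArrayBuffer.empty[Long]
          var currentSize  = 0L
          def closePartition(): Unit = {
            if (currentFiles.nonEmpty) partitions += currentFiles.toList
            currentFiles.clear()
            currentSize = 0L
          }
          splits.foreach { size =>
            if (currentSize + size > maxSplitBytes) closePartition()
            currentSize += size + openCostInBytes
            currentFiles += size
          }
          closePartition()

          partitions.zipWithIndex.foreach { case (p, i) =>
            println(s"partition $i -> ${p.map(_ / 1024 / 1024).mkString(" MB, ")} MB")
          }
        }
      }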

    selectedPartitions

    • relation.location: HoodieFileIndex

      @transient private lazy val selectedPartitions: Array[PartitionDirectory] = {
        val optimizerMetadataTimeNs = relation.location.metadataOpsTimeNs.getOrElse(0L)
        val startTime = System.nanoTime()
        val ret = relation.location.listFiles(partitionFilters, dataFilters)
        val timeTakenMs = ((System.nanoTime() - startTime) + optimizerMetadataTimeNs) / 1000 / 1000
        metadataTime = timeTakenMs
        ret
      }.toArray
    

    HoodieFileIndex.listFiles

    The methods involved are `lookupCandidateFilesInMetadataTable`, `refresh`, `doRefresh`, `loadPartitionPathFiles`, etc.; `refresh`/`doRefresh` are called when the index is first constructed (`new`).

      /**
       * Invoked by Spark to fetch list of latest base files per partition.
       *
       * @param partitionFilters partition column filters
       * @param dataFilters      data columns filters
       * @return list of PartitionDirectory containing partition to base files mapping
       */
      override def listFiles(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
        // Look up candidate files names in the col-stats index, if all of the following conditions are true
        //    - Data-skipping is enabled
        //    - Col-Stats Index is present
        //    - List of predicates (filters) is present
        val candidateFilesNamesOpt: Option[Set[String]] =
          lookupCandidateFilesInMetadataTable(dataFilters) match {
            case Success(opt) => opt
            case Failure(e) =>
              logError("Failed to lookup candidate files in File Index", e)
    
              spark.sqlContext.getConf(DataSkippingFailureMode.configName, DataSkippingFailureMode.Fallback.value) match {
                case DataSkippingFailureMode.Fallback.value => Option.empty
                case DataSkippingFailureMode.Strict.value   => throw new HoodieException(e);
              }
          }
    
        logDebug(s"Overlapping candidate files from Column Stats Index: ${candidateFilesNamesOpt.getOrElse(Set.empty)}")
    
        if (queryAsNonePartitionedTable) {
          // Read as Non-Partitioned table
          // Filter in candidate files based on the col-stats index lookup
          val candidateFiles = allFiles.filter(fileStatus =>
            // NOTE: This predicate is true when {@code Option} is empty
            candidateFilesNamesOpt.forall(_.contains(fileStatus.getPath.getName))
          )
    
          logInfo(s"Total files : ${allFiles.size}; " +
            s"candidate files after data skipping: ${candidateFiles.size}; " +
            s"skipping percent ${if (allFiles.nonEmpty) (allFiles.size - candidateFiles.size) / allFiles.size.toDouble else 0}")
    
          Seq(PartitionDirectory(InternalRow.empty, candidateFiles))
        } else {
          // Prune the partition path by the partition filters
          val prunedPartitions = prunePartition(cachedAllInputFileSlices.keySet.asScala.toSeq, partitionFilters)
          var totalFileSize = 0
          var candidateFileSize = 0
    
          val result = prunedPartitions.map { partition =>
            val baseFileStatuses: Seq[FileStatus] =
              cachedAllInputFileSlices.get(partition).asScala
                .map(fs => fs.getBaseFile.orElse(null))
                .filter(_ != null)
                .map(_.getFileStatus)
    
            // Filter in candidate files based on the col-stats index lookup
            val candidateFiles = baseFileStatuses.filter(fs =>
              // NOTE: This predicate is true when {@code Option} is empty
              candidateFilesNamesOpt.forall(_.contains(fs.getPath.getName)))
    
            totalFileSize += baseFileStatuses.size
            candidateFileSize += candidateFiles.size
            PartitionDirectory(InternalRow.fromSeq(partition.values), candidateFiles)
          }
    
          logInfo(s"Total base files: $totalFileSize; " +
            s"candidate files after data skipping : $candidateFileSize; " +
            s"skipping percent ${if (allFiles.nonEmpty && totalFileSize > 0) (totalFileSize - candidateFileSize) / totalFileSize.toDouble else 0}")
    
          result
        }
      }
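
    For the data-skipping branch above to do anything, the column-stats index has to exist in the metadata table and data skipping has to be turned on at read time. A hedged sketch of the read-side options (the config keys are taken from Hudi 0.12-era documentation; please verify them against the version you actually run):

      // Sketch only: assumes the table was written with the metadata table and its
      // column-stats index enabled; otherwise no candidate-file pruning happens.
      val dfSkipping = spark.read.format("hudi")
        .option("hoodie.metadata.enable", "true")          // read file listings via the metadata table
        .option("hoodie.enable.data.skipping", "true")     // enable col-stats based file pruning
        .load(tmp.getCanonicalPath)

      dfSkipping.filter("value > 5").show()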
    

    `cachedAllInputFileSlices`, which `doRefresh` initializes, implements the logic that obtains the Hudi file listing (the latest file slices per partition):

          cachedAllInputFileSlices = partitionFiles.keySet().stream()
             .collect(Collectors.toMap(
                 Function.identity(),
                 partitionPath ->
                     queryInstant.map(instant ->
                         fileSystemView.getLatestFileSlicesBeforeOrOn(partitionPath.path, instant, true)
                     )
                       .orElse(fileSystemView.getLatestFileSlices(partitionPath.path))
                       .collect(Collectors.toList())
                 )
             );
    

    Spark24HoodieParquetFileFormat

    This file format comes from PR [HUDI-3204] "Fixing partition-values being derived from partition-path instead of source columns"; it is worth studying on your own.

    Summary

    This basically traces the logic of `spark.read.format("hudi").load` from the Spark source code down into the Hudi source code. Limited by ability and time, some parts are not fully worked out yet, but with the overall flow clear, the remaining details can be studied later as the need arises.

  • Original article: https://blog.csdn.net/dkl12/article/details/126285869