Hudi Spark Source Code Study Notes: df.write.format("hudi").save


    Preface

    When first learning Hudi, we learn that df.write.format("hudi").save writes data to Hudi, and that the actual write logic lives in HoodieSparkSqlWriter.write. One question lingers, though: how does execution get from df.write.format("hudi").save to HoodieSparkSqlWriter.write? This article answers exactly that question.

    Versions

    Spark 2.4.4
    Hudi 0.12.0-SNAPSHOT, the same Hudi code as in the previous article, Hudi Spark SQL Source Code Study Notes: CTAS

    Example code

    We again use the test snippet from TestCreateTable in the Hudi source:

          import spark.implicits._
          val df = Seq((1, "a1", 10, 1000)).toDF("id", "name", "value", "ts")
          df.write.format("hudi")
            .option(HoodieWriteConfig.TBL_NAME.key, tableName)
            .option(TABLE_TYPE.key, COW_TABLE_TYPE_OPT_VAL)
            .option(RECORDKEY_FIELD.key, "id")
            .option(PRECOMBINE_FIELD.key, "ts")
            .option(PARTITIONPATH_FIELD.key, "")
            .option(KEYGENERATOR_CLASS_NAME.key, classOf[NonpartitionedKeyGenerator].getName)
            .option(HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key, "1")
            .option(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key, "1")
            .mode(SaveMode.Overwrite)
            .save(tmp.getCanonicalPath)
    

    df.write

    This returns a DataFrameWriter; the format, option, and save methods used next are all defined in that class.

      def write: DataFrameWriter[T] = {
        if (isStreaming) {
          logicalPlan.failAnalysis(
            "'write' can not be called on streaming Dataset/DataFrame")
        }
        new DataFrameWriter[T](this)
      }
    

    format

    This sets source = "hudi", which save will use later.

      def format(source: String): DataFrameWriter[T] = {
        this.source = source
        this
      }
    

    save

    The save method first adds the path option, then checks whether source equals hive; here source is hudi, so that check fails. It then looks up the DataSource implementation class for hudi via DataSource.lookupDataSource and checks whether that class is a subclass of DataSourceV2 before executing the rest of the logic, so let's look at DataSource.lookupDataSource first.

      def save(path: String): Unit = {
    // add the "path" option
        this.extraOptions += ("path" -> path)
        save()
      }
    
    
      def save(): Unit = {
    // first check whether source equals hive
        if (source.toLowerCase(Locale.ROOT) == DDLUtils.HIVE_PROVIDER) {
          throw new AnalysisException("Hive data source can only be used with tables, you can not " +
            "write files of Hive data source directly.")
        }
    
        assertNotBucketed("save")
    
    // look up the DataSource implementation class for hudi via DataSource.lookupDataSource
        val cls = DataSource.lookupDataSource(source, df.sparkSession.sessionState.conf)
    // is cls a subclass of DataSourceV2 (i.e., a V2 source)?
        if (classOf[DataSourceV2].isAssignableFrom(cls)) {
          val source = cls.newInstance().asInstanceOf[DataSourceV2]
          source match {
            case ws: WriteSupport =>
              val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
                source,
                df.sparkSession.sessionState.conf)
              val options = sessionOptions ++ extraOptions
    
              val writer = ws.createWriter(
                UUID.randomUUID.toString, df.logicalPlan.output.toStructType, mode,
                new DataSourceOptions(options.asJava))
    
              if (writer.isPresent) {
                runCommand(df.sparkSession, "save") {
                  WriteToDataSourceV2(writer.get, df.logicalPlan)
                }
              }
    
            // Streaming also uses the data source V2 API. So it may be that the data source implements
            // v2, but has no v2 implementation for batch writes. In that case, we fall back to saving
            // as though it's a V1 source.
            case _ => saveToV1Source()
          }
        } else {
          saveToV1Source()
        }
      }
    

    DataSource.lookupDataSource

    We already touched on Spark 3.2.1's lookupDataSource in the previous article when discussing isV2Provider, and the Spark 2.4.4 version is much the same. Here provider1 = hudi and provider2 = hudi.DefaultSource. The method loads every META-INF/services/org.apache.spark.sql.sources.DataSourceRegister file on the classpath and collects the entries listed in them; the Hudi-related ones include org.apache.hudi.DefaultSource, org.apache.hudi.Spark2DefaultSource, and org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat. It then filters for entries whose shortName is hudi; only Spark2DefaultSource matches, so Spark2DefaultSource is returned directly. The difference from Spark 3.2.1 is that Spark 2 resolves to Spark2DefaultSource while Spark 3 resolves to Spark3DefaultSource.

      def lookupDataSource(provider: String, conf: SQLConf): Class[_] = {
        // hudi
        val provider1 = backwardCompatibilityMap.getOrElse(provider, provider) match {
          case name if name.equalsIgnoreCase("orc") &&
              conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "native" =>
            classOf[OrcFileFormat].getCanonicalName
          case name if name.equalsIgnoreCase("orc") &&
              conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "hive" =>
            "org.apache.spark.sql.hive.orc.OrcFileFormat"
          case "com.databricks.spark.avro" if conf.replaceDatabricksSparkAvroEnabled =>
            "org.apache.spark.sql.avro.AvroFileFormat"
          case name => name
        }
        // hudi.DefaultSource
        val provider2 = s"$provider1.DefaultSource"
        val loader = Utils.getContextOrSparkClassLoader
    // load every META-INF/services/org.apache.spark.sql.sources.DataSourceRegister file on the classpath and return its entries
    // these include `org.apache.hudi.DefaultSource`, `org.apache.hudi.Spark2DefaultSource`, `org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat`, etc.
        val serviceLoader = ServiceLoader.load(classOf[DataSourceRegister], loader)
    
        try {
      // filter for shortName = hudi; only Spark2DefaultSource matches, so it is returned
          serviceLoader.asScala.filter(_.shortName().equalsIgnoreCase(provider1)).toList match {
            // the provider format did not match any given registered aliases
            case Nil =>
              try {
                Try(loader.loadClass(provider1)).orElse(Try(loader.loadClass(provider2))) match {
                  case Success(dataSource) =>
                    // Found the data source using fully qualified path
                    dataSource
                  case Failure(error) =>
                    if (provider1.startsWith("org.apache.spark.sql.hive.orc")) {
                      throw new AnalysisException(
                        "Hive built-in ORC data source must be used with Hive support enabled. " +
                        "Please use the native ORC data source by setting 'spark.sql.orc.impl' to " +
                        "'native'")
                    } else if (provider1.toLowerCase(Locale.ROOT) == "avro" ||
                      provider1 == "com.databricks.spark.avro" ||
                      provider1 == "org.apache.spark.sql.avro") {
                      throw new AnalysisException(
                        s"Failed to find data source: $provider1. Avro is built-in but external data " +
                        "source module since Spark 2.4. Please deploy the application as per " +
                        "the deployment section of \"Apache Avro Data Source Guide\".")
                    } else if (provider1.toLowerCase(Locale.ROOT) == "kafka") {
                      throw new AnalysisException(
                        s"Failed to find data source: $provider1. Please deploy the application as " +
                        "per the deployment section of " +
                        "\"Structured Streaming + Kafka Integration Guide\".")
                    } else {
                      throw new ClassNotFoundException(
                        s"Failed to find data source: $provider1. Please find packages at " +
                          "http://spark.apache.org/third-party-projects.html",
                        error)
                    }
                }
              } catch {
                case e: NoClassDefFoundError => // This one won't be caught by Scala NonFatal
                  // NoClassDefFoundError's class name uses "/" rather than "." for packages
                  val className = e.getMessage.replaceAll("/", ".")
                  if (spark2RemovedClasses.contains(className)) {
                    throw new ClassNotFoundException(s"$className was removed in Spark 2.0. " +
                      "Please check if your library is compatible with Spark 2.0", e)
                  } else {
                    throw e
                  }
              }
            case head :: Nil =>
              // there is exactly one registered alias
              head.getClass
            case sources =>
              // There are multiple registered aliases for the input. If there is single datasource
              // that has "org.apache.spark" package in the prefix, we use it considering it is an
              // internal datasource within Spark.
              val sourceNames = sources.map(_.getClass.getName)
              val internalSources = sources.filter(_.getClass.getName.startsWith("org.apache.spark"))
              if (internalSources.size == 1) {
                logWarning(s"Multiple sources found for $provider1 (${sourceNames.mkString(", ")}), " +
                  s"defaulting to the internal datasource (${internalSources.head.getClass.getName}).")
                internalSources.head.getClass
              } else {
                throw new AnalysisException(s"Multiple sources found for $provider1 " +
                  s"(${sourceNames.mkString(", ")}), please specify the fully qualified class name.")
              }
          }
        } catch {
          case e: ServiceConfigurationError if e.getCause.isInstanceOf[NoClassDefFoundError] =>
            // NoClassDefFoundError's class name uses "/" rather than "." for packages
            val className = e.getCause.getMessage.replaceAll("/", ".")
            if (spark2RemovedClasses.contains(className)) {
              throw new ClassNotFoundException(s"Detected an incompatible DataSourceRegister. " +
                "Please remove the incompatible library from classpath or upgrade it. " +
                s"Error: ${e.getMessage}", e)
            } else {
              throw e
            }
        }
      }
    

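    The resolution order above can be condensed into a small standalone sketch. The names `Register`, `LookupSketch`, and `LookupSketchDemo` below are hypothetical, not Spark API; `registered` stands in for the entries collected from the META-INF/services files:

```scala
// Hypothetical, simplified model of DataSource.lookupDataSource's resolution order:
// prefer a ServiceLoader-registered alias, fall back to reflective class loading.
trait Register { def shortName: String }

object LookupSketch {
  def lookup(provider: String, registered: Seq[Register]): Any =
    registered.filter(_.shortName.equalsIgnoreCase(provider)) match {
      case Seq(one) => one.getClass // exactly one alias matched, e.g. Spark2DefaultSource for "hudi"
      case Seq() => // no alias matched: try provider, then provider.DefaultSource
        try Class.forName(provider)
        catch { case _: ClassNotFoundException => Class.forName(provider + ".DefaultSource") }
      case many => sys.error(s"Multiple sources found for $provider: $many")
    }
}

object LookupSketchDemo {
  def main(args: Array[String]): Unit = {
    val hudi = new Register { def shortName = "hudi" }
    println(LookupSketch.lookup("hudi", Seq(hudi)))             // the registered alias wins
    println(LookupSketch.lookup("java.lang.String", Seq(hudi))) // falls back to Class.forName
  }
}
```

    The real method adds the disambiguation and error-reporting branches shown above; the core priority order (registered alias first, reflective fallback second) is the same.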
    Spark2DefaultSource

    So is Spark2DefaultSource a subclass of DataSourceV2? We need to look at its definition.

    class Spark2DefaultSource extends DefaultSource with DataSourceRegister {
      override def shortName(): String = "hudi"
    }
    
    class DefaultSource extends RelationProvider
      with SchemaRelationProvider
      with CreatableRelationProvider
      with DataSourceRegister
      with StreamSinkProvider
      with StreamSourceProvider
      with SparkAdapterSupport
      with Serializable {
    

    As we can see, Spark2DefaultSource is not a subclass of DataSourceV2, so execution proceeds to the saveToV1Source method.
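    The branch in save() hinges on Class.isAssignableFrom. A quick standalone illustration (the trait and class names below are made up for the demo, not Spark types) shows why a source like Spark2DefaultSource takes the V1 path:

```scala
// classOf[P].isAssignableFrom(c) is true only when c is P itself or a subtype of P,
// which is exactly the condition save() uses to pick the DataSourceV2 code path.
trait DataSourceV2Like
class V2Impl extends DataSourceV2Like
class V1Impl // like Spark2DefaultSource: no DataSourceV2 anywhere in its hierarchy

object AssignableDemo {
  def main(args: Array[String]): Unit = {
    println(classOf[DataSourceV2Like].isAssignableFrom(classOf[V2Impl])) // true  -> V2 branch
    println(classOf[DataSourceV2Like].isAssignableFrom(classOf[V1Impl])) // false -> saveToV1Source()
  }
}
```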

    saveToV1Source

    The core of saveToV1Source is the runCommand call at the end; let's first look at the plan it is given, built by DataSource.planForWriting.

        if (SparkSession.active.sessionState.conf.getConf(
          SQLConf.LEGACY_PASS_PARTITION_BY_AS_OPTIONS)) {
          partitioningColumns.foreach { columns =>
            extraOptions += (DataSourceUtils.PARTITIONING_COLUMNS_KEY ->
              DataSourceUtils.encodePartitioningColumns(columns))
          }
        }
    
        // Code path for data source v1.
        runCommand(df.sparkSession, "save") {
          DataSource(
            sparkSession = df.sparkSession,
            className = source,
            partitionColumns = partitioningColumns.getOrElse(Nil),
            options = extraOptions.toMap).planForWriting(mode, df.logicalPlan)
        }
    

    DataSource.planForWriting

    From the analysis above, DataSource.lookupDataSource returns Spark2DefaultSource, whose parent class DefaultSource implements CreatableRelationProvider, so planForWriting returns a SaveIntoDataSourceCommand.

      def planForWriting(mode: SaveMode, data: LogicalPlan): LogicalPlan = {
        if (data.schema.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) {
          throw new AnalysisException("Cannot save interval data type into external storage.")
        }
    
        providingClass.newInstance() match {
          case dataSource: CreatableRelationProvider =>
        // data is df.logicalPlan; dataSource is Spark2DefaultSource
            SaveIntoDataSourceCommand(data, dataSource, caseInsensitiveOptions, mode)
          case format: FileFormat =>
            DataSource.validateSchema(data.schema)
            planForWritingFileFormat(format, mode, data)
          case _ =>
            sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.")
        }
      }
      
  // className = hudi
      lazy val providingClass: Class[_] =
        DataSource.lookupDataSource(className, sparkSession.sessionState.conf)
    

    runCommand

      private def runCommand(session: SparkSession, name: String)(command: LogicalPlan): Unit = {
        val qe = session.sessionState.executePlan(command)
        try {
          val start = System.nanoTime()
          // call `QueryExecution.toRDD` to trigger the execution of commands.
          SQLExecution.withNewExecutionId(session, qe)(qe.toRdd)
          val end = System.nanoTime()
          session.listenerManager.onSuccess(name, qe, end - start)
        } catch {
          case e: Exception =>
            session.listenerManager.onFailure(name, qe, e)
            throw e
        }
      }
    

    As covered in the article Hudi Spark SQL Source Code Study Notes: Create Table, the withNewExecutionId method invokes the body block passed to it; here the body is qe.toRdd.

      def withNewExecutionId[T](
          sparkSession: SparkSession,
          queryExecution: QueryExecution)(body: => T): T = {
        val sc = sparkSession.sparkContext
        val oldExecutionId = sc.getLocalProperty(EXECUTION_ID_KEY)
        val executionId = SQLExecution.nextExecutionId
        sc.setLocalProperty(EXECUTION_ID_KEY, executionId.toString)
        executionIdToQueryExecution.put(executionId, queryExecution)
        try {
          // sparkContext.getCallSite() would first try to pick up any call site that was previously
          // set, then fall back to Utils.getCallSite(); call Utils.getCallSite() directly on
          // streaming queries would give us call site like "run at :0"
          val callSite = sc.getCallSite()
    
          withSQLConfPropagated(sparkSession) {
            sc.listenerBus.post(SparkListenerSQLExecutionStart(
              executionId, callSite.shortForm, callSite.longForm, queryExecution.toString,
              SparkPlanInfo.fromSparkPlan(queryExecution.executedPlan), System.currentTimeMillis()))
            try {
              body
            } finally {
              sc.listenerBus.post(SparkListenerSQLExecutionEnd(
                executionId, System.currentTimeMillis()))
            }
          }
        } finally {
          executionIdToQueryExecution.remove(executionId)
          sc.setLocalProperty(EXECUTION_ID_KEY, oldExecutionId)
        }
      }
    

    qe.toRdd

      lazy val toRdd: RDD[InternalRow] = executedPlan.execute()
    

    Here executedPlan is an ExecutedCommandExec, because SaveIntoDataSourceCommand is a RunnableCommand and therefore also a subclass of Command. As the earlier article Hudi Spark SQL Source Code Study Notes: Create Table showed, evaluating either df.logicalPlan or executedPlan triggers a full Spark SQL pass of parsing, analysis, optimization, and planning. In the planning phase, planner.plan iterates over the strategies and applies each one's apply method; one of those strategies is BasicOperators, whose apply method is:

      object BasicOperators extends Strategy {
        def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
          case d: DataWritingCommand => DataWritingCommandExec(d, planLater(d.query)) :: Nil
          case r: RunnableCommand => ExecutedCommandExec(r) :: Nil
    
          case MemoryPlan(sink, output) =>
            val encoder = RowEncoder(StructType.fromAttributes(output))
            val toRow = encoder.createSerializer()
            LocalTableScanExec(output, sink.allData.map(r => toRow(r).copy())) :: Nil
    

    Our plan here is a SaveIntoDataSourceCommand, which is a subclass of RunnableCommand, so an ExecutedCommandExec (a subclass of SparkPlan) is returned. How the parsing, analysis, optimization, and planning phases are actually triggered is beyond the scope of this article; only the entry-point code is shown below.

      /** Internal version of the RDD. Avoids copies and has no schema */
      lazy val toRdd: RDD[InternalRow] = executedPlan.execute()
    
      lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)
    
      lazy val sparkPlan: SparkPlan = {
        SparkSession.setActiveSession(sparkSession)
        // TODO: We use next(), i.e. take the first plan returned by the planner, here for now,
        //       but we will implement to choose the best plan.
        planner.plan(ReturnAnswer(optimizedPlan)).next()
      }
    
      lazy val optimizedPlan: LogicalPlan = sparkSession.sessionState.optimizer.execute(withCachedData)  
    
      lazy val withCachedData: LogicalPlan = {
        assertAnalyzed()
        assertSupported()
        sparkSession.sharedState.cacheManager.useCachedData(analyzed)
      }
    
  // logical here is the SaveIntoDataSourceCommand passed in from runCommand
      lazy val analyzed: LogicalPlan = {
        SparkSession.setActiveSession(sparkSession)
        sparkSession.sessionState.analyzer.executeAndCheck(logical)
      }   
    

    executedPlan.execute()

    The execute() method is defined in SparkPlan and calls doExecute. SparkPlan itself gives doExecute no concrete implementation, so we need to look at the concrete subclass.

      final def execute(): RDD[InternalRow] = executeQuery {
        if (isCanonicalizedPlan) {
          throw new IllegalStateException("A canonicalized plan is not supposed to be executed.")
        }
        doExecute()
      }
    

    As noted above, the SparkPlan subclass here is ExecutedCommandExec. Its doExecute evaluates sideEffectResult, which in turn calls cmd.run; here cmd is the SaveIntoDataSourceCommand.

    case class ExecutedCommandExec(cmd: RunnableCommand) extends LeafExecNode {
    
      override lazy val metrics: Map[String, SQLMetric] = cmd.metrics
    
      /**
       * A concrete command should override this lazy field to wrap up any side effects caused by the
       * command or any other computation that should be evaluated exactly once. The value of this field
       * can be used as the contents of the corresponding RDD generated from the physical plan of this
       * command.
       *
       * The `execute()` method of all the physical command classes should reference `sideEffectResult`
       * so that the command can be executed eagerly right after the command query is created.
       */
      protected[sql] lazy val sideEffectResult: Seq[InternalRow] = {
        val converter = CatalystTypeConverters.createToCatalystConverter(schema)
        cmd.run(sqlContext.sparkSession).map(converter(_).asInstanceOf[InternalRow])
      }
    
      override protected def innerChildren: Seq[QueryPlan[_]] = cmd :: Nil
    
      override def output: Seq[Attribute] = cmd.output
    
      override def nodeName: String = "Execute " + cmd.nodeName
    
      override def executeCollect(): Array[InternalRow] = sideEffectResult.toArray
    
      override def executeToIterator: Iterator[InternalRow] = sideEffectResult.toIterator
    
      override def executeTake(limit: Int): Array[InternalRow] = sideEffectResult.take(limit).toArray
    
      protected override def doExecute(): RDD[InternalRow] = {
        sqlContext.sparkContext.parallelize(sideEffectResult, 1)
      }
    }
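
    The "evaluated exactly once" contract that the sideEffectResult doc comment describes comes straight from Scala's lazy val semantics. A minimal standalone sketch (the Command class and runCount counter here are made up for the demo, not Spark code):

```scala
// A lazy val body runs on first access only, so however many execute* methods
// touch sideEffectResult, the underlying command runs exactly once.
object LazyOnceDemo {
  var runCount = 0

  class Command {
    lazy val sideEffectResult: Seq[Int] = { runCount += 1; Seq(1, 2, 3) }
    def executeCollect(): Array[Int] = sideEffectResult.toArray
    def executeTake(n: Int): Array[Int] = sideEffectResult.take(n).toArray
  }

  def main(args: Array[String]): Unit = {
    val cmd = new Command
    cmd.executeCollect()
    cmd.executeTake(2)
    println(s"command body ran $runCount time(s)") // ran once despite two calls
  }
}
```

    This is why ExecutedCommandExec can safely reference sideEffectResult from executeCollect, executeToIterator, executeTake, and doExecute without re-running the command.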
    

    SaveIntoDataSourceCommand.run

    Its run method calls dataSource.createRelation; from the construction of SaveIntoDataSourceCommand above we know that dataSource is Spark2DefaultSource.

    case class SaveIntoDataSourceCommand(
        query: LogicalPlan,
        dataSource: CreatableRelationProvider,
        options: Map[String, String],
        mode: SaveMode) extends RunnableCommand {
    
      override protected def innerChildren: Seq[QueryPlan[_]] = Seq(query)
    
      override def run(sparkSession: SparkSession): Seq[Row] = {
        dataSource.createRelation(
          sparkSession.sqlContext, mode, options, Dataset.ofRows(sparkSession, query))
    
        Seq.empty[Row]
      }
    
      override def simpleString: String = {
        val redacted = SQLConf.get.redactOptions(options)
        s"SaveIntoDataSourceCommand ${dataSource}, ${redacted}, ${mode}"
      }
    }
    

    Spark2DefaultSource.createRelation

    Spark2DefaultSource's createRelation method is implemented in its parent class DefaultSource, and there we can see createRelation calling HoodieSparkSqlWriter.write. At this point we have finally traced the path from df.write.format("hudi").save to HoodieSparkSqlWriter.write.

    class Spark2DefaultSource extends DefaultSource with DataSourceRegister {
      override def shortName(): String = "hudi"
    }
    
    /**
      * Hoodie Spark Datasource, for reading and writing hoodie tables
      *
      */
    class DefaultSource extends RelationProvider
      with SchemaRelationProvider
      with CreatableRelationProvider
      with DataSourceRegister
      with StreamSinkProvider
      with StreamSourceProvider
      with SparkAdapterSupport
      with Serializable {
    
      SparkSession.getActiveSession.foreach { spark =>
        val sparkVersion = spark.version
        if (sparkVersion.startsWith("0.") || sparkVersion.startsWith("1.") || sparkVersion.startsWith("2.")) {
          // Enable "passPartitionByAsOptions" to support "write.partitionBy(...)"
          spark.conf.set("spark.sql.legacy.sources.write.passPartitionByAsOptions", "true")
        }
      }
    
      private val log = LogManager.getLogger(classOf[DefaultSource])
    
      override def createRelation(sqlContext: SQLContext,
                                  parameters: Map[String, String]): BaseRelation = {
        createRelation(sqlContext, parameters, null)
      }
    
      override def createRelation(sqlContext: SQLContext,
                                  optParams: Map[String, String],
                                  schema: StructType): BaseRelation = {
        // Add default options for unspecified read options keys.
        val parameters = DataSourceOptionsHelper.parametersWithReadDefaults(optParams)
    
        ......
      }
      ......
      /**
        * This DataSource API is used for writing the DataFrame at the destination. For now, we are returning a dummy
        * relation here because Spark does not really make use of the relation returned, and just returns an empty
        * dataset at [[org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run()]]. This saves us the cost
        * of creating and returning a parquet relation here.
        *
        * TODO: Revisit to return a concrete relation here when we support CREATE TABLE AS for Hudi with DataSource API.
        *       That is the only case where Spark seems to actually need a relation to be returned here
        *       [[org.apache.spark.sql.execution.datasources.DataSource.writeAndRead()]]
        *
        * @param sqlContext Spark SQL Context
        * @param mode Mode for saving the DataFrame at the destination
        * @param optParams Parameters passed as part of the DataFrame write operation
        * @param df Spark DataFrame to be written
        * @return Spark Relation
        */
      override def createRelation(sqlContext: SQLContext,
                                  mode: SaveMode,
                                  optParams: Map[String, String],
                                  df: DataFrame): BaseRelation = {
        val dfWithoutMetaCols = df.drop(HoodieRecord.HOODIE_META_COLUMNS.asScala:_*)
    
        if (optParams.get(OPERATION.key).contains(BOOTSTRAP_OPERATION_OPT_VAL)) {
          HoodieSparkSqlWriter.bootstrap(sqlContext, mode, optParams, dfWithoutMetaCols)
        } else {
          HoodieSparkSqlWriter.write(sqlContext, mode, optParams, dfWithoutMetaCols)
        }
        new HoodieEmptyRelation(sqlContext, dfWithoutMetaCols.schema)
      }
      ......
    

    Summary

    This article traced the call path from df.write.format("hudi").save to HoodieSparkSqlWriter.write: DataFrameWriter.save looks up Spark2DefaultSource, falls through to saveToV1Source, wraps the plan in a SaveIntoDataSourceCommand, and executing that command via ExecutedCommandExec.sideEffectResult reaches DefaultSource.createRelation, which invokes HoodieSparkSqlWriter.write. That resolves a question I had when first learning Hudi 😄; I hope it helps you too.

  • Original article: https://blog.csdn.net/dkl12/article/details/126146654