When I first started learning Hudi, I knew that df.write.format("hudi").save writes data to Hudi and that the actual write logic lives in HoodieSparkSqlWriter.write, but one question kept nagging me: how does the call get from df.write.format("hudi").save to HoodieSparkSqlWriter.write? This article answers that question.
Spark 2.4.4
Hudi 0.12.0-SNAPSHOT, the same Hudi code used in the previous article 《Hudi Spark SQL源码学习总结-CTAS》.
As before, we use the test statement from TestCreateTable in the source code:
import spark.implicits._
val df = Seq((1, "a1", 10, 1000)).toDF("id", "name", "value", "ts")
df.write.format("hudi")
.option(HoodieWriteConfig.TBL_NAME.key, tableName)
.option(TABLE_TYPE.key, COW_TABLE_TYPE_OPT_VAL)
.option(RECORDKEY_FIELD.key, "id")
.option(PRECOMBINE_FIELD.key, "ts")
.option(PARTITIONPATH_FIELD.key, "")
.option(KEYGENERATOR_CLASS_NAME.key, classOf[NonpartitionedKeyGenerator].getName)
.option(HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key, "1")
.option(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key, "1")
.mode(SaveMode.Overwrite)
.save(tmp.getCanonicalPath)
df.write returns a DataFrameWriter; the subsequent format, option, and save calls are all methods of this class.
def write: DataFrameWriter[T] = {
if (isStreaming) {
logicalPlan.failAnalysis(
"'write' can not be called on streaming Dataset/DataFrame")
}
new DataFrameWriter[T](this)
}
format sets source = "hudi", which is used later by save.
def format(source: String): DataFrameWriter[T] = {
this.source = source
this
}
The save method first adds the path option and then checks whether source equals hive. Here source is hudi, so that branch is skipped. Next it looks up the DataSource class registered for hudi via DataSource.lookupDataSource and checks whether that class is a subclass of DataSourceV2 before deciding how to proceed, so let's look at DataSource.lookupDataSource first.
def save(path: String): Unit = {
// add the path option
this.extraOptions += ("path" -> path)
save()
}
def save(): Unit = {
// first check whether source equals hive
if (source.toLowerCase(Locale.ROOT) == DDLUtils.HIVE_PROVIDER) {
throw new AnalysisException("Hive data source can only be used with tables, you can not " +
"write files of Hive data source directly.")
}
assertNotBucketed("save")
// look up the DataSource class for hudi via DataSource.lookupDataSource
val cls = DataSource.lookupDataSource(source, df.sparkSession.sessionState.conf)
// check whether DataSourceV2 is a superclass/superinterface of cls
if (classOf[DataSourceV2].isAssignableFrom(cls)) {
val source = cls.newInstance().asInstanceOf[DataSourceV2]
source match {
case ws: WriteSupport =>
val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
source,
df.sparkSession.sessionState.conf)
val options = sessionOptions ++ extraOptions
val writer = ws.createWriter(
UUID.randomUUID.toString, df.logicalPlan.output.toStructType, mode,
new DataSourceOptions(options.asJava))
if (writer.isPresent) {
runCommand(df.sparkSession, "save") {
WriteToDataSourceV2(writer.get, df.logicalPlan)
}
}
// Streaming also uses the data source V2 API. So it may be that the data source implements
// v2, but has no v2 implementation for batch writes. In that case, we fall back to saving
// as though it's a V1 source.
case _ => saveToV1Source()
}
} else {
saveToV1Source()
}
}
We already touched on the Spark 3.2.1 version of lookupDataSource in the previous article when discussing isV2Provider, and the Spark 2.4.4 version is much the same. Let's walk through it again: here provider1 = hudi and provider2 = hudi.DefaultSource. The method loads every META-INF/services/org.apache.spark.sql.sources.DataSourceRegister file on the classpath and collects the entries listed in them; the Hudi-related entries include org.apache.hudi.DefaultSource, org.apache.hudi.Spark2DefaultSource and org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat. It then filters for registrations whose shortName is hudi; only Spark2DefaultSource matches, so Spark2DefaultSource is returned directly. The difference from Spark 3.2.1 is that Spark 2 resolves to Spark2DefaultSource while Spark 3 resolves to Spark3DefaultSource.
def lookupDataSource(provider: String, conf: SQLConf): Class[_] = {
// hudi
val provider1 = backwardCompatibilityMap.getOrElse(provider, provider) match {
case name if name.equalsIgnoreCase("orc") &&
conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "native" =>
classOf[OrcFileFormat].getCanonicalName
case name if name.equalsIgnoreCase("orc") &&
conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "hive" =>
"org.apache.spark.sql.hive.orc.OrcFileFormat"
case "com.databricks.spark.avro" if conf.replaceDatabricksSparkAvroEnabled =>
"org.apache.spark.sql.avro.AvroFileFormat"
case name => name
}
// hudi.DefaultSource
val provider2 = s"$provider1.DefaultSource"
val loader = Utils.getContextOrSparkClassLoader
// load every META-INF/services/org.apache.spark.sql.sources.DataSourceRegister file on the classpath and return its entries
// the entries include `org.apache.hudi.DefaultSource`, `org.apache.hudi.Spark2DefaultSource`, `org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat`, etc.
val serviceLoader = ServiceLoader.load(classOf[DataSourceRegister], loader)
try {
// filter for registrations whose shortName is hudi; only Spark2DefaultSource matches, so it is returned directly
serviceLoader.asScala.filter(_.shortName().equalsIgnoreCase(provider1)).toList match {
// the provider format did not match any given registered aliases
case Nil =>
try {
Try(loader.loadClass(provider1)).orElse(Try(loader.loadClass(provider2))) match {
case Success(dataSource) =>
// Found the data source using fully qualified path
dataSource
case Failure(error) =>
if (provider1.startsWith("org.apache.spark.sql.hive.orc")) {
throw new AnalysisException(
"Hive built-in ORC data source must be used with Hive support enabled. " +
"Please use the native ORC data source by setting 'spark.sql.orc.impl' to " +
"'native'")
} else if (provider1.toLowerCase(Locale.ROOT) == "avro" ||
provider1 == "com.databricks.spark.avro" ||
provider1 == "org.apache.spark.sql.avro") {
throw new AnalysisException(
s"Failed to find data source: $provider1. Avro is built-in but external data " +
"source module since Spark 2.4. Please deploy the application as per " +
"the deployment section of \"Apache Avro Data Source Guide\".")
} else if (provider1.toLowerCase(Locale.ROOT) == "kafka") {
throw new AnalysisException(
s"Failed to find data source: $provider1. Please deploy the application as " +
"per the deployment section of " +
"\"Structured Streaming + Kafka Integration Guide\".")
} else {
throw new ClassNotFoundException(
s"Failed to find data source: $provider1. Please find packages at " +
"http://spark.apache.org/third-party-projects.html",
error)
}
}
} catch {
case e: NoClassDefFoundError => // This one won't be caught by Scala NonFatal
// NoClassDefFoundError's class name uses "/" rather than "." for packages
val className = e.getMessage.replaceAll("/", ".")
if (spark2RemovedClasses.contains(className)) {
throw new ClassNotFoundException(s"$className was removed in Spark 2.0. " +
"Please check if your library is compatible with Spark 2.0", e)
} else {
throw e
}
}
case head :: Nil =>
// there is exactly one registered alias
head.getClass
case sources =>
// There are multiple registered aliases for the input. If there is single datasource
// that has "org.apache.spark" package in the prefix, we use it considering it is an
// internal datasource within Spark.
val sourceNames = sources.map(_.getClass.getName)
val internalSources = sources.filter(_.getClass.getName.startsWith("org.apache.spark"))
if (internalSources.size == 1) {
logWarning(s"Multiple sources found for $provider1 (${sourceNames.mkString(", ")}), " +
s"defaulting to the internal datasource (${internalSources.head.getClass.getName}).")
internalSources.head.getClass
} else {
throw new AnalysisException(s"Multiple sources found for $provider1 " +
s"(${sourceNames.mkString(", ")}), please specify the fully qualified class name.")
}
}
} catch {
case e: ServiceConfigurationError if e.getCause.isInstanceOf[NoClassDefFoundError] =>
// NoClassDefFoundError's class name uses "/" rather than "." for packages
val className = e.getCause.getMessage.replaceAll("/", ".")
if (spark2RemovedClasses.contains(className)) {
throw new ClassNotFoundException(s"Detected an incompatible DataSourceRegister. " +
"Please remove the incompatible library from classpath or upgrade it. " +
s"Error: ${e.getMessage}", e)
} else {
throw e
}
}
}
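As a side note, the ServiceLoader-based discovery used above can be reproduced on its own. The following is a minimal sketch of mine (not part of the article's quoted source, assuming the hudi-spark2 bundle is on the classpath) that lists every registered DataSourceRegister and its short name:
import java.util.ServiceLoader
import scala.collection.JavaConverters._
import org.apache.spark.sql.sources.DataSourceRegister

// enumerate every implementation declared in
// META-INF/services/org.apache.spark.sql.sources.DataSourceRegister on the classpath
val loader = Thread.currentThread().getContextClassLoader
ServiceLoader.load(classOf[DataSourceRegister], loader).asScala
  .foreach(r => println(s"${r.shortName()} -> ${r.getClass.getName}"))
// with hudi-spark2 on the classpath this prints, among others:
//   hudi -> org.apache.hudi.Spark2DefaultSource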
So is Spark2DefaultSource a subclass of DataSourceV2? We need to look at its definition:
class Spark2DefaultSource extends DefaultSource with DataSourceRegister {
override def shortName(): String = "hudi"
}
class DefaultSource extends RelationProvider
with SchemaRelationProvider
with CreatableRelationProvider
with DataSourceRegister
with StreamSinkProvider
with StreamSourceProvider
with SparkAdapterSupport
with Serializable {
Clearly Spark2DefaultSource is not a subclass of DataSourceV2, so saveToV1Source is executed next.
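This is easy to verify by hand, for example in a spark-shell with the Hudi bundle on the classpath (my own minimal check, mirroring the condition in save()):
import org.apache.spark.sql.sources.v2.DataSourceV2
import org.apache.hudi.Spark2DefaultSource

// the same check as in DataFrameWriter.save(): this returns false, because
// Spark2DefaultSource only mixes in the V1 provider interfaces, so save()
// falls through to saveToV1Source()
classOf[DataSourceV2].isAssignableFrom(classOf[Spark2DefaultSource])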
The core of saveToV1Source is the runCommand call at the end; let's first look at its argument, which is built by DataSource.planForWriting:
if (SparkSession.active.sessionState.conf.getConf(
SQLConf.LEGACY_PASS_PARTITION_BY_AS_OPTIONS)) {
partitioningColumns.foreach { columns =>
extraOptions += (DataSourceUtils.PARTITIONING_COLUMNS_KEY ->
DataSourceUtils.encodePartitioningColumns(columns))
}
}
// Code path for data source v1.
runCommand(df.sparkSession, "save") {
DataSource(
sparkSession = df.sparkSession,
className = source,
partitionColumns = partitioningColumns.getOrElse(Nil),
options = extraOptions.toMap).planForWriting(mode, df.logicalPlan)
}
From the analysis above we know that DataSource.lookupDataSource returns Spark2DefaultSource, and its parent class DefaultSource implements CreatableRelationProvider, so planForWriting returns a SaveIntoDataSourceCommand.
def planForWriting(mode: SaveMode, data: LogicalPlan): LogicalPlan = {
if (data.schema.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) {
throw new AnalysisException("Cannot save interval data type into external storage.")
}
providingClass.newInstance() match {
case dataSource: CreatableRelationProvider =>
// data is df.logicalPlan, dataSource is Spark2DefaultSource
SaveIntoDataSourceCommand(data, dataSource, caseInsensitiveOptions, mode)
case format: FileFormat =>
DataSource.validateSchema(data.schema)
planForWritingFileFormat(format, mode, data)
case _ =>
sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.")
}
}
// className = hudi
lazy val providingClass: Class[_] =
DataSource.lookupDataSource(className, sparkSession.sessionState.conf)
private def runCommand(session: SparkSession, name: String)(command: LogicalPlan): Unit = {
val qe = session.sessionState.executePlan(command)
try {
val start = System.nanoTime()
// call `QueryExecution.toRDD` to trigger the execution of commands.
SQLExecution.withNewExecutionId(session, qe)(qe.toRdd)
val end = System.nanoTime()
session.listenerManager.onSuccess(name, qe, end - start)
} catch {
case e: Exception =>
session.listenerManager.onFailure(name, qe, e)
throw e
}
}
As covered in the article 《Hudi Spark SQL源码学习总结-Create Table》, withNewExecutionId invokes the passed-in body, which here is qe.toRdd.
def withNewExecutionId[T](
sparkSession: SparkSession,
queryExecution: QueryExecution)(body: => T): T = {
val sc = sparkSession.sparkContext
val oldExecutionId = sc.getLocalProperty(EXECUTION_ID_KEY)
val executionId = SQLExecution.nextExecutionId
sc.setLocalProperty(EXECUTION_ID_KEY, executionId.toString)
executionIdToQueryExecution.put(executionId, queryExecution)
try {
// sparkContext.getCallSite() would first try to pick up any call site that was previously
// set, then fall back to Utils.getCallSite(); call Utils.getCallSite() directly on
// streaming queries would give us call site like "run at :0"
val callSite = sc.getCallSite()
withSQLConfPropagated(sparkSession) {
sc.listenerBus.post(SparkListenerSQLExecutionStart(
executionId, callSite.shortForm, callSite.longForm, queryExecution.toString,
SparkPlanInfo.fromSparkPlan(queryExecution.executedPlan), System.currentTimeMillis()))
try {
body
} finally {
sc.listenerBus.post(SparkListenerSQLExecutionEnd(
executionId, System.currentTimeMillis()))
}
}
} finally {
executionIdToQueryExecution.remove(executionId)
sc.setLocalProperty(EXECUTION_ID_KEY, oldExecutionId)
}
}
lazy val toRdd: RDD[InternalRow] = executedPlan.execute()
The executedPlan here is an ExecutedCommandExec, because SaveIntoDataSourceCommand is a RunnableCommand, which in turn is a subclass of Command. As explained in my earlier article 《Hudi Spark SQL源码学习总结-Create Table》, both df.logicalPlan and executedPlan trigger a full round of Spark SQL parsing, analysis, optimization and planning, and during the planning phase the planner.plan method iterates over the strategies and applies each one's apply method. One of these strategies is BasicOperators, whose apply method is:
object BasicOperators extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case d: DataWritingCommand => DataWritingCommandExec(d, planLater(d.query)) :: Nil
case r: RunnableCommand => ExecutedCommandExec(r) :: Nil
case MemoryPlan(sink, output) =>
val encoder = RowEncoder(StructType.fromAttributes(output))
val toRow = encoder.createSerializer()
LocalTableScanExec(output, sink.allData.map(r => toRow(r).copy())) :: Nil
Our plan here, SaveIntoDataSourceCommand, is a subclass of RunnableCommand, so the match returns ExecutedCommandExec, which is a subclass of SparkPlan. How Spark SQL parsing, analysis, optimization and planning are triggered is not covered again in this article; only the entry-point code is shown:
/** Internal version of the RDD. Avoids copies and has no schema */
lazy val toRdd: RDD[InternalRow] = executedPlan.execute()
lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)
lazy val sparkPlan: SparkPlan = {
SparkSession.setActiveSession(sparkSession)
// TODO: We use next(), i.e. take the first plan returned by the planner, here for now,
// but we will implement to choose the best plan.
planner.plan(ReturnAnswer(optimizedPlan)).next()
}
lazy val optimizedPlan: LogicalPlan = sparkSession.sessionState.optimizer.execute(withCachedData)
lazy val withCachedData: LogicalPlan = {
assertAnalyzed()
assertSupported()
sparkSession.sharedState.cacheManager.useCachedData(analyzed)
}
// the logical plan here is the SaveIntoDataSourceCommand passed in through the runCommand method
lazy val analyzed: LogicalPlan = {
SparkSession.setActiveSession(sparkSession)
sparkSession.sessionState.analyzer.executeAndCheck(logical)
}
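For context, here is a simplified sketch (my own paraphrase, not the verbatim Spark source) of what planner.plan, called in sparkPlan above, does with the strategies:
// simplified sketch of org.apache.spark.sql.catalyst.planning.QueryPlanner#plan
def plan(plan: LogicalPlan): Iterator[SparkPlan] = {
  // apply every Strategy (BasicOperators among them) to the logical plan
  // and collect the physical plan candidates they produce
  val candidates = strategies.iterator.flatMap(strategy => strategy(plan))
  // the real implementation also expands planLater placeholders and prunes plans;
  // QueryExecution.sparkPlan then simply takes the first candidate via next()
  candidates
}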
executedPlan.execute() is defined in SparkPlan and calls doExecute. SparkPlan has no concrete doExecute implementation of its own, so we need to look at the concrete subclass:
final def execute(): RDD[InternalRow] = executeQuery {
if (isCanonicalizedPlan) {
throw new IllegalStateException("A canonicalized plan is not supposed to be executed.")
}
doExecute()
}
As mentioned above, the SparkPlan subclass here is ExecutedCommandExec. Its doExecute calls sideEffectResult, which in turn calls cmd.run, where cmd is the SaveIntoDataSourceCommand:
case class ExecutedCommandExec(cmd: RunnableCommand) extends LeafExecNode {
override lazy val metrics: Map[String, SQLMetric] = cmd.metrics
/**
* A concrete command should override this lazy field to wrap up any side effects caused by the
* command or any other computation that should be evaluated exactly once. The value of this field
* can be used as the contents of the corresponding RDD generated from the physical plan of this
* command.
*
* The `execute()` method of all the physical command classes should reference `sideEffectResult`
* so that the command can be executed eagerly right after the command query is created.
*/
protected[sql] lazy val sideEffectResult: Seq[InternalRow] = {
val converter = CatalystTypeConverters.createToCatalystConverter(schema)
cmd.run(sqlContext.sparkSession).map(converter(_).asInstanceOf[InternalRow])
}
override protected def innerChildren: Seq[QueryPlan[_]] = cmd :: Nil
override def output: Seq[Attribute] = cmd.output
override def nodeName: String = "Execute " + cmd.nodeName
override def executeCollect(): Array[InternalRow] = sideEffectResult.toArray
override def executeToIterator: Iterator[InternalRow] = sideEffectResult.toIterator
override def executeTake(limit: Int): Array[InternalRow] = sideEffectResult.take(limit).toArray
protected override def doExecute(): RDD[InternalRow] = {
sqlContext.sparkContext.parallelize(sideEffectResult, 1)
}
}
Its run method calls dataSource.createRelation; from the construction of SaveIntoDataSourceCommand above we know that dataSource is Spark2DefaultSource.
case class SaveIntoDataSourceCommand(
query: LogicalPlan,
dataSource: CreatableRelationProvider,
options: Map[String, String],
mode: SaveMode) extends RunnableCommand {
override protected def innerChildren: Seq[QueryPlan[_]] = Seq(query)
override def run(sparkSession: SparkSession): Seq[Row] = {
dataSource.createRelation(
sparkSession.sqlContext, mode, options, Dataset.ofRows(sparkSession, query))
Seq.empty[Row]
}
override def simpleString: String = {
val redacted = SQLConf.get.redactOptions(options)
s"SaveIntoDataSourceCommand ${dataSource}, ${redacted}, ${mode}"
}
}
Spark2DefaultSource's createRelation method is implemented in its parent class DefaultSource, and we can see that createRelation calls HoodieSparkSqlWriter.write. With that, we have finally worked out how df.write.format("hudi").save reaches HoodieSparkSqlWriter.write.
class Spark2DefaultSource extends DefaultSource with DataSourceRegister {
override def shortName(): String = "hudi"
}
/**
* Hoodie Spark Datasource, for reading and writing hoodie tables
*
*/
class DefaultSource extends RelationProvider
with SchemaRelationProvider
with CreatableRelationProvider
with DataSourceRegister
with StreamSinkProvider
with StreamSourceProvider
with SparkAdapterSupport
with Serializable {
SparkSession.getActiveSession.foreach { spark =>
val sparkVersion = spark.version
if (sparkVersion.startsWith("0.") || sparkVersion.startsWith("1.") || sparkVersion.startsWith("2.")) {
// Enable "passPartitionByAsOptions" to support "write.partitionBy(...)"
spark.conf.set("spark.sql.legacy.sources.write.passPartitionByAsOptions", "true")
}
}
private val log = LogManager.getLogger(classOf[DefaultSource])
override def createRelation(sqlContext: SQLContext,
parameters: Map[String, String]): BaseRelation = {
createRelation(sqlContext, parameters, null)
}
override def createRelation(sqlContext: SQLContext,
optParams: Map[String, String],
schema: StructType): BaseRelation = {
// Add default options for unspecified read options keys.
val parameters = DataSourceOptionsHelper.parametersWithReadDefaults(optParams)
......
}
......
/**
* This DataSource API is used for writing the DataFrame at the destination. For now, we are returning a dummy
* relation here because Spark does not really make use of the relation returned, and just returns an empty
* dataset at [[org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run()]]. This saves us the cost
* of creating and returning a parquet relation here.
*
* TODO: Revisit to return a concrete relation here when we support CREATE TABLE AS for Hudi with DataSource API.
* That is the only case where Spark seems to actually need a relation to be returned here
* [[org.apache.spark.sql.execution.datasources.DataSource.writeAndRead()]]
*
* @param sqlContext Spark SQL Context
* @param mode Mode for saving the DataFrame at the destination
* @param optParams Parameters passed as part of the DataFrame write operation
* @param df Spark DataFrame to be written
* @return Spark Relation
*/
override def createRelation(sqlContext: SQLContext,
mode: SaveMode,
optParams: Map[String, String],
df: DataFrame): BaseRelation = {
val dfWithoutMetaCols = df.drop(HoodieRecord.HOODIE_META_COLUMNS.asScala:_*)
if (optParams.get(OPERATION.key).contains(BOOTSTRAP_OPERATION_OPT_VAL)) {
HoodieSparkSqlWriter.bootstrap(sqlContext, mode, optParams, dfWithoutMetaCols)
} else {
HoodieSparkSqlWriter.write(sqlContext, mode, optParams, dfWithoutMetaCols)
}
new HoodieEmptyRelation(sqlContext, dfWithoutMetaCols.schema)
}
......
This article analyzed and summarized the call path from df.write.format("hudi").save to HoodieSparkSqlWriter.write, resolving a question I had when I first started learning Hudi 😄. I hope it helps you as well.
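For reference, the full call chain traced in this article is:
df.write.format("hudi").save(path)
  -> DataFrameWriter.save()                          // DataSource.lookupDataSource returns Spark2DefaultSource, which is not a DataSourceV2
  -> DataFrameWriter.saveToV1Source()
  -> DataSource.planForWriting                       // DefaultSource is a CreatableRelationProvider
  -> SaveIntoDataSourceCommand                       // executed via runCommand / QueryExecution.toRdd
  -> ExecutedCommandExec.doExecute / sideEffectResult
  -> SaveIntoDataSourceCommand.run
  -> DefaultSource.createRelation
  -> HoodieSparkSqlWriter.write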