Because of my day-to-day work I had mostly been querying Hudi through Hive, so I know the Hive-on-Hudi query path fairly well, but not the Spark one. I now want to learn the rough flow of a Spark query against Hudi and figure out how execution jumps from Spark source code into Hudi source code. That way I will understand both how Spark queries a table in general and how it queries Hudi in particular, which will also make it easier to locate and fix problems later when using Kyuubi Spark SQL.
Spark 2.4.4
Hudi master 0.12.0-SNAPSHOT (latest code at the time of writing)
(On Spark 3 you can enable planChangeLog to log which rules actually take effect.)
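As a quick reference, on Spark 3.1+ this corresponds to the spark.sql.planChangeLog configs (standard Spark configuration, not available on the Spark 2.4.4 used in this article); something like:
// Log every rule/batch that actually changes a plan (Spark 3.1+ only).
spark.conf.set("spark.sql.planChangeLog.level", "WARN")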
First write some Hudi data the same way as in the previous article, then query it:
// tableName and tmp (a temp directory) are assumed to be defined as in the previous article
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.keygen.NonpartitionedKeyGenerator
import org.apache.spark.sql.SaveMode
import spark.implicits._
val df = Seq((1, "a1", 10, 1000)).toDF("id", "name", "value", "ts")
df.write.format("hudi")
.option(HoodieWriteConfig.TBL_NAME.key, tableName)
.option(TABLE_TYPE.key, COW_TABLE_TYPE_OPT_VAL)
.option(RECORDKEY_FIELD.key, "id")
.option(PRECOMBINE_FIELD.key, "ts")
.option(PARTITIONPATH_FIELD.key, "")
.option(KEYGENERATOR_CLASS_NAME.key, classOf[NonpartitionedKeyGenerator].getName)
.option(HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key, "1")
.option(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key, "1")
.mode(SaveMode.Overwrite)
.save(tmp.getCanonicalPath)
spark.read.format("hudi").load(tmp.getCanonicalPath).show()
spark.read returns a DataFrameReader; the format and load methods used next are both defined in this class:
def read: DataFrameReader = new DataFrameReader(self)
format sets source = "hudi", which is used later by load:
def format(source: String): DataFrameReader = {
this.source = source
this
}
The load method first adds the path as an option and then checks whether source equals hive; here source is hudi, so that branch is not taken. Next it looks up the DataSource class registered for hudi via DataSource.lookupDataSource and checks whether that class is a subclass of DataSourceV2 before continuing. As covered in the previous article, DataSource.lookupDataSource returns Spark2DefaultSource, which is not a subclass of DataSourceV2, so loadV1Source is executed next.
def load(path: String): DataFrame = {
// force invocation of `load(...varargs...)`
option(DataSourceOptions.PATH_KEY, path).load(Seq.empty: _*)
}
/**
* Loads input in as a `DataFrame`, for data sources that support multiple paths.
* Only works if the source is a HadoopFsRelationProvider.
*
* @since 1.6.0
*/
@scala.annotation.varargs
def load(paths: String*): DataFrame = {
// First check whether source equals hive
if (source.toLowerCase(Locale.ROOT) == DDLUtils.HIVE_PROVIDER) {
throw new AnalysisException("Hive data source can only be used with tables, you can not " +
"read files of Hive data source directly.")
}
// Look up the DataSource class for hudi via DataSource.lookupDataSource
val cls = DataSource.lookupDataSource(source, sparkSession.sessionState.conf)
// Check whether DataSourceV2 is a superclass of cls, i.e. whether cls implements DataSourceV2
if (classOf[DataSourceV2].isAssignableFrom(cls)) {
val ds = cls.newInstance().asInstanceOf[DataSourceV2]
if (ds.isInstanceOf[ReadSupport]) {
val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
ds = ds, conf = sparkSession.sessionState.conf)
val pathsOption = {
val objectMapper = new ObjectMapper()
DataSourceOptions.PATHS_KEY -> objectMapper.writeValueAsString(paths.toArray)
}
Dataset.ofRows(sparkSession, DataSourceV2Relation.create(
ds, sessionOptions ++ extraOptions.toMap + pathsOption,
userSpecifiedSchema = userSpecifiedSchema))
} else {
loadV1Source(paths: _*)
}
} else {
loadV1Source(paths: _*)
}
}
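A quick aside on how DataSource.lookupDataSource resolves the string "hudi": Spark uses java.util.ServiceLoader to load every DataSourceRegister on the classpath and matches its shortName against the requested source; Hudi registers the short name "hudi" this way through its DefaultSource, listed in META-INF/services/org.apache.spark.sql.sources.DataSourceRegister. A minimal sketch of how any source plugs into this mechanism, purely illustrative and not Hudi's actual code:
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider}
// The fully-qualified class name must also be listed in
// META-INF/services/org.apache.spark.sql.sources.DataSourceRegister.
class MyDefaultSource extends DataSourceRegister with RelationProvider {
  override def shortName(): String = "myformat"
  override def createRelation(sqlContext: SQLContext,
                              parameters: Map[String, String]): BaseRelation = {
    ??? // build and return a BaseRelation here
  }
}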
The core of loadV1Source is the call to sparkSession.baseRelationToDataFrame; let's first look at its argument, DataSource.resolveRelation.
private def loadV1Source(paths: String*) = {
// Code path for data source v1.
sparkSession.baseRelationToDataFrame(
DataSource.apply(
sparkSession,
paths = paths,
userSpecifiedSchema = userSpecifiedSchema,
className = source,
options = extraOptions.toMap).resolveRelation())
}
As before, providingClass.newInstance() here is the Spark2DefaultSource returned by DataSource.lookupDataSource. Its parent class DefaultSource implements both SchemaRelationProvider and RelationProvider, but userSpecifiedSchema is None here, so the second case matches and relation = dataSource.createRelation(sparkSession.sqlContext, caseInsensitiveOptions).
/**
* Create a resolved [[BaseRelation]] that can be used to read data from or write data into this
* [[DataSource]]
*
* @param checkFilesExist Whether to confirm that the files exist when generating the
* non-streaming file based datasource. StructuredStreaming jobs already
* list file existence, and when generating incremental jobs, the batch
* is considered as a non-streaming file based data source. Since we know
* that files already exist, we don't need to check them again.
*/
def resolveRelation(checkFilesExist: Boolean = true): BaseRelation = {
val relation = (providingClass.newInstance(), userSpecifiedSchema) match {
// TODO: Throw when too much is given.
case (dataSource: SchemaRelationProvider, Some(schema)) =>
dataSource.createRelation(sparkSession.sqlContext, caseInsensitiveOptions, schema)
case (dataSource: RelationProvider, None) =>
dataSource.createRelation(sparkSession.sqlContext, caseInsensitiveOptions)
case (_: SchemaRelationProvider, None) =>
throw new AnalysisException(s"A schema needs to be specified when using $className.")
case (dataSource: RelationProvider, Some(schema)) =>
val baseRelation =
dataSource.createRelation(sparkSession.sqlContext, caseInsensitiveOptions)
if (baseRelation.schema != schema) {
throw new AnalysisException(s"$className does not allow user-specified schemas.")
}
baseRelation
// We are reading from the results of a streaming query. Load files from the metadata log
// instead of listing them using HDFS APIs.
case (format: FileFormat, _)
if FileStreamSink.hasMetadata(
caseInsensitiveOptions.get("path").toSeq ++ paths,
sparkSession.sessionState.newHadoopConf()) =>
......
relation
}
// className = hudi
lazy val providingClass: Class[_] =
DataSource.lookupDataSource(className, sparkSession.sessionState.conf)
createRelation here is implemented in DefaultSource, the parent class of Spark2DefaultSource. It first derives the table base path from the supplied path parameters, then creates a HoodieTableMetaClient to read table configuration such as the table type, and finally decides which kind of Relation to return based on that configuration; in our case it calls resolveBaseFileOnlyRelation.
(So createRelation is the entry point to Hudi's own query logic.)
override def createRelation(sqlContext: SQLContext,
parameters: Map[String, String]): BaseRelation = {
createRelation(sqlContext, parameters, null)
}
override def createRelation(sqlContext: SQLContext,
optParams: Map[String, String],
schema: StructType): BaseRelation = {
val path = optParams.get("path")
val readPathsStr = optParams.get(DataSourceReadOptions.READ_PATHS.key)
if (path.isEmpty && readPathsStr.isEmpty) {
throw new HoodieException(s"'path' or '$READ_PATHS' or both must be specified.")
}
val readPaths = readPathsStr.map(p => p.split(",").toSeq).getOrElse(Seq())
val allPaths = path.map(p => Seq(p)).getOrElse(Seq()) ++ readPaths
val fs = FSUtils.getFs(allPaths.head, sqlContext.sparkContext.hadoopConfiguration)
val globPaths = if (path.exists(_.contains("*")) || readPaths.nonEmpty) {
HoodieSparkUtils.checkAndGlobPathIfNecessary(allPaths, fs)
} else {
Seq.empty
}
// Add default options for unspecified read options keys.
val parameters = (if (globPaths.nonEmpty) {
Map(
"glob.paths" -> globPaths.mkString(",")
)
} else {
Map()
}) ++ DataSourceOptionsHelper.parametersWithReadDefaults(optParams)
// Get the table base path
val tablePath = if (globPaths.nonEmpty) {
DataSourceUtils.getTablePath(fs, globPaths.toArray)
} else {
DataSourceUtils.getTablePath(fs, Array(new Path(path.get)))
}
log.info("Obtained hudi table path: " + tablePath)
// Create the HoodieTableMetaClient
val metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(tablePath).build()
// Whether this is a bootstrapped table; false in our example
val isBootstrappedTable = metaClient.getTableConfig.getBootstrapBasePath.isPresent
// Table type; COPY_ON_WRITE in our example
val tableType = metaClient.getTableType
// Query type; snapshot in our example
val queryType = parameters(QUERY_TYPE.key)
// NOTE: In cases when Hive Metastore is used as catalog and the table is partitioned, schema in the HMS might contain
// Hive-specific partitioning columns created specifically for HMS to handle partitioning appropriately. In that
// case we opt in to not be providing catalog's schema, and instead force Hudi relations to fetch the schema
// from the table itself
val userSchema = if (isUsingHiveCatalog(sqlContext.sparkSession)) {
None
} else {
Option(schema)
}
log.info(s"Is bootstrapped table => $isBootstrappedTable, tableType is: $tableType, queryType is: $queryType")
// Check whether there is any completed commit, i.e. whether the table is empty
if (metaClient.getCommitsTimeline.filterCompletedInstants.countInstants() == 0) { // empty table
// Return an EmptyRelation
new EmptyRelation(sqlContext, metaClient)
} else {
(tableType, queryType, isBootstrappedTable) match {
case (COPY_ON_WRITE, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) |
(COPY_ON_WRITE, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, false) |
(MERGE_ON_READ, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, false) =>
resolveBaseFileOnlyRelation(sqlContext, globPaths, userSchema, metaClient, parameters)
case (COPY_ON_WRITE, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) =>
new IncrementalRelation(sqlContext, parameters, userSchema, metaClient)
case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) =>
new MergeOnReadSnapshotRelation(sqlContext, parameters, userSchema, globPaths, metaClient)
case (MERGE_ON_READ, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) =>
new MergeOnReadIncrementalRelation(sqlContext, parameters, userSchema, metaClient)
case (_, _, true) =>
new HoodieBootstrapRelation(sqlContext, userSchema, globPaths, metaClient, parameters)
case (_, _, _) =>
throw new HoodieException(s"Invalid query type : $queryType for tableType: $tableType," +
s"isBootstrappedTable: $isBootstrappedTable ")
}
}
}
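The branch taken in the match above is driven by the read options. For example, requesting an incremental read of the same table would look roughly like this (a sketch: the option keys come from DataSourceReadOptions, and the begin-instant value is made up):
// "hoodie.datasource.query.type" is DataSourceReadOptions.QUERY_TYPE;
// "20220101000000" is an illustrative commit time, not taken from the example above.
spark.read.format("hudi")
  .option("hoodie.datasource.query.type", "incremental")
  .option("hoodie.datasource.read.begin.instanttime", "20220101000000")
  .load(tmp.getCanonicalPath)
  .show()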
private def resolveBaseFileOnlyRelation(sqlContext: SQLContext,
globPaths: Seq[Path],
userSchema: Option[StructType],
metaClient: HoodieTableMetaClient,
optParams: Map[String, String]): BaseRelation = {
val baseRelation = new BaseFileOnlyRelation(sqlContext, metaClient, optParams, userSchema, globPaths)
// NOTE: We fallback to [[HadoopFsRelation]] in all of the cases except ones requiring usage of
// [[BaseFileOnlyRelation]] to function correctly. This is necessary to maintain performance parity w/
// vanilla Spark, since some of the Spark optimizations are predicated on the using of [[HadoopFsRelation]].
//
// You can check out HUDI-3896 for more details
if (baseRelation.hasSchemaOnRead) {
baseRelation
} else {
baseRelation.toHadoopFsRelation
}
}
class BaseFileOnlyRelation(sqlContext: SQLContext,
metaClient: HoodieTableMetaClient,
optParams: Map[String, String],
userSchema: Option[StructType],
globPaths: Seq[Path])
extends HoodieBaseRelation(sqlContext, metaClient, optParams, userSchema) with SparkAdapterSupport {
......
hasSchemaOnRead is defined in HoodieBaseRelation, the parent class of BaseFileOnlyRelation:
def hasSchemaOnRead: Boolean = internalSchemaOpt.isDefined
protected lazy val (tableAvroSchema: Schema, internalSchemaOpt: Option[InternalSchema]) = {
val schemaResolver = new TableSchemaResolver(metaClient)
val internalSchemaOpt = if (!isSchemaEvolutionEnabled) {
None
} else {
Try(schemaResolver.getTableInternalSchemaFromCommitMetadata) match {
case Success(internalSchemaOpt) => toScalaOption(internalSchemaOpt)
case Failure(e) =>
logWarning("Failed to fetch internal-schema from the table", e)
None
}
}
val avroSchema = internalSchemaOpt.map { is =>
AvroInternalSchemaConverter.convert(is, "schema")
} orElse {
schemaSpec.map(convertToAvroSchema)
} getOrElse {
Try(schemaResolver.getTableAvroSchema) match {
case Success(schema) => schema
case Failure(e) =>
logError("Failed to fetch schema from the table", e)
throw new HoodieSchemaException("Failed to fetch schema from the table")
}
}
(avroSchema, internalSchemaOpt)
}
private def isSchemaEvolutionEnabled = {
// NOTE: Schema evolution could be configured both t/h optional parameters vehicle as well as
// t/h Spark Session configuration (for ex, for Spark SQL)
optParams.getOrElse(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key,
DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean ||
sparkSession.conf.get(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key,
DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean
}
Because SCHEMA_EVOLUTION_ENABLED defaults to false, internalSchemaOpt is None and hasSchemaOnRead returns false, so resolveBaseFileOnlyRelation ends up calling baseRelation.toHadoopFsRelation.
val SCHEMA_EVOLUTION_ENABLED: ConfigProperty[Boolean] = HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE
public static final ConfigProperty<Boolean> SCHEMA_EVOLUTION_ENABLE = ConfigProperty
.key("hoodie.schema.on.read.enable")
.defaultValue(false)
.withDocumentation("Enables support for Schema Evolution feature");
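In other words, with the defaults the relation always degrades to HadoopFsRelation. To keep the BaseFileOnlyRelation path instead you would enable schema on read when reading, using the key shown above (a sketch; it only takes effect if the commits actually carry internal-schema metadata):
// With hoodie.schema.on.read.enable=true, internalSchemaOpt can be resolved from commit metadata,
// hasSchemaOnRead becomes true and resolveBaseFileOnlyRelation returns the BaseFileOnlyRelation itself.
spark.read.format("hudi")
  .option("hoodie.schema.on.read.enable", "true")
  .load(tmp.getCanonicalPath)
  .show()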
toHadoopFsRelation returns a HadoopFsRelation whose location is fileIndex, a HoodieFileIndex defined in the parent class HoodieBaseRelation, and whose fileFormat is Spark24HoodieParquetFileFormat.
override protected val shouldExtractPartitionValuesFromPartitionPath: Boolean =
internalSchemaOpt.isEmpty
/**
* NOTE: We have to fallback to [[HadoopFsRelation]] to make sure that all of the Spark optimizations could be
* equally applied to Hudi tables, since some of those are predicated on the usage of [[HadoopFsRelation]],
* and won't be applicable in case of us using our own custom relations (one of such optimizations is [[SchemaPruning]]
* rule; you can find more details in HUDI-3896)
*/
def toHadoopFsRelation: HadoopFsRelation = {
// globPaths is empty in our case
if (globPaths.isEmpty) {
// NOTE: There are currently 2 ways partition values could be fetched:
// - Source columns (producing the values used for physical partitioning) will be read
// from the data file
// - Values parsed from the actual partition path would be appended to the final dataset
//
// In the former case, we don't need to provide the partition-schema to the relation,
// therefore we simply stub it w/ empty schema and use full table-schema as the one being
// read from the data file.
//
// In the latter, we have to specify proper partition schema as well as "data"-schema, essentially
// being a table-schema with all partition columns stripped out
// shouldExtractPartitionValuesFromPartitionPath is true here
val (partitionSchema, dataSchema) = if (shouldExtractPartitionValuesFromPartitionPath) {
(fileIndex.partitionSchema, fileIndex.dataSchema)
} else {
(StructType(Nil), tableStructSchema)
}
// Here location = fileIndex, and fileIndex is a HoodieFileIndex
HadoopFsRelation(
location = fileIndex,
partitionSchema = partitionSchema,
dataSchema = dataSchema,
bucketSpec = None,
fileFormat = fileFormat,
optParams)(sparkSession)
} else {
val readPathsStr = optParams.get(DataSourceReadOptions.READ_PATHS.key)
val extraReadPaths = readPathsStr.map(p => p.split(",").toSeq).getOrElse(Seq())
// NOTE: Spark is able to infer partitioning values from partition path only when Hive-style partitioning
// scheme is used. Therefore, we fallback to reading the table as non-partitioned (specifying
// partitionColumns = Seq.empty) whenever Hive-style partitioning is not involved
val partitionColumns: Seq[String] = if (tableConfig.getHiveStylePartitioningEnable.toBoolean) {
this.partitionColumns
} else {
Seq.empty
}
DataSource.apply(
sparkSession = sparkSession,
paths = extraReadPaths,
// Here we should specify the schema to the latest commit schema since
// the table schema evolution.
userSpecifiedSchema = userSchema.orElse(Some(tableStructSchema)),
className = fileFormatClassName,
options = optParams ++ Map(
// Since we're reading the table as just collection of files we have to make sure
// we only read the latest version of every Hudi's file-group, which might be compacted, clustered, etc.
// while keeping previous versions of the files around as well.
//
// We rely on [[HoodieROTablePathFilter]], to do proper filtering to assure that
"mapreduce.input.pathFilter.class" -> classOf[HoodieROTablePathFilter].getName,
// We have to override [[EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH]] setting, since
// the relation might have this setting overridden
DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.key -> shouldExtractPartitionValuesFromPartitionPath.toString,
// NOTE: We have to specify table's base-path explicitly, since we're requesting Spark to read it as a
// list of globbed paths which complicates partitioning discovery for Spark.
// Please check [[PartitioningAwareFileIndex#basePaths]] comment for more details.
PartitioningAwareFileIndex.BASE_PATH_PARAM -> metaClient.getBasePathV2.toString
),
partitionColumns = partitionColumns
)
.resolveRelation()
.asInstanceOf[HadoopFsRelation]
}
}
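For completeness, the else branch above (globPaths non-empty) is taken when the load path contains a glob or when hoodie.datasource.read.paths is set. An illustrative example (basePath and the year/month/day partition layout are hypothetical):
// A glob in the path makes checkAndGlobPathIfNecessary return non-empty globPaths,
// so toHadoopFsRelation goes through DataSource.resolveRelation instead of building on HoodieFileIndex.
spark.read.format("hudi").load(basePath + "/2022/*/*").show()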
protected lazy val fileIndex: HoodieFileIndex =
HoodieFileIndex(sparkSession, metaClient, Some(tableStructSchema), optParams,
FileStatusCache.getOrCreate(sparkSession))
HoodieFileIndex extends SparkHoodieTableFileIndex and implements listFiles and the other methods of Spark SQL's FileIndex. This is what is used to list a Hudi table's files during a query, i.e. where the Hudi query logic actually lives. Next let's see how its listFiles method gets called and how the query result is returned.
case class HoodieFileIndex(spark: SparkSession,
metaClient: HoodieTableMetaClient,
schemaSpec: Option[StructType],
options: Map[String, String],
@transient fileStatusCache: FileStatusCache = NoopCache)
extends SparkHoodieTableFileIndex(
spark = spark,
metaClient = metaClient,
schemaSpec = schemaSpec,
configProperties = getConfigProperties(spark, options),
queryPaths = HoodieFileIndex.getQueryPaths(options),
specifiedQueryInstant = options.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key).map(HoodieSqlCommonUtils.formatQueryInstant),
fileStatusCache = fileStatusCache
)
with FileIndex {
lazy val (fileFormat: FileFormat, fileFormatClassName: String) =
metaClient.getTableConfig.getBaseFileFormat match {
case HoodieFileFormat.ORC => (new OrcFileFormat, "orc")
case HoodieFileFormat.PARQUET =>
// We're delegating to Spark to append partition values to every row only in cases
// when these corresponding partition-values are not persisted w/in the data file itself
val parquetFileFormat = sparkAdapter.createHoodieParquetFileFormat(shouldExtractPartitionValuesFromPartitionPath).get
(parquetFileFormat, HoodieParquetFileFormat.FILE_FORMAT_ID)
}
// Spark2Adapter
override def createHoodieParquetFileFormat(appendPartitionValues: Boolean): Option[ParquetFileFormat] = {
Some(new Spark24HoodieParquetFileFormat(appendPartitionValues))
}
From the analysis above we know that resolveRelation returns HadoopFsRelation(HoodieFileIndex, partitionSchema, dataSchema, None, Spark24HoodieParquetFileFormat, optParams)(sparkSession). Now let's look at baseRelationToDataFrame:
def baseRelationToDataFrame(baseRelation: BaseRelation): DataFrame = {
Dataset.ofRows(self, LogicalRelation(baseRelation))
}
object LogicalRelation {
def apply(relation: BaseRelation, isStreaming: Boolean = false): LogicalRelation =
LogicalRelation(relation, relation.schema.toAttributes, None, isStreaming)
def apply(relation: BaseRelation, table: CatalogTable): LogicalRelation =
LogicalRelation(relation, relation.schema.toAttributes, Some(table), false)
}
Dataset.ofRows has already been analyzed several times in earlier articles, so we only pick out the key points; its logicalPlan argument here is LogicalRelation(HadoopFsRelation(HoodieFileIndex, ...), relation.schema.toAttributes, None, false).
def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan): DataFrame = {
val qe = sparkSession.sessionState.executePlan(logicalPlan)
qe.assertAnalyzed()
new Dataset[Row](sparkSession, qe, RowEncoder(qe.analyzed.schema))
}
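To follow along, the intermediate plans of the QueryExecution can be printed directly (standard Spark API; df is the DataFrame returned by load):
val df = spark.read.format("hudi").load(tmp.getCanonicalPath)
df.explain(true)                        // parsed / analyzed / optimized / physical plans
println(df.queryExecution.analyzed)     // LogicalRelation(HadoopFsRelation(HoodieFileIndex, ...))
println(df.queryExecution.executedPlan) // the physical plan discussed below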
We know that for a query, Dataset.ofRows above only triggers the analysis phase via qe.assertAnalyzed(), and the logs show that no rules actually change the plan there. So let's focus on the subsequent show(), which eventually calls showString, whose core logic is in getRows.
def show(): Unit = show(20)
def show(numRows: Int): Unit = show(numRows, truncate = true)
def show(numRows: Int, truncate: Boolean): Unit = if (truncate) {
println(showString(numRows, truncate = 20))
} else {
println(showString(numRows, truncate = 0))
}
private[sql] def showString(
_numRows: Int,
truncate: Int = 20,
vertical: Boolean = false): String = {
val numRows = _numRows.max(0).min(ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH - 1)
// Get rows represented by Seq[Seq[String]], we may get one more line if it has more data.
val tmpRows = getRows(numRows, truncate)
val hasMoreData = tmpRows.length - 1 > numRows
val rows = tmpRows.take(numRows + 1)
val sb = new StringBuilder
val numCols = schema.fieldNames.length
// We set a minimum column width at '3'
val minimumColWidth = 3
if (!vertical) {
// Initialise the width of each column to a minimum value
val colWidths = Array.fill(numCols)(minimumColWidth)
// Compute the width of each column
for (row <- rows) {
for ((cell, i) <- row.zipWithIndex) {
colWidths(i) = math.max(colWidths(i), Utils.stringHalfWidth(cell))
}
}
val paddedRows = rows.map { row =>
row.zipWithIndex.map { case (cell, i) =>
if (truncate > 0) {
StringUtils.leftPad(cell, colWidths(i) - Utils.stringHalfWidth(cell) + cell.length)
} else {
StringUtils.rightPad(cell, colWidths(i) - Utils.stringHalfWidth(cell) + cell.length)
}
}
}
// Create SeparateLine
val sep: String = colWidths.map("-" * _).addString(sb, "+", "+", "+\n").toString()
// column names
paddedRows.head.addString(sb, "|", "|", "|\n")
sb.append(sep)
// data
paddedRows.tail.foreach(_.addString(sb, "|", "|", "|\n"))
sb.append(sep)
} else {
// Extended display mode enabled
val fieldNames = rows.head
val dataRows = rows.tail
// Compute the width of field name and data columns
val fieldNameColWidth = fieldNames.foldLeft(minimumColWidth) { case (curMax, fieldName) =>
math.max(curMax, Utils.stringHalfWidth(fieldName))
}
val dataColWidth = dataRows.foldLeft(minimumColWidth) { case (curMax, row) =>
math.max(curMax, row.map(cell => Utils.stringHalfWidth(cell)).max)
}
dataRows.zipWithIndex.foreach { case (row, i) =>
// "+ 5" in size means a character length except for padded names and data
val rowHeader = StringUtils.rightPad(
s"-RECORD $i", fieldNameColWidth + dataColWidth + 5, "-")
sb.append(rowHeader).append("\n")
row.zipWithIndex.map { case (cell, j) =>
val fieldName = StringUtils.rightPad(fieldNames(j),
fieldNameColWidth - Utils.stringHalfWidth(fieldNames(j)) + fieldNames(j).length)
val data = StringUtils.rightPad(cell,
dataColWidth - Utils.stringHalfWidth(cell) + cell.length)
s" $fieldName | $data "
}.addString(sb, "", "\n", "\n")
}
}
// Print a footer
if (vertical && rows.tail.isEmpty) {
// In a vertical mode, print an empty row set explicitly
sb.append("(0 rows)\n")
} else if (hasMoreData) {
// For Data that has more than "numRows" records
val rowsString = if (numRows == 1) "row" else "rows"
sb.append(s"only showing top $numRows $rowsString\n")
}
sb.toString()
}
The core of getRows is newDf.select(castCols: _*).take(numRows + 1), which triggers planning.
private[sql] def getRows(
numRows: Int,
truncate: Int): Seq[Seq[String]] = {
val newDf = toDF()
val castCols = newDf.logicalPlan.output.map { col =>
// Since binary types in top-level schema fields have a specific format to print,
// so we do not cast them to strings here.
if (col.dataType == BinaryType) {
Column(col)
} else {
Column(col).cast(StringType)
}
}
val data = newDf.select(castCols: _*).take(numRows + 1)
// For array values, replace Seq and Array with square brackets
// For cells that are beyond `truncate` characters, replace it with the
// first `truncate-3` and "..."
schema.fieldNames.toSeq +: data.map { row =>
row.toSeq.map { cell =>
val str = cell match {
case null => "null"
case binary: Array[Byte] => binary.map("%02X".format(_)).mkString("[", " ", "]")
case _ => cell.toString
}
if (truncate > 0 && str.length > truncate) {
// do not show ellipses for strings shorter than 4 characters.
if (truncate < 4) str.substring(0, truncate)
else str.substring(0, truncate - 3) + "..."
} else {
str
}
}: Seq[String]
}
}
Look at select first. The logicalPlan inside select is LogicalRelation(HadoopFsRelation(HoodieFileIndex, ...)), so the logicalPlan passed to withPlan is Project(cols.map(_.named), LogicalRelation(HadoopFsRelation(HoodieFileIndex, ...))).
def select(cols: Column*): DataFrame = withPlan {
Project(cols.map(_.named), logicalPlan)
}
@inline private def withPlan(logicalPlan: LogicalPlan): DataFrame = {
Dataset.ofRows(sparkSession, logicalPlan)
}
Next comes take:
def take(n: Int): Array[T] = head(n)
def head(n: Int): Array[T] = withAction("head", limit(n).queryExecution)(collectFromPlan)
First look at the limit argument:
def limit(n: Int): Dataset[T] = withTypedPlan {
Limit(Literal(n), logicalPlan)
}
Here Literal(n) (via Literal.apply) yields Literal(21, IntegerType), since n = numRows + 1 = 21:
object Limit {
def apply(limitExpr: Expression, child: LogicalPlan): UnaryNode = {
GlobalLimit(limitExpr, LocalLimit(limitExpr, child))
}
def unapply(p: GlobalLimit): Option[(Expression, LogicalPlan)] = {
p match {
case GlobalLimit(le1, LocalLimit(le2, child)) if le1 == le2 => Some((le1, child))
case _ => None
}
}
}
Limit.apply returns GlobalLimit(Literal(21, IntegerType), LocalLimit(Literal(21, IntegerType), Project(cols.map(_.named), LogicalRelation(HadoopFsRelation(HoodieFileIndex, ...))))), so limit returns Dataset(sparkSession, GlobalLimit(Literal(21, IntegerType), LocalLimit(Literal(21, IntegerType), Project(cols.map(_.named), LogicalRelation(HadoopFsRelation(HoodieFileIndex, ...)))))).
@inline private def withTypedPlan[U : Encoder](logicalPlan: LogicalPlan): Dataset[U] = {
Dataset(sparkSession, logicalPlan)
}
withAction was also covered in earlier articles: executedPlan triggers a full Spark SQL pass of parsing, analysis, optimization and planning, and the action (here collectFromPlan) is finally invoked inside withNewExecutionId. Let's first see which important rules and strategies take effect.
private def withAction[U](name: String, qe: QueryExecution)(action: SparkPlan => U) = {
try {
qe.executedPlan.foreach { plan =>
plan.resetMetrics()
}
val start = System.nanoTime()
val result = SQLExecution.withNewExecutionId(sparkSession, qe) {
action(qe.executedPlan)
}
val end = System.nanoTime()
sparkSession.listenerManager.onSuccess(name, qe, end - start)
result
} catch {
case e: Exception =>
sparkSession.listenerManager.onFailure(name, qe, e)
throw e
}
}
One thing worth noting about the sparkPlan entry point, which I hadn't paid attention to before: the optimizedPlan is wrapped in ReturnAnswer here (this matters for the analysis below).
lazy val sparkPlan: SparkPlan = {
SparkSession.setActiveSession(sparkSession)
// TODO: We use next(), i.e. take the first plan returned by the planner, here for now,
// but we will implement to choose the best plan.
planner.plan(ReturnAnswer(optimizedPlan)).next()
}
As the ReturnAnswer comment explains, when planning operations such as take() or collect(), this special node is inserted at the top of the logical plan so that rules which should only apply at the top of the logical query plan can pattern-match on it.
/**
* When planning take() or collect() operations, this special node that is inserted at the top of
* the logical plan before invoking the query planner.
* Rules can pattern-match on this node in order to apply transformations that only take effect
* at the top of the logical query plan.
*/
case class ReturnAnswer(child: LogicalPlan) extends UnaryNode {
override def output: Seq[Attribute] = child.output
}
First, the list of strategies:
0 = {SparkStrategies$PythonEvals$@13376}
1 = {DataSourceV2Strategy$@13377}
2 = {FileSourceStrategy$@13378}
3 = {DataSourceStrategy@13388}
4 = {SparkStrategies$SpecialLimits$@13380}
5 = {SparkStrategies$Aggregation$@13381}
6 = {SparkStrategies$Window$@13382}
7 = {SparkStrategies$JoinSelection$@13383}
8 = {SparkStrategies$InMemoryScans$@13384}
9 = {SparkStrategies$BasicOperators$@13385}
Now the plan method. Previously my understanding stopped at "it iterates over the strategies and calls apply"; now I understand a bit more. It first iterates over the strategies to produce candidates, then for each candidate calls collectPlaceholders to unwrap the sub-plans wrapped in PlanLater, producing placeholders; if placeholders is non-empty, plan is called recursively on those child logical plans and the results are substituted back.
def plan(plan: LogicalPlan): Iterator[PhysicalPlan] = {
// Obviously a lot to do here still...
// Collect physical plan candidates.
val candidates = strategies.iterator.flatMap(_(plan))
// The candidates may contain placeholders marked as [[planLater]],
// so try to replace them by their child plans.
val plans = candidates.flatMap { candidate =>
val placeholders = collectPlaceholders(candidate)
if (placeholders.isEmpty) {
// Take the candidate as is because it does not contain placeholders.
Iterator(candidate)
} else {
// Plan the logical plan marked as [[planLater]] and replace the placeholders.
placeholders.iterator.foldLeft(Iterator(candidate)) {
case (candidatesWithPlaceholders, (placeholder, logicalPlan)) =>
// Plan the logical plan for the placeholder.
val childPlans = this.plan(logicalPlan)
candidatesWithPlaceholders.flatMap { candidateWithPlaceholders =>
childPlans.map { childPlan =>
// Replace the placeholder by the child plan
candidateWithPlaceholders.transformUp {
case p if p.eq(placeholder) => childPlan
}
}
}
}
}
}
val pruned = prunePlans(plans)
assert(pruned.hasNext, s"No plan for $plan")
pruned
}
In the first pass, because the plan is wrapped in ReturnAnswer, the SpecialLimits strategy matches:
/**
* Plans special cases of limit operators.
*/
object SpecialLimits extends Strategy {
override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case ReturnAnswer(rootPlan) => rootPlan match {
case Limit(IntegerLiteral(limit), Sort(order, true, child))
if limit < conf.topKSortFallbackThreshold =>
TakeOrderedAndProjectExec(limit, order, child.output, planLater(child)) :: Nil
case Limit(IntegerLiteral(limit), Project(projectList, Sort(order, true, child)))
if limit < conf.topKSortFallbackThreshold =>
TakeOrderedAndProjectExec(limit, order, projectList, planLater(child)) :: Nil
case Limit(IntegerLiteral(limit), child) =>
CollectLimitExec(limit, planLater(child)) :: Nil
case other => planLater(other) :: Nil
}
case Limit(IntegerLiteral(limit), Sort(order, true, child))
if limit < conf.topKSortFallbackThreshold =>
TakeOrderedAndProjectExec(limit, order, child.output, planLater(child)) :: Nil
case Limit(IntegerLiteral(limit), Project(projectList, Sort(order, true, child)))
if limit < conf.topKSortFallbackThreshold =>
TakeOrderedAndProjectExec(limit, order, projectList, planLater(child)) :: Nil
case _ => Nil
}
}
When matching Limit, its unapply is called; the third case matches, returning CollectLimitExec(21, PlanLater(Project(cols.map(_.named), LogicalRelation(HadoopFsRelation(HoodieFileIndex, ...))))).
def unapply(p: GlobalLimit): Option[(Expression, LogicalPlan)] = {
p match {
case GlobalLimit(le1, LocalLimit(le2, child)) if le1 == le2 => Some((le1, child))
case _ => None
}
}
PlanLater merely wraps the plan; planner.plan unwraps it later via collectPlaceholders:
override protected def collectPlaceholders(plan: SparkPlan): Seq[(SparkPlan, LogicalPlan)] = {
plan.collect {
case placeholder @ PlanLater(logicalPlan) => placeholder -> logicalPlan
}
}
After unwrapping PlanLater we get Project(cols.map(_.named), LogicalRelation(HadoopFsRelation(HoodieFileIndex, ...))). When FileSourceStrategy tries to match it, PhysicalOperation.unapply is invoked, so the first case matches and returns ProjectExec(projectList, FileSourceScanExec(HadoopFsRelation(HoodieFileIndex, ...))). Since this is the child plan, putting the parent node back gives the final result CollectLimitExec(21, ProjectExec(projectList, FileSourceScanExec(HadoopFsRelation(HoodieFileIndex, ...)))).
object PhysicalOperation extends PredicateHelper {
type ReturnType = (Seq[NamedExpression], Seq[Expression], LogicalPlan)
def unapply(plan: LogicalPlan): Option[ReturnType] = {
val (fields, filters, child, _) = collectProjectsAndFilters(plan)
Some((fields.getOrElse(child.output), filters, child))
}
private def collectProjectsAndFilters(plan: LogicalPlan):
(Option[Seq[NamedExpression]], Seq[Expression], LogicalPlan, Map[Attribute, Expression]) =
plan match {
case Project(fields, child) if fields.forall(_.deterministic) =>
val (_, filters, other, aliases) = collectProjectsAndFilters(child)
val substitutedFields = fields.map(substitute(aliases)).asInstanceOf[Seq[NamedExpression]]
(Some(substitutedFields), filters, other, collectAliases(substitutedFields))
case Filter(condition, child) if condition.deterministic =>
val (fields, filters, other, aliases) = collectProjectsAndFilters(child)
val substitutedCondition = substitute(aliases)(condition)
(fields, filters ++ splitConjunctivePredicates(substitutedCondition), other, aliases)
case other =>
(None, Nil, other, Map.empty)
}
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case PhysicalOperation(projects, filters,
l @ LogicalRelation(fsRelation: HadoopFsRelation, _, table, _)) =>
// Filters on this relation fall into four categories based on where we can use them to avoid
// reading unneeded data:
// - partition keys only - used to prune directories to read
// - bucket keys only - optionally used to prune files to read
// - keys stored in the data only - optionally used to skip groups of data in files
// - filters that need to be evaluated again after the scan
val filterSet = ExpressionSet(filters)
// The attribute name of predicate could be different than the one in schema in case of
// case insensitive, we should change them to match the one in schema, so we do not need to
// worry about case sensitivity anymore.
val normalizedFilters = filters.map { e =>
e transform {
case a: AttributeReference =>
a.withName(l.output.find(_.semanticEquals(a)).get.name)
}
}
val partitionColumns =
l.resolve(
fsRelation.partitionSchema, fsRelation.sparkSession.sessionState.analyzer.resolver)
val partitionSet = AttributeSet(partitionColumns)
val partitionKeyFilters =
ExpressionSet(normalizedFilters
.filterNot(SubqueryExpression.hasSubquery(_))
.filter(_.references.subsetOf(partitionSet)))
logInfo(s"Pruning directories with: ${partitionKeyFilters.mkString(",")}")
val bucketSpec: Option[BucketSpec] = fsRelation.bucketSpec
val bucketSet = if (shouldPruneBuckets(bucketSpec)) {
genBucketSet(normalizedFilters, bucketSpec.get)
} else {
None
}
val dataColumns =
l.resolve(fsRelation.dataSchema, fsRelation.sparkSession.sessionState.analyzer.resolver)
// Partition keys are not available in the statistics of the files.
val dataFilters = normalizedFilters.filter(_.references.intersect(partitionSet).isEmpty)
// Predicates with both partition keys and attributes need to be evaluated after the scan.
val afterScanFilters = filterSet -- partitionKeyFilters.filter(_.references.nonEmpty)
logInfo(s"Post-Scan Filters: ${afterScanFilters.mkString(",")}")
val filterAttributes = AttributeSet(afterScanFilters)
val requiredExpressions: Seq[NamedExpression] = filterAttributes.toSeq ++ projects
val requiredAttributes = AttributeSet(requiredExpressions)
val readDataColumns =
dataColumns
.filter(requiredAttributes.contains)
.filterNot(partitionColumns.contains)
val outputSchema = readDataColumns.toStructType
logInfo(s"Output Data Schema: ${outputSchema.simpleString(5)}")
val outputAttributes = readDataColumns ++ partitionColumns
val scan =
FileSourceScanExec(
fsRelation,
outputAttributes,
outputSchema,
partitionKeyFilters.toSeq,
bucketSet,
dataFilters,
table.map(_.identifier))
val afterScanFilter = afterScanFilters.toSeq.reduceOption(expressions.And)
val withFilter = afterScanFilter.map(execution.FilterExec(_, scan)).getOrElse(scan)
val withProjections = if (projects == withFilter.output) {
withFilter
} else {
execution.ProjectExec(projects, withFilter)
}
withProjections :: Nil
case _ => Nil
}
So the sparkPlan returned above is CollectLimitExec(21, ProjectExec(projectList, FileSourceScanExec(HadoopFsRelation(HoodieFileIndex, ...)))).
lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)
/** A sequence of rules that will be applied in order to the physical plan before execution. */
protected def preparations: Seq[Rule[SparkPlan]] = Seq(
PlanSubqueries(sparkSession),
EnsureRequirements(sparkSession.sessionState.conf),
CollapseCodegenStages(sparkSession.sessionState.conf),
ReuseExchange(sparkSession.sessionState.conf),
ReuseSubquery(sparkSession.sessionState.conf))
protected def prepareForExecution(plan: SparkPlan): SparkPlan = {
preparations.foldLeft(plan) { case (sp, rule) => rule.apply(sp) }
}
def apply(plan: SparkPlan): SparkPlan = {
if (conf.wholeStageEnabled) { // true by default
WholeStageCodegenId.resetPerQuery()
insertWholeStageCodegen(plan)
} else {
plan
}
}
CollectLimitExec first falls into the other case, so its children are processed recursively. ProjectExec implements CodegenSupport, so it becomes WholeStageCodegenExec(ProjectExec(projectList, FileSourceScanExec(HadoopFsRelation(HoodieFileIndex, ...)))). Since ProjectExec is the child node, the final result is CollectLimitExec(21, WholeStageCodegenExec(ProjectExec(projectList, FileSourceScanExec(HadoopFsRelation(HoodieFileIndex, ...))))).
private def insertWholeStageCodegen(plan: SparkPlan): SparkPlan = plan match {
// For operators that will output domain object, do not insert WholeStageCodegen for it as
// domain object can not be written into unsafe row.
case plan if plan.output.length == 1 && plan.output.head.dataType.isInstanceOf[ObjectType] =>
plan.withNewChildren(plan.children.map(insertWholeStageCodegen))
case plan: CodegenSupport if supportCodegen(plan) =>
WholeStageCodegenExec(insertInputAdapter(plan))(WholeStageCodegenId.getNextStageId())
case other =>
other.withNewChildren(other.children.map(insertWholeStageCodegen))
}
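As an aside, the conf.wholeStageEnabled check above corresponds to spark.sql.codegen.wholeStage; turning it off can make the physical plan easier to step through when debugging (standard Spark config):
// With whole-stage codegen disabled, CollapseCodegenStages leaves the plan as
// CollectLimitExec(21, ProjectExec(..., FileSourceScanExec(...))) without the WholeStageCodegenExec wrapper.
spark.conf.set("spark.sql.codegen.wholeStage", "false")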
case class ProjectExec(projectList: Seq[NamedExpression], child: SparkPlan)
extends UnaryExecNode with CodegenSupport {
private def collectFromPlan(plan: SparkPlan): Array[T] = {
// This projection writes output to a `InternalRow`, which means applying this projection is not
// thread-safe. Here we create the projection inside this method to make `Dataset` thread-safe.
val objProj = GenerateSafeProjection.generate(deserializer :: Nil)
plan.executeCollect().map { row =>
// The row returned by SafeProjection is `SpecificInternalRow`, which ignore the data type
// parameter of its `get` method, so it's safe to use null here.
objProj(row).get(0, null).asInstanceOf[T]
}
}
The plan here is CollectLimitExec, whose executeCollect calls child.executeTake. The child is WholeStageCodegenExec, which does not override executeTake, so the parent SparkPlan's executeTake is used.
case class CollectLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode {
override def output: Seq[Attribute] = child.output
override def outputPartitioning: Partitioning = SinglePartition
override def executeCollect(): Array[InternalRow] = child.executeTake(limit)
private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)
protected override def doExecute(): RDD[InternalRow] = {
val locallyLimited = child.execute().mapPartitionsInternal(_.take(limit))
val shuffled = new ShuffledRowRDD(
ShuffleExchangeExec.prepareShuffleDependency(
locallyLimited, child.output, SinglePartition, serializer))
shuffled.mapPartitionsInternal(_.take(limit))
}
}
The core is getByteArrayRdd:
/**
* Runs this query returning the first `n` rows as an array.
*
* This is modeled after `RDD.take` but never runs any job locally on the driver.
*/
def executeTake(n: Int): Array[InternalRow] = {
if (n == 0) {
return new Array[InternalRow](0)
}
val childRDD = getByteArrayRdd(n).map(_._2)
val buf = new ArrayBuffer[InternalRow]
val totalParts = childRDD.partitions.length
var partsScanned = 0
while (buf.size < n && partsScanned < totalParts) {
// The number of partitions to try in this iteration. It is ok for this number to be
// greater than totalParts because we actually cap it at totalParts in runJob.
var numPartsToTry = 1L
if (partsScanned > 0) {
// If we didn't find any rows after the previous iteration, quadruple and retry.
// Otherwise, interpolate the number of partitions we need to try, but overestimate
// it by 50%. We also cap the estimation in the end.
val limitScaleUpFactor = Math.max(sqlContext.conf.limitScaleUpFactor, 2)
if (buf.isEmpty) {
numPartsToTry = partsScanned * limitScaleUpFactor
} else {
val left = n - buf.size
// As left > 0, numPartsToTry is always >= 1
numPartsToTry = Math.ceil(1.5 * left * partsScanned / buf.size).toInt
numPartsToTry = Math.min(numPartsToTry, partsScanned * limitScaleUpFactor)
}
}
val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt)
val sc = sqlContext.sparkContext
val res = sc.runJob(childRDD,
(it: Iterator[Array[Byte]]) => if (it.hasNext) it.next() else Array.empty[Byte], p)
buf ++= res.flatMap(decodeUnsafeRows)
partsScanned += p.size
}
if (buf.size > n) {
buf.take(n).toArray
} else {
buf.toArray
}
}
private def getByteArrayRdd(n: Int = -1): RDD[(Long, Array[Byte])] = {
execute().mapPartitionsInternal { iter =>
var count = 0
val buffer = new Array[Byte](4 << 10) // 4K
val codec = CompressionCodec.createCodec(SparkEnv.get.conf)
val bos = new ByteArrayOutputStream()
val out = new DataOutputStream(codec.compressedOutputStream(bos))
// `iter.hasNext` may produce one row and buffer it, we should only call it when the limit is
// not hit.
while ((n < 0 || count < n) && iter.hasNext) {
val row = iter.next().asInstanceOf[UnsafeRow]
out.writeInt(row.getSizeInBytes)
row.writeToStream(out, buffer)
count += 1
}
out.writeInt(-1)
out.flush()
out.close()
Iterator((count, bos.toByteArray))
}
}
final def execute(): RDD[InternalRow] = executeQuery {
if (isCanonicalizedPlan) {
throw new IllegalStateException("A canonicalized plan is not supposed to be executed.")
}
doExecute()
}
WholeStageCodegenExec.doExecute; the core is child.asInstanceOf[CodegenSupport].inputRDDs(), where child is ProjectExec:
override def doExecute(): RDD[InternalRow] = {
val (ctx, cleanedSource) = doCodeGen()
// try to compile and fallback if it failed
val (_, maxCodeSize) = try {
CodeGenerator.compile(cleanedSource)
} catch {
case NonFatal(_) if !Utils.isTesting && sqlContext.conf.codegenFallback =>
// We should already saw the error message
logWarning(s"Whole-stage codegen disabled for plan (id=$codegenStageId):\n $treeString")
return child.execute()
}
// Check if compiled code has a too large function
if (maxCodeSize > sqlContext.conf.hugeMethodLimit) {
logInfo(s"Found too long generated codes and JIT optimization might not work: " +
s"the bytecode size ($maxCodeSize) is above the limit " +
s"${sqlContext.conf.hugeMethodLimit}, and the whole-stage codegen was disabled " +
s"for this plan (id=$codegenStageId). To avoid this, you can raise the limit " +
s"`${SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.key}`:\n$treeString")
child match {
// The fallback solution of batch file source scan still uses WholeStageCodegenExec
case f: FileSourceScanExec if f.supportsBatch => // do nothing
case _ => return child.execute()
}
}
val references = ctx.references.toArray
val durationMs = longMetric("pipelineTime")
val rdds = child.asInstanceOf[CodegenSupport].inputRDDs()
assert(rdds.size <= 2, "Up to two input RDDs can be supported")
if (rdds.length == 1) {
rdds.head.mapPartitionsWithIndex { (index, iter) =>
val (clazz, _) = CodeGenerator.compile(cleanedSource)
val buffer = clazz.generate(references).asInstanceOf[BufferedRowIterator]
buffer.init(index, Array(iter))
new Iterator[InternalRow] {
override def hasNext: Boolean = {
val v = buffer.hasNext
if (!v) durationMs += buffer.durationMs()
v
}
override def next: InternalRow = buffer.next()
}
}
} else {
// Right now, we support up to two input RDDs.
rdds.head.zipPartitions(rdds(1)) { (leftIter, rightIter) =>
Iterator((leftIter, rightIter))
// a small hack to obtain the correct partition index
}.mapPartitionsWithIndex { (index, zippedIter) =>
val (leftIter, rightIter) = zippedIter.next()
val (clazz, _) = CodeGenerator.compile(cleanedSource)
val buffer = clazz.generate(references).asInstanceOf[BufferedRowIterator]
buffer.init(index, Array(leftIter, rightIter))
new Iterator[InternalRow] {
override def hasNext: Boolean = {
val v = buffer.hasNext
if (!v) durationMs += buffer.durationMs()
v
}
override def next: InternalRow = buffer.next()
}
}
}
}
ProjectExec.inputRDDs, where child is FileSourceScanExec:
override def inputRDDs(): Seq[RDD[InternalRow]] = {
child.asInstanceOf[CodegenSupport].inputRDDs()
}
FileSourceScanExec.inputRDD
private lazy val inputRDD: RDD[InternalRow] = {
// Update metrics for taking effect in both code generation node and normal node.
updateDriverMetrics()
val readFile: (PartitionedFile) => Iterator[InternalRow] =
relation.fileFormat.buildReaderWithPartitionValues(
sparkSession = relation.sparkSession,
dataSchema = relation.dataSchema,
partitionSchema = relation.partitionSchema,
requiredSchema = requiredSchema,
filters = pushedDownFilters,
options = relation.options,
hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))
relation.bucketSpec match {
case Some(bucketing) if relation.sparkSession.sessionState.conf.bucketingEnabled =>
createBucketedReadRDD(bucketing, readFile, selectedPartitions, relation)
case _ =>
createNonBucketedReadRDD(readFile, selectedPartitions, relation)
}
}
/**
* Create an RDD for non-bucketed reads.
* The bucketed variant of this function is [[createBucketedReadRDD]].
*
* @param readFile a function to read each (part of a) file.
* @param selectedPartitions Hive-style partition that are part of the read.
* @param fsRelation [[HadoopFsRelation]] associated with the read.
*/
private def createNonBucketedReadRDD(
readFile: (PartitionedFile) => Iterator[InternalRow],
selectedPartitions: Array[PartitionDirectory],
fsRelation: HadoopFsRelation): RDD[InternalRow] = {
val defaultMaxSplitBytes =
fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes
val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
val defaultParallelism = fsRelation.sparkSession.sparkContext.defaultParallelism
val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
val bytesPerCore = totalBytes / defaultParallelism
val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
s"open cost is considered as scanning $openCostInBytes bytes.")
val splitFiles = selectedPartitions.flatMap { partition =>
partition.files.flatMap { file =>
val blockLocations = getBlockLocations(file)
if (fsRelation.fileFormat.isSplitable(
fsRelation.sparkSession, fsRelation.options, file.getPath)) {
(0L until file.getLen by maxSplitBytes).map { offset =>
val remaining = file.getLen - offset
val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
val hosts = getBlockHosts(blockLocations, offset, size)
PartitionedFile(
partition.values, file.getPath.toUri.toString, offset, size, hosts)
}
} else {
val hosts = getBlockHosts(blockLocations, 0, file.getLen)
Seq(PartitionedFile(
partition.values, file.getPath.toUri.toString, 0, file.getLen, hosts))
}
}
}.sortBy(_.length)(implicitly[Ordering[Long]].reverse)
val partitions = new ArrayBuffer[FilePartition]
val currentFiles = new ArrayBuffer[PartitionedFile]
var currentSize = 0L
/** Close the current partition and move to the next. */
def closePartition(): Unit = {
if (currentFiles.nonEmpty) {
val newPartition =
FilePartition(
partitions.size,
currentFiles.toArray) // Copy to a new Array.
partitions += newPartition
}
currentFiles.clear()
currentSize = 0
}
// Assign files to partitions using "Next Fit Decreasing"
splitFiles.foreach { file =>
if (currentSize + file.length > maxSplitBytes) {
closePartition()
}
// Add the given file to the current partition.
currentSize += file.length + openCostInBytes
currentFiles += file
}
closePartition()
new FileScanRDD(fsRelation.sparkSession, readFile, partitions)
}
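The maxSplitBytes computed above comes from the standard Spark file-source configs, so the number of FileScanRDD partitions for a Hudi COW read can be tuned the same way as for plain parquet:
// Defaults shown: 128 MB per partition, plus a 4 MB "open cost" added per file.
spark.conf.set("spark.sql.files.maxPartitionBytes", "134217728")
spark.conf.set("spark.sql.files.openCostInBytes", "4194304")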
@transient private lazy val selectedPartitions: Array[PartitionDirectory] = {
val optimizerMetadataTimeNs = relation.location.metadataOpsTimeNs.getOrElse(0L)
val startTime = System.nanoTime()
val ret = relation.location.listFiles(partitionFilters, dataFilters)
val timeTakenMs = ((System.nanoTime() - startTime) + optimizerMetadataTimeNs) / 1000 / 1000
metadataTime = timeTakenMs
ret
}.toArray
The methods involved include lookupCandidateFilesInMetadataTable, refresh, doRefresh, loadPartitionPathFiles and so on; refresh and doRefresh are invoked when the file index is first constructed.
/**
* Invoked by Spark to fetch list of latest base files per partition.
*
* @param partitionFilters partition column filters
* @param dataFilters data columns filters
* @return list of PartitionDirectory containing partition to base files mapping
*/
override def listFiles(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
// Look up candidate files names in the col-stats index, if all of the following conditions are true
// - Data-skipping is enabled
// - Col-Stats Index is present
// - List of predicates (filters) is present
val candidateFilesNamesOpt: Option[Set[String]] =
lookupCandidateFilesInMetadataTable(dataFilters) match {
case Success(opt) => opt
case Failure(e) =>
logError("Failed to lookup candidate files in File Index", e)
spark.sqlContext.getConf(DataSkippingFailureMode.configName, DataSkippingFailureMode.Fallback.value) match {
case DataSkippingFailureMode.Fallback.value => Option.empty
case DataSkippingFailureMode.Strict.value => throw new HoodieException(e);
}
}
logDebug(s"Overlapping candidate files from Column Stats Index: ${candidateFilesNamesOpt.getOrElse(Set.empty)}")
if (queryAsNonePartitionedTable) {
// Read as Non-Partitioned table
// Filter in candidate files based on the col-stats index lookup
val candidateFiles = allFiles.filter(fileStatus =>
// NOTE: This predicate is true when {@code Option} is empty
candidateFilesNamesOpt.forall(_.contains(fileStatus.getPath.getName))
)
logInfo(s"Total files : ${allFiles.size}; " +
s"candidate files after data skipping: ${candidateFiles.size}; " +
s"skipping percent ${if (allFiles.nonEmpty) (allFiles.size - candidateFiles.size) / allFiles.size.toDouble else 0}")
Seq(PartitionDirectory(InternalRow.empty, candidateFiles))
} else {
// Prune the partition path by the partition filters
val prunedPartitions = prunePartition(cachedAllInputFileSlices.keySet.asScala.toSeq, partitionFilters)
var totalFileSize = 0
var candidateFileSize = 0
val result = prunedPartitions.map { partition =>
val baseFileStatuses: Seq[FileStatus] =
cachedAllInputFileSlices.get(partition).asScala
.map(fs => fs.getBaseFile.orElse(null))
.filter(_ != null)
.map(_.getFileStatus)
// Filter in candidate files based on the col-stats index lookup
val candidateFiles = baseFileStatuses.filter(fs =>
// NOTE: This predicate is true when {@code Option} is empty
candidateFilesNamesOpt.forall(_.contains(fs.getPath.getName)))
totalFileSize += baseFileStatuses.size
candidateFileSize += candidateFiles.size
PartitionDirectory(InternalRow.fromSeq(partition.values), candidateFiles)
}
logInfo(s"Total base files: $totalFileSize; " +
s"candidate files after data skipping : $candidateFileSize; " +
s"skipping percent ${if (allFiles.nonEmpty && totalFileSize > 0) (totalFileSize - candidateFileSize) / totalFileSize.toDouble else 0}")
result
}
}
cachedAllInputFileSlices, initialized in doRefresh, is where the logic for obtaining the list of Hudi files lives:
cachedAllInputFileSlices = partitionFiles.keySet().stream()
.collect(Collectors.toMap(
Function.identity(),
partitionPath ->
queryInstant.map(instant ->
fileSystemView.getLatestFileSlicesBeforeOrOn(partitionPath.path, instant, true)
)
.orElse(fileSystemView.getLatestFileSlices(partitionPath.path))
.collect(Collectors.toList())
)
);
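The data-skipping branch in listFiles above only kicks in when the metadata table has a column-stats index and data skipping is enabled on the read side. In Hudi 0.12 that roughly corresponds to options like the following (a sketch; verify the config keys against your Hudi version):
// Assumed 0.12 keys: metadata table + column-stats index on the write side, data skipping on the read side.
df.write.format("hudi")
  .option("hoodie.metadata.enable", "true")
  .option("hoodie.metadata.index.column.stats.enable", "true")
  // ... other write options as in the example at the top ...
  .save(tmp.getCanonicalPath)
spark.read.format("hudi")
  .option("hoodie.enable.data.skipping", "true")
  .load(tmp.getCanonicalPath)
  .where("value > 5")
  .show()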
Related PR: [HUDI-3204] Fixing partition-values being derived from partition-path instead of source columns, which is worth studying on your own.
This basically clears up how spark.read.format("hudi").load goes from Spark source code into Hudi source code. Limited by ability and time, some details are still not fully worked out, but once the overall flow is clear the remaining details can be dug into whenever the need arises.