对于 Spark partition table, 在生成 HadoopFsRelation 时,如果 partitionKeyFilters 或者 subqueryFilters 非空的时候,HadoopFsRelation 的 location: FileIndex 属性为 LazyFileIndex, 在最终 FileSourceScanExec 调用 listFiles 之前才将 LazyFileIndex 转换成 InMemoryFileIndex。
但是如果 Spark partition table 的 partition filter 含有 subquery, 此时 Spark 认为无法下推,所以会跳过使用 LazyFileIndex, 在 listFiles 时 prunePartitions 也不会过滤掉任务分区,导致做了很多没用的操作。
如果分区过滤条件是 subquery, 默认会把所有的分区全部拿回来,然后再进行分区过滤。
在 DataSourceStrategy 的 FindDataSourceTable Rule 会尝试解析 'UnresolvedCatalogRelation 中的 CatalogTable。在 DataSource.resolveRelation() 方法尝试将 table Node 转换成 HadoopFsRelation。 如果 DataSource 传入的 table 是分区表,fileCatalog 使用 CatalogFileIndex 以便于后面分区裁剪。否则,直接遍历访问DataSource 中传入的Paths, 生成 InMemoryFileIndex
// DataSource.resolveRelation()
case (format: FileFormat, _) =>
val useCatalogFileIndex = sparkSession.sqlContext.conf.manageFilesourcePartitions &&
catalogTable.isDefined && catalogTable.get.tracksPartitionsInCatalog &&
catalogTable.get.partitionColumnNames.nonEmpty
val (fileCatalog, dataSchema, partitionSchema) = if (useCatalogFileIndex) {
val defaultTableSize = sparkSession.sessionState.conf.defaultSizeInBytes
val index = new CatalogFileIndex(
sparkSession,
catalogTable.get,
catalogTable.get.stats.map(_.sizeInBytes.toLong).getOrElse(defaultTableSize))
(index, catalogTable.get.dataSchema, catalogTable.get.partitionSchema)
} else {
val globbedPaths = checkAndGlobPathIfNecessary(
checkEmptyGlobPath = true, checkFilesExist = checkFilesExist)
val index = createInMemoryFileIndex(globbedPaths)
val (resultDataSchema, resultPartitionSchema) =
getOrInferFileFormatSchema(format, () => index)
(index, resultDataSchema, resultPartitionSchema)
}
HadoopFsRelation(
fileCatalog,
partitionSchema = partitionSchema,
dataSchema = dataSchema.asNullable,
bucketSpec = bucketSpec,
format,
caseInsensitiveOptions)(sparkSession)
val prunedFileIndex = catalogFileIndex.filterPartitions(partitionKeyFilters)// InMemoryFileIndex
private def refresh0(): Unit = {
val files = listLeafFiles(rootPaths)
cachedLeafFiles =
new mutable.LinkedHashMap[Path, FileStatus]() ++= files.map(f => f.getPath -> f)
cachedLeafDirToChildrenFiles = files.toArray.groupBy(_.getPath.getParent)
cachedPartitionSpec = null
}
CatalogFileIndex.lazyFilterPartitions(filters: Seq[Expression]) 返回的是 PartitioningAwareFileIndex 子类 LazyFileIndex ,这个类不会发生实际的 HDFS 遍历。FileIndex
CatalogFileIndex
def lazyFilterPartitions(filters: Seq[Expression]): PartitioningAwareFileIndex
PartitioningAwareFileIndex
LazyFileIndex
def createFileIndex(predicates: Seq[Expression]): InMemoryFileIndex
InMemoryFileIndex
def listFiles(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory]
MetadataLogFileIndex // 非重点,先忽略