• Spark lazy list files 的实现


    背景

    对于 Spark partition table, 在生成 HadoopFsRelation 时,如果 partitionKeyFilters 或者 subqueryFilters 非空的时候,HadoopFsRelation 的 location: FileIndex 属性为 LazyFileIndex, 在最终 FileSourceScanExec 调用 listFiles 之前才将 LazyFileIndex 转换成 InMemoryFileIndex。
    但是如果 Spark partition table 的 partition filter 含有 subquery, 此时 Spark 认为无法下推,所以会跳过使用 LazyFileIndex, 在 listFiles 时 prunePartitions 也不会过滤掉任务分区,导致做了很多没用的操作。

    如果分区过滤条件是 subquery, 默认会把所有的分区全部拿回来,然后再进行分区过滤。

    生成 HadoopFsRelation

    在 DataSourceStrategy 的 FindDataSourceTable Rule 会尝试解析 'UnresolvedCatalogRelation 中的 CatalogTable。在 DataSource.resolveRelation() 方法尝试将 table Node 转换成 HadoopFsRelation。 如果 DataSource 传入的 table 是分区表,fileCatalog 使用 CatalogFileIndex 以便于后面分区裁剪。否则,直接遍历访问DataSource 中传入的Paths, 生成 InMemoryFileIndex

    // DataSource.resolveRelation()
          case (format: FileFormat, _) =>
            val useCatalogFileIndex = sparkSession.sqlContext.conf.manageFilesourcePartitions &&
              catalogTable.isDefined && catalogTable.get.tracksPartitionsInCatalog &&
              catalogTable.get.partitionColumnNames.nonEmpty
            val (fileCatalog, dataSchema, partitionSchema) = if (useCatalogFileIndex) {
              val defaultTableSize = sparkSession.sessionState.conf.defaultSizeInBytes
              val index = new CatalogFileIndex(
                sparkSession,
                catalogTable.get,
                catalogTable.get.stats.map(_.sizeInBytes.toLong).getOrElse(defaultTableSize))
              (index, catalogTable.get.dataSchema, catalogTable.get.partitionSchema)
            } else {
              val globbedPaths = checkAndGlobPathIfNecessary(
                checkEmptyGlobPath = true, checkFilesExist = checkFilesExist)
              val index = createInMemoryFileIndex(globbedPaths)
              val (resultDataSchema, resultPartitionSchema) =
                getOrInferFileFormatSchema(format, () => index)
              (index, resultDataSchema, resultPartitionSchema)
            }
    
            HadoopFsRelation(
              fileCatalog,
              partitionSchema = partitionSchema,
              dataSchema = dataSchema.asNullable,
              bucketSpec = bucketSpec,
              format,
              caseInsensitiveOptions)(sparkSession)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    对 HadoopFsRelation 进行分区裁剪

    • 社区版本 code 在 PruneFileSourcePartitions rule 中对 HadoopFsRelation 进行分区裁剪。
    • 因为分区表生成的是 CatalogFileIndex, 通过 Plan 中的带有分区字段的过滤条件, 用来进行分区裁剪 val prunedFileIndex = catalogFileIndex.filterPartitions(partitionKeyFilters)
    • 如果过滤条件中含有 subquery, 这种过滤条件不能下推到 Hive metastore,导致返回非常多 partitions(对应多个rootPaths)。
    • 将裁减后的分区条件包装成 InMemoryFileIndex 。InMemoryFileIndex 实例化的时候执行 refresh0() 方法,获取 rootPaths 下所有的文件信息。因为 files 过多会导致 plan 解析变慢,同时占用大量 Driver 内存。
    // InMemoryFileIndex
      private def refresh0(): Unit = {
        val files = listLeafFiles(rootPaths)
        cachedLeafFiles =
          new mutable.LinkedHashMap[Path, FileStatus]() ++= files.map(f => f.getPath -> f)
        cachedLeafDirToChildrenFiles = files.toArray.groupBy(_.getPath.getParent)
        cachedPartitionSpec = null
      }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    LazyFileIndex 优化

    • Carmel 版本进行了优化,CatalogFileIndex.lazyFilterPartitions(filters: Seq[Expression]) 返回的是 PartitioningAwareFileIndex 子类 LazyFileIndex ,这个类不会发生实际的 HDFS 遍历。
    • 在 FileSourceStrategy 中会将 HadoopFsRelation 转换为 FileSourceScanExec 。在 FileSourceScanExec 中有三个 lazy 变量: selectedPartitions, dynamicallySelectedPartitions, inputRDD
      • selectedPartitions : 通过 InMemoryFileIndex.listFiles() 返回选中的分区
      • dynamicallySelectedPartitions : 使用不能下推的分区过滤条件对 selectedPartitions 再过滤一次
      • inputRDD : 处理最终确定的分区,生成 FileScanRDD
    • 我们在访问 selectedPartitions 的时候,自动将 LazyFileIndex 替换成 InMemoryFileIndex 并进行 HDFS 遍历。

    FileIndex 类继承关系

    FileIndex
        CatalogFileIndex
            def lazyFilterPartitions(filters: Seq[Expression]): PartitioningAwareFileIndex
    
        PartitioningAwareFileIndex
            LazyFileIndex
                def createFileIndex(predicates: Seq[Expression]): InMemoryFileIndex
    
            InMemoryFileIndex
                def listFiles(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory]
    
            MetadataLogFileIndex // 非重点,先忽略
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    一些讨论

    • 优化点是对文件目录的遍历放到了 plan analyze 之后, 需关注文件在 hdfs 上的遍历耗时,还有遍历的文件结果内存占用。
    • TODO …
  • 相关阅读:
    一种通过篡改特定代码数据修复嵌入式产品BUG的方法
    滚动更新和回滚部署在 Kubernetes 中的工作原理
    MongoDB学习大纲
    (译)TDD(测试驱动开发)的5个步骤
    2. java基础
    国考省考行测:问题型材料主旨分析,有问题有对策,主旨是对策,有问题无对策,要合理引申对策
    【C++题解】1741 - 求出1~n中满足条件的数的个数和总和?
    分布式限流:Redis
    【C语言】简单实现扫雷游戏
    Vue跑马灯简单案列
  • 原文地址:https://blog.csdn.net/wankunde/article/details/125989763