• SPARK中关于HighlyCompressedMapStatus的说明(会造成运行时的数据不精确)


    背景

    本文基于spark 3.1.2

    分析

    HighlyCompressedMapStatus 是属于MapStatus的子类,也就是在每个ShuffleMapTask写完数据以后,会返回给Driver端的结果,以便记录该次MapTask的任务情况,以及shuffle数据在整个集群的分布情况。

    MapStatus在Map任务怎么被写入的

    在每个ShuffleMapTask结束以后,都会生成MapStatus的数据结构,如下:

    /** Write a bunch of records to this task's output */
      override def write(records: Iterator[Product2[K, V]]): Unit = {
        mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths, mapId)
    
    
    • 1
    • 2
    • 3
    • 4
                                    ||
                                    \/
    
    • 1
    • 2
     def apply(
          loc: BlockManagerId,
          uncompressedSizes: Array[Long],
          mapTaskId: Long): MapStatus = {
        if (uncompressedSizes.length > minPartitionsToUseHighlyCompressMapStatus) {
          HighlyCompressedMapStatus(loc, uncompressedSizes, mapTaskId)
        } else {
          new CompressedMapStatus(loc, uncompressedSizes, mapTaskId)
        }
      }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    其中minPartitionsToUseHighlyCompressMapStatus也就是spark.shuffle.minNumPartitionsToHighlyCompress,默认是2000,只要超过这个阈值,就会生成HighlyCompressedMapStatus实例,否则就是CompressedMapStatus
    这是因为MapStatus的存储会占用driver端太多的内存,这在下文中会解释到。
    我们转到HighlyCompressedMapStatus构造方法中,如下:

    def apply(
          loc: BlockManagerId,
          uncompressedSizes: Array[Long],
          mapTaskId: Long): HighlyCompressedMapStatus = {
            ...
            val threshold = Option(SparkEnv.get)
          .map(_.conf.get(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD))
          .getOrElse(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD.defaultValue.get)
           val avgSize = if (numSmallBlocks > 0) {
          totalSmallBlockSize / numSmallBlocks
        } else {
          0
        }
        emptyBlocks.trim()
        emptyBlocks.runOptimize()
        new HighlyCompressedMapStatus(loc, numNonEmptyBlocks, emptyBlocks, avgSize,
          hugeBlockSizes, mapTaskId)
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    这里有个阈值判断SHUFFLE_ACCURATE_BLOCK_THRESHOLD,也就是spark.shuffle.accurateBlockThreshold,这是来map端任务记录精确分区的阈值,如果大于该阈值,则会记录真实的reduce数据的分区大小,如果小于则记录的是每个reduce大小的平均值(这导致会在reduce获取运行时的数据大小信息时数据不准确的问题,从而导致AQE的效果不是很理想)。

    MapStatus在Driver端怎么被记录的

    之前说过为啥下游reduce的个数超过2000时,就会生成压缩的MapStatus实例,这是跟MapStatus在Driver端的存储有关。
    对于MapStatus的信息都会通过ExecutorBackendstatusUpdate方法传给driver,
    最终是DAGScheduler的方法片段:

    mapOutputTracker.registerMapOutput(
                    shuffleStage.shuffleDep.shuffleId, smt.partitionId, status)
    
    • 1
    • 2

    被mapOutputTracker所注册,保存在shuffleStatuses Map结构中,如果说CompressedMapStatus类型的数据结构的化,回导致CompressedMapStatus记录的信息很多,最终会导致DriverOOM问题。
    CompressedMapStatus的数据结构如下:

    private[spark] class CompressedMapStatus(
        private[this] var loc: BlockManagerId,
        private[this] var compressedSizes: Array[Byte],
        private[this] var _mapTaskId: Long)
      extends MapStatus with Externalizable
    
    • 1
    • 2
    • 3
    • 4
    • 5

    HighlyCompressedMapStatus采用的是对于数据量小的reduce分区数据采用公用平均值的方式,这在一定程度上能够减缓Driver OOM的概率,
    HighlyCompressedMapStatus的数据结构如下:

    
    private[spark] class HighlyCompressedMapStatus private (
        private[this] var loc: BlockManagerId,
        private[this] var numNonEmptyBlocks: Int,
        private[this] var emptyBlocks: RoaringBitmap,
        private[this] var avgSize: Long,
        private[this] var hugeBlockSizes: scala.collection.Map[Int, Byte],
        private[this] var _mapTaskId: Long)
      extends MapStatus with Externalizable
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    MapStatus在Driver端怎么被使用的

    我们知道MapStatus的信息最终会被保存在MapOutputTrackerMaster中,这样下游的reduce任务如果需要获取上游MapTask的运行情况的时候就会最终调用到MapOutputTrackerMaster对应的方法中,最终会调用到MapStatusgetSizeForBlock方法,(当然AQE中的数据倾斜处理规则OptimizeSkewedJoin也会用到)
    对于getSizeForBlock方法的实现对于不同的子类行为是不一样的:

    • CompressedMapStatus
     override def getSizeForBlock(reduceId: Int): Long = {
        MapStatus.decompressSize(compressedSizes(reduceId))
      }
    
    • 1
    • 2
    • 3

    CompressedMapStatus 直接返回每个reduce的真实数据大小

    • HighlyCompressedMapStatus
     override def getSizeForBlock(reduceId: Int): Long = {
        assert(hugeBlockSizes != null)
        if (emptyBlocks.contains(reduceId)) {
          0
        } else {
          hugeBlockSizes.get(reduceId) match {
            case Some(size) => MapStatus.decompressSize(size)
            case None => avgSize
          }
        }
      }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    HighlyCompressedMapStatus对于reduce数据量比较小的,就直接返回一个平均值(这会对数据统计造成误导),对于数据量比较大的reduce分区(spark.shuffle.accurateBlockThreshold参数控制的,默认100MB)就会返回真实值。

    为什么HighlyCompressedMapStatus更加节约存储

    因为对于小数据量的分区,只需要存储一个平均值,而不像 CompressedMapStatus那样都会存储具体的数值(用map存储)

  • 相关阅读:
    Eureka注册中心
    Servlet规范之注解与可插拔性
    springcloud集成链路追踪组件skywalking
    java毕业设计《组成原理》课程智能组卷mybatis+源码+调试部署+系统+数据库+lw
    量子笔记:单比特量子门、泡利矩阵
    go-zero单体服务使用泛型简化注册Handler路由
    【C++ 科学计算】矩阵变量类型总结
    spring事务传播的Propagation.REQUIRES_NEW以及NEVER MANDATORY验证,及其失效的诡异问题
    vscode中编码错误
    【Node.js】json-server
  • 原文地址:https://blog.csdn.net/monkeyboy_tech/article/details/127722832