• 详解 Spark 编程之 RDD 依赖关系


    一、依赖与血缘关系

    在这里插入图片描述

    • 依赖:两个相邻 RDD 之间的关系
    • 血缘关系:多个连续的 RDD 的依赖
    • 由于 RDD 不会保存数据,为了提高容错性,每个 RDD 都会保存自己的血缘关系,一旦某个转换过程出现错误,可以根据血缘关系重新从数据源开始读取计算
    object TestRDDDependency {
        def main(args: Array[String]): Unit = {
            val conf = new SparkConf().setMaster("local[*]").setAppName("Dep")
            val sc = new SparkContext(conf)
            
            val rdd1 = sc.textFile("data/word.txt")
            println(rdd1.toDebugString) // 打印血缘关系
            println(rdd1.dependencies) // 打印依赖关系
    		println("----------------------")
            
            val rdd2 = rdd1.flatMap(_.split(" "))
            println(rdd2.toDebugString) // 打印血缘关系
            println(rdd2.dependencies) // 打印依赖关系
    		println("----------------------")
            
            val rdd3 = rdd2.map((_, 1))
            println(rdd3.toDebugString) // 打印血缘关系
            println(rdd3.dependencies) // 打印依赖关系
    		println("----------------------")
            
            val rdd4 = rdd3.reduceByKey(_ + _)
            println(rdd4.toDebugString) // 打印血缘关系
            println(rdd4.dependencies) // 打印依赖关系
    		println("----------------------")
            
        }
    }
    

    二、宽窄依赖

    • 窄依赖:OneToOneDependency,表示每一个父 (上游) RDD 的 Partition 最多被子 (下游) RDD 的一个 Partition 使用,类比喻为独生子女

      class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd)
      
    • 宽依赖:ShuffleDependency,表示同一个父 (上游) RDD 的 Partition 被子 (下游) RDD 的多个 Partition 依赖或者说子 RDD 的一个 Partition 需要父 RDD 的多个 Partition 的数据,所以会引起 Shuffle 操作,类比喻为多生

      class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
          @transient private val _rdd: RDD[_ <: Product2[K, V]],
          val partitioner: Partitioner,
          val serializer: Serializer = SparkEnv.get.serializer,
          val keyOrdering: Option[Ordering[K]] = None,
          val aggregator: Option[Aggregator[K, V, C]] = None,
          val mapSideCombine: Boolean = false
      ) extends Dependency[Product2[K, V]] 
      

    三、阶段划分

    • 窄依赖由于上游和下游的 RDD 分区是一对一的,所以整个的执行过程是不受其它分区执行结果的影响,每个分区只需要一个 task 就可以完成计算任务

    在这里插入图片描述

    • 宽依赖由于存在 shuffle 操作,下游的 RDD 分区的数据计算需要等待上游 RDD 相关分区的数据全部执行完成后才能开始,所以存在不同阶段的划分,上游和下游 RDD 的每个分区都需要一个 task 来完成计算任务,所有阶段的划分和执行顺序可以由有向无环图 (DAG) 的形式来表示
      在这里插入图片描述

    • 阶段划分源码:

      /**
      	结论:
      		1.默认会至少存在一个阶段,即 resultStage,最后执行的阶段
      		2.当存在 shuffle 依赖时,每存在一个会增加一个阶段(shuffleMapStage)
      		3.阶段的数量 = shuffle 依赖数量 + 1
      */
      // 行动算子触发作业执行
      rdd.collect()
      
      // collect() 深入底层
      dagScheduler.runJob()
      
      // runJob() 中会调用 submitJob(),其中会调用 handleJobSubmitted()
      // handleJobSubmitted() 中的阶段划分
      try {
      	finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
      } catch {
      	...
      }
      
      // createResultStage() 方法
      private def createResultStage(rdd: RDD[_],func: (TaskContext, Iterator[_]) => _, partitions: Array[Int], jobId: Int, callSite: CallSite): ResultStage = {
          val parents = getOrCreateParentStages(rdd, jobId) // 判断是否有上一阶段
          val id = nextStageId.getAndIncrement()
          val stage = new  ResultStage(id, rdd, func, partitions, parents, jobId,  callSite) // 至少存在一个 resultStage 阶段
          stageIdToStage(id) = stage
          updateJobIdStageIdMaps(jobId, stage)
          stage
      }
      
      // getOrCreateParentStages(),判断是否有上一阶段
      private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
          // getShuffleDependencies(rdd):获取当前 rdd 的 shuffle 依赖
          getShuffleDependencies(rdd).map { shuffleDep =>
              // 为 shuffle 依赖创建 ShuffleMapStage 阶段
          	getOrCreateShuffleMapStage(shuffleDep, firstJobId)
          }.toList
      }
      
      // getShuffleDependencies(rdd):获取当前 rdd 的 shuffle 依赖
      private[scheduler] def getShuffleDependencies(rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
          val parents = new HashSet[ShuffleDependency[_, _, _]]
          val visited = new HashSet[RDD[_]]
          val waitingForVisit = new Stack[RDD[_]]
          waitingForVisit.push(rdd)
          while (waitingForVisit.nonEmpty) {
              val toVisit = waitingForVisit.pop()
              if (!visited(toVisit)) {
                  visited += toVisit
                  toVisit.dependencies.foreach {
                      case shuffleDep: ShuffleDependency[_, _, _] =>
                      parents += shuffleDep
                      case dependency =>
                      waitingForVisit.push(dependency.rdd)
                  }
              }
          }
          parents
      }
      

    四、任务划分

    • RDD 任务划分中间分为:Application、Job、Stage 和 Task

      • Application:初始化一个 SparkContext 即生成一个 Application
      • Job:一个 Action 算子就会生成一个 Job
      • Stage:Stage 等于宽依赖 (ShuffleDependency) 的个数加 1
      • Task:一个 Stage 阶段中,最后一个 RDD 的分区个数就是 Task 的个数
    • Application -> Job -> Stag e-> Task 之间每一层都是 1 对 n 的关系

    • 任务划分源码:

      val tasks: Seq[Task[_]] = try {
          stage match {
              case stage: ShuffleMapStage => 
                  partitionsToCompute.map { id =>
                      val locs = taskIdToLocations(id)
                      val part = stage.rdd.partitions(id)
                      new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
                      taskBinary,  part,  locs,  stage.latestInfo.taskMetrics,  properties, 
                      Option(jobId),
                      Option(sc.applicationId), sc.applicationAttemptId)
              	}
              case stage: ResultStage => 
                  partitionsToCompute.map { id =>
                      val p: Int = stage.partitions(id)
                      val part = stage.rdd.partitions(p)
                      val locs = taskIdToLocations(id)
                      new ResultTask(stage.id, stage.latestInfo.attemptId,
                      taskBinary, part, locs, id, properties, stage.latestInfo.taskMetrics,
                      Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)
              	}
          }
      }
      
      //
      val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
      
      //
      override def findMissingPartitions(): Seq[Int] = {
          mapOutputTrackerMaster.findMissingPartitions(shuffleDep.shuffleId).getOrElse(0 until numPartitions)
      }
      
  • 相关阅读:
    二分法寻找数组元素
    报错——warning: ignoring JAVA_HOME=/home/jdk/jdk1.8.0_281; using bundled JDK
    流形上的预积分(上)
    Gradle系列——常用指令,修改gradle源,Wrapper包装器(源于文档7.5版本,SpringBoot使用)day1-2
    Python 图_系列之基于<链接表>实现无向图最短路径搜索
    pagination分页插件的getResult明明有数据,但是getTotal方法为0
    搭载经纬恒润12V BMS的路特斯ELETRE开始量产交付
    【问题定位】通过看Mybatis源码解决系统问题
    react native使用TS实现路由
    29【定时器和延时器】
  • 原文地址:https://blog.csdn.net/weixin_44480009/article/details/139373988