object TestRDDDependency {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("Dep")
val sc = new SparkContext(conf)
val rdd1 = sc.textFile("data/word.txt")
println(rdd1.toDebugString) // 打印血缘关系
println(rdd1.dependencies) // 打印依赖关系
println("----------------------")
val rdd2 = rdd1.flatMap(_.split(" "))
println(rdd2.toDebugString) // 打印血缘关系
println(rdd2.dependencies) // 打印依赖关系
println("----------------------")
val rdd3 = rdd2.map((_, 1))
println(rdd3.toDebugString) // 打印血缘关系
println(rdd3.dependencies) // 打印依赖关系
println("----------------------")
val rdd4 = rdd3.reduceByKey(_ + _)
println(rdd4.toDebugString) // 打印血缘关系
println(rdd4.dependencies) // 打印依赖关系
println("----------------------")
}
}
窄依赖:OneToOneDependency,表示每一个父 (上游) RDD 的 Partition 最多被子 (下游) RDD 的一个 Partition 使用,类比喻为独生子女
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd)
宽依赖:ShuffleDependency,表示同一个父 (上游) RDD 的 Partition 被子 (下游) RDD 的多个 Partition 依赖或者说子 RDD 的一个 Partition 需要父 RDD 的多个 Partition 的数据,所以会引起 Shuffle 操作,类比喻为多生
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
@transient private val _rdd: RDD[_ <: Product2[K, V]],
val partitioner: Partitioner,
val serializer: Serializer = SparkEnv.get.serializer,
val keyOrdering: Option[Ordering[K]] = None,
val aggregator: Option[Aggregator[K, V, C]] = None,
val mapSideCombine: Boolean = false
) extends Dependency[Product2[K, V]]
宽依赖由于存在 shuffle 操作,下游的 RDD 分区的数据计算需要等待上游 RDD 相关分区的数据全部执行完成后才能开始,所以存在不同阶段的划分,上游和下游 RDD 的每个分区都需要一个 task 来完成计算任务,所有阶段的划分和执行顺序可以由有向无环图 (DAG) 的形式来表示
阶段划分源码:
/**
结论:
1.默认会至少存在一个阶段,即 resultStage,最后执行的阶段
2.当存在 shuffle 依赖时,每存在一个会增加一个阶段(shuffleMapStage)
3.阶段的数量 = shuffle 依赖数量 + 1
*/
// 行动算子触发作业执行
rdd.collect()
// collect() 深入底层
dagScheduler.runJob()
// runJob() 中会调用 submitJob(),其中会调用 handleJobSubmitted()
// handleJobSubmitted() 中的阶段划分
try {
finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
} catch {
...
}
// createResultStage() 方法
private def createResultStage(rdd: RDD[_],func: (TaskContext, Iterator[_]) => _, partitions: Array[Int], jobId: Int, callSite: CallSite): ResultStage = {
val parents = getOrCreateParentStages(rdd, jobId) // 判断是否有上一阶段
val id = nextStageId.getAndIncrement()
val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite) // 至少存在一个 resultStage 阶段
stageIdToStage(id) = stage
updateJobIdStageIdMaps(jobId, stage)
stage
}
// getOrCreateParentStages(),判断是否有上一阶段
private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
// getShuffleDependencies(rdd):获取当前 rdd 的 shuffle 依赖
getShuffleDependencies(rdd).map { shuffleDep =>
// 为 shuffle 依赖创建 ShuffleMapStage 阶段
getOrCreateShuffleMapStage(shuffleDep, firstJobId)
}.toList
}
// getShuffleDependencies(rdd):获取当前 rdd 的 shuffle 依赖
private[scheduler] def getShuffleDependencies(rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
val parents = new HashSet[ShuffleDependency[_, _, _]]
val visited = new HashSet[RDD[_]]
val waitingForVisit = new Stack[RDD[_]]
waitingForVisit.push(rdd)
while (waitingForVisit.nonEmpty) {
val toVisit = waitingForVisit.pop()
if (!visited(toVisit)) {
visited += toVisit
toVisit.dependencies.foreach {
case shuffleDep: ShuffleDependency[_, _, _] =>
parents += shuffleDep
case dependency =>
waitingForVisit.push(dependency.rdd)
}
}
}
parents
}
RDD 任务划分中间分为:Application、Job、Stage 和 Task
Application -> Job -> Stag e-> Task 之间每一层都是 1 对 n 的关系
任务划分源码:
val tasks: Seq[Task[_]] = try {
stage match {
case stage: ShuffleMapStage =>
partitionsToCompute.map { id =>
val locs = taskIdToLocations(id)
val part = stage.rdd.partitions(id)
new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
taskBinary, part, locs, stage.latestInfo.taskMetrics, properties,
Option(jobId),
Option(sc.applicationId), sc.applicationAttemptId)
}
case stage: ResultStage =>
partitionsToCompute.map { id =>
val p: Int = stage.partitions(id)
val part = stage.rdd.partitions(p)
val locs = taskIdToLocations(id)
new ResultTask(stage.id, stage.latestInfo.attemptId,
taskBinary, part, locs, id, properties, stage.latestInfo.taskMetrics,
Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)
}
}
}
//
val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
//
override def findMissingPartitions(): Seq[Int] = {
mapOutputTrackerMaster.findMissingPartitions(shuffleDep.shuffleId).getOrElse(0 until numPartitions)
}