
Task的个数 =Paration的个数 = 并行度
job串行,Stage串行加并行,task并行;
app > job > stage > task
RDD 之间有一系列的依赖关系,依赖关系又分为窄依赖和宽依赖。
父 RDD 和子 RDD partition 之间的关系是一对一的。或者父 RDD 和子 RDD的partition 关系是多对一的。不会有 shuffle 的产生。
shuffle会产生数据重新分布
父 RDD 与子 RDD partition 之间的关系是一对多。会有 shuffle 的产生。


Spark 任务会根据 RDD 之间的依赖关系,形成一个 DAG 有向无环图,DAG会提交给 DAGScheduler,DAGScheduler 会把 DAG 划分相互依赖的多个stage,划分 stage 的依据就是 RDD 之间的宽窄依赖。遇到宽依赖就划分stage,每个 stage 包含一个或多个 task 任务。然后将这些 task 以 taskSet的形式提交给 TaskScheduler 运行。stage是由一组并行的 task 组成。
切割规则:从后往前,遇到宽依赖就切割 stage。

stage 计算模式是pipeline 管道计算模式。pipeline 只是一种计算思想、模式。

测试验证 pipeline 计算模式:
package com.shsxt.scalaTest.core.transform_operator
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Pipeline_Test {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setMaster("local").setAppName("pipeline")
val sc = new SparkContext(conf)
val rdd: RDD[Int] = sc.parallelize(Array(1, 2, 3))
val rdd2: RDD[Int] = rdd.map(x => {
println("map---------------", x)
x
})
val rdd3: RDD[Boolean] = rdd2.map(x => {
println("filter-------------", x)
true
})
rdd3.collect()
sc.stop()
}
}
(map---------------,1)
(filter-------------,1)
(map---------------,2)
(filter-------------,2)
(map---------------,3)
(filter-------------,3)
注意点:
- 对
RDD进行持久化shuffle write的时候
Stage 的 task 并行度是由 stage 的最后一个 RDD 的分区数来决定的 。RDD 的分区数?
例如:
reduceByKey(XXX,3),GroupByKey(4)

启动集群后,Worker 节点会向 Master 节点汇报资源情况,Master 掌握了集群资源情况。当 Spark 提交一个 Application 后,根据 RDD 之间的依赖关系将 Application 形成一个 DAG 有向无环图。任务提交后,Spark 会在Driver 端创建两个对象:DAGScheduler 和 TaskScheduler,DAGScheduler是任务调度的高层调度器,是一个对象。
DAGScheduler 的主要作用就是将DAG 根据 RDD 之间的宽窄依赖关系划分为一个个的 Stage,然后将这些Stage 以 TaskSet 的形式提交给 TaskSchedule(TaskScheduler 是任务调度的底层调度器,这里 TaskSet 其实就是一个集合,里面封装的就是一个个的 task 任务,也就是 stage 中的并行度 task 任务)。
TaskSchedule 会遍历TaskSet 集合,拿到每个 task 后会将 task 发送到计算节点 Executor 中去执行(其实就是发送到 Executor 中的线程池 ThreadPool 去执行)。
task 在Executor 线程池中的运行情况会向 TaskScheduler 反馈,当 task 执行失败时,则由 TaskScheduler 负责重试,将 task 重新发送给 Executor 去执行,默认重试 3 次。如果重试 3 次依然失败,那么这个 task 所在的 stage 就失败了。
stage 失败了则由DAGScheduler 来负责重试,重新发送 TaskSet 到TaskSchdeuler,Stage 默认重试 4 次。如果重试 4 次以后依然失败,那么这个 job 就失败了。job 失败了,Application 就失败了。
TaskScheduler 不仅能重试失败的 task,还会重试 straggling(落后,缓慢)task(也就是执行速度比其他 task 慢太多的 task)。如果有运行缓慢的task ,那么 TaskScheduler 会启动一个新的 task 来与这个运行缓慢的 task执行相同的处理逻辑。两个 task 哪个先执行完,就以哪个 task 的执行结果为准。这就是 Spark 的推测执行机制。在 Spark 中推测执行默认是关闭的。推测执行可以通过 spark.speculation 属性来配置。
注意:
ETL 类型(抽取、转换、加载)要入数据库的业务要关闭推测执行机制,这样就不会有重复的数据入库。task重新启动处理相同的逻辑,任务可能一直处于处理不完的状态。 下面展示standalone-client模式下的资源调度与任务调度:

资源请求简单图:

资源调度 Master 路径:
路径:spark-2.2.1/core/src/main/scala/org.apache.spark/deploy/master/Master.scala
提交应用程序,submit 的路径:
路径:spark-2.2.1/core/src/main/scala/org.apache.spark/deploy/SparkSubmit.scala
总结:
Executor 在集群中分散启动,有利于 task 计算的数据本地化。--executor-cores 选项),每一个 Worker 为当前的Application 启动一个 Executor,这个Executor 会使用这个 Worker 的所有的 cores 和 1G 内存。Worker 上启动多个 Executor,提交 Application 的时候要加--executor-cores 这个选项。--total-executor-cores,一个 Application 会使用 Spark 集群中所有的 cores。结论演示:
默认情况每个 worker 为当前的 Application 启动一个 Executor,这 个 Executor 使用集群中所有的 cores 和 1G 内存。
./spark-submit
--master spark://node1:7077
--class org.apache.spark.examples.SparkPi
../examples/jars/spark-examples_2.11-2.2.1.jar
10000
workr 上启动多个 Executor,设置--executor-cores 参数指定每个executor 使用的 core 数量。
./spark-submit
--master spark://node1:7077
--executor-cores 1
--class org.apache.spark.examples.SparkPi
../examples/jars/spark-examples_2.11-2.2.1.jar
10000
内存不足的情况下启动 core 的情况。Spark 启动是不仅看 core 配置参数,也要看配置的 core 的内存是否够用。
./spark-submit
--master spark://node1:7077
--executor-cores 1
--executor-memory 3g
--class org.apache.spark.examples.SparkPi
../examples/jars/spark-examples_2.11-2.2.1.jar
10000
--total-executor-cores 集群中共使用多少 cores
注意:一个进程不能让集群多个节点共同启动。
./spark-submit
--master spark://node1:7077
--executor-cores 1
--executor-memory 2g
--total-executor-cores 3
--class org.apache.spark.examples.SparkPi
../examples/jars/spark-examples_2.11-2.2.1.jar
10000
Action 算子开始分析,任务调度可以从一个 Action 类算子开始。因为 Action 类算子会触发一个 job 的执行。
划分 stage,以 taskSet 形式提交任务。DAGScheduler 类中 getMessingParentStages()方法是切割 job 划分stage 。 可以结合以下这张图来分析:

在 Application 执行之前,将所有的资源申请完毕,当资源申请成功后,才会进行任务的调度,当所有的 task 执行完成后,才会释放这部分资源。
Application 执行之前,所有的资源都申请完毕,每一个task 直接使用资源就可以了,不需要 task 在执行前自己去申请资源,task 启动就快了,task 执行快了,stage 执行就快了,job 就快了,application 执行就快了。task 执行完成才会释放资源,集群的资源无法充分利用。 Application 执行之前不需要先去申请资源,而是直接执行,让 job中的每一个 task 在执行前自己去申请资源,task 执行完成就释放资源。
task 自己去申请资源,task 启动变慢,Application的运行就响应的变慢了。因为
MR本身是细粒度的,所以优化的最简单方式就是改成粗粒度
mapred.job.reuse.jvm.num.tasks=n;设置这个n为task插槽的个数。
广播变量理解图如下。在没有使用广播变量时,list需要发送到多个exexutor上,此时,有可能多个executor在同一台机器上。使用广播变量后,只需要将变量缓存到每一台机器上即可。

广播变量使用如下:
package com.shsxt.scala_Test.core.test
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object BroadCastTest {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf()
conf.setMaster("local").setAppName("brocast")
val sc: SparkContext = new SparkContext(conf)
val list: List[String] = List("hello Spark")
//显示创建
val broadCast: Broadcast[List[String]] = sc.broadcast(list)
val lineRDD: RDD[String] = sc.textFile("data/word.txt")
lineRDD.filter(x => {
//广播变量值获取
val list: List[String] = broadCast.value
list.contains(x)
}).foreach {
println
}
sc.stop()
}
}
hello Spark
注意事项:
Driver 端定义,不能在 Executor 端定义。Driver 端可以修改广播变量的值,在 Executor 端无法修改广播变量的值。 累加器理解图如下:

累加器使用如下:
package com.shsxt.scala_Test.core.test
import org.apache.spark.util.LongAccumulator
import org.apache.spark.{SparkConf, SparkContext}
object AccumulatorTest {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf()
conf.setMaster("local").setAppName("accumulator")
val sc: SparkContext = new SparkContext(conf)
val accumulator: LongAccumulator = sc.longAccumulator
//
sc.textFile("data/word.txt", 2).foreach { x => {
accumulator.add(1)
}
}
println("accumulator.value : " + accumulator.value)
sc.stop()
}
}
accumulator.value : 4
注意事项:
Driver 端定义赋初始值,累加器只能在 Driver 端读取, 在 Excutor 端更新。 reduceByKey 会将上一个 RDD 中的每一个 key 对应的所有 value 聚合成一个 value,然后生成一个新的 RDD,元素类型是对的形式,这样每一个 key 对应一个聚合起来的 value。
key 对应的 value 不一定都是在一个 partition中,也不太可能在同一个节点上,因为 RDD 是分布式的弹性的数据集,RDD 的 partition 极有可能分布在各个节点上。如何聚合?
Shuffle Write:上一个 stage 的每个 map task 就必须保证将自己处理的当前分区的数据相同的 key 写入一个分区文件中,可能会写入多个不同的分区文件中。Shuffle Read:reduce task 就会从上一个 stage 的所有 task 所在的机器上寻找属于己的那些分区文件,这样就可以保证每一个 key 所对应的 value 都会汇聚到同一个节点上去处理和聚合。 Spark 中有两种 Shuffle 类型,HashShuffle 和 SortShuffle,Spark1.2之前是 HashShuffle,Spark1.2之后引入 SortShuffle 。
普通机制示意图如下:

执行流程:
map task 将不同结果写到不同的 buffer 中,每个buffer 的大小为 32K。buffer 起到数据缓存的作用。buffer 文件最后对应一个磁盘小文件。reduce task 来拉取对应的磁盘小文件。总结:
.map task 的计算结果会根据分区器(默认是hashPartitioner)来决定写入到哪一个磁盘小文件中去。ReduceTask 会去 Map 端拉取相应的磁盘小文件。M(map task 的个数)*R(reduce task 的个数)存在的问题:产生的磁盘小文件过多,会导致以下问题:
Shuffle Write 过程中会产生很多写磁盘小文件的对象。Shuffle Read 过程中会产生很多读取磁盘小文件的对象。JVM堆内存中对象过多会造成频繁的gc,gc还无法解决运行所需要的内存的话,就会 OOM。shuffle file cannot find 。由于这个错误导致的 task 失 败,TaskScheduler 不负责重试,由 DAGScheduler 负责重试 Stage。 合并机制示意图如下:

总结:产生磁盘小文件的个数:C(core 的个数)*R(reduce 的个数)
普通机制示意图:

执行流程:
map task 的计算结果会写入到一个内存数据结构里面,内存数据结构默认是 5Mshuffle 的时候会有一个定时器,不定期的去估算这个内存结构的大小,当内存结构中的数据超过 5M 时,比如现在内存结构中的数据为 5.01M,那么他会申请 5.01*2-5=5.02M 内存给内存数据结构。map task 执行完成后,会将这些磁盘小文件合并成一个大的磁盘文件,同时生成一个索引文件。reduce task 去 map 端拉取数据的时候,首先解析索引文件,根据索引文件再去拉取对应的数据。总结:
2*M(map task 的个数) bypass 机制示意图:

总结:
bypass 运行机制的触发条件如下:
shuffle reduce task 的数量小于spark.shuffle.sort.bypassMergeThreshold 的参数值。这个值默认是 200。map 端的预聚合,比如:groupBykey,join产生的磁盘小文件为:2*M(map task 的个数)
MapOutputTracker
MapOutputTracker 是 Spark 架构中的一个模块,是一个主从架构。管理磁盘小文件的地址。
MapOutputTrackerMaster 是主对象,存在于 Driver 中。MapOutputTrackerWorker 是从对象,存在于 Excutor 中。BlockManager
BlockManager 块管理者,是 Spark 架构中的一个模块,也是一个主从架构。
BlockManagerMaster,主对象,存在于 Driver 中。BlockManagerMaster 会在集群中有用到广播变量和缓存数据或者删除缓存数据的时候,通知 BlockManagerSlave 传输或者删除数据。BlockManagerWorker,从对象,存在于 Excutor 中。BlockManagerWorker 会与 BlockManagerWorker 之间通信。无论在 Driver 端的 BlockManager 还是在 Excutor 端的BlockManager 都含有四个对象:
DiskStore:负责磁盘的管理。MemoryStore:负责内存的管理。ConnectionManager:负责连接其他的BlockManagerWorker。BlockTransferService:负责数据的传输。Shuffle 文件寻址图

Shuffle 文件寻址流程
map task 执行完成后,会将 task 的执行情况和磁盘小文件的地址封装到 MpStatus 对象中,通过MapOutputTrackerWorker 对象向 Driver 中的MapOutputTrackerMaster 汇报。map task 执行完毕后,Driver 中就掌握了所有的磁盘小文件的地址。reduce task 执行之前,会通过Excutor 中MapOutPutTrackerWorker 向 Driver 端的MapOutputTrackerMaster 获取磁盘小文件的地址。BlockManager 中的ConnectionManager 连接数据所在节点上的ConnectionManager,然后通过 BlockTransferService 进行数据的传输。BlockTransferService 默认启动 5 个 task 去节点拉取数据。默认情况下,5 个 task 拉取数据量不能超过 48M。 SparkShuffle 调优配置项如何使用?
new SparkConf().set(“spark.shuffle.file.buffer”,”64”)
spark-submit --conf spark.shuffle.file.buffer=64 –conf ….
Shuffle 参数调优,可调整参数介绍:
spark.shuffle.file.buffer:
spark.reducer.maxSizelnFlight:
spark.shuffle.io.maxRetries:
spark.shuffle.io.retryWait:
spark.shuffle.memoryFraction:
spark.shuffle.manager:
spark.shuffle.sort.bypassMergeThreshold:
spark.shuffle.consolidateFiles:
Spark 执行应用程序时,Spark 集群会启动 Driver 和 Executor 两种 JVM 进程,Driver 负责创建 SparkContext 上下文,提交任务,task 的分发等。Executor 负责 task 的计算任务,并将结果返回给 Driver。同时需要为需要持久化的 RDD 提供储存。
Driver 端的内存管理比较简单,这里所说的 Spark内存管理针对 Executor 端的内存管理。Spark 内存管理分为静态内存管理和统一内存管理,Spark1.6 之前使用的是静态内存管理,Spark1.6 之后引入了统一内存管理。
静态内存管理中存储内存、执行内存和其他内存的大小在 Spark 应用程序运行期间均为固定的,但用户可以应用程序启动前进行配置。
统一内存管理与静态内存管理的区别在于储存内存和执行内存共享同一块空间,可以互相借用对方的空间。
Spar1.6 及 1.6 版本之后的版本默认使用的是统一内存管理。要想使用静态内存可以通过参数 spark.memory.useLegacyMode,设置为 true(默认为 false)使用静态内存管理。

2. 统一内存管理分布图
内存管理

3. reduce 中 OOM 如何处理?
- 减少每次拉取的数据量
- 提高 shuffle 聚合的内存比例
- 提高 Excutor 的总内存