分布式计算的精髓,在于如何把抽象的计算图,转化为实实在在的分布式计算任务,然后以并行计算的方式交付执行。
Spark调度系统流程:
根据用户代码构建 DAG;
以 Shuffle 为边界切割 Stages;
基于 Stages 创建 TaskSets,并将 TaskSets 提交给 TaskScheduler 请求调度。
从 DAG 到 Stages 的拆分过程:以 Actions 算子为起点,从后 向前回溯 DAG,以 Shuffle 操作为边界去划分 Stages。
DAGScheduler 根据 Stage 内 RDD 的 partitions 属性创建分布式任务集合 TaskSet。TaskSet 包含一个又一个分布式任务 Task,RDD 有多少数据 分区,TaskSet 就包含多少个 Task。换句话说,Task 与 RDD 的分区,是一一对应的。
stageId、stageAttemptId 标记了 Task 与执行阶段 Stage 的所属关系;
taskBinary 则封装了隶属于这个执行阶段的用户代码;
partition RDD 数据分区;
locs 属性以字符串的形式记录了该任务倾向的计算节点或是 Executor ID。
获取集群内可用资源
在分发任务之前,调度系统得先判断哪些节点的计算资源空闲,然后再把任务分发过去。
对于集群中可用的计算资源,SchedulerBackend 用 ExecutorDataMap 来记录每一个计算节点中 Executors 的资源状态。ExecutorDataMap 是一种 HashMap,它的 Key 是标记 Executor 的字符串,Value 是ExecutorData Object,ExecutorData 用于封装 Executor 的资源状态,如 RPC 地址、主机地址、可用 CPU 核数和满配 CPU 核数等。
对内,SchedulerBackend 使用ExecutorDataMap 对Executor 做“资源画像”;
对外,SchedulerBackend 以 WorkerOffer 为粒度提供计算资源。WorkerOffer 封装了 Executor ID、主机地址和 CPU 核数,它用来表示一份可用于调度任务的空闲资源。
SchedulerBackend 与集群内所有 Executors 中的 ExecutorBackend 保持周期性通信, 双方通过LaunchedExecutor、RemoveExecutor、StatusUpdate 等来更新可用计算资源。
按照调度规则决定任务优先级,完成任务调度;
TaskScheduler 的调度策略分为两个层次:
不同 Stages 之间的调度优先级,TaskScheduler 提供了两种模式 : FIFO(先到先得)和 FAIR(公平调度)
FIFO调度:先到先得
FAIR调度:哪个 Stages 优先被调度,取决于用户在配置文件 fairscheduler.xml 中的定义。
Stages 内不同任务之间的调度优先级
WorkerOffer 封装了 Executor ID、主机地址和 CPU 核数,用来表示一份可用于调度任务的空闲资源。
当 TaskScheduler 接收到来自 SchedulerBackend 的 WorkerOffer 后,TaskScheduler会优先挑选那些满足本地性级别要求的任务进行分发。本地性级别有 4 种:Process local < Node local < Rack local < Any。
TaskScheduler 接收到 WorkerOffer 之后,优先调度本地性倾向为 PROCESS_LOCAL 的 Task,而 NODE_LOCAL 次之,RACK_LOCAL 为再次,最后是 ANY。
**Spark 调度系统的核心思想,是“数据不动、代码动”。**在任务调度的过程 中,为了完成分布式计算,Spark 倾向于让数据待在原地、保持不动,而把计算任务(代码)调度、分发到数据所在的地方,从而消除数据分发引入的性能隐患。相比分发 数据,分发代码要轻量得多。
DAGScheduler 划分 Stages、创建分布式任务的过程中,会为每一个任务指定本地性级别,本地性级别中会记录该任务有意向的计算节点地址,甚至是 Executor 进程 ID。换句话说,任务自带调度意愿,它通过本地性级别告诉 TaskScheduler 自己更乐意被调度到哪里去。
对于给定的 WorkerOffer,TaskScheduler 是按照任务的本地倾向 性,来遴选出 TaskSet 中适合调度的 Tasks。
Task 与 RDD 的 partitions 是一一对应的,在创建 Task 的过程中, DAGScheduler 会根据数据分区的物理地址,来为 Task 设置 locs 属性。locs 属性记录了数据分区所在的计算节点、甚至是 Executor 进程 ID。
举例来说,当调用 textFile API 从 HDFS 文件系统中读取源文件时,Spark 会根据 HDFS NameNode 当中记录的元数据,获取数据分区的存储地址,例如 node0:/rootPath/partition0-replica0,node1:/rootPath/partition0-replica1 和 node2:/rootPath/partition0-replica2。 那么,DAGScheduler 在为该数据分区创建 Task0 的时候,会把这些地址中的计算节点记录到 Task0 的 locs 属性。 当TaskScheduler 需要调度 Task0 这个分布式任务,根据 Task0 的 locs 属性,它就知道:“Task0 所需处理的数据分区,在节点 node0、node1、node2上存有副本,如果 WorkOffer 是来自这 3 个节点的计算资源,那对 Task0 来说就是投其所好”。
每个任务都是自带本地倾向性的,换句话说,每个任务都有自己的“调度意愿”。
总的来说,TaskScheduler 根据本地性级别遴选出待计算任务之后,先对这些任务进行序列化。然后,交给 SchedulerBackend,SchedulerBackend 根据 ExecutorData 中记录的 RPC 地址和主机地址,再将序列化的任务通过网络分发到目的主机的 Executor 中去。 最后,Executor 接收到任务之后,把任务交由内置的线程池,线程池中的多线程则并发地在不同数据分片之上执行任务中封装的数据处理函数,从而实现分布式计算。
ExecutorBackend 获取到tasks,分发给Executors 线程池,每个CPU线程负责处理一个 Task。
每当 Task 处理完毕,这些线程便会通过 ExecutorBackend,向 Driver 端的 SchedulerBackend 发送StatusUpdate 事件,告知 Task 执行状态。接下来, TaskScheduler 与 SchedulerBackend 通过接力的方式,最终把状态汇报给 DAGScheduler。
计算向数据移动:Spark 调度系统的原则是尽可能地让数据呆在原地、保持不动,同时尽可能地把承载计算任务的代码分发到离数据最近的地方,从而最大限度地降低分布式系统中的网络开销。
任务调度分为如下 5 个步骤:
DAGScheduler 以 Shuffle 为边界,将开发者设计的计算图 DAG 拆分为多个执行 Stages,然后为每个 Stage 创建任务集 TaskSet。
SchedulerBackend 通过与 Executors 中的 ExecutorBackend 的交互来实时地获取集群中可用的计算资源,并将这些信息记录到 ExecutorDataMap 数据结构。
与此同时,SchedulerBackend 根据 ExecutorDataMap 中可用资源创建 WorkerOffer,以 WorkerOffer 为粒度提供计算资源。
对于给定 WorkerOffer,TaskScheduler 结合 TaskSet 中任务的本地性倾向,按照 PROCESS_LOCAL、NODE_LOCAL、RACK_LOCAL 和 ANY 的顺序,依次对 TaskSet 中的任务进行遍历,优先调度本地性倾向要求苛刻的 Task。
被选中的 Task 由 TaskScheduler 传递给 SchedulerBackend,再由 SchedulerBackend 分发到 Executors 中的 ExecutorBackend。Executors 接收到 Task 之后,即调用本地线程池来执行分布式任务。