17.1 Spark介绍
17.1.1 什么是Spark
-
概念理解
- 并行计算框架
- Apache Spark 是专为大规模数据处理而设计的快速通用的计算引擎。 Spark 是加州大学伯克利分校的AMP实验室所开源的类 Hadoop MapReduce 的通用并行计算框架
- 任务的中间结果可以缓存在内存中,减少磁盘数据交互
- Spark 拥有 Hadoop MapReduce 所具有的优点;但不同于 MapReduce 的是 Job 中间输出结果可以缓存在内存中,从而不再需要读写 HDFS ,减少磁盘数据交互,因此 Spark 能更好地适用于数据挖掘与机器学习等需要迭代的算法
- Spark诞生于2009年美国加州伯克利分校的AMP实验室,基于内存计算的大数据并行计算框架,可用于构建大型的、低延迟的数据分析应用程序
-
Spark和Hadoop的比较
17.1.2 总体技术栈讲解
Spark 提供了 Sparkcore RDD 、 Spark SQL 、 Spark Streaming 、 Spark MLlib 、 Spark、GraphX等技术组件,可以一站式地完成大数据领域的离线批处理、交互式查询、流式计算、机器学习、图计算等常见的任务。这就是 spark 一站式开发的特点
17.1.3 Spark 与 MR 的区别
-
MR
- MR只能做离线计算,如果实现复杂计算逻辑,一个MR搞不定,就需要将多个MR按照先后顺序连成一串,一个MR计算完成后会将计算结果写入到HDFS中,下一个MR将上一个MR的输出作为输入,这样就要频繁读写HDFS,网络IO和磁盘IO会成为性能瓶颈。从而导致效率低下。
-
Spark
- spark既可以做离线计算,有可以做实时计算,提供了抽象的数据集(RDD、Dataset、DataFrame、DStream)有高度封装的API,算子丰富,并且使用了更先进的DAG有向无环图调度思想,可以对执行计划优化后在执行,并且可以数据可以cache到内存中进行复用
-
归根结底最重要的区别还是在于 下图为 MapReduce 执行任务流程(MR基于磁盘,Spark基于内存)
- 都是分布式计算框架, Spark 计算中间结果基于内存缓存, MapReduce 基于 HDFS 存储。也正因此,Spark 处理数据的能力一般是 MR 的三到五倍以上, Spark 中除了基于内存计算这一个计算快的原因,
- 还有 DAG(DAGShecdule) 有向无环图来切分任务的执行先后顺序。
-
MR的数据读取流程
-
Spark的数据读取流程
17.1.4 Spark API
多种编程语言的支持: Scala,Java,Python,R,SQL 。
17.1.5 Spark运行模式
Local
- 多用于本地测试,如在 eclipse , idea 中写程序测试等
Standalone
- Standalone 是 Spark 自带的一个资源调度框架,它支持完全分布式
- 由Master负责资源的分配
Yarn
- Hadoop 生态圈里面的一个资源调度框架, Spark 也是可以基于 Yarn 来计算的
- 若要基于 Yarn 来进行资源调度,必须实现 AppalicationMaster 接口, Spark 实现了这个接口,所以可以基于 Yarn 来进行资源调度
- 由Yarn中的ResourceManager负责资源的分配
Mesos
- 资源调度框架
- 由Messos中的Messos Master负责资源管理
17.1.6 Spark总结
17.2 Spark Core
17.2.1 Partition
1. 概念
-
分区的原因
- 单节点处理不了大量的数据
- Spark RDD 是一种分布式的数据集,由于数据量很大,因此要它被切分并存储在各个结点的分区当中
-
RDD的理解
- Spark中,RDD(Resilient Distributed Dataset 弹性分布式数据集),是最基本的抽象数据集,
- 其中每个RDD由若干个Partition组成,不同的分区可能在集群的不同节点上
- 多个Partition是并行操作的
- 一个Partition对应一个Task
2. 分区方式
对于Spark Shuffle阶段的理解
Spark包含两种数据分区方式:HashPartitioner(哈希分区)和RangePartitioner(范围分区)
-
HashPartitioner分区
- Hash分区
- HashPartitioner采用哈希的方式对键值对数据进行分区
- 其数据分区规则为
partitionId = Key.hashCode % numPartitions
- partitionId代表该Key对应的键值对数据应当分配到的Partition标识
- Key.hashCode表示该Key的哈希值
- numPartitions表示包含的Partition个数
- 可能会造成数据倾斜(数据量不均衡)
-
RangePartitioner分区
3. HDFS-Block与Spark-Partition
spark本身并没有提供分布式文件系统,因此spark的分析大多依赖于Hadoop的分布式文件系统HDFS
-
HDFS-Block
- hdfs中的block是分布式存储的最小单元,类似于盛放文件的盒子,一个文件可能要占多个盒子,但一个盒子里的内容只可能来自同一份文件
-
Spark-Partition
- spark中的partition 是弹性分布式数据集RDD的最小单元,RDD是由分布在各个节点上的partition组成的
- partition 是指的spark在计算过程中,生成的数据在计算空间内最小单元,同一份数据(RDD)的partition 大小不一,数量不定,是根据application里的算子和最初读入的数据分块数量决定的
-
BLock和Partition的联系
- Spark从HDFS读入文件的分区数默认等于HDFS文件的块数(blocks),HDFS中的block是分布式存储的最小单元。
- 如果我们上传一个30GB的非压缩的文件到HDFS,HDFS默认的块容量大小128MB,因此该文件在HDFS上会被分为235块(30GB/128MB);Spark读取SparkContext.textFile()读取该文件,默认分区数等于块数即235
-
Block和Partition的区别
| Block | Partition |
---|
位置 | 存储空间 | 计算空间 |
大小 | 固定 | 不固定 |
数据冗余 | 有冗余的、不会轻易丢失 | 没有冗余设计、丢失之后重新计算得到 |
17.2.2 RDD⭐️
RDD(Resilient Distributed Dataset) 弹性分布式数据集,Spark计算流程中的一个算子,类似于Storm中的Bolt,对数据进行计算,得到临时结果(partition)
1. RDD五大属性
- RDD是由一系列的partition组成的
- RDD中的每一个task运行在自己的Partition上
- RDD之间有一系列的依赖关系(依赖其他的RDD)
- 分区器是作用在(K,V)格式的RDD上
- RDD默认会寻找最佳的计算位置
2. RDD流程图
- 理解
- TextFile方法底层封装的是MR读取文件的方式,读取文件之前先进行split切片,默认split大小是一个block大小
- RDD 实际上不存储数据,这里方便理解,暂时理解为存储数据
- 真正存储数据的是partition,RDD不存储数据,RDD就是对这个partition的抽象
- 什么是 K,V格式的RDD ?
- 如果 RDD 里面存储的数据都是二元组对象,那么这个 RDD 我们就叫做 K,V格式的RDD
- 哪里体现 RDD 的弹性(容错)?
- partition 数量,大小没有限制,体现了 RDD 的弹性。
- RDD 之间依赖关系,可以基于上一个 RDD 重新计算出 RDD
- 哪里体现 RDD 的分布式?
- RDD 是由 Partition 组成, partition 是分布在不同节点上的。
- RDD 提供计算最佳位置,体现了数据本地化。体现了大数据中“计算移动数据不移动”的理念。
3. Lineage血统
利用内存加快数据加载,在其它的In-Memory类数据库或Cache类系统中也有实现。Spark的主要区别在于它采用血统(Lineage)来时实现分布式运算环境下的数据容错性(节点失效、数据丢失)问题
RDD 的最重要的特性之一就是血缘关系(Lineage ),它描述了一个 RDD 是如何从父 RDD 计算得来的。如果某个 RDD 丢失了,则可以根据血缘关系,从父 RDD 计算得来
当这个RDD的部分分区数据丢失时,它可以通过Lineage找到丢失的父RDD的分区进行局部计算来恢复丢失的数据,这样可以节省资源提高运行效率
17.2.3 系统架构
系统架构图一:
系统架构图二:
1. Master (Standalone)
- 作用
- 资源管理的主节点(进程)
- Master是Standalone资源调度框架里面资源管理的主节点。也是JVM进程
- 接受用户的请求
2. Cluster Manager
-
作用
- 在集群上获取资源的外部服务
- 例如:standalone ; yarn ; mesos
-
在Standalone模式下
- Cluster Manager是Master(主节点),控制整个集群,监控Worker
-
在Yarn模式下
3. Worker计算节点(Standalone)
- 理解
- Worker是Standalone资源调度框架里面资源管理的从节点。也是JVM进程。
- 资源管理的从节点(进程),或者说是管理本机资源的进程
- 作用
- Standalone模式
- 资源管理的从节点,负责控制计算节点,启动Executor
- Yarn模式
4. Application
- 概念理解
- 基于Spark的用户程序,包含driver程序和运行在集群上的executor程序,即一个完整的spark应用
5. Driver(program)
- 作用
- 概念理解
- 驱动程序,Application中的main函数并创建SparkContext
6. Executor
- 概念理解
- 是在一个worker进程所管理的节点上为某个Application启动的一个个进程,
- 作用
- 这个进程负责运行任务,并且负责将数据存在内存或者磁盘上,每个应用之间都有各自独立的executor
7. Task
8. Job
- 概念理解
- 包含很多任务(Task) 的组成的并行计算,往往由Spark Action触发生成,一个Application中往往会产生多个Job
9. Stage
- 概念理解
- 一个job会被拆分成很多组任务(Task),每组任务(Task)被称为Stage
- 就像MapReduce分为MapTask和ReduceTask一样
Spark代码流程
创建SparkConf对象
可以设置Application name。
可以设置运行模式及资源需求。
创建SparkContext对象
基于Spark的上下文创建一个RDD,对RDD进行处理。
应用程序中要有Action类算子来触发Transformation类算子执行。
关闭Spark上下文对象SparkContext。
17.2.4 Spark代码流程
-
创建SparkConf对象
- 可以设置Application name。
- 可以设置运行模式及资源需求。
val sparkConf: SparkConf = new SparkConf().setMaster("local").setAppName("WordCount")
-
创建SparkContext对象
val sparkContext: SparkContext = new SparkContext(sparkConf)
-
基于Spark的上下文创建一个RDD,对RDD进行处理。
val value: RDD[(String,Int)] = sparkContext.textFile("src/main/resources/user.log")
-
应用程序中要有Action类算子来触发Transformation类算子执行。
println(lines.count())
lines.foreach(ele => println("foreach:" + ele))
lines.take(5).foreach(ele => println("take:" + ele))
println(lines.first())
lines.collect().foreach(ele => println("collect:" + ele))
-
关闭Spark上下文对象SparkContext。
sparkContext.stop()
17.3 算子(单文件)⭐️
什么是算子?
可以理解成spark RDD的方法,这些方法作用于RDD的每一个partition。因为spark的RDD是一个 lazy的计算过程,只有得到特定触发才会进行计算,否则不会产生任何结果
Spark 记录了RDD之间的生成和依赖关系,但是只有当F进行行动操作时,Spark才会根据RDD的依赖关系生成DAG,并从起点开始真正的计算,如下图的A、B、C、D分别是一个个RDD
17.3.1 转换算子
- 概念理解
- Transformations 类算子叫做转换算子(本质就是函数),Transformations算子是延迟执行,也叫 懒加载 执行
- 常见的Transformations 类算子
- filter: 过滤符合条件的记录数,true保留,false过滤掉
- map:将一个RDD中的每个数据项,通过map中的函数映射变为一个新的元素
- faltMap:先map后flat,与map类似,每个输入项可以映射0到多个输出项
- sample:随机抽样算子,根据传进去的小数按比例进行有放回或者无放回的抽样
- reduceByKey:将相同的key根据相应的逻辑进行处理
- sortByKey/sortBy:作用在k,v格式的RDD上,对key进行升序或者降序排序
17.3.2 行动算子
- 概念理解
- Action类算子叫做行动算子,Action类算子是触发执行
- 一个Application应用程序中有几个Action类算子执行,就有几个job运行
- 常见Action类算子
- count:返回数据集中的元素数,会在结果计算完成后回收到Driver端
- take(n) :返回一个包含数据集前n个元素的集合
- first:效果等同于 take(1) ,返回数据集中的第一个元素
- foreach :循环遍历数据集中的每个元素,运行相应的逻辑
- collect :将计算结果回收到 Driver 端
17.3.3 控制算子
- 概念理解
- 将RDD持久化,持久化的单位是Partition
- 控制算子有3种,cache、persist、checkpoint,其中cache和persist都是 懒执行 的,必须有一个Action类算子来触发执行
- checkpoint算子不仅能将RDD持久化到磁盘,还能切断RDD之间的依赖关系
1. cache
-
默认将RDD的数据持久化到内存中,cache是 懒执行
- cache() = persist() = persist(StorageLevel.Memory_Only)
-
rdd.cache().count() 返回的不是持久化的RDD,而是一个数值
object Hello04Cache {
def main(args: Array[String]): Unit = {
val sparkContext = new SparkContext((new SparkConf().setMaster("local").setAppName("Hello04Cache" + System.currentTimeMillis())))
var lines: RDD[String] = sparkContext.textFile("src/main/resources/NASA_access_log_Aug95")
lines = lines.cache()
val startTime = System.currentTimeMillis
val count = lines.count
val endTime = System.currentTimeMillis
System.out.println("第一次共" + count + "条数据," + "计算时间=" + (endTime - startTime))
val cacheStartTime = System.currentTimeMillis
val cacheResult = lines.count
val cacheEndTime = System.currentTimeMillis
System.out.println("第二次共" + cacheResult + "条数据," + "计算时间=" + (cacheEndTime - cacheStartTime))
sparkContext.stop()
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
2. persist
-
特点:
- 可以指定持久化的级别,最常用的是 MEMORY_ONLY 和 MEMORY_AND_DISK。
-
持久化级别
- MEMORY_ONLY
- 使用未序列化的Java对象格式,将数据保存在内存中。如果内存不够存放所有的数据,则数据可能就不会进行持久化。那么下次对这个RDD执行算子操作时,那些没有被持久化的数据,需要从源头处重新计算一遍。这是默认的持久化策略,使用cache()方法时,实际就是使用的这种持久化策略。
- MEMORY_AND_DISK
- 使用未序列化的Java对象格式,优先尝试将数据保存在内存中。如果内存不够存放所有的数据,会将数据写入磁盘文件中,下次对这个RDD执行算子时,持久化在磁盘文件中的数据会被读取出来使用。
- MEMORY_ONLY_SER
- 基本含义同MEMORY_ONLY。唯一的区别是,会将RDD中的数据进行序列化,RDD的每个partition会被序列化成一个字节数组。这种方式更加节省内存,从而可以避免持久化的数据占用过多内存导致频繁GC。
- MEMORY_AND_DISK_SER
- 基本含义同MEMORY_AND_DISK。唯一的区别是,会将RDD中的数据进行序列化,RDD的每个partition会被序列化成一个字节数组。这种方式更加节省内存,从而可以避免持久化的数据占用过多内存导致频繁GC。
- DISK_ONLY
- 使用未序列化的Java对象格式,将数据全部写入磁盘文件中。
- MEMORY_ONLY_2, MEMORY_AND_DISK_2, 等等。
-
代码实现
object Hello05Persist {
def main(args: Array[String]): Unit = {
val sparkContext = new SparkContext((new SparkConf().setMaster("local").setAppName("Hello05Persist" + System.currentTimeMillis())))
var lines: RDD[String] = sparkContext.textFile("src/main/resources/NASA_access_log_Aug95")
lines = lines.persist(StorageLevel.DISK_ONLY)
val startTime = System.currentTimeMillis
val count = lines.count
val endTime = System.currentTimeMillis
System.out.println("第一次共" + count + "条数据," + "计算时间=" + (endTime - startTime))
val cacheStartTime = System.currentTimeMillis
val cacheResult = lines.count
val cacheEndTime = System.currentTimeMillis
System.out.println("第二次共" + cacheResult + "条数据," + "计算时间=" + (cacheEndTime - cacheStartTime))
sparkContext.stop()
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
3. checkpoint
-
特点
- checkpoint将RDD持久化到磁盘,还可以切断 RDD 之间的依赖关系,也是 懒执行
- 当RDD使用cache机制从内存中读取数据,如果数据没有读到,会使用checkpoint机制读取数据。此时如果没有checkpoint机制,那么就需要找到父RDD重新计算数据了,因此checkpoint是个很重要的容错机制
- checkpoint就是对于一个RDD chain(链)如果后面需要反复使用某些中间结果RDD,可能因为一些故障导致该中间数据丢失,那么就可以针对该RDD启动checkpoint机制
-
执行原理
- 当RDD的 job 执行完毕之后,会从finalRDD从后往前回溯
- 当回溯到某一个 RDD 调用了 checkpoint 方法,会对当前的RDD做一个标记
- Spark 框架会自动启动一个新的 job ,从头开始重新计算这个 RDD 的数据,并将计算出的数据持久化到Checkpoint目录中
- 以便下次可以快速访问到被标记了checkpoint的RDD,切断了RDD之间的依赖性
-
使用Checkpoint时常用的优化手段
- 对RDD执行Checkpoint之前,最好对这个RDD先执行cache
- 这样新启动的 job 只需要将内存中的数据拷贝到Checkpoint目录中就可以,省去了重新计算这一步
-
代码实现
def main(args: Array[String]): Unit = {
val sparkConf = new
SparkConf().setMaster("local").setAppName("SparkCheckPoint" + System.currentTimeMillis())
val sparkContext = new SparkContext(sparkConf)
sparkContext.setCheckpointDir("./checkpoint")
val lines: RDD[String] =
sparkContext.textFile("src/main/resources/NASA_access_log_Aug95")
val words: RDD[String] = lines.flatMap(_.split(" "))
println("words" + words.getNumPartitions)
words.checkpoint
words.count
sparkContext.stop
}
17.4 Spark集群搭建
17.4.1 安装环境检测
-
搭建之前确认对应的 java 版本为8版本
-
搭建之前确认对应的 scala 版本为2.12.x版本。
-
[root@node01 ~]# rpm -ivh scala-2.12.11.rpm
-
[root@node01 ~]# whereis scala
-
[root@node01 ~]# vim /etc/profile
export SCALA_HOME=/usr/share/scala
export PATH=$SCALA_HOME/bin:$PATH
-
[root@node01 ~]# source /etc/profile
-
三台计算机Node01 Node02 Node03都需要安装Scala
17.4.2 standalone(Single)
启动spark的集群
[root@node01 ~]# cd /opt/yjx/spark-2.4.6/sbin/
[root@node01 sbin]# ./start-all.sh
访问
http://192.168.88.101:8080/
运行案例
spark-submit --master spark://node01:7077 --class org.apache.spark.examples.SparkPi $SPARK_HOME/examples/jars/spark-examples_2.12-2.4.6.jar 10
17.4.3 standalone(HA)
- 启动集群
- Zookeeper :
- 主节点:
- [root@node01 ~]# cd /opt/yjx/spark-2.4.6/sbin/
- [root@node01 sbin]# ./start-all.sh
- 备用节点
- [root@node02 ~]# cd /opt/yjx/spark-2.4.6/sbin/
- [root@node02 sbin]# ./start-master.sh
17.4.4 standalone(UI)
17.4.5 SparkShell
17.4.6 yarn模式
17.5 任务提交方式⭐️
Standalon 和 Yarn 的任务提交方式图解
17.5.1 Standalone-client
-
提交命令
spark-submit --master spark://node01:7077 --deploy-mode client --class org.apache.spark.examples.SparkPi $SPARK_HOME/examples/jars/spark-examples_2.12-2.4.6.jar 10
-
执行流程
- client模式提交任务后,会在客户端启动Driver进程,来进行任务调度
- Driver会向Master申请启动Application启动的资源,资源申请成功后
- Driver端将task分发到worker端执行,启动executor进程(任务的分发)
- worker端(executor进程)将task执行结果返回到Driver端(任务结果的回收)
-
总结
- client模式适用于测试调试程序,Driver进程是在客户端启动的,这里的客户端就是指提交应用程序的当前节点。在Driver端可以看到task执行的情况
- 生产环境下不能使用client 模式,是因为:假设要提交100个 application 到集群运行,Driver 每次都会在 client 端(单个节点)启动,那么就会导致客户端100次网卡流量暴增的问题
17.5.2 Standalone-cluster
-
提交命令
spark-submit --master spark://node01:7077 --deploy-mode cluster --classorg.apache.spark.examples.SparkPi $SPARK_HOME/examples/jars/spark-examples_2.12-2.4.6.jar 10
-
执行流程
- cluster模式提交应用程序后,会向Master请求启动Driver
- Master接受请求后,随机在集群中的一台节点来启动Driver进程
- Driver启动后为当前应用程序申请资源
- Driver端发送task到worker节点上执行(任务的分发)
- Worker上的executor进程将执行情况和执行结果返回给Driver端(任务结果的回收)
-
总结
- Standalone-cluster 提交方式,应用程序使用的所有 jar 包和文件,必须保证所有的worker 节点都要有,因为此种方式, spark 不会自动上传包
- 两种保证所有的Worker节点都有应用程序所需的jar包和文件
- 将所有的依赖包和文件打到同一个包中,然后放在 hdfs 上。
- 将所有的依赖包和文件各放一份在 worker 节点上
17.5.3 yarn-client
-
提交命令
spark-submit --master yarn --class org.apache.spark.examples.SparkPi $SPARK_HOME/examples/jars/spark-examples_2.12-2.4.6.jar 10
spark-submit --master yarn–client --class org.apache.spark.examples.SparkPi $SPARK_HOME/examples/jars/spark-examples_2.12-2.4.6.jar 10
spark-submit --master yarn --deploy-mode client --class org.apache.spark.examples.SparkPi $SPARK_HOME/examples/jars/spark-examples_2.12-2.4.6.jar 10
-
执行流程
版本一:
- 客户端提交一个Application,在客户端启动一个Driver进程
- 应用程序启动后会向RS(ResourceManager)(相当于Standalone模式下的master进程)发送请求
- RS收到请求后,随机选择一台NM(NodeManager)启动AM
- 这里的NM相当于Standalone中的Worker进程
- AM启动后,会向RS请求一批container资源,用于启动Executor
- RS会找到一批NM(包含container)返回给AM,用于启动Executor
- AM会向NM发送命令启动Executor
- Executor启动后,会 反向注册 给Driver,Driver发送task到Executor,执行情况和结果返回个Driver端
版本二:
- 在YARN Cluster模式下,任务提交后会和ResourceManager通讯申请启动 ApplicationMaster, 随后 ResourceManager 分配 container,在合适的 NodeManager上启动 ApplicationMaster,此时的 ApplicationMaster 就是 Driver
- Driver 启动后向 ResourceManager 申请 Executor内存,ResourceManager接到ApplicationMaster 的资源申请后会分配 container,然后在合适的 NodeManager 上启动 Executor 进程,Executor 进程启动后会向Driver反向注册,Executor全部注册完成后Driver开始执行main函数,之后执行到 Action 算子时,触发一个 job,并根据宽依赖开始划分 stage,每个stage生成对应的taskSet,之后将 task 分发到各个Executor上执行
版本三:
- 在client端启动Driver进程,初始化作业,解析程序,初始化两个类:DAGScheduler,TaskScheduler.
– 初始化作业: 判断路径是否存在,权限校验等
– DAGScheduler将程序的执行流程解析成DAG图,并划分阶段,根据阶段内的分区初始化Task
– TaskScheduler接收Task,等待分配Task给executor - Driver会向ResourceManager,申请资源,想要启动该应用程序的AppMaster
- ResourceManager会分配一个节点,来运行AppMaster,由NodeManager负责真正分配资源运行AppMaster
- AppMaster会向ResourceManager申请整个程序所需要的其他资源,准备运行executor进程
- 在各个节点上运行的executor会向Driver进行反向注册,要求分配任务
- TaskScheduler将Task分配到不同的executor,并监控实时状态,executor开始执行任务,
- TaskScheduler收到executor执行完的信息后,表示整个应用程序完成,会向ResouceManager申请注销
-
总结
- Yarn-client 模式同样是适用于测试,因为 Driver 运行在本地, Driver 会与 yarn 集群中的 Executor 进行大量的通信
- ApplicationMaster (executorLauncher)的在此模式中的作用:
- 为当前的 Application 申请资源
- 给 NodeManager 发送消息启动 Executor
- 注意: ApplicationMaster 在此种模式下没有作业调度的功能
17.5.4 yarn-cluster
-
提交命令
spark-submit --master yarn --deploy-mode cluster --class org.apache.spark.examples.SparkPi $SPARK_HOME/examples/jars/spark-examples_2.12-2.4.6.jar 10
spark-submit --master yarn-cluster --class org.apache.spark.examples.SparkPi $SPARK_HOME/examples/jars/spark-examples_2.12-2.4.6.jar 10
-
执行流程
版本一:
- 客户机提交Application应用程序,发送请求到RS(ApplicationMaster),请求启动AM(ApplicationMaster)
- RS收到请求后随机在一台NM(NodeManager)上启动AM(相当于Driver端)
- ApplicationMaster启动后,ApplicationMaster发送请求到RS,请求一批container用于启动Executor
- ApplicationMaster返回一批NM节点给ApplicationMaster
- ApplicationMaster连接到NM,发送请求到NM启动Executor
- Executor反向注册到AM所在的节点的Driver,Driver发送task到Executor
版本二:
- client会首先向ResourceManager申请资源,要求启动AppMaster进程
- ResouceManager会分配一个节点,由NodeManager来运行AppMaster,并在AppMaster所在节点运行Driver进程
Driver进程的作用:初始化作业,解析程序,初始化两个DAGScheduler,TaskScheduler.
– 初始化作业: 判断路径是否存在,权限校验等
– DAGScheduler将程序的执行流程解析成DAG图,并划分阶段,根据阶段内的分区初始化Task
– TaskScheduler接收Task,等待分配Task给executor - AppMaster会向ResourceManager申请整个程序所需要的其他资源,准备运行executor进程
- 在各个节点上运行的executor会向Driver进行反向注册,要求分配任务
- TaskScheduler将Task分配到不同的executor,并监控实时状态,executor开始执行任务,
- TaskScheduler收到executor执行完的信息后,表示整个应用程序完成,会向ResouceManager申请注销
-
总结
- Yarn-Cluster主要用于生产环境中,因为 Driver 运行在 Yarn 集群中某一台 NodeManager中,每次提交任务的 Driver 所在的机器都不再是提交任务的客户端节点,而是多个 NM 节点中的一台,不会产生某一台机器网卡流量激增的现象,但同样也有缺点,任务提交后不能看到日志。只能通过 yarn 查看日志
- ApplicationMaster 在此模式中的的作用:
- 为当前的 Application 申请资源
- 给 NodeManger 发送消息启动 Executor
- 任务调度
17.5.5 Standalone和Yarn的对比
-
相同点
- standalone是spark自身携带的资源管理框架
- yarn是hadoop中的资源管理框架。
- 都是对核心和内存进行管理和分配。
-
作业方式不同
-
多用户支持不同
-
Yarn和Spark的Standalone调度模式对比
Yarn | Standalone | 节点功能 |
---|
ResouceManager | Master | 管理子节点、资源调度、接收任务请求 |
NodeManger | Worker | 管理当前节点,并管理子进程 |
YarnChild | Executor | 运行真正的计算逻辑的(Task) |
Client | Client | 提交app,管理该任务的Executor |
ApplicaitonMaster | ApplicaitonMaster | 管理任务,包含driver程序和运行在集群上的executor程序 |
- Spark-Standalone
- client:提交任务的节点就是Drive
- Cluster:由集群选择一个节点作为Driver
- Spark-Yarn
- Client:提交的节点就是Driver,本身还是AM,但任务调度归属于Driver
- Cluster:Driver和AM二合一,AM是负责任务调度的