Spark,是一种"One Stack to rule them all"的大数据计算框架,期望使用一个技术堆栈就完美地解决大数据领域的各种计算任务。Apache官方,对Spark的定义就是:通用的大数据快速处理引擎。
spark是在hadoop的基础上改造的,spark基于MR算法实现分布式计算,拥有 hadoop MR所具有的有点。但不同与MR的是,spark的job中间输出结果可以保存在内存中,从而不再需要在中间读取HDFS。
原文链接:https://blog.csdn.net/a3125504x/article/details/107253031
大数据计算就是在大规模的数据集上进行一系列的数据计算处理。
Spark 是在 MapReduce 的基础上产生的,借鉴了 大量 MapReduce 实践经验,并引入了多种新型设计思想和优化策略。
Hadoop MapReduce 计算框架存在的局限性:
Spark的改进
不像 MapReduce 那样仅仅局限于 Mapper、Partitioner、Reducer 等低级 API,Spark既提供了丰富的 API:低层次 API——RDD、累加器、广播变量
;高层次的结构化 API——DataFrame、DataSet
搬运自Shockang的为什么Spark这么牛逼?
(1)Spark Core
Spark Core包含Spark的基本功能,如内存计算、任务调度、部署模式、故障恢复、存储管理等,主要面向批量数据处理。Spark建立在统一的抽象弹性分布式数据集(Resilient Distributed Dataset,RDD)之上,使其可以以基本一致的方式应对不同的大数据处理场景。
(2)Spark SQL
Spark SQL允许开发人员直接处理RDD,同时也可查询Hive、HBase等外部数据源。Spark SQL的一个重要特点是其能够统一处理关系表和RDD,使得开发人员不需要自己编写Spark应用程序。
(3)Spark Streaming
Spark Streaming支持高吞吐量、可容错处理的实时流数据处理,其核心思路是将流数据分解成一系列短小的批处理作业,每个短小的批处理作业都可以使用Spark Core进行快速处理。
(4)Structured Streaming
Structured Streaming是一种基于Spark SQL引擎构建的、可扩展且容错的流处理引擎。通过一致的API,Structured Streaming使得使用者可以像编写批处理程序一样编写流处理程序,简化了使用者的使用难度。
(5)MLlib(机器学习)
MLlib提供了常用机器学习算法的实现,包括聚类、分类、回归、协同过滤等,降低了机器学习的门槛,开发人员只要具备一定的理论知识就能进行机器学习工作。
(6)GraphX(图计算)
GraphX是Spark中用于图计算的API,可认为是Pregel在Spark上的重写及优化。GraphX性能良好,拥有丰富的功能和运算符,能在海量数据上自如地运行复杂的图算法。
原文链接:https://blog.csdn.net/wyz191/article/details/124240559
Spark架构采用了分布式计算中的Master-Slave模型。Master是对应集群中的含有Master进程的节点,Slave是集群中含有Worker进程的节点。
Driver:运行应用的的main函数,提交任务,下发计算任务。
ClusterManager:资源管理,在独立的模式(standalone)模式下是master ,在yarn是ResouceManager
Work:计算工作节点,上报自己的资源情况,启动和管理Executer
Executor:执行器,是某个应用运行在work节点上的一个进程,负责执行task任务(工作线程)
Task:被发送到某个Executor上的工作单元,跟MR中的MapTask和ReduceTask概念一样,是运行Application的基本单位。
运行大概流程:
1.Driver端提交应用,并向master申请资源
2.master根据各个work节点上汇报的资源,在相应的节点上启动executor,并将资源参数传递给Driver端
3.启动executor进程会主动和Driver端进行通信,Driver端根据代码的执行情况,产生多个task,并发送给Executor
4.Executor启动task做真正的计算,每个task得到资源后,对相应的数据分片做计算逻辑
原文链接:https://blog.csdn.net/weixin_43006131/article/details/104179299
大体执行流程
Action算子 触发 Job ,DAGScheduler对象 根据 宽依赖 划分 Stage ,根据 Stage 里所处理的数据,决定有多少个 Task ,然后创建对应的 TaskSet ,然后交给 TaskScheduler 进行调度,交给 SchedulerBackend(用于Driver和其他组件交互) ,发送个对应的 Executor 执行。 Executor 执行多少个 Task 由 TaskSchedule 决定。
各个相关组件详解
Driver:
Spark驱动器节点,用于执行Spark任务中的main方法,负责实际代码的执行工作。Driver在Spark作业中执行时主要负责:
1.将用户程序转化为任务(job)
2.在Executor之间调度任务(task)
3.跟踪Executor的执行情况
4.通过UI展示查询运行情况
Executor:
Executor节点是一个JVM进程,负责在Spark作业中运行具体任务,任务彼此相互独立。Spark启动时,Executor节点同时被启动,并且始终伴随整个Spark应用的生命周期,如果发生故障,Spark应用也可以继续执行,会将出错节点上的任务调度到其他Executor节点上继续执行。
Executor核心功能:
1.负责运行组成Spark应用的任务,并将结果返回给驱动器进程;
2.通过自身块管理器为用户程序中要求缓存的RDD提供内存式存储。RDD是直接缓存在Executor进程内的,因此任务可以在运行时充分利用缓存数据的加速运算。
Master: 是一个进程,主要负责资源的调度和分配,并进行集群的监控等职责
Worker: 是一个进程,一个 Worker 运行在集群中的一台服务器上,主要负责两个职责,一个是用自己的内存存储 RDD 的某个或某些 partition;另一个是启动其他进程和线程(Executor) ,对 RDD 上的 partition 进行并行的处理和计算。
YARN Cluster模式
1.任务提交后会和 ResourceManager 通讯申请启动 ApplicationMaster ,随后 ResourceManager 分配 container ,在合适的 NodeManager 上启动 ApplicationManager ,此时 ApplicationManager 就是Driver ;
2.Driver 启动后向 ResourceManager 申请 Executor 内存,ResourceManager 接到ApplicationManager 的资源申请后会分配 container ,然后在合适的NodeManager上启动 Executor 进程,Executor 进程启动后会向 Driver 反向注册;
3.Executor 全部注册完成后 Driver 开始执行 main 函数,之后执行 Action 算子时,触发一个job,并根据宽窄依赖划分 stage ,每个stage生成对应的taskSet,之后分发到各个 Executor 上执行。
补充
Spark的任务调度主要集中在两个方面:资源申请和任务分发。
Job是以 Action 方法为界,遇到一个 Action 方法则触发一个Job;
Stage是 Job 的子集,以 RDD 款依赖(即Shuffle)为界,遇到一个 Shuffle 做一次划分;
Task 是 Stage 的子集,以并行度(分区数)来衡量,分区数是多少,则有多少个task,一个 task 对应一个RDD分区 ,如果数据来自HDFS,一个分区的数据就是一个 split 的数据。
搬运自:fanbuer的Spark执行流程(详细)
相关术语
作业(job):RDD中由行动操作所生成的一个或多个调度阶段
调度阶段(stage):每个作业会因为RDD之间的依赖关系拆分成多组任务集合,称为调度阶段,也叫做任务集(TaskSet)。调度阶段的划分是由DAGScheduler来划分的,调度阶段有Shuffle Map Stage和Result Stage两种。
任务(Task):分发到Executor上的工作任务,是spark实际执行应用的最小单元。
DAGScheduler:DAGScheduler是面向调度阶段的任务调度器,负责接收spark应用提交的作业,根据RDD的依赖关系划分调度阶段,并提交调度阶段给TaskScheduler.
TaskScheduler:TaskScheduler是面向任务的调度器,它接受DAGScheduler提交过来的调度阶段,然后把任务分发到work节点运行,由Worker节点的Executor来运行该任务
流程概述
1.spark应用程序进行各种转换操作,通过行动操作触发作业运行。提交之后根据RDD之间的依赖关系构建DAG图,DAG图提交给DAGScheduler进行解析。
2.DAGScheduler是面向调度阶段的高层次的调度器,DAGScheduler把DAG拆分成相互依赖的调度阶段,拆分阶段是以RDD的依赖是否为宽依赖,当遇到宽依赖就划分为新的调度阶段。每个调度阶段包含一个或多个任务,这些任务形成任务集,提交给底层调度器TaskScheduler进行调度运行。DAGScheduler记录哪些RDD被存入磁盘等物化操作,同时要寻求任务的最优化调度,例如:数据本地性等;DAGScheduler监控运行调度阶段过程,如果某个调度阶段运行失败,则需要重新提交该调度阶段。
3.每个TaskScheduler只为一个SparkContext实例服务,TaskScheduler接收来自DAGScheduler发送过来的任务集,TaskScheduler收到任务集后负责把任务集以任务的形式一个个分发到集群Worker节点的Executor中去运行。如果某个任务运行失败,TaskScheduler要负责重试。另外,如果TaskScheduler发现某个任务一直未运行完,就可能启动同样的任务运行同一个任务,哪个任务先运行完就用哪个任务的结果。
4.Worker中的Executor收到TaskScheduler发送过来的任务后,以多线程的方式运行,每一个线程负责一个任务。任务运行结束后要返回给TaskScheduler,不同类型的任务,返回的方式也不同。ShuffleMapTask返回的是一个MapStatus对象,而不是结果本身;ResultTask根据结果的不同,返回的方式又可以分为两类。
搬运自:“嘣嘣嚓”的Spark-作业执行流程概述
一个RDD就是一个分布式集合,其本质为一个只读(不能修改)的分区记录集合,一个RDD可以存储几十G或多少个T的数据,一个单机存不下,我们可以将其分布式的保存在很多台机器上,将RDD分成若干个分区,每个分区放在不同的机器上去,每个分区就是一个数据片段,把它分布在不同结点上面,数据分布在不同结点上,从而就可以使计算分布式并行。这就是其为什么可以加快速度,因为数据被分布式存储了,所以计算就可以在多台计算上并行的发生。
RDD提供了一种高度受限的共享内存模型,这里说的高度受限是因为RDD为只读,RDD一旦生成就不能对其发生变化。
RDD在转换当中是可以发生修改的,通过生成一个新的RDD来完成数据修改的目的。
RDD的惰性机制,RDD的一系列转换操作并不会真正进行转换,其只记录的转换的意图,并不会真正发生计算,只有当遇到动作类型操作,它才会从头到尾执行计算。管道化是只每次转换过程中可以形成一个管道流,我们可直接将一个操作的输出呈给下一个操作,而不需要将中间结果进行存储处理。同时因为操作非常多,每一个操作就非常简单,将这些简单的操作串联起来就可以解决很多复杂的功能。
原文链接:https://blog.csdn.net/abcdrachel/article/details/105628731
转化函数和行动函数:
转化(transformation)函数:返回一个新的 RDD,比如 map() 和 filter()
执行(action)函数:把结果输出,不返回RDD。比如 saveAsTextFile、count() 和 first()。
跟MapReduce一样,Spark也是对大数据进行分片计算,Spark分布式计算的数据分片、
任务调度都是以RDD为单位展开的。
RDD上的转换操作又分为两种,一种转换操作产生的RDD不会出现新的分片,比如map、filter等,
也就是说一个RDD数据分片,经过map转换操作后,结果还在当前分片中。就像你用map函数对每
个数据加1,得到的还是这样一组数据,只是值不同。实际上,Spark并不是按照代码写的操作
顺序去生成RDD,比如map并不会在物理上生成一个新的数据集。物理上,Spark只有在产生新的
RDD分片时候,才会真的生成一个数据集,Spark的这种特性也被称为惰性计算。
另一种转换操作产生的RDD则会产生新的分片,比如reduceByKey,来自不同分片的相同Key
必须聚合在一起进行操作,这样就会产生新的RDD分片。
总之,Spark应用程序代码中的RDD和Spark执行过程中生成的物理RDD不是一一对应的,
RDD在Spark里面是一个非常灵活的概念,同时又非常重要,需要认真理解。
一组分片(Partition):即数据集的基本组成单位。对于RDD来说,每个分片都会被一个
计算任务处理,并决定并行计算的粒度。用户可以在创建RDD时指定RDD的分片个数,如果没有指定,
那么就会采用默认值。默认值是根据spark集群中分配的core的数量来决定。
一个计算每个分区的函数:Spark中RDD的计算是以分片为单位的,每个RDD都会实现compute
函数以达到这个目的。compute函数会对迭代器进行复合,不需要保存每次计算的结果。
RDD之间的依赖关系:RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于
流水线一样的前后依赖关系。在部分分区数据丢失时,Spark可以通过这个依赖关系重新计算
丢失的分区数据,而不是对RDD的所有分区进行重新计算。
一个Partitioner:即RDD的分片函数。Partitioner函数决定了数据是如何进行分区的。
一个列表,存储每个Partition的优先位置(preferred location)。对于一个
HDFS文件来说,这个列表保存的就是每个Partition所在的块的位置。按照“移动数据
不如移动计算”的理念,Spark在进行任务调度的时候,会尽可能地将计算任务分配到其
所要处理数据块的存储位置。
原文链接:https://blog.csdn.net/qq_34555905/article/details/89150352
Spark通过分析各个RDD的依赖关系生成了DAG(这里提到DAG了,划重点!),
即:Directed Acyclic Graph(有向无环图)
具体来说,DAG就是两部分的组合:
·所有的RDD
·对RDD的所有操作
划分阶段(stage):
通过分析各个 RDD 中分区之间的依赖关系 来决定如何划分阶段。
具体划分方法是:
在DAG中从后往前推,遇到宽依赖就断开,划分为一个stage;遇到窄依赖就将当前的RDD加入
到当前的stage中;窄依赖可以形成一个流水线操作,这样流水线执行大大提高了计算的效率。
对于窄依赖,由于partition依赖关系的确定性,partition的转换处理就可以在同一个线程里完成;
而对于宽依赖,只能等父RDD shuffle处理完成后,下一个stage才能开始接下来的计算。
之所以这样划分,是因为窄依赖可以进行流水线优化的,而宽依赖不能进行流水线优化。
Task:
task是stage里的一个任务执行单元,一般来说,一个RDD有多少个partition,就会有多少个task,
因为每一个task只是处理一个partition上的数据。
在spark中,Task的类型分为2种:ShuffleMapTask和ResultTask;
简单来说,DAG的最后一个阶段会为每个结果的partition生成一个ResultTask,而其余所有阶段都
会生成ShuffleMapTask。
之所以称之为ShuffleMapTask是因为它需要将自己的计算结果shuffle到下一个stage中。
原文链接:https://blog.csdn.net/qq_34555905/article/details/89150352
**窄依赖:**是指每个父RDD的一个Partition最多被子RDD的一个Partition所使用,
例如map、filter等操作都会产生窄依赖(独生子女)
在窄依赖中,子RDD所依赖的父RDD的partition的个数是确定的。
**宽依赖:**是指一个父RDD的Partition会被多个子RDD的Partition所使用,例如groupByKey、
reduceByKey、sortByKey等操作都会产生宽依赖;(超生)也就是说,
只要产生了shuffle,就会产生宽依赖
而宽依赖是shuffle级别的,数据量越大,那么子RDD所依赖的父RDD的个数就越多,
从而子RDD所依赖的父RDD的partition的个数也会变得越来越多。
原文链接:https://blog.csdn.net/qq_34555905/article/details/89150352
自动进行内存和磁盘数据存储的切换
Spark优先把数据放到内存中,如果内存放不下,就会放到磁盘里面,程序进行自动的存储切换。
基于血统的高效容错机制
在RDD进行转换和动作的时候,会形成RDD的Lineage依赖链,当某一个RDD失效的时候,可以通过重新计算上游的RDD来重新生成丢失的RDD数据。
Task如果失败会自动进行特定次数的重试
RDD的计算任务如果运行失败,会自动进行任务的重新计算,默认次数是4次。
Stage如果失败会自动进行特定次数的重试
如果Job的某个Stage阶段计算失败,框架也会自动进行任务的重新计算,默认次数也是4次。
Checkpoint和Persist可主动或被动触发
RDD可以通过Persist持久化将RDD缓存到内存或者磁盘,当再次用到该RDD时直接读取就行。也可以将RDD进行检查点,检查点会将数据存储在HDFS中,该RDD的所有父RDD依赖都会被移除。
数据调度弹性
Spark把整个Job执行模型抽象为通用的有向无环图DAG,可以将多Stage的任务串联或并行执行,调度引擎自动处理Stage的失败以及Task的失败。
数据分片的高度弹性
可以根据业务的特征,动态调整数据分片的个数,提升整体的应用执行效率。
原文链接:https://blog.csdn.net/qq_34555905/article/details/89150352