
👉Cluster Manager(Master):在standalone模式中即为Master主节点,控制整个集群,监控worker。在YARN模式中为资源管理器
👉 Worker节点:从节点,负责控制计算节点,启动Executor或者Driver。
👉 Driver: 运行Application 的main()函数
👉 Executor:执行器,是为某个Application运行在worker node上的一个进程
1、启动:用户程序启动SparkContext,是程序的总入口,初始化过程中启动DAGScheduler作业调度和 TaskScheduler任务调度。
2、生成作业:DAGScheduler:根据shuffleDependency将作业划分为不同的stage,根据 RDD之间的依赖关系,宽依赖和窄依赖,划分原则就是遇见窄依赖就放进当前stage,遇到宽依赖则断开。(相当于shuffle是前后的stage分界线)每一个stage里面都会划分一个taskset,也就是数据集,而DAGSchedule的下一个任务就是将这个TaskSet传给TaskSchedule(在最后一个 stage划分结束,就会触发作业的提交)。
3、提交任务集: TaskScheduler:分配 Task到哪一个executor上去执行,SchedulerBackend配合TaskScheduler 完成具体任务的资源分配。
4、任务执行:Executor:实际任务的运行最终都 Execter 类来执行,对每个任务创建一个TaskRunner类,交给线程池去实现。
spark-submit 提交代码,执行 new SparkContext(),在 SparkContext 里构造 DAGScheduler 和 TaskScheduler。区别宽窄依赖的核心点是 子RDD的partition与父RDD的partition是否是1对多的关系,
如果是这样的关系的话,说明多个父rdd的partition需要经过shuffle过程汇总到一个子rdd的partition,这样就是一次宽依赖,在DAGScheduler中会产生stage的切分.
窄依赖:Narrow Dependency
父RDD和子RDD是一对一的依赖关系,如map,filter
宽依赖:Shuffle Dependency
本质就是shuffle。如reduceByKey,groupyByKey,父RDD一个分区数据给了子RDD的多个分区
存在shuffle就是宽依赖,否则就是窄依赖
RDD 作为数据结构,本质上是一个只读的分区记录集合。一个 RDD 可以包含多个分区,每个分区就是一个数据集片段。
首先,窄依赖可以支持在同一个节点上,以 pipeline 形式执行多条命令(也叫同一个 Stage 的操作),例如在执行了 map 后,紧接着执行 filter。相反,宽依赖需要所有的父分区都是可用的,可能还需要调用类似 MapReduce 之类的操作进行跨节点传递。
其次,则是从失败恢复的角度考虑。窄依赖的失败恢复更有效,因为它只需要重新计算丢失的 parent partition 即可,而且可以并行地在不同节点进行重计算(一台机器太慢就会重新调度到多个节点进行)。
stage的划分依据就是看是否产生了shuflle(即宽依赖),遇到一个shuffle操作就划分为前后两个stage.
两个方法:利用“血缘(Lineage)容错”和检查点(checkpoint)机制。
应用场景:当 spark 应用程序特别复杂,从初始的 RDD 开始到最后整个应用程序完成有很多的步骤,而且整个应用运行时间特别长,这种情况下就比较适合使用 checkpoint 功能。
原因:对于特别复杂的 Spark 应用,会出现某个反复使用的 RDD,即使之前持久化过但由于节点的故障导致数据丢失了,没有容错机制,所以需要重新计算一次数据。
Checkpoint 首先会调用 SparkContext 的 setCheckPointDIR() 方法,设置一个容错的文件系统的目录,比如说 HDFS;然后对 RDD 调用checkpoint()方法。之后在RDD所处的job运行结束之后,会启动一个单独的job,来将checkpoint 过的 RDD 数据写入之前设置的文件系统,进行高可用、容错的类持久化操作。
检查点机制是我们在 spark streaming 中用来保障容错性的主要机制,它可以使spark streaming阶段性的把应用数据存储到诸如 HDFS 等可靠存储系统中,以供恢复时使用。具体来说基于以下两个目的服务:
RDD RDD 是 Spark 的灵魂,也称为弹性分布式数据集。一个 RDD 代表一个可以被分区的只读数据集。RDD 内部可以有许多分区(partitions),每个分区又拥有大量的记录(records)。
DAG Spark 中使用 DAG 对 RDD 的关系进行建模,描述了 RDD 的依赖关系,这种关系也被称之为 lineage(血缘),RDD 的依赖关系使用 Dependency 维护。
Stage 在 DAG 中又进行 Stage 的划分,划分的依据是依赖是否是 shuffle 的,每个 Stage 又可以划分成若干 Task。接下来的事情就是 Driver 发送 Task 到 Executor,Executor 线程池去执行这些 task,完成之后将结果返回给 Driver。
Job Spark 的 Job 来源于用户执行 action 操作(这是 Spark 中实际意义的 Job),就是从 RDD 中获取结果的操作,而不是将一个 RDD 转换成另一个 RDD 的 transformation 操作。
Task 一个 Stage 内,最终的 RDD 有多少个 partition,就会产生多少个 task。
shuffle简介:在 DAG 阶段以shuffle为界,划分 stage,
上游 stage做 map task,每个maptask将计算结果数据分成多份,每一份对应到下游stage 的每个partition中,并将其临时写到磁盘,该过程叫做shuffle write;
下游stage 做reduce task,每个reduce task通过网络拉取上游 stage中所有map task的指定分区结果数据,该过程叫做shuffle read,最后完成reduce的业务逻辑。
Shuffle版本也随着spark不断进步和优化:
从2.0开始,把 Sort Based Shuffle和 Tungsten-Sort全部统一到 Sort Based Shuffle中,Hash Based Shuffle退出历史舞台。
目前spark2.1,直接把SortBased Shuffle的writer分为三种:BypassMergeSortShuffleWriter,SortShuffleWriter和 UnsafeShuffle Writer。
BypassMergeSortShuffle Writer: Hash Shuffle 中的HashShuffle Writer 实现基本一致,唯一的区别在于,map端的多个输出文件会被汇总为一个文件。所有分区的数据会合并为同一个文件,会生成一个索引文件,是为了索引到每个分区的起始地址,可以随机 access某个partition 的所有数据。
SortShuffleWriter:会对分区内进行排序或者全局排序。
处理步骤:使用 PartitionedAppendOnlyMap或者 PartitionedPairBuffer 在内存中进行排序,排序的K是(partitionld, hash (key))这样一个元组。如果超过内存limit,spill 到一个文件中,这个文件中元素也是有序的,首先是按照 partitionld 时排厅,如宋 partona相同,再根据hash (key)进行比较排序。如果需要输出全局有序的文件的时候,就需要对之前所有的输出文件和当前内存中的数据结构中的数据进行merge sort,实现全局排序。
最终读取的时候,从整个全局merge后的读取迭代器中读取的数据,就是按照partitionld 从小到大排序的数据,误取过在中使用丹仅州刀区力权,并且记录每个分区文件的起始写入位置,把这些位置数据写入索引文件中。
UnsafeShuffleWriter:优化部分是在shuffle write进行序列化写入过程中,直接对二进制进行排序,减少了内存消耗,最终只是 partition 级别的排序。
但是这种需要一定条件:对单条记录、shuffle数量有限制,而且不能带有聚合函数。排序实现:利用一个LongArray存储分区 ID、pageNumber、offset in page,并对这个数组排序。每次插入一条 record 到 page 中,就把 partionld + pageNumber + offset in page,作可以迭代器 PackedRecordPointer
为一个元素插入到 LongArray中。要想反向获得 record,
定义的数据结构就是[24 bit partition number][13 bit memory page number][27 bit offset inpage]然后到根据该指针可以拿到真实的record。
Spark SQL 比 Hadoop Hive 快,是有一定条件的,而且不是 Spark SQL 的引擎比 Hive 的引擎快,相反,Hive 的 HQL 引擎还比 Spark SQL 的引擎更快。其实,关键还是在于 Spark 本身快。
RDD分区函数HashPatitoner和 RangePatitoner 实现
1、HashPatitioner分区:对于给定的key值,计算其 hashcode除以分区数取余,最后这个值就是分区的id。可能会出现数据不均匀,因为同一 key值都在同一分区。
2、RangerPartitoner分区:将一定范围的数映射到某一分区内。
第一步:确定边界,从所有的RDD当中随机抽取样本并进行排序,依次来确定分区的rangerBounds(边界)(这里因为RDD如果很大的话,没法按照计算总量,所以需要用到蓄水池抽样)。
第二步:计算key值在rangerBounds所处的范围,得出分区id。
额外:蓄水池抽样算法,不知总量的随机抽样。找个 leetcode敲下理解。
leetcode382.Linked List Random Node(算法随机采样)
Init : a reservoir with the size: k //k is the sample number
fori=k+1 to N
M=random(1, i);
if(M<k)
SWAP the Mth value and ith value
end for
Standalone模式:独立模式,自带完整的模式。在架构上和 MapReduce1比较,具有一致性,都是由Master、worker组成(只是名称不一样),资源抽象为粗粒式的slot,多少slot多少task。Spark on YARN:因为现在企业用到 hadoop是基于YARN 的,为了融合spark,进行统一资源管理。有两种方式, YARN-client(用于交互,client当中运行sparkContext进程进行任务分发监控),YARN-cluster 任务的分发和监控放在 MRAPPmaster当中。Spark on mesos:YARN和 mesos都是统一资源管理和调度系统。mesos支持粗粒式和细粒式调度,前者节省了资源调度时间的开销,后者是不存在资源的浪费,但是资源调度延迟较大。总的来说,spark分为三大类算子:
Transformation 变换/转换算子:这种变换并不触发提交作业,完成作业中间过程处理; Transformation 操作是延迟计算的,也就是说从一个RDD 转换生成另一个 RDD 的转换操作不是马上执行,需要等到有 Action 操作的时候才会真正触发运算
Action 行动算子:这类算子会触发 SparkContext 提交 Job 作业; Action 算子会触发 Spark 提交作业(Job),并将数据输出 Spark系统
controller 控制操作:Spark中控制算子也是懒执行的,需要Action算子触发才能执行,主要是为了对数据进行缓存。
控制算子有三种,cache,persist,(RDD 持久化原理)checkpoint,以上算子都可以将RDD持久化,持久化的单位是partition。cache和persist都是懒执行的。必须有一个action类算子触发执行。checkpoint算子不仅能将RDD持久化到磁盘,还能切断RDD之间的依赖关系。
输入分区与输出分区一对一型
输入分区与输出分区多对一型
输入分区与输出分区多对多型
输出分区为输入分区子集型
Cache型
输入分区与输出分区一对一
对单个RDD或两个RDD聚集
连接
无输出
HDFS算子
Scala集合和数据类型
reduceByKey:reduceByKey 会在结果发送至 reducer 之前会对每个 mapper 在本地进行 merge,有点类似于在 MapReduce 中的 combiner。这样做的好处在于,在 map 端进行一次 reduce 之后,数据量会大幅度减小,从而减小传输,保证 reduce 端能够更快的进行结果计算。
groupByKey:groupByKey 会对每一个 RDD 中的 value 值进行聚合形成一个序列 (Iterator),此操作发生在 reduce 端,所以势必会将所有的数据通过网络进行传输,造成不必要的浪费。同时如果数据量十分大,可能还会造成 OutOfMemoryError。
所以在进行大量数据的 reduce 操作时候建议使用 reduceByKey。不仅可以提高速度,还可以防止使用 groupByKey 造成的内存溢出问题。
spark 非常重要的一个功能特性就是可以将 RDD 持久化在内存中。
调用 cache() 和 persist() 方法即可。cache() 和 persist() 的区别在于,cache() 是 persist() 的一种简化方式,cache() 的底层就是调用 persist() 的无参版本 persist(MEMORY_ONLY),将数据持久化到内存中。
如果需要从内存中清除缓存,可以使用 unpersist() 方法。RDD 持久化是可以手动选择不同的策略的。在调用 persist() 时传入对应的 StorageLevel 即可。
最主要的区别在于持久化只是将数据保存在 BlockManager 中,但是 RDD 的 lineage(血缘关系,依赖关系) 是不变的。但是 checkpoint 执行完之后,rdd 已经没有之前所谓的依赖 rdd 了,而只有一个强行为其设置的 checkpointRDD,checkpoint 之后 rdd 的 lineage 就改变了。
持久化的数据丢失的可能性更大,因为节点的故障会导致磁盘、内存的数据丢失。但是 checkpoint 的数据通常是保存在高可用的文件系统中,比如 HDFS 中,所以数据丢失可能性比较低
Transformation 操作是延迟计算的,也就是说从一个RDD 转换生成另一个 RDD 的转换操作不是马上执行,需要等到有 Acion 操作的时候才会真正触发运算,这也就是懒加载.
顾名思义,从字面理解RDD就是 Resillient Distributed Dataset,即弹性分布式数据集。
它是Spark提供的核心抽象。
RDD在抽象上来讲是一种抽象的分布式的数据集。它是被分区的,每个分区分布在集群中的不同的节点上。从而可以让数据进行并行的计算
rdd 分布式弹性数据集,简单的理解成一种数据结构,是 spark 框架上的通用货币。所有算子都是基于 rdd 来执行的,不同的场景会有不同的 rdd 实现类,但是都可以进行互相转换。rdd 执行过程中会形成 dag 图,然后形成 lineage 保证容错性等。从物理的角度来看 rdd 存储的是 block 和 node 之间的映射。
RDD 在逻辑上是一个 hdfs 文件,在抽象上是一种元素集合,包含了数据。它是被分区的,分为多个分区,每个分区分布在集群中的不同结点上,从而让 RDD 中的数据可以被并行操作(分布式数据集)
比如有个 RDD 有 90W 数据,3 个 partition,则每个分区上有 30W 数据。RDD 通常通过 Hadoop 上的文件,即 HDFS 或者 HIVE 表来创建,还可以通过应用程序中的集合来创建;RDD 最重要的特性就是容错性,可以自动从节点失败中恢复过来。即如果某个结点上的 RDD partition 因为节点故障,导致数据丢失,那么 RDD 可以通过自己的数据来源重新计算该 partition。这一切对使用者都是透明的。
RDD 是 spark 提供的核心抽象,全称为弹性分布式数据集。
它主要特点就是弹性和容错性。
弹性:RDD的数据默认情况下存放在内存中的,但是在内存资源不足时,Spark会自动将RDD数据写入磁盘
容错性:RDD可以自动从节点失败中恢复过来。即如果某个节点上的RDD partition,因为节点故障,导致数据丢了,那么RDD会自动通过自己的数据来源重新计算该partition。
MR:抽象层次低,需要使用手工代码来完成程序编写,使用上难以上手;
Spark:Spark 采用 RDD 计算模型,简单容易上手。
MR:只提供 map 和 reduce 两个操作,表达能力欠缺;
Spark:Spark 采用更加丰富的算子模型,包括 map、flatmap、groupbykey、reducebykey 等;
MR:一个 job 只能包含 map 和 reduce 两个阶段,复杂的任务需要包含很多个 job,这些 job 之间的管理以来需要开发者自己进行管理;
Spark:Spark 中一个 job 可以包含多个转换操作,在调度时可以生成多个 stage,而且如果多个 map 操作的分区不变,是可以放在同一个 task 里面去执行;
MR:中间结果存放在 hdfs 中;
Spark:Spark 的中间结果一般存在内存中,只有当内存不够了,才会存入本地磁盘,而不是 hdfs;
MR:只有等到所有的 map task 执行完毕后才能执行 reduce task;
Spark:Spark 中分区相同的转换构成流水线在一个 task 中执行,分区不同的需要进行 shuffle 操作,被划分成不同的 stage 需要等待前面的 stage 执行完才能执行。
MR:只适合 batch 批处理,时延高,对于交互式处理和实时处理支持不够;
Spark:Spark streaming 可以将流拆成时间间隔的 batch 进行处理,实时计算。
Spark 中和 join 相关的算子有这几个:join、fullOuterJoin、leftOuterJoin、rightOuterJoin
join
join函数会输出两个RDD中key相同的所有项,并将它们的value联结起来,它联结的key要求在两个表中都存在,类似于SQL中的INNER JOIN。但它不满足交换律,a.join(b)与b.join(a)的结果不完全相同,值插入的顺序与调用关系有关。
leftOuterJoin
leftOuterJoin会保留对象的所有key,而用None填充在参数RDD other中缺失的值,因此调用顺序会使结果完全不同。如下面展示的结果,
rightOuterJoin
rightOuterJoin与leftOuterJoin基本一致,区别在于它的结果保留的是参数other这个RDD中所有的key。
fullOuterJoin
fullOuterJoin会保留两个RDD中所有的key,因此所有的值列都有可能出现缺失的情况,所有的值列都会转为Some对象。
Hadoop 底层使用 MapReduce 计算架构,只有 map 和 reduce 两种操作,表达能力比较欠缺,而且在 MR 过程中会重复的读写 hdfs,造成大量的磁盘 io 读写操作,所以适合高时延环境下批处理计算的应用;
Spark 是基于内存的分布式计算架构,提供更加丰富的数据集操作类型,主要分成转化操作和行动操作,包括 map、reduce、filter、flatmap、groupbykey、reducebykey、union 和 join 等,数据分析更加快速,所以适合低时延环境下计算的应用;
spark 与 hadoop 最大的区别在于迭代式计算模型。基于 mapreduce 框架的 Hadoop 主要分为 map 和 reduce 两个阶段,两个阶段完了就结束了,所以在一个 job 里面能做的处理很有限;spark 计算模型是基于内存的迭代式计算模型,可以分为 n 个阶段,根据用户编写的 RDD 算子和程序,在处理完一个阶段后可以继续往下处理很多个阶段,而不只是两个阶段。所以 spark 相较于 mapreduce,计算模型更加灵活,可以提供更强大的功能。
但是 spark 也有劣势,由于 spark 基于内存进行计算,虽然开发容易,但是真正面对大数据的时候,在没有进行调优的轻局昂下,可能会出现各种各样的问题,比如 OOM 内存溢出等情况,导致 spark 程序可能无法运行起来,而 mapreduce 虽然运行缓慢,但是至少可以慢慢运行完。
Hadoop/MapReduce 和 Spark 最适合的都是做离线型的数据分析,但 Hadoop 特别适合是单次分析的数据量 “很大” 的情景,而 Spark 则适用于数据量不是很大的情景。
Dataset转POJO
SparkSession spark = CloudUtils.getSparkSession();
// 查询原始数据
Dataset<Row> student = spark.sql("select * from `event`.`student`");
// 生成schema
List<StructField> fields = new ArrayList<>();
fields.add(DataTypes.createStructField("id", DataTypes.StringType, true));
fields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
fields.add(DataTypes.createStructField("major", DataTypes.StringType, true));
StructType schema = DataTypes.createStructType(fields);
// 转换查询结果为POJO List
List<Student> students = spark.createDataFrame(student.toJavaRDD(), schema)
.as(Encoders.bean(Student.class))
.collectAsList();
System.out.println(students);
Dataset中的日期类型为timestamp和java中的Date类型不兼容,和Timestamp类型相互兼容。
为了解决上述问题,我们可以先将Dataset转为JSON,然后将JSON转为POJO,代码
// 查出数据并转为json集合
List<String> jsonList = spark.sql("select * from `event`.`user`")
.toJSON()
.collectAsList();
// 将json转为pojo,这里使用的是FastJSON
List<User> users = jsonList.stream()
.map(jsonString -> JSON.parseObject(jsonString, User.class))
.collect(Collectors.toList());
System.out.println(users);
POJO转Dataset
// 获取users列表
List<User> users = createUsers();
// 使用createDataFrame转为dataset
Dataset<Row> ds = spark.createDataFrame(users, User.class);
// 将驼峰式列名改为下划线式列名,camelToUnderline方法网上搜索
String[] columns = ds.columns();
String[] newColumns = Arrays.stream(columns)
.map(column -> camelToUnderline(column))
.toArray(String[]::new);
// 转为新的df(重命名后的)
ds.toDF(newColumns);
ds.show();
Mysql 的 join怎么实现的?
对于Spark来说有3中Join的实现,每种 Join对应着不同的应用场景:
Hash Join:小表会作为Build Table,大表作为Probe Table,依次读取Build Table的数据,对于每一行数据根据join key进行 hash,hash到对应的Bucket,生成 hash table中的一条记录。数据缓存在内存中,如果内存放不下需要dump到外存;再依次扫描 Probe Table ( order)的数据,使用相同的hash函数映射Hash Table中的记录,映射成功之后再检查join条件,如果匹配成功就可以将两者 join在一起(为什么 Build Table选择小表?因为构建的 HashTable最好能全部加载在内存,效率最高;这也决定了hash join算法只适合至少一个小表的join场景)。
Broadcast Hash Join:当有限维度表和事实表进行Join操作时,为了避免shuffle,我们可以将大小有限的维度表的全部数据分发到每个节点上,供事实表使用。executor存储维度表中的全部数据,一定程度上牺牲了空间,换取 shuffle操作大量的耗时。
👉(1)broadcast阶段:将小表广播分发到大表所在的所有主机。广播算法可以有很多,最简单的是先发给driver,driver再统一分发给所有executor;要不就是基于bittorrete的 p2p思路;
👉(2) hash join阶段:在每个executor上执行单机版hash join,小表映射,大表试探;
Shuffle Hash Join:利用key相同必然分区相同的这个原理,SparkSQL将较大表的join分而治之,先将表划分成n个分区,再对两个表中相对应分区的数据分别进行 Hash Join,这样即在一定程度上减少了driver广播一侧表的压力,也减少了executor端取整张被广播表的内存消耗。
数据倾斜以为着某一个或者某几个 partition 的数据特别大,导致这几个 partition 上的计算需要耗费相当长的时间。
在 spark 中同一个应用程序划分成多个 stage,这些 stage 之间是串行执行的,而一个 stage 里面的多个 task 是可以并行执行,task 数目由 partition 数目决定,如果一个 partition 的数目特别大,那么导致这个 task 执行时间很长,导致接下来的 stage 无法执行,从而导致整个 job 执行变慢。
避免数据倾斜,一般是要选用合适的 key,或者自己定义相关的 partitioner,通过加盐或者哈希值来拆分这些 key,从而将这些数据分散到不同的 partition 去执行。
如下算子会导致 shuffle 操作,是导致数据倾斜可能发生的关键点所在:groupByKey;reduceByKey;aggregaByKey;join;cogroup;
Master 实际上可以配置两个,Spark 原生的 standalone 模式是支持 Master 主备切换的。当 Active Master 节点挂掉以后,我们可以将 Standby Master 切换为 Active Master。
Spark Master 主备切换可以基于两种机制,一种是基于文件系统的,一种是基于 ZooKeeper 的。
基于文件系统的主备切换机制,需要在 Active Master 挂掉之后手动切换到 Standby Master 上;
而基于 Zookeeper 的主备切换机制,可以实现自动切换 Master。
NONE :什么类型都不是
DISK_ONLY:磁盘
DISK_ONLY_2:磁盘;双副本
MEMORY_ONLY: 内存;反序列化;把RDD作为反序列化的方式存储,假如RDD的内容存不下,剩余的分区在以后需要时会重新计算,不会刷到磁盘上。
MEMORY_ONLY_2:内存;反序列化;双副本
MEMORY_ONLY_SER:内存;序列化;这种序列化方式,每一个partition以字节数据存储,好处是能带来更好的空间存储,但CPU耗费高
MEMORY_ONLY_SER_2 : 内存;序列化;双副本
MEMORY_AND_DISK:内存 + 磁盘;反序列化;双副本;RDD以反序列化的方式存内存,假如RDD的内容存不下,剩余的会存到磁盘
MEMORY_AND_DISK_2 : 内存 + 磁盘;反序列化;双副本
MEMORY_AND_DISK_SER:内存 + 磁盘;序列化
MEMORY_AND_DISK_SER_2:内存 + 磁盘;序列化;双副本
目前,除了local模式为本地调试模式以为, Spark支持三种分布式部署方式,分别是standalone、spark on mesos和 spark on YARN
Standalone模式
即独立模式,自带完整的服务,可单独部署到一个集群中,无需依赖任何其他资源管理系统。从一定程度上说,该模式是其他两种的基础。目前Spark在standalone模式下是没有任何单点故障问题的,这是借助zookeeper实现的,思想类似于Hbase master单点故障解决方案。将Spark standalone与MapReduce比较,会发现它们两个在架构上是完全一致的:
Spark On YARN模式
spark on yarn 的支持两种模式:
yarn-cluster和yarn-client的区别在于yarn appMaster,每个yarn app实例有一个appMaster进程,是为app启动的第一个container;负责从ResourceManager请求资源,获取到资源后,告诉NodeManager为其启动container。yarn-cluster和yarn-client模式内部实现还是有很大的区别。如果你需要用于生产环境,那么请选择yarn-cluster;而如果你仅仅是Debug程序,可以选择yarn-client。
Spark On Mesos模式
Spark运行在Mesos上会比运行在YARN上更加灵活,更加自然。目前在Spark On Mesos环境中,用户可选择两种调度模式之一运行自己的应用程序
粗粒度模式(Coarse-grained Mode):每个应用程序的运行环境由一个Dirver和若干个Executor组成,其中,每个Executor占用若干资源,内部可运行多个Task(对应多少个“slot”)。应用程序的各个任务正式运行之前,需要将运行环境中的资源全部申请好,且运行过程中要一直占用这些资源,即使不用,最后程序运行结束后,回收这些资源。
细粒度模式(Fine-grained Mode):鉴于粗粒度模式会造成大量资源浪费,Spark On Mesos还提供了另外一种调度模式:细粒度模式,这种模式类似于现在的云计算,思想是按需分配。与粗粒度模式一样,应用程序启动时,先会启动executor,但每个executor占用资源仅仅是自己运行所需的资源,不需要考虑将来要运行的任务,之后,mesos会为每个executor动态分配资源,每分配一些,便可以运行一个新任务,单个Task运行完之后可以马上释放对应的资源。
spark可以在集群运行时启动一个或多个standby Master,当 Master 出现异常时,会根据规则启动某个standby master接管,在standlone模式下有如下几种配置
ZOOKEEPER
集群数据持久化到zk中,当master出现异常时,zk通过选举机制选出新的master,新的master接管是需要从zk获取持久化信息
FILESYSTEM
集群元数据信息持久化到本地文件系统, 当master出现异常时,只需要在该机器上重新启动master,启动后新的master获取持久化信息并根据这些信息恢复集群状态
CUSTOM
自定义恢复方式,对 standloneRecoveryModeFactory 抽象类 进行实现并把该类配置到系统中,当master出现异常时,会根据用户自定义行为恢复集群
None
不持久化集群的元数据, 当 master出现异常时, 新启动的Master 不进行恢复集群状态,而是直接接管集群
Worker 以定时发送心跳给 Master, 让 Master 知道 Worker 的实时状态,当worker出现超时时,Master 调用 timeOutDeadWorker 方法进行处理,在处理时根据 Worker 运行的是 Executor 和 Driver 分别进行处理
spark的内存结构分为3大块:storage/execution/系统自留
storage 内存:用于缓存 RDD、展开 partition、存放 Direct Task Result、存放广播变量。在 Spark Streaming receiver 模式中,也用来存放每个 batch 的 blocks
execution 内存:用于 shuffle、join、sort、aggregation 中的缓存、buffer
系统自留:
在 spark 运行过程中使用:比如序列化及反序列化使用的内存,各个对象、元数据、临时变量使用的内存,函数调用使用的堆栈等
作为误差缓冲:由于 storage 和 execution 中有很多内存的使用是估算的,存在误差。当 storage 或 execution 内存使用超出其最大限制时,有这样一个安全的误差缓冲在可以大大减小 OOM 的概率
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-QEvJrIkw-1641869257567)(…/pictures/spark中的广播变量.png)]
顾名思义,broadcast 就是将数据从一个节点发送到其他各个节点上去。这样的场景很多,比如 driver 上有一张表,其他节点上运行的 task 需要 lookup 这张表,那么 driver 可以先把这张表 copy 到这些节点,这样 task 就可以在本地查表了。如何实现一个可靠高效的 broadcast 机制是一个有挑战性的问题。先看看 Spark 官网上的一段话:
Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a large input dataset in an efficient manner. Spark also attempts to distribute broadcast variables using efficient broadcast algorithms to reduce communication cost.
这就涉及一致性的问题,如果变量可以被更新,那么一旦变量被某个节点更新,其他节点要不要一块更新?如果多个节点同时在更新,更新顺序是什么?怎么做同步?还会涉及 fault-tolerance 的问题。为了避免维护数据一致性问题,Spark 目前只支持 broadcast 只读变量。
因为每个 task 是一个线程,而且同在一个进程运行 tasks 都属于同一个 application。因此每个节点(executor)上放一份就可以被所有 task 共享。
driver program 例子:
val data = List(1, 2, 3, 4, 5, 6)
val bdata = sc.broadcast(data)
val rdd = sc.parallelize(1 to 6, 2)
val observedSizes = rdd.map(_ => bdata.value.size)
driver 使用 sc.broadcast() 声明要 broadcast 的 data,bdata 的类型是 Broadcast。
当 rdd.transformation(func) 需要用 bdata 时,直接在 func 中调用,比如上面的例子中的 map() 就使用了 bdata.value.size。
broadcast 的实现机制很有意思:
Driver 先建一个本地文件夹用以存放需要 broadcast 的 data,并启动一个可以访问该文件夹的 HttpServer。当调用val bdata = sc.broadcast(data)时就把 data 写入文件夹,同时写入 driver 自己的 blockManger 中(StorageLevel 为内存+磁盘),获得一个 blockId,类型为 BroadcastBlockId。当调用rdd.transformation(func)时,如果 func 用到了 bdata,那么 driver submitTask() 的时候会将 bdata 一同 func 进行序列化得到 serialized task,**注意序列化的时候不会序列化 bdata 中包含的 data。**上一章讲到 serialized task 从 driverActor 传递到 executor 时使用 Akka 的传消息机制,消息不能太大,而实际的 data 可能很大,所以这时候还不能 broadcast data。
driver 为什么会同时将 data 放到磁盘和 blockManager 里面?放到磁盘是为了让 HttpServer 访问到,放到 blockManager 是为了让 driver program 自身使用 bdata 时方便(其实我觉得不放到 blockManger 里面也行)。
**那么什么时候传送真正的 data?**在 executor 反序列化 task 的时候,会同时反序列化 task 中的 bdata 对象,这时候会调用 bdata 的 readObject() 方法。该方法先去本地 blockManager 那里询问 bdata 的 data 在不在 blockManager 里面,如果不在就使用下面的两种 fetch 方式之一去将 data fetch 过来。得到 data 后,将其存放到 blockManager 里面,这样后面运行的 task 如果需要 bdata 就不需要再去 fetch data 了。如果在,就直接拿来用了。
下面探讨 broadcast data 时候的两种实现方式:
顾名思义,HttpBroadcast 就是每个 executor 通过的 http 协议连接 driver 并从 driver 那里 fetch data。
Driver 先准备好要 broadcast 的 data,调用sc.broadcast(data)后会调用工厂方法建立一个 HttpBroadcast 对象。该对象做的第一件事就是将 data 存到 driver 的 blockManager 里面,StorageLevel 为内存+磁盘,blockId 类型为 BroadcastBlockId。
同时 driver 也会将 broadcast 的 data 写到本地磁盘,例如写入后得到 /var/folders/87/grpn1_fn4xq5wdqmxk31v0l00000gp/T/spark-6233b09c-3c72-4a4d-832b-6c0791d0eb9c/broadcast_0, 这个文件夹作为 HttpServer 的文件目录。
Driver 和 executor 启动的时候,都会生成 broadcastManager 对象,调用 HttpBroadcast.initialize(),driver 会在本地建立一个临时目录用来存放 broadcast 的 data,并启动可以访问该目录的 httpServer。
**Fetch data:**在 executor 反序列化 task 的时候,会同时反序列化 task 中的 bdata 对象,这时候会调用 bdata 的 readObject() 方法。该方法先去本地 blockManager 那里询问 bdata 的 data 在不在 blockManager 里面,**如果不在就使用 http 协议连接 driver 上的 httpServer,将 data fetch 过来。**得到 data 后,将其存放到 blockManager 里面,这样后面运行的 task 如果需要 bdata 就不需要再去 fetch data 了。如果在,就直接拿来用了。
HttpBroadcast 最大的问题就是 driver 所在的节点可能会出现网络拥堵,因为 worker 上的 executor 都会去 driver 那里 fetch 数据。
为了解决 HttpBroadast 中 driver 单点网络瓶颈的问题,Spark 又设计了一种 broadcast 的方法称为 TorrentBroadcast,**这个类似于大家常用的 BitTorrent 技术。**基本思想就是将 data 分块成 data blocks,然后假设有 executor fetch 到了一些 data blocks,那么这个 executor 就可以被当作 data server 了,随着 fetch 的 executor 越来越多,有更多的 data server 加入,data 就很快能传播到全部的 executor 那里去了。
HttpBroadcast 是通过传统的 http 协议和 httpServer 去传 data,在 TorrentBroadcast 里面使用在上一章介绍的 blockManager.getRemote() => NIO ConnectionManager 传数据的方法来传递,读取数据的过程与读取 cached rdd 的方式类似,可以参阅 CacheAndCheckpoint 中的最后一张图。
下面讨论 TorrentBroadcast 的一些细节:
Driver 先把 data 序列化到 byteArray,然后切割成 BLOCK_SIZE(由 spark.broadcast.blockSize = 4MB 设置)大小的 data block,每个 data block 被 TorrentBlock 对象持有。切割完 byteArray 后,会将其回收,因此内存消耗虽然可以达到 2 * Size(data),但这是暂时的。
完成分块切割后,就将分块信息(称为 meta 信息)存放到 driver 自己的 blockManager 里面,StorageLevel 为内存+磁盘,同时会通知 driver 自己的 blockManagerMaster 说 meta 信息已经存放好。通知 blockManagerMaster 这一步很重要,因为 blockManagerMaster 可以被 driver 和所有 executor 访问到,信息被存放到 blockManagerMaster 就变成了全局信息。
之后将每个分块 data block 存放到 driver 的 blockManager 里面,StorageLevel 为内存+磁盘。存放后仍然通知 blockManagerMaster 说 blocks 已经存放好。到这一步,driver 的任务已经完成。
executor 收到 serialized task 后,先反序列化 task,这时候会反序列化 serialized task 中包含的 bdata 类型是 TorrentBroadcast,也就是去调用 TorrentBroadcast.readObject()。这个方法首先得到 bdata 对象,**然后发现 bdata 里面没有包含实际的 data。怎么办?**先询问所在的 executor 里的 blockManager 是会否包含 data(通过查询 data 的 broadcastId),包含就直接从本地 blockManager 读取 data。否则,就通过本地 blockManager 去连接 driver 的 blockManagerMaster 获取 data 分块的 meta 信息,获取信息后,就开始了 BT 过程。
**BT 过程:**task 先在本地开一个数组用于存放将要 fetch 过来的 data blocks arrayOfBlocks = new Array[TorrentBlock](totalBlocks),TorrentBlock 是对 data block 的包装。然后打乱要 fetch 的 data blocks 的顺序,比如如果 data block 共有 5 个,那么打乱后的 fetch 顺序可能是 3-1-2-4-5。然后按照打乱后的顺序去 fetch 一个个 data block。fetch 的过程就是通过 “本地 blockManager -本地 connectionManager-driver/executor 的 connectionManager-driver/executor 的 blockManager-data” 得到 data,这个过程与 fetch cached rdd 类似。**每 fetch 到一个 block 就将其存放到 executor 的 blockManager 里面,同时通知 driver 上的 blockManagerMaster 说该 data block 多了一个存储地址。**这一步通知非常重要,意味着 blockManagerMaster 知道 data block 现在在 cluster 中有多份,下一个不同节点上的 task 再去 fetch 这个 data block 的时候,可以有两个选择了,而且会随机选择一个去 fetch。这个过程持续下去就是 BT 协议,随着下载的客户端越来越多,data block 服务器也越来越多,就变成 p2p下载了。关于 BT 协议,Wikipedia 上有一个动画。
整个 fetch 过程结束后,task 会开一个大 Array[Byte],大小为 data 的总大小,然后将 data block 都 copy 到这个 Array,然后对 Array 中 bytes 进行反序列化得到原始的 data,这个过程就是 driver 序列化 data 的反过程。
最后将 data 存放到 task 所在 executor 的 blockManager 里面,StorageLevel 为内存+磁盘。显然,这时候 data 在 blockManager 里存了两份,不过等全部 executor 都 fetch 结束,存储 data blocks 那份可以删掉了。
@Andrew-Xia 回答道:不会怎样,就是这个rdd在每个executor中实例化一份。
公共数据的 broadcast 是很实用的功能,在 Hadoop 中使用 DistributedCache,比如常用的-libjars就是使用 DistributedCache 来将 task 依赖的 jars 分发到每个 task 的工作目录。不过分发前 DistributedCache 要先将文件上传到 HDFS。这种方式的主要问题是资源浪费,如果某个节点上要运行来自同一 job 的 4 个 mapper,那么公共数据会在该节点上存在 4 份(每个 task 的工作目录会有一份)。但是通过 HDFS 进行 broadcast 的好处在于单点瓶颈不明显,因为公共 data 首先被分成多个 block,然后不同的 block 存放在不同的节点。这样,只要所有的 task 不是同时去同一个节点 fetch 同一个 block,网络拥塞不会很严重。
对于 Spark 来讲,broadcast 时考虑的不仅是如何将公共 data 分发下去的问题,还要考虑如何让同一节点上的 task 共享 data。
对于第一个问题,Spark 设计了两种 broadcast 的方式,传统存在单点瓶颈问题的 HttpBroadcast,和类似 BT 方式的 TorrentBroadcast。HttpBroadcast 使用传统的 client-server 形式的 HttpServer 来传递真正的 data,而 TorrentBroadcast 使用 blockManager 自带的 NIO 通信方式来传递 data。TorrentBroadcast 存在的问题是慢启动和占内存,慢启动指的是刚开始 data 只在 driver 上有,要等 executors fetch 很多轮 data block 后,data server 才会变得可观,后面的 fetch 速度才会变快。executor 所占内存的在 fetch 完 data blocks 后进行反序列化时需要将近两倍 data size 的内存消耗。不管哪一种方式,driver 在分块时会有两倍 data size 的内存消耗。
对于第二个问题,每个 executor 都包含一个 blockManager 用来管理存放在 executor 里的数据,将公共数据存放在 blockManager 中(StorageLevel 为内存+磁盘),可以保证在 executor 执行的 tasks 能够共享 data。
其实 Spark 之前还尝试了一种称为 TreeBroadcast 的机制,详情可以见技术报告 Performance and Scalability of Broadcast in Spark。
更深入点,broadcast 可以用多播协议来做,不过多播使用 UDP,不是可靠的,仍然需要应用层的设计一些可靠性保障机制。
数据倾斜是一种很常见的问题(依据二八定律),简单来说,比方WordCount中某个Key对应的数据量非常大的话,就会产生数据倾斜,导致两个后果:
聚合倾斜
双重聚合(局部聚合+全局聚合)
场景: 对RDD进行reduceByKey等聚合类shuffle算子,SparkSQL的groupBy做分组聚合这两种情况
思路:首先通过map给每个key打上n以内的随机数的前缀并进行局部聚合,即(hello, 1) (hello, 1) (hello, 1) (hello, 1)变为(1_hello, 1) (1_hello, 1) (2_hello, 1),并进行reduceByKey的局部聚合,然后再次map将key的前缀随机数去掉再次进行全局聚合;
原理: 对原本相同的key进行随机数附加,变成不同key,让原本一个task处理的数据分摊到多个task做局部聚合,规避单task数据过量。之后再去随机前缀进行全局聚合;
优点:效果非常好(对聚合类Shuffle操作的倾斜问题);
缺点:范围窄(仅适用于聚合类的Shuffle操作,join类的Shuffle还需其它方案)
join倾斜
将reduce join转为map join
场景: 对RDD或Spark SQL使用join类操作或语句,且join操作的RDD或表比较小(百兆或1,2G); 思路:使用broadcast和map类算子实现join的功能替代原本的join,彻底规避shuffle。对较小RDD直接collect到内存,并创建broadcast变量;并对另外一个RDD执行map类算子,在该算子的函数中,从broadcast变量(collect出的较小RDD)与当前RDD中的每条数据依次比对key,相同的key执行你需要方式的join;
原理: 若RDD较小,可采用广播小的RDD,并对大的RDD进行map,来实现与join同样的效果。简而言之,用broadcast-map代替join,规避join带来的shuffle(无Shuffle无倾斜); 优点:效果很好(对join操作导致的倾斜),根治;
缺点:适用场景小(大表+小表),广播(driver和executor节点都会驻留小表数据)小表也耗内存
采样倾斜key并分拆join操作
场景: 两个较大的(无法采用方案五)RDD/Hive表进行join时,且一个RDD/Hive表中少数key数据量过大,另一个RDD/Hive表的key分布较均匀(RDD中两者之一有一个更倾斜);
思路:
优点: 前提是join导致的倾斜(某几个key倾斜),避免占用过多内存(只需对少数倾斜key扩容n倍);
缺点: 对过多倾斜key不适用。
用随机前缀和扩容RDD进行join
场景: RDD中有大量key导致倾斜; 思路:与方案六类似。
原理: 与方案六只有唯一不同在于这里对不倾斜RDD中所有数据进行扩大n倍,而不是找出倾斜key进行扩容;
优点: 对join类的数据倾斜都可处理,效果非常显著;
缺点: 缓解,扩容需要大内存
Driver Program是用户编写的提交给Spark集群执行的application,它包含两部分
一般来说transformation算子均是在worker上执行的,其他类型的代码在driver端执行
上面是使用哪种 writer 的判断依据, 是否开启 mapSideCombine 这个判断,是因为有些算子会在 map 端先进行一次 combine, 减少传输数据。 因为 BypassMergeSortShuffleWriter 会临时输出Reducer个(分区数目)小文件,所以分区数必须要小于一个阀值,默认是小于200
UnsafeShuffleWriter需要Serializer支持relocation,Serializer支持relocation:原始数据首先被序列化处理,并且再也不需要反序列,在其对应的元数据被排序后,需要Serializer支持relocation,在指定位置读取对应数据