RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象。
代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。
RDD代表的是弹性、可分区、不可变、元素可并行计算的计算。

创建一个maven工程,并添加scala框架支持,在pom文件中添加spark-core的依赖和scala的编译插件:
- <dependencies>
- <dependency>
- <groupId>org.apache.sparkgroupId>
- <artifactId>spark-core_2.12artifactId>
- <version>3.1.3version>
- dependency>
- dependencies>
-
- <build>
- <finalName>SparkCoreTestfinalName>
- <plugins>
- <plugin>
- <groupId>net.alchim31.mavengroupId>
- <artifactId>scala-maven-pluginartifactId>
- <version>3.4.6version>
- <executions>
- <execution>
- <goals>
- <goal>compilegoal>
- <goal>testCompilegoal>
- goals>
- execution>
- executions>
- plugin>
- plugins>
- build>
首先,创建sparkContext:
// 1.创建sc的配置对象
val conf: SparkConf = new SparkConf()
.setAppName("sparkCore").setMaster("local[*]")
// 2. 创建sc对象
val sc = new SparkContext(conf)
1、通过本地集合创建: sc.makeRDD(集合)/sc.parallelize(集合)
2、通过读取文件创建: sc.textFile("/../..")
3、通过其他RDD衍生: val rdd = rdd1.flatMap/filter/groupBy..
1、通过本地集合创建RDD的分区数: sc.parallelize(集合[,numSlices=defaultParallelism])
(1) 如果有设置 numSlices参数, 此时分区数 = 设置的numSlices
(2)没有设置的话, 此时分区数 = defaultParallelism
a. 如果在 sparkconf中设置spark.default.parallelism参数, 此时 defaultParallelism=设置的defaultParallelism。
b. 如果在 sparkconf中没有设置spark.default.parallelism参数,
1、master=local,此时defaultParallelism=1
2、master=local[N],此时defaultParallelism=N
3、master=local[*],此时defaultParallelism=本地cpu个数
4、master=spark://...,此时defaultParallelism = max( 所有executor总核数,2 )
2、通过读取文件创建RDD的分区数:
sc.text(path[,minPartitions=defaultMinPartitions])
1、如果有指定minPartitions参数值,此时RDD的分区数>=指定minPartitions参数值
2、如果没有指定minPartitions参数值,此时RDD的分区数>= min(defaultParallelism,2)
读取文件生成的新RDD的分区数最终由文件切片数决定。、
Spark算子分为两类:
1. Transformation转换算子
生成的是新RDD,不会触发任务计算。
2. Action行动算子
没有返回值或者返回scala数据类型,会触发任务的计算。
RDD整体上分为Value类型、双Value类型和Key-Value类型。
map与mapPartitions的区别:
1、函数针对的对象不一样
map的函数是针对每个元素操作
mapPartitions的函数是针对每个分区操作
2、函数的返回值不一样
map的函数是针对每个元素操作,要求返回一个新的元素,map生成的新RDD元素个数 = 原RDD元素个数;
mapPartitions的函数是针对分区操作,要求返回新分区的迭代器,mapPartitions生成新RDD元素个数不一定=原RDD元素个数;
3、元素内存回收的时机不一样
map对元素操作完成之后就可以垃圾回收了;
mapPartitions必须要等到分区数据迭代器里面数据全部处理完成之后才会统一垃圾回收,如果分区数据比较大可能出现内存溢出,此时可以用map代替。
mapPartitionsWithIndex的函数相比mapPartitions函数多了一个分区号。
coalesce与repartition的使用场景
coalesce一般用于减少分区数,一般搭配filter使用。
repartition一般用于增大分区数,当数据量膨胀的时候需要将分区数增大,加速数据处理速度
自定义分区器
1、定义class继承Partitioner
2、重写抽象方法override def numPartitions: Int = num //获取新RDD的分区数
override def getPartition(key: Any): Int //获取key获取分区号
3、使用: 在shuffle算子中一般都可以传入partitioner对象
groupByKey与reduceByKey的区别
- reduceByKey有combiner预聚合操作,工作中推荐使用这种高性能shuffle算子
- groupByKey没有预聚合
- first与take会首先启动一个job从RDD 0号分区获取前N个元素,如果0号分区数据不够会再次启动一个job从其他分区获取数据。
在实际开发中我们往往需要自己定义一些对于RDD的操作,那么此时需要注意的是,初始化工作是在Driver端进行的,而实际运行程序是在Executor端进行的,这就涉及到了跨进程通信,是需要序列化的。
注意: (有闭包就需要进行序列化)
参考地址: https://github.com/EsotericSoftware/kryo
Java的序列化能够序列化任何的类。但是比较重,序列化后对象的体积也比较大。
Spark出于性能的考虑,Spark2.0开始支持另外一种Kryo序列化机制。Kryo速度是Serializable的10倍。当RDD在Shuffle数据的时候,简单数据类型、数组和字符串类型已经在Spark内部使用Kryo来序列化。
使用方法:
- object serializable02_Kryo {
-
- def main(args: Array[String]): Unit = {
-
- val conf: SparkConf = new SparkConf()
- .setAppName("SerDemo")
- .setMaster("local[*]")
- // 替换默认的序列化机制
- .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
- // 注册需要使用kryo序列化的自定义类
- .registerKryoClasses(Array(classOf[Search]))
-
- val sc = new SparkContext(conf)
-
- val rdd: RDD[String] = sc.makeRDD(Array("hello world", "hello atguigu", "atguigu", "hahah"), 2)
-
- val search = new Search("hello")
- val result: RDD[String] = rdd.filter(search.isMatch)
-
- result.collect.foreach(println)
- }
- // 关键字封装在一个类里面
- // 需要自己先让类实现序列化 之后才能替换使用kryo序列化
- class Search(val query: String) extends Serializable {
- def isMatch(s: String): Boolean = {
- s.contains(query)
- }
- }
- }
RDD只支持粗粒度转换,即在大量记录上执行的单个操作。将创建RDD的一系列Lineage(血统)记录下来,以便恢复丢失的分区。RDD的Lineage会记录RDD的元数据信息和转换行为,当该RDD的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区。

打印结果:
(2) input/1.txt MapPartitionsRDD[1] at textFile at Lineage01.scala:15 []
| input/1.txt HadoopRDD[0] at textFile at Lineage01.scala:15 []
----------------------
(2) MapPartitionsRDD[2] at flatMap at Lineage01.scala:19 []
| input/1.txt MapPartitionsRDD[1] at textFile at Lineage01.scala:15 []
| input/1.txt HadoopRDD[0] at textFile at Lineage01.scala:15 []
----------------------
(2) MapPartitionsRDD[3] at map at Lineage01.scala:23 []
| MapPartitionsRDD[2] at flatMap at Lineage01.scala:19 []
| input/1.txt MapPartitionsRDD[1] at textFile at Lineage01.scala:15 []
| input/1.txt HadoopRDD[0] at textFile at Lineage01.scala:15 []
----------------------
(2) ShuffledRDD[4] at reduceByKey at Lineage01.scala:27 []
+-(2) MapPartitionsRDD[3] at map at Lineage01.scala:23 []
| MapPartitionsRDD[2] at flatMap at Lineage01.scala:19 []
| input/1.txt MapPartitionsRDD[1] at textFile at Lineage01.scala:15 []
| input/1.txt HadoopRDD[0] at textFile at Lineage01.scala:15 []
注意:圆括号中的数字表示RDD的并行度,也就是有几个分区

打印结果:
List(org.apache.spark.OneToOneDependency@f2ce6b)
----------------------
List(org.apache.spark.OneToOneDependency@692fd26)
----------------------
List(org.apache.spark.OneToOneDependency@627d8516)
----------------------
List(org.apache.spark.ShuffleDependency@a518813)
注意:要想理解RDDS是如何工作的,最重要的就是理解Transformations。
RDD之间的关系可以从两个维度来理解:一个是RDD是从哪些RDD转换而来,也就是 RDD的parent RDD(s)是什么(血缘); 另一个就是RDD依赖于parent RDD(s)的哪些Partition(s),这种关系就是RDD之间的依赖(依赖)。
RDD和它依赖的父RDD(s)的依赖关系有两种不同的类型,即窄依赖(NarrowDependency)和宽依赖(ShuffleDependency)。
窄依赖表示每一个父RDD的Partition最多被子RDD的一个Partition使用(一对一or多对一),窄依赖我们形象的比喻为独生子女。

宽依赖表示同一个父RDD的Partition被多个子RDD的Partition依赖(只能是一对多),会引起Shuffle,总结:宽依赖我们形象的比喻为超生。

具有宽依赖的transformations包括:sort、reduceByKey、groupByKey、join和调用rePartition函数的任何操作。
宽依赖对Spark去评估一个transformations有更加重要的影响,比如对性能的影响。
在不影响业务要求的情况下,要尽量避免使用有宽依赖的转换算子,因为有宽依赖,就一定会走shuffle,影响性能。
1)DAG有向无环图
DAG(Directed Acyclic Graph)有向无环图是由点和线组成的拓扑图形,该图形具有方向,不会闭环。例如,DAG记录了RDD的转换过程和任务的阶段。

2)RDD任务切分中间分为:Application、Job、Stage和Task
(1)Application:初始化一个SparkContext即生成一个Application;
(2)Job:一个Action算子就会生成一个Job;
(3)Stage:Stage等于宽依赖的个数加1;
(4)Task:一个Stage阶段中,最后一个RDD的分区个数就是Task的个数。
注意:Application->Job->Stage->Task每一层都是1对n的关系。

RDD通过Cache或者Persist方法将前面的计算结果缓存,默认情况下会把数据以序列化的形式缓存在JVM的堆内存中。但是并不是这两个方法被调用时立即缓存,而是触发后面的action算子时,该RDD将会被缓存在计算节点的内存中,并供后面重用。

源码解析:
mapRdd.cache()
def cache(): this.type = persist()
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
object StorageLevel {
val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
注意:默认的存储级别都是仅在内存存储一份。在存储级别的末尾加上“_2”表示持久化的数据存为两份。SER:表示序列化。

缓存有可能丢失,或者存储于内存的数据由于内存不足而被删除,RDD的缓存容错机制保证了即使缓存丢失也能保证计算的正确执行。通过基于RDD的一系列转换,丢失的数据会被重算,由于RDD的各个Partition是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部Partition。
(2) 自带缓存算子
Spark会自动对一些Shuffle操作的中间数据做持久化操作(比如:reduceByKey)。这样做的目的是为了当一个节点Shuffle失败了避免重新计算整个输入。但是,在实际使用的时候,如果想重用数据,仍然建议调用persist或cache。
1)检查点:是通过将RDD中间结果写入磁盘。
2)为什么要做检查点?
由于血缘依赖过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果检查点之后有节点出现问题,可以从检查点开始重做血缘,减少了开销。
3)检查点存储路径:Checkpoint的数据通常是存储在HDFS等容错、高可用的文件系统
4)检查点数据存储格式为:二进制的文件
5)检查点切断血缘:在Checkpoint的过程中,该RDD的所有依赖于父RDD中的信息将全部被移除。
6)检查点触发时间:对RDD进行Checkpoint操作并不会马上被执行,必须执行Action操作才能触发。但是检查点为了数据安全,会从血缘关系的最开始执行一遍。

7)设置检查点步骤
(1)设置检查点数据存储路径:sc.setCheckpointDir("./checkpoint1")
(2)调用检查点方法:wordToOneRdd.checkpoint()
(1)Cache缓存只是将数据保存起来,不切断血缘依赖。Checkpoint检查点切断血缘依赖。
(2)Cache缓存的数据通常存储在磁盘、内存等地方,可靠性低。Checkpoint的数据通常存储在HDFS等容错、高可用的文件系统,可靠性高。
(3)建议对checkpoint()的RDD使用Cache缓存,这样checkpoint的job只需从Cache缓存中读取数据即可,否则需要再从头计算一次RDD。
(4)如果使用完了缓存,可以通过unpersist()方法释放缓存。
如果检查点数据存储到HDFS集群,要注意配置访问集群的用户名。否则会报访问权限异常。
- object checkpoint02 {
-
- def main(args: Array[String]): Unit = {
-
- // 设置访问HDFS集群的用户名
- System.setProperty("HADOOP_USER_NAME","atguigu")
-
- //1.创建SparkConf并设置App名称
- val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
-
- //2.创建SparkContext,该对象是提交Spark App的入口
- val sc: SparkContext = new SparkContext(conf)
-
- // 需要设置路径.需要提前在HDFS集群上创建/checkpoint路径
- sc.setCheckpointDir("hdfs://hadoop102:8020/checkpoint")
-
- //3. 创建一个RDD,读取指定位置文件:hello atguigu atguigu
- val lineRdd: RDD[String] = sc.textFile("input1")
-
- //3.1.业务逻辑
- val wordRdd: RDD[String] = lineRdd.flatMap(line => line.split(" "))
-
- val wordToOneRdd: RDD[(String, Long)] = wordRdd.map {
- word => {
- (word, System.currentTimeMillis())
- }
- }
-
- //3.4 增加缓存,避免再重新跑一个job做checkpoint
- wordToOneRdd.cache()
-
- //3.3 数据检查点:针对wordToOneRdd做检查点计算
- wordToOneRdd.checkpoint()
-
- //3.2 触发执行逻辑
- wordToOneRdd.collect().foreach(println)
-
- //4.关闭连接
- sc.stop()
- }
- }
Spark目前支持Hash分区、Range分区和用户自定义分区。Hash分区为当前的默认分区。分区器直接决定了RDD中分区的个数、RDD中每条数据经过Shuffle后进入哪个分区和Reduce的个数。
1)注意:
(1)只有Key-Value类型的RDD才有分区器,非Key-Value类型的RDD分区的值是None
(2)每个RDD的分区ID范围:0~numPartitions-1,决定这个值是属于那个分区的。
2)获取RDD分区
(1)创建包名:com.atguigu.partitioner
(2)代码实现
- object partitioner01_get {
-
- def main(args: Array[String]): Unit = {
-
- //1.创建SparkConf并设置App名称
- val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
-
- //2.创建SparkContext,该对象是提交Spark App的入口
- val sc: SparkContext = new SparkContext(conf)
-
- //3 创建RDD
- val pairRDD: RDD[(Int, Int)] = sc.makeRDD(List((1,1),(2,2),(3,3)))
-
- //3.1 打印分区器
- println(pairRDD.partitioner)
-
- //3.2 使用HashPartitioner对RDD进行重新分区
- val partitionRDD: RDD[(Int, Int)] = pairRDD.partitionBy(new HashPartitioner(2))
-
- //3.3 打印分区器
- println(partitionRDD.partitioner)
-
- //4.关闭连接
- sc.stop()
- }
- }


累加器:分布式共享只写变量。(Executor和Executor之间不能读数据)
累加器用来把Executor端变量信息聚合到Driver端。在Driver中定义的一个变量,在Executor端的每个task都会得到这个变量的一份新的副本,每个task更新这些副本的值后,传回Driver端进行合并计算。

1)累加器使用
(1)累加器定义(SparkContext.accumulator(initialValue)方法)
val sum: LongAccumulator = sc.longAccumulator("sum")
(2)累加器添加数据(累加器.add方法)
sum.add(count)
(3)累加器获取数据(累加器.value)
sum.value
2)创建包名:com.atguigu.accumulator
3)代码实现
- object accumulator01_system {
- package com.atguigu.cache
- import org.apache.spark.rdd.RDD
- import org.apache.spark.util.LongAccumulator
- import org.apache.spark.{SparkConf, SparkContext}
-
- object accumulator01_system {
- def main(args: Array[String]): Unit = {
- val conf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
- val sc = new SparkContext(conf)
-
- val dataRDD: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("a", 4)))
- //需求:统计a出现的所有次数 ("a",10)
-
- //普通算子实现 reduceByKey 代码会走shuffle 效率低
- //val rdd1: RDD[(String, Int)] = dataRDD.reduceByKey(_ + _)
-
- //普通变量无法实现
- //结论:普通变量只能从driver端发给executor端,在executor计算完以后,结果不会返回给driver端
- /*
- var sum = 0
- dataRDD.foreach{
- case (a,count) => {
- sum += count
- println("sum = " + sum)
- }
- }
- println(("a",sum))
- */
- //累加器实现
- //1 声明累加器
- val accSum: LongAccumulator = sc.longAccumulator("sum")
-
- dataRDD.foreach{
- case (a,count) => {
- //2 使用累加器累加 累加器.add()
- accSum.add(count)
- // 4 不要在executor端获取累加器的值,因为不准确
- //因此我们说累加器叫分布式共享只写变量
- //println("sum = " + accSum.value)
- }
- }
- //3 获取累加器的值 累加器.value
- println(("a",accSum.value))
-
- sc.stop()
- }
- }
注意:Executor端的任务不能读取累加器的值(例如:在Executor端调用sum.value,获取的值不是累加器最终的值)。因此我们说,累加器是一个分布式共享只写变量。
3)累加器要放在行动算子中
因为转换算子执行的次数取决于job的数量,如果一个spark应用有多个行动算子,那么转换算子中的累加器可能会发生不止一次更新,导致结果错误。所以,如果想要一个无论在失败还是重复计算时都绝对可靠的累加器,我们必须把它放在foreach()这样的行动算子中。
对于在行动算子中使用的累加器,Spark只会把每个Job对各累加器的修改应用一次。
- object accumulator02_updateCount {
- def main(args: Array[String]): Unit = {
- val conf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
- val sc = new SparkContext(conf)
-
- val dataRDD: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("a", 4)))
- //需求:统计a出现的所有次数 ("a",10)
- //累加器实现
- //1 声明累加器
- val accSum: LongAccumulator = sc.longAccumulator("sum")
-
- val mapRDD: RDD[Unit] = dataRDD.map {
- case (a, count) => {
- //2 使用累加器累加 累加器.add()
- accSum.add(count)
- // 4 不要在executor端获取累加器的值,因为不准确 因此我们说累加器叫分布式共享只写变量
- //println("sum = " + accSum.value)
- }
- }
-
- //调用两次行动算子,map执行两次,导致最终累加器的值翻倍
- mapRDD.collect()
- mapRDD.collect()
-
- /**
- * 结论:使用累加器最好要在行动算子中使用,因为行动算子只会执行一次,而转换算子的执行次数不确定!
- */
- //2 获取累加器的值 累加器.value
- println(("a",accSum.value))
-
- sc.stop()
- }
- }
一般在开发中使用的累加器为集合累加器,在某些场景可以减少shuffle。