• spark学习笔记(七)——sparkcore核心编程-RDD序列化/依赖关系/持久化/分区器/累加器/广播变量


    目录

    RDD序列化

    (1)闭包检查

    (2)序列化方法和属性

    (3)Kryo序列化

    RDD依赖关系

    (1)RDD 血缘关系

    (2)RDD 依赖关系

    (3)RDD窄依赖

    (4)RDD宽依赖

     (5)RDD阶段划分 

    (6)RDD任务划分

    RDD持久化

     (1)RDD Cache缓存  

    (2)RDD CheckPoint检查点

    (3)缓存和检查点的区别

    RDD分区器

    累加器:分布式共享只写变量

    (1)系统累加器

    (2)自定义累加器

    广播变量:分布式共享只读变量


    RDD序列化

    (1)闭包检查

    计算的角度:算子外的代码都在Driver端执行, 算子内的代码都在Executor端执行。

    在scala函数式编程中,算子内经常会用到算子外的数据,这样形成了闭包的效果;算子外的数据没有序列化,意味着无法传值给Executor端执行,就会发生错误;所以需要在执行任务计算前,检测闭包内的对象是否可以进行序列化,这个操作我们称之为闭包检测。

    (2)序列化方法和属性

    1. import org.apache.spark.{SparkConf, SparkContext}
    2. object RDD_Serialiazble {
    3. def main(args: Array[String]): Unit = {
    4. //TODO 创建环境
    5. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Serializable")
    6. val sc = new SparkContext(sparkConf)
    7. //TODO
    8. //创建RDD
    9. val rdd = sc.makeRDD(Array("Data","hello world", "hello scala", "hello spark","world","Big data"))
    10. //创建search对象
    11. val search = new Search("hello")
    12. //函数传递
    13. search.getMatch1(rdd).collect().foreach(println)
    14. println(">>>>>>>>>>>")
    15. //属性传递
    16. search.getMatch2(rdd).collect().foreach(println)
    17. //TODO 关闭环境
    18. sc.stop()
    19. }
    20. }
    21. //查询对象
    22. class Search(query:String) extends Serializable {
    23. def isMatch(s:String): Boolean = {
    24. s.contains(this.query)
    25. }
    26. //函数序列化
    27. def getMatch1 (rdd: RDD[String]): RDD[String] = {
    28. rdd.filter(isMatch)
    29. }
    30. //属性序列化
    31. def getMatch2(rdd: RDD[String]) : RDD[String] = {
    32. rdd.filter(x => x.contains(query))
    33. }
    34. }

    (3)Kryo序列化

    1)Java的序列化能够序列化任何的类。但是Java的序列化字节多,序列化后对象的提交也比较大。Spark出于性能的考虑,2.0 开始支持另外一种序列化机制——Kryo。

    2)Kryo的速度是Serializable的10倍。当RDD在Shuffle数据的时候,简单数据类型、数组和字符串类型已经在Spark内部使用Kryo来序列化。

    3)使用Kryo序列化也要继承Serializable接口。

    1. import org.apache.spark.rdd.RDD
    2. import org.apache.spark.{SparkConf, SparkContext}
    3. object RDD_Serializable_Kryo {
    4. def main(args: Array[String]): Unit = {
    5. //TODO 创建环境
    6. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Serializable")
    7. //替换序列化机制
    8. .set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
    9. //使用kryo序列化的自定义类
    10. .registerKryoClasses(Array(classOf[Search2]))
    11. val sc = new SparkContext(sparkConf)
    12. //TODO
    13. //创建RDD
    14. val rdd = sc.makeRDD(Array("Data","hello world", "hello scala","world","Big data"))
    15. //创建search对象
    16. val search2 = new Search2("hello")
    17. //函数传递
    18. val result = search2.getMatch3(rdd)
    19. result.collect().foreach(println)
    20. println("<<<<<<<<")
    21. //属性传递
    22. search2.getMatch4(rdd).collect().foreach(println)
    23. //TODO 关闭环境
    24. sc.stop()
    25. }
    26. }
    27. //查询对象
    28. class Search2(query: String) extends Serializable {
    29. def isMatch(s: String) = {
    30. s.contains(query)
    31. }
    32. //函数序列化
    33. def getMatch3(rdd: RDD[String]) = {
    34. rdd.filter(isMatch)
    35. }
    36. //属性序列化
    37. def getMatch4(rdd: RDD[String]) = {
    38. rdd.filter(_.contains(query))
    39. }
    40. }

    RDD依赖关系

    (1)RDD 血缘关系

    1)RDD只支持粗粒度转换。

    2)将创建RDD的一系列Lineage(血统)记录下来,以便恢复丢失的分区。RDDLineage会记录 RDD的元数据信息和转换行为,当该RDD的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区。

    注:

    粗粒度:针对整个数据集进行转换操作,而不是针对数据集中的某个元素进行转换操作;

    细粒度:针对数据集中的某个元素进行转换操作。

    1. def main(args: Array[String]): Unit = {
    2. //TODO 创建环境
    3. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Depend")
    4. val sc = new SparkContext(sparkConf)
    5. //TODO 依赖关系
    6. val rdd = sc.textFile("datas/word2.txt")
    7. println(rdd.toDebugString)
    8. println(">>>>>>>>>>>")
    9. val flatRDD = rdd.flatMap(_.split(" "))
    10. println(flatRDD.toDebugString)
    11. println(">>>>>>>>>>>")
    12. val mapRDD = flatRDD.map((_, 1))
    13. println(mapRDD.toDebugString)
    14. println(">>>>>>>>>>>>>>")
    15. val resultRDD = mapRDD.reduceByKey(_ + _)
    16. println(resultRDD.toDebugString)
    17. println(">>>>>>>>>>>>>>")
    18. resultRDD.collect().foreach(println)
    19. //TODO 关闭环境
    20. sc.stop()
    21. }

    (2)RDD 依赖关系

    依赖关系其实就是两个相邻RDD之间的关系。

    1. def main(args: Array[String]): Unit = {
    2. //TODO 创建环境
    3. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Depend")
    4. val sc = new SparkContext(sparkConf)
    5. //TODO 依赖关系
    6. val rdd = sc.textFile("datas/word2.txt")
    7. println(rdd.dependencies)
    8. println(">>>>>>>>>>>")
    9. val flatRDD = rdd.flatMap(_.split(" "))
    10. println(flatRDD.dependencies)
    11. println(">>>>>>>>>>>")
    12. val mapRDD = flatRDD.map((_, 1))
    13. println(mapRDD.dependencies)
    14. println(">>>>>>>>>>>>>>")
    15. val resultRDD = mapRDD.reduceByKey(_ + _)
    16. println(resultRDD.dependencies)
    17. println(">>>>>>>>>>>>>>")
    18. resultRDD.collect().foreach(println)
    19. //TODO 关闭环境
    20. sc.stop()
    21. }

    (3)RDD窄依赖

    窄依赖表示每一个父RDDPartition只能被RDD的一个Partition使用,窄依赖我们形象的比喻为独生子女。

    (4)RDD宽依赖

    宽依赖表示同一个父RDDPartition被多个子RDDPartition依赖,会引起Shuffle,宽依赖我们形象的比喻为多生。

     (5)RDD阶段划分 

    DAGDirected Acyclic Graph)有向无环图是由点和线组成的拓扑图形,该图形具有方向,不会闭环。

     DAG记录了部分RDD的转换过程和任务的阶段。

    ShuffleMap阶段,Result阶段

    (6)RDD任务划分

    RDD任务切分,中间分为:ApplicationJobStage Task

    Application:初始化一个SparkContext即生成一个Application;

    Job:一个Action算子就会生成一个 Job

    StageStage等于宽依赖(ShuffleDependency)的个数加 1

    Task:一个Stage阶段中,最后一个RDD的分区个数就是Task的个数。

    注:Application->Job->Stage->Task每一层都是1对n的关系。

    RDD持久化

     (1)RDD Cache缓存  

    RDD通过Cache或者Persist方法将前面的计算结果缓存,默认情况下会把数据缓存在JVM的堆内存中。但并不是这两个方法被调用时立即缓存,而是触发后面的action算子时,该RDD将会被缓存在计算节点的内存中,并供后面重用。

    注:缓存有可能丢失

    1)存储于内存的数据由于内存不足而被删除,RDD的缓存容错机制保证了即使缓存丢失也能保证计算的正确执行。

    2)通过基于RDD的一系列转换,丢失的数据会被重算,由于RDD的各个Partition是相对独立的,因此只需要计算丢失的部分即可

    3)Spark会自动对一些Shuffle操作的中间数据做持久化操作(比如:reduceByKey)。当一个节点 Shuffle失败了可以避免重新计算整个输入。

    4)在实际使用中,如果想重用数据,建议调用persist或cache。

    1. def main(args: Array[String]): Unit = {
    2. //TODO 创建环境
    3. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Persist")
    4. val sc = new SparkContext(sparkConf)
    5. //TODO
    6. val rdd = sc.makeRDD(List("hello world", "hello scala"))
    7. val flatRDD = rdd.flatMap(_.split(" "))
    8. val mapRDD = flatRDD.map((_, 1))
    9. //cache数据放入内存
    10. mapRDD.cache()
    11. //persist数据放入磁盘
    12. //mapRDD.persist(StorageLevel.DISK_ONLY)
    13. val reduceRDD = mapRDD.reduceByKey(_ + _)
    14. reduceRDD.collect().foreach(println)
    15. println("<<<<<<<<<<<<<")
    16. val groupRDD = mapRDD.groupByKey()
    17. groupRDD.collect().foreach(println)
    18. //TODO 关闭环境
    19. sc.stop()
    20. }

    (2)RDD CheckPoint检查点

    所谓的检查点就是将RDD中间结果写入磁盘,由于血缘依赖过长会造成容错成本过高,在中间阶段做检查点容错,如果检查点之后有节点出现问题,可以从检查点开始重做,减少开销。

    RDD进行checkpoint操作并不会马上被执行,必须执行Action操作才能触发。

    1. def main(args: Array[String]): Unit = {
    2. //TODO 创建环境
    3. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("CheckPoint")
    4. val sc = new SparkContext(sparkConf)
    5. //TODO 检查点
    6. //checkpoint要指定检查点的保存路径
    7. sc.setCheckpointDir("checkpoint")
    8. val rdd = sc.makeRDD(List("hello world", "hello scala"))
    9. val flatRDD = rdd.flatMap(_.split(" "))
    10. val mapRDD = flatRDD.map((_, 1))
    11. mapRDD.checkpoint()
    12. val reduceRDD = mapRDD.reduceByKey(_ + _)
    13. reduceRDD.collect().foreach(println)
    14. println("<<<<<<<<<<<<<")
    15. val groupRDD = mapRDD.groupByKey()
    16. groupRDD.collect().foreach(println)
    17. //TODO 关闭环境
    18. sc.stop()
    19. }

    (3)缓存和检查点的区别

    1Cache缓存只是将数据保存起来,不切断血缘依赖;Checkpoint检查点切断血缘依赖。

    2Cache 缓存的数据通常存储在磁盘、内存等地方,可靠性低。Checkpoint的数据通常存储在HDFS等容错、高可用的文件系统,可靠性高。

    3)建议对checkpoint()RDD使用Cache缓存,这样checkpointjob只需从Cache缓存中读取数据即可,否则要从头计算RDD

    RDD分区器

    Spark目前支持Hash分区、Range分区和用户自定义分区。

    Hash分区为当前的默认分区。分区器直接决定了RDD中分区的个数、RDD中每条数据经过Shuffle后进入哪个分区,进而决定了reduce的个数。

    只有Key-Value类型的RDD才有分区器,非Key-Value类型的RDD分区的值是None。

    每个RDD的分区ID范围:0 ~ (numPartitions - 1),决定这个值是属于那个分区的。

    (1)Hash分区 对于给定的key,计算其hashCode,并除以分区个数取余。

    (2)Range分区:将一定范围内的数据映射到一个分区中,尽量保证每个分区数据均匀,而且分区间有序。

    注:前面博文写过分区器,这里只演示自定义分区器,其他的不做演示。

    1. import org.apache.spark.{Partitioner, SparkConf, SparkContext}
    2. object RDD_OnesPart {
    3. def main(args: Array[String]): Unit = {
    4. //TODO 创建环境
    5. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Part")
    6. val sc = new SparkContext(sparkConf)
    7. //TODO 自定义分区器
    8. val rdd = sc.makeRDD(List(
    9. ("CBA", "....."), ("NBA", "....."), ("WWW", "....."), ("COM", ".....")
    10. ), 3)
    11. val partRDD = rdd.partitionBy(new MyPartitioner)
    12. partRDD.saveAsTextFile("output")
    13. //TODO 关闭环境
    14. sc.stop()
    15. }
    16. //TODO 自定义分区器
    17. /**
    18. * 1.继承partitioner
    19. * 2.重写方法
    20. *
    21. *
    22. * */
    23. class MyPartitioner extends Partitioner {
    24. //分区数量
    25. override def numPartitions: Int = 3
    26. //根据数据的key值返回分区索引
    27. override def getPartition(key: Any): Int = {
    28. key match {
    29. case "CBA" => 0
    30. case "NBA" => 1
    31. case _ => 2
    32. }
    33. }
    34. }
    35. }

     

    累加器:分布式共享只写变量

    累加器用来把Executor端变量信息聚合到Driver端。在Driver程序中定义的变量,在Executor端的每个Task都会得到这个变量的一份新的副本,每个task更新这些副本的值后,传回Driver端进行merge

    (1)系统累加器

    1. def main(args: Array[String]): Unit = {
    2. //TODO 创建环境
    3. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Acc")
    4. val sc = new SparkContext(sparkConf)
    5. //TODO 累加器——系统累加器
    6. val rdd = sc.makeRDD(List(1, 2, 3, 4))
    7. //实现累加的方法一:reduce(—_+_)
    8. // val reduceRDD = rdd.reduce(_ + _)
    9. //s\实现累加器的方法二
    10. val sum = sc.longAccumulator("sum")
    11. rdd.foreach(
    12. num => {
    13. sum.add(num)
    14. }
    15. )
    16. //1+2+3+4=10
    17. println(sum.value)
    18. //TODO 关闭环境
    19. sc.stop()
    20. }

     

     

    (2)自定义累加器

    1. import org.apache.spark.util.AccumulatorV2
    2. import org.apache.spark.{SparkConf, SparkContext}
    3. import scala.collection.mutable
    4. object RDD_Acc_Ones {
    5. def main(args: Array[String]): Unit = {
    6. //TODO 创建环境
    7. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Acc")
    8. val sc = new SparkContext(sparkConf)
    9. //TODO 累加器——自定义累加器
    10. val rdd = sc.makeRDD(List(
    11. "hello","spark","hello","world"
    12. ))
    13. // rdd.map((_,1)).reduceByKey(_+_)
    14. //累加器
    15. //创建累加器对象
    16. val accWc = new MyAccumulator()
    17. //向spark进行注册
    18. sc.register(accWc,"wc")
    19. rdd.foreach(
    20. word => {
    21. accWc.add(word)
    22. }
    23. )
    24. println(accWc.value)
    25. //TODO 关闭环境
    26. sc.stop()
    27. }
    28. /**MyAccumulator
    29. * 自定义累加器
    30. * 1.继承 AccumulatorV2,设置泛型
    31. * 2.重写累加器的抽象方法
    32. */
    33. class MyAccumulator extends AccumulatorV2[String, mutable.Map[String,
    34. Long]]{
    35. var map : mutable.Map[String, Long] = mutable.Map()
    36. // 累加器是否为初始状态
    37. override def isZero: Boolean = {
    38. map.isEmpty
    39. }
    40. // 复制累加器
    41. override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = {
    42. new MyAccumulator
    43. }
    44. // 重置累加器
    45. override def reset(): Unit = {
    46. map.clear()
    47. }
    48. // 向累加器中增加数据 (In)
    49. override def add(word: String): Unit = {
    50. // 查询 map 中是否存在相同的单词
    51. // 如果有相同的单词,那么单词的数量加 1
    52. // 如果没有相同的单词,那么在 map 中增加这个单词
    53. map(word) = map.getOrElse(word, 0L) + 1L
    54. }
    55. // 合并累加器
    56. override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]):
    57. Unit = {
    58. val map1 = map
    59. val map2 = other.value
    60. // 两个 Map 的合并
    61. map = map1.foldLeft(map2)(
    62. ( innerMap, kv ) => {
    63. innerMap(kv._1) = innerMap.getOrElse(kv._1, 0L) + kv._2
    64. innerMap
    65. }
    66. ) }
    67. // 返回累加器的结果 (Out)
    68. override def value: mutable.Map[String, Long] = map
    69. }
    70. }

     

    广播变量:分布式共享只读变量

    原理:广播变量用来高效分发较大的对象。向所有工作节点发送一个较大的只读值,以供一个或多个Spark操作使用。比如,如果你的应用需要向所有节点发送一个较大的只读查询表,广播变量很合适。在多个并行操作中使用同一个变量,但是Spark会为每个任务分别发送。  

    1. import org.apache.spark.broadcast.Broadcast
    2. import org.apache.spark.rdd.RDD
    3. import org.apache.spark.{SparkConf, SparkContext}
    4. object RDD_Broadcast {
    5. def main(args: Array[String]): Unit = {
    6. //TODO 创建环境
    7. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Acc")
    8. val sc = new SparkContext(sparkConf)
    9. //TODO 累加器——广播变量
    10. val rdd1 = sc.makeRDD(List( ("a",1), ("b", 2), ("c", 3), ("d", 4) ),4)
    11. val list = List( ("a",4), ("b", 5), ("c", 6), ("d", 7) )
    12. // 声明广播变量
    13. val broadcast: Broadcast[List[(String, Int)]] = sc.broadcast(list)
    14. val resultRDD: RDD[(String, (Int, Int))] = rdd1.map {
    15. case (key, num) => {
    16. var num2 = 0
    17. // 使用广播变量
    18. for ((k, v) <- broadcast.value) {
    19. if (k == key) {
    20. num2 = v
    21. }
    22. }
    23. (key, (num, num2))
    24. }
    25. }
    26. resultRDD.collect().foreach(println)
    27. //TODO 关闭环境
    28. sc.stop()
    29. }
    30. }

    本文仅仅是学习笔记的记录! 

  • 相关阅读:
    【CodeForces】CF10D LCIS
    SQL优化之慢查询日志和explain
    我这两年的CSDN博客创作经历
    [C题]2023 年全国大学生数学建模比赛思路、代码更新中.....
    Sentinel-流量防卫兵
    树莓派等Linux开发板上使用 SSD1306 OLED 屏幕,bullseye系统 ubuntu,debian
    Maven 【ERROR】 不再支持源选项 5。请使用 7或更高版本解决方案:修改Maven默认JDK(含完整代码及详细教程)
    Go:关于 Channel
    【Spring】20 解析Spring注解驱动的容器配置
    mysql-高级命令(1)和一些函数(悟已往之不谏,知来者之可追)
  • 原文地址:https://blog.csdn.net/qq_55906442/article/details/125988182