• Spark分布式计算原理


    一、Spark WordCount运行原理

    二、划分Stage

    数据本地化

            移动计算,而不是移动数据

            保证一个Stage内不会发生数据移动

    三、Spark Shuffle过程

    在分区之间重新分配数据

            父RDD中同一分区中的数据按照算子要求重新进入RDD的不同分区中

            中间结果写入磁盘

            有子RDD拉取数据,而不是由父RDD推送

            默认情况下,shuffle不会改变分区数量

    四、RDD的依赖关系

    Lineage:血统、依赖

            RDD最重要的特征之一,保存了RDD的依赖关系

            RDD实现了基于Lineage的容错机制

    依赖关系

            宽依赖:一个父RDD的分区被子RDD的多个分区使用

            窄依赖:一个父RDD的分区被子RDD的一个分区使用

    宽依赖对比窄依赖

    宽依赖对应shuffle操作,需要在运行时将同一个父RDD的分区传入到不同的子RDD分区中,不同的分区可能位于不同的节点,就可能涉及多个节点间数据传输

    当RDD分区丢失时,Spark会对数据进行重新计算,对于窄依赖只需要重新计算一次子RDD的父RDD分区

    补:最多22个元组,元组可以套元组

    宽依赖容错图

    结论:

    相比于宽依赖,窄依赖对优化更有利

    练习:判断RDD依赖关系

    Map  flatMap  filter  distinct  reduceByKey  groupByKey  sortByKey  union  join

    1. scala> val b = sc.parallelize(Array(("hello",11),("java",12),("scala",13),("kafka",14),("sun",15)))
    2. // b: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[2] at parallelize at :24
    3. scala> val c = sc.parallelize(Array(("hello",11),("java",12),("scala",13),("kafka",14),("sun",15)))
    4. // c: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[3] at parallelize at :24
    5. scala> b.glom.collect
    6. // collect collectAsync
    7. scala> b.glom.collect
    8. // res4: Array[Array[(String, Int)]] = Array(Array((hello,11)), Array((java,12)), Array((scala,13)), Array((kafka,14), (sun,15)))
    9. scala> c.glom.collect
    10. // collect collectAsync
    11. scala> c.glom.collect
    12. // res5: Array[Array[(String, Int)]] = Array(Array((hello,11)), Array((java,12)), Array((scala,13)), Array((kafka,14), (sun,15)))
    13. scala> val d = a.jion(b)
    14. // :27: error: value jion is not a member of org.apache.spark.rdd.RDD[(String, Int, Int, Int)]
    15. // val d = a.jion(b) //拼写错误
    16. ^
    17. scala> val d = a.join(b)
    18. // :27: error: value join is not a member of org.apache.spark.rdd.RDD[(String, Int, Int, Int)]
    19. // val d = a.join(b) //应该是bc不是ab
    20. ^
    21. scala> val d = b.join(c) //Tab成功
    22. // d: org.apache.spark.rdd.RDD[(String, (Int, Int))] = MapPartitionsRDD[8] at join at :27
    23. scala> d.glom.collect
    24. // collect collectAsync
    25. scala> d.glom.collect
    26. // res6: Array[Array[(String, (Int, Int))]] = Array(Array((sun,(15,15))), Array(), Array((scala,(13,13)), (hello,(11,11)), (java,(12,12)), (kafka,(14,14))), Array())

    五、DAG工作原理

    (1)根据RDD之间的依赖关系,形成一个DAG(有向无环图)

    (2)DAGScheduler将DAG划分为多个Stage

            划分依据:是否发生宽依赖(Shuffle)

            划分规则:从后往前,遇到宽依赖切割为新的Stage

            每个Stage由一组并行的Task组成的

    六、Shuffle实践

    最佳实践

            提前部分聚合减少数据移动

            尽量避免Shuffle  

          

    七、RDD优化

    • RDD持久化
    • RDD共享变量
    • RDD分区设计
    • 数据倾斜

    (一)RDD持久化

    (1)RDD缓存机制:缓存数据至内存/磁盘,可大幅度提升Spark应用性能

    (2)缓存策略StorageLevel

    ①RDD存储级别介绍(StorageLevel )

    object StorageLevel {
      val NONE = new StorageLevel(false, false, false, false)
      val DISK_ONLY = new StorageLevel(true, false, false, false)
      val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
      val DISK_ONLY_3 = new StorageLevel(true, false, false, false, 3)
      val MEMORY_ONLY = new StorageLevel(false, true, false, true)
      val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
      val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
      val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
      val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
      val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
      val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
      val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
      val OFF_HEAP = new StorageLevel(true, true, true, false, 1)

    ②缓存应用场景

    从文件加载数据之后,因为重新获取文件成本较高

    经过较多的算子变换之后,重新计算成本较高

    单个非常消耗资源的算子之后

    ③使用注意事项

    Cache() 或persist() 后不能再有其他的算子

    Cache() 或persist() 遇到Action算子完成后才生效

    ④操作

    1. import org.apache.spark.rdd.RDD
    2. import org.apache.spark.storage.StorageLevel
    3. import org.apache.spark.{SparkConf, SparkContext}
    4. object CacheDemo {
    5. def main(args: Array[String]): Unit = {
    6. val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("cachedemo")
    7. val sc: SparkContext = SparkContext.getOrCreate(conf)
    8. val rdd: RDD[String] = sc.textFile("in/users.csv")
    9. // stack堆栈 heap栈
    10. // rdd.cache() //设置默认缓存Memory_Only
    11. rdd.persist(StorageLevel.MEMORY_ONLY)
    12. val value: RDD[(String, Int)] = rdd.map(x=>(x,1))
    13. // val start: Long = System.currentTimeMillis()
    14. // println(value.count())
    15. // val end: Long = System.currentTimeMillis()
    16. // println("1:"+(end-start))
    17. for (i<- 1 to 10){
    18. val start: Long = System.currentTimeMillis()
    19. println(value.count())
    20. val end: Long = System.currentTimeMillis()
    21. println(i+":"+(end-start))
    22. Thread.sleep(10)
    23. if (i>6){
    24. rdd.unpersist()
    25. }
    26. }
    27. }
    28. }

    (3)检查点:类似于快照

    1. sc.setCheckpointDir("hdfs:/checkpoint0918")
    2. val rdd=sc.parallelize(List(('a',1), ('a',2), ('b',3), ('c',4)))
    3. rdd.checkpoint
    4. rdd.collect //生成快照
    5. rdd.isCheckpointed
    6. rdd.getCheckpointFile

    (4)检查点与缓存的区别

            检查点会删除RDD lineage,而缓存不会

            SparkContext被销毁后,检查点数据不会被删除

    1. import org.apache.spark.rdd.RDD
    2. import org.apache.spark.{SparkConf, SparkContext}
    3. object CheckPointDemo {
    4. def main(args: Array[String]): Unit = {
    5. val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("cachedemo")
    6. val sc: SparkContext = SparkContext.getOrCreate(conf)
    7. sc.setCheckpointDir("file://文件路径")
    8. val rdd: RDD[Int] = sc.parallelize(1 to 20)
    9. rdd.checkpoint()
    10. rdd.glom().collect.foreach(x=>println(x.toList))
    11. println(rdd.isCheckpointed)
    12. println(rdd.getCheckpointFile)
    13. }
    14. }

    (二)RDD共享变量

    (1)广播变量:允许开发者将一个只读变量(Driver)缓存到每个节点(Executor)上,而不是每个任务传递一个副本

    1. import org.apache.spark.broadcast.Broadcast
    2. import org.apache.spark.rdd.RDD
    3. import org.apache.spark.{SparkConf, SparkContext}
    4. object BroadCastDemo {
    5. def main(args: Array[String]): Unit = {
    6. val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("cachedemo")
    7. val sc: SparkContext = SparkContext.getOrCreate(conf)
    8. var arr = Array("hello","hi","Come here")//一维数组
    9. var arr2 = Array((1,"hello"),(2,"hello"),(3,"hi"))//二维数组
    10. //广播变量
    11. val broadcastVar: Broadcast[Array[String]] = sc.broadcast(arr)
    12. val broadcastVar2: Broadcast[Array[(Int, String)]] = sc.broadcast(arr2)
    13. //普通RDD
    14. val rdd: RDD[(Int, String)] = sc.parallelize(Array((1,"leader"),(2,"teamleader"),(3,"worker")))
    15. val rdd2: RDD[(Int, String)] = rdd.mapValues(x => {
    16. println("value is:" + x)
    17. // broadcastVar.value(0) + ":" + x
    18. broadcastVar2.value(1)._2 + ":" + x
    19. })
    20. rdd2.collect.foreach(println)
    21. }
    22. }

    (2)累加器:只允许added操作,常用于实现计算

    (三)RDD分区设计

    (1)分区大小限制为2GB

    (2)分区太少

            不利于并发

            更容易数据倾斜影响

            groupBy,reduceByKey,sortByKey等内存压力增大

    (3)分区过多

            Shuffle开销越大

            创建任务开销越大

    (4)经验

            每个分区大约128MB

            如果分区小于但接近2000,则设置为大于2000

    (四)数据倾斜

    1、指分区中的数据分配不均匀,数据集中在少数分区中

            严重影响性能

            通常发生在groupBy,jion等之后

    2、解决方案

            使用新的Hash值(如对key加盐)重新分区

    3、实战

    [root@kb23 jars]# pwd

    /opt/soft/spark312/examples/jars

    [root@kb23 jars]# ls

    scopt_2.12-3.7.1.jar  spark-examples_2.12-3.1.2.jar

    [root@kb23 sbin]# pwd

    /opt/soft/spark312/sbin

    [root@kb23 sbin]# start-all.sh

    [root@kb23 spark312]# ./bin/spark-submit --master spark://192.168.91.11:7077 --class org.apache.spark.examples.SparkPi ./examples/jars/spark-examples_2.12-3.1.2.jar 100

    Pi is roughly 3.1407527140752713

    重新分区(../spark312目录)

    1. ./bin/spark-submit \
    2. --master spark://192.168.78.131:7077
    3. --executor-memory 6G \
    4. --executor-cores 4 \
    5. --driver-memory 1G \
    6. --conf spark.default.parallelism=1000 \
    7. --conf spark.storage.memoryFraction=0.5 \
    8. --conf spark.shuffle.memoryFraction=0.3 \
    9. --class org.apache.spark.examples.SparkPi \
    10. ./examples/jars/spark-examples_2.12-3.1.2.jar \
    11. 1000

  • 相关阅读:
    JavaScript-BOM编程
    Java IDEA maven 配置
    【技术积累】Vue.js中的核心知识【五】
    论文浅尝 | AdaLoGN: 基于推理的机器阅读理解的自适应逻辑图网络
    02-MongoDB基本概念
    计算机毕业设计课题题目列表信息
    实用技术-Restful
    【前后端分离系列】 Spring Boot + Vue 实现 EasyPOI 导入导出
    文件I/O_03PageCache和Mmap
    6、Raft协议
  • 原文地址:https://blog.csdn.net/berbai/article/details/133753271