• Spark 【分区与并行度】


    RDD 并行度和分区

    SparkConf

    setMaster("local[*]")

    我们在创建 SparkContext 对象时通常会指定 SparkConf 参数,它包含了我们运行时的配置信息。如果我们的 setMaster 中的参数是 "local[*]" 时,通常代表使用的CPU核数为当前环境的最大值。

    1. val conf = new SparkConf()
    2. .setMaster("local[*]")
    3. .setAppName("test partition")
    4. val sc = new SparkContext(conf)
    5. val rdd: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
    6. rdd.saveAsTextFile("test_par_out")
    7. sc.stop()

    运行结果:

    在设备管理器中查看CPU核数:

     setMaster("local")

    这时的使用 CPU 核数的默认值为 1 。

    1. val conf = new SparkConf()
    2. .setMaster("local")
    3. .setAppName("test partition")

     

    setMaster["local[2]"]

    设置使用的 CPU 核数为 2

    1. val conf = new SparkConf()
    2. .setMaster("local[2]")
    3. .setAppName("test partition")

    创建RDD时指定分区数

    我们也可以在创建 RDD 对象时指定切片数 numSlices(切片数就是分区的数量(通常一个分区对应一个Task,一个Task对应一个Excutor(一个CPU核心)))。

    1. val conf = new SparkConf()
    2. .setMaster("local[*]")
    3. .setAppName("test partition")
    4. val sc = new SparkContext(conf)
    5. //第二个参数用来指定并行度(分区数)
    6. val rdd: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10),1)
    7. rdd.saveAsTextFile("test_par_out")
    8. sc.stop()

     

    conf.set 指定并行度

    1. val conf = new SparkConf()
    2. .setMaster("local[*]")
    3. .setAppName("test partition")
    4. conf.set("spark.default.parallelism","5")

    读取内存数据(集合)的分区规则

    核心源码:

    1. def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
    2. (0 until numSlices).iterator.map { i =>
    3. val start = ((i * length) / numSlices).toInt
    4. val end = (((i + 1) * length) / numSlices).toInt
    5. (start, end)
    6. }
    7. }

    比如我们读取集合数组 List(1,2,3,4,5),我们在创建RDD对象时设置分区数为 3 。

    val rdd: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5),3)

     当我们保存时,会输出三个文件,文件内容分别是:

    • part-00000:1
    • part-00001:2,3
    • part-000002:4,5

    因为此时我们源码的 positions 的参数是 (length:5,numSilces:3),它会返回三个元组(start,end),对应我们数组的下标,并且左闭右开。

    • part-00000:(0,1)
    • part-00001:(1,3)
    • part-000002:(3,5)

    读取文件数据的分区规则

    我们在通过读取本地文件系统的文件来创建 RDD 时:

    1. val conf = new SparkConf()
    2. .setMaster("local[*]")
    3. .setAppName("test partition")
    4. conf.set("spark.default.parallelism","5")
    5. val sc = new SparkContext(conf)
    6. val rdd = sc.textFile("data/1.txt")
    7. sc.stop()

    默认的分区数量是最小分区数量(2):

    1. //defaultParallelism取决于 setMaster("local[*]") ,如果是 local[*] 代表分区数=CPU核数 但是min方法返回最小值,最小值=2
    2. def defaultMinPartitions: Int = math.min(defaultParallelism, 2)

    Spark 分区规则和 Hadoop 是一样的,只是切片规则和数据读取规则有差异。

    案例-文件a.txt:

    1. 1
    2. 2
    3. 3

    Spark 分区数量的计算方式:

    源码:

    1. long totalSize = 0
    2. long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);

     对于我们上面的文件 a.txt:

    1. // totalSize 是文件的总字节数,一个回车占两个字节
    2. totalSize = 7
    3. goalSize = 7 / (2 == 0 ? 1 : 2) = 3 (单位:byte) //也就是每个分区占用3个字节
    4. //分区数= totalSize/gogalSize=2...1 (余数1byte,根据hadoop的规则,如果余数>每个分区的字节数的1.1倍,就要产生新的分区,否则就不会产生新的分区)
    5. //这里余数是 1 , 1/3 = 33.3% > 0.1 所以会产生一个新的分区
    6. //所以分区数 = 3

    数据分区的分配

    1. Spark 数据分区以行为单位进行读取

    2. 数据读取时,以偏移量为单位

    以上面的 a.txt 为例(@@代表一个回车)

    1. 1@@ => 012
    2. 2@@ => 345
    3. 3 => 6

     

    3. 数据分区的偏移量范围的计算

    1. //注意: 左右都是闭区间,
    2. //偏移量不会被重复读取
    3. part-00000 => [0,3] => 1@@,2@@ //读到3的时候已经到了第二行,要读就读一整行,所以2@@都会被读取
    4. part-00001 => [3,6] => 3 [3,6]对应的第二行的第1个字节(2)~第3行第1个字节(3),而2已经被读过了,所以只剩3
    5. part-00002 => [6,7] =>

    coalesce 和 repartition

    coalesce 和 repartition 分别用于缩减分区节省资源和扩大分区提高并行度。

    coalesce

    根据数据量缩减分区,用于大数据集过滤后,提高小数据集的执行效率。

    当 Spark 程序中,存在过多的小任务的时候,可以使用 coalesce 方法,收缩合并分区,减少分区的个数,减少任务调度成本。

    1. val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 3)
    2. val newRDD1 = rdd.coalesce(2)
    3. /*
    4. coalesce 默认情况下不会将分区内的数据打乱重新组合,这里是直接将三个分区中两个分区合并为一个分区,另外一个仍然是一个分区
    5. 这种情况下的缩减分区可能会导致数据不均衡,出现数据倾斜
    6. 如果想要数据均衡,可以进行shuffle处理
    7. 分区结果:
    8. part-00000: 1 2
    9. part-00001: 3 4 5 6
    10. */
    11. val newRDD2 = rdd.coalesce(2,true)
    12. /*
    13. 分区结果:
    14. part-0000: 1 4 5
    15. part-0001: 2 3 6
    16. */

    repartition

    repartition 的底层其实就是 coalesce ,为了区分缩减和扩大分区(都可以由coalesce实现),所以分成了两个方法。

    1. val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 2)
    2. /*
    3. length=6,numSlices=2
    4. part-00000: 1 2 3
    5. part-00001: 4 5 6
    6. */
    7. // 想要扩大分区数量 提高并行度 shuffle 必须为true 因为我们要把2个分区的数据分为3个分区 就必须打乱分区内的数据重新排
    8. // 如果不设置shuffle为true是没有意义的 结果还是2个分区
    9. val newRdd1 = rdd.coalesce(3,true)
    10. /*
    11. 分区结果:
    12. part-00000: 3 5
    13. part-00001: 2 4
    14. part-00002: 1 6
    15. */
    16. // 缩减分区用 coalesce,如果要数据均衡可以采用 shuffle
    17. // 扩大分区用 repartition , repartition底层就是 coalesce(numSlices,true)
    18. rdd.repartition(2)

    repartition 底层代码

    1. def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
    2. coalesce(numPartitions, shuffle = true)
    3. }

  • 相关阅读:
    SQL Prepare
    ArcGIS标注的各种用法和示例
    asp.net core在发布时排除配置文件
    2022-08-23 学习笔记 day46-sql语言-TCL
    程序员35岁之后有什么出路?
    适配器模式
    WebDAV之葫芦儿·派盘+PassStore
    [附源码]SSM计算机毕业设计智能超市导购系统JAVA
    HiAI Foundation助力端侧音视频AI能力,高性能低功耗释放云侧成本
    (ECCV-2022)GaitEdge:超越普通的端到端步态识别,提高实用性
  • 原文地址:https://blog.csdn.net/m0_64261982/article/details/133070125