• Spark基础【RDD分区和并行度】


    一 RDD分区和并行度

    RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据处理模型。代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合

    默认情况下,Spark可以将一个作业切分多个任务后,发送给Executor节点并行计算,而能够并行计算的任务数量称之为并行度。这个数量可以在构建RDD时指定。

    注意这里的并行执行的任务数量,并不是指的切分任务的数量。

    1 分区原理

    可分区:让数据能够并行计算,采用分区的概念,与kafka中的分区略有不同,kafka的分区可以保存数据,而RDD的分区不保存数据

    既然RDD中的分区不存储数据,那么分区有什么用呢?

    答:当有多个Executor多个Task时,为防止热点,过载问题,通过分区指示数据走哪个分区,可以将分区理解为管道,数据理解为水,水在管道中流,流过之后水并不会留在管道中

    2 分区数量

    saveAsTextFile方法可以生成分区文件,将数据按照分区的形式保存为文件,有几个分区就会保存几份文件

    文件的保存过程由Executor完成,目前分区数量为系统默认(local[*])

    def main(args: Array[String]): Unit = {
      val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
      val sc = new SparkContext(conf)
    
      val rdd1 = sc.makeRDD(
        List(1,2,3,4)
      )
      rdd1.saveAsTextFile("output")
      sc.stop()
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    如果构建RDD时,没有指定数据处理分区的数量,就会使用默认分区的数量

    mkRDD方法存在第二个参数,代表分区的数量numSlices(存在默认值)
    
    默认值为:scheduler.conf.getInt("spark.default.parallelism", totalCores)
    	
    	其中conf指向的是以下代码中的conf
    	
    	取到值使用spark.default.parallelism,取不到使用totalCores
        	如:conf.set("spark.default.parallelism","4")
        	
    	totalCores : 当前Master环境的总(虚拟)核数,非系统的总核数,不是固定值
    		local:1个
    		local[2]:2个
    		local[*]:机器的总核数
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    分区设置的优先级 : 方法参数 > 配置参数 > 环境配置,只会使用一个

    方法参数:
    val rdd1 = sc.makeRDD(
      List(1,2,3,4),1
    )
    
    配置参数:
    conf.set("spark.default.parallelism","2")
    
    环境配置:
    val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    3 集合数据

    (1)当list中数据能被分区数整除时

    val rdd1 = sc.makeRDD(
      List(1,2,3,4),2
    )
    rdd1.saveAsTextFile("output")
    
    • 1
    • 2
    • 3
    • 4

    kafka生产者分区策略

    ​ 【1, 2】【3, 4】 范围

    ​ 【1, 3】【2, 4】 轮询

    Spark分区策略

    ​ 【1, 2】【3, 4】 范围

    (2)当list中的数据不能被分区数整除时

    • 五条数据,二个分区

    kafka生产者分区策略
    【1,3,5】【2,4】 轮询
    【1,2,3】【4,5】 范围

    spark分区策略
    【1,2】【3,4,5】 范围

    • 五条数据,三个分区

    kafka生产者分区策略

    ​ 【1,2】【3,4】【5】 范围

    spark分区策略

    ​ 【1】【2,3】【4,5】 范围

    4 磁盘文件

    文件分区如何设定,每个分区中数据如何流转

    (1)源码

    读取内存数据时,数据可以按照并行度的设定进行数据的分区操作,数据分区规则的Spark核心源码如下:

    def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
      (0 until numSlices).iterator.map { i =>
        val start = ((i * length) / numSlices).toInt
        val end = (((i + 1) * length) / numSlices).toInt
        (start, end)
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    读取文件数据时,数据是按照Hadoop文件读取的规则进行切片分区,而切片规则和数据读取的规则有些差异,具体Spark核心源码如下:

    public InputSplit[] getSplits(JobConf job, int numSplits)
    throws IOException {
    
    long totalSize = 0;                           // compute total size
    for (FileStatus file: files) {                // check we have valid files
      if (file.isDirectory()) {
        throw new IOException("Not a file: "+ file.getPath());
      }
      totalSize += file.getLen();
    }
    
    long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
    long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
      FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);
    
    ...
    
    for (FileStatus file: files) {
    
        ...
    
    if (isSplitable(fs, path)) {
          long blockSize = file.getBlockSize();
          long splitSize = computeSplitSize(goalSize, minSize, blockSize);
    
          ...
    
    }
    protected long computeSplitSize(long goalSize, long minSize,
                                       long blockSize) {
    return Math.max(minSize, Math.min(goalSize, blockSize));
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    (2)分区数量

    注意以下内容字节数为7(1,2后面分别有一个换行符,占两个字节)

    1
    2
    3
    
    • 1
    • 2
    • 3

    将data/word.txt 文件的内容字节大小设置为3

    def main(args: Array[String]): Unit = {
      val con = new SparkConf().setMaster("local[*]").setAppName("spark")
      val sc = new SparkContext(con)
    
      val rdd = sc.textFile("data/word.txt")
      rdd.saveAsTextFile("output")
    
      sc.stop()
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    textFile方法可以在读取文件时,设定分区

    设定分区时,应该传递第二个参数,如果不设定,存在默认值

    ​ 默认值为math.min(defaultParallelism, 2)

    ​ defaultParallelism来自于配置参数,如果没有配置,则取当前环境的值【“local[*]”】

    其中第二个参数表示最小分区数,所以最终的分区数量可以大于这个值的

    由于spark基于hadoop开发,应该注意:

    • spark读取文件其实底层就是hadoop读取文件
    • spark的分区数量其实就来自于hadoop读取文件的切片

    hadoop的切片规则(参照上面源码):

    想要的切片数量 numSplits = 2(math.min(defaultParallelism, 2))
    
    总的文件大小 totalSize = 3(word.txt文件中的内容字节大小为3)
    
    预计每个分区的字节大小 goalSize = totalSize / numSplits = 3 / 2 = 1
    
    分区数量:3 / 1 = 3
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    修改word.txt文件中的内容字节大小为5

    想要的切片数量 numSplits = 2(math.min(defaultParallelism, 2))
    
    总的文件大小 totalSize = 5(word.txt文件中的内容字节大小为5)
    
    预计每个分区的字节大小 goalSize = totalSize / numSplits = 5 / 2 = 2
    
    分区数量:5 / 2 = 2 ... 1 (余数为1),那么剩下的一个数据放在哪里?
    
    放在新的分区中不合理,所以在源码中会经过一个判断:
    
    剩余字节数 / 切片大小(numSplits) < 1.1
    
    数据剩余量在10%以内,不用新建分区,否则创建新的分区:1 / 2 = 0.5 > 0.1 创建新的分区
    
    分区数量:2 + 1 = 3
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    将分区数修改为3

    想要的切片数量 numSplits = 3(val rdd = sc.textFile("data/word.txt",3))
    
    总的文件大小 totalSize = 5(word.txt文件中的内容字节大小为5)
    
    预计每个分区的字节大小 goalSize = totalSize / numSplits = 5 / 3 = 1
    
    分区数量:5 / 1 = 5 
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    想切分10G文件,分2个区,并不会一个分区5G,太大了,会进行如下操作

    minSize 默认为1
    goalSize = totalSize / numSplits = 10 / 2 = 5
    blockSize 在本地为32M,集群为128M
    
    splitSize = Math.max(minSize, Math.min(goalSize, blockSize(128M))
    splitSize = Math.max(minSize(1), Math.min(goalSize(5G), blockSize(128M))) = 128
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    (3)分区如何存储数据

    分区数据的处理也是由Hadoop决定的

    hadoop在计算分区时会处理数据时的逻辑不一样

    Spark读取文件数据底层使用的就是hadoop读取的,所以读取规则用的是hadoop的

    hadoop读取规则:

    • hadoop读取数据是按行读取的,不是按字节读取
    • hadoop读取数据是偏移量读取的,不是按照顺序读取
    • hadoop读取数据时,不会重复读取相同的偏移量

    例一

    word.txt中数据

    1
    2
    3
    
    • 1
    • 2
    • 3

    @@为换行符,表示两个字节

    真实字节表示(7字节)偏移量
    1@@012
    2@@345
    36
    val rdd = sc.textFile("data/word.txt",2)
    
    • 1

    计算偏移量:经计算,每个分区大小 7 / 2 = 3,7 / 3 = 2 … 1 = 2 + 1 = 3个分区,注意hadoop按行,偏移量读取,不重复读

    计算平移量(应该怎样读)分区内数据(实际怎样读)
    [0, 3][1 2] 分区一内的数据
    [3, 6][3] 分区二内的数据
    [6, 7][] 分区三内的数据

    例二

    word.txt中数据

    123
    456
    789
    
    • 1
    • 2
    • 3
    val rdd = sc.textFile("data/word.txt",3)
    
    • 1
    1. 计算有多少个分区?
    13 / 3 = 4(每个分区放4字节)
    13 / 4 = 3...1 = 3 + 1 = 4(分区数量)
    
    2. 计算每个分区放什么数据?
    123@@ => 01234		对应的偏移量
    456@@ => 56789		对应的偏移量
    789   => 101112		对应的偏移量
    ************************************
    
    [0, 4] => [123]		分区一内的数据
    [4, 8] => [456]		分区二内的数据
    [8, 12] => [789]	分区三内的数据
    [12, 13] => []		分区四内的数据
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    先计算每个分区放几个字节,再计算分区数量

  • 相关阅读:
    linux————zabbix搭建
    一种JavaScript响应式系统实现
    前端开发神器之 VsCode AI 辅助插件 DevChat
    20230919在WIN10下使用python3将PDF文档转为DOCX格式的WORD文档
    【算法】传智杯练习赛:平等的交易
    Django实战项目-学习任务系统-自定义URL拦截器
    Redis主从复制,哨兵和Cluster集群
    济宁市中考报名照片要求及手机拍照采集证件照方法
    字节应届生薪资都能2万+,年薪30万,这样工作真的开心吗?
    python作业3
  • 原文地址:https://blog.csdn.net/weixin_43923463/article/details/126215849