• Spark RDD的介绍及创建


    RDD介绍

    • RDD(Resilient Distributed Dataset)弹性分布式数据集,是Spark中最基本的数据抽象,代表一个不可变、可分区、里面的元素可并行计算的集合。它用于囊括所有内存中和磁盘中的分布式数据实体。
    • RDD与数组的对比:
    对比项数组RDD
    概念类型数据结构实体数据模型抽象
    数据跨度单机进程内跨进程、跨计算节点
    数据构成数组元素数据分片
    数据定位数据下标、索引数据分片索引

    RDD的特性

    1. RDD是有分区的:RDD分区是RDD数据存储的最小单位,一份RDD数据本质上是分成了多个分区。
    2. RDD所有的方法都会作用在其所有分区上。
    3. RDD之间是有依赖关系的。
    4. KV型的RDD可以有分区器(默认是Hash分区,也可以手动设置。这个特性不是绝对的)
    5. RDD分区规划会尽量靠近数据所在的服务器(避免过多的网络读取)。

    RDD的属性

    • Partitions:数据分片
      每个阶段所产生的的具体实物。
    • Partitioner:分片切割规则
      按规则划分,数据分片的分布,是由RDD的Partitioner决定的。
    • Dependencies:RDD 依赖
      每个RDD都会通过Dependencies属性来记录它所依赖的前一个、或是前多个RDD,简称父RDD。
    • Compute:转换函数
      记录从父RDD到当前RDD的转换操作。

    RDD的创建

    并行化创建

    • 概念:并行化创建是指将本地集合转向分布式RDD,这是分布式的开端。代码如下图所示:
    # coding : utf8
    from  pyspark import SparkConf, SparkContext
    
    if __name__ == '__main__':
        # 创建sc对象
        conf = SparkConf().setMaster('local[*]').setAppName('test')
        sc = SparkContext(conf = conf)
    
        # 通过并行化集合创建RDD
        #rdd = sc.parallelize([1,2,3,4,5,6,7,8,9])
        #print(f"默认分区数是:{rdd.getNumPartitions()}")
    
        rdd = sc.parallelize([1,2,3,4,5,6,7,8,9], 3)
        print(f"分区数是 {rdd.getNumPartitions()}")
    
        #collect方法:将RDD中的每个分区的数据都发送到Driver,形成一个python list对象
        #Collect:分布式 → 本地集合
        print(f"RDD内容是 {rdd.collect()}")
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 获取RDD分区数:getNumPartitions API,获取RDD分区数量,返回值为INt数字
      用法:rdd.getNumPartitions()

    读取文件创建

    • textFile API,这个API可以读取本地数据也可以读取HDFS,还包括网络存储数据。
    # coding : utf8
    from  pyspark import SparkConf, SparkContext
    
    if __name__ == '__main__':
        # 创建sc对象
        conf = SparkConf().setMaster('local[*]').setAppName('test')
        sc = SparkContext(conf = conf)
    
        #读取本地文件数据
        rdd_file = sc.textFile("../Data/input/words.txt")
    
        #打印默认分区数与文件大小有关,如果是HDFS则与block块数量有关,与CPU无关
        print(f"rdd_file1分区数是 {rdd_file.getNumPartitions()}")
    
       #读取本地文件设置分区数设置分区数
        rdd_file1 = sc.textFile("../Data/input/words.txt", 3)
        rdd_file2 = sc.textFile("../Data/input/words.txt", 100)
        #最小分区数有参考值,spark有自己的判断
        print(f"rdd_file1分区数是 {rdd_file1.getNumPartitions()}")
        print(f"rdd_file2分区数是 {rdd_file2.getNumPartitions()}")
    
        #HDFS读取文件
        rdd_file3 = sc.textFile("hdfs://node1:8020/Test/WordCount.txt")
        print(f"rdd_file3分区数是 {rdd_file3.getNumPartitions()}")
    
        #打印内容
        print(f"RDD内容是 {rdd_file.collect()}")
        print(f"RDD内容是 {rdd_file1.collect()}")
        print(f"RDD内容是 {rdd_file2.collect()}")
        print(f"RDD内容是 {rdd_file3.collect()}")
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    在这里插入图片描述

    • wholeTextFile API,适合读取一堆小文件,代码如下:
    # coding : utf8
    from  pyspark import SparkConf, SparkContext
    
    if __name__ == '__main__':
        # 创建sc对象
        conf = SparkConf().setMaster('local[*]').setAppName('test')
        sc = SparkContext(conf = conf)
        # 读取一堆小文件
        rdd_files = sc.wholeTextFiles("../Data/input/tiny_files")
        # 提取小文件内容
        rdd_words = rdd_files.map(lambda x: x[1])
        #读取内容
        rdd_result = rdd_words.collect()
        print(f"内容是:{rdd_result}")
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
  • 相关阅读:
    C++初阶-模板初阶
    spring5.0 源码解析(day05) initMessageSource();
    F检验临界值表(Frideman检验表)
    python使用matplotlib.pyplot画图时总会提示“内核似乎挂掉了,很快就会自动重启”
    Java学习路线(就业导向)
    vue中的模板语法
    【PowerMockito:编写单元测试过程中原方法没有注入的属性在跑单元测试时出现空指针】
    电子器件系列57:肖特基二极管(BAS7005)
    第九章 APP项目测试(54) 测试工具
    [Kettle] 生成记录
  • 原文地址:https://blog.csdn.net/sinat_31854967/article/details/127783472