• 【Python】PySpark 数据输入 ① ( RDD 简介 | RDD 中的数据存储与计算 | Python 容器数据转 RDD 对象 | 文件文件转 RDD 对象 )






    一、RDD 简介




    1、RDD 概念


    RDD 英文全称为 " Resilient Distributed Datasets " , 对应中文名称 是 " 弹性分布式数据集 " ;

    Spark 是用于 处理大规模数据 的 分布式计算引擎 ;

    RDD 是 Spark 的基本数据单元 , 该 数据结构 是 只读的 , 不可写入更改 ;

    RDD 对象 是 通过 SparkContext 执行环境入口对象 创建的 ;

    SparkContext 读取数据时 , 通过将数据拆分为多个分区 , 以便在 服务器集群 中进行并行处理 ;

    每个 RDD 数据分区 都可以在 服务器集群 中的 不同服务器节点 上 并行执行 计算任务 , 可以提高数据处理速度 ;


    2、RDD 中的数据存储与计算


    PySpark 中 处理的 所有的数据 ,

    • 数据存储 : PySpark 中的数据都是以 RDD 对象的形式承载的 , 数据都存储在 RDD 对象中 ;
    • 计算方法 : 大数据处理过程中使用的计算方法 , 也都定义在了 RDD 对象中 ;
    • 计算结果 : 使用 RDD 中的计算方法对 RDD 中的数据进行计算处理 , 获得的结果数据也是封装在 RDD 对象中的 ;

    PySpark 中 , 通过 SparkContext 执行环境入口对象 读取 基础数据到 RDD 对象中 , 调用 RDD 对象中的计算方法 , 对 RDD 对象中的数据进行处理 , 得到新的 RDD 对象 其中有 上一次的计算结果 , 再次对新的 RDD 对象中的数据进行处理 , 执行上述若干次计算 , 会 得到一个最终的 RDD 对象 , 其中就是数据处理结果 , 将其保存到文件中 , 或者写入到数据库中 ;





    二、Python 容器数据转 RDD 对象




    1、RDD 转换


    在 Python 中 , 使用 PySpark 库中的 SparkContext # parallelize 方法 , 可以将 Python 容器数据 转换为 PySpark 的 RDD 对象 ;


    PySpark 支持下面几种 Python 容器变量 转为 RDD 对象 :

    • 列表 list : 可重复 , 有序元素 ;
    • 元组 tuple : 可重复 , 有序元素 , 可读不可写 , 不可更改 ;
    • 集合 set : 不可重复 , 无序元素 ;
    • 字典 dict : 键值对集合 , 键 Key 不可重复 ;
    • 字符串 str : 字符串 ;

    2、转换 RDD 对象相关 API


    调用 SparkContext # parallelize 方法 可以将 Python 容器数据转为 RDD 对象 ;

    # 将数据转换为 RDD 对象
    rdd = sparkContext.parallelize(data)
    
    • 1
    • 2

    调用 RDD # getNumPartitions 方法 , 可以获取 RDD 的分区数 ;

    print("RDD 分区数量: ", rdd.getNumPartitions())
    
    • 1

    调用 RDD # collect 方法 , 可以查看 RDD 数据 ;

    print("RDD 元素: ", rdd.collect())
    
    • 1

    完整代码示例 :

    # 创建一个包含列表的数据
    data = [1, 2, 3, 4, 5]
    
    # 将数据转换为 RDD 对象
    rdd = sparkContext.parallelize(data)
    
    # 打印 RDD 的分区数和元素
    print("RDD 分区数量: ", rdd.getNumPartitions())
    print("RDD 元素: ", rdd.collect())
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    3、代码示例 - Python 容器转 RDD 对象 ( 列表 )


    在下面的代码中 ,

    首先 , 创建 SparkConf 对象 , 并将 PySpark 任务 命名为 " hello_spark " , 并设置为本地单机运行 ;

    # 创建 SparkConf 实例对象 , 该对象用于配置 Spark 任务
    # setMaster("local[*]") 表示在单机模式下 本机运行
    # setAppName("hello_spark") 是给 Spark 程序起一个名字
    sparkConf = SparkConf() \
        .setMaster("local[*]") \
        .setAppName("hello_spark")
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    然后 , 创建了一个 SparkContext 对象 , 传入 SparkConf 实例对象作为参数 ;

    # 创建 PySpark 执行环境 入口对象
    sparkContext = SparkContext(conf=sparkConf)
    
    • 1
    • 2

    再后 , 创建一个包含整数的简单列表 ;

    # 创建一个包含列表的数据
    data = [1, 2, 3, 4, 5]
    
    • 1
    • 2

    再后 , 并使用 parallelize() 方法将其转换为 RDD 对象 ;

    # 将数据转换为 RDD 对象
    rdd = sparkContext.parallelize(data)
    
    • 1
    • 2

    最后 , 我们打印出 RDD 的分区数和所有元素 ;

    # 打印 RDD 的分区数和元素
    print("RDD 分区数量: ", rdd.getNumPartitions())
    print("RDD 元素: ", rdd.collect())
    
    • 1
    • 2
    • 3

    代码示例 :

    """
    PySpark 数据处理
    """
    
    # 导入 PySpark 相关包
    from pyspark import SparkConf, SparkContext
    
    # 创建 SparkConf 实例对象 , 该对象用于配置 Spark 任务
    # setMaster("local[*]") 表示在单机模式下 本机运行
    # setAppName("hello_spark") 是给 Spark 程序起一个名字
    sparkConf = SparkConf() \
        .setMaster("local[*]") \
        .setAppName("hello_spark")
    
    # 创建 PySpark 执行环境 入口对象
    sparkContext = SparkContext(conf=sparkConf)
    
    # 打印 PySpark 版本号
    print("PySpark 版本号 : ", sparkContext.version)
    
    # 创建一个包含列表的数据
    data = [1, 2, 3, 4, 5]
    
    # 将数据转换为 RDD 对象
    rdd = sparkContext.parallelize(data)
    
    # 打印 RDD 的分区数和元素
    print("RDD 分区数量: ", rdd.getNumPartitions())
    print("RDD 元素: ", rdd.collect())
    
    # 停止 PySpark 程序
    sparkContext.stop()
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    执行结果 :

    Y:\002_WorkSpace\PycharmProjects\pythonProject\venv\Scripts\python.exe Y:/002_WorkSpace/PycharmProjects/HelloPython/hello.py
    23/07/30 20:11:35 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    23/07/30 20:11:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    PySpark 版本号 :  3.4.1
    RDD 分区数量:  12
    RDD 元素:  [1, 2, 3, 4, 5]
    
    Process finished with exit code 0
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    在这里插入图片描述


    4、代码示例 - Python 容器转 RDD 对象 ( 列表 / 元组 / 集合 / 字典 / 字符串 )


    除了 列表 list 之外 , 还可以将其他容器数据类型 转换为 RDD 对象 , 如 : 元组 / 集合 / 字典 / 字符串 ;


    调用 RDD # collect 方法 , 打印出来的 RDD 数据形式 :

    • 列表 / 元组 / 集合 转换后的 RDD 数据打印出来都是列表 ;
    data1 = [1, 2, 3, 4, 5]
    data2 = (1, 2, 3, 4, 5)
    data3 = {1, 2, 3, 4, 5}
    
    # 输出结果
    rdd1 分区数量和元素:  12  ,  [1, 2, 3, 4, 5]
    rdd2 分区数量和元素:  12  ,  [1, 2, 3, 4, 5]
    rdd3 分区数量和元素:  12  ,  [1, 2, 3, 4, 5]
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 字典 转换后的 RDD 数据打印出来只有 键 Key , 没有值 ;
    data4 = {"Tom": 18, "Jerry": 12}
    
    # 输出结果
    rdd4 分区数量和元素:  12  ,  ['Tom', 'Jerry']
    
    • 1
    • 2
    • 3
    • 4
    • 字符串 转换后的 RDD 数据打印出来 是 列表 , 元素是单个字符 ;
    data5 = "Tom"
    
    # 输出结果
    rdd5 分区数量和元素:  12  ,  ['T', 'o', 'm']
    
    • 1
    • 2
    • 3
    • 4

    代码示例 :

    """
    PySpark 数据处理
    """
    
    # 导入 PySpark 相关包
    from pyspark import SparkConf, SparkContext
    
    # 创建 SparkConf 实例对象 , 该对象用于配置 Spark 任务
    # setMaster("local[*]") 表示在单机模式下 本机运行
    # setAppName("hello_spark") 是给 Spark 程序起一个名字
    sparkConf = SparkConf() \
        .setMaster("local[*]") \
        .setAppName("hello_spark")
    
    # 创建 PySpark 执行环境 入口对象
    sparkContext = SparkContext(conf=sparkConf)
    
    # 打印 PySpark 版本号
    print("PySpark 版本号 : ", sparkContext.version)
    
    # 创建一个包含列表的数据
    data1 = [1, 2, 3, 4, 5]
    data2 = (1, 2, 3, 4, 5)
    data3 = {1, 2, 3, 4, 5}
    data4 = {"Tom": 18, "Jerry": 12}
    data5 = "Tom"
    
    # 将数据转换为 RDD 对象
    rdd1 = sparkContext.parallelize(data1)
    rdd2 = sparkContext.parallelize(data2)
    rdd3 = sparkContext.parallelize(data3)
    rdd4 = sparkContext.parallelize(data4)
    rdd5 = sparkContext.parallelize(data5)
    
    # 打印 RDD 的元素
    print("rdd1 分区数量和元素: ", rdd1.getNumPartitions(), " , ", rdd1.collect())
    print("rdd2 分区数量和元素: ", rdd2.getNumPartitions(), " , ", rdd2.collect())
    print("rdd3 分区数量和元素: ", rdd3.getNumPartitions(), " , ", rdd3.collect())
    print("rdd4 分区数量和元素: ", rdd4.getNumPartitions(), " , ", rdd4.collect())
    print("rdd5 分区数量和元素: ", rdd5.getNumPartitions(), " , ", rdd5.collect())
    
    # 停止 PySpark 程序
    sparkContext.stop()
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44

    执行结果 :

    Y:\002_WorkSpace\PycharmProjects\pythonProject\venv\Scripts\python.exe Y:/002_WorkSpace/PycharmProjects/HelloPython/hello.py
    23/07/30 20:37:03 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    23/07/30 20:37:03 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    PySpark 版本号 :  3.4.1
    rdd1 分区数量和元素:  12  ,  [1, 2, 3, 4, 5]
    rdd2 分区数量和元素:  12  ,  [1, 2, 3, 4, 5]
    rdd3 分区数量和元素:  12  ,  [1, 2, 3, 4, 5]
    rdd4 分区数量和元素:  12  ,  ['Tom', 'Jerry']
    rdd5 分区数量和元素:  12  ,  ['T', 'o', 'm']
    
    Process finished with exit code 0
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    在这里插入图片描述





    三、文件文件转 RDD 对象



    调用 SparkContext#textFile 方法 , 传入 文件的 绝对路径 或 相对路径 , 可以将 文本文件 中的数据 读取并转为 RDD 数据 ;


    文本文件数据 :

    Tom
    18
    Jerry
    12
    
    • 1
    • 2
    • 3
    • 4

    在这里插入图片描述

    代码示例 :

    """
    PySpark 数据处理
    """
    
    # 导入 PySpark 相关包
    from pyspark import SparkConf, SparkContext
    
    # 创建 SparkConf 实例对象 , 该对象用于配置 Spark 任务
    # setMaster("local[*]") 表示在单机模式下 本机运行
    # setAppName("hello_spark") 是给 Spark 程序起一个名字
    sparkConf = SparkConf() \
        .setMaster("local[*]") \
        .setAppName("hello_spark")
    
    # 创建 PySpark 执行环境 入口对象
    sparkContext = SparkContext(conf=sparkConf)
    
    # 打印 PySpark 版本号
    print("PySpark 版本号 : ", sparkContext.version)
    
    # 读取文件内容到 RDD 中
    rdd = sparkContext.textFile("data.txt")
    
    # 打印 RDD 的元素
    print("rdd1 分区数量和元素: ", rdd.getNumPartitions(), " , ", rdd.collect())
    
    # 停止 PySpark 程序
    sparkContext.stop()
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    执行结果 :

    Y:\002_WorkSpace\PycharmProjects\pythonProject\venv\Scripts\python.exe Y:/002_WorkSpace/PycharmProjects/HelloPython/hello.py
    23/07/30 20:43:21 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    23/07/30 20:43:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    PySpark 版本号 :  3.4.1
    rdd1 分区数量和元素:  2  ,  ['Tom', '18', 'Jerry', '12']
    
    Process finished with exit code 0
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    在这里插入图片描述

  • 相关阅读:
    带你一分钟看懂 “Docker”
    第五章 多态
    数据分享|R语言逻辑回归、线性判别分析LDA、GAM、MARS、KNN、QDA、决策树、随机森林、SVM分类葡萄酒交叉验证ROC...
    2316. 统计无向图中无法互相到达点对数
    LintCode 89: k Sum (背包问题)
    【C】程序环境和预处理
    信号与槽和lambda表达式
    基于WPF技术的换热站智能监控系统03--实现左侧加载动画
    英语单词(二)
    轮式装载机铲斗模拟仿真
  • 原文地址:https://blog.csdn.net/han1202012/article/details/132006013