• 【Python】PySpark 数据计算 ③ ( RDD#reduceByKey 函数概念 | RDD#reduceByKey 方法工作流程 | RDD#reduceByKey 语法 | 代码示例 )






    一、RDD#reduceByKey 方法




    1、RDD#reduceByKey 方法概念


    RDD#reduceByKey 方法 是 PySpark 中 提供的计算方法 ,

    • 首先 , 对 键值对 KV 类型 RDD 对象 数据相同 键 key 对应的 值 value 进行分组 ,
    • 然后 , 按照 开发者 提供的 算子 ( 逻辑 / 函数 ) 进行 聚合操作 ;

    上面提到的 键值对 KV 型 的数据 , 指的是 二元元组 , 也就是 RDD 对象中存储的数据是 二元元组 ;

    元组 可以看做为 只读列表 ;

    二元元组 指的是 元组 中的数据 , 只有两个 , 如 :

    ("Tom", 18)
    ("Jerry", 12)
    
    • 1
    • 2

    PySpark 中 , 将 二元元组 中

    • 第一个元素 称为 键 Key ,
    • 第二个元素 称为 值 Value ;

    按照 键 Key 分组 , 就是按照 二元元组 中的 第一个元素 的值进行分组 ;

    [("Tom", 18), ("Jerry", 12), ("Tom", 17), ("Jerry", 13)]
    
    • 1

    将上述列表中的 二元元组 进行分组 , 按照 二元元组 第一个元素进行分组 ,

    • ("Tom", 18)("Tom", 17) 元组分为一组 , 在这一组中 , 将 18 和 17 两个数据进行聚合 , 如 : 相加操作 , 最终聚合结果是 35 ;
    • ("Jerry", 12)("Jerry", 13) 分为一组 ;

    如果 键 Key 有 A, B, C 三个 值 Value 要进行聚合 , 首先将 A 和 B 进行聚合 得到 X , 然后将 X 与 C 进行聚合得到新的值 Y ;


    具体操作方法是 : 先将相同 键 key 对应的 值 value 列表中的元素进行 reduce 操作 , 返回一个减少后的值,并将该键值对存储在RDD中 ;


    2、RDD#reduceByKey 方法工作流程


    RDD#reduceByKey 方法 工作流程 : reduceByKey(func) ;

    • 首先 , 对 RDD 对象中的数据 分区 , 每个分区中的相同 键 key 对应的 值 value 被组成一个列表 ;
    • 然后 , 对于 每个 键 key 对应的 值 value 列表 , 使用 reduceByKey 方法提供的 函数参数 func 进行 reduce 操作 , 将列表中的元素减少为一个 ;
    • 最后 , 将减少后的 键值对 存储在新的 RDD 对象中 ;

    3、RDD#reduceByKey 函数语法


    RDD#reduceByKey 语法 :

    reduceByKey(func, numPartitions=None)
    
    • 1
    • func 参数 : 用于聚合的函数 ;
    • numPartitions 是可选参数 , 指定 RDD 对象的分区数 ;

    传入的 func 函数的类型为 :

    (V, V) -> V
    
    • 1

    V 是泛型 , 指的是任意类型 , 上面的 三个 V 可以是任意类型 , 但是必须是 相同的类型 ;

    该函数 接收 两个 V 类型的参数 , 参数类型要相同 , 返回一个 V 类型的返回值 , 传入的两个参数和返回值都是 V 类型的 ;


    使用 reduceByKey 方法 , 需要保证函数的

    • 可结合性 ( associativity ) : 将两个具有 相同 参数类型 和 返回类型 的方法结合在一起 , 不会改变它们的行为的性质 ; 两个方法结合使用的结果与执行顺序无关 ;
    • 可重入性 ( commutativity ) : 在多任务环境下 , 一个方法可以被多个任务调用 , 而不会出现数据竞争或状态错误的问题 ;

    以便在并行计算时能够正确地聚合值列表 ;





    二、代码示例 - RDD#reduceByKey 方法




    1、代码示例


    在下面的代码中 , 要处理的数据是 列表 , 列表元素是 二元元组 ;

    [("Tom", 18), ("Tom", 3), ("Jerry", 12), ("Jerry", 21)]
    
    • 1

    对 值 Value 进行的聚合操作就是相加 , 也就是把同一个 键 Key 下的多个 Value 值 进行相加操作 ,

    # 应用 reduceByKey 操作,将同一个 Key 下的 Value 相加
    rdd2 = rdd.reduceByKey(lambda a, b: a + b)
    
    • 1
    • 2

    代码示例 :

    """
    PySpark 数据处理
    """
    
    # 导入 PySpark 相关包
    from pyspark import SparkConf, SparkContext
    # 为 PySpark 配置 Python 解释器
    import os
    os.environ['PYSPARK_PYTHON'] = "D:/001_Develop/022_Python/Python39/python.exe"
    
    # 创建 SparkConf 实例对象 , 该对象用于配置 Spark 任务
    # setMaster("local[*]") 表示在单机模式下 本机运行
    # setAppName("hello_spark") 是给 Spark 程序起一个名字
    sparkConf = SparkConf() \
        .setMaster("local[*]") \
        .setAppName("hello_spark")
    
    # 创建 PySpark 执行环境 入口对象
    sparkContext = SparkContext(conf=sparkConf)
    
    # 打印 PySpark 版本号
    print("PySpark 版本号 : ", sparkContext.version)
    
    # 将 字符串列表 转为 RDD 对象
    rdd = sparkContext.parallelize([("Tom", 18), ("Tom", 3), ("Jerry", 12), ("Jerry", 21)])
    
    # 应用 reduceByKey 操作,将同一个 Key 下的 Value 相加
    rdd2 = rdd.reduceByKey(lambda a, b: a + b)
    
    # 打印新的 RDD 中的内容
    print(rdd2.collect())
    
    # 停止 PySpark 程序
    sparkContext.stop()
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36

    2、执行结果


    D:\001_Develop\022_Python\Python39\python.exe D:/002_Project/011_Python/HelloPython/Client.py
    23/08/01 10:16:04 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    23/08/01 10:16:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    PySpark 版本号 :  3.4.1
    D:\001_Develop\022_Python\Python39\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\shuffle.py:65: UserWarning: Please install psutil to have better support with spilling
    D:\001_Develop\022_Python\Python39\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\shuffle.py:65: UserWarning: Please install psutil to have better support with spilling
    D:\001_Develop\022_Python\Python39\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\shuffle.py:65: UserWarning: Please install psutil to have better support with spilling
    D:\001_Develop\022_Python\Python39\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\shuffle.py:65: UserWarning: Please install psutil to have better support with spilling
    D:\001_Develop\022_Python\Python39\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\shuffle.py:65: UserWarning: Please install psutil to have better support with spilling
    [('Jerry', 33), ('Tom', 21)]
    
    Process finished with exit code 0
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    在这里插入图片描述





    三、代码示例 - 使用 RDD#reduceByKey 统计文件内容




    1、需求分析


    给定一个 文本文件 word.txt , 文件内容为 :

    Tom Jerry
    Tom Jerry Tom
    Jack Jerry
    
    • 1
    • 2
    • 3

    在这里插入图片描述

    读取文件中的内容 , 统计文件中单词的个数 ;

    思路 :

    • 读取数据到 RDD 中 ,
    • 然后 按照空格分割开 再展平 , 获取到每个单词 ,
    • 根据上述单词列表 , 生成一个 二元元组 列表 , 列表中每个元素的 键 Key 为单词 , 值 Value 为 数字 1 ,
    • 对上述 二元元组 列表 进行 聚合操作 , 相同的 键 Key 对应的 值 Value 进行相加 ;

    2、代码示例


    首先 , 读取文件 , 将 文件转为 RDD 对象 , 该 RDD 对象中 , 列表中的元素是 字符串 类型 , 每个字符串的内容是 整行的数据 ;

    # 将 文件 转为 RDD 对象
    rdd = sparkContext.textFile("word.txt")
    # 内容为 ['Tom Jerry', 'Tom Jerry Tom', 'Jack Jerry']
    
    • 1
    • 2
    • 3

    然后 , 通过 flatMap 展平文件, 先按照 空格 切割每行数据为 字符串 列表 , 然后展平数据解除嵌套 ;

    # 通过 flatMap 展平文件, 先按照 空格 切割每行数据为 字符串 列表
    #   然后展平数据解除嵌套
    rdd2 = rdd.flatMap(lambda element: element.split(" "))
    # 内容为 : ['Tom', 'Jerry', 'Tom', 'Jerry', 'Tom', 'Jack', 'Jerry']
    
    • 1
    • 2
    • 3
    • 4

    再后 , 将 rdd 数据 的 列表中的元素 转为二元元组 , 第一个元素设置为 单词 字符串 , 第二个元素设置为 1

    # 将 rdd 数据 的 列表中的元素 转为二元元组, 第二个元素设置为 1
    rdd3 = rdd2.map(lambda element: (element, 1))
    # 内容为 [('Tom', 1), ('Jerry', 1), ('Tom', 1), ('Jerry', 1), ('Tom', 1), ('Jack', 1), ('Jerry', 1)]
    
    • 1
    • 2
    • 3

    最后 , 应用 reduceByKey 操作 , 对相同 键 Key 对应的 值 Value 进行聚合操作 , 将同一个 Key 下的 Value 相加, 也就是统计 键 Key 的个数 ;

    # 应用 reduceByKey 操作,
    #   将同一个 Key 下的 Value 相加, 也就是统计 键 Key 的个数
    rdd4 = rdd3.reduceByKey(lambda a, b: a + b)
    # [('Tom', 3), ('Jack', 1), ('Jerry', 3)]
    
    • 1
    • 2
    • 3
    • 4

    代码示例 :

    """
    PySpark 数据处理
    """
    
    # 导入 PySpark 相关包
    from pyspark import SparkConf, SparkContext
    # 为 PySpark 配置 Python 解释器
    import os
    os.environ['PYSPARK_PYTHON'] = "D:/001_Develop/022_Python/Python39/python.exe"
    
    # 创建 SparkConf 实例对象 , 该对象用于配置 Spark 任务
    # setMaster("local[*]") 表示在单机模式下 本机运行
    # setAppName("hello_spark") 是给 Spark 程序起一个名字
    sparkConf = SparkConf() \
        .setMaster("local[*]") \
        .setAppName("hello_spark")
    
    # 创建 PySpark 执行环境 入口对象
    sparkContext = SparkContext(conf=sparkConf)
    
    # 打印 PySpark 版本号
    print("PySpark 版本号 : ", sparkContext.version)
    
    # 将 文件 转为 RDD 对象
    rdd = sparkContext.textFile("word.txt")
    print("查看文件内容 : ", rdd.collect())
    
    # 通过 flatMap 展平文件, 先按照 空格 切割每行数据为 字符串 列表
    #   然后展平数据解除嵌套
    rdd2 = rdd.flatMap(lambda element: element.split(" "))
    print("查看文件内容展平效果 : ", rdd2.collect())
    
    # 将 rdd 数据 的 列表中的元素 转为二元元组, 第二个元素设置为 1
    rdd3 = rdd2.map(lambda element: (element, 1))
    print("转为二元元组效果 : ", rdd3.collect())
    
    # 应用 reduceByKey 操作,
    #   将同一个 Key 下的 Value 相加, 也就是统计 键 Key 的个数
    rdd4 = rdd3.reduceByKey(lambda a, b: a + b)
    print("最终统计单词 : ", rdd4.collect())
    
    # 停止 PySpark 程序
    sparkContext.stop()
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45

    执行结果 :

    D:\001_Develop\022_Python\Python39\python.exe D:/002_Project/011_Python/HelloPython/Client.py
    23/08/01 11:25:24 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    23/08/01 11:25:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    PySpark 版本号 :  3.4.1
    查看文件内容 :  ['Tom Jerry', 'Tom Jerry Tom', 'Jack Jerry']
    查看文件内容展平效果 :  ['Tom', 'Jerry', 'Tom', 'Jerry', 'Tom', 'Jack', 'Jerry']
    转为二元元组效果 :  [('Tom', 1), ('Jerry', 1), ('Tom', 1), ('Jerry', 1), ('Tom', 1), ('Jack', 1), ('Jerry', 1)]
    D:\001_Develop\022_Python\Python39\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\shuffle.py:65: UserWarning: Please install psutil to have better support with spilling
    D:\001_Develop\022_Python\Python39\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\shuffle.py:65: UserWarning: Please install psutil to have better support with spilling
    D:\001_Develop\022_Python\Python39\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\shuffle.py:65: UserWarning: Please install psutil to have better support with spilling
    D:\001_Develop\022_Python\Python39\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\shuffle.py:65: UserWarning: Please install psutil to have better support with spilling
    最终统计单词 :  [('Tom', 3), ('Jack', 1), ('Jerry', 3)]
    
    Process finished with exit code 0
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    在这里插入图片描述

  • 相关阅读:
    滑动窗口 解题思路
    4-2计算小于1000的正整数的平方根
    面经综合总结
    RestTemplate 集成拦截器
    JavaScript —— APIs(四)
    java毕业生设计校园旺角超市外卖平台计算机源码+系统+mysql+调试部署+lw
    数据在内存中的存储——练习3
    阿里面试官:聊聊如何格式化Instant
    MyBatis-Plus学习笔记(Spring版)——(五)MyBatis-Plus条件构造器和常用接口
    java算法题 Day86
  • 原文地址:https://blog.csdn.net/han1202012/article/details/132034137