• 【Python】PySpark 数据计算 ④ ( RDD#filter 方法 - 过滤 RDD 中的元素 | RDD#distinct 方法 - 对 RDD 中的元素去重 )






    一、RDD#filter 方法




    1、RDD#filter 方法简介


    RDD#filter 方法 可以 根据 指定的条件 过滤 RDD 对象中的元素 , 并返回一个新的 RDD 对象 ;

    RDD#filter 方法 不会修改原 RDD 数据 ;

    使用方法 :

    new_rdd = old_rdd.filter(func)
    
    • 1

    上述代码中 ,

    • old_rdd 是 原始的 RDD 对象 ,
    • 调用 filter 方法 , 传入的 func 参数是一个 函数 或者 lambda 匿名函数 , 用于定义过滤条件 ,
      • func 函数返回 True , 则保留元素 ;
      • func 函数返回 False , 则删除元素 ;
    • new_rdd 是过滤后的 RDD 对象 ;

    2、RDD#filter 函数语法


    RDD#filter 方法 语法 :

    rdd.filter(func)
    
    • 1

    上述 方法 接受一个 函数 作为参数 , 该 函数参数 定义了要过滤的条件 ; 符合条件的 元素 保留 , 不符合条件的删除 ;

    下面介绍 filter 函数中的 func 函数类型参数的类型 要求 ;


    func 函数 类型说明 :

    (T) -> bool
    
    • 1

    传入 filter 方法中的 func 函数参数 , 其函数类型 是 接受一个 任意类型 元素作为参数 , 并返回一个布尔值 , 该布尔值的作用是表示该元素是否应该保留在新的 RDD 中 ;

    • 返回 True 保留元素 ;
    • 返回 False 删除元素 ;


    3、代码示例 - RDD#filter 方法示例


    下面代码中的核心代码是 :

    # 创建一个包含整数的 RDD
    rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9])
    
    # 使用 filter 方法过滤出偶数, 删除奇数
    even_numbers = rdd.filter(lambda x: x % 2 == 0)
    
    # 输出过滤后的结果
    print(even_numbers.collect())
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    上述代码中 , 原始代码是 1 到 9 之间的整数 ;

    传入 lambda 匿名函数 , lambda x: x % 2 == 0 , 传入数字 ,

    • 如果是偶数返回 True , 保留元素 ;
    • 如果是 奇数 返回 False , 删除元素 ;

    代码示例 :

    """
    PySpark 数据处理
    """
    
    # 导入 PySpark 相关包
    from pyspark import SparkConf, SparkContext
    # 为 PySpark 配置 Python 解释器
    import os
    os.environ['PYSPARK_PYTHON'] = "Y:/002_WorkSpace/PycharmProjects/pythonProject/venv/Scripts/python.exe"
    
    # 创建 SparkConf 实例对象 , 该对象用于配置 Spark 任务
    # setMaster("local[*]") 表示在单机模式下 本机运行
    # setAppName("hello_spark") 是给 Spark 程序起一个名字
    sparkConf = SparkConf() \
        .setMaster("local[*]") \
        .setAppName("hello_spark")
    
    # 创建 PySpark 执行环境 入口对象
    sc = SparkContext(conf=sparkConf)
    
    # 打印 PySpark 版本号
    print("PySpark 版本号 : ", sc.version)
    
    # 创建一个包含整数的 RDD
    rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9])
    
    # 使用 filter 方法过滤出偶数, 删除奇数
    even_numbers = rdd.filter(lambda x: x % 2 == 0)
    
    # 输出过滤后的结果
    print(even_numbers.collect())
    
    # 停止 PySpark 程序
    sc.stop()
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    执行结果 :

    Y:\002_WorkSpace\PycharmProjects\pythonProject\venv\Scripts\python.exe Y:/002_WorkSpace/PycharmProjects/HelloPython/hello.py
    23/08/02 21:07:55 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    23/08/02 21:07:55 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    PySpark 版本号 :  3.4.1
    [2, 4, 6, 8]
    
    Process finished with exit code 0
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    在这里插入图片描述





    二、RDD#distinct 方法




    1、RDD#distinct 方法简介


    RDD#distinct 方法 用于 对 RDD 中的数据进行去重操作 , 并返回一个新的 RDD 对象 ;

    RDD#distinct 方法 不会修改原来的 RDD 对象 ;


    使用时 , 直接调用 RDD 对象的 distinct 方法 , 不需要传入任何参数 ;

    new_rdd = old_rdd.distinct()
    
    • 1

    上述代码中 , old_rdd 是原始 RDD 对象 , new_rdd 是元素去重后的新的 RDD 对象 ;


    2、代码示例 - RDD#distinct 方法示例


    代码示例 :

    """
    PySpark 数据处理
    """
    
    # 导入 PySpark 相关包
    from pyspark import SparkConf, SparkContext
    # 为 PySpark 配置 Python 解释器
    import os
    os.environ['PYSPARK_PYTHON'] = "Y:/002_WorkSpace/PycharmProjects/pythonProject/venv/Scripts/python.exe"
    
    # 创建 SparkConf 实例对象 , 该对象用于配置 Spark 任务
    # setMaster("local[*]") 表示在单机模式下 本机运行
    # setAppName("hello_spark") 是给 Spark 程序起一个名字
    sparkConf = SparkConf() \
        .setMaster("local[*]") \
        .setAppName("hello_spark")
    
    # 创建 PySpark 执行环境 入口对象
    sc = SparkContext(conf=sparkConf)
    
    # 打印 PySpark 版本号
    print("PySpark 版本号 : ", sc.version)
    
    # 创建一个包含整数的 RDD 对象
    rdd = sc.parallelize([1, 1, 2, 2, 3, 3, 3, 4, 4, 5])
    
    # 使用 distinct 方法去除 RDD 对象中的重复元素
    distinct_numbers = rdd.distinct()
    
    # 输出去重后的结果
    print(distinct_numbers.collect())
    
    # 停止 PySpark 程序
    sc.stop()
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    执行结果 :

    Y:\002_WorkSpace\PycharmProjects\pythonProject\venv\Scripts\python.exe Y:/002_WorkSpace/PycharmProjects/HelloPython/hello.py
    23/08/02 21:16:35 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    23/08/02 21:16:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    PySpark 版本号 :  3.4.1
    Y:\002_WorkSpace\PycharmProjects\pythonProject\venv\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\shuffle.py:65: UserWarning: Please install psutil to have better support with spilling
    Y:\002_WorkSpace\PycharmProjects\pythonProject\venv\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\shuffle.py:65: UserWarning: Please install psutil to have better support with spilling
    Y:\002_WorkSpace\PycharmProjects\pythonProject\venv\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\shuffle.py:65: UserWarning: Please install psutil to have better support with spilling
    Y:\002_WorkSpace\PycharmProjects\pythonProject\venv\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\shuffle.py:65: UserWarning: Please install psutil to have better support with spilling
    Y:\002_WorkSpace\PycharmProjects\pythonProject\venv\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\shuffle.py:65: UserWarning: Please install psutil to have better support with spilling
    [1, 2, 3, 4, 5]
    
    Process finished with exit code 0
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    在这里插入图片描述

  • 相关阅读:
    基恩士软件的基本操作(四,快速编辑plc技巧)
    一文理解UDS安全访问服务(0x27)
    校招时间紧很迷茫?校招机会怎么把握?没有项目简历怎么写?
    【JAVA核心知识】深度了解MySql的innodb引擎
    发明专利申请指南
    DBSCAN 算法【python,机器学习,算法】
    nvidia-smi 报错
    AQS如何实现
    数据库数据采集利器FlinkCDC
    十三、CANdelaStudio入门-DTC编辑
  • 原文地址:https://blog.csdn.net/han1202012/article/details/132071321