• 【Python大数据】PySpark


    CSDN不支持多个资源绑定,另外两个数据文件下载:

    订单数据-json.zip

    search-log.zip


    Apache Spark是用于大规模数据(large-scala data)处理的统一(unified)分析引擎

    简单来说,Spark是一款分布式的计算框架,用于调度成百上千的服务器集群,计算TB、PB乃至EB级别的海量数据

    Python语言则是Spark重点支持的方向

    使用pip安装PySpark库:pip install pyspark

    安装好之后让我们简单使用一下吧

    from pyspark import SparkConf,SparkContext
    # 创建SparkConf类对象
    conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
    # 基于SparkConf类对象创建SparkContext对象
    sc = SparkContext(conf=conf)
    # 打印PySpark的运行版本
    print(sc.version)
    # 停止SparkContext对象的运行(停止PySpark程序)
    sc.stop()
    

    PySpark的编程,主要分为如下三大步骤:

    步骤

    RDD对象&&数据输入

    RDD全称为:弹性分布式数据集(Resilient Distributed Datasets)

    PySpark针对数据的处理,都是以RDD对象作为载体,即:

    • 数据存储在RDD内
    • 各类数据的计算方法,也都是RDD的成员方法
    • RDD的数据计算方法,返回值依旧是RDD对象

    PySpark支持通过SparkContext对象的parallelize成员方法,将Python中的容器转换为PySpark的RDD对象

    from pyspark import SparkConf,SparkContext
    conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
    sc = SparkContext(conf=conf)
    # 通过parallelize方法将Python对象加载到Spark内,成为RDD对象
    rdd1 = sc.parallelize([1,2,3,4,5])
    rdd2 = sc.parallelize((1,2,3,4,5))
    rdd3 = sc.parallelize("root")
    rdd4 = sc.parallelize({1,2,3,4,5})
    rdd5 = sc.parallelize({"company":"bilibili","work":"sleep"})
    # 查看RDD里面有什么内容用collect()方法
    print(rdd1.collect())
    print(rdd2.collect())
    print(rdd3.collect())
    print(rdd4.collect())
    print(rdd5.collect())
    sc.stop()
    

    运行结果:

    [1, 2, 3, 4, 5]
    [1, 2, 3, 4, 5]
    ['r', 'o', 'o', 't']
    [1, 2, 3, 4, 5]
    ['company', 'work']
    

    注意

    • 字符串会被拆分成一个个的字符,存入RDD对象
    • 字典仅有key会被存入RDD对象

    读取文件转RDD对象

    PySpark也支持通过SparkContext入口对象来读取文件,构建出RDD对象

    rdd = sc.textFile(文件路径)
    

    数据计算方法

    RDD对象内置成员方法(算子)

    map算子

    # map算子
    from pyspark import SparkConf,SparkContext
    import os
    os.environ['PYSPARK_PYTHON'] = "C:/Users/Leon/AppData/Local/Programs/Python/Python310/python310.exe"
    # 路径写自己的路径,注意是反斜杠"/",一般默认是"python.exe",笔者这样写是因为重命名过
    conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
    sc = SparkContext(conf=conf)
    rdd = sc.parallelize([1,2,3,4,5])
    '''通过map方法将全部数据都乘以10
    def func(data):
        return data*10
    rdd2 = rdd.map(func)
    '''
    rdd2 = rdd.map(lambda x: x*10)
    '''链式调用 返回值类型还是此类型可以继续在后面"."
    例如上面的:conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
    那么将全部数据乘以10再加1就可以这样写
    rdd2 = rdd.map(lambda x: x*10).map(lambda x: x+1)
    '''
    print(rdd2.collect())
    sc.stop()
    

    运行结果:

    [10, 20, 30, 40, 50]
    

    flatMap算子

    对rdd执行map操作,然后进行"解除嵌套"操作

    # 嵌套的list
    lst = [[1,2,3],[4,5,6],[7,8,9]]
    # 如果解除了嵌套
    lst = [1,2,3,4,5,6,7,8,9]
    '
    运行

    示例:

    # flatMap算子
    from pyspark import SparkConf,SparkContext
    import os
    os.environ['PYSPARK_PYTHON'] = "C:/Users/Leon/AppData/Local/Programs/Python/Python310/python310.exe"
    conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
    sc = SparkContext(conf=conf)
    rdd = sc.parallelize(["apple banana orange","tomato potato","wine beer"])
    # 需求:将rdd数据里面的单词一个个提取出来
    rdd1 = rdd.map(lambda x: x.split(" "))
    print(rdd1.collect())
    # 输出结果:[['apple', 'banana', 'orange'], ['tomato', 'potato'], ['wine', 'beer']]
    # 是嵌套列表的形式,需要解嵌套
    rdd2 = rdd.flatMap(lambda x: x.split(" "))
    print(rdd2.collect())
    # 输出结果:['apple', 'banana', 'orange', 'tomato', 'potato', 'wine', 'beer']
    sc.stop()
    

    reduceByKey算子

    功能:针对KV型RDD,自动按照key分组,然后根据你提供的聚合逻辑,完成组内数据(value)的聚合操作。

    语法:

    rdd.reduceByKey(func)
    # func:(V,V) -> V
    # 接收2个传入参数(类型要一致),返回一个返回值,类型和传入要求一致。
    

    KV型RDD指二元元组

    reduceByKey中的聚合逻辑:

    比如,有[1,2,3,4,5],聚合函数是:lambda a,b: a+b
    

    聚合逻辑

    示例:

    # reduceByKey算子
    from pyspark import SparkConf,SparkContext
    import os
    os.environ['PYSPARK_PYTHON'] = "C:/Users/Leon/AppData/Local/Programs/Python/Python310/python310.exe"
    conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
    sc = SparkContext(conf=conf)
    rdd = sc.parallelize([('男',95), ('女',90), ('男',88), ('女',92)])
    # 求男生和女生两个组的成绩之和
    rdd2 = rdd.reduceByKey(lambda a,b: a+b)
    print(rdd2.collect())
    sc.stop()
    

    运行结果:

    [('男', 183), ('女', 182)]
    

    综合案例1

    前置:在当前目录下创建一个"words.txt",里面存放内容如下:

    sheep tiger duck pig duck
    pig pig tiger sheep sheep
    tiger duck pig duck
    sheep sheep pig tiger
    

    需求:统计单词出现个数

    实现:

    # 综合案例1
    from pyspark import SparkConf,SparkContext
    import os
    os.environ['PYSPARK_PYTHON'] = "C:/Users/Leon/AppData/Local/Programs/Python/Python310/python310.exe"
    conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
    sc = SparkContext(conf=conf)
    # 读取数据文件
    rdd = sc.textFile("words.txt")
    # 取出全部单词
    word_rdd = rdd.flatMap(lambda x: x.split(" "))
    # 将所有单词都转换为二元元组,单词为key,value设置为1
    word_with_one_rdd = word_rdd.map(lambda word: (word,1))
    # 分组并求和
    result_rdd = word_with_one_rdd.reduceByKey(lambda a,b: a+b)
    print(result_rdd.collect())
    

    运行结果:

    [('sheep', 5), ('tiger', 4), ('duck', 4), ('pig', 5)]
    

    filter算子

    功能:对数据进行过滤

    语法:

    rdd.filter(func)
    # func:(T) -> bool 传入1个随意类型的参数,返回值为True(被保留)或False(被丢弃)
    

    示例:

    # filter算子
    from pyspark import SparkConf,SparkContext
    import os
    os.environ['PYSPARK_PYTHON'] = "C:/Users/Leon/AppData/Local/Programs/Python/Python310/python310.exe"
    conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
    sc = SparkContext(conf=conf)
    rdd = sc.parallelize([1,2,3,4,5])
    # 对数据进行过滤,保留偶数
    rdd1 = rdd.filter(lambda num: num%2 == 0)
    print(rdd1.collect())
    

    运行结果:

    [2, 4]
    

    distinct算子

    功能:对RDD数据进行去重,返回新RDD;直接调用,无需传参

    示例:

    # distinct算子
    from pyspark import SparkConf,SparkContext
    import os
    os.environ['PYSPARK_PYTHON'] = "C:/Users/Leon/AppData/Local/Programs/Python/Python310/python310.exe"
    conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
    sc = SparkContext(conf=conf)
    rdd = sc.parallelize([1,2,2,3,3,3,4,4,4,4])
    # 对数据进行去重
    rdd1 = rdd.distinct()
    print(rdd1.collect())
    

    运行结果:

    [1, 2, 3, 4]
    

    sortBy算子

    功能:对RDD数据基于指定规则进行排序

    语法:

    rdd.sortBy(func, ascending=False, numPartitions=1)
    '''
    func:(T) -> U:告知按照rdd中的那个数据进行排序,比如
    lambda x: x[1] 表示按照rdd中的第二列元素进行排序
    ascending: True升序 False降序
    numPartitions: 用多少分区排序(与分布式有关,目前设置为1即可)
    '''
    

    示例:

    # sortBy算子
    from pyspark import SparkConf,SparkContext
    import os
    os.environ['PYSPARK_PYTHON'] = "C:/Users/Leon/AppData/Local/Programs/Python/Python310/python310.exe"
    conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
    sc = SparkContext(conf=conf)
    rdd = sc.parallelize([("C/C++","赵老师",13),("Java","孙老师",21),("Python","王老师",9)])
    # 假设有这样一组数据,内容分别为语言科目、科任老师、选课人数,对数据按照选课人数降序排序
    rdd1 = rdd.sortBy(lambda x: x[2], ascending=False, numPartitions=1)
    print(rdd1.collect())
    

    运行结果:

    [('Java', '孙老师', 21), ('C/C++', '赵老师', 13), ('Python', '王老师', 9)]
    

    综合案例2

    本案例所需数据在开头下载

    数据说明:内容为订单相关信息;格式为json,一行有多个json数据,用" | "分隔

    需求:

    1. 各个城市的销售额排名(降序)
    2. 各个城市的售卖商品类别
    3. 北京市的售卖商品类别

    实现:

    # 综合案例2
    from pyspark import SparkConf,SparkContext
    import json
    import os
    os.environ['PYSPARK_PYTHON'] = "C:/Users/Leon/AppData/Local/Programs/Python/Python310/python310.exe"
    conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
    sc = SparkContext(conf=conf)
    # TODO 需求1:各个城市的销售额排名(降序)
    # 1.1 读取文件得到RDD
    file_rdd = sc.textFile("orders.txt")
    # 1.2 取出一个个JSON字符串
    json_str_rdd = file_rdd.flatMap(lambda x: x.split("|"))
    # 1.3 将JSON字符串转换为字典
    dict_rdd = json_str_rdd.map(lambda x: json.loads(x))
    #print(dict_rdd.collect())
    # 1.4 取出城市和销售额数据
    city_with_money_rdd = dict_rdd.map(lambda x: (x['areaName'], int(x['money'])))
    # 1.5 按城市分组,按销售额聚合
    city_result_rdd = city_with_money_rdd.reduceByKey(lambda a,b: a+b)
    # 1.6 按销售额聚合结果进行排序
    result1_rdd = city_result_rdd.sortBy(lambda x: x[1], ascending=False, numPartitions=1)
    print("需求1的结果:",result1_rdd.collect())
    # TODO 需求2:各个城市的售卖商品类别
    # 取出全部商品类别并进行去重
    category_rdd = dict_rdd.map(lambda x: x['category']).distinct()
    print("需求2的结果:",category_rdd.collect())
    # TODO 需求3:北京市的售卖商品类别
    category_beijing_rdd = dict_rdd.filter(lambda x: x['areaName']=='北京').map(lambda x: x['category']).distinct()
    print("需求3的结果:",category_beijing_rdd.collect())
    

    运行结果:

    需求1的结果: [('北京', 91556), ('杭州', 28831), ('天津', 12260), ('上海', 1513), ('郑州', 1120)]
    需求2的结果: ['平板电脑', '家电', '书籍', '手机', '电脑', '家具', '食品', '服饰']
    需求3的结果: ['平板电脑', '家电', '书籍', '手机', '电脑', '家具', '食品', '服饰']
    

    数据输出

    collect算子

    • 功能:将RDD各个分区的数据统一收集到Driver中,形成一个List对象
    • 用法:rdd.collect()
    • 返回值是一个list

    前面一直在用,不再赘述

    reduce算子

    功能:对RDD数据集按照传入的逻辑进行聚合

    语法:

    rdd.reduce(func)
    # func: (T,T) -> T
    类比reduceByKey
    返回计算结果
    

    示例:

    # reduce算子
    from pyspark import SparkConf,SparkContext
    import os
    os.environ['PYSPARK_PYTHON'] = "C:/Users/Leon/AppData/Local/Programs/Python/Python310/python310.exe"
    conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
    sc = SparkContext(conf=conf)
    rdd = sc.parallelize([1,2,3,4,5])
    result = rdd.reduce(lambda a,b: a+b)
    print(result)# 15
    

    take算子

    功能:取RDD的前N个元素,组合成list返回

    示例:

    # take算子
    from pyspark import SparkConf,SparkContext
    import os
    os.environ['PYSPARK_PYTHON'] = "C:/Users/Leon/AppData/Local/Programs/Python/Python310/python310.exe"
    conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
    sc = SparkContext(conf=conf)
    rdd = sc.parallelize([1,2,3,4,5])
    take_list = rdd.take(3)
    print(take_list)
    

    运行结果:

    [1, 2, 3]
    

    count算子

    功能:计算RDD数据的数目并返回这个数值

    示例:

    # count算子
    from pyspark import SparkConf,SparkContext
    import os
    os.environ['PYSPARK_PYTHON'] = "C:/Users/Leon/AppData/Local/Programs/Python/Python310/python310.exe"
    conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
    sc = SparkContext(conf=conf)
    rdd = sc.parallelize([1,2,3,4,5])
    count = rdd.count()
    print(count)# 5
    

    saveAsTextFile算子

    功能:将RDD的数据写入文本文件中

    支持本地写出,hdfs等文件系统

    需要先进行相关配置

    本文开头提供配置下载,内附说明

    示例:

    # saveAsTextFile算子
    from pyspark import SparkConf,SparkContext
    import os
    os.environ['PYSPARK_PYTHON'] = "C:/Users/Leon/AppData/Local/Programs/Python/Python310/python310.exe"
    os.environ['HADOOP_HOME'] = "C:/00_Root-tools/hadoop-3.0.0"
    # hadoop安装路径
    conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
    conf.set("spark.default.parallelism","1") # 设置全局并行度为1
    sc = SparkContext(conf=conf)
    rdd1 = sc.parallelize([1,2,3,4,5])
    rdd2 = sc.parallelize([("Hello",3), ("Spark",5), ("Hi",7)])
    rdd3 = sc.parallelize([[1,3,5], [6,7,9], [11,13,11]])
    # 输出到文件中,设置路径
    rdd1.saveAsTextFile("E:/output1")
    rdd2.saveAsTextFile("E:/output2")
    rdd3.saveAsTextFile("E:/output3")
    

    修改rdd分区为1个

    方式1:SparkConf对象设置属性全局并行度为1

    conf.set("spark.default.parallelism","1") # 设置全局并行度为1
    

    方式2:创建RDD的时候设置(parallelize方法传入numSlices参数为1)

    rdd1 = sc.parallelize([1,2,3,4,5], numSlices=1)#或
    rdd1 = sc.parallelize([1,2,3,4,5], 1)
    

    综合案例3

    数据:search_log.txt

    在本文开头提供下载

    需求:读取文件转换成RDD,并完成:

    1. 打印输出:热门搜索时间段(小时精度)Top3
    2. 打印输出:热门搜索词Top3
    3. 打印输出:统计"黑马程序员"关键字在哪个时段被搜索最多
    4. 将数据转换为JSON格式,写出为文件

    示例:

    # 综合案例3
    from pyspark import SparkConf,SparkContext
    import os
    os.environ['PYSPARK_PYTHON'] = "C:/Users/Leon/AppData/Local/Programs/Python/Python310/python310.exe"
    os.environ['HADOOP_HOME'] = "C:/00_Root-tools/hadoop-3.0.0"
    # hadoop安装路径
    conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
    conf.set("spark.default.parallelism","1") # 设置全局并行度为1
    sc = SparkContext(conf=conf)
    # 读取文件转换成RDD
    file_rdd = sc.textFile("search_log.txt")
    # TODO 需求1:打印输出:热门搜索时间段(小时精度)Top3
    '''
    1.1 取出全部的时间并转换为小时
    1.2 转换为(小时,1)的二元元组
    1.3 Key分组聚合Value
    1.4 排序(降序)
    1.5 取前三
    '''
    # 链式调用
    '''
    file_rdd.map(lambda x: x.split("\t"))\
        .map(lambda  x: x[0][:2])\
        .map(lambda x: (x,1))\
    3个map可以写成1个
    '''
    result1 = \
    file_rdd.map(lambda x: (x.split("\t")[0][:2],1))\
        .reduceByKey(lambda a,b: a+b)\
        .sortBy(lambda x: x[1],ascending=False,numPartitions=1)\
        .take(3)
    print("需求1的结果:",result1)
    # TODO 需求2:打印输出:热门搜索词Top3
    '''
    2.1 取出全部的搜索词
    2.2 (词,1) 二元元组
    2.3 分组聚合
    2.4 排序
    2.5 取前三
    '''
    result2 = \
    file_rdd.map(lambda x: (x.split("\t")[2],1))\
        .reduceByKey(lambda a,b: a+b)\
        .sortBy(lambda x: x[1],ascending=False,numPartitions=1)\
        .take(3)
    print("需求2的结果:",result2)
    # TODO 需求3:打印输出:统计"黑马程序员"关键字在哪个时段被搜索最多
    '''
    3.1 过滤内容,只保留"黑马程序员"关键字
    3.2 转换为(小时,1)的二元元组
    3.3 分组聚合
    3.4 排序
    3.5 取前1
    '''
    result3 = \
    file_rdd.map(lambda x: x.split("\t"))\
        .filter(lambda x: x[2]=="黑马程序员")\
        .map(lambda x: (x[0][:2],1))\
        .reduceByKey(lambda a,b: a+b)\
        .sortBy(lambda x: x[1],ascending=False,numPartitions=1)\
        .take(1)
    print("需求3的结果:",result3)
    # TODO 需求4:将数据转换为JSON格式,写出为文件
    # 转换为JSON格式的RDD并写出为文件
    file_rdd.map(lambda x: x.split("\t"))\
        .map(lambda x: {"time": x[0], "user_id":x[1], "key_word":x[2], "rank1":x[3], "rank2":x[4], "url":x[5]})\
        .saveAsTextFile("E:/output_json")
    

    输出结果:

    需求1的结果: [('20', 3479), ('23', 3087), ('21', 2989)]
    需求2的结果: [('scala', 2310), ('hadoop', 2268), ('博学谷', 2002)]
    需求3的结果: [('22', 245)]
    //生成文件夹
    

    PySpark代码是可以在大数据集群上运行的

  • 相关阅读:
    CNI设计解读
    Spark中sc.textFile()读取文件路径
    HarmonyOS NEXT应用开发—使用弹簧曲线实现抖动动画及手机振动效果案例
    工业交换机的“自适应”是什么意思?
    介绍一下cpu主频越高越好吗?
    如何快速识别图片中的文字?建议使用者两种方法
    【Leetcode】2864. 最大二进制奇数
    A Survey on Bias and Fairness in Machine Learning 阅读笔记
    踹他GPT 之 弄个大乐透助手
    python+opencv+深度学习实现二维码识别 计算机竞赛
  • 原文地址:https://blog.csdn.net/qq_45951891/article/details/138923696