• 【Python-Spark(大规模数据)】


    ■ Spark

    Apache Spark 是用于大规模数据处理的统一分析引擎。
    PySpark是由Spark官方开发的Python语言第三方库。

    ■ PySparl编程模型

    • 通过SparkContext对象,完成数据输入
    • 输入数据后得到RDD对象,对RDD对象进行迭代计算
    • 最终通过RDD对象的成员方法,完成数据输出工作
      在这里插入图片描述

    ■ 基础准备

    # 导包
    from pyspark import SparkConf, SparkContext
    # 创建SparkConf类对象
    conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
    # 基于SparkConf类对象创建SparkContext对象
    sc = SparkContext(conf=conf)
    # 打印PySpark的运行版本
    print(sc.version)
    # 停止SparkContext对象的运行(停止PySpark程序)
    sc.stop()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    ■ 数据输入

    """
    演示通过PySpark代码加载数据,即数据输入
    """
    from pyspark import SparkConf, SparkContext
    
    conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
    sc = SparkContext(conf=conf)
    
    # # 通过parallelize方法将Python对象加载到Spark内,成为RDD对象
    # rdd1 = sc.parallelize([1, 2, 3, 4, 5])
    # rdd2 = sc.parallelize((1, 2, 3, 4, 5))
    # rdd3 = sc.parallelize("abcdefg")
    # rdd4 = sc.parallelize({1, 2, 3, 4, 5})
    # rdd5 = sc.parallelize({"key1": "value1", "key2": "value2"})
    #
    # # 如果要查看RDD里面有什么内容,需要用collect()方法
    # print(rdd1.collect())
    # print(rdd2.collect())
    # print(rdd3.collect())
    # print(rdd4.collect())
    # print(rdd5.collect())
    
    # 用过textFile方法,读取文件数据加载到Spark内,成为RDD对象
    rdd = sc.textFile("D:/hello.txt")
    print(rdd.collect())
    rdd.map()
    sc.stop()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    ■ RDD的map成员方法的使用

    """
    演示RDD的map成员方法的使用
    """
    from pyspark import SparkConf, SparkContext
    import os
    os.environ['PYSPARK_PYTHON'] = "D:/dev/python/python310/python.exe"
    conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
    sc = SparkContext(conf=conf)
    
    # 准备一个RDD
    rdd = sc.parallelize([1, 2, 3, 4, 5])
    # 通过map方法将全部数据都乘以10
    # def func(data):
    #     return data * 10
    
    rdd2 = rdd.map(lambda x: x * 10).map(lambda x: x + 5)
    
    print(rdd2.collect())
    # (T) -> U
    # (T) -> T
    
    # 链式调用
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    ■ RDD的flatMap成员方法的使用

    """
    演示RDD的flatMap成员方法的使用
    """
    from pyspark import SparkConf, SparkContext
    import os
    os.environ['PYSPARK_PYTHON'] = "D:/dev/python/python310/python.exe"
    conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
    sc = SparkContext(conf=conf)
    
    # 准备一个RDD
    rdd = sc.parallelize(["itheima itcast 666", "itheima itheima itcast", "python itheima"])
    
    # 需求,将RDD数据里面的一个个单词提取出来
    rdd2 = rdd.flatMap(lambda x: x.split(" "))
    print(rdd2.collect())
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    ■ RDD的reduceByKey成员方法的使用

    """
    演示RDD的reduceByKey成员方法的使用
    """
    from pyspark import SparkConf, SparkContext
    import os
    os.environ['PYSPARK_PYTHON'] = "D:/dev/python/python310/python.exe"
    conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
    sc = SparkContext(conf=conf)
    
    # 准备一个RDD
    rdd = sc.parallelize([('男', 99), ('男', 88), ('女', 99), ('女', 66)])
    # 求男生和女生两个组的成绩之和
    rdd2 = rdd.reduceByKey(lambda a, b: a + b)
    print(rdd2.collect())
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    ■ 单词计数统计

    """
    完成练习案例:单词计数统计
    """
    
    # 1. 构建执行环境入口对象
    from pyspark import SparkContext, SparkConf
    import os
    os.environ['PYSPARK_PYTHON'] = "D:/dev/python/python310/python.exe"
    conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
    sc = SparkContext(conf=conf)
    # 2. 读取数据文件
    rdd = sc.textFile("D:/hello.txt")
    # 3. 取出全部单词
    word_rdd = rdd.flatMap(lambda x: x.split(" "))
    # 4. 将所有单词都转换成二元元组,单词为Key,value设置为1
    word_with_one_rdd = word_rdd.map(lambda word: (word, 1))
    # 5. 分组并求和
    result_rdd = word_with_one_rdd.reduceByKey(lambda a, b: a + b)
    # 6. 打印输出结果
    print(result_rdd.collect())
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    ■ RDD的filter成员方法的使用

    """
    演示RDD的filter成员方法的使用
    """
    from pyspark import SparkConf, SparkContext
    import os
    os.environ['PYSPARK_PYTHON'] = "D:/dev/python/python310/python.exe"
    conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
    sc = SparkContext(conf=conf)
    
    # 准备一个RDD
    rdd = sc.parallelize([1, 2, 3, 4, 5])
    # 对RDD的数据进行过滤
    rdd2 = rdd.filter(lambda num: num % 2 == 0)
    
    print(rdd2.collect())
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    ■ RDD的distinct成员方法的使用

    """
    演示RDD的distinct成员方法的使用
    """
    from pyspark import SparkConf, SparkContext
    import os
    os.environ['PYSPARK_PYTHON'] = "D:/dev/python/python310/python.exe"
    conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
    sc = SparkContext(conf=conf)
    
    # 准备一个RDD
    rdd = sc.parallelize([1, 1, 3, 3, 5, 5, 7, 8, 8, 9, 10])
    # 对RDD的数据进行去重
    rdd2 = rdd.distinct()
    
    print(rdd2.collect())
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    ■ RDD的sortBy成员方法的使用

    """
    演示RDD的sortBy成员方法的使用
    """
    from pyspark import SparkConf, SparkContext
    import os
    os.environ['PYSPARK_PYTHON'] = "D:/dev/python/python310/python.exe"
    conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
    sc = SparkContext(conf=conf)
    
    # 1. 读取数据文件
    rdd = sc.textFile("D:/hello.txt")
    # 2. 取出全部单词
    word_rdd = rdd.flatMap(lambda x: x.split(" "))
    # 3. 将所有单词都转换成二元元组,单词为Key,value设置为1
    word_with_one_rdd = word_rdd.map(lambda word: (word, 1))
    # 4. 分组并求和
    result_rdd = word_with_one_rdd.reduceByKey(lambda a, b: a + b)
    # 5. 对结果进行排序
    final_rdd = result_rdd.sortBy(lambda x: x[1], ascending=True, numPartitions=1)
    print(final_rdd.collect())
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    ■ 案例:JSON商品统计

    """
    完成练习案例:JSON商品统计
    需求:
    1. 各个城市销售额排名,从大到小
    2. 全部城市,有哪些商品类别在售卖
    3. 北京市有哪些商品类别在售卖
    """
    from pyspark import SparkConf, SparkContext
    import os
    import json
    os.environ['PYSPARK_PYTHON'] = 'D:/dev/python/python310/python.exe'
    conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
    sc = SparkContext(conf=conf)
    
    # TODO 需求1: 城市销售额排名
    # 1.1 读取文件得到RDD
    file_rdd = sc.textFile("D:/orders.txt")
    # 1.2 取出一个个JSON字符串
    json_str_rdd = file_rdd.flatMap(lambda x: x.split("|"))
    # 1.3 将一个个JSON字符串转换为字典
    dict_rdd = json_str_rdd.map(lambda x: json.loads(x))
    # 1.4 取出城市和销售额数据
    # (城市,销售额)
    city_with_money_rdd = dict_rdd.map(lambda x: (x['areaName'], int(x['money'])))
    # 1.5 按城市分组按销售额聚合
    city_result_rdd = city_with_money_rdd.reduceByKey(lambda a, b: a + b)
    # 1.6 按销售额聚合结果进行排序
    result1_rdd = city_result_rdd.sortBy(lambda x: x[1], ascending=False, numPartitions=1)
    print("需求1的结果:", result1_rdd.collect())
    # TODO 需求2: 全部城市有哪些商品类别在售卖
    # 2.1 取出全部的商品类别
    category_rdd = dict_rdd.map(lambda x: x['category']).distinct()
    print("需求2的结果:", category_rdd.collect())
    # 2.2 对全部商品类别进行去重
    # TODO 需求3: 北京市有哪些商品类别在售卖
    # 3.1 过滤北京市的数据
    beijing_data_rdd = dict_rdd.filter(lambda x: x['areaName'] == '北京')
    # 3.2 取出全部商品类别
    result3_rdd = beijing_data_rdd.map(lambda x: x['category']).distinct()
    print("需求3的结果:", result3_rdd.collect())
    # 3.3 进行商品类别去重
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42

    ■ 将RDD输出为Python对象

    """
    演示将RDD输出为Python对象
    """
    
    from pyspark import SparkConf, SparkContext
    import os
    import json
    os.environ['PYSPARK_PYTHON'] = 'D:/dev/python/python310/python.exe'
    conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
    sc = SparkContext(conf=conf)
    
    # 准备RDD
    rdd = sc.parallelize([1, 2, 3, 4, 5])
    
    # collect算子,输出RDD为list对象
    rdd_list: list = rdd.collect()
    print(rdd_list)
    print(type(rdd_list))
    # reduce算子,对RDD进行两两聚合
    num = rdd.reduce(lambda a, b: a + b)
    print(num)
    # take算子,取出RDD前N个元素,组成list返回
    take_list = rdd.take(3)
    print(take_list)
    # count,统计rdd内有多少条数据,返回值为数字
    num_count = rdd.count()
    print(f"rdd内有{num_count}个元素")
    
    sc.stop()
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    ■ 将RDD输出到文件中

    """
    演示将RDD输出到文件中
    """
    
    from pyspark import SparkConf, SparkContext
    import os
    import json
    os.environ['PYSPARK_PYTHON'] = 'D:/dev/python/python310/python.exe'
    os.environ['HADOOP_HOME'] = "D:/dev/hadoop-3.0.0"
    conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
    
    sc = SparkContext(conf=conf)
    
    # 准备RDD1
    rdd1 = sc.parallelize([1, 2, 3, 4, 5], numSlices=1)
    
    # 准备RDD2
    rdd2 = sc.parallelize([("Hello", 3), ("Spark", 5), ("Hi", 7)], 1)
    
    # 准备RDD3
    rdd3 = sc.parallelize([[1, 3, 5], [6, 7, 9], [11, 13, 11]], 1)
    
    # 输出到文件中
    rdd1.saveAsTextFile("D:/output1")
    rdd2.saveAsTextFile("D:/output2")
    rdd3.saveAsTextFile("D:/output3")
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    ■ PySpark综合案例

    """
    演示PySpark综合案例
    """
    
    from pyspark import SparkConf, SparkContext
    import os
    import json
    os.environ['PYSPARK_PYTHON'] = 'D:/dev/python/python310/python.exe'
    os.environ['HADOOP_HOME'] = "D:/dev/hadoop-3.0.0"
    conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
    conf.set("spark.default.parallelism", "1")
    sc = SparkContext(conf=conf)
    
    # 读取文件转换成RDD
    file_rdd = sc.textFile("D:/search_log.txt")
    # TODO 需求1: 热门搜索时间段Top3(小时精度)
    # 1.1 取出全部的时间并转换为小时
    # 1.2 转换为(小时, 1) 的二元元组
    # 1.3 Key分组聚合Value
    # 1.4 排序(降序)
    # 1.5 取前3
    result1 = file_rdd.map(lambda x: (x.split("\t")[0][:2], 1)).\
        reduceByKey(lambda a, b: a + b).\
        sortBy(lambda x: x[1], ascending=False, numPartitions=1).\
        take(3)
    print("需求1的结果:", result1)
    
    # TODO 需求2: 热门搜索词Top3
    # 2.1 取出全部的搜索词
    # 2.2 (词, 1) 二元元组
    # 2.3 分组聚合
    # 2.4 排序
    # 2.5 Top3
    result2 = file_rdd.map(lambda x: (x.split("\t")[2], 1)).\
        reduceByKey(lambda a, b: a + b).\
        sortBy(lambda x: x[1], ascending=False, numPartitions=1).\
        take(3)
    print("需求2的结果:", result2)
    
    # TODO 需求3: 统计黑马程序员关键字在什么时段被搜索的最多
    # 3.1 过滤内容,只保留黑马程序员关键词
    # 3.2 转换为(小时, 1) 的二元元组
    # 3.3 Key分组聚合Value
    # 3.4 排序(降序)
    # 3.5 取前1
    result3 = file_rdd.map(lambda x: x.split("\t")).\
        filter(lambda x: x[2] == '黑马程序员').\
        map(lambda x: (x[0][:2], 1)).\
        reduceByKey(lambda a, b: a + b).\
        sortBy(lambda x: x[1], ascending=False, numPartitions=1).\
        take(1)
    print("需求3的结果:", result3)
    
    # TODO 需求4: 将数据转换为JSON格式,写出到文件中
    # 4.1 转换为JSON格式的RDD
    # 4.2 写出为文件
    file_rdd.map(lambda x: x.split("\t")).\
        map(lambda x: {"time": x[0], "user_id": x[1], "key_word": x[2], "rank1": x[3], "rank2": x[4], "url": x[5]}).\
        saveAsTextFile("D:/output_json")
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60

    ■ PySpark综合案例

    """
    演示PySpark综合案例
    """
    
    from pyspark import SparkConf, SparkContext
    import os
    os.environ['PYSPARK_PYTHON'] = '/export/server/anaconda3/bin/python'
    os.environ['HADOOP_HOME'] = "/export/server/hadoop-3.3.1"
    conf = SparkConf().setAppName("spark_cluster")
    conf.set("spark.default.parallelism", "24")
    sc = SparkContext(conf=conf)
    
    # 读取文件转换成RDD
    file_rdd = sc.textFile("hdfs://m1:8020/data/search_log.txt")
    # TODO 需求1: 热门搜索时间段Top3(小时精度)
    # 1.1 取出全部的时间并转换为小时
    # 1.2 转换为(小时, 1) 的二元元组
    # 1.3 Key分组聚合Value
    # 1.4 排序(降序)
    # 1.5 取前3
    result1 = file_rdd.map(lambda x: (x.split("\t")[0][:2], 1)).\
        reduceByKey(lambda a, b: a + b).\
        sortBy(lambda x: x[1], ascending=False, numPartitions=1).\
        take(3)
    print("需求1的结果:", result1)
    
    # TODO 需求2: 热门搜索词Top3
    # 2.1 取出全部的搜索词
    # 2.2 (词, 1) 二元元组
    # 2.3 分组聚合
    # 2.4 排序
    # 2.5 Top3
    result2 = file_rdd.map(lambda x: (x.split("\t")[2], 1)).\
        reduceByKey(lambda a, b: a + b).\
        sortBy(lambda x: x[1], ascending=False, numPartitions=1).\
        take(3)
    print("需求2的结果:", result2)
    
    # TODO 需求3: 统计黑马程序员关键字在什么时段被搜索的最多
    # 3.1 过滤内容,只保留黑马程序员关键词
    # 3.2 转换为(小时, 1) 的二元元组
    # 3.3 Key分组聚合Value
    # 3.4 排序(降序)
    # 3.5 取前1
    result3 = file_rdd.map(lambda x: x.split("\t")).\
        filter(lambda x: x[2] == '黑马程序员').\
        map(lambda x: (x[0][:2], 1)).\
        reduceByKey(lambda a, b: a + b).\
        sortBy(lambda x: x[1], ascending=False, numPartitions=1).\
        take(1)
    print("需求3的结果:", result3)
    
    # TODO 需求4: 将数据转换为JSON格式,写出到文件中
    # 4.1 转换为JSON格式的RDD
    # 4.2 写出为文件
    file_rdd.map(lambda x: x.split("\t")).\
        map(lambda x: {"time": x[0], "user_id": x[1], "key_word": x[2], "rank1": x[3], "rank2": x[4], "url": x[5]}).\
        saveAsTextFile("hdfs://m1:8020/output/output_json")
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
  • 相关阅读:
    【c++提高1】单调队列
    UTF-16究竟如何编码
    Vue过度与动画
    排序题:数组中的第k个最大元素及出现的次数 - 数组的正态分布排序
    【MySQL】存储引擎简介、存储引擎特点、存储引擎区别
    数据结构之时间复杂度与空间复杂度
    python处理xml文件
    Python生成随机数字/字符
    C生万物 | 从浅入深理解指针【最后部分】
    DownloadWithEscaping/下载数据并且转义返回数据, DownloadWithCombine/下载数据并且组合数据 的使用
  • 原文地址:https://blog.csdn.net/sinat_23896491/article/details/138061931