目录
4.1 RDD对象概念:PySpark支持多种数据的输入,完成后会返回RDD类的对象;
4.2 Python数据容器转RDD对象.parallelize(数据容器对象)
4.4 读取文件转RDD对象:通过SparkContext入口对象来读取文件,构建RDD对象;
5.1 给Spark设置环境变量(不设置的时候,控制台会报错,出现找不到python.exe解释器的情况)
5.2 RDD的map方法:将RDD的数据根据函数进行一条条处理
5.3 RDD的flatMap方法:基本和map一样,但是多一个功能:将嵌套list给转成单list;[[1, 2, 3], [4, 5, 6]]转成[1, 2, 3, 4, 5, 6]
5.4 RDD的reduceByKey方法:将key分组后进行value逻辑处理;
6. 数据计算案例(一):完成使用PySpark进行单词技术的案例
7.1 RDD的filter方法:传入T泛型数据,返回bool,为false 的数据丢弃,为true的数据保留;(函数对RDD数据逐个处理,得到True的保留至返回值的RDD中)
7.2 RDD的distinct方法:对RDD数据进行去重,返回新RDD;
7.3 RDD的sortBy方法:对RDD的容器按照指定规则排序,返回新RDD;
8.5 需求三实现:过滤除北京的数据,并只返回一个参数category,是list列表,并进行去重,去重后的结果进行collect输出;
导航:
Python第二语言(一、Python start)-CSDN博客
Python第二语言(二、Python语言基础)-CSDN博客
Python第二语言(三、Python函数def)-CSDN博客
Python第二语言(四、Python数据容器)-CSDN博客
Python第二语言(五、Python文件相关操作)-CSDN博客
Python第二语言(九、Python第一阶段实操)-CSDN博客
Python第二语言(十、Python面向对象(上))-CSDN博客
Python第二语言(十一、Python面向对象(下))-CSDN博客
Python第二语言(十二、SQL入门和实战)-CSDN博客
Python第二语言(十三、PySpark实战)-CSDN博客
PySpark第三方库:
安装命令: pip install pyspark
加速下载命令:pip install -i
https://pypi.tuna.tsinghua.edu.cn/simple pyspark
setMaster(xxx).\setAppName(xxx)
是用来控制集群的代码,图中代码用的是单机的;setAppName
是Spark任务的名称;- # 导包
- from pyspark import SparkConf, SparkContext
-
- # 创建SparkConf类对象
- conf = SparkConf().setMaster("local[*]").\
- setAppName("test_spark_app")
-
- # 基于SparkConf类对象创建SparkContext类对象
- sc = SparkContext(conf=conf)
-
- # 打印PySpark的运行版本
- print(sc.version)
-
- # 停止SparkContext对象的运行
- sc.stop()
SparkContext类对象,是PySpark编程中一切功能的入口;
RDD全称为:弹性分布式数据集(Resilient Distributed Datasets);
- from pyspark import SparkConf, SparkContext
-
- conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
- sc = SparkContext(conf=conf)
-
- # 通过parallelize方法将Python对象加载到Spark内,称为RDD对象
- rdd1 = sc.parallelize([1, 2, 3, 4, 5])
- rdd2 = sc.parallelize((1, 2, 3, 4, 5))
- rdd3 = sc.parallelize("abcdefg")
- rdd4 = sc.parallelize({1, 2, 3, 4, 5})
- rdd5 = sc.parallelize({"key1": "value1", "key2": "value2"})
-
- # 使用collect方法查看RDD中的内容
- print(rdd1.collect())
- print(rdd2.collect())
- print(rdd3.collect())
- print(rdd4.collect())
- print(rdd5.collect())
-
- sc.stop()
- from pyspark import SparkConf, SparkContext
-
- conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
- sc = SparkContext(conf=conf)
-
- # 通过textFile方法,读取文件数据加载到Spark内,成为RDD对象
- rdd = sc.textFile("dataText")
-
- # 打印RDD内容
- print(rdd.collect())
- sc.stop()
小结:
os.path.exists
返回值为True或False;
- from pyspark import SparkConf, SparkContext
- import os
-
- # 配置Spark环境变量
- os.environ['PYSPARK_PYTHON'] = 'C:/Users/Administrator/AppData/Local/Programs/Python/Python310/python.exe'
-
- # 检查PYSPARK_PYTHON路径
- print(os.path.exists('C:/Users/Administrator/AppData/Local/Programs/Python/Python310/python.exe'))
- # 检查PYSPARK_DRIVER_PYTHON路径
- print(os.path.exists('C:/Users/Administrator/AppData/Local/Programs/Python/Python310/python.exe'))
1. 介绍:
(T) → U
:T代表传入一个参数,U代表一个返回值;(意思代表传入的参数是一个,还有一个返回值,T是泛型,不用指定数据类型)(T) → T
:T代表传入一个参数,T代表一个返回值;(意思代表传入的参数是一个,还有一个返回值,T是泛型,传入的是什么值,那么返回的就是什么类型)2. func函数传递:
func函数作为参数:代表的是RDD中的每个值,都会进行func函数的处理;是RDD中的每一个元素都会被RDD处理一遍;
可以简写成:rdd2 = rdd.map(lambda x: x * 10) # 简写的函数
;
3. 案例:
Python worker exited unexpectedly (crashed)
,降低版本即可,我用的版本10;- from pyspark import SparkConf, SparkContext
- import os
-
- # 配置Spark环境变量
- os.environ['PYSPARK_PYTHON'] = 'C:/Users/Administrator/AppData/Local/Programs/Python/Python310/python.exe'
- conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
-
- sc = SparkContext(conf=conf)
-
- # 准备一个RDD
- rdd = sc.parallelize([1, 2, 3, 4, 5])
- rdd2 = rdd.map(lambda x: x * 10) # 简写的函数
- print(rdd2.collect())
- sc.stop()
4. map链式调用:
- from pyspark import SparkConf, SparkContext
- import os
-
- # 配置Spark环境变量
- os.environ['PYSPARK_PYTHON'] = 'C:/Users/Administrator/AppData/Local/Programs/Python/Python310/python.exe'
- conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
-
- sc = SparkContext(conf=conf)
-
- rdd = sc.parallelize([1, 2, 3, 4, 5])
-
- rdd2 = rdd.map(lambda x: x * 10).map(lambda x: x + 5) # 链式调用:将map进行第一个*10数据计算,再进行map+5数据计算
-
- print(rdd2.collect())
5. 小结:
- from pyspark import SparkConf, SparkContext
- import os
-
- if __name__ == '__main__':
- os.environ['PYSPARK_PYTHON'] = 'C:/Users/Administrator/AppData/Local/Programs/Python/Python310/python.exe'
- conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
- sc = SparkContext(conf=conf)
-
- rdd = sc.parallelize(["zhangSan lisi yiyi", "zhangSan yiyi wangWu", "wangWu yiyi zhangSan"])
-
- print(rdd.map(lambda x: x.split(" ")).collect())
- print("-----------------------------------------")
- print(rdd.flatMap(lambda x: x.split(" ")).collect()) # 将嵌套list转成单list,对数据接触嵌套
二元元组:[('a', 1), ('a', 1), ('b', 1)]
这就是二元元组,元组中只有两个元素;
自动按照key分组,完成组内数据(value)的聚合操作:就是会按照元组中的key,就是'a', 'a', 'b'
进行key的value聚合,1, 1, 1
是value;(value聚合的逻辑是,按照传入的func函数逻辑来进行聚合)
假设这是二元元组数据要进行reduceByKey算子处理:
reduceByKey计算方式:
1. 思路:
lambda a, b: a+b
进行处理,也即是分组后,a=a+a, b=b+b+b;结果[('b', 3), ('a', 2)]
lambda a, b: a+b
中表示的是b:1, 1, 1
的三个值,去进行函数处理的时候,先是第一个1和第二1进行相加,这时候相加是a+b
,分组后与key无关系,那么第一个1和第二个1相加后等于2,这时候发现还有第三个1,这时候再次把第一次相加的结果,与第三个1进行a+b
处理,2+1
;是前后者参数的相加处理;最终得到按照key分组聚合value的结果;lambda a, b: a + b
操作,每个组中的数据,进行a + b操作,意思就是将当前组的所有value进行相加操作;2. 实现:
- from pyspark import SparkConf, SparkContext
- import os
-
- if __name__ == '__main__':
- os.environ['PYSPARK_PYTHON'] = 'C:/Users/Administrator/AppData/Local/Programs/Python/Python310/python.exe'
- conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
- sc = SparkContext(conf=conf)
-
- rdd = sc.parallelize([('a', 1), ('a', 1), ('b', 1), ('b', 1), ('b', 1)])
-
- result = rdd.reduceByKey(lambda a, b: a + b) # 分组计算
- print(result.collect())
思路:
先将字符串进行读取,然后按照空格分割['key', 'key']
,在进行分割后的数组重组为(key, 1)
的形式,后面利用rdd的reduceByKey方法,将分组后的key,进行聚合操作,因为value都是1,所以可以得出对单词出现的次数,进行统计操作;
根据 (key, 1)
重组后的数据应该是:
[('key1', 1), ['key1', 1], ('key2', 1), ['key2', 1]]
- from pyspark import SparkConf, SparkContext
- import os
-
- if __name__ == '__main__':
- os.environ['PYSPARK_PYTHON'] = 'C:/Users/Administrator/AppData/Local/Programs/Python/Python310/python.exe'
- conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
- sc = SparkContext(conf=conf)
-
- # 1.读取数据文件
- """
- 假设你有一个大文件,里面有 300MB 的数据,如果你指定分区数为 3,Spark 会尝试将这个文件分成 3 个分区,每个分区大约 100MB。
- 如果你的集群有 3 个节点,每个节点可以并行处理一个分区,这样就可以更快地完成任务。
- """
- file = sc.textFile("word", 3) # ("xx" , 3):3是指文件被分成的最小分区数(partitions)
-
- # 2.将所有单词读取出来
- words = file.flatMap(lambda line: line.split(' ')) # 结果:['python', 'java', ...]
-
- # 3.将所有单词加1做value
- word_one = words.map(lambda x: (x, 1)) # 结果:[('python', 1), ('java', 1), ('php', 1), ('c#', 1),...]
-
- # 4.分组并求和
- result = word_one.reduceByKey(lambda a, b: a + b)
-
- # 5.打印结果
- print(result.collect())
- from pyspark import SparkConf, SparkContext
- import os
-
- if __name__ == '__main__':
- os.environ['PYSPARK_PYTHON'] = 'C:/Users/Administrator/AppData/Local/Programs/Python/Python310/python.exe'
- conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
- sc = SparkContext(conf=conf)
-
- rdd = sc.parallelize([1, 2, 3, 4, 5])
-
- # 保留基数
- print(rdd.filter(lambda x: x % 2 == 1).collect())
- from pyspark import SparkConf, SparkContext
- import os
-
- if __name__ == '__main__':
- os.environ['PYSPARK_PYTHON'] = 'C:/Users/Administrator/AppData/Local/Programs/Python/Python310/python.exe'
- conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
- sc = SparkContext(conf=conf)
-
- rdd = sc.parallelize([1, 1, 2, 3, 4, 5, 4, 5])
-
- # 对rdd对象进行去重
- print(rdd.distinct().collect())
func: (T) → U
:告知按照rdd中的哪个数据进行排序,比如lambda x: x[1]
表示按照rdd中的第二列元素进行排序;numPartitions
:目前默认就为1;结果:
按照元组tople中的第二位元素进行排序,按照降序;
lambda x: x[1]
:计算规则,将所有容器的每一个元素按照函数规则处理,x是遍历的元组,x[1]
是传入的元组的第二位元素,所以规则就是按照元组的第二位元素进行降序排序;
- from pyspark import SparkConf, SparkContext
- import os
-
- if __name__ == '__main__':
- os.environ['PYSPARK_PYTHON'] = 'C:/Users/Administrator/AppData/Local/Programs/Python/Python310/python.exe'
- conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
- sc = SparkContext(conf=conf)
-
- rdd = sc.parallelize([("zhangSan", 99), ("lisi", 88), ("wangWu", 100)])
-
- # 对结果进行排序
- final_rdd = rdd.sortBy(lambda x: x[1], ascending=False, numPartitions=1)
- print(final_rdd.collect())
需求一:各个城市销售额排名,从大到小;
先按行读取文件,并对json进行split分割,按照|符号,得到最终的字典,使用Spark.reduceByKey进行分组,分组时传递func计算函数,将所有分组后的城市销售额进行a+b的形式,聚合起来,最终得到结果,并按照降序的排序方式排序输出;
需求二:全部城市,有哪些商品类别在售卖;
文件读取后,将城市的categpry商品类别,distinct使用去重;
需求三:北京市有哪些商品类别在售卖;
将除了北京市的所有数据进行filter过滤,过滤后只留下category并进行去重得到结果;
- {"id":1,"timestamp":"2024-06-01T01:03.00Z","category":"电脑","areaName":"杭州","money":"3000"}|{"id":2,"timestamp":"2024-06-01T01:03.00Z","category":"电脑","areaName":"杭州","money":"3500"}
- {"id":3,"timestamp":"2024-06-01T01:03.00Z","category":"食品","areaName":"杭州","money":"3000"}|{"id":4,"timestamp":"2024-06-01T01:03.00Z","category":"食品","areaName":"杭州","money":"3700"}
- {"id":5,"timestamp":"2024-06-01T01:03.00Z","category":"服饰","areaName":"北京","money":"3000"}|{"id":6,"timestamp":"2024-06-01T01:03.00Z","category":"服饰","areaName":"北京","money":"3900"}
- from pyspark import SparkConf, SparkContext
- import os
-
- if __name__ == '__main__':
- os.environ['PYSPARK_PYTHON'] = 'C:/Users/Administrator/AppData/Local/Programs/Python/Python310/python.exe'
- conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
- sc = SparkContext(conf=conf)
-
- # 1.读取文件得到RDD
- file_rdd = sc.textFile("orders")
- # 2. 取出一个个JSON字符串
- json_str_rdd = file_rdd.flatMap(lambda x: x.split("|"))
- # 3. 将一个个JSON字符串转换为字典
- dict_rdd = json_str_rdd.map(lambda x: json.loads(x))
-
- # print(dict_rdd.collect())
-
- # 4.取出城市和销售额数据
- city_with_money_rdd = dict_rdd.map(lambda x: (x['areaName'], int(x['money'])))
-
- # 5.按城市分组按销售额聚合
- city_result_rdd = city_with_money_rdd.reduceByKey(lambda a, b: a + b)
-
- # 6.按销售额聚合结果进行排序
- result_rdd = city_result_rdd.sortBy(lambda x: x[1], ascending=False, numPartitions=1)
- print("需求1的结果:", result_rdd.collect())
前三步数据结果:
完整数据结果:
- from pyspark import SparkConf, SparkContext
- import os
-
- if __name__ == '__main__':
- os.environ['PYSPARK_PYTHON'] = 'C:/Users/Administrator/AppData/Local/Programs/Python/Python310/python.exe'
- conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
- sc = SparkContext(conf=conf)
-
- # 1.读取文件得到RDD
- file_rdd = sc.textFile("orders")
- # 2. 取出一个个JSON字符串
- json_str_rdd = file_rdd.flatMap(lambda x: x.split("|"))
- # 3. 将一个个JSON字符串转换为字典
- dict_rdd = json_str_rdd.map(lambda x: json.loads(x))
- # 4.取出全部的商品类别
- category_rdd = dict_rdd.map(lambda x: x['category']).distinct()
- print("需求2的结果:", category_rdd.collect())
- from pyspark import SparkConf, SparkContext
- import os
-
- if __name__ == '__main__':
- os.environ['PYSPARK_PYTHON'] = 'C:/Users/Administrator/AppData/Local/Programs/Python/Python310/python.exe'
- conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
- sc = SparkContext(conf=conf)
-
- # 1.读取文件得到RDD
- file_rdd = sc.textFile("orders")
- # 2. 取出一个个JSON字符串
- json_str_rdd = file_rdd.flatMap(lambda x: x.split("|"))
- # 3. 将一个个JSON字符串转换为字典
- dict_rdd = json_str_rdd.map(lambda x: json.loads(x))
-
- # 4. 过滤北京的数据
- beijing_data_rdd = dict_rdd.filter(lambda x: x['areaName'] == '北京')
- # 5.取出全部商品类别
- result_rdd = beijing_data_rdd.map(lambda x: x['category']).distinct()
- print("需求3的结果:", result_rdd.collect())
- from pyspark import SparkConf, SparkContext
- import os
-
- os.environ['PYSPARK_PYTHON'] = 'C:/Users/Administrator/AppData/Local/Programs/Python/Python310/python.exe'
- conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
- sc = SparkContext(conf=conf)
-
- if __name__ == '__main__':
- rdd = sc.parallelize([1, 2, 3, 4, 5])
-
- # collect算子,输出RDD为list对象
- rdd_list: list = rdd.collect()
- print("collect算子结果:", rdd_list)
- print("collect算子类型是:", type(rdd_list))
-
- # reduce算子,对RDD进行两两聚合
- num = rdd.reduce(lambda a, b: a + b)
- print("reduce算子结果:", num)
-
- # take算子,取出RDD前N个元素,组成list返回
- take_list = rdd.take(3)
- print("take算子结果:", take_list)
-
- # count,统计rdd内有多少条数据,返回值为数字
- num_count = rdd.count()
- print("count算子结果:", num_count)