• 使用Spark探索数据


    需求分析

    使用Spark来探索数据是一种高效处理大规模数据的方法,需要对数据进行加载、清洗和转换,选择合适的Spark组件进行数据处理和分析。需求分析包括确定数据分析的目的和问题、选择合适的Spark应用程序和算法、优化数据处理流程和性能、可视化和解释分析结果。同时,需要熟悉Spark的基本概念和操作,掌握Spark编程和调优技巧,以确保数据探索的准确性和效率。

    系统实现

    了解实验目的

    掌握python on Spark的使用理解探索数据的意义和方法,掌握使用Spark探索数据的过程。

    1.实验整体流程分析:

    • 准备环境,安装Hadoop和Spark组件
    • 准备数据,采用开源movielens数据集
    • 探索用户数据
    • 探索电影数据
    • 探索电影评级数据

     2.准备数据:

    • 打开终端,启动Hadoop和Spark集群

    • 下载相关数据集

    • 将数据集解压到/usr/目录下

    • 上传数据至HDFS
    1. # hadoop fs -mkdir /data
    2. # hadoop fs -ls /
    3. # hadoop fs -put /usr/data/u.user /data/u.user
    4. # hadoop fs -put /usr/data/u.data /data/u.data
    5. # hadoop fs -put /usr/data/u.genre /data/u.genre
    6. # hadoop fs -put /usr/data/u.info /data/u.info
    7. # hadoop fs -put /usr/data/u.item /data/u.item
    8. # hadoop fs -put /usr/data/u.occupation /data/u.occupation
    9. # hadoop fs -ls /data

    上传后的HDFS的data目录结构如图所示

    3.探索用户数据:

    • 打开终端,执行pyspark命令,进入Spark的python环境

    • 打印首行记录

    运行结果如下

    • 分别统计用户、性别和职业的个数
    1. # 以' | '切分每列,返回新的用户RDD
    2. user_fields = user_data.map(lambda line: line.split("|"))
    3. # 统计用户数
    4. num_users = user_fields.map(lambda fields: fields[0]).count()
    5. # 统计性别数
    6. num_genders = user_fields.map(lambda fields: fields[2]).distinct().count()
    7. # 统计职业数
    8. num_occupations = user_fields.map(lambda fields: fields[3]).distinct().count()
    9. # 统计邮编数
    10. num_zipcodes = user_fields.map(lambda fields: fields[4]).distinct().count()
    11. # 返回结果
    12. print ("用户数: %d, 性别数: %d, 职业数: %d, 邮编数: %d" % (num_users, num_genders, num_occupations, num_zipcodes))

    运行结果如下

    • 查看年龄分布情况,并用plt.show绘制

    • 查看职业分布情况,同样绘制图
    1. # 并行统计各职业人数的个数,返回职业统计RDD后落地
    2. count_by_occupation = user_fields.map(lambda fields: (fields[3], 1)).reduceByKey(lambda x, y: x + y).collect()
    3. # 生成x/y坐标轴
    4. x_axis1 = np.array([c[0for c in count_by_occupation])
    5. y_axis1 = np.array([c[1for c in count_by_occupation])
    6. x_axis = x_axis1[np.argsort(x_axis1)]
    7. y_axis = y_axis1[np.argsort(y_axis1)]
    8. # 生成x轴标签
    9. pos = np.arange(len(x_axis))
    10. width = 1.0
    11. ax = plt.axes()
    12. ax.set_xticks(pos + (width / 2))
    13. ax.set_xticklabels(x_axis)
    14. # 绘制职业人数条状图
    15. plt.xticks(rotation=30)
    16. plt.bar(pos, y_axis, width, color='lightblue')
    17. plt.show()

     

    • 统计各职业人数

    4.探索电影数据:

    • 重新打开终端,执行pyspark命令,进入Spark的python环境

    • 打印首行记录

    • 查看电影的数量

    • 过滤掉没有发现时间信息的记录

    注意,输入时需要手动缩进

    • 查看影片的年龄分布并绘图

    5.探索评级数据:

    • 重新打开终端,进入Spark的bin目录下,执行pyspark命令,进入Spark的python环境

    • 打印首行记录

    • 查看有多少人参与了评分

    • 统计最高、最低、平均、中位评分,以及平均每个用户的评分次数
    1. # 以' | '切分每列,返回新的用户RDD
    2. user_fields = user_data.map(lambda lineline.split("|"))
    3. # 统计用户数
    4. num_users = user_fields.map(lambda fields: fields[0]).count()
    5. # 获取电影数量
    6. num_movies = movie_data.count()
    7. # 获取评分RDD
    8. rating_data = rating_data_raw.map(lambda lineline.split("\t"))
    9. ratings = rating_data.map(lambda fields: int(fields[2]))
    10. # 计算最大/最小评分
    11. max_rating = ratings.reduce(lambda x, y: max(x, y))
    12. min_rating = ratings.reduce(lambda x, y: min(x, y))
    13. # 计算平均/中位评分
    14. mean_rating = ratings.reduce(lambda x, y: x + y) / float(num_ratings)
    15. median_rating = np.median(ratings.collect())
    16. # 计算每个观众/每部电影平均打分/被打分次数
    17. ratings_per_user = num_ratings / num_users
    18. ratings_per_movie = num_ratings / num_movies
    19. # 输出结果
    20. print("最低评分: %d" % min_rating)
    21. print("最高评分: %d" % max_rating)
    22. print("平均评分: %2.2f" % mean_rating)
    23. print("中位评分: %d" % median_rating)
    24. print("平均每个用户打分(次数): %2.2f" % ratings_per_user)
    25. print("平均每部电影评分(次数): %2.2f" % ratings_per_movie)

    • 统计评分分布情况
    1. # 生成评分统计RDD,并落地
    2. count_by_rating = ratings.countByValue()
    3. # 生成x/y坐标轴
    4. x_axis = np.array(count_by_rating.keys())
    5. y_axis = np.array([float(c) for c in count_by_rating.values()])
    6. # 对人数做标准化
    7. y_axis_normed = y_axis / y_axis.sum()
    8. # 生成x轴标签
    9. pos = np.arange(len(y_axis))
    10. width = 1.0
    11. ax = plt.axes()
    12. ax.set_xticks(pos + (width / 2))
    13. ax.set_xticklabels(y_axis)
    14. # 绘制评分分布柱状图
    15. plt.bar(pos, y_axis_normed, width, color='lightblue')
    16. plt.xticks(rotation=30)
    17. plt.show()

  • 相关阅读:
    文心一言 vs GPT-4 —— 全面横向比较
    嵌入式学习笔记(41)实时时钟RTC
    一遍关于vue基础语法下篇
    C++语法基础
    53页现代智慧社区数据物联解决方案
    再次登顶 GitHub,阿里大牛用 758 页讲清微服务 K8S 响应式的文案,真的太香了
    EfficientNet-V1论文阅读笔记
    Docker挂载镜像到本地(日常记录)
    java基础巩固3
    管道流:字节管道流、字符管道流
  • 原文地址:https://blog.csdn.net/2201_75642955/article/details/136359953