• 电影评分数据分析案例-Spark SQL


    1. # cording:utf8
    2. from pyspark.sql import SparkSession
    3. from pyspark.sql.types import IntegerType, StringType, StructType
    4. import pyspark.sql.functions as F
    5. if __name__ == '__main__':
    6. # 0.构建执行环境入口对象SparkSession
    7. spark = SparkSession.builder.\
    8. appName('movie_demo').\
    9. master('local[*]').\
    10. getOrCreate()
    11. sc = spark.sparkContext
    12. # 1.读取文件
    13. schema = StructType().add('user_id', StringType(), nullable=True). \
    14. add('movie_id', IntegerType(), nullable=True).\
    15. add('rank', IntegerType(), nullable=True).\
    16. add('ts', StringType(), nullable=True)
    17. df = spark.read.format('csv').\
    18. option('sep', '\t').\
    19. option('header', False).\
    20. option('encoding', 'utf-8').\
    21. schema(schema=schema).\
    22. load('../input/u.data')
    23. # TODO 1:用户平均分
    24. df.groupBy('user_id').\
    25. avg('rank').\
    26. withColumnRenamed('avg(rank)', 'avg_rank').\
    27. withColumn('avg_rank', F.round('avg_rank', 2)).\
    28. orderBy('avg_rank', ascending=False).\
    29. show()
    30. # TODO 2:电影的平均分查询
    31. df.createTempView('movie')
    32. spark.sql('''
    33. SELECT movie_id, ROUND(AVG(rank),2) as avg_rank FROM movie GROUP BY movie_id ORDER BY avg_rank DESC
    34. ''').show()
    35. # TODO 3:查询大于平均分的电影数量
    36. print('大于平均分电影数量为:', df.where(df['rank'] > df.select(F.avg('rank')).first()['avg(rank)']).count())
    37. # TODO 4:查询高分电影中(>3)打分次数最多的用户,此人打分的平均分
    38. # 找出打分次数最多的人
    39. user_id = df.where('rank>3').\
    40. groupBy('user_id').\
    41. count(). \
    42. withColumnRenamed('count', 'cnt').\
    43. orderBy('cnt', ascennding=False).\
    44. limit(1).\
    45. first()['user_id']
    46. # 算平均分
    47. df.filter(df['user_id'] == user_id).\
    48. select(F.round(F.avg('rank'), 2)).show()
    49. # TODO 5: 查询每个用户的平均分打分,最低打分,最高打分
    50. df.groupBy('user_id').\
    51. agg(
    52. F.round(F.avg('rank'), 2).alias('avg_rank'),
    53. F.min('rank').alias('min_rank'),
    54. F.max('rank').alias('max_rank')
    55. ).show()
    56. # TODO 6:查询评分超过100次的电影的平均分 排名TOP10
    57. df.groupBy('movie_id').\
    58. agg(
    59. F.round(F.count('movie_id'),2).alias('cnt'),
    60. F.round(F.avg('rank'),2).alias('avg_rank')
    61. ).\
    62. where('cnt > 100').\
    63. orderBy('avg_rank', ascending=False).\
    64. limit(10).\
    65. show()
    66. '''
    67. 1.agg:它是GroupedData对象的API,作用是:在里面可以写多个聚合
    68. 2.alias:它是Column对象的API,可以针对一个列进行改名
    69. 3.withColumnRenamed:它是DataFrame的API,可以对DF中的列进行改名,一次改一个列,改多个列可以链式调用
    70. 4.orderBy:DataFrame的API,进行排序,参数1是被排序的列,参数2是 升序(True)或降序(False)
    71. 5.first:DataFrame的API,取出DF的第一行数据,返回值结果是Row对象
    72. ## Row对象:就是一个数组,可以通过row['列名']来取出当前行中,某一列具体数值,返回值不再是DF 或者GroupedData 或者Column 而是具体的值(字符串、数字等)
    73. '''

    1.

    2.

    3.

    4.

    5.

    6.

  • 相关阅读:
    嵌入式开发:嵌入式基础——代码和数据空间揭秘
    【论文笔记】Aleph_star
    Java实现的插件化策略模式
    docker中安装Ubuntu20,浏览器访问其图形界面
    跨域:利用JSONP、WebSocket实现跨域访问
    【面试题 - mysql】进阶篇 - 分库分表
    不同类型的RFID标签及其应用场景浅析
    班级校园网页设计作业 静态HTML我的班级网页 DW班级网站模板下载 大学生简单班级网页作品代码 我的大学网页制作 学生班级网页设计作业
    spring boot使用自定义过滤器实现接口认证
    立体库堆垛机提升电机运行动作功能块
  • 原文地址:https://blog.csdn.net/2202_75347029/article/details/133996839