• Spark SQL Case Study: E-commerce Purchase Data Analysis


    Data Description

    [figure: schema of the processed UserBehavior dataset]
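    The schema can be read off the loading code below: each row of Processed_UserBehavior.csv carries four comma-separated fields, userId, itemId, action, and time (a "yyyy-MM-dd HH:mm:ss" timestamp). A few illustrative rows follow; the values are made up, and action names other than "buy" (pv, cart, fav in the public UserBehavior dataset) are an assumption, since only "buy" appears in the code:

    1000001,2032668,pv,2017-11-25 13:49:06
    1000001,2032668,buy,2017-11-25 13:55:12
    1000002,4443059,cart,2017-11-26 09:12:47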

    Spark Data Analysis (Scala)

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{DataFrame, SparkSession}
    import org.apache.spark.SparkConf
    import java.io.{File, PrintWriter}

    object Taobao {
      // One record of user behavior: who did what to which item, and when
      case class Info(userId: Long, itemId: Long, action: String, time: String)

      def main(args: Array[String]): Unit = {
        // Use 2 CPU cores
        val conf = new SparkConf().setMaster("local[2]").setAppName("tao bao product")
        val spark = SparkSession.builder().config(conf).getOrCreate()
        import spark.implicits._
        val sc = spark.sparkContext

        // Load the file from the local file system as an RDD
        val rdd: RDD[Array[String]] = sc.textFile("data/practice2/Processed_UserBehavior.csv").map(_.split(","))

        // Convert the RDD to a DataFrame (toLong, since the ID fields are declared as Long)
        val df: DataFrame = rdd.map(attr => Info(attr(0).trim.toLong, attr(1).trim.toLong, attr(2), attr(3))).toDF()

        // Spark data analysis
        // 1. Count user actions by type
        val behavior_count: DataFrame = df.groupBy("action").count()
        val result1 = behavior_count.toJSON.collectAsList().toString
        // val writer1 = new PrintWriter(new File("data/practice2/result1.json"))
        // writer1.write(result1)
        // writer1.close()

        // 2. Top 10 items by sales volume
        val top_10_item: Array[(String, Int)] = df.filter(df("action") === "buy").select(df("itemId"))
          .rdd.map(v => (v(0).toString, 1))
          .reduceByKey(_ + _)
          .sortBy(_._2, ascending = false)
          .take(10)
        val result2 = sc.parallelize(top_10_item).toDF().toJSON.collectAsList().toString
        // val writer2 = new PrintWriter(new File("data/practice2/result2.json"))
        // writer2.write(result2)
        // writer2.close()

        // 3. Top 10 users by number of purchases
        val top_10_user: Array[(String, Int)] = df.filter(df("action") === "buy").select(df("userId"))
          .rdd.map(v => (v(0).toString, 1))
          .reduceByKey(_ + _)
          .sortBy(_._2, ascending = false)
          .take(10)
        val result3 = sc.parallelize(top_10_user).toDF().toJSON.collectAsList().toString
        // val writer3 = new PrintWriter(new File("data/practice2/result3.json"))
        // writer3.write(result3)
        // writer3.close()

        // 4. Platform-wide daily sales over the time range
        // Keep only the date part of the timestamp ("yyyy-MM-dd HH:mm:ss" -> "yyyy-MM-dd")
        val buy_order_by_date: Array[(String, Int)] = df.filter(df("action") === "buy").select(df("time"))
          .rdd.map(v => (v.getString(0).split(" ")(0), 1))
          .reduceByKey(_ + _)
          .sortBy(_._1)
          .collect()
        // buy_order_by_date.foreach(println)
        /*
        (2017-11-25,21747)
        (2017-11-26,22265)
        (2017-11-27,24583)
        (2017-11-28,23153)
        (2017-11-29,24102)
        (2017-11-30,23994)
        (2017-12-01,23153)
        (2017-12-02,28512)
        */
        // Convert to a DataFrame and write the result as JSON
        val result4 = sc.parallelize(buy_order_by_date).toDF().toJSON.collectAsList().toString
        val writer4 = new PrintWriter(new File("data/practice2/result4.json"))
        writer4.write(result4)
        writer4.close()

        sc.stop()
        spark.stop()
      }
    }
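    A note on the output format: Dataset.toJSON yields one compact JSON object per row, and collectAsList().toString wraps them as "[obj, obj, ...]", which happens to be a valid JSON array. Given the counts printed in the comment above, result4.json should therefore contain (tuple columns get the default names _1 and _2):

    [{"_1":"2017-11-25","_2":21747}, {"_1":"2017-11-26","_2":22265}, ..., {"_1":"2017-12-02","_2":28512}]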

    Data Visualization (pyecharts)

    1. User behavior statistics

    [chart: counts of each user action type]

    2. Top 10 items by sales volume

    [chart: top 10 best-selling items]

    3. Top 10 users by purchase count

    [chart: top 10 buyers]

    4. Sales volume over the time period

    [chart: daily purchase counts, 2017-11-25 to 2017-12-02]
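    The original post shows only the rendered charts, not the plotting code. Below is a minimal pyecharts sketch for chart 4, assuming it reads the result4.json written by the Scala job above; the series name, chart title, and output file name are made up for illustration:

    import json
    from pyecharts import options as opts
    from pyecharts.charts import Line

    # Read the JSON array written by the Spark job
    with open("data/practice2/result4.json", encoding="utf-8") as f:
        rows = json.load(f)  # [{"_1": "2017-11-25", "_2": 21747}, ...]

    dates = [r["_1"] for r in rows]
    counts = [r["_2"] for r in rows]

    # Line chart of daily purchase counts, rendered to a standalone HTML file
    line = (
        Line()
        .add_xaxis(dates)
        .add_yaxis("purchases", counts)
        .set_global_opts(title_opts=opts.TitleOpts(title="Daily sales, 2017-11-25 to 2017-12-02"))
    )
    line.render("result4.html")

    The other three charts follow the same pattern, using Bar() over result1, result2, and result3.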

• Source: https://blog.csdn.net/m0_64261982/article/details/133186020