数据说明
Spark 数据分析 (Scala)
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}
import java.io.{File, PrintWriter}
/**
 * One parsed row of the Taobao user-behavior CSV.
 *
 * @param userId numeric id of the user
 * @param itemId numeric id of the product
 * @param action behavior type recorded in the dataset (e.g. "pv", "buy", "cart", "fav")
 * @param time   raw timestamp string as it appears in the CSV
 */
case class Info(
    userId: Long,
    itemId: Long,
    action: String,
    time: String
)
def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setMaster("local[2]").setAppName("tao bao product")
  val spark = SparkSession.builder().config(conf).getOrCreate()
  val sc = spark.sparkContext
  // Required for .toDF() on RDDs of case classes / tuples; must come after
  // the session is created because the encoders live on the session instance.
  import spark.implicits._

  val rdd: RDD[Array[String]] = sc
    .textFile("data/practice2/Processed_UserBehavior.csv")
    .map(_.split(","))

  // FIX: use toLong, not toInt — the Info schema declares Long ids and real
  // Taobao user/item ids can exceed Int.MaxValue (toInt would throw or overflow).
  val df: DataFrame = rdd
    .map(attr => Info(attr(0).trim.toLong, attr(1).trim.toLong, attr(2), attr(3)))
    .toDF()

  // 1. Count of each behavior type (pv / buy / cart / fav).
  val behaviorCount: DataFrame = df.groupBy("action").count()
  val result1 = behaviorCount.toJSON.collectAsList().toString

  // 2. Top 10 items by number of purchases.
  // FIX: the declared type Array[(String, Int)] requires the aggregation that
  // was missing — reduceByKey to count, sort descending by count, take(10).
  val top10Item: Array[(String, Int)] = df
    .filter(df("action") === "buy")
    .select(df("itemId"))
    .rdd
    .map(v => (v(0).toString, 1))
    .reduceByKey(_ + _)
    .sortBy(_._2, ascending = false)
    .take(10)
  val result2 = sc.parallelize(top10Item).toDF().toJSON.collectAsList().toString

  // 3. Top 10 users by number of purchases (same pipeline keyed on userId).
  val top10User: Array[(String, Int)] = df
    .filter(df("action") === "buy")
    .select(df("userId"))
    .rdd
    .map(v => (v(0).toString, 1))
    .reduceByKey(_ + _)
    .sortBy(_._2, ascending = false)
    .take(10)
  val result3 = sc.parallelize(top10User).toDF().toJSON.collectAsList().toString

  // 4. Purchases per day: Row.toString renders as "[2017-11-25 13:00:00]",
  // so strip the brackets and keep the date part before the space.
  val buyOrderByDate: Array[(String, Int)] = df
    .filter(df("action") === "buy")
    .select(df("time"))
    .rdd
    .map(v => (v.toString.replace("[", "").replace("]", "").split(" ")(0), 1))
    .reduceByKey(_ + _)
    .sortBy(_._1)
    .collect()
  val result4 = sc.parallelize(buyOrderByDate).toDF().toJSON.collectAsList().toString

  // FIX: the original opened a PrintWriter for result4 but never wrote to it
  // and never closed it; persist all four results and always release the handle.
  writeResult("data/practice2/result1.json", result1)
  writeResult("data/practice2/result2.json", result2)
  writeResult("data/practice2/result3.json", result3)
  writeResult("data/practice2/result4.json", result4)

  spark.stop()
}

/** Writes `content` to `path`, closing the writer even if the write fails. */
private def writeResult(path: String, content: String): Unit = {
  val writer = new PrintWriter(new File(path))
  try writer.write(content)
  finally writer.close()
}
数据可视化(pyecharts)
1、用户行为数据分析
2、销量前 10 的商品数据
3、用户购买量前 10
4、时间段商品销量波动