Prepare the data
```
// company,app,visit_times
腾讯,腾讯视频,800
腾讯,QQ音乐,900
腾讯,微信读书,100
腾讯,微信,900
腾讯,腾讯课堂,200
阿里,支付宝,900
阿里,优酷视频,700
阿里,虾米音乐,500
阿里,飞猪,700
阿里,钉钉,600
百度,百度App,700
百度,百度地图,800
百度,爱奇艺,800
百度,百度钱包,100
百度,百度贴吧,200
```
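To follow along, the sample rows need to exist on disk first. A minimal sketch (assuming the local path `./data/test` that the DataFrame job below reads from) to materialize them with plain Scala:

```scala
import java.nio.file.{Files, Paths}
import java.nio.charset.StandardCharsets

// Write the sample rows to ./data/test so the jobs below can read them
val rows = Seq(
  "腾讯,腾讯视频,800", "腾讯,QQ音乐,900", "腾讯,微信读书,100", "腾讯,微信,900", "腾讯,腾讯课堂,200",
  "阿里,支付宝,900", "阿里,优酷视频,700", "阿里,虾米音乐,500", "阿里,飞猪,700", "阿里,钉钉,600",
  "百度,百度App,700", "百度,百度地图,800", "百度,爱奇艺,800", "百度,百度钱包,100", "百度,百度贴吧,200"
)
Files.createDirectories(Paths.get("./data"))
Files.write(Paths.get("./data/test"), rows.mkString("\n").getBytes(StandardCharsets.UTF_8))
```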
```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._

// Read the raw lines, parse them, and aggregate visit counts per (company, app)
val df = spark.read.textFile("./data/test")
  .map(_.split(","))
  .map(x => (x(0), x(1), x(2).toInt))
  .toDF("company", "app", "vst_times")
  .groupBy("company", "app")
  .agg(sum("vst_times") as "vst_times")
  .cache()

// Partition by company only, so row_number ranks the apps within each company
val windows = Window.partitionBy("company").orderBy(col("vst_times").desc)

// Take the Top 2 apps by visit count for each of the BAT companies
df.withColumn("row_number", row_number().over(windows))
  .where("row_number <= 2")
  .select("company", "app", "vst_times")
  .show()
```
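Note that the sample data contains ties (QQ音乐 and 微信 are both at 900), and `row_number` breaks ties arbitrarily, so exactly which tied app survives can vary between runs. If tied apps should be kept together instead, a minimal variant using `dense_rank` over the same window:

```scala
// dense_rank assigns tied apps the same rank, so no tied row is dropped arbitrarily
df.withColumn("rank", dense_rank().over(windows))
  .where("rank <= 2")
  .select("company", "app", "vst_times")
  .show()
```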
```scala
import org.apache.spark.rdd.RDD

// The same Top 2 computation with the RDD API
val apprdd = sc.textFile("test.log")
  .map(line => {
    val x = line.split(",")
    ((x(0), x(1)), x(2).toInt)
  })
// Sum the visit counts per (company, app) key
val reduced: RDD[((String, String), Int)] = apprdd.reduceByKey(_ + _)
// Group by company
val grouped: RDD[(String, Iterable[((String, String), Int)])] = reduced.groupBy(_._1._1)
// Sort each company's apps by visit count descending and keep the Top 2
val sorted: RDD[(String, List[((String, String), Int)])] = grouped.mapValues(_.toList.sortBy(-_._2).take(2))
// Collect to the driver and print the results (a bare foreach would print on the executors)
sorted.collect().foreach(println)
// Release resources
sc.stop()
```
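`groupBy` pulls every app of a company into a single `Iterable`, which is fine for a handful of apps but can exhaust memory for large groups. A bounded alternative (a sketch, to be run before `sc.stop()`, reusing the `reduced` RDD above) keeps at most two entries per company during aggregation with `aggregateByKey`:

```scala
// Keep at most two (app, count) pairs per company at every step,
// so no executor ever materializes a company's full app list
val top2: RDD[(String, List[(String, Int)])] = reduced
  .map { case ((company, app), cnt) => (company, (app, cnt)) }
  .aggregateByKey(List.empty[(String, Int)])(
    (acc, v) => (v :: acc).sortBy(-_._2).take(2), // fold one value into a partition-local Top 2
    (a, b) => (a ++ b).sortBy(-_._2).take(2)      // merge two partitions' Top 2 lists
  )
top2.collect().foreach(println)
```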
Output:
```
+-------+--------+---------+
|company|     app|vst_times|
+-------+--------+---------+
|   腾讯|  QQ音乐|      900|
|   腾讯|    微信|      900|
|   百度|百度地图|      800|
|   百度|  爱奇艺|      800|
|   阿里|  支付宝|      900|
|   阿里|优酷视频|      700|
+-------+--------+---------+
```