大数据实验教学系统
练习 Spark实现二次排序
二次排序就是对于<key,value>类型的数据,不但按key排序,而且每个Key对应的value也是有序的。
Spark 二次排序解决方案:
我们只需要将年和月组合起来构成一个Key,将第三列作为value,并使用 groupByKey 函数将同一个Key的所有Value全部分组到一起,然后对同一个Key的所有Value进行排序即可。
掌握Spark RDD实现二次排序的方法。
掌握groupBy、orderBy等算子的使用。
使用Spark RDD实现二次排序。
假设我们有以下输入文件(逗号分割的分别是“年,月,总数”):
1. 2018,5,22
2. 2019,1,24
3. 2018,2,128
4. 2019,3,56
5. 2019,1,3
6. 2019,2,-43
7. 2019,4,5
8. 2019,3,46
9. 2018,2,64
10. 2019,1,4
11. 2019,1,21
12. 2019,2,35
13. 2019,2,0
我们期望的输出结果是:
1. 2018-2 64,128
2. 2018-5 22
3. 2019-1 3,4,21,24
4. 2019-2 -43,0,35
5. 2019-3 46,56
6. 2019-4 5
硬件:x86_64 ubuntu 16.04服务器
软件:JDK 1.8,Spark-2.3.2,Hadoop-2.7.3,zeppelin-0.8.1
在终端窗口下,输入以下命令,启动Spark集群:
1. $ cd /opt/spark
2. $ ./sbin/start-all.sh
然后使用jps命令查看进程,确保Spark的Master进程和Worker进程已经启动。
在终端窗口下,输入以下命令,启动zeppelin服务器:
1. $ zeppelin-daemon.sh start
然后使用jps命令查看进程,确保zeppelin服务器已经启动。
1、首先启动浏览器,在地址栏中输入以下url地址,连接zeppelin服务器。
http://localhost:9090
2、如果zeppelin服务器已正确建立连接,则会看到如下的zeppelin notebook首页。
3、点击【Create new note】链接,创建一个新的笔记本,并命名为”rdd_demo”,解释器默认使用”spark”,如下图所示。
1、加载数据集。在zeppelin中执行如下代码。
1. val inputPath = "file:///data/dataset/data.txt"
2. val inputRDD = sc.textFile(inputPath)
将光标放在代码单元中任意位置,然后同时按下【shift+enter】键,执行以上代码。
2、实现二次排序。在zeppelin中执行如下代码。
1. // 实现二次排序
2. val sortedRDD = inputRDD
3. .map(line => {
4. val arr = line.split(",")
5. val key = arr(0) + "-" + arr(1)
6. val value = arr(2)
7. (key,value)
8. })
9. .groupByKey()
10. .map(t => (t._1, t._2.toList.sortWith(_.toInt < _.toInt).mkString(",")))
11. .sortByKey(true) // true:升序,false:降序
将光标放在代码单元中任意位置,然后同时按下【shift+enter】键,执行以上代码。
3、结果输出。在zeppelin中执行如下代码。
1. sortedRDD.collect.foreach(t => println(t._1 + "\t" + t._2))
将光标放在代码单元中任意位置,然后同时按下【shift+enter】键,执行以上代码。输出结果如下所示。
1. 2018-2 64,128
2. 2018-5 22
3. 2019-1 3,4,21,24
4. 2019-2 -43,0,35
5. 2019-3 46,56
6. 2019-4 5
也可以将结果转换为DataFrame输出。在zeppelin中执行如下代码。
1. // 加载数据集
2. val inputPath = "file:///data/dataset/data.txt"
3. val inputRDD = sc.textFile(inputPath)
4.
5. // 实现二次排序
6. val sortedRDD = inputRDD
7. .map(line => {
8. val arr = line.split(",")
9. val key = arr(0) + "-" + arr(1)
10. val value = arr(2)
11. (key,value)
12. })
13. .groupByKey()
14. .map(t => (t._1, t._2.toList.sortWith(_.toInt < _.toInt).mkString(",")))
15. .sortByKey(true) // true:升序,false:降序
16.
17. // 转换为DataFrame
18. val sortedDF = sortedRDD.toDF("key", "value")
19.
20. // 显示
21. sortedDF.show
将光标放在代码单元中任意位置,然后同时按下【shift+enter】键,执行以上代码。输出结果如下所示。
实验结果运行准确,无误
经过本节实验的学习,通过练习 Spark实现二次排序,进一步巩固了我们的Spark基础。
二次排序就是对于<key,value>类型的数据,不但按key排序,而且每个Key对应的value也是有序的。
Spark 二次排序解决方案:
我们只需要将年和月组合起来构成一个Key,将第三列作为value,并使用 groupByKey 函数将同一个Key的所有Value全部分组到一起,然后对同一个Key的所有Value进行排序即可。