• Spark实现二次排序



    申明: 未经许可,禁止以任何形式转载,若要引用,请标注链接地址
    全文共计3020字,阅读大概需要3分钟

    1. 实验室名称:

    大数据实验教学系统

    2. 实验项目名称:

    练习 Spark实现二次排序

    3. 实验学时:

    4. 实验原理:

    二次排序就是对于<key,value>类型的数据,不但按key排序,而且每个Key对应的value也是有序的。
      Spark 二次排序解决方案:
      我们只需要将年和月组合起来构成一个Key,将第三列作为value,并使用 groupByKey 函数将同一个Key的所有Value全部分组到一起,然后对同一个Key的所有Value进行排序即可。

    5. 实验目的:

    掌握Spark RDD实现二次排序的方法。
      掌握groupBy、orderBy等算子的使用。

    6. 实验内容:

    使用Spark RDD实现二次排序。
       假设我们有以下输入文件(逗号分割的分别是“年,月,总数”):

    1.	    2018,5,22
    2.	    2019,1,24
    3.	    2018,2,128
    4.	    2019,3,56
    5.	    2019,1,3
    6.	    2019,2,-43
    7.	    2019,4,5
    8.	    2019,3,46
    9.	    2018,2,64
    10.	    2019,1,4
    11.	    2019,1,21
    12.	    2019,2,35
    13.	    2019,2,0
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    我们期望的输出结果是:

    1.	    2018-2  64,128
    2.	    2018-5  22
    3.	    2019-1  3,4,21,24
    4.	    2019-2  -43,0,35
    5.	    2019-3  46,56
    6.	    2019-4  5
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    7. 实验器材(设备、虚拟机名称):

    硬件:x86_64 ubuntu 16.04服务器
      软件:JDK 1.8,Spark-2.3.2,Hadoop-2.7.3,zeppelin-0.8.1

    8. 实验步骤:

    8.1 启动Spark集群

    在终端窗口下,输入以下命令,启动Spark集群:

    1.	$ cd /opt/spark
    2.	$ ./sbin/start-all.sh
    
    • 1
    • 2

    然后使用jps命令查看进程,确保Spark的Master进程和Worker进程已经启动。

    8.2 启动zeppelin服务器

    在终端窗口下,输入以下命令,启动zeppelin服务器:

    1.	$ zeppelin-daemon.sh start
    
    • 1

    然后使用jps命令查看进程,确保zeppelin服务器已经启动。

    8.3 创建notebook文档

    1、首先启动浏览器,在地址栏中输入以下url地址,连接zeppelin服务器。
    http://localhost:9090
      2、如果zeppelin服务器已正确建立连接,则会看到如下的zeppelin notebook首页。
    在这里插入图片描述

    3、点击【Create new note】链接,创建一个新的笔记本,并命名为”rdd_demo”,解释器默认使用”spark”,如下图所示。
    在这里插入图片描述

    8.4 二次排序实现

    1、加载数据集。在zeppelin中执行如下代码。

    1.	val inputPath = "file:///data/dataset/data.txt"
    2.	val inputRDD = sc.textFile(inputPath)
    
    • 1
    • 2

    将光标放在代码单元中任意位置,然后同时按下【shift+enter】键,执行以上代码。
      2、实现二次排序。在zeppelin中执行如下代码。

    1.	// 实现二次排序
    2.	val sortedRDD = inputRDD
    3.	     .map(line => {
    4.	          val arr = line.split(",")
    5.	          val key = arr(0) + "-" + arr(1)
    6.	          val value = arr(2)
    7.	          (key,value)
    8.	     })
    9.	     .groupByKey()
    10.	     .map(t => (t._1, t._2.toList.sortWith(_.toInt < _.toInt).mkString(",")))
    11.	     .sortByKey(true)          // true:升序,false:降序
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    将光标放在代码单元中任意位置,然后同时按下【shift+enter】键,执行以上代码。
      3、结果输出。在zeppelin中执行如下代码。

    1.	sortedRDD.collect.foreach(t => println(t._1 + "\t" + t._2))
    
    • 1

    将光标放在代码单元中任意位置,然后同时按下【shift+enter】键,执行以上代码。输出结果如下所示。

    1.	2018-2    64,128
    2.	2018-5    22
    3.	2019-1    3,4,21,24
    4.	2019-2    -43,0,35
    5.	2019-3    46,56
    6.	2019-4    5
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    8.5 转换为DataFrame输出

    也可以将结果转换为DataFrame输出。在zeppelin中执行如下代码。

    1.	// 加载数据集
    2.	val inputPath = "file:///data/dataset/data.txt"
    3.	val inputRDD = sc.textFile(inputPath)
    4.	     
    5.	// 实现二次排序
    6.	val sortedRDD = inputRDD
    7.	                  .map(line => {
    8.	                    val arr = line.split(",")
    9.	                    val key = arr(0) + "-" + arr(1)
    10.	                    val value = arr(2)
    11.	                    (key,value)
    12.	                  })
    13.	                  .groupByKey()
    14.	                  .map(t => (t._1, t._2.toList.sortWith(_.toInt < _.toInt).mkString(",")))
    15.	                  .sortByKey(true)          // true:升序,false:降序
    16.	     
    17.	// 转换为DataFrame
    18.	val sortedDF = sortedRDD.toDF("key", "value")
    19.	     
    20.	// 显示
    21.	sortedDF.show
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    将光标放在代码单元中任意位置,然后同时按下【shift+enter】键,执行以上代码。输出结果如下所示。

    9. 实验结果及分析:

    实验结果运行准确,无误

    10. 实验结论:

    经过本节实验的学习,通过练习 Spark实现二次排序,进一步巩固了我们的Spark基础。

    11. 总结及心得体会:

    二次排序就是对于<key,value>类型的数据,不但按key排序,而且每个Key对应的value也是有序的。
      Spark 二次排序解决方案:
      我们只需要将年和月组合起来构成一个Key,将第三列作为value,并使用 groupByKey 函数将同一个Key的所有Value全部分组到一起,然后对同一个Key的所有Value进行排序即可。

    在这里插入图片描述

  • 相关阅读:
    lua 光速入门
    AI图书推荐:利用生成式AI实现业务流程超自动化
    还在用Excel做固定资产管理?那就OUT了
    每日一题 518零钱兑换2(完全背包)
    动态切换 Spring Boot 打包配置:使用 Maven Profiles 管理 JAR 和 WAR
    MySql运维篇---009:分库分表:垂直拆分、水平拆分、通过MyCat进行分片,读写分离:一主一从、 双主双从
    【python小游戏】飞机大作战源码分享(附完整源码+图片资源可直接运行)
    JavaScript对象与内置对象
    如果忘记了 iPhone 密码
    uni-app进阶之自定义【day13】
  • 原文地址:https://blog.csdn.net/qq_44807756/article/details/125570119