• 实验7 Spark初级编程实践


    一、实验目的

    • 掌握使用 Spark 访问本地文件和 HDFS 文件的方法
    • 掌握 Spark 应用程序的编写、编译和运行方法

    二、实验平台

    • 操作系统:Ubuntu18.04(或 Ubuntu16.04)
    • Spark 版本:2.4.0
    • Hadoop 版本:3.1.3

    三、实验内容和要求

    1. 安装 Hadoop 和 Spark

    进人 Linux 操作系统,完成 Hadoop 伪分布式模式的安装。完成 Hadoop 的安装以后,再安装 Spark (Local 模 式 ) 。具体安装过程 ,可以 参考教材官网(https://dblab.xmu.edu.cn/post/bigdata3/)的“教材配套大数据软件安装和编程实践指南”。

    2. Spark 读取文件系统的数据

    启动hadoop

    cd /usr/local/hadoop
    start-all.sh
    
    • 1
    • 2

    在这里插入图片描述

    (1) 在 spark-shell 中读取 Linux 系统本地文件“/home/hadoop/test.txt”,然后统计出文件的行数;

    echo -e "Hello\nThis is a test\nBye!" >> ~/test.txt
    
    • 1

    在这里插入图片描述

    启动spark-shell:

    cd  /usr/local/spark
    ./bin/spark-shell
    
    • 1
    • 2

    在这里插入图片描述
    注意此处的spark和Scala的版本
    输入命令:

    val textFile=sc.textFile("file:///home/hadoop/test.txt")
    
    • 1

    在这里插入图片描述

    textFile.count()
    
    • 1

    在这里插入图片描述

    (2) 在 spark-shell 中读取 HDFS 系统文件“/user/hadoop/test.txt”(如果该文件不存在,请先创建),然后统计出文件的行数;

    如果该文件不存在,创建:

    hadoop fs -mkdir -p /usr/hadoop
    
    • 1

    在这里插入图片描述
    在终端执行,上传test.txt文件至HDFS中:

    /usr/local/hadoop/bin/hdfs dfs -put ~/test.txt
    
    • 1

    在这里插入图片描述
    在Spark执行

    val textFile=sc.textFile("hdfs://localhost:9000/user/hadoop/test.txt")
    textFile.count()
    
    • 1
    • 2

    在这里插入图片描述

    (3) 编写独立应用程序(推荐使用 Scala),读取 HDFS 文件“/user/hadoop/test.txt”(如果该文件不存在,请先创建),然后统计出文件的行数;通过 sbt 将整个应用程序编译打包成 JAR 包,并将生成的 JAR 包通过 spark-submit 提交到 Spark 中运行命令。
    进入spark安装目录:

    cd /usr/local/spark
    mkdir mycode && cd mycode
    
    • 1
    • 2

    创建HDFStset目录并编写Scala文件:

    mkdir -p HDFStest/src/main/scala
    vim ./HDFStest/src/main/scala/HDFStest.scala
    
    • 1
    • 2

    HDFStest.scala:

    /* HDFStest.scala */
    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._
    import org.apache.spark.SparkConf
     
    object HDFStest {
        def main(args: Array[String]) {
            val logFile = "hdfs://localhost:9000/user/hadoop/test.txt"
            val conf = new SparkConf().setAppName("Simple Application")
            val sc = new SparkContext(conf)
            val logData = sc.textFile(logFile, 2)
            val num = logData.count()
            printf("The num of this file is %d\n", num)
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    进入 HDFStest 目录,创建 simple.sbt:

    cd HDFStest
    vim simple.sbt
    
    • 1
    • 2

    注意这里的 scalaVersion 是我的 Scala 版本(2.11.12),spark-core 是我的 spark 版本(2.4.0)。

    name := "A Simple HDFS Test"
    version := "1.0"
    scalaVersion := "2.11.12"
    libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.0"
    
    • 1
    • 2
    • 3
    • 4

    在这里插入图片描述

    接下来,可以通过如下代码将整个应用程序打包成 JAR:

    sbt package
    
    • 1
    /usr/local/spark/bin/spark-submit  --class  "HDFStest" /usr/local/spark/mycode/HDFStest/target/scala-2.11/a-simple-hdfs-test_2.11-1.0.jar 2>& 1 | grep The
    
    • 1
    3. 编写独立应用程序实现数据去重

    对于两个输入文件 A 和 B,编写 Spark 独立应用程序(推荐使用 Scala),对两个文件进行
    合并,并剔除其中重复的内容,得到一个新文件 C。下面是输入文件和输出文件的一个样例,
    可供参考。
    输入文件 A 的样例如下:

    20170101 x
    20170102 y
    20170103 x
    20170104 y
    20170105 z
    20170106 z
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    输入文件 B 的样例如下:

    20170101 y
    20170102 y
    20170103 x
    20170104 z
    20170105 y
    
    • 1
    • 2
    • 3
    • 4
    • 5

    根据输入的文件 A 和 B 合并得到的输出文件 C 的样例如下:

    20170101 x
    20170101 y
    20170102 y
    20170103 x
    20170104 y
    20170104 z
    20170105 y
    20170105 z
    20170106 z
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    cd /usr/local/spark/mycode
    mkdir -p RemDup/src/main/scala
    cd RemDup
    
    • 1
    • 2
    • 3
    mkdir datas
    
    • 1

    写入A数据:

    vim ./datas/A
    
    • 1

    写入B数据:

    vim ./datas/B
    
    • 1

    写入RemDup.scala:

    vim ./src/main/scala/RemDup.scala
    
    • 1

    编写Scale文件

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._
    import org.apache.spark.SparkConf
    import org.apache.spark.HashPartitioner
    
    object RemDup {
        def main(args: Array[String]) {
            val conf = new SparkConf().setAppName("RemDup")
            val sc = new SparkContext(conf)
            val dataFile = "file:///usr/local/spark/mycode/RemDup/datas"
            val data = sc.textFile(dataFile,2)
            val res = data.filter(_.trim().length>0).map(line=>(line.trim,"")).partitionBy(new HashPartitioner(1)).groupByKey().sortByKey().keys
            res.saveAsTextFile("file:///usr/local/spark/mycode/RemDup/result")
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    编写simple.sbt文件:

    vim simple.sbt
    
    • 1

    注意此处的scale版本和spark版本

    name := "Remove Duplication"
    version := "1.0"
    scalaVersion := "2.12.15"
    libraryDependencies += "org.apache.spark" %% "spark-core" % "3.2.0"
    
    
    • 1
    • 2
    • 3
    • 4
    • 5

    打包:

    sbt package
    
    • 1
    /usr/local/spark/bin/spark-submit --class "RemDup"  /usr/local/spark/mycode/RemDup/target/scala-2.11/remove-duplication_2.11-1.0.jar
    
    • 1

    在这里插入图片描述

    查看结果:

    cat result/*
    
    • 1

    在这里插入图片描述

    4. 编写独立应用程序实现求平均值问题

    每个输入文件表示班级学生某个学科的成绩,每行内容由两个字段组成,第一个是学生名字,第二个是学生的成绩;编写 Spark 独立应用程序求出所有学生的平均成绩,并输出到一个新文件中。下面是输入文件和输出文件的一个样例,供参考。

    Algorithm 成绩:

    小明 92
    小红 87
    小新 82
    小丽 90
    
    • 1
    • 2
    • 3
    • 4

    Database 成绩:

    小明 95
    小红 81
    小新 89
    小丽 85
    
    • 1
    • 2
    • 3
    • 4

    Python 成绩:

    小明 82
    小红 83
    小新 94
    小丽 91
    
    • 1
    • 2
    • 3
    • 4

    平均成绩如下:

    (小红,83.67)
    (小新,88.33)
    (小明,89.67)
    (小丽,88.67)
    
    • 1
    • 2
    • 3
    • 4

    进入到 mycode 目录,新建 AvgScore 目录,

    cd /usr/local/spark/mycode
    mkdir -p AvgScore/src/main/scala
    cd AvgScore
    
    • 1
    • 2
    • 3

    在这里插入图片描述

    新建 datas 目录,写入文件 algorithm、database、python:

    mkdir datas
    
    • 1

    注意这里 algorithm、database 和 python 文件内容不能有多余的换行符或者空格!

    vim ./datas/algorithm
    
    • 1
    vim ./datas/database
    
    • 1
    vim ./datas/python
    
    • 1

    编写 Scala 文件:

    vim ./src/main/scala/AvgScore.scala
    
    • 1

    代码如下:

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._
    import org.apache.spark.SparkConf
    import org.apache.spark.HashPartitioner
    
    object AvgScore {
        def main(args: Array[String]) {
            val conf = new SparkConf().setAppName("AvgScore")
            val sc = new SparkContext(conf)
            val dataFile = "file:///usr/local/spark/mycode/AvgScore/datas"
            val data = sc.textFile(dataFile,3)
    
           val res = data.filter(_.trim().length>0).map(line=>(line.split(" ")(0).trim(),line.split(" ")(1).trim().toInt)).partitionBy(new HashPartitioner(1)).groupByKey().map(x => {
           	   	var n = 0
    	       	var sum = 0.0
    	       	for(i <- x._2){
    				sum = sum + i
    	       		n = n +1
        	    }
    	        val avg = sum/n
        	    val format = f"$avg%1.2f".toDouble
        	    (x._1,format)
    	    })
           res.saveAsTextFile("file:///usr/local/spark/mycode/AvgScore/result")
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    编写 simple.sbt 文件:

    vim simple.sbt
    
    • 1

    内容如下:

    name := "Average Score"
    version := "1.0"
    scalaVersion := "2.11.12"
    libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.0"
    
    • 1
    • 2
    • 3
    • 4

    在这里插入图片描述

    使用如下命令打包:

    sbt package
    
    • 1

    在这里插入图片描述

    使用生成的 jar 包:

    /usr/local/spark/bin/spark-submit --class "AvgScore"  /usr/local/spark/mycode/AvgScore/target/scala-2.11/average-score_2.11-1.0.jar
    
    • 1

    使用如下命令查看输出:

    cat result/*
    
    • 1

    输出如下:
    在这里插入图片描述

    四、遇到的问题:

    1、 输入/usr/local/sbt/sbt package打包时,显示找不到sbt
    2、 vim中无法退出
    3、 报错127.0.1.1 to hadoop:9000 failed on connection exception: 拒绝连接
    4、 没有文件夹/usr/Hadoop/test.txt

    五、解决办法:

    1、 将sbt package设置为全局变量,后续打包只需输入sbt package
    2、 vim退出方法:esc :wq
    3、 未开启hadoop,输入start-all.sh开启hadoop
    4、 新建文件夹mkdir -p test.txt

  • 相关阅读:
    C/C++程序,从命令行传入参数
    你看过字符画吗?用 Python 自己实现一个吧
    奈斯搜索。
    本地上传代码文件到github的一些问题处理
    自然语言处理实战项目18-NLP模型训练中的Logits与损失函数的计算应用项目
    计算机毕业设计node.js+Vue+Element企业员工信息管理系统
    【深度优先搜索遍历算法的实现,广度优先遍历(BFS-Breadth_First Search),构造最小生成树】
    【JavaScript】运算符及其优先级
    计算机毕业设计django基于python学生考试成绩数据分析与可视化系统(源码+系统+mysql数据库+Lw文档)
    LiteIDE主题定制教程
  • 原文地址:https://blog.csdn.net/weixin_51293984/article/details/128076728