大数据实验教学系统
RDD的创建方式
在调用任何transformation或action操作之前,必须先要有一个RDD。Spark提供了创建RDDs的三种方法。一种方法是将现有的集合并行化。另一种方法是加载外部存储系统中的数据集,比如文件系统。第三种方法是在现有RDD上进行转换来得到新的RDD。
使用程序中的集合创建RDD,主要用于进行测试,可以在实际部署到集群运行之前,自己使用集合构造测试数据,来测试后面的spark应用的流程。
使用HDFS文件创建RDD,应该是最常用的生产环境处理方式,主要可以针对HDFS上存储的大数据,进行离线批处理操作。
掌握Spark基于内存创建RDD的方法。
掌握Spark基于外部存储创建RDD的方法。
1、读取内存数据,构造RDD。
2、读取外部存储数据,构造RDD。
硬件:x86_64 ubuntu 16.04服务器
软件:JDK 1.8,Spark-2.3.2,Hadoop-2.7.3,zeppelin-0.8.1,Scala-2.11.11
在终端窗口下,输入以下命令,启动Spark集群:
1. $ cd /opt/spark
2. $ ./sbin/start-all.sh
然后使用jps
命令查看进程,确保Spark的Master进程和Worker进程已经启动。
在终端窗口下,输入以下命令,启动zeppelin服务器:
1. $ zeppelin-daemon.sh start
然后使用jps
命令查看进程,确保zeppelin服务器已经启动。
1、首先启动浏览器,在地址栏中输入以下url地址,连接zeppelin服务器。
http://localhost:9090
2、如果zeppelin服务器已正确建立连接,则会看到如下的zeppelin notebook首页。
3、点击【Create new note
】链接,创建一个新的笔记本,并命名为”rdd_demo”,解释器默认使用”spark”,如下图所示。
创建RDD的第一种方法是将对象集合并行化,这意味着将其转换为可以并行操作的分布式数据集。对象集合的并行化方法是调用SparkContext类的parallelize方法。
在上一步打开的”rdd_demo” notebook的代码单元中,执行以下代码。
1、从列表中并行化构造RDD。
1. // 定义一个集合
2. val list1 = List(1,2,3,4,5,6,7,8,9,10)
3.
4. // 构造RDD
5. val rdd1 = sc.parallelize(list1)
6.
7. // 将集合返回到Driver端
8. rdd1.collect
将光标放在代码单元中任意位置,然后同时按下【shift+enter】键,执行以上代码。输出结果如下所示。
list1: List[Int] = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:27
res5: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
从上面的输出可以看出,zeppelin会显示每一行代码的执行结果。
2、也可以像下面这样快速创建list,然后由list创建RDD。
1. val list2 = List.range(1,11)
2. val rdd2 = sc.parallelize(list2)
3. rdd2.collect
将光标放在代码单元中任意位置,然后同时按下【shift+enter】键,执行以上代码。输出结果如下所示:
list2: List[Int] = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:27
res9: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
3、也可以像下面这样使用数组创建RDD。
1. val strList = Array("明月几时有","把酒问青天","不知天上宫阙","今夕是何年")
2. val strRDD = sc.parallelize(strList)
3. strRDD.collect
将光标放在代码单元中任意位置,然后同时按下【shift+enter】键,执行以上代码。输出结果如下所示。
strList: Array[String] = Array(明月几时有, 把酒问青天, 不知天上宫阙, 今夕是何年)
strRdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[3] at parallelize at <console>:27
res14: Array[String] = Array(明月几时有, 把酒问青天, 不知天上宫阙, 今夕是何年)
创建RDD的第二种方法是从存储系统读取数据集,存储系统可以是本地计算机文件系统、HDFS、Cassandra、Amazon S3等等。
请按下面的步骤操作,加载HDFS中数据存储并构造RDD。
1. 启动HDFS集群。在Linux终端窗口下,键入以下命令,启动HDFS集群。
1. $ start-dfs.sh
然后使用jps命令查看进程,确保HDFS集群已经启动。
2、将本地数据上传至HDFS上。在Linux终端窗口下,键入以下命令。
1. $ hdfs dfs -mkdir -p /data/dataset/
2. $ hdfs dfs -put /data/dataset/resources /data/dataset/
以上命令会将/data/dataset/下的resources文件夹上传到HDFS的/data/spark_demo/目录下。在resources文件夹中包含有不同类型的数据文件。可以使用以下命令查看。
1. $ hdfs dfs -ls /data/dataset/resources
可以看到,在HDFS的/data/spark_demo/resources/下已经有了各种类型的数据文件:
Found 9 items
-rw-r—r— 1 root supergroup 130 2019-07-05 10:40 /data/dataset/resources/employees.json
-rw-r—r— 1 root supergroup 240 2019-07-05 10:40 /data/dataset/resources/full_user.avsc
-rw-r—r— 1 root supergroup 5812 2019-07-05 10:41 /data/dataset/resources/kv1.txt
-rw-r—r— 1 root supergroup 49 2019-07-05 10:41 /data/dataset/resources/people.csv
-rw-r—r— 1 root supergroup 73 2019-07-05 10:41 /data/dataset/resources/people.json
-rw-r—r— 1 root supergroup 32 2019-07-05 10:41 /data/dataset/resources/people.txt
-rw-r—r— 1 root supergroup 185 2019-07-05 10:41 /data/dataset/resources/user.avsc
-rw-r—r— 1 root supergroup 334 2019-07-05 10:41 /data/dataset/resources/users.avro
-rw-r—r— 1 root supergroup 615 2019-07-05 10:41 /data/dataset/resources/users.parquet
3、加载HDFS上存储的txt文本文件,构造一个RDD。在zeppelin中执行如下代码。
1. // 定义HDFS上文件的存储路径
2. val hdfsFile = "/data/dataset/resources/people.txt"
3.
4. // 使用SparkContext的textFile方法,加载文本文件并构造RDD
5. // 注意:传入的参数是要加载的HDFS上的数据文件路径
6. val lines = sc.textFile(hdfsFile)
7.
8. // 返回RDD所有元素到Driver端
9. lines.collect()
将光标放在代码单元中任意位置,然后同时按下【shift+enter】键,执行以上代码。输出结果如下所示:
hdfsFile: String = /data/dataset/resources/people.txt
lines: org.apache.spark.rdd.RDD[String] = /data/dataset/resources/people.txt MapPartitionsRDD[4] at textFile at <console>:30
res14: Array[String] = Array(Michael, 29, Andy, 30, Justin, 19)
实验结果运行准确,无误
经过本节实验的学习,通过学习RDD的创建方式,进一步巩固了我们的Spark基础。
在调用任何transformation或action操作之前,必须先要有一个RDD。Spark提供了创建RDDs的三种方法。一种方法是将现有的集合并行化。另一种方法是加载外部存储系统中的数据集,比如文件系统。第三种方法是在现有RDD上进行转换来得到新的RDD。
本次实验演示了前两种构造RDD的方法。
1、SparkContext通过哪个方法来加载外部的数据集(){C}
A、map()
B、text()
C、textFile()
D、read()
1. 在本实验环境的HDFS中,存在如下的数据文件:/data/spark_demo/resources/people.csv。请在Zeppelin中编写Spark代码,加载该CSV文件构造RDD。