大数据实验教学系统
DataFrame的操作-使用DSL
在Spark中,DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库中的二维表格。DataFrame与RDD的主要区别在于,前者带有schema元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型。这使得Spark SQL得以洞察更多的结构信息,从而对藏于DataFrame背后的数据源以及作用于DataFrame之上的变换进行了针对性的优化,最终达到大幅提升运行时效率的目标。反观RDD,由于无从得知所存数据元素的具体内部结构,Spark Core只能在stage层面进行简单、通用的流水线优化。

掌握操作DataFrame的各种方法(转换、过滤、排序等)。
掌握DataFrame存储操作。
操作DataFrame的转换、过滤、排序等方法。具体包含如下内容:
- select(cols)
- selectExpr(expr)
- drop(cols)
- dropDuplicates(subset=None)
- dropna(how=’any’,thresh=None,subset=None)
- filter(condition)
- where(condition)
- limit(num)
- withColumn(colName,col)
- withColumnRename(existing,new)
- orderBy(cols,kwargs)
硬件:x86_64 ubuntu 16.04服务器
软件:JDK 1.8,Spark-2.3.2,Hadoop-2.7.3,zeppelin-0.8.1
1、右击Ubuntu操作系统桌面,从弹出菜单中选择【Open in Terminal】命令打开终端。
在终端窗口下,输入以下命令,分别启动HDFS集群、Spark集群和Zeppelin服务器:
1. $ start-dfs.sh
2. $ cd /opt/spark
3. $ ./sbin/start-all.sh
4. $ zeppelin-daemon.sh start
2、将本地数据上传至HDFS上。在终端窗口中,分别执行以下命令上传数据:
1. $ hdfs dfs -mkdir -p /data/dataset/batch
2. $ hdfs dfs -put /data/dataset/batch/customers.csv /data/dataset/batch/
3、启动浏览器,打开zeppelin notebook首页,点击【Create new note】链接,创建一个新的笔记本,名字为【rdd_demo】,解释器默认使用【spark】,如下图所示:

1、select(*cols)通过表达式选取DataFRame中符合条件的数据,返回新的DataFrame。在zeppelin中执行如下代码:
1. // 读取csv数据
2. var df = spark.read.option("header","true").csv("/data/dataset/batch/customers.csv")
3.
4. // 通过select统计共有多少条数据
5. df.select("*").count()
同时按下【shift+enter】对程序进行输出。输出结果如下所示:
1. df: org.apache.spark.sql.DataFrame = [ID: string, Gener: string ... 2 more fields]
2. res196: Long = 40
通过表达式选取DataFRame中符合条件的数据。在zeppelin中执行如下代码:
1. // 选取Age和Gener列,获取前十条数据
2. df.select("Age","Gener").show(10)
3.
4. // 查找Age列,并重命名为age,输出前十条数据
5. df.select($"Age".alias("age")).show(10)
同时按下【shift+enter】对程序进行输出。输出结果如下所示:
1. +---+------+
2. |Age| Gener|
3. +---+------+
4. | 34| Male|
5. | 23|Female|
6. | 26|Female|
7. | 27|Female|
8. | 24| Male|
9. | 33| Male|
10. | 34| Male|
11. | 23| Male|
12. | 26| Male|
13. | 27|Female|
14. +---+------+
15. only showing top 10 rows
16.
17. +---+
18. |age|
19. +---+
20. | 34|
21. | 23|
22. | 26|
23. | 27|
24. | 24|
25. | 33|
26. | 34|
27. | 23|
28. | 26|
29. | 27|
30. +---+
31. only showing top 10 rows
2、selectExpr(*expr),这个方法是select方法的一个变体,可以接受一个SQL表达式,返回新的DataFrame。在zeppelin中执行如下代码:
1. // 分别求取Age的乘积,Age的平方根。显示前十条数据
2. df.selectExpr("Age * 2","sqrt(Age)").show(10)
同时按下【shift+enter】对程序进行输出。输出结果如下所示:
1. +---------+-------------------------+
2. |(Age * 2)|SQRT(CAST(Age AS DOUBLE))|
3. +---------+-------------------------+
4. | 68.0| 5.830951894845301|
5. | 46.0| 4.795831523312719|
6. | 52.0| 5.0990195135927845|
7. | 54.0| 5.196152422706632|
8. | 48.0| 4.898979485566356|
9. | 66.0| 5.744562646538029|
10. | 68.0| 5.830951894845301|
11. | 46.0| 4.795831523312719|
12. | 52.0| 5.0990195135927845|
13. | 54.0| 5.196152422706632|
14. +---------+-------------------------+
15. only showing top 10 rows
3、drop(*cols)按照列名删除DataFrame中的列,返回新的DataFrame。在zeppelin中执行如下代码:
1. // 打印所有的列名DataFrame.columns.values.tolist()
2. println(df.columns.toList)
3.
4. // 删除Age列名
5. var df1 = df.drop("Age")
6.
7. // 打印删除后的列名
8. println(df1.columns.toList)
同时按下【shift+enter】对程序进行输出。输出结果如下所示:
1. List(ID, Gener, Age, Annual Income)
2. List(ID, Gener, Annual Income)
4、dropDuplicates(subset=None)删除重复行,subset用于指定删除重复行的时候考虑那几列。在zeppelin中执行如下代码:
1. // 导入所需依赖
2. import org.apache.spark.sql.Row
3. import org.apache.spark.sql._
4. import org.apache.spark.sql.types._
5.
6. // 创建RDD,转换为DataFrame
7. var df = sc.parallelize(List(Row("simple",15,175),Row("simple",15,175),Row("simpleBDP",2,180)),3)
8. val schema = StructType(
9. List(
10. StructField("name", StringType, true),
11. StructField("age", IntegerType, true),
12. StructField("height", IntegerType, true)
13. )
14. )
15.
16. var df1 = spark.createDataFrame(df, schema)
17.
18. // 展示数据
19. // df.show
20.
21. // 删除重复的数据
22. df1.dropDuplicates().show()
23.
24. // 指定重复的列进行删除
25. df1.dropDuplicates(List("age","name")).show()
同时按下【shift+enter】对程序进行输出。输出结果如下所示:
1. +---------+---+------+
2. | name|age|height|
3. +---------+---+------+
4. |simpleBDP| 2| 180|
5. | simple| 15| 175|
6. +---------+---+------+
7.
8. +---------+---+------+
9. | name|age|height|
10. +---------+---+------+
11. |simpleBDP| 2| 180|
12. | simple| 15| 175|
13. +---------+---+------+
5、na.drop删除DataFrame中的空数据,加入”any”和”all”指定如何删除控制,加入数字参数指定有多少个空值进行删除,加入字段名删除指定字段中的空值。在zeppelin中执行如下代码:
1. var dfs = sc.parallelize(List(Row(null,27.0,170.0),Row(44.0,27.0,170.0),Row(null,null,null)))
2.
3. val schema = StructType(
4. List(
5. StructField("luck", DoubleType, true),
6. StructField("age", DoubleType, true),
7. StructField("weight", DoubleType, true)
8. )
9. )
10.
11. // 创建DataFrame
12. var df = spark.createDataFrame(dfs,schema)
13.
14. // 显示DataFrame
15. df.show()
16.
17. // 有任意一个为na进行删除
18. df.na.drop("any").show
19.
20. // 全部为na进行删除
21. df.na.drop("all").show
22.
23. // 删除有两个na值的数据
24. df.na.drop(2).show()
同时按下【shift+enter】对程序进行输出。输出结果如下所示:
1. +----+----+------+
2. |luck| age|weight|
3. +----+----+------+
4. |null|27.0| 170.0|
5. |44.0|27.0| 170.0|
6. |null|null| null|
7. +----+----+------+
8.
9. +----+----+------+
10. |luck| age|weight|
11. +----+----+------+
12. |44.0|27.0| 170.0|
13. +----+----+------+
14.
15. +----+----+------+
16. |luck| age|weight|
17. +----+----+------+
18. |null|27.0| 170.0|
19. |44.0|27.0| 170.0|
20. +----+----+------+
21.
22. +----+----+------+
23. |luck| age|weight|
24. +----+----+------+
25. |null|27.0| 170.0|
26. |44.0|27.0| 170.0|
27. +----+----+------+
6、filter(condition)按照传入的条件进行过滤,其实where方法就是filter方法的一个别名而已。。在zeppelin中执行如下代码:
1. var dfs = sc.parallelize(List(Row(null,27.0,170.0),Row(44.0,27.0,170.0),Row(null,null,null)))
2.
3. val schema = StructType(
4. List(
5. StructField("luck", DoubleType, true),
6. StructField("age", DoubleType, true),
7. StructField("weight", DoubleType, true)
8. )
9. )
10.
11. // 创建DataFrame
12. var df = spark.createDataFrame(dfs,schema)
13.
14. // 过滤数据
15. // df.filter($"luck"==44.0).show
16. df.filter("luck is not null").show
同时按下【shift+enter】对程序进行输出。输出结果如下所示:
1. +----+----+------+
2. |luck| age|weight|
3. +----+----+------+
4. |44.0|27.0| 170.0|
5. +----+----+------+
7、where(condition),这个方法和filter方法类似,更具传入的条件作出选择。在zeppelin中执行如下代码:
1. // 加载csv数据
2. var df = spark.read.option("header","true").csv("/data/dataset/batch/customers.csv")
3.
4. // 打印数据
5. // df.show
6.
7. // 数据筛选
8. df.where("Age >= 30").show
同时按下【shift+enter】对程序进行输出。输出结果如下所示:
1. +---+------+---+-------------+
2. | ID| Gener|Age|Annual Income|
3. +---+------+---+-------------+
4. | 1| Male| 34| 2000|
5. | 6| Male| 33| 2000|
6. | 7| Male| 34| 3500|
7. | 12|Female| 33| 3500|
8. | 13|Female| 34| 2500|
9. | 18| Male| 33| 2500|
10. | 19|Female| 34| 4500|
11. | 24| Male| 33| 4500|
12. | 25| Male| 34| 5500|
13. | 30|Female| 33| 5500|
14. | 31| Male| 34| 2000|
15. | 36|Female| 33| 2000|
16. | 37|Female| 34| 3500|
17. +---+------+---+-------------+
8、limit(num)限制返回的数据的条数,防止返回到driver节点的数据过大造成OOM。在zeppelin中执行如下代码:
1. // 限制返回10条数据
2. df.limit(10).show
同时按下【shift+enter】对程序进行输出。输出结果如下所示:
1. +---+------+---+-------------+
2. | ID| Gener|Age|Annual Income|
3. +---+------+---+-------------+
4. | 1| Male| 34| 2000|
5. | 2|Female| 23| 3500|
6. | 3|Female| 26| 2500|
7. | 4|Female| 27| 4500|
8. | 5| Male| 24| 5500|
9. | 6| Male| 33| 2000|
10. | 7| Male| 34| 3500|
11. | 8| Male| 23| 2500|
12. | 9| Male| 26| 4500|
13. | 10|Female| 27| 5500|
14. +---+------+---+-------------+
9、withColumn(colName,col),返回一个新的DataFrame,这个DataFrame中新增加的colNAme的列,如果原来本身就有colName的列,就替换掉。在zeppelin中执行如下代码:
1. // 添加Age列,Age本身存在,替换掉
2. df.withColumn("Age",$"Age"*$"Age").show(10)
3.
4. // 添加Age2列
5. df.withColumn("Age2",$"Age"*$"Age").show(10)
同时按下【shift+enter】对程序进行输出。输出结果如下所示:
1. +---+------+------+-------------+
2. | ID| Gener| Age|Annual Income|
3. +---+------+------+-------------+
4. | 1| Male|1156.0| 2000|
5. | 2|Female| 529.0| 3500|
6. | 3|Female| 676.0| 2500|
7. | 4|Female| 729.0| 4500|
8. | 5| Male| 576.0| 5500|
9. | 6| Male|1089.0| 2000|
10. | 7| Male|1156.0| 3500|
11. | 8| Male| 529.0| 2500|
12. | 9| Male| 676.0| 4500|
13. | 10|Female| 729.0| 5500|
14. +---+------+------+-------------+
15. only showing top 10 rows
16.
17. +---+------+---+-------------+------+
18. | ID| Gener|Age|Annual Income| Age2|
19. +---+------+---+-------------+------+
20. | 1| Male| 34| 2000|1156.0|
21. | 2|Female| 23| 3500| 529.0|
22. | 3|Female| 26| 2500| 676.0|
23. | 4|Female| 27| 4500| 729.0|
24. | 5| Male| 24| 5500| 576.0|
25. | 6| Male| 33| 2000|1089.0|
26. | 7| Male| 34| 3500|1156.0|
27. | 8| Male| 23| 2500| 529.0|
28. | 9| Male| 26| 4500| 676.0|
29. | 10|Female| 27| 5500| 729.0|
30. +---+------+---+-------------+------+
31. only showing top 10 rows
10、withColumnRename(existing,new),对已经存在的列明重命名为new,若名称不存在则这个操作不做任何事情。在zeppelin中执行如下代码:
1. // 修改Age的列名
2. df.withColumnRenamed("Age","age").show(10)
同时按下【shift+enter】对程序进行输出。输出结果如下所示:
1. +---+------+---+-------------+
2. | ID| Gener|age|Annual Income|
3. +---+------+---+-------------+
4. | 1| Male| 34| 2000|
5. | 2|Female| 23| 3500|
6. | 3|Female| 26| 2500|
7. | 4|Female| 27| 4500|
8. | 5| Male| 24| 5500|
9. | 6| Male| 33| 2000|
10. | 7| Male| 34| 3500|
11. | 8| Male| 23| 2500|
12. | 9| Male| 26| 4500|
13. | 10|Female| 27| 5500|
14. +---+------+---+-------------+
15. only showing top 10 rows
11、orderBy(cols,*kwargs),返回按照指定列排好序的新的DataFrame。在zeppelin中执行如下代码:
1. // 对年龄进行排序
2. df.orderBy("Age").show(5)
3.
4. // 降序排序
5. df.orderBy($"Age".desc).show(5)
同时按下【shift+enter】对程序进行输出。输出结果如下所示:
1. +---+------+---+-------------+
2. | ID| Gener|Age|Annual Income|
3. +---+------+---+-------------+
4. | 8| Male| 23| 2500|
5. | 14| Male| 23| 4500|
6. | 2|Female| 23| 3500|
7. | 32| Male| 23| 3500|
8. | 20|Female| 23| 5500|
9. +---+------+---+-------------+
10. only showing top 5 rows
11.
12. +---+------+---+-------------+
13. | ID| Gener|Age|Annual Income|
14. +---+------+---+-------------+
15. | 7| Male| 34| 3500|
16. | 13|Female| 34| 2500|
17. | 1| Male| 34| 2000|
18. | 31| Male| 34| 2000|
19. | 19|Female| 34| 4500|
20. +---+------+---+-------------+
21. only showing top 5 rows
使用ascending关键字参数指定升降序排列。除了这种方式,还可以通过pyspark.sql.functions中定义好的desc降序和asc方法来排序。在zeppelin中执行如下代码:
1. import org.apache.spark.sql.functions._
2.
3. // 对年龄进行排序
4. df.orderBy(desc("Age")).show(5)
5.
6. // 降序排序
7. df.orderBy($"Age".desc).show(5)
8.
9. // 对年龄进行排序
10. df.orderBy(asc("Age")).show(5)
11.
12. // 生序排序
13. df.orderBy($"Age".asc).show(5)
同时按下【shift+enter】对程序进行输出。输出内容如下所示:
1. +---+------+---+-------------+
2. | ID| Gener|Age|Annual Income|
3. +---+------+---+-------------+
4. | 7| Male| 34| 3500|
5. | 13|Female| 34| 2500|
6. | 1| Male| 34| 2000|
7. | 31| Male| 34| 2000|
8. | 19|Female| 34| 4500|
9. +---+------+---+-------------+
10. only showing top 5 rows
11.
12. +---+------+---+-------------+
13. | ID| Gener|Age|Annual Income|
14. +---+------+---+-------------+
15. | 7| Male| 34| 3500|
16. | 13|Female| 34| 2500|
17. | 1| Male| 34| 2000|
18. | 31| Male| 34| 2000|
19. | 19|Female| 34| 4500|
20. +---+------+---+-------------+
21. only showing top 5 rows
22.
23. +---+------+---+-------------+
24. | ID| Gener|Age|Annual Income|
25. +---+------+---+-------------+
26. | 8| Male| 23| 2500|
27. | 14| Male| 23| 4500|
28. | 2|Female| 23| 3500|
29. | 32| Male| 23| 3500|
30. | 20|Female| 23| 5500|
31. +---+------+---+-------------+
32. only showing top 5 rows
33.
34. +---+------+---+-------------+
35. | ID| Gener|Age|Annual Income|
36. +---+------+---+-------------+
37. | 8| Male| 23| 2500|
38. | 14| Male| 23| 4500|
39. | 2|Female| 23| 3500|
40. | 32| Male| 23| 3500|
41. | 20|Female| 23| 5500|
42. +---+------+---+-------------+
43. only showing top 5 rows
sort方法和orderBy方法类似。在zeppelin中执行如下代码:
1. import org.apache.spark.sql.functions._
2.
3. // 降序
4. df.sort(desc("Age")).show(5)
5.
6. // 升序
7. df.sort(asc("Age")).show(5)
同时按下【shift+enter】对程序进行输出。输出内容如下所示:
1. +---+------+---+-------------+
2. | ID| Gener|Age|Annual Income|
3. +---+------+---+-------------+
4. | 7| Male| 34| 3500|
5. | 13|Female| 34| 2500|
6. | 1| Male| 34| 2000|
7. | 31| Male| 34| 2000|
8. | 19|Female| 34| 4500|
9. +---+------+---+-------------+
10. only showing top 5 rows
11.
12. +---+------+---+-------------+
13. | ID| Gener|Age|Annual Income|
14. +---+------+---+-------------+
15. | 8| Male| 23| 2500|
16. | 14| Male| 23| 4500|
17. | 2|Female| 23| 3500|
18. | 32| Male| 23| 3500|
19. | 20|Female| 23| 5500|
20. +---+------+---+-------------+
21. only showing top 5 rows
实验结果运行准确,无误
经过本节实验的学习,通过DataFrame的操作-使用DSL,进一步巩固了我们的Spark基础。
在Spark中,DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库中的二维表格。DataFrame与RDD的主要区别在于,前者带有schema元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型。这使得Spark SQL得以洞察更多的结构信息,从而对藏于DataFrame背后的数据源以及作用于DataFrame之上的变换进行了针对性的优化,最终达到大幅提升运行时效率的目标。反观RDD,由于无从得知所存数据元素的具体内部结构,Spark Core只能在stage层面进行简单、通用的流水线优化。
1、数据写入MySQL中mode=’append’的意思是什么( A ){单选}
A、追加
B、覆盖
C、修改
D、删除
1、给定给一个文本数据,将数据转换为DataFrame类型,并将数据写入到MySQL 中。
