• DataFrame的操作-使用DSL



    申明: 未经许可,禁止以任何形式转载,若要引用,请标注链接地址
    全文共计12163字,阅读大概需要3分钟

    1. 实验室名称:

    大数据实验教学系统

    2. 实验项目名称:

    DataFrame的操作-使用DSL

    3. 实验学时:

    4. 实验原理:

    在Spark中,DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库中的二维表格。DataFrame与RDD的主要区别在于,前者带有schema元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型。这使得Spark SQL得以洞察更多的结构信息,从而对藏于DataFrame背后的数据源以及作用于DataFrame之上的变换进行了针对性的优化,最终达到大幅提升运行时效率的目标。反观RDD,由于无从得知所存数据元素的具体内部结构,Spark Core只能在stage层面进行简单、通用的流水线优化。

    在这里插入图片描述

    5. 实验目的:

    掌握操作DataFrame的各种方法(转换、过滤、排序等)。
      掌握DataFrame存储操作。

    6. 实验内容:

    操作DataFrame的转换、过滤、排序等方法。具体包含如下内容:
      - select(cols)
      - selectExpr(expr)
      - drop(cols)
      - dropDuplicates(subset=None)
      - dropna(how=’any’,thresh=None,subset=None)
      - filter(condition)
      - where(condition)
      - limit(num)
      - withColumn(colName,col)
      - withColumnRename(existing,new)
      - orderBy(cols,kwargs)

    7. 实验器材(设备、虚拟机名称):

    硬件:x86_64 ubuntu 16.04服务器
      软件:JDK 1.8,Spark-2.3.2,Hadoop-2.7.3,zeppelin-0.8.1

    8. 实验步骤:

    8.1 环境准备

    1、右击Ubuntu操作系统桌面,从弹出菜单中选择【Open in Terminal】命令打开终端。
      在终端窗口下,输入以下命令,分别启动HDFS集群、Spark集群和Zeppelin服务器:

    1.	$ start-dfs.sh
    2.	$ cd /opt/spark
    3.	$ ./sbin/start-all.sh
    4.	$ zeppelin-daemon.sh start
    
    • 1
    • 2
    • 3
    • 4

    2、将本地数据上传至HDFS上。在终端窗口中,分别执行以下命令上传数据:

    1.	$ hdfs dfs -mkdir -p /data/dataset/batch
    2.	$ hdfs dfs -put /data/dataset/batch/customers.csv /data/dataset/batch/
    
    • 1
    • 2

    3、启动浏览器,打开zeppelin notebook首页,点击【Create new note】链接,创建一个新的笔记本,名字为【rdd_demo】,解释器默认使用【spark】,如下图所示:
    在这里插入图片描述

    8.2 DataFrame的方法

    1、select(*cols)通过表达式选取DataFRame中符合条件的数据,返回新的DataFrame。在zeppelin中执行如下代码:

    1.	// 读取csv数据
    2.	var df = spark.read.option("header","true").csv("/data/dataset/batch/customers.csv")
    3.	     
    4.	// 通过select统计共有多少条数据
    5.	df.select("*").count()
    
    • 1
    • 2
    • 3
    • 4
    • 5

    同时按下【shift+enter】对程序进行输出。输出结果如下所示:

    1.	df: org.apache.spark.sql.DataFrame = [ID: string, Gener: string ... 2 more fields]
    2.	res196: Long = 40
    
    • 1
    • 2

    通过表达式选取DataFRame中符合条件的数据。在zeppelin中执行如下代码:

    1.	// 选取Age和Gener列,获取前十条数据
    2.	df.select("Age","Gener").show(10)
    3.	     
    4.	// 查找Age列,并重命名为age,输出前十条数据
    5.	df.select($"Age".alias("age")).show(10)
    
    • 1
    • 2
    • 3
    • 4
    • 5

    同时按下【shift+enter】对程序进行输出。输出结果如下所示:

    1.	+---+------+
    2.	|Age| Gener|
    3.	+---+------+
    4.	| 34|  Male|
    5.	| 23|Female|
    6.	| 26|Female|
    7.	| 27|Female|
    8.	| 24|  Male|
    9.	| 33|  Male|
    10.	| 34|  Male|
    11.	| 23|  Male|
    12.	| 26|  Male|
    13.	| 27|Female|
    14.	+---+------+
    15.	only showing top 10 rows
    16.	
    17.	+---+
    18.	|age|
    19.	+---+
    20.	| 34|
    21.	| 23|
    22.	| 26|
    23.	| 27|
    24.	| 24|
    25.	| 33|
    26.	| 34|
    27.	| 23|
    28.	| 26|
    29.	| 27|
    30.	+---+
    31.	only showing top 10 rows
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    2、selectExpr(*expr),这个方法是select方法的一个变体,可以接受一个SQL表达式,返回新的DataFrame。在zeppelin中执行如下代码:

    1.	// 分别求取Age的乘积,Age的平方根。显示前十条数据
    2.	df.selectExpr("Age * 2","sqrt(Age)").show(10)
    
    • 1
    • 2

    同时按下【shift+enter】对程序进行输出。输出结果如下所示:

    1.	+---------+-------------------------+
    2.	|(Age * 2)|SQRT(CAST(Age AS DOUBLE))|
    3.	+---------+-------------------------+
    4.	|     68.0|        5.830951894845301|
    5.	|     46.0|        4.795831523312719|
    6.	|     52.0|       5.0990195135927845|
    7.	|     54.0|        5.196152422706632|
    8.	|     48.0|        4.898979485566356|
    9.	|     66.0|        5.744562646538029|
    10.	|     68.0|        5.830951894845301|
    11.	|     46.0|        4.795831523312719|
    12.	|     52.0|       5.0990195135927845|
    13.	|     54.0|        5.196152422706632|
    14.	+---------+-------------------------+
    15.	only showing top 10 rows
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    3、drop(*cols)按照列名删除DataFrame中的列,返回新的DataFrame。在zeppelin中执行如下代码:

    1.	// 打印所有的列名DataFrame.columns.values.tolist()
    2.	println(df.columns.toList)
    3.	     
    4.	// 删除Age列名
    5.	var df1 = df.drop("Age")
    6.	     
    7.	// 打印删除后的列名
    8.	println(df1.columns.toList)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    同时按下【shift+enter】对程序进行输出。输出结果如下所示:

    1.	List(ID, Gener, Age, Annual Income)
    2.	List(ID, Gener, Annual Income)
    
    • 1
    • 2

    4、dropDuplicates(subset=None)删除重复行,subset用于指定删除重复行的时候考虑那几列。在zeppelin中执行如下代码:

    1.	// 导入所需依赖
    2.	import org.apache.spark.sql.Row
    3.	import org.apache.spark.sql._
    4.	import org.apache.spark.sql.types._
    5.	     
    6.	// 创建RDD,转换为DataFrame
    7.	var df = sc.parallelize(List(Row("simple",15,175),Row("simple",15,175),Row("simpleBDP",2,180)),3)
    8.	val schema = StructType(
    9.	      List(
    10.	        StructField("name", StringType, true),
    11.	        StructField("age", IntegerType, true),
    12.	        StructField("height", IntegerType, true)
    13.	      )
    14.	    )
    15.	     
    16.	var df1 = spark.createDataFrame(df, schema)
    17.	     
    18.	// 展示数据
    19.	// df.show
    20.	     
    21.	// 删除重复的数据
    22.	df1.dropDuplicates().show()
    23.	     
    24.	// 指定重复的列进行删除
    25.	 df1.dropDuplicates(List("age","name")).show()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    同时按下【shift+enter】对程序进行输出。输出结果如下所示:

    1.	+---------+---+------+
    2.	|     name|age|height|
    3.	+---------+---+------+
    4.	|simpleBDP|  2|   180|
    5.	|   simple| 15|   175|
    6.	+---------+---+------+
    7.	
    8.	+---------+---+------+
    9.	|     name|age|height|
    10.	+---------+---+------+
    11.	|simpleBDP|  2|   180|
    12.	|   simple| 15|   175|
    13.	+---------+---+------+
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    5、na.drop删除DataFrame中的空数据,加入”any”和”all”指定如何删除控制,加入数字参数指定有多少个空值进行删除,加入字段名删除指定字段中的空值。在zeppelin中执行如下代码:

    1.	var dfs = sc.parallelize(List(Row(null,27.0,170.0),Row(44.0,27.0,170.0),Row(null,null,null)))
    2.	     
    3.	val schema = StructType(
    4.	      List(
    5.	        StructField("luck", DoubleType, true),
    6.	        StructField("age", DoubleType, true),
    7.	        StructField("weight", DoubleType, true)
    8.	      )
    9.	    )
    10.	     
    11.	// 创建DataFrame
    12.	var df = spark.createDataFrame(dfs,schema)
    13.	     
    14.	// 显示DataFrame
    15.	df.show()
    16.	     
    17.	// 有任意一个为na进行删除
    18.	df.na.drop("any").show
    19.	     
    20.	// 全部为na进行删除
    21.	df.na.drop("all").show
    22.	     
    23.	// 删除有两个na值的数据
    24.	df.na.drop(2).show()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    同时按下【shift+enter】对程序进行输出。输出结果如下所示:

    1.	+----+----+------+
    2.	|luck| age|weight|
    3.	+----+----+------+
    4.	|null|27.0| 170.0|
    5.	|44.0|27.0| 170.0|
    6.	|null|null|  null|
    7.	+----+----+------+
    8.	
    9.	+----+----+------+
    10.	|luck| age|weight|
    11.	+----+----+------+
    12.	|44.0|27.0| 170.0|
    13.	+----+----+------+
    14.	
    15.	+----+----+------+
    16.	|luck| age|weight|
    17.	+----+----+------+
    18.	|null|27.0| 170.0|
    19.	|44.0|27.0| 170.0|
    20.	+----+----+------+
    21.	
    22.	+----+----+------+
    23.	|luck| age|weight|
    24.	+----+----+------+
    25.	|null|27.0| 170.0|
    26.	|44.0|27.0| 170.0|
    27.	+----+----+------+
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    6、filter(condition)按照传入的条件进行过滤,其实where方法就是filter方法的一个别名而已。。在zeppelin中执行如下代码:

    1.	var dfs = sc.parallelize(List(Row(null,27.0,170.0),Row(44.0,27.0,170.0),Row(null,null,null)))
    2.	     
    3.	val schema = StructType(
    4.	      List(
    5.	        StructField("luck", DoubleType, true),
    6.	        StructField("age", DoubleType, true),
    7.	        StructField("weight", DoubleType, true)
    8.	      )
    9.	    )
    10.	     
    11.	// 创建DataFrame
    12.	var df = spark.createDataFrame(dfs,schema)
    13.	     
    14.	// 过滤数据
    15.	// df.filter($"luck"==44.0).show
    16.	df.filter("luck is not null").show
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    同时按下【shift+enter】对程序进行输出。输出结果如下所示:

    1.	+----+----+------+
    2.	|luck| age|weight|
    3.	+----+----+------+
    4.	|44.0|27.0| 170.0|
    5.	+----+----+------+
    
    • 1
    • 2
    • 3
    • 4
    • 5

    7、where(condition),这个方法和filter方法类似,更具传入的条件作出选择。在zeppelin中执行如下代码:

    1.	// 加载csv数据
    2.	var df  =  spark.read.option("header","true").csv("/data/dataset/batch/customers.csv")
    3.	     
    4.	// 打印数据
    5.	// df.show
    6.	     
    7.	// 数据筛选
    8.	df.where("Age >= 30").show
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    同时按下【shift+enter】对程序进行输出。输出结果如下所示:

    1.	+---+------+---+-------------+
    2.	| ID| Gener|Age|Annual Income|
    3.	+---+------+---+-------------+
    4.	|  1|  Male| 34|         2000|
    5.	|  6|  Male| 33|         2000|
    6.	|  7|  Male| 34|         3500|
    7.	| 12|Female| 33|         3500|
    8.	| 13|Female| 34|         2500|
    9.	| 18|  Male| 33|         2500|
    10.	| 19|Female| 34|         4500|
    11.	| 24|  Male| 33|         4500|
    12.	| 25|  Male| 34|         5500|
    13.	| 30|Female| 33|         5500|
    14.	| 31|  Male| 34|         2000|
    15.	| 36|Female| 33|         2000|
    16.	| 37|Female| 34|         3500|
    17.	+---+------+---+-------------+
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    8、limit(num)限制返回的数据的条数,防止返回到driver节点的数据过大造成OOM。在zeppelin中执行如下代码:

    1.	// 限制返回10条数据
    2.	df.limit(10).show
    
    • 1
    • 2

    同时按下【shift+enter】对程序进行输出。输出结果如下所示:

    1.	+---+------+---+-------------+
    2.	| ID| Gener|Age|Annual Income|
    3.	+---+------+---+-------------+
    4.	|  1|  Male| 34|         2000|
    5.	|  2|Female| 23|         3500|
    6.	|  3|Female| 26|         2500|
    7.	|  4|Female| 27|         4500|
    8.	|  5|  Male| 24|         5500|
    9.	|  6|  Male| 33|         2000|
    10.	|  7|  Male| 34|         3500|
    11.	|  8|  Male| 23|         2500|
    12.	|  9|  Male| 26|         4500|
    13.	| 10|Female| 27|         5500|
    14.	+---+------+---+-------------+
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    9、withColumn(colName,col),返回一个新的DataFrame,这个DataFrame中新增加的colNAme的列,如果原来本身就有colName的列,就替换掉。在zeppelin中执行如下代码:

    1.	// 添加Age列,Age本身存在,替换掉
    2.	df.withColumn("Age",$"Age"*$"Age").show(10)
    3.	     
    4.	// 添加Age2列
    5.	df.withColumn("Age2",$"Age"*$"Age").show(10)
    
    • 1
    • 2
    • 3
    • 4
    • 5

    同时按下【shift+enter】对程序进行输出。输出结果如下所示:

    1.	+---+------+------+-------------+
    2.	| ID| Gener|   Age|Annual Income|
    3.	+---+------+------+-------------+
    4.	|  1|  Male|1156.0|         2000|
    5.	|  2|Female| 529.0|         3500|
    6.	|  3|Female| 676.0|         2500|
    7.	|  4|Female| 729.0|         4500|
    8.	|  5|  Male| 576.0|         5500|
    9.	|  6|  Male|1089.0|         2000|
    10.	|  7|  Male|1156.0|         3500|
    11.	|  8|  Male| 529.0|         2500|
    12.	|  9|  Male| 676.0|         4500|
    13.	| 10|Female| 729.0|         5500|
    14.	+---+------+------+-------------+
    15.	only showing top 10 rows
    16.	     
    17.	+---+------+---+-------------+------+
    18.	| ID| Gener|Age|Annual Income|  Age2|
    19.	+---+------+---+-------------+------+
    20.	|  1|  Male| 34|         2000|1156.0|
    21.	|  2|Female| 23|         3500| 529.0|
    22.	|  3|Female| 26|         2500| 676.0|
    23.	|  4|Female| 27|         4500| 729.0|
    24.	|  5|  Male| 24|         5500| 576.0|
    25.	|  6|  Male| 33|         2000|1089.0|
    26.	|  7|  Male| 34|         3500|1156.0|
    27.	|  8|  Male| 23|         2500| 529.0|
    28.	|  9|  Male| 26|         4500| 676.0|
    29.	| 10|Female| 27|         5500| 729.0|
    30.	+---+------+---+-------------+------+
    31.	only showing top 10 rows
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    10、withColumnRename(existing,new),对已经存在的列明重命名为new,若名称不存在则这个操作不做任何事情。在zeppelin中执行如下代码:

    1.	// 修改Age的列名
    2.	df.withColumnRenamed("Age","age").show(10)
    
    • 1
    • 2

    同时按下【shift+enter】对程序进行输出。输出结果如下所示:

    1.	+---+------+---+-------------+
    2.	| ID| Gener|age|Annual Income|
    3.	+---+------+---+-------------+
    4.	|  1|  Male| 34|         2000|
    5.	|  2|Female| 23|         3500|
    6.	|  3|Female| 26|         2500|
    7.	|  4|Female| 27|         4500|
    8.	|  5|  Male| 24|         5500|
    9.	|  6|  Male| 33|         2000|
    10.	|  7|  Male| 34|         3500|
    11.	|  8|  Male| 23|         2500|
    12.	|  9|  Male| 26|         4500|
    13.	| 10|Female| 27|         5500|
    14.	+---+------+---+-------------+
    15.	only showing top 10 rows
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    11、orderBy(cols,*kwargs),返回按照指定列排好序的新的DataFrame。在zeppelin中执行如下代码:

    1.	// 对年龄进行排序
    2.	df.orderBy("Age").show(5)
    3.	     
    4.	// 降序排序
    5.	df.orderBy($"Age".desc).show(5)
    
    • 1
    • 2
    • 3
    • 4
    • 5

    同时按下【shift+enter】对程序进行输出。输出结果如下所示:

    1.	+---+------+---+-------------+
    2.	| ID| Gener|Age|Annual Income|
    3.	+---+------+---+-------------+
    4.	|  8|  Male| 23|         2500|
    5.	| 14|  Male| 23|         4500|
    6.	|  2|Female| 23|         3500|
    7.	| 32|  Male| 23|         3500|
    8.	| 20|Female| 23|         5500|
    9.	+---+------+---+-------------+
    10.	only showing top 5 rows
    11.	     
    12.	+---+------+---+-------------+
    13.	| ID| Gener|Age|Annual Income|
    14.	+---+------+---+-------------+
    15.	|  7|  Male| 34|         3500|
    16.	| 13|Female| 34|         2500|
    17.	|  1|  Male| 34|         2000|
    18.	| 31|  Male| 34|         2000|
    19.	| 19|Female| 34|         4500|
    20.	+---+------+---+-------------+
    21.	only showing top 5 rows
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    使用ascending关键字参数指定升降序排列。除了这种方式,还可以通过pyspark.sql.functions中定义好的desc降序和asc方法来排序。在zeppelin中执行如下代码:

    1.	import org.apache.spark.sql.functions._
    2.	     
    3.	// 对年龄进行排序
    4.	df.orderBy(desc("Age")).show(5)
    5.	     
    6.	// 降序排序
    7.	df.orderBy($"Age".desc).show(5)
    8.	     
    9.	// 对年龄进行排序
    10.	df.orderBy(asc("Age")).show(5)
    11.	     
    12.	// 生序排序
    13.	df.orderBy($"Age".asc).show(5)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    同时按下【shift+enter】对程序进行输出。输出内容如下所示:

    1.	+---+------+---+-------------+
    2.	| ID| Gener|Age|Annual Income|
    3.	+---+------+---+-------------+
    4.	|  7|  Male| 34|         3500|
    5.	| 13|Female| 34|         2500|
    6.	|  1|  Male| 34|         2000|
    7.	| 31|  Male| 34|         2000|
    8.	| 19|Female| 34|         4500|
    9.	+---+------+---+-------------+
    10.	only showing top 5 rows
    11.	     
    12.	+---+------+---+-------------+
    13.	| ID| Gener|Age|Annual Income|
    14.	+---+------+---+-------------+
    15.	|  7|  Male| 34|         3500|
    16.	| 13|Female| 34|         2500|
    17.	|  1|  Male| 34|         2000|
    18.	| 31|  Male| 34|         2000|
    19.	| 19|Female| 34|         4500|
    20.	+---+------+---+-------------+
    21.	only showing top 5 rows
    22.	     
    23.	+---+------+---+-------------+
    24.	| ID| Gener|Age|Annual Income|
    25.	+---+------+---+-------------+
    26.	|  8|  Male| 23|         2500|
    27.	| 14|  Male| 23|         4500|
    28.	|  2|Female| 23|         3500|
    29.	| 32|  Male| 23|         3500|
    30.	| 20|Female| 23|         5500|
    31.	+---+------+---+-------------+
    32.	only showing top 5 rows
    33.	     
    34.	+---+------+---+-------------+
    35.	| ID| Gener|Age|Annual Income|
    36.	+---+------+---+-------------+
    37.	|  8|  Male| 23|         2500|
    38.	| 14|  Male| 23|         4500|
    39.	|  2|Female| 23|         3500|
    40.	| 32|  Male| 23|         3500|
    41.	| 20|Female| 23|         5500|
    42.	+---+------+---+-------------+
    43.	only showing top 5 rows
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43

    sort方法和orderBy方法类似。在zeppelin中执行如下代码:

    1.	import org.apache.spark.sql.functions._
    2.	     
    3.	// 降序
    4.	df.sort(desc("Age")).show(5)
    5.	     
    6.	// 升序
    7.	df.sort(asc("Age")).show(5)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    同时按下【shift+enter】对程序进行输出。输出内容如下所示:

    1.	+---+------+---+-------------+
    2.	| ID| Gener|Age|Annual Income|
    3.	+---+------+---+-------------+
    4.	|  7|  Male| 34|         3500|
    5.	| 13|Female| 34|         2500|
    6.	|  1|  Male| 34|         2000|
    7.	| 31|  Male| 34|         2000|
    8.	| 19|Female| 34|         4500|
    9.	+---+------+---+-------------+
    10.	only showing top 5 rows
    11.	     
    12.	+---+------+---+-------------+
    13.	| ID| Gener|Age|Annual Income|
    14.	+---+------+---+-------------+
    15.	|  8|  Male| 23|         2500|
    16.	| 14|  Male| 23|         4500|
    17.	|  2|Female| 23|         3500|
    18.	| 32|  Male| 23|         3500|
    19.	| 20|Female| 23|         5500|
    20.	+---+------+---+-------------+
    21.	only showing top 5 rows
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    9. 实验结果及分析:

    实验结果运行准确,无误

    10. 实验结论:

    经过本节实验的学习,通过DataFrame的操作-使用DSL,进一步巩固了我们的Spark基础。

    11. 总结及心得体会:

    在Spark中,DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库中的二维表格。DataFrame与RDD的主要区别在于,前者带有schema元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型。这使得Spark SQL得以洞察更多的结构信息,从而对藏于DataFrame背后的数据源以及作用于DataFrame之上的变换进行了针对性的优化,最终达到大幅提升运行时效率的目标。反观RDD,由于无从得知所存数据元素的具体内部结构,Spark Core只能在stage层面进行简单、通用的流水线优化。

    12、 实验测试

    1、数据写入MySQL中mode=’append’的意思是什么( A ){单选}
      A、追加
      B、覆盖
      C、修改
      D、删除

    13、实验拓展

    1、给定给一个文本数据,将数据转换为DataFrame类型,并将数据写入到MySQL 中。

    在这里插入图片描述

  • 相关阅读:
    制作翻页电子画册,手机观看更快捷
    jenkins 发布项目到k8s tomcat
    IPEmotion曲线平滑计算
    Java集合之ArrayList与LinkedList
    redis(其它操作、管道)、django中使用redis(通用方案、 第三方模块)、django缓存、celery介绍(celery的快速使用)
    GD32L233RCT6学习开发(一)
    Web前端系列技术之JavaScript基础(从入门开始)⑤
    前端开发中的 TypeScript 泛型:深入解析
    竞赛题-6233. 温度转换
    来测测你的智力——智力题
  • 原文地址:https://blog.csdn.net/qq_44807756/article/details/125570716