目录
Dataset是从Spark1.6 Alpha版本中引入的一个新的数据抽线结构,最懂在Spark2.0版本被定义成Spark新特性。
RDD, DataFrame,Dataset数据对比
1 RDD数据没有数据类型和元数据信息
2 DataFrame添加了Schema信息,每一行的类型固定为Row,每一列的值无法直接访问
3 在RDD的基础上增加了一个数据类型,可以拥有严格的错误检查机制。
4 Dataset和DataFrame拥有完全相同的成员函数
从RDD生成Dataset
val personDataSet = spark.createDataset(sc.textFile("/spark/person.txt"))
查看结构
personDataSet.show

DateSet 可以直接转换为DataFrame
personDataSet.toDF()
personDataSet.toDF().show

通常有两种方法实现基于RDD转换到DataFrame,第一种情况当一直RDD的数据结构(元数据信息),可以通过反射机制来推断生产Schema,另外一种情况,如果不清楚Dataset的数据结构,可以通过编程接口实现。
Step1 添加依赖(在项目里的一个pom.xml的文件里面)
- #依赖需要添加到标签
- #<dependencies>
- <依赖添加位置>
- #</dependencies>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-sql_2.11</artifactId>
- <version>2.3.2</version>
- </dependency>
Step 2 编写代码:
一:反射创建DataFrame
创建一个名为CaseClassSchema的scala项目
- package cn.itcast
-
- import org.apache.spark.SparkContext
-
- import org.apache.spark.rdd.RDD
-
- import org.apache.spark.sql.{DataFrame, SparkSession}
-
- case class Person(id:Int,name:String,age:Int)
-
- object CaseClassSchema {
-
- def main(args: Array[String]): Unit = {
-
- val spark : SparkSession=SparkSession.builder()
-
- .appName("CaseClassSchema")
-
- .master("local[2]")
-
- .getOrCreate()
-
- //2.获取SparkContext对象
-
- val sc:SparkContext=spark.sparkContext
-
- //设置日志打印级别
-
- sc.setLogLevel("WARN")
-
- //3.读取文件
-
- val data: RDD[Array[String]]=
-
- sc.textFile("F://spark_chapter02//src//main//scala//cn//itcast//person.txt").map(x=>x.split(" "))
-
- //4.将RDD与样例关联
-
-
- val personRdd: RDD[Person]=
-
- data.map(x=>Person(x(0).toInt,x(1),x(2).toInt))
-
- //5.获取DataFrame
-
- //手动导入隐式转换
-
- import spark.implicits._
-
- val personDF:DataFrame=personRdd.toDF
-
-
-
- //-----------DSL风格操作开始----------------
-
- //1.显示DataFrame的数据,默认显示20行
-
- personDF.show()
-
- //2.显示DataFrame的schema信息
-
- personDF.printSchema()
-
- //3.统计DataFrame中年龄大于30岁的人数
-
- println(personDF.filter($"age">30).count())
-
- //-----------DSL风格操作结束----------------
-
- //-----------SQL风格操作开始----------------
-
- //将DataFrame注册成表
-
- personDF.createOrReplaceTempView("t_person")
-
- spark.sql("select * from t_person").show()
-
- spark.sql("select * from t_person where name='zhangsan'").show()
-
- //-----------SQL风格操作结束----------------
-
- //关闭资源操作
-
- sc.stop()
-
- spark.stop()
-
- }
- }
编程方式的步骤
Step1 : 创建一个Row对象结构的RDD
创建一个名为SparkSqlSchema的scala文件
- package cn.itcast
-
-
-
- import org.apache.spark.SparkContext
-
- import org.apache.spark.rdd.RDD
-
- import org.apache.spark.sql.types.{IntegerType, StructType,StructField,StringType}
-
- import org.apache.spark.sql.{DataFrame, Row, SparkSession}
-
- //case class Person(id:Int,name:String,age:Int)
-
- object SparkSqlSchema {
-
- def main(args: Array[String]): Unit = {
-
- val spark : SparkSession=SparkSession.builder()
-
- .appName("SparkSqlSchema")
-
- .master("local[2]")
-
- .getOrCreate()
-
- //2.获取SparkContext对象
-
- val sc:SparkContext=spark.sparkContext
-
- //设置日志打印级别
-
- sc.setLogLevel("WARN")
-
- //3.读取文件
-
- val dataRDD: RDD[String]=
-
- sc.textFile("F://spark_chapter02//src//main//scala//cn//itcast//person.txt")
-
- //4.将RDD与样例关联
-
- val dataArrayRDD:RDD[Array[String]]=dataRDD.map(_.split(" "))
-
- //5.加载数据到Row对象中
-
- val personRDD: RDD[Row]=dataArrayRDD.map(x=>Row(x(0).toInt,x(1),x(2).toInt))
-
- //6.创建Schema
-
- val schema:StructType=StructType(Seq(
-
- StructField("id",IntegerType,false),
-
- StructField("name",StringType,false),
-
- StructField("age",IntegerType,false)
-
- ))
-
- //7.利用personRDD与Schema创建DataFrame
-
- val personDF:DataFrame = spark.createDataFrame(personRDD,schema)
-
- //8.DSL操作显示DataFrame的数据结果
-
- personDF.show()
-
- //9.将DataFrame注册成表
-
- personDF.createOrReplaceTempView("t_person")
-
- //10.sql语句操作
-
- spark.sql("select * from t_person").show()
-
- //11.关闭资源
-
- sc.stop()
-
- spark.stop()
-
- }
- }
