DataFrame 的出现,让 Spark 可以更好地处理结构化数据的计算,但存在一个问题:编译时的类型安全问题,为了解决它,Spark 引入了 DataSet API(DataFrame API 的扩展)。DataSet 是分布式的数据集合,它提供了强类型支持,也就是给 RDD 的每行数据都添加了类型约束。
在 Spark 2.0 中,DataFrame 和 DataSet 被合并为 DataSet 。DataSet包含 DataFrame 的功能,DataFrame 表示为 DataSet[Row] ,即DataSet 的子集。
RDD 是DataFrame 和 DataSet 的底层,如果需要更多的控制功能(比如精确控制Spark 怎么执行一条查询),尽量使用 RDD。
如果希望在编译时获得更高的类型安全,建议使用 DataSet。
如果想统一简化 Spark 的API ,则使用 DataFrame 和 DataSet。
基于 DataFrame API 和 DataSet API 开发的程序会被自动优化,开发人员不需要操作底层的RDD API 来手动优化,大大提高了开发效率。但是RDD API 对于非结构化数据的处理有独特的优势(比如文本数据流),而且更方便我们做底层的操作。
- def main(args: Array[String]): Unit = {
- //local代表本地单线程模式 local[*]代表本地多线程模式
- val spark = SparkSession.builder()
- .appName("create dataset")
- .master("local[*]")
- .getOrCreate()
-
- //一定要导入它 不然无法创建DataSet对象
- import spark.implicits._
-
- val ds1 = spark.createDataset(1 to 5)
- ds1.show()
-
- val ds2 = spark.createDataset(spark.sparkContext.textFile("data/sql/people.txt"))
- ds2.show()
- spark.stop()
- }
运行结果:
- +-----+
- |value|
- +-----+
- | 1|
- | 2|
- | 3|
- | 4|
- | 5|
- +-----+
-
- +--------+
- | value|
- +--------+
- | Tom, 21|
- |Mike, 25|
- |Andy, 18|
- +--------+
- import org.apache.spark.sql.{Dataset, SparkSession}
-
- object DataSetCreate {
-
- //case类一定要写到main方法之外
- case class Person(name:String,age:Int)
-
- def main(args: Array[String]): Unit = {
- //local代表本地单线程模式 local[*]代表本地多线程模式
- val spark = SparkSession.builder()
- .appName("create dataset")
- .master("local[*]")
- .getOrCreate()
-
- //一定要导入 SparkSession对象下的implicits
- import spark.implicits._
-
-
- val data = List(Person("Tom",21),Person("Andy",22))
- val ds: Dataset[Person] = data.toDS()
- ds.show()
-
- spark.stop()
- }
- }
运行结果:
- +----+---+
- |name|age|
- +----+---+
- | Tom| 21|
- |Andy| 22|
- +----+---+
- object DataSetCreate{
- case class Person(name:String,age:Long,sex:String)
- def main(args: Array[String]): Unit = {
-
- //local代表本地单线程模式 local[*]代表本地多线程模式
- val spark = SparkSession.builder()
- .appName("create dataset")
- .master("local[*]")
- .getOrCreate()
-
- import spark.implicits._
-
- val df = spark.read.json("data/sql/people.json")
- val ds = df.as[Person]
-
- ds.show()
- spark.stop()
- }
- }
- package com.study.spark.core.sql
-
- import org.apache.spark.rdd.RDD
- import org.apache.spark.sql.{Row, SparkSession}
-
- object TransForm {
-
- case class Person(name:String,age:Int) //txt文件age字段可以用Int,但json文件尽量用Long
-
- def main(args: Array[String]): Unit = {
- val spark = SparkSession.builder()
- .appName("transform")
- .master("local[*]")
- .getOrCreate()
-
- import spark.implicits._
-
- //1.RDD和DataFrame之间互相转换
- //1.1 创建RDD对象
- val rdd: RDD[Person] = spark.sparkContext.textFile("data/sql/people.txt")
- .map(_.split(","))
- .map(attr => Person(attr(0), attr(1).trim.toInt))
- rdd.foreach(println)
- /*
- Person(Andy,18)
- Person(Tom,21)
- Person(Mike,25)
- */
- //1.2 RDD转DataFrame
- val df = rdd.toDF()
- df.show()
- /*
- +----+---+
- |name|age|
- +----+---+
- | Tom| 21|
- |Mike| 25|
- |Andy| 18|
- +----+---+
- */
- //1.3 DataFrame转RDD
- val res: RDD[Row] = df.rdd
- /*
- [Andy,18]
- [Tom,21]
- [Mike,25]
- */
-
- res.foreach(println)
- spark.stop()
- }
- }
可以看到,RDD[Person]转为DataFrame后,再从DataFrame转回RDD就变成了RDD[Row] 类型了。
RDD 和 DataSet 之间的转换比较简单:
-
- import org.apache.spark.rdd.RDD
- import org.apache.spark.sql.{Row, SparkSession}
-
- object TransForm {
-
- case class Person(name:String,age:Int) //txt文件age字段可以用Int,但json文件尽量用Long
-
- def main(args: Array[String]): Unit = {
- val spark = SparkSession.builder()
- .appName("transform")
- .master("local[*]")
- .getOrCreate()
-
- import spark.implicits._
-
- //1.RDD和DataSet之间互相转换
- //1.1 创建RDD对象
- val rdd: RDD[Person] = spark.sparkContext.textFile("data/sql/people.txt")
- .map(_.split(","))
- .map(attr => Person(attr(0), attr(1).trim.toInt))
- rdd.foreach(println)
- /*
- Person(Andy,18)
- Person(Tom,21)
- Person(Mike,25)
- */
- //1.2 RDD转DataSet
- val ds = rdd.toDS()
- ds.show()
- /*
- +----+---+
- |name|age|
- +----+---+
- | Tom| 21|
- |Mike| 25|
- |Andy| 18|
- +----+---+
- */
- //1.3 DataFrame转RDD
- val res: RDD[Person] = ds.rdd
- res.foreach(println)
- /*
- Person(Andy,18)
- Person(Tom,21)
- Person(Mike,25)
- */
- spark.stop()
- }
- }
可以看到,相比RDD和DataFrame互相转换,RDD和DataSet转换的过程中,不会有数据类型的变化,而DataFrame转RDD的过程就会把我们定义的case类转为Row对象。
- import org.apache.spark.sql.{DataFrame, Row, SparkSession}
-
- object TransForm {
-
- case class Person(name:String,age:Long,sex:String) //txt文件age字段可以用Int,但json文件尽量用Long
-
- def main(args: Array[String]): Unit = {
- val spark = SparkSession.builder()
- .appName("transform")
- .master("local[*]")
- .getOrCreate()
-
- import spark.implicits._
-
- //1.DataFrame和DataSet之间互相转换
- //1.1 创建DataFrame对象
- val df = spark.read.json("data/sql/people.json")
- df.show()
- /*
- +---+----------+---+
- |age| name|sex|
- +---+----------+---+
- | 30| Michael| 男|
- | 19| Andy| 女|
- | 19| Justin| 男|
- | 20|Bernadette| 女|
- | 23| Gretchen| 女|
- | 27| David| 男|
- | 33| Joseph| 女|
- | 27| Trish| 女|
- | 33| Alex| 女|
- | 25| Ben| 男|
- +---+----------+---+
- */
- //1.2 DataFrame转DataSet
- val ds = df.as[Person]
- ds.show()
- /*
- +---+----------+---+
- |age| name|sex|
- +---+----------+---+
- | 30| Michael| 男|
- | 19| Andy| 女|
- | 19| Justin| 男|
- | 20|Bernadette| 女|
- | 23| Gretchen| 女|
- | 27| David| 男|
- | 33| Joseph| 女|
- | 27| Trish| 女|
- | 33| Alex| 女|
- | 25| Ben| 男|
- +---+----------+---+
- */
- //1.3 DataSet转DataFrame
- val res: DataFrame = ds.toDF()
- res.show()
- /*
- +---+----------+---+
- |age| name|sex|
- +---+----------+---+
- | 30| Michael| 男|
- | 19| Andy| 女|
- | 19| Justin| 男|
- | 20|Bernadette| 女|
- | 23| Gretchen| 女|
- | 27| David| 男|
- | 33| Joseph| 女|
- | 27| Trish| 女|
- | 33| Alex| 女|
- | 25| Ben| 男|
- +---+----------+---+
- */
- spark.stop()
- }
- }
- def main(args: Array[String]): Unit = {
- val spark = SparkSession.builder()
- .appName("create dataset")
- .master("local[*]")
- .getOrCreate()
-
- import spark.implicits._
-
- val res: Dataset[(String, Long)] = spark.read.text("data/word.txt").as[String]
- .flatMap(_.split(" "))
- .groupByKey(_.toLowerCase)
- .count()
-
- res.show()
-
- spark.stop()
- }
运行结果:
- | key|count(1)|
- +------+--------+
- | fast| 1|
- | is| 3|
- | spark| 2|
- |better| 1|
- | good| 1|
- |hadoop| 1|
- +------+--------+
剩下来就是不断练习各种DataFrame和DataSet的操作、熟悉各种转换和行动操作。