• Spark【Spark SQL(三)DataSet】


    DataSet

             DataFrame 的出现,让 Spark 可以更好地处理结构化数据的计算,但存在一个问题:编译时的类型安全问题,为了解决它,Spark 引入了 DataSet API(DataFrame API 的扩展)。DataSet 是分布式的数据集合,它提供了强类型支持,也就是给 RDD 的每行数据都添加了类型约束。

            在 Spark 2.0 中,DataFrame 和 DataSet 被合并为 DataSet 。DataSet包含 DataFrame 的功能,DataFrame 表示为 DataSet[Row] ,即DataSet 的子集。

    三种 API 的选择

            RDD 是DataFrame 和 DataSet 的底层,如果需要更多的控制功能(比如精确控制Spark 怎么执行一条查询),尽量使用 RDD。

            如果希望在编译时获得更高的类型安全,建议使用 DataSet。

            如果想统一简化 Spark 的API ,则使用 DataFrame 和 DataSet。

            基于 DataFrame API 和 DataSet API 开发的程序会被自动优化,开发人员不需要操作底层的RDD API 来手动优化,大大提高了开发效率。但是RDD API 对于非结构化数据的处理有独特的优势(比如文本数据流),而且更方便我们做底层的操作。

    DataSet 的创建

    1、使用createDataset()方法创建

    1. def main(args: Array[String]): Unit = {
    2. //local代表本地单线程模式 local[*]代表本地多线程模式
    3. val spark = SparkSession.builder()
    4. .appName("create dataset")
    5. .master("local[*]")
    6. .getOrCreate()
    7. //一定要导入它 不然无法创建DataSet对象
    8. import spark.implicits._
    9. val ds1 = spark.createDataset(1 to 5)
    10. ds1.show()
    11. val ds2 = spark.createDataset(spark.sparkContext.textFile("data/sql/people.txt"))
    12. ds2.show()
    13. spark.stop()
    14. }

    运行结果:

    1. +-----+
    2. |value|
    3. +-----+
    4. | 1|
    5. | 2|
    6. | 3|
    7. | 4|
    8. | 5|
    9. +-----+
    10. +--------+
    11. | value|
    12. +--------+
    13. | Tom, 21|
    14. |Mike, 25|
    15. |Andy, 18|
    16. +--------+

    2、通过 toDS 方法生成

    1. import org.apache.spark.sql.{Dataset, SparkSession}
    2. object DataSetCreate {
    3. //case类一定要写到main方法之外
    4. case class Person(name:String,age:Int)
    5. def main(args: Array[String]): Unit = {
    6. //local代表本地单线程模式 local[*]代表本地多线程模式
    7. val spark = SparkSession.builder()
    8. .appName("create dataset")
    9. .master("local[*]")
    10. .getOrCreate()
    11. //一定要导入 SparkSession对象下的implicits
    12. import spark.implicits._
    13. val data = List(Person("Tom",21),Person("Andy",22))
    14. val ds: Dataset[Person] = data.toDS()
    15. ds.show()
    16. spark.stop()
    17. }
    18. }

    运行结果:

    1. +----+---+
    2. |name|age|
    3. +----+---+
    4. | Tom| 21|
    5. |Andy| 22|
    6. +----+---+

    3、通过DataFrame 转换生成

    需要注意:json中的数
    1. object DataSetCreate{
    2. case class Person(name:String,age:Long,sex:String)
    3. def main(args: Array[String]): Unit = {
    4. //local代表本地单线程模式 local[*]代表本地多线程模式
    5. val spark = SparkSession.builder()
    6. .appName("create dataset")
    7. .master("local[*]")
    8. .getOrCreate()
    9. import spark.implicits._
    10. val df = spark.read.json("data/sql/people.json")
    11. val ds = df.as[Person]
    12. ds.show()
    13. spark.stop()
    14. }
    15. }

    RDD、DataFrame 和 DataSet 之间的相互转换

    RDD <=> DataFrame

    1. RDD 转 DataFrame ,也就是上一篇博客中介绍的两种方法:
      1. 能创建case类,就直接映射出一个RDD[Person],然后调用toDF方法利用反射机制推断RDD模式。
      2. 无法创建case类,就使用编程方式定义RDD模式,使用 createDataFrame(rowRDD,schema) 指定rowRDD:RDD[Row]和schema:StructType。
      3. 如果RDD是像:RDD[(Long, String)] 这样保存的是一个元组类型的RDD,那么也可以直接使用 toDF 方法转为 DataFrame 对象,因为元组的 k,v 数据类型是已知的,就相当于有了创建 DataFrame 的模式信息(schema)。
    2. DataFrame 转 RDD,直接使用 rdd() 方法。
    1. package com.study.spark.core.sql
    2. import org.apache.spark.rdd.RDD
    3. import org.apache.spark.sql.{Row, SparkSession}
    4. object TransForm {
    5. case class Person(name:String,age:Int) //txt文件age字段可以用Int,但json文件尽量用Long
    6. def main(args: Array[String]): Unit = {
    7. val spark = SparkSession.builder()
    8. .appName("transform")
    9. .master("local[*]")
    10. .getOrCreate()
    11. import spark.implicits._
    12. //1.RDD和DataFrame之间互相转换
    13. //1.1 创建RDD对象
    14. val rdd: RDD[Person] = spark.sparkContext.textFile("data/sql/people.txt")
    15. .map(_.split(","))
    16. .map(attr => Person(attr(0), attr(1).trim.toInt))
    17. rdd.foreach(println)
    18. /*
    19. Person(Andy,18)
    20. Person(Tom,21)
    21. Person(Mike,25)
    22. */
    23. //1.2 RDD转DataFrame
    24. val df = rdd.toDF()
    25. df.show()
    26. /*
    27. +----+---+
    28. |name|age|
    29. +----+---+
    30. | Tom| 21|
    31. |Mike| 25|
    32. |Andy| 18|
    33. +----+---+
    34. */
    35. //1.3 DataFrame转RDD
    36. val res: RDD[Row] = df.rdd
    37. /*
    38. [Andy,18]
    39. [Tom,21]
    40. [Mike,25]
    41. */
    42. res.foreach(println)
    43. spark.stop()
    44. }
    45. }

    可以看到,RDD[Person]转为DataFrame后,再从DataFrame转回RDD就变成了RDD[Row] 类型了。


    RDD <=> DataSet

    RDD 和 DataSet 之间的转换比较简单:

    1. RDD 转 DataSet 直接使用case 类(比如Person),然后映射出 RDD[Person] ,直接调用 toDS方法。
    2. DataSet 转 RDD 直接调用 rdd方法即可。
    1. import org.apache.spark.rdd.RDD
    2. import org.apache.spark.sql.{Row, SparkSession}
    3. object TransForm {
    4. case class Person(name:String,age:Int) //txt文件age字段可以用Int,但json文件尽量用Long
    5. def main(args: Array[String]): Unit = {
    6. val spark = SparkSession.builder()
    7. .appName("transform")
    8. .master("local[*]")
    9. .getOrCreate()
    10. import spark.implicits._
    11. //1.RDD和DataSet之间互相转换
    12. //1.1 创建RDD对象
    13. val rdd: RDD[Person] = spark.sparkContext.textFile("data/sql/people.txt")
    14. .map(_.split(","))
    15. .map(attr => Person(attr(0), attr(1).trim.toInt))
    16. rdd.foreach(println)
    17. /*
    18. Person(Andy,18)
    19. Person(Tom,21)
    20. Person(Mike,25)
    21. */
    22. //1.2 RDD转DataSet
    23. val ds = rdd.toDS()
    24. ds.show()
    25. /*
    26. +----+---+
    27. |name|age|
    28. +----+---+
    29. | Tom| 21|
    30. |Mike| 25|
    31. |Andy| 18|
    32. +----+---+
    33. */
    34. //1.3 DataFrame转RDD
    35. val res: RDD[Person] = ds.rdd
    36. res.foreach(println)
    37. /*
    38. Person(Andy,18)
    39. Person(Tom,21)
    40. Person(Mike,25)
    41. */
    42. spark.stop()
    43. }
    44. }

    可以看到,相比RDD和DataFrame互相转换,RDD和DataSet转换的过程中,不会有数据类型的变化,而DataFrame转RDD的过程就会把我们定义的case类转为Row对象。

    DataFrame <=> DataSet

    1. DataFrame 转 DataSet 先使用case类,然后直接使用 as[case 类] 方法。
    2. DataSet 转 DataFrame 直接使用 toDF 方法。
    1. import org.apache.spark.sql.{DataFrame, Row, SparkSession}
    2. object TransForm {
    3. case class Person(name:String,age:Long,sex:String) //txt文件age字段可以用Int,但json文件尽量用Long
    4. def main(args: Array[String]): Unit = {
    5. val spark = SparkSession.builder()
    6. .appName("transform")
    7. .master("local[*]")
    8. .getOrCreate()
    9. import spark.implicits._
    10. //1.DataFrame和DataSet之间互相转换
    11. //1.1 创建DataFrame对象
    12. val df = spark.read.json("data/sql/people.json")
    13. df.show()
    14. /*
    15. +---+----------+---+
    16. |age| name|sex|
    17. +---+----------+---+
    18. | 30| Michael| 男|
    19. | 19| Andy| 女|
    20. | 19| Justin| 男|
    21. | 20|Bernadette| 女|
    22. | 23| Gretchen| 女|
    23. | 27| David| 男|
    24. | 33| Joseph| 女|
    25. | 27| Trish| 女|
    26. | 33| Alex| 女|
    27. | 25| Ben| 男|
    28. +---+----------+---+
    29. */
    30. //1.2 DataFrame转DataSet
    31. val ds = df.as[Person]
    32. ds.show()
    33. /*
    34. +---+----------+---+
    35. |age| name|sex|
    36. +---+----------+---+
    37. | 30| Michael| 男|
    38. | 19| Andy| 女|
    39. | 19| Justin| 男|
    40. | 20|Bernadette| 女|
    41. | 23| Gretchen| 女|
    42. | 27| David| 男|
    43. | 33| Joseph| 女|
    44. | 27| Trish| 女|
    45. | 33| Alex| 女|
    46. | 25| Ben| 男|
    47. +---+----------+---+
    48. */
    49. //1.3 DataSet转DataFrame
    50. val res: DataFrame = ds.toDF()
    51. res.show()
    52. /*
    53. +---+----------+---+
    54. |age| name|sex|
    55. +---+----------+---+
    56. | 30| Michael| 男|
    57. | 19| Andy| 女|
    58. | 19| Justin| 男|
    59. | 20|Bernadette| 女|
    60. | 23| Gretchen| 女|
    61. | 27| David| 男|
    62. | 33| Joseph| 女|
    63. | 27| Trish| 女|
    64. | 33| Alex| 女|
    65. | 25| Ben| 男|
    66. +---+----------+---+
    67. */
    68. spark.stop()
    69. }
    70. }

    DataSet 实现 WordCount

    1. def main(args: Array[String]): Unit = {
    2. val spark = SparkSession.builder()
    3. .appName("create dataset")
    4. .master("local[*]")
    5. .getOrCreate()
    6. import spark.implicits._
    7. val res: Dataset[(String, Long)] = spark.read.text("data/word.txt").as[String]
    8. .flatMap(_.split(" "))
    9. .groupByKey(_.toLowerCase)
    10. .count()
    11. res.show()
    12. spark.stop()
    13. }

    运行结果:

    1. | key|count(1)|
    2. +------+--------+
    3. | fast| 1|
    4. | is| 3|
    5. | spark| 2|
    6. |better| 1|
    7. | good| 1|
    8. |hadoop| 1|
    9. +------+--------+

    总结

            剩下来就是不断练习各种DataFrame和DataSet的操作、熟悉各种转换和行动操作。

  • 相关阅读:
    Dr4g0n b4ll: 1 ~ VulnHub
    AIGC之Stable Diffusion
    MongoDB聚合运算符:$bitAnd
    Java进阶知识点及案例总结(续2)
    Python大数据之linux学习总结——day10_hive调优
    [学习笔记]TypeScript查缺补漏(一):类
    !与~有什么区别
    [BPU部署教程] 教你搞定YOLOV5部署 (版本: 6.2)
    在C语言中,堆和栈是两种不同的内存分配机制
    进度条小程序
  • 原文地址:https://blog.csdn.net/m0_64261982/article/details/132792106