• spark学习笔记(九)——sparkSQL核心编程-DataFrame/DataSet/DF、DS、RDD三者之间的转换关系


    目录

    前言

    DataFrame 

    创建DataFrame 

    SQL语法

    DSL语法

    RDD转换为DataFrame

    DataFrame转换为RDD

    DataSet

    创建DataSet

    RDD转换为DataSet

    DataSet转换为RDD

    DataSet和DataFrame的转换

    RDD、DataFrame、DataSet之间的关系

    相同点

    区别点

    相互转换

    sparkSQL-IDEA编程

    添加依赖

    RDD<=>DataSet<=>DataFrame转换编码实现


    前言

    Spark SQL可以理解为Spark Core的一种封装,在模型上和上下文环境对象上进行了封装;

    SQLContext查询起始点:用于Spark自己提供的SQL查询;

    HiveContext查询起始点:用于连接Hive的查询。

    SparkSession:是Spark最新的SQL查询起始点,是SQLContext和HiveContext的组合,在 SQLContex和HiveContext上可用的API在SparkSession上同样是可以使用的。

    注:Spark Core首先构建上下文环境对象SparkContext才可以执行应用程序,sparkSQL和spark core类似。使用spark-shell的时候, spark框架会自动创建一个名称叫做spark的SparkSession对象, 就像我们以前可以自动获取到一个sc来表示SparkContext对象一样。

    DataFrame 

    创建DataFrame 

    Spark SQLSparkSession是创建DataFrame和执行SQL的入口,创建DataFrame有三种方式:

    (1) 从spark数据源创建

    spark支持创建的数据源格式:csv format jdbc json load option options orc parquet schema

    table text textFile

    在bin目录下创建input目录,在input目录下创建user.json,内容为:

    {"username":"zj","age":20}
    {"username":"xx","age":21}
    {"username":"yy","age":22}

    读取bin/input/user.json文件

    (2)从RDD转换

    (3)从hive table查询返回

    SQL语法

    SQL语法是指查询数据的时候使用SQL语句查询,这种查询必须要有临时视图或全局视图来辅助。

    (1)读取json文件

    (2)创建临时表

    (3)实现查询

    (4)创建全局表

    df.createOrReplaceGlobalTempView("user2")

     注:普通临时表是Session范围内的;如果想扩大有效应用范围,可以使用全局临时表。使用全局临时表时需要全路径访问,global_temp.user2

    (5)实现查询

    1. #查询
    2. spark.sql("select * from global_temp.user2").show()
    3. #使用新的session查询
    4. spark.newSession().sql("select * from global_temp.user2").show()
    5. spark.newSession().sql("select age from global_temp.user2").show()

    DSL语法

    DataFrame提供一个特定领域语言DSL(domain-specific language)去管理结构化的数据。可以在Scala, Java, Python、R中使用DSL,使用DSL语法不用去创建临时视图。

     (1)创建DataFrame

    (2)查看DataFrame的schema信息

    (3)查看数据

     1)查看age数据

    2)查看username和age+1数据

    3)filter查看大于21数据

    4)groupBy按age分组查看数据条数

    RDD转换为DataFrame

    (1)sc.textFile创建RDD,转换为DataFrame

    1. val wordRDD = sc.textFile("input/word.txt")
    2. wordRDD.toDF("word").show()

     

     (2)makeRDDR创建RDD并直接转换为DataFrame

    1. case class user(name:String,age:Int)
    2. sc.makeRDD(List(("zj",20),("zx",21),("xc",23))).map(t => user(t._1,t._2)).toDF.show()

    注:在IDEA中开发程序时,如果需要RDDDF或者DS之间互相操作,那么需要引入import spark.implicits._

    import spark.implicits._:必须先创建SparkSession对象再导入,这里的spark是创建的sparkSession对象的变量名称。Scala只支持val修饰的对象的引入,切记这里的spark对象不能使用var声明。

    spark-shell自动完成此操作。

    DataFrame转换为RDD

    DataFrame其实就是对RDD的封装,可以直接获取内部的RDD。

    (1)创建RDD并转换为DataFrame,DataFrame转换为RDD

    1. val df = sc.makeRDD(List(("zj",20),("zx",21),("xc",23))).map(t => user(t._1,t._2)).toDF
    2. val rdd = df.rdd

    (2)RDD的collect操作 

    DataSet

    创建DataSet

    (1)使用样例类序列创建DataSet

    1. case class person(name:String,age:Long)
    2. val caseClassDS = Seq(person("zj",2)).toDS()
    3. caseClassDS.show

    (2)使用基本类型的序列创建DataSet

    1. val ds = Seq(1,2,3,4).toDS
    2. ds.show

     

    RDD转换为DataSet

    SparkSQL能够自动将包含有case类的RDD转换成DataSet,case类定义了table的结构,case类属性通过反射变成了表的列名。case类可以包含诸如Seq、Array等复杂的结构。

    注:实际中很少把序列转换成DataSet,更多的是通过RDD来得到DataSet。

    1. case class user(name:String,age:Int)
    2. sc.makeRDD(List(("zj",21),("zz",22),("xx",24))).map(t => user(t._1,t._2)).toDS

    DataSet转换为RDD

    DataSet也是对RDD的封装,可以直接获取内部的RDD。

    1. #创建RDD并转换为DataSet
    2. case class user(name:String,age:Int)
    3. val ds = sc.makeRDD(List(("zj",21),("zz",22),("xx",24))).map(t => user(t._1,t._2)).toDS
    4. #DataSet转换为RDD
    5. val rdd = ds.rdd
    6. #RDD的collect操作
    7. rdd.collect

    DataSet和DataFrame的转换

    DataFrameDataSet的特例,它们之间可以互相转换。

    (1)DataFrame转换为DataSet

    1. val df = sc.makeRDD(List(("zj",21),("zz",22),("xx",24))).map(t => user(t._1,t._2)).toDF("name","age")
    2. val ds = df.as[user]

    (2)DataSet转化为DataFrame

    val df = ds.toDF

    RDD、DataFrame、DataSet之间的关系

    相同点

    (1)三者都有partition的概念;

    (2)三者有许多共同的函数,如map、filter等;

    (3)RDDDataFrameDataSet全都是spark平台下的分布式弹性数据集,为处理超大型数据提供便利;

    (4)三者都有惰性机制,在进行创建转换时不会立即执行,只有在遇到Action时三者才会开始运算;

    (5)在对DataFrameDataset进行操作许多操作都需要这个包import spark.implicits._;

    (6)三者都会根据Spark的内存情况自动缓存运算,即使数据量很大,也不用担心内存溢出;

    (7)DataFrameDataSet均可使用模式匹配获取各个字段的值和类型。

    区别点

    (1)RDD不支持sparksql操作;

    (2)RDD一般和spark mllib同时使用;

    (3)DataFrame和DataSet一般不与spark mllib同时使用;

    (4)DataFrameDataSet支持一些特别方便的保存方式,比如:csv,csv可以带上表头;

    (5)DataFrame与RDDDataset不同,每一行的类型固定为Row,每一列的值没法直 接访问,只有通过解析才能获取各个字段的值;

    (6)DataFrame与DataSet均支持SparkSQL的操作,还能注册临时表/视窗进行sql语句操作;

    (7)DataFrame和Dataset拥有完全相同的成员函数,区别只是每一行的数据类型不同,DataFrame就是DataSet的一个特例:type DataFrame = Dataset[Row]

    (8)DataFrame每一行的类型是Row,不解析各个字段是什么类型无从得知,只能用模式匹配拿出特定字段;而Dataset中每一行是什么类型是不一定的,在自定义了case class之后可以很自由的获得每一行的信息。

    相互转换

    sparkSQL-IDEA编程

    添加依赖

    1. <dependency>
    2. <groupId>org.apache.sparkgroupId>
    3. <artifactId>spark-sql_2.12artifactId>
    4. <version>3.0.0version>
    5. dependency>

    全部依赖展示

    1. <dependencies>
    2. <dependency>
    3. <groupId>org.apache.sparkgroupId>
    4. <artifactId>spark-core_2.12artifactId>
    5. <version>3.0.0version>
    6. dependency>
    7. <dependency>
    8. <groupId>org.apache.sparkgroupId>
    9. <artifactId>spark-sql_2.12artifactId>
    10. <version>3.0.0version>
    11. dependency>
    12. dependencies>
    13. <build>
    14. <plugins>
    15. <plugin>
    16. <groupId>net.alchim31.mavengroupId>
    17. <artifactId>scala-maven-pluginartifactId>
    18. <version>3.2.2version>
    19. <executions>
    20. <execution>
    21. <goals>
    22. <goal>testCompilegoal>
    23. goals>
    24. execution>
    25. executions>
    26. plugin>
    27. <plugin>
    28. <groupId>org.apache.maven.pluginsgroupId>
    29. <artifactId>maven-assembly-pluginartifactId>
    30. <version>3.1.0version>
    31. <configuration>
    32. <descriptorRefs>
    33. <descriptorRef>jar-with-dependenciesdescriptorRef>
    34. descriptorRefs>
    35. configuration>
    36. <executions>
    37. <execution>
    38. <id>make-assemblyid>
    39. <phase>packagephase>
    40. <goals>
    41. <goal>singlegoal>
    42. goals>
    43. execution>
    44. executions>
    45. plugin>
    46. plugins>
    47. build>

    RDD<=>DataSet<=>DataFrame转换编码实现

    下面是代码部分 

    1. import org.apache.spark.SparkConf
    2. import org.apache.spark.rdd.RDD
    3. import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
    4. object sparkSQL_Basic {
    5. def main(args: Array[String]): Unit = {
    6. //TODO 创建sparkSQL运行环境
    7. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")
    8. val spark = SparkSession.builder().config(sparkConf).getOrCreate()
    9. import spark.implicits._
    10. //TODO 执行逻辑操作
    11. //DataFrame
    12. println("DataFrame")
    13. val df = spark.read.json("datas/user.json")
    14. println("输出源数据")
    15. df.show()
    16. //DataFrame => SQL 要创建视图
    17. df.createOrReplaceTempView("user")
    18. println("输出所有")
    19. spark.sql("select * from user").show()
    20. println("输出age")
    21. spark.sql("select age from user").show()
    22. //DataFrame => DSL 不用创建视图
    23. println("输出所有")
    24. df.select("age","username").show()
    25. println("输出age+1")
    26. df.select($"age" + 1).show()
    27. df.select('age + 1).show()
    28. println(">>>>>>>>>>>>>>>>>>>>>>")
    29. //DataSet
    30. println("DataSet")
    31. val seq = Seq(1,2,3,4)
    32. val ds = seq.toDS()
    33. ds.show()
    34. println(">>>>>>>>>>>>>>>>>>>>>>>>")
    35. //RDD <=> DataFrame
    36. println("RDD <=> DataFrame DF->RDD")
    37. val rdd = spark.sparkContext.makeRDD(List((1,"zj",10),(2,"as",20),(3,"sd",30)))
    38. val frame = rdd.toDF("ID", "NAME", "AGE")
    39. val rowRDD : RDD[Row] = frame.rdd
    40. frame.show()
    41. println(rowRDD)
    42. //RDD <=> DataSet
    43. println("RDD <=> DataSet DS->RDD")
    44. val ds2 = rdd.map {
    45. case (id, name, age) => {
    46. User(id, name, age)
    47. }
    48. }.toDS()
    49. val userRDD = ds2.rdd
    50. ds2.show()
    51. println(userRDD)
    52. //DataFrame <=> DataSet
    53. println("DataFrame <=> DataSet DS->DF")
    54. val ds1:Dataset[User] = frame.as[User]
    55. val df1:DataFrame = ds1.toDF()
    56. ds1.show()
    57. df1.show()
    58. //TODO 关闭环境
    59. spark.stop()
    60. }
    61. case class User(id:Int,name:String,age:BigInt)
    62. }

    运行结果展示! 

     

      

    本文为学习笔记的记录!!

  • 相关阅读:
    Hadoop安装2
    Windows 下 Sublime Text 3.2.2 下载及配置
    win11安装ubuntu(by wsl2)
    【算法证明 二】快速排序的时间复杂度分析
    java入门-----基本语法之语句的使用
    MATLAB坐标区应用
    【金融项目】尚融宝项目(十五)
    【ACM学习】【STL】多重集合multiset和多重映射multimap
    supOS APP开发者课程练习册
    MMORPG网络游戏开发之Protobuf的基本使用
  • 原文地址:https://blog.csdn.net/qq_55906442/article/details/126222331