• IDEA 开发 SparkSQL


    一、添加依赖

    1. <!--SparkSQL依赖-->
    2. <dependency>
    3. <groupId>org.apache.spark</groupId>
    4. <artifactId>spark-sql_2.11</artifactId>
    5. <version>${spark.version}</version>
    6. </dependency>

    二、代码

    1.数据准备

    创建person.json文件

    1. {"name":"张三","age":20},
    2. {"name":"李四","age":22},
    3. {"name":"王五","age":19},
    4. {"name":"赵六","age":21},
    5. {"name":"田七","age":22}

    2.创建 DataFrame的SQL风格语法

    SQL 语法风格是指我们查询数据的时候使用 SQL 语句来查询,这种风格的查询必须要 有临时视图或者全局视图来辅助

    1. object SparkSQLDemo1 {
    2. def main(args: Array[String]): Unit = {
    3. val sparkSession = SparkSession.builder().master("local[*]").appName("test").getOrCreate()
    4. import sparkSession.implicits._
    5. //读取 JSON 文件创建 DataFrame
    6. val df = sparkSession.read.json("input\\person.json")
    7. //对 DataFrame 创建一个临时表
    8. df.createOrReplaceTempView("person")
    9. //通过 SQL 语句实现查询年龄大于20
    10. val result = sparkSession.sql("select * from person where age > 20")
    11. //结果展示
    12. result.show()
    13. sparkSession.stop()
    14. }
    15. }

    运行结果:

    1. +---+----+
    2. |age|name|
    3. +---+----+
    4. | 22| 李四|
    5. | 21| 赵六|
    6. | 22| 田七|
    7. +---+----+

    3.创建DataFrame的DSL风格语法

    DSL 语法:DataFrame 提供一个特定领域语言(domain-specific language, DSL)去管理结构化的数据。 可以在 Scala, Java, Python 和 R 中使用 DSL,使用 DSL 语法风格不必去创建临时视图了

    1. object SparkSQLDemo2 {
    2. def main(args: Array[String]): Unit = {
    3. val sparkSession = SparkSession.builder().master("local[*]").appName("test").getOrCreate()
    4. import sparkSession.implicits._
    5. //创建一个 DataFrame
    6. val df = sparkSession.read.json("input\\person.json")
    7. //只查看"name"列数据,
    8. df.select("name").show()
    9. sparkSession.stop()
    10. }
    11. }

    运行结果:

    1. +----+
    2. |name|
    3. +----+
    4. | 张三|
    5. | 李四|
    6. | 王五|
    7. | 赵六|
    8. | 田七|
    9. +----+

    4.RDD转换DataFrame

    在 IDEA 中开发程序时,如果需要 RDD 与 DF 或者 DS 之间互相操作,那么需要引入

    import spark.implicits._

    这里的 spark 不是 Scala 中的包名,而是创建的 sparkSession 对象的变量名称,所以必 须先创建 SparkSession 对象再导入。这里的 spark 对象不能使用 var 声明,因为 Scala 只支持 val 修饰的对象的引入。

    spark-shell 中无需导入,自动完成此操作。

    1. object SparkSQLDemo3 {
    2. def main(args: Array[String]): Unit = {
    3. val sparkSession = SparkSession.builder().master("local[*]").appName("test").getOrCreate()
    4. import sparkSession.implicits._
    5. val rdd:RDD[(String,Int)]=sparkSession.sparkContext.makeRDD(List(("张三",20),("李四",19),("王五",21)))
    6. val df = rdd.toDF("name","age")
    7. df.show()
    8. sparkSession.stop()
    9. }
    10. }

    运行结果:

    1. +----+---+
    2. |name|age|
    3. +----+---+
    4. | 张三| 20|
    5. | 李四| 19|
    6. | 王五| 21|
    7. +----+---+

    5.DataFrame转换DataSet

    DataFrame 其实是 DataSet 的特例,所以它们之间是可以互相转换的。

    1. object SparkSQLDemo4 {
    2. def main(args: Array[String]): Unit = {
    3. val sparkSession = SparkSession.builder().master("local[*]").appName("test").getOrCreate()
    4. import sparkSession.implicits._
    5. val rdd:RDD[(String,Int)]=sparkSession.sparkContext.makeRDD(List(("张三",20),("李四",19),("王五",21)))
    6. val df = rdd.toDF("name","age")
    7. val ds = df.as[Person2]
    8. ds.show()
    9. sparkSession.stop()
    10. }
    11. }
    12. //样例类
    13. case class Person2(name:String,age:Int)

    运行结果:

    1. +----+---+
    2. |name|age|
    3. +----+---+
    4. | 张三| 20|
    5. | 李四| 19|
    6. | 王五| 21|
    7. +----+---+

    6.DataFrame转换RDD

    DataFrame 其实就是对 RDD 的封装,所以可以直接获取内部的 RDD

    1. object SparkSQLDemo5 {
    2. def main(args: Array[String]): Unit = {
    3. val sparkSession = SparkSession.builder().master("local[*]").appName("test").getOrCreate()
    4. import sparkSession.implicits._
    5. val rdd:RDD[(String,Int)]=sparkSession.sparkContext.makeRDD(List(("张三",20),("李四",19),("王五",21)))
    6. val df = rdd.toDF("name","age")
    7. val result = df.rdd
    8. result.foreach(line=>println(line.getString(0),line.getInt(1)))
    9. sparkSession.stop()
    10. }
    11. }

    运行结果:

    1. (李四,19)
    2. (张三,20)
    3. (王五,21)

    7.RDD转换DataSet

    SparkSQL 能够自动将包含有 case 类的 RDD 转换成 DataSet,case 类定义了 table 的结 构,case 类属性通过反射变成了表的列名。Case 类可以包含诸如 Seq 或者 Array 等复杂的结 构。

    1. object SparkSQLDemo6 {
    2. def main(args: Array[String]): Unit = {
    3. val sparkSession = SparkSession.builder().master("local[*]").appName("test").getOrCreate()
    4. import sparkSession.implicits._
    5. val rdd:RDD[(String,Int)]=sparkSession.sparkContext.makeRDD(List(("张三",20),("李四",19),("王五",21)))
    6. val ds = rdd.map{
    7. case(name,age)=>Person2(name,age)
    8. }.toDS()
    9. ds.show()
    10. sparkSession.stop()
    11. }
    12. }
    13. //样例类
    14. case class Person2(name:String,age:Int)

    运行结果:

    1. +----+---+
    2. |name|age|
    3. +----+---+
    4. | 张三| 20|
    5. | 李四| 19|
    6. | 王五| 21|
    7. +----+---+

  • 相关阅读:
    算法题笔记 6-10 (青蛙跳台阶)
    Python学习小组课程P6-Python办公(3)邮件与钉钉消息通知
    Python开发IDE的比较:PyCharm vs. VS Code vs. Jupyter
    Linux 权限系统
    【转】大数据安全--敏感数据识别和分级打标
    VisualStudio运行程序,点击应用程序时,弹出多个个窗体问题
    探究并发和并行、同步和异步、进程和线程、阻塞和非阻塞、响应和吞吐等
    接口自动化测试框架搭建【附教程加源码】
    字体号数与像素对应关系
    配置与管理Samba服务器
  • 原文地址:https://blog.csdn.net/m0_55834564/article/details/125489375