• Spark SQL 概述


    Spark SQL 概述

    Spark SQL 是 Apache Spark 的一个模块,专门用于处理结构化数据。它集成了 SQL 查询和 Spark 编程的强大功能,使得处理大数据变得更加高效和简便。通过 Spark SQL,用户可以直接在 Spark 中使用 SQL 查询,或者使用 DataFrame 和 DataSet API 进行数据操作。

    一、Spark SQL 架构

    Spark SQL 的架构主要由以下几个组件组成:

    1. SparkSession:Spark 应用的统一入口点,用于创建 DataFrame、DataSet 和执行 SQL 查询。
    2. Catalyst 优化器:Spark SQL 的查询优化引擎,负责解析、分析、优化和生成物理执行计划。
    3. DataFrame 和 DataSet API:提供面向对象的编程接口,支持丰富的数据操作方法。
    4. 数据源接口:支持多种数据源,如 HDFS、S3、HBase、Cassandra、Hive 等。
    5. 执行引擎:将优化后的查询计划转换为执行任务,并在分布式集群上并行执行这些任务。

    二、Spark SQL 特点

    • 统一数据访问接口:支持多种数据源(如 CSV、JSON、Parquet、Hive、JDBC、HBase 等)并提供一致的查询接口。
    • DataFrame 和 Dataset API:提供面向对象的编程接口,支持类型安全的操作,便于数据处理。
    • Catalyst 优化器:自动将用户的查询转换为高效的执行计划,提升查询性能。
    • 与 Hive 的集成:无缝集成 Hive,能够直接访问现存的 Hive 数据,并使用 Hive 的 UDF 和 UDAF。
    • 高性能:通过 Catalyst 优化器和 Tungsten 执行引擎,实现高效的查询性能和内存管理。
    • 多种操作方式:支持 SQL 和 API 编程两种操作方式,灵活性高。
    • 外部工具接口:提供 JDBC/ODBC 接口供第三方工具借助 Spark 进行数据处理。
    • 高级接口:提供了更高层级的接口,方便地处理数据。

    三、Spark SQL 运行原理

    在这里插入图片描述

    查询解析(Query Parsing):将 SQL 查询解析成抽象语法树(AST)。

    逻辑计划生成(Logical Plan Generation):将 AST 转换为未优化的逻辑计划。

    逻辑计划优化(Logical Plan Optimization):使用 Catalyst 优化器对逻辑计划进行一系列规则优化。

    物理计划生成(Physical Plan Generation):将优化后的逻辑计划转换为一个或多个物理计划,并选择最优的物理计划。

    执行(Execution):将物理计划转换为 RDD,并在集群上并行执行。

    四、Spark SQL API 相关概述

    SparkContext:SparkContext 是 Spark 应用程序的主入口点,负责连接到 Spark 集群,管理资源和任务调度。在 Spark 2.0 之后,推荐使用 SparkSession 取代 SparkContext。

    SQLContext:SQLContext 是 Spark SQL 的编程入口点,允许用户通过 SQL 查询或 DataFrame API 进行数据处理。它提供了基本的 Spark SQL 功能。

    HiveContext:HiveContext 是 SQLContext 的子集,增加了对 Hive 的集成支持,可以直接访问 Hive 中的数据和元数据,使用 Hive 的 UDF 和 UDAF。

    SparkSession:SparkSession 是 Spark 2.0 引入的新概念,合并了 SQLContext 和 HiveContext 的功能,提供了统一的编程接口。SparkSession 是 Spark SQL 的建议入口点,支持使用 DataFrame 和 Dataset API 进行数据处理。

    创建 SparkContext 和 SparkSession 的注意事项:如果同时需要创建 SparkContext 和 SparkSession,必须先创建 SparkContext,再创建 SparkSession。如果先创建 SparkSession,再创建 SparkContext,会导致异常,因为在同一个 JVM 中只能运行一个 SparkContext。

    五、Spark SQL 依赖

    <properties>
        <spark.version>3.1.2spark.version>
        <spark.scala.version>2.12spark.scala.version>
    properties>
    <dependency>
        <groupId>org.apache.sparkgroupId>
        <artifactId>spark-sql_${spark.scala.version}artifactId>
        <version>${spark.version}version>
    dependency>
    

    六、Spark SQL 数据集

    在 Spark SQL 中,数据集主要分为以下几种类型:DataFrame 和 Dataset。它们是处理和操作结构化和半结构化数据的核心抽象。

    1、DataFrame

    Dataset 是在 Spark 2.0 中引入的新的抽象数据结构,它是强类型的,可以存储 JVM 对象。Dataset API 结合了 DataFrame 的操作简便性和类型安全性,适用于需要更高级别数据类型控制和面向对象编程风格的场景。具体特点如下:

    • 类似于二维表格:DataFrame 类似于传统的关系数据库中的二维表格。
    • Schema(数据结构信息):在 RDD 的基础上加入了 Schema,描述数据结构的信息。
    • 支持嵌套数据类型:DataFrame 的 Schema 支持嵌套的数据类型,如 structmaparray
    • 丰富的 SQL 操作 API:提供更多类似 SQL 操作的 API,便于进行数据查询和操作。

    2、Dataset

    Dataset 是在 Spark 2.0 中引入的新的抽象数据结构,它是强类型的,可以存储 JVM 对象。Dataset API 结合了 DataFrame 的操作简便性和类型安全性,适用于需要更高级别数据类型控制和面向对象编程风格的场景。具体特点如下:

    • 强类型:Spark 1.6中引入的一个更通用的数据集合,Dataset 是强类型的,提供类型安全的操作。
    • RDD + Schema:可以认为 Dataset 是 RDD 和 Schema 的结合,既有 RDD 的分布式计算能力,又有 Schema 描述数据结构的信息。
    • 适用于特定领域对象:可以存储和操作特定领域对象的强类型集合。
    • 并行操作:可以使用函数或者相关操作并行地进行转换和操作。

    3、DataFrame 和 Dataset 的关系

    • DataFrame 是特殊的 Dataset:DataFrame 是 Dataset 的一个特例,即 DataFrame = Dataset[Row]
    • 数据抽象和操作方式的统一:DataFrame 和 Dataset 统一了 Spark SQL 的数据抽象和操作方式,提供了灵活且强大的数据处理能力。

    七、Spark Sql 基本用法

    1、Scala 创建 SparkSession 对象

    import org.apache.spark.sql.SparkSession
    object SparkSqlContext {
    
      def main(args: Array[String]): Unit = {
        // 创建 SparkConf 对象,设置应用程序的配置
        val conf: SparkConf = new SparkConf()
          .setMaster("local[4]")   // 设置本地运行模式,使用 4 个线程
          .setAppName("spark sql") // 设置应用程序名称为 "spark sql"
    
        // 创建 SparkSession 对象,用于 Spark SQL 的编程入口
        val spark: SparkSession = SparkSession.builder()
          .config(conf) // 将 SparkConf 配置应用于 SparkSession
          .getOrCreate() // 获取现有的 SparkSession,或者新建一个
    	
        // 获取 SparkContext 对象,可以直接从 SparkSession 中获取
        val sc: SparkContext = spark.sparkContext
    
        // 导入 SparkSession 的隐式转换,可以使用 DataFrame API 的方法
        import spark.implicits._
    
        // 在这里可以编写数据处理代码,例如创建 DataFrame 和 Dataset,进行数据操作等...
    
        // 停止 SparkSession,释放资源
        spark.stop()
      }
    }
    

    2、DataFrame 和 Dataset 的创建方式

    1、从集合创建

    case class Person(name: String, age: Int)				// 下同
    
    val data1 = Seq(Person("Alice", 25), Person("Bob", 30))	
    val ds: Dataset[Person] = spark.createDataset(data)		// 这里的spark是SparkSession对象(如上代码),下同
    
    val data2 = Seq(("Alice", 25), ("Bob", 30))
    val df: DataFrame = data.toDF("name", "age")
    

    1、从文件系统读取

    val schema = StructType(Seq(
      StructField("name", StringType, nullable = false),
      StructField("age", IntegerType, nullable = false)
    ))
    
    val dsJson: Dataset[Person] = spark.read.json("/path/to/json/file").as[Person]
    
    val dfCsv: DataFrame = spark.read
    	// 使用.schema方法指定CSV文件的模式(schema)其定义了DataFrame的列名和类型。
    	// 这是一个可选步骤,但如果CSV文件没有头部行,或者你想覆盖文件中的头部行,则必须指定。  
      .schema(schema)		  
       // 这里设置"header"为"true",表示CSV文件的第一行是列名,不需要Spark从文件中自动推断。 
      .option("header", "true")
      .csv("/path/to/csv/file")
    

    3、从关系型数据库读取

    val url = "jdbc:mysql://localhost:3306/database"
    val properties = new java.util.Properties()
    properties.setProperty("user", "username")
    properties.setProperty("password", "password")
    
    val dsDb: Dataset[Person] = spark.read.jdbc(url, "table", properties).as[Person]
    
    val dfDb: DataFrame = spark.read.jdbc(url, "table", properties)
    

    4、从非结构化数据源读取

    val dsParquet: Dataset[Person] = spark.read.parquet("/path/to/parquet/file").as[Person]
    
    val dfParquet: DataFrame = spark.read.parquet("/path/to/parquet/file")
    

    5、手动创建 Dataset

    import org.apache.spark.sql.types._
    
    val schema = StructType(Seq(
      StructField("name", StringType, nullable = false),
      StructField("age", IntegerType, nullable = false)
    ))
    val data = Seq(Row("Alice", 25), Row("Bob", 30))
    
    val dsManual: Dataset[Person] = spark.createDataFrame(spark.sparkContext.parallelize(data), schema).as[Person]
    
    val dfManual: DataFrame = spark.createDataFrame(
      spark.sparkContext.parallelize(data), schema
    )
    

    3、DataFrame API

    语法示例一

    模拟数据(1000条):

    id,name,gender,age,city
    1,邵睿,男,12,上海市
    2,林子异,男,48,广州市
    3,孟秀英,女,46,上海市
    4,金嘉伦,男,8,北京市
    ...
    

    需求:哪些城市和性别组合在人口较多(ID数量>50)的情况下具有最高的平均年龄,以及这些组合在各自性别中的排名。

    // 导入SparkSession的隐式转换,这样可以使用DataFrame的便捷方法(例如下面的'$'符号)
    import spark.implicits._
    
    // 定义了一个DataFrame的schema,但在这个例子中,使用了CSV的header来自动推断schema
    val schema = StructType(Seq(
      StructField("id", LongType),
      StructField("name", StringType),
      StructField("gender", StringType),
      StructField("age", IntegerType),
      StructField("city", StringType),
    ))
    
    // 定义WindowSpec,用于后续的窗口函数操作,按gender分区,按avg_age降序排序,(复用使用此)
    val WindowSpec: WindowSpec = Window
      .partitionBy($"gender")
      .orderBy($"avg_age".desc)
    
    // 从CSV文件中读取数据,使用header作为列名,然后选择特定的列,进行分组和聚合操作
    // 哪些城市和性别组合在人口较多(ID数量>50)的情况下具有最高的平均年龄,以及这些组合在各自性别中的排名。
    spark.read
       // .schema(schema)	// 应用我们定义的schema 
      .option("header", "true") 							// 使用CSV的header作为列名
      .csv("D:\\projects\\sparkSql\\people.csv")			// DataFrame
      .select($"id", $"name", $"age", $"city", $"gender") 	// 选择需要的列(不写默认就是全选)
      .groupBy($"city", $"gender") 							// 按城市和性别分组
      .agg(			// 多重聚合
        count($"id").as("count"),   		// 计算每个组的ID数量
        round(avg($"age"), 2).as("avg_age") // 计算每个组的平均年龄,并保留两位小数
      )
      .where($"count".gt(50))  		// 过滤出ID数量大于(可以使用>)50的组
      .orderBy($"avg_age".desc)     // 按平均年龄降序排序
    
      .select($"city", $"gender", $"avg_age",
        dense_rank().over(Window.partitionBy($"gender").orderBy($"avg_age".desc)).as("gender_avg_age_rank"))
      .show() // 显示结果
    

    结果:

    +------+------+-------+-------------------+
    |  city|gender|avg_age|gender_avg_age_rank|
    +------+------+-------+-------------------+
    |北京市|    男|  41.05|                  1|
    |  东莞|    男|  42.81|                  2|
    |上海市|    男|  43.92|                  3|
    |成都市|    男|  45.89|                  4|
    |  中山|    男|  47.08|                  5|
    |广州市|    男|  47.47|                  6|
    |  深圳|    男|  48.36|                  7|
    |上海市|    女|  46.02|                  1|
    |  中山|    女|  49.55|                  2|
    +------+------+-------+-------------------+
    

    语法示例二:视图,sql

    // 读取CSV文件到DataFrame,使用header作为列名
    val dfPeople: DataFrame = spark.read
        .option("header", "true") // 使用CSV的header作为列名
        .csv("D:\\projects\\sparkSql\\people.csv")
    
    // 将DataFrame注册为临时视图
    dfPeople.createOrReplaceTempView("people_view")
    // 可以使用Spark SQL来查询这个视图了
    // 例如,查询所有人的姓名和年龄
    spark.sql("SELECT name, age FROM people_view").show()
    // 二
    spark.sql(
          """
            |select * from people_view
            |where gender = '男'
            |""".stripMargin
        ).show()
    

    语法示例三:join

    case class Student(name: String, classId: Int)
    case class Class(classId: Int, className: String)
    
    val frmStu = spark.createDataFrame(
      Seq(
        Student("张三", 1),
        Student("李四", 1),
        Student("王五", 2),
        Student("赵六", 2),
        Student("李明", 2),
        Student("王刚", 4),
        Student("王朋", 5),
      )
    )
    
    val frmClass = spark.createDataFrame(
      Seq(
        Class(1, "name1"),
        Class(2, "name2"),
        Class(3, "name3"),
        Class(4, "name4")
      )
    )
    

    left 左连接,rignt 右连接, full 全外连接,anti左差集,semi左交集

    // 别名 + inner 内连接
    frmStu.as("S")
    	.join(frmClass.as("C"), $"S.classId" === $"C.classId")	// joinType 默认 inner内连接
    	.show()
    
    // 使用左外连接将df和frmClass根据classId合并
    frmStu
      .join(frmClass, Seq("classId"), "left")	
      .show()
    
    // 左差集
    frmStu
      .join(frmClass, Seq("classId"), "anti")	
      .show()
    
    // 左交集
    frmStu
      .join(frmClass, Seq("classId"), "semi")	
      .show()
    

    结果

    别名 + inner 内连接
    +----+-------+-------+---------+
    |name|classId|classId|className|
    +----+-------+-------+---------+
    |张三|      1|      1|    name1|
    |李四|      1|      1|    name1|
    |王五|      2|      2|    name2|
    |赵六|      2|      2|    name2|
    |李明|      2|      2|    name2|
    |王刚|      4|      4|    name4|
    +----+-------+-------+---------+
    
    使用左外连接将df和frmClass根据classId合并
    +-------+----+---------+
    |classId|name|className|
    +-------+----+---------+
    |      1|张三|    name1|
    |      1|李四|    name1|
    |      2|王五|    name2|
    |      2|赵六|    name2|
    |      2|李明|    name2|
    |      4|王刚|    name4|
    |      5|王朋|     null|
    +-------+----+---------+
    
    左差集
    +-------+----+
    |classId|name|
    +-------+----+
    |      5|王朋|
    +-------+----+
    
    左交集
    +-------+----+
    |classId|name|
    +-------+----+
    |      1|张三|
    |      1|李四|
    |      2|王五|
    |      2|赵六|
    |      2|李明|
    |      4|王刚|
    +-------+----+
    
  • 相关阅读:
    Promise 重写 (第一部分)
    【人工智能 & 机器学习 & 深度学习】基础选择题 31~60题 练习(题目+答案),亦含 判断题
    【Lychee图床】本地电脑搭建私人图床,公网远程访问
    小程序极速注册认证免300认证费 突破管理员身份只能绑定5个小程序绿色通道
    GBase 8c 产品运行环境
    pdf如何批量压缩不影响清晰度
    selenium python 入门教程
    Java代码中Math.pow()具有什么功能呢?
    STM CubeMx不能生成代码的解决方法
    JavaScript正则表达式加密
  • 原文地址:https://blog.csdn.net/weixin_74292291/article/details/140347051