• 7.spark sql编程


    概述

    spark 版本为 3.2.4,注意 RDDDataFrame 的代码出现的问题及解决方案

    本文目标如下:

    • RDD ,Datasets,DataFrames 之间的区别
    • 入门
      • SparkSession
      • 创建 DataFrames
      • DataFrame 操作
      • 编程方式运行 sql 查询
      • 创建 Datasets
      • DataFramesRDDs 互相转换
        • 使用反射推断模式
        • 编程指定 Schema

    参考 Spark 官网

    相关文章链接如下

    文章链接
    spark standalone环境安装地址
    Spark的工作与架构原理地址
    使用spark开发第一个程序WordCount程序及多方式运行代码地址
    RDD编程指南地址
    RDD持久化地址

    RDD ,Datasets,DataFrames 之间的区别

    Datasets , DataFrames和 RDD

    Dataset 是一个分布式的数据集合,DatasetSpark 1.6 中添加的一个新接口,它增益了 RDD (强类型,可以使用 lambda 函数的能力) 和 Spark sql 优化执行引擎的优势。Dataset 可以由JVM对象构建,然后使用函数转换(map、flatMap、filter等)进行操作。数据集API有Scala和Java版本。Python不支持数据集API。

    DataFrame是组织成命名列的数据集。它在概念上等同于关系数据库中的DataFrame APIScalaJavaPythonR中可用。在Scala API中,DataFrame只是Dataset[Row]的一个类型别名。而在Java API中,用户需要使用Dataset<Row>来表示DataFrame

    DataFrame=RDD+SchemaRDD可以认为是表中的数据,Schema是表结构信息。DataFrame可以通过很多来源进行构建,包括:结构化的数据文件,Hive中的表,外部的关系型数据库,以及RDD

    入门

    Spark SQL是一个用于结构化数据处理的Spark模块。与基本的Spark RDD API不同,Spark SQL提供的接口为Spark提供了更多关于正在执行的数据结构信息。在内部,Spark SQL使用这些额外的信息来执行额外的优化。有几种方法可以与SparkSQL进行交互,包括SQLDataset API。计算结果时,使用相同的执行引擎,与用于表示计算的API/语言无关。方便用户切换不同的方式进行操作

    people.json

    people.json文件准备
    在这里插入图片描述

    SparkSession

    Spark sql 中所有功能入口点是 SparkSession类。创建一个基本的 SparkSession,只需使用 SparkSession.builder()

    import org.apache.spark.sql.SparkSession
    
    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      .config("spark.some.config.option", "some-value")
      .getOrCreate()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    创建 DataFrames

    使用 SparkSession,通过存在的RDDhive 表,或其它的Spark data sources 程序创建 DataFrames

    val df = spark.read.json("/tmp/people.json")
    df.show()
    
    • 1
    • 2

    执行如下图
    在这里插入图片描述

    DataFrame 操作

    使用数据集进行结构化数据处理的基本示例如下

    // 需要引入 spark.implicits._ 才可使用 $
    // This import is needed to use the $-notation
    import spark.implicits._
    // 打印schema 以树格式
    // Print the schema in a tree format
    df.printSchema()
    // root
    // |-- age: long (nullable = true)
    // |-- name: string (nullable = true)
    
    // 仅显示 name 列
    // Select only the "name" column
    df.select("name").show()
    // +-------+
    // |   name|
    // +-------+
    // |Michael|
    // |   Andy|
    // | Justin|
    // +-------+
    // 显示所有,age 加1
    // Select everybody, but increment the age by 1
    df.select($"name", $"age" + 1).show()
    // +-------+---------+
    // |   name|(age + 1)|
    // +-------+---------+
    // |Michael|     null|
    // |   Andy|       31|
    // | Justin|       20|
    // +-------+---------+
    
    // 过滤 人的 age 大于 21
    // Select people older than 21
    df.filter($"age" > 21).show()
    // +---+----+
    // |age|name|
    // +---+----+
    // | 30|Andy|
    // +---+----+
    
    // 按 age 分组统计
    // Count people by age
    df.groupBy("age").count().show()
    // +----+-----+
    // | age|count|
    // +----+-----+
    // |  19|    1|
    // |null|    1|
    // |  30|    1|
    // +----+-----+
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50

    spark-shell 执行如下图
    在这里插入图片描述
    在这里插入图片描述

    编程方式运行 sql 查询

    df.createOrReplaceTempView("people")
    
    val sqlDF = spark.sql("SELECT * FROM people")
    sqlDF.show()
    
    • 1
    • 2
    • 3
    • 4

    执行如下:

    scala> df.createOrReplaceTempView("people")
    
    scala> val sqlDF = spark.sql("SELECT * FROM people")
    sqlDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
    
    scala> sqlDF.show()
    +----+-------+
    | age|   name|
    +----+-------+
    |null|Michael|
    |  30|   Andy|
    |  19| Justin|
    +----+-------+
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    创建 Datasets

    Datasets类似于RDD,不是使用Java序列化或Kryo,而是使用专门的编码器来序列化对象,以便通过网络进行处理或传输。使用的格式允许Spark执行许多操作,如过滤、排序和哈希,而无需将字节反序列化为对象。

    case class Person(name: String, age: Long)
    
    // 为 case classes 创建编码器
    // Encoders are created for case classes
    val caseClassDS = Seq(Person("Andy", 32)).toDS()
    caseClassDS.show()
    
    // 为能用类型创建编码器,并提供 spark.implicits._ 引入 
    // Encoders for most common types are automatically provided by importing spark.implicits._
    val primitiveDS = Seq(1, 2, 3).toDS()
    primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)
    
    // 通过定义类,将按照名称映射,DataFrames 能被转成 Dataset 
    // DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name
    val path = "/tmp/people.json"
    val peopleDS = spark.read.json(path).as[Person]
    peopleDS.show()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    执行如下:

    scala> case class Person(name: String, age: Long)
    defined class Person
    
    scala> val caseClassDS = Seq(Person("Andy", 32)).toDS()
    caseClassDS: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]
    
    scala> caseClassDS.show()
    +----+---+
    |name|age|
    +----+---+
    |Andy| 32|
    +----+---+
    
    
    scala> val primitiveDS = Seq(1, 2, 3).toDS()
    primitiveDS: org.apache.spark.sql.Dataset[Int] = [value: int]
    
    scala> primitiveDS.map(_ + 1).collect()
    res1: Array[Int] = Array(2, 3, 4)
    
    scala> val path = "/tmp/people.json"
    path: String = /tmp/people.json
    
    scala> val peopleDS = spark.read.json(path).as[Person]
    peopleDS: org.apache.spark.sql.Dataset[Person] = [age: bigint, name: string]
    
    scala> peopleDS.show()
    +----+-------+
    | age|   name|
    +----+-------+
    |null|Michael|
    |  30|   Andy|
    |  19| Justin|
    +----+-------+
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    DataFrames 与 RDDs 互相转换

    Spark SQL支持两种不同的方法将现有RDD转换为Datasets

    • 第一种方法使用反射来推断包含特定类型对象的RDD的模式。这种基于反射的方法可以生成更简洁的代码,当知道 schema 结构的时间,会有更好的效果。
    • 第二种方法是通过编程接口,构造 schema,然后将其应用于现有的RDD。虽然此方法更详细,直至运行时,才能知道他们的字段和类型,用于构造 Datasets

    使用反射推断模式

    代码如下:

    object RddToDataFrameByReflect {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession
          .builder()
          .appName("RddToDataFrameByReflect")
          .master("local")
          .getOrCreate()
    
        // 用于从RDD到DataFrames的隐式转换
        // For implicit conversions from RDDs to DataFrames
        import spark.implicits._
    
        // Create an RDD of Person objects from a text file, convert it to a Dataframe
        val peopleDF = spark.sparkContext
          .textFile("/Users/hyl/Desktop/fun/sts/spark-demo/people.txt")
          .map(_.split(","))
          .map(attributes => Person(attributes(0), attributes(1).trim.toInt))
          .toDF()
        // Register the DataFrame as a temporary view
        peopleDF.createOrReplaceTempView("people")
    
        // SQL statements can be run by using the sql methods provided by Spark
        val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")
    
        // The columns of a row in the result can be accessed by field index
        teenagersDF.map(teenager => "Name: " + teenager(0)).show()
    
        // or by field name
        teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show()
      }
    
      case class Person(name: String, age: Long)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    执行如下图:
    在这里插入图片描述

    编码问题

    关于 Spark 官网 上复杂类型编码问题,直接加下面一句代码

    teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect().foreach(println(_))
    
    • 1

    报以下图片错误
    在这里插入图片描述
    将原有代码改变如下:

     // 没有为 Dataset[Map[K,V]] 预先定义编码器,需要自己定义
     // No pre-defined encoders for Dataset[Map[K,V]], define explicitly
     implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
     // 也可以如下操作
     // Primitive types and case classes can be also defined as
     // implicit val stringIntMapEncoder: Encoder[Map[String, Any]] = ExpressionEncoder()
    
     // row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
     teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect().foreach(println(_))
     // Array(Map("name" -> "Justin", "age" -> 19))
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    在这里插入图片描述
    通过这一波操作,就可以理解什么情况下,需要编码器,以及编码器的作用

    编程指定 Schema

    代码如下:

    object RddToDataFrameByProgram {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession
          .builder()
          .master("local")
          .getOrCreate()
    
        import org.apache.spark.sql.Row
    
        import org.apache.spark.sql.types._
    
        // 加上此解决报错问题
        import spark.implicits._
    
        // Create an RDD
        val peopleRDD = spark.sparkContext.textFile("/Users/hyl/Desktop/fun/sts/spark-demo/people.txt")
    
        // The schema is encoded in a string
        val schemaString = "name age"
    
        // Generate the schema based on the string of schema
        val fields = schemaString.split(" ")
          .map(fieldName => StructField(fieldName, StringType, nullable = true))
        val schema = StructType(fields)
    
        // Convert records of the RDD (people) to Rows
        val rowRDD = peopleRDD
          .map(_.split(","))
          .map(attributes => Row(attributes(0), attributes(1).trim))
    
        // Apply the schema to the RDD
        val peopleDF = spark.createDataFrame(rowRDD, schema)
    
        // Creates a temporary view using the DataFrame
        peopleDF.createOrReplaceTempView("people")
    
        // SQL can be run over a temporary view created using DataFrames
        val results = spark.sql("SELECT name FROM people")
    
        // The results of SQL queries are DataFrames and support all the normal RDD operations
        // The columns of a row in the result can be accessed by field index or by field name
        results.map(attributes => "Name: " + attributes(0)).show()
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44

    执行如下图
    在这里插入图片描述

    官方文档的代码不全问题

    Unable to find encoder for type String. An implicit Encoder[String] is needed to store String instances in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.
    results.map(attributes => "Name: " + attributes(0)).show()

    在这里插入图片描述
    加下以下代码

    // 加上此解决报错问题
    import spark.implicits._
    
    • 1
    • 2

    如下图解决
    在这里插入图片描述

    结束

    spark sql 至此结束,如有问题,欢迎评论区留言。

  • 相关阅读:
    关于VIT模型训练自己的数据集报错为无效的数据
    稳了,我用 Python 可视化分析和预测了 2022 年 FIFA 世界杯
    uniapp+vue3使用pinia,安卓端报错白屏
    JavaEE开发之Spring框架整合1
    docker — 容器存储
    每日一题 1333. 餐厅过滤器(中等)
    springboot整合验证码、滑块验证框架
    Java 实例-如何编译 Java 文件
    【高阶数据结构】B树 {B树的概念;B树的实现:节点设计,查找,插入,遍历,删除;B树的性能分析;B+树和B*树;B树的应用}
    ES6 - 简单理解原型
  • 原文地址:https://blog.csdn.net/2301_79691134/article/details/134205579