• Spark SQL


    一、Spark SQL概述

    Spark SQL属于Spark计算框架的一部分,是专门负责结构化数据的处理计算框架,Spark SQL提供了两种数据抽象:DataFrame、Dataset,都是基于RDD之上的一种高级数据抽象,在RDD基础之上增加了一个schema表结构。

    DataFrame是以前旧版本的数据抽象(untyped类型的数据抽象),Dataset是新版本的数据抽象(typed有类型的数据抽象),新版本当中DataFrame底层就是Dataset[Row]。

    Spark SQL特点

    • 易整合
    • 统一的数据访问方式
    • 兼容Hive
    • 标准的数据库连接

    二、准备Spark SQL的编程环境

    1、创建Spark SQL的编程项目,scala语言支持的

    2、引入编程依赖

    spark-core_2.12
    hadoop-hdfs
    spark-sql_2.12

    spark-hive_2.12

    hadoop的有一个依赖jackson版本和scala2.12版本冲突了,Spark依赖中也有这个依赖,但是默认使用的是pom.xml先引入的那个依赖,把hadoop中jackson依赖排除了即可。

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0modelVersion>
    
      <groupId>com.kanggroupId>
      <artifactId>spark-sql-studyartifactId>
      <version>1.0-SNAPSHOTversion>
      <packaging>jarpackaging>
    
      <name>spark-sql-studyname>
      <url>http://maven.apache.orgurl>
    
      <properties>
        <project.build.sourceEncoding>UTF-8project.build.sourceEncoding>
      properties>
    
      <dependencies>
        <dependency>
          <groupId>org.apache.hadoopgroupId>
          <artifactId>hadoop-hdfsartifactId>
          <version>3.1.4version>
          <exclusions>
            <exclusion>
              <groupId>com.fasterxml.jackson.modulegroupId>
              <artifactId>*artifactId>
            exclusion>
            <exclusion>
              <groupId>com.fasterxml.jackson.coregroupId>
              <artifactId>*artifactId>
            exclusion>
          exclusions>
        dependency>
        <dependency>
          <groupId>org.apache.sparkgroupId>
          <artifactId>spark-core_2.12artifactId>
          <version>3.1.1version>
        dependency>
        <dependency>
          <groupId>org.apache.sparkgroupId>
          <artifactId>spark-sql_2.12artifactId>
          <version>3.1.1version>
        dependency>
        <dependency>
          <groupId>mysqlgroupId>
          <artifactId>mysql-connector-javaartifactId>
          <version>8.0.18version>
        dependency>
    
        <dependency>
          <groupId>org.apache.sparkgroupId>
          <artifactId>spark-hive_2.12artifactId>
          <version>3.1.1version>
        dependency>
      dependencies>
    project>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55

    三、Spark SQL程序编程的入口

    1、SQLContext:只能做SQL编程,无法操作Hive以及使用HQL操作。

    2、HiveContext:专门提供用来操作和Hive相关的编程。

    3、SparkSession:全新的Spark SQL程序执行入口,把SQLContext和HiveContext功能全部整合了,SparkSession底层封装了一个SparkContext,而且SparkSession可以开启Hive的支持。

    package study
    
    import org.apache.spark.SparkConf
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{DataFrame, SparkSession}
    /**
     * Spark SQL的基本案例执行
     */
    object Demo01 {
      def main(args: Array[String]): Unit = {
        /**
         * 1、创建Spark SQL的程序编程入口
         */
        val sparkConf:SparkConf = new SparkConf()
        val sc:SparkSession = SparkSession.builder().appName("test").master("local[*]").config(sparkConf).getOrCreate()
        import sc.implicits._
    
        /**
         * 2、创建DataFrame或者Dataset数据抽象
         */
        val rdd:RDD[(String,Int)] = sc.sparkContext.makeRDD(Array(("zs",20),("ls",30)))
        val df:DataFrame = rdd.toDF("name","age")
        df.printSchema()
        df.show()
    
        sc.stop()
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    四、DataFrame的创建

    • 1、使用隐式转换函数从RDD、Scala集合创建DataFrame
      toDF() toDF(columnName*)

      • 机制:如果集合或者RDD的类型不是Bean,而且再toDF没有传入任何的列名,那么Spark会默认按照列的个数给生成随机的列名,但是如果类型是一个Bean类型,那么toDF产生的随机列名就是bean的属性名。

      • package create.methon1
        
        import org.apache.spark.SparkConf
        import org.apache.spark.sql.{DataFrame, SparkSession}
        /**
         * 1、通过隐式转换函数从Scala集合创建DataFrame
         *   如果使用隐式转换函数 那么必须引入spark定义的隐式转换函数代码
         *   sparksession的对象名.implicits._
         */
        object Demo01 {
          def main(args: Array[String]): Unit = {
            val sparkConf:SparkConf = new SparkConf()
            val ss:SparkSession = SparkSession.builder().appName("seq to df").master("local[*]").config(sparkConf).getOrCreate()
            //隐式转换必须导入隐式转换函数类
            import ss.implicits._
        
            /**
             * 从集合创建DataFrame
             * 集合一般都是T类型的 T类型如果是Scala自带类型,toDF后面需要跟列名,不跟列名也可以
             * 集合必须是Seq类型的 而且必须显示的声明为Seq类型
             */
            val array:Seq[(String,Int)] = Array(("zs",20),("ls",30))
            val df:DataFrame = array.toDF("name","age")
            df.printSchema()
            df.show()
        
            val array1:Seq[Student] = Array(Student("zs",21),Student("ls",25))
            val df1:DataFrame = array1.toDF()
            df1.printSchema()
            df1.show()
        
            ss.stop()
        
          }
        }
        
        • 1
        • 2
        • 3
        • 4
        • 5
        • 6
        • 7
        • 8
        • 9
        • 10
        • 11
        • 12
        • 13
        • 14
        • 15
        • 16
        • 17
        • 18
        • 19
        • 20
        • 21
        • 22
        • 23
        • 24
        • 25
        • 26
        • 27
        • 28
        • 29
        • 30
        • 31
        • 32
        • 33
        • 34
        • 35
      • package create.methon1
        
        import org.apache.spark.SparkConf
        import org.apache.spark.rdd.RDD
        import org.apache.spark.sql.{DataFrame, SparkSession}
        /**
         * 1、通过隐式转换函数从RDD创建DataFrame
         *   如果使用隐式转换函数 那么必须引入spark定义的隐式转换函数代码
         *   sparksession的对象名.implicits._
         */
        object Demo02 {
          def main(args: Array[String]): Unit = {
            val sparkConf:SparkConf = new SparkConf()
            val ss:SparkSession = SparkSession.builder().appName("seq to df").master("local[*]").config(sparkConf).getOrCreate()
            //隐式转换必须导入隐式转换函数类
            import ss.implicits._
        
            /**
             * 从RDD创建DataFrame
             */
            val array:Seq[(String,Int)] = Array(("zs",20),("ls",30))
            val rdd:RDD[(String,Int)] = ss.sparkContext.makeRDD(array)
            val df:DataFrame = rdd.toDF()
            df.printSchema()
            df.show()
        
            val array1:Seq[Student] = Array(Student("zs",21),Student("ls",25))
            val rdd1:RDD[Student] = ss.sparkContext.makeRDD(array1)
            val df1:DataFrame = rdd1.toDF()
            df1.printSchema()
            df1.show()
        
            ss.stop()
        
          }
        }
        
        • 1
        • 2
        • 3
        • 4
        • 5
        • 6
        • 7
        • 8
        • 9
        • 10
        • 11
        • 12
        • 13
        • 14
        • 15
        • 16
        • 17
        • 18
        • 19
        • 20
        • 21
        • 22
        • 23
        • 24
        • 25
        • 26
        • 27
        • 28
        • 29
        • 30
        • 31
        • 32
        • 33
        • 34
        • 35
        • 36
      • package create.methon1
        
        case class Student(name:String,age:Int)
        
        • 1
        • 2
        • 3
    • 2、通过SparkSession自带的createDataFrame函数从集合或者RDD中创建DataFrame—使用并不多

      • package create.methon2
        
        import org.apache.spark.SparkConf
        import org.apache.spark.sql.types.{DataType, DataTypes, StructField, StructType}
        import org.apache.spark.sql.{DataFrame, Row, SparkSession}
        /**
         * createDataFrame函数从集合中创建DataFrame
         */
        object Demo01 {
          def main(args: Array[String]): Unit = {
            val sparkConf: SparkConf = new SparkConf()
            val ss: SparkSession = SparkSession.builder().appName("seq to df").master("local[*]").config(sparkConf).getOrCreate()
            /**
             * 1、通过Scala的seq集合创建DataFrame 列名是自动生成的
             */
            val array:Seq[(String,Int)] = Array(("zs",20),("ls",30))
            val df:DataFrame = ss.createDataFrame(array)
            df.printSchema()
            df.show()
        
            val array1:Seq[Student] = Array(Student("zs",20),Student("ls",30))
            val df1:DataFrame = ss.createDataFrame(array1)
            df1.printSchema()
            df1.show()
            /**
             * 2、从java集合中创建DataFrame,如果是Java集合,必须传入一个BeanClass
             * 同时如果Java集合中存放的数据类型是Row类型,那么必须传入StructType指定row的结构
             *
             * java集合中如果使用BeanClass构建DaraFrame,要求Java集合中存放的数据类型也必须是Bean的类型
             * BeanClass必须有getter和setter方法
             */
            val list: java.util.List[Student] = java.util.Arrays.asList(Student("ls",20),Student("zs",30))
            val df2 = ss.createDataFrame(list,classOf[Student])
            df2.printSchema()
            df2.show()
            /**
             * 3、java集合的类型为row类型
             */
            val list1: java.util.List[Row] = java.util.Arrays.asList(Row("ls",20),Row("zs",30))
            val df3 = ss.createDataFrame(list1,StructType(java.util.Arrays.asList(StructField("name",DataTypes.StringType),StructField("age",DataTypes.IntegerType))))
            df3.printSchema()
            df3.show()
            ss.stop()
          }
        }
        
        • 1
        • 2
        • 3
        • 4
        • 5
        • 6
        • 7
        • 8
        • 9
        • 10
        • 11
        • 12
        • 13
        • 14
        • 15
        • 16
        • 17
        • 18
        • 19
        • 20
        • 21
        • 22
        • 23
        • 24
        • 25
        • 26
        • 27
        • 28
        • 29
        • 30
        • 31
        • 32
        • 33
        • 34
        • 35
        • 36
        • 37
        • 38
        • 39
        • 40
        • 41
        • 42
        • 43
        • 44
        • 45
      • package create.method2
        
        import create.methon2.Student
        import org.apache.spark.SparkConf
        import org.apache.spark.rdd.RDD
        import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
        import org.apache.spark.sql.{DataFrame, Row, SparkSession}
        
        import java.util
        
        /**
         * createDataFrame函数从RDD中创建DataFrame(操作手法完全一致的)
         */
        object Demo02 {
          def main(args: Array[String]): Unit = {
            val sparkConf: SparkConf = new SparkConf()
            val ss: SparkSession = SparkSession.builder().config(sparkConf).appName("seq to df").master("local[*]").getOrCreate()
        
            /**
             * 1、通过Scala的seq集合创建DataFrame 列名是自动生成的
             */
            val array:Seq[(String,Int)] = Array(("zs",20),("ls",30))
            val rdd:RDD[(String,Int)] = ss.sparkContext.makeRDD(array)
            val df:DataFrame = ss.createDataFrame(rdd)
            df.printSchema()
            df.show()
        
            val array1: Seq[Student] = Array(Student("zs",20))
            val rdd1:RDD[Student] = ss.sparkContext.makeRDD(array1)
        
            val df1: DataFrame = ss.createDataFrame(rdd1,classOf[Student])
            df1.printSchema()
            df1.show()
        
            /**
             * 3、java集合的类型为row类型
             */
            val array2:Array[Row] = Array(Row("zs",20),Row("ww",30))
            val rdd2:RDD[Row] = ss.sparkContext.makeRDD(array2)
            val df3 = ss.createDataFrame(rdd2, StructType(Array(StructField("name", DataTypes.StringType), StructField("age", DataTypes.IntegerType))))
        
            df3.printSchema()
            df3.show()
        
        
            ss.stop()
          }
        }
        
        • 1
        • 2
        • 3
        • 4
        • 5
        • 6
        • 7
        • 8
        • 9
        • 10
        • 11
        • 12
        • 13
        • 14
        • 15
        • 16
        • 17
        • 18
        • 19
        • 20
        • 21
        • 22
        • 23
        • 24
        • 25
        • 26
        • 27
        • 28
        • 29
        • 30
        • 31
        • 32
        • 33
        • 34
        • 35
        • 36
        • 37
        • 38
        • 39
        • 40
        • 41
        • 42
        • 43
        • 44
        • 45
        • 46
        • 47
        • 48
      • package create.methon2
        
        import scala.beans.BeanProperty
        
        case class Student(@BeanProperty var name:String, @BeanProperty var age:Int)
        
        • 1
        • 2
        • 3
        • 4
        • 5
    • 3、从Spark SQL支持的数据源创建DataFrame(HDFS、Hive、JSON文件、CSV文件等等):使用频率最高的

      • 外部存储HDFS中读取数据成为DataFrame

        • ss.read.format(“jsonxx”).load(“path”) 不太好用
        • ss.read.option(key,value).option(…).csv/json(path)
      • 从jdbc支持的数据库创建DataFrame

        • ss.read.jdbc(url,table,properties)
        package create.methon3
        
        import org.apache.spark.SparkConf
        import org.apache.spark.sql.{DataFrame, SparkSession}
        
        import java.util.Properties
        
        /**
         * 从外部存储读取数据成为DataFrame
         */
        object Demo01 {
          def main(args: Array[String]): Unit = {
            val sparkConf:SparkConf = new SparkConf()
            val ss:SparkSession = SparkSession.builder().appName("storage to df").master("local[*]").getOrCreate()
        
            /**
             * 从csv文件读取数据成为DataFrame
             */
            val df:DataFrame = ss.read.option("header","true").format("csv").load("file:///D://Desktop/Student.csv")
            df.printSchema()
            df.show()
        
            /**
             * 读取模式有三种:
             * permissive:默认的
             * dropMalformed
             * failfast
             */
            val df1:DataFrame = ss.read.option("header","true").format("csv").option("mode","permissive").csv("file:///D://Desktop/Student.csv")
            df1.printSchema()
            df1.show()
        
            /**
             * 从json文件创建DataFrame
             * json文件中要求一个json对象独占一行
             */
            val df2:DataFrame = ss.read.option("mode","dropMalformed").json("file:///D://Desktop/Student.json")
            df2.printSchema()
            df2.show()
        
            /**
             * 从普通的文本文档创建DataFrame---不太实用
             */
            val df3 = ss.read.text("file:///D://Desktop/Student.csv")
            df3.printSchema()
            df3.show()
        
            /**
             * 从JDBC可以连接的数据库(rdbms、Hive)创建DataFrame
             */
            val prop:Properties = new Properties()
            prop.setProperty("user","root")
            prop.setProperty("password","root")
            val df4 = ss.read.jdbc("jdbc:mysql://localhost:3306/spark?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8","student",prop)
            df4.printSchema()
            df4.show()
        
            ss.stop()
          }
        }
        
        • 1
        • 2
        • 3
        • 4
        • 5
        • 6
        • 7
        • 8
        • 9
        • 10
        • 11
        • 12
        • 13
        • 14
        • 15
        • 16
        • 17
        • 18
        • 19
        • 20
        • 21
        • 22
        • 23
        • 24
        • 25
        • 26
        • 27
        • 28
        • 29
        • 30
        • 31
        • 32
        • 33
        • 34
        • 35
        • 36
        • 37
        • 38
        • 39
        • 40
        • 41
        • 42
        • 43
        • 44
        • 45
        • 46
        • 47
        • 48
        • 49
        • 50
        • 51
        • 52
        • 53
        • 54
        • 55
        • 56
        • 57
        • 58
        • 59
        • 60
      • 读取Hive数据成为DataFrame

        • 1、通过SparkSession开启Hive的支持

        • 2、引入spark-hive的编程依赖

        • 3、通过ss.sql()

        • package create.methon3
          
          import org.apache.spark.SparkConf
          import org.apache.spark.sql.{DataFrame, SparkSession}
          
          /**
           * 连接Hive创建DataFrame:
           *   1、jdbc方式(基本的操作只能查询表中的所有字段 所有数据)
           *   2、Spark SQL On Hive:用Hive作为数据存储,用Spark直连Hive 操作Hive中的数据
           *     不是使用JDBC的方式,而是使用的Hive的元数据库来完成的
           *     两步操作:(1)需要把Hive的配置文件放到项目的resources目录下,如果在集群环境下,我们需要把hive的配置文件放到spark的conf目录下,(2)需要开启SparkSession的hive支持
           */
          object Demo02 {
            def main(args: Array[String]): Unit = {
              val sparkConf:SparkConf = new SparkConf()
              val sparkSession:SparkSession = SparkSession.builder().appName("spark sql on hive").master("local[*]").config(sparkConf).enableHiveSupport().getOrCreate()
          
              /**
               * 从Hive中读取数据创建DataFrame
               */
              val df:DataFrame = sparkSession.sql("select * from project.ods_user_behavior_origin")
              df.printSchema()
              df.show()
              //新建数据表
              sparkSession.sql("create table test (name string,age int,sex string) row format delimited fields terminated by '*'")
          
              sparkSession.stop()
            }
          }
          
          • 1
          • 2
          • 3
          • 4
          • 5
          • 6
          • 7
          • 8
          • 9
          • 10
          • 11
          • 12
          • 13
          • 14
          • 15
          • 16
          • 17
          • 18
          • 19
          • 20
          • 21
          • 22
          • 23
          • 24
          • 25
          • 26
          • 27
          • 28
          • 29
    • 4、从其他的DataFrame转换的来

    五、DataFrame的编程风格

    • 通过代码来操作计算DataFrame中数据

    • DSL编程风格

      • DataFrame和Dataset提供了一系列的API操作,API说白了就是Spark SQL中算子操作,可以通过算子操作来获取DataFrame或者Dataset中的数据。

      • 转换算子

        • RDD具备的算子DataFrame基本上都可以使用。
        • DataFrame还增加了一些和SQL操作有关的算子:
          selectExpr、where/filter、groupBy、orderBy/sort、limit、join
        操作算子算子概念
        limit获得指定前n行数据并形成新的 dataframe
        where、filter条件过滤
        select根据传入的 string 类型字段名,获取指定字段的值,以 DataFrame 类型返回
        join按指定的列进行合并两个dataframe
        groupBy按指定字段进行分组,后面可加聚合函数对分组后的数据进行操作
        orderBy、sort按指定字段排序
        selectExpr对指定字段进行特殊处理,可以对指定字段调用 UDF 函数或者指定别名;selectExpr 传入 string 类型的参数,返回 DataFrame 对象。
      • 行动算子

        • RDD具备的行动算子DataFrame和Dataset也都具备一些

        • collect/collectAsList:不建议使用,尤其是数据量特别庞大的情况下

        • foreach/foreachPartition

        • 获取结果集的一部分数据

          • first/take(n)/head(n)/takeAsList(n)/tail(n)
          • 获取的返回值类型就是Dataset存储的数据类型
        • printSchema:获取DataFrame或者Dataset的表结构的

        • show()/show(num,truncate:boolean)/show(num,truncate:Int)/show(num,truncate:Int,ver:boolean)

        • 保存输出的算子

          • 文件系统

            • df/ds.write.mode(SaveMode).csv/json/parquet/orc/text(path–目录)
            • text纯文本文档要求DataFrame和Dataset的结果集只有一列 而且列必须是String类型
          • JDBC支持的数据库

            • df/ds.write.mode().jdbc

            • foreach|foreachPartition

            • package oprator
              
              import org.apache.spark.SparkConf
              import org.apache.spark.sql.{Dataset, SaveMode, SparkSession}
              
              import java.util.Properties
              
              object Demo03 {
                def main(args: Array[String]): Unit = {
                  val sparkConf: SparkConf = new SparkConf()
                  val ss: SparkSession = SparkSession.builder().appName("action").master("local[*]").config(sparkConf).enableHiveSupport().getOrCreate()
                  import ss.implicits._
              
                  /**
                   * 创建DataFrame
                   */
                  val array:Seq[(String,Int,String)] = Array(("zs",20,"man"),("ls",30,"woman"),("ww",40,"man"),("ml",50,"woman"))
                  val dataset:Dataset[(String,Int,String)] = array.toDS()
              //    dataset.show()
                  /**
                   * 保存到MySQL当中  JDBC连接保存
                   */
                  val prop = new Properties()
                  prop.setProperty("user","root")
                  prop.setProperty("password","root")
                  dataset.write.mode(SaveMode.Overwrite).jdbc("jdbc:mysql://localhost:3306/spark?serverTimezone=Asia/Shanghai","Student",prop)
              
                  ss.stop()
                }
              }
              
              • 1
              • 2
              • 3
              • 4
              • 5
              • 6
              • 7
              • 8
              • 9
              • 10
              • 11
              • 12
              • 13
              • 14
              • 15
              • 16
              • 17
              • 18
              • 19
              • 20
              • 21
              • 22
              • 23
              • 24
              • 25
              • 26
              • 27
              • 28
              • 29
              • 30
            • 执行前

            image-20230930105818773

            • 执行后

            image-20230930105855493

          • Hive

            • df/ds.write.mode().saveAsTable(“库名.表名”)

            • 1、保证hive支持开启的

            • 2、保存的数据底层在HDFS上以parquet文件格式保存的

            • dataset.write.mode(SaveMode.Append).saveAsTable("default.demo")
              
              • 1
            • image-20230930110937382

    • SQL编程风格

      • 1、将创建的DataFrame加载为一个临时表格
      • 2、然后通过ss.sql(sql语句)进行数据的查询
      package oprator
      
      import org.apache.spark.SparkConf
      import org.apache.spark.sql.{DataFrame, SparkSession}
      
      object Demo01 {
        def main(args: Array[String]): Unit = {
          val sparkConf:SparkConf = new SparkConf()
          val ss:SparkSession = SparkSession.builder().appName("spark sql on hive").enableHiveSupport().master("local[*]").getOrCreate()
      
          /**
           * 从Hive中读取数据创建DataFrame
           */
          val df:DataFrame = ss.sql("select * from project.ods_user_behavior_origin")
      
          df.createTempView("test_spark_sql")
          val df1 = ss.sql("select ip_addr,parse_url(request_url,'HOST') as host,age from test_spark_sql")
          df1.show()
      
          df.selectExpr("ip_addr","parse_url(request_url,'HOST') as host").show()
          df.select("age","ip_addr").where("age>40").show()
          ss.stop()
        }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24

    六、DataSet的创建和使用

    Dataset有类型,DataFrame无类型的。

    创建

    • 1、隐式转换,toDS()

      • package createdataset
        
        import org.apache.spark.SparkConf
        import org.apache.spark.sql.{Dataset, SparkSession}
        
        import scala.beans.BeanProperty
        case  class Student(@BeanProperty var name:String,@BeanProperty var age:Int)
        object Demo01 {
          def main(args: Array[String]): Unit = {
            val sparkConf:SparkConf = new SparkConf()
            val sparkSession:SparkSession = SparkSession.builder().appName("createds").config(sparkConf).master("local[*]").getOrCreate()
            import sparkSession.implicits._
            /**
             * 通过隐式转换从集合或者rdd创建Dataset
             */
            val array:Seq[(String,Int)] = Array(("zs",20),("ls",30))
            val ds:Dataset[(String,Int)] = array.toDS()
            ds.printSchema()
            ds.show()
        
            val array1:Seq[Student] = Array(Student("zs",30),Student("ls",20))
            val ds1:Dataset[Student] = array1.toDS()
            ds1.printSchema()
            ds1.show()
        
            sparkSession.stop()
          }
        }
        
        • 1
        • 2
        • 3
        • 4
        • 5
        • 6
        • 7
        • 8
        • 9
        • 10
        • 11
        • 12
        • 13
        • 14
        • 15
        • 16
        • 17
        • 18
        • 19
        • 20
        • 21
        • 22
        • 23
        • 24
        • 25
        • 26
        • 27
        • 28
      • image-20230930102553512

    • 2、通过SparkSession的createDataset函数创建

      • /**
         *  通过SparkSession的createDataset函数创建
         */
        val rdd:RDD[Student] = sparkSession.sparkContext.makeRDD(array1)
        val ds2:Dataset[Student] = sparkSession.createDataset(rdd)
        ds2.show()
        
        • 1
        • 2
        • 3
        • 4
        • 5
        • 6
      • image-20230930103045838

    • 3、通过DataFrame转换得到Dataset
      df.as[类型-Bean对象必须有getter、setter方法]
      也是需要隐式转换的

      • /**
         * 通过DataFrame转换得到Dataset
         */
        val df:DataFrame = sparkSession.createDataFrame(rdd, classOf[Student])
        val ds3:Dataset[Student] = df.as[Student]
        ds3.show()
        
        • 1
        • 2
        • 3
        • 4
        • 5
        • 6
      • image-20230930104153414

    七、Spark SQL的函数操作

    Spark SQL基本上常见的MySQL、Hive中函数都是支持的。

    package function
    
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.{DataFrame, SparkSession}
    
    object Demo01 {
      def main(args: Array[String]): Unit = {
        val sparkConf:SparkConf = new SparkConf()
        val ss:SparkSession = SparkSession.builder().appName("function").master("local[*]").enableHiveSupport().config(sparkConf).getOrCreate()
        import ss.implicits._
    
        val array:Seq[(Int,String,Int)] = Array((1,"zs",80),(1,"ls",90),(1,"ww",65),(1,"ml",70),(2,"zsf",70),(2,"zwj",67),(2,"qf",76),(2,"dy",80))
        val df:DataFrame = array.toDF("classId","studentName","score")
        df.createOrReplaceTempView("student_score_temp")
    
        ss.sql("select *,row_number() over(partition by classId order by score desc) as class_rank from student_score_temp").show()
    
        ss.stop()
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    image-20230930112644035

    ss.sql("select * from (select *,row_number() over(partition by classId order by score desc) as class_rank from student_score_temp) as temp where temp.class_rank < 2").show()
    
    • 1

    image-20230930112904468

    val array: Seq[(String, String)] = Array(("zs", "play,eat,drink"), ("ls", "play,game,run"))
    val df: DataFrame = array.toDF("name", "hobby")
    df.createOrReplaceTempView("temp")
    /**
     * zs play,eat,drink
     * ls play,game,run
     * zs play
     * zs eat
     */
    ss.sql("select temp.name,a.bobby from temp lateral view explode(split(hobby,',')) a as bobby").show()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    image-20230930114025740

    自定义函数

    • ss.udf.register(name,函数)
    package function
    
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.expressions.Aggregator
    import org.apache.spark.sql.{DataFrame, Encoder, Encoders, SparkSession}
    
    object Demo02 {
      def main(args: Array[String]): Unit = {
        val sparkConf: SparkConf = new SparkConf()
        val ss: SparkSession = SparkSession.builder().appName("createMyFunction").master("local[*]").config(sparkConf).enableHiveSupport().getOrCreate()
        import ss.implicits._
    
        ss.udf.register("my_length",(name:String)=>{name.length})
    
        val array: Seq[(String, String)] = Array(("zs", "play,eat,drink"), ("ls", "play,game,run"))
        val df: DataFrame = array.toDF("name", "hobby")
        df.selectExpr("my_length(hobby)").show()
    
        ss.udf.register("my_avg",new My())
        val array1: Seq[(String, Int)] = Array(("zs", 20), ("ls", 30))
        val df1: DataFrame = array1.toDF("name", "score")
        df1.selectExpr("my_avg(score)").show()
    
        ss.stop()
      }
    }
    class My_AVG extends Aggregator[Int,(Int,Int),java.lang.Double]{
      /**
       * 设置初始值的 是缓冲区的初始值
       * @return
       */
      override def zero: (Int, Int) = (0,0)
    
      /**
       * 当输入一个结果之后,缓冲区如何对输入的结果进行计算
       *
       * @param b 缓冲区
       * @param a 输入的某一个值
       * @return
       */
      override def reduce(b: (Int, Int), a: Int): (Int, Int) = {
        (b._1+a,b._2+1)
      }
    
      /**
       * 分区之间的合并
       *
       * @param b1
       * @param b2
       * @return
       */
      override def merge(b1: (Int, Int), b2: (Int, Int)): (Int, Int) = {
        (b1._1+b2._1,b1._2+b2._2)
      }
    
      /**
       * 最后的结果
       *
       * @param reduction
       * @return
       */
      override def finish(reduction: (Int, Int)): java.lang.Double = {
        reduction._1.toDouble / reduction._2
      }
    
      override def bufferEncoder: Encoder[(Int, Int)] = Encoders.product[(Int, Int)]
    
      override def outputEncoder: Encoder[java.lang.Double] = Encoders.DOUBLE
    }
    --------------------------------------------------------------------------------
    --------------------------------------------------------------------------------
    package function
    
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
    import org.apache.spark.sql.types.{DataType, DataTypes, StructField, StructType}
    
    class My extends UserDefinedAggregateFunction{
      override def inputSchema: StructType = StructType(Array(StructField("score",DataTypes.IntegerType)))
    
      override def bufferSchema: StructType = StructType(Array(StructField("sum",DataTypes.IntegerType),StructField("count",DataTypes.IntegerType)))
    
      override def dataType: DataType = DataTypes.DoubleType
    
      override def deterministic: Boolean = true
    
      override def initialize(buffer: MutableAggregationBuffer): Unit = {
        buffer(0) = 0
        buffer(1) = 0
      }
    
      override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
        buffer(0) = buffer.getInt(0)+input.getInt(0)
        buffer(1) = buffer.getInt(1)+1
      }
    
      override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
        buffer1(0) = buffer1.getInt(0) + buffer2.getInt(0)
        buffer1(1) = buffer1.getInt(1) + buffer2.getInt(1)
      }
    
      override def evaluate(buffer: Row): Any = {
        buffer.getInt(0).toDouble/buffer.getInt(1)
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
  • 相关阅读:
    初识多线程
    mysql a表like b表的某个字段,mysql一个表的字段like另外一个表的字段
    oracle数据库操作—DML
    Java--简介
    L3-005 垃圾箱分布
    地球系统模式(CESM)实践技术
    go语言安装与环境配置
    网络概念_二
    eyb:RubbitMQ学习2
    利用docker搭建不同版本fastjson漏洞环境【保姆级教学】
  • 原文地址:https://blog.csdn.net/weixin_57367513/article/details/133436024