• SparkSQL【核心编程、使用IDEA开发、用户自定义函数】


    一 DataSet

    DataSet是具有强类型的数据集合,需要提供对应的类型信息。

    1 创建DataSet

    (1)使用样例类序列创建DataSet

    scala> case class Person(name: String, age: Long)
    defined class Person
    
    scala> val caseClassDS = Seq(Person("zhangsan",2)).toDS()
    
    caseClassDS: org.apache.spark.sql.Dataset[Person] = [name: string, age: Long]
    
    scala> caseClassDS.show
    +---------+---+
    |     name|age|
    +---------+---+
    | zhangsan|  2|
    +---------+---+
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    (2)使用基本类型的序列创建DataSet

    scala> val ds = Seq(1,2,3,4,5).toDS
    ds: org.apache.spark.sql.Dataset[Int] = [value: int]
    
    scala> ds.show
    +-----+
    |value|
    +-----+
    |    1|
    |    2|
    |    3|
    |    4|
    |    5|
    +-----+
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    注意:在实际使用的时候,很少用到把序列转换成DataSet,更多的是通过RDD来得到DataSet

    2 RDD转换为DataSet

    SparkSQL能够自动将包含有case类的RDD转换成DataSet,case类定义了table的结构,case类属性通过反射变成了表的列名。Case类可以包含诸如Seq或者Array等复杂的结构。

    # 创建RDD
    scala> val userRDD = sc.textFile("data/user.txt")
    userRDD: org.apache.spark.rdd.RDD[String] = data/user.txt MapPartitionsRDD[1] at textFile at <console>:24
    
    # 创建样例类
    scala> case class User(name:String, age:Int)
    defined class User
    
    # 转换
    scala> userRDD.map(x=>{var fields = x.split(" ");User(fields(0),fields(1).toInt)}).toDS.show
    +--------+---+
    |    name|age|
    +--------+---+
    |zhangsan| 20|
    |    lisi| 30|
    |  wangwu| 40|
    |xiaowang| 50|
    +--------+---+
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    3 DataSet转换为RDD

    DataSet其实也是对RDD的封装,所以可以直接获取内部的RDD

    scala> userRDD.map(x=>{var fields = x.split(" ");User(fields(0),fields(1).toInt)}).toDS
    res2: org.apache.spark.sql.Dataset[User] = [name: string, age: int]
    
    # DataSet转换成RDD后,类型为User,强类型
    scala> res2.rdd
    res3: org.apache.spark.rdd.RDD[User] = MapPartitionsRDD[9] at rdd at <console>:26
    
    # DataFrame转换成RDD后,类型为ROW,弱类型
    scala> val df = spark.read.json("data/user.json")
    df: org.apache.spark.sql.DataFrame = [age: bigint, username: string]
    
    scala> df.rdd
    res4: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[18] at rdd at <console>:26
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    二 DataFrame和DataSet转换

    1 DataFrame转换为DataSet

    scala> val df = sc.makeRDD(List(("zhangsan",30), ("lisi",49))).toDF("name","age")
    df: org.apache.spark.sql.DataFrame = [name: string, age: int]
    
    scala> case class User(name:String, age:Int)
    defined class User
    
    scala> val ds = df.as[User]
    ds: org.apache.spark.sql.Dataset[User] = [name: string, age: int]
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    2 DataSet转换为DataFrame

    scala> ds.toDF
    res9: org.apache.spark.sql.DataFrame = [name: string, age: int]
    
    • 1
    • 2

    这种方法就是在给出每一列的类型后,使用as方法,转成Dataset,这在数据类型是DataFrame又需要针对各个字段处理时极为方便。在使用一些特殊的操作时,一定要加上 import
    spark.implicits._ 不然toDF、toDS无法使用。

    三 RDD、DataFrame、DataSet三者的关系

    在SparkSQL中Spark提供了两个新的抽象,分别是DataFrame和DataSet。他们和RDD有什么区别呢?首先从版本的产生上来看:

    • Spark1.0 => RDD
    • Spark1.3 => DataFrame
    • Spark1.6 => Dataset

    如果同样的数据都给到这三个数据结构,他们分别计算之后,都会给出相同的结果。不同是的他们的执行效率和执行方式。在后期的Spark版本中,DataSet有可能会逐步取代RDD和DataFrame成为唯一的API接口

    1 三者的共性

    • RDD、DataFrame、DataSet全都是spark平台下的分布式弹性数据集,为处理超大型数据提供便利;
    • 三者都有惰性机制,在进行创建、转换,如map方法时,不会立即执行,只有在遇到Action如foreach时,三者才会开始遍历运算;
    • 三者有许多共同的函数,如filter,排序等;
    • 在对DataFrame和Dataset进行操作许多操作都需要这个包:import spark.implicits._(在创建好SparkSession对象后尽量直接导入)
    • 三者都会根据 Spark 的内存情况自动缓存运算,这样即使数据量很大,也不用担心会内存溢出
    • 三者都有partition的概念
    • DataFrame和DataSet均可使用模式匹配获取各个字段的值和类型

    2 三者的区别

    (1)RDD

    • RDD一般和spark mllib(机器学习)同时使用
    • RDD不支持sparksql操作

    (2)DataFrame

    • 与RDD和Dataset不同,DataFrame每一行的类型固定为Row,每一列的值没法直接访问,只有通过解析才能获取各个字段的值
    • DataFrame与DataSet一般不与 spark mllib 同时使用
    • DataFrame与DataSet均支持 SparkSQL 的操作,比如select,groupby之类,还能注册临时表/视窗,进行 sql 语句操作
    • DataFrame与DataSet支持一些特别方便的保存方式,比如保存成csv,可以带上表头,这样每一列的字段名一目了然

    (3)DataSet

    • Dataset和DataFrame拥有完全相同的成员函数,区别只是每一行的数据类型不同。 DataFrame其实就是DataSet的一个特例 type DataFrame = Dataset[Row]
    • DataFrame也可以叫Dataset[Row],每一行的类型是Row,不解析,每一行究竟有哪些字段,各个字段又是什么类型都无从得知,只能用上面提到的getAS方法或者共性中的第七条提到的模式匹配拿出特定字段。而Dataset中,每一行是什么类型是不一定的,在自定义了case class之后可以很自由的获得每一行的信息

    3 三者的互相转换

    在这里插入图片描述

    四 IDEA开发SparkSQL

    实际开发中,都是使用IDEA进行开发的

    1 添加依赖

    
        org.apache.spark
        spark-sql_2.12
        3.0.0
    
    
    • 1
    • 2
    • 3
    • 4
    • 5

    2 代码实现

    object SparkSQL01_Demo {
      def main(args: Array[String]): Unit = {
        //创建上下文环境配置对象
        val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL01_Demo")
    
        //创建SparkSession对象
        val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
        
        //RDD=>DataFrame=>DataSet转换需要引入隐式转换规则,否则无法转换
        
        //spark不是包名,也不是类名,是上下文SparkSession环境对象名
        import spark.implicits._
    
        //读取json文件 创建DataFrame
        val df: DataFrame = spark.read.json("input/test.json")
        //df.show()
    
        //SQL风格语法
        df.createOrReplaceTempView("user")
        //spark.sql("select avg(age) from user").show
    
        //DSL风格语法
        //df.select("username","age").show()
    
        //*****RDD=>DataFrame=>DataSet*****
        //创建RDD
        val rdd1: RDD[(Int, String, Int)] = spark.sparkContext.makeRDD(List((1,"zhangsan",30),(2,"lisi",28),(3,"wangwu",20)))
    
        //RDD转换成DataFrame
        val df1: DataFrame = rdd1.toDF("id","name","age")
        //df1.show()
    
        //DataFrame转换成DateSet
        val ds1: Dataset[User] = df1.as[User]
        //ds1.show()
    
        //*****DataSet=>DataFrame=>RDD*****
        //DateSet转换成DataFrame
        val df2: DataFrame = ds1.toDF()
    
        //DataFrame转换成RDD  
        //返回的RDD类型为Row,里面提供的getXXX方法可以获取字段值,类似jdbc处理结果集,但是索引从0开始
        val rdd2: RDD[Row] = df2.rdd
        //rdd2.foreach(a=>println(a.getString(1)))
    
        //*****RDD=>DataSet*****
        rdd1.map{
          case (id,name,age)=>User(id,name,age)
        }.toDS()
    
        //*****DataSet=>=>RDD*****
        ds1.rdd
    
        //释放资源
        spark.stop()
      }
    }
    case class User(id:Int,name:String,age:Int)
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59

    六 用户自定义函数

    用户可以通过spark.udf功能添加自定义函数,实现自定义功能

    1 UDF

    输入一行,返回一个结果。在Shell窗口中可以通过spark.udf功能用户可以自定义函数。

    需求:自动定义UDF函数,在每一个查询的名字前面添加问候语

    def main(args: Array[String]): Unit = {
      //创建SparkConf配置文件对象
      val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL01_UDF")
      //创建SparkSession对象
      val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
      //创建DF
      val df: DataFrame = spark.read.json("data/user.json")
      //注册自定义函数
      spark.udf.register("addSayHello",(username:String)=>{"hello:" + username})
      //创建临时视图
      df.createOrReplaceTempView("user")
      //通过sql语句从临时视图查询数据
      spark.sql("select addSayHello(username),age from user").show()
      //释放资源
      spark.stop()
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    2 UDAF

    输入多行,返回一行。强类型的Dataset和弱类型的DataFrame都提供了相关的聚合函数, 如 count(),countDistinct(),avg(),max(),min()。除此之外,用户可以设定自己的自定义聚合函数。通过继承UserDefinedAggregateFunction来实现用户自定义聚合函数。

    需求:求平均年龄

    (1)RDD算子方式实现

    def main(args: Array[String]): Unit = {
      //创建SparkConf配置文件对象
      val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL02_UDAF")
      //创建sc对象
      val sc = new SparkContext(conf)
      //创建rdd
      val rdd: RDD[(String, Int)] = sc.makeRDD(List(("zhangsan",30),("lisi",40),("wangwu",50)))
      //转换数据格式,目的是统计总人数
      val mapRdd: RDD[(Int, Int)] = rdd.map {
        case (name, age) => {
          (age, 1)
        }
      }
      //对年龄和总人数进行聚合操作 (ageSum,numSum)
      val res: (Int, Int) = mapRdd.reduce(
        (t1, t2) => {
          (t1._1 + t2._1, t1._2 + t2._2)
        }
      )
      
      println(res._1 / res._2)
    
      //释放资源
      sc.stop()
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    (2)自定义累加器方式实现

    减少Shuffle,提高效率,模仿LongAccumulator累加器

    object SparkSQL03_Accumulator {
      def main(args: Array[String]): Unit = {
        //创建SparkConf配置文件对象
        val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL03_Accumulator")
        //创建sc对象
        val sc = new SparkContext(conf)
        //创建rdd
        val rdd: RDD[(String, Int)] = sc.makeRDD(List(("zhangsan",30),("lisi",40),("wangwu",50)))
        //创建累加器对象
        val ma = new MyAccmulator
        //注册累加器
        sc.register(ma)
        //使用累加器
        rdd.foreach{
          case (name,age) => {
            ma.add(age)
          }
        }
        //打印结果
        println(ma.value)
        //释放资源
        sc.stop()
      }
    }
    // 定义累加器
    class MyAccmulator extends AccumulatorV2[Int,Double]{
    
      var ageSum : Int = 0
      var countSum : Double = 0
    
      override def isZero: Boolean = {
        ageSum == 0 && countSum == 0
      }
    
      override def copy(): AccumulatorV2[Int, Double] = {
        val ma = new MyAccmulator
        this.ageSum = ageSum
        this.countSum = countSum
        ma
      }
    
      override def reset(): Unit = {
        ageSum = 0
        countSum = 0
      }
    
      override def add(age: Int): Unit = {
        ageSum += age
        countSum += 1
      }
    
      override def merge(other: AccumulatorV2[Int, Double]): Unit = {
        other match{
          case ma: MyAccmulator => {
            this.ageSum += ma.ageSum
            this.countSum += ma.countSum
          }
          case _ =>
        }
      }
    
      override def value: Double = {
        ageSum / countSum
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65

    (3)自定义聚合函数实现-弱类型

    应用于DataFrame,执行一些sql风格的DF查询,更加方便。通过继承UserDefinedAggregateFunction来实现用户自定义聚合函数

    object SparkSQL04_UDAF {
      def main(args: Array[String]): Unit = {
        //创建SparkConf配置文件对象
        val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL01_UDF")
        //创建SparkSession对象
        val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
        //读取json文件以创建DF
        val df: DataFrame = spark.read.json("data/user.json")
    
        //创建自定义函数对象
        val myAvg = new MyAvg
    
        //注册自定义函数
        spark.udf.register("myAvg",myAvg)
    
        //创建临时视图以查询
        df.createOrReplaceTempView("user")
        //使用聚合函数进行查询
        spark.sql("select myAvg(age) from user").show()
    
        //释放资源
        spark.stop()
      }
    }
    //自定义UDAF函数--弱类型
    class MyAvg extends UserDefinedAggregateFunction {
    
      //聚合函数输入数据类型
      //StructType存放的是多列的类型,每一列的数据类型存放在StructField中
      //多个StructField组成了StructType
      //IntegerType是SparkSQL为DF创建的数据类型,底层实现就是scala中的int
      override def inputSchema: StructType = {
        StructType(Array(StructField("age",IntegerType)))
      }
    
      //缓存的数据类型
      //将输入的数据(age,1)存放到缓存中,之后存放(age,2),一次存放一条数据
      //这种数据结构称为StructType
      override def bufferSchema: StructType = {
        StructType(Array(StructField("ageSum",IntegerType),StructField("count",IntegerType)))
      }
    
      //聚合函数返回的数据类型
      override def dataType: DataType = DoubleType
    
      //函数的稳定性,相同的输入是否会得到相同的输出
      //默认不处理,直接返回True
      override def deterministic: Boolean = true
    
      //初始化,将缓存设置为初始状态
      override def initialize(buffer: MutableAggregationBuffer): Unit = {
        //使年龄归0
        buffer(0) = 0
        //使总人数归0
        buffer(1) = 0
      }
    
      //更新缓存中的数据
      override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
        //缓存中数据不为空执行操作
        if(!buffer.isNullAt(0)){
          buffer(0) = buffer.getInt(0) + input.getInt(0)
          buffer(1) = buffer.getInt(1) + 1
        }
      }
    
      //不同节点数据之间的聚合,分区间的合并
      override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
        buffer1(0) = buffer1.getInt(0) + buffer2.getInt(0)
        buffer1(1) = buffer1.getInt(1) + buffer2.getInt(1)
      }
    
      //计算逻辑
      override def evaluate(buffer: Row): Any = {
        buffer.getInt(0).toDouble / buffer.getInt(1)
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77

    (4)自定义聚合函数实现-强类型

    应用于DataSet,执行DSL风格语句时,更加方便,定义类继承org.apache.spark.sql.expressions.Aggregator

    object SparkSQL05_UDAF {
      def main(args: Array[String]): Unit = {
        //创建SparkConf配置文件对象
        val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL01_UDF")
        //创建SparkSession对象
        val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    
        import spark.implicits._
    
        //读取json文件以创建DF
        val df: DataFrame = spark.read.json("data/user.json")
        //创建自定义函数对象
        val myAvgNew = new MyAvgNew
    
        // 如果自定义UDAF强类型,没有办法应用到SQL风格的DF查询
    //    //注册自定义函数
    //    spark.udf.register("myAvgNew",myAvgNew)
    //    //创建临时视图以查询
    //    df.createOrReplaceTempView("user")
    //    //使用聚合函数进行查询
    //    spark.sql("select myAvgNew(age) from user").show()
    
        //将df转换为ds
        val ds: Dataset[User06] = df.as[User06]
        //将自定义函数对象转换为查询列
        val col: TypedColumn[User06, Double] = myAvgNew.toColumn
        //在查询时,会将查询出来的记录(User06类型)交给自定义的函数处理
        ds.select(col).show()
        //释放资源
        spark.stop()
    
      }
    }
    //输入类型的样例类
    case class User06(name:String,age:Long)
    
    //平均年龄缓存的样例类
    case class AgeBuffer(var sum:Long,var count:Long)
    
    //自定义UDAF函数 -- 强类型
    class MyAvgNew extends Aggregator[User06,AgeBuffer,Double]{
    
      //对缓存数据进行初始化
      override def zero: AgeBuffer = {
        AgeBuffer(0,0)
      }
    
      //对当前分区内数据进行聚合
      override def reduce(b: AgeBuffer, a: User06): AgeBuffer = {
        b.sum = b.sum + a.age
        b.count = b.count + 1
        b
      }
    
      //分区间的合并
      override def merge(b1: AgeBuffer, b2: AgeBuffer): AgeBuffer = {
        b1.sum = b1.sum + b2.sum
        b1.count = b1.count + b2.count
        b1
      }
    
      //返回计算结果
      override def finish(buff: AgeBuffer): Double = {
        buff.sum.toDouble/buff.count
      }
    
      //DataSet默认的编、解码器,用于序列化,固定写法
      //用户自定义ref类型就是produce
      //系统自带类型根据数据类型进行选择
      override def bufferEncoder: Encoder[AgeBuffer] = {
        Encoders.product
      }
    
      override def outputEncoder: Encoder[Double] = {
        Encoders.scalaDouble
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77

    3 UDTF

    输入一行,返回多行(hive);

    SparkSQL中没有UDTF,spark中用flatMap即可实现该功能

  • 相关阅读:
    pytorch深度学习实战lesson17
    文本生成高精准3D模型,北京智源AI研究院等出品—3D-GPT
    CCF CSP认证 历年题目自练 Day20
    TI CC2642R 平台开发
    积分专题笔记-与路径无关条件
    OpenCV 4.x 版本的新特性都有哪些?
    Pycharm正版2022.2.2 | 官方翻译插件更新tkk失败解决
    4.9 多协议标记交换MPLS
    CentOS下多网卡绑定多IP段时导致只有一个会通的问题解决
    《设计模式》之代理模式
  • 原文地址:https://blog.csdn.net/weixin_43923463/article/details/126642114