• Scala | SparkSQL | 创建DataSet | 序列化问题 | UDF与UDAF | 开窗函数




    一、SparkSQL

    1.SparkSQL介绍

      HiveShark 的前身,SharkSparkSQL 的前身,SparkSQL 产生的根本原因是其完全脱离了 Hive 的限制

    • SparkSQL 支持查询原生的 RDDRDDSpark 平台的核心概念, 是 Spark 能够高效的处理大数据的各种场景的基础。
    • 能够在 scalaJava中写 SQL 语句。支持简单的 SQL 语法检查,能够在Scala 中写 Hive 语句访问 Hive 数据,并将结果取回作为 RDD 使用。
    • Spark on HiveHive 只作为储存角色,Spark 负责 sql 解析优化,执行。
    • Hive on SparkHive 即作为存储又负责 sql 的解析优化,Spark 负责执行。

    两者数据源均为Hive表,底层人物均为Spark人物,关键区别在于一个是Hive去解析,一个是Spark去解析。

    name存数据的位置解析引擎执行引擎
    hive on sparkhive表hivespark
    spark on hivehive表sparkspark

    2.Dataset 与 DataFrame概念解析

    在这里插入图片描述

      Dataset 也是一个分布式数据容器。与 RDD 类似,然而 Dataset 更像传统数据库的二维表格,除了数据以外,还掌握数据的结构信息(元数据),即schema。同时,与 Hive 类似,Dataset 也支持嵌套数据类型(struct、array 和 map)。从 API 易用性的角度上 看, Dataset API 提供的是一套高层的关系操作,比函数式的 RDD API 要更加友好,门槛更低。Dataset 的底层封装的是 RDD,当 RDD 的泛型是 Row 类型的时候,我们也可以称它为 DataFrame

    Dataset<Row> = DataFrame
    
    • 1

    3.SparkSQL 的数据源

      SparkSQL 的数据源可以是 JSON 类型的字符串,JDBCParquentHiveHDFS 等。
    在这里插入图片描述
    可以将不同源中的数据进行join,这就是SparkSQL中的异构数据源的操作。

    4.SparkSQL 底层架构

      首先拿到 sql 后解析一批未被解决的逻辑计划,再经过分析得到分析后的逻辑计划,再经过一批优化规则转换成一批最佳优化的逻辑计划,再经过 SparkPlanner 的策略转化成一批物理计划,随后经过消费模型转换成一个个的 Spark 任务执行。
    在这里插入图片描述

    5.谓词下推(predicate Pushdown)

    在这里插入图片描述

    二、创建DataSet的几种方式

    1.读取 json 格式的文件创建 Dataset

      读取 json 格式的文件有两种方式:

    • 一种是spark.read.json(文件路径)
    • 另一种是spark.read.format("json").load(文件路径)

    注意

    • json 文件中的 json 数据不能嵌套 json 格式数据。
    • Dataset 是一个一个 Row 类型的 RDDds.rdd()/ds.javaRdd()
    • df.show()默认显示前 20 行数据。
    • json文件自带元数据,默认排序时字段名按照字典序排序,然后类型自动推断。
    • 注册成临时表时,表中的列默认按 ascii 顺序显示列。
    package com.shsxt.scala_Test.sql.dataset
    
    import org.apache.spark.sql.{DataFrame, SQLContext, SparkSession}
    
    object CreateDFFromJsonFile {
      def main(args: Array[String]): Unit = {
        val spark: SparkSession = SparkSession
          .builder()
          .master("local")
          .appName("jsonRDD")
          .getOrCreate()
    
    
        val jsonDS: DataFrame = spark.read.json("./data/json")
    
        jsonDS.show();
    
        jsonDS.createOrReplaceTempView("jsonTable")
    
        val result: DataFrame = spark.sql("select * from jsonTable where age is not NULL")
        result.show()
        spark.stop()
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    +----+--------+
    | age|    name|
    +----+--------+
    |  18|zhangsan|
    |null|    lisi|
    |  18|  wangwu|
    |  28|  laoliu|
    |  20|zhangsan|
    |null|    lisi|
    |  18|  wangwu|
    |  28|  laoliu|
    |  28|zhangsan|
    |null|    lisi|
    |  18|  wangwu|
    +----+--------+
    
    +---+--------+
    |age|    name|
    +---+--------+
    | 18|zhangsan|
    | 18|  wangwu|
    | 28|  laoliu|
    | 20|zhangsan|
    | 18|  wangwu|
    | 28|  laoliu|
    | 28|zhangsan|
    | 18|  wangwu|
    +---+--------+
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    2.通过 json 格式的 RDD 创建 Dataset

    package com.shsxt.scala_Test.sql.dataset
    
    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{DataFrame, Dataset, SQLContext, SparkSession}
    
    
    case class Person(id: String, name: String, age: Integer)
    
    
    object CreateDFFromRDDWithReflect {
      def main(args: Array[String]): Unit = {
        val spark: SparkSession = SparkSession
          .builder()
          .master("local")
          .appName("jsonRDD")
          .getOrCreate()
    
        val sc: SparkContext = spark.sparkContext
        val nameRDD: RDD[String] = sc.parallelize(Array(
          "{'name':'zhangsan','age':\"18\"}",
          "{\"name\":\"lisi\",\"age\":\"19\"}",
          "{\"name\":\"wangwu\",\"age\":\"20\"}"))
    
        val scoreRDD: RDD[String] = sc.parallelize(Array(
          "{\"name\":\"zhangsan\",\"score\":\"100\"}",
          "{\"name\":\"lisi\",\"score\":\"200\"}",
          "{\"name\":\"wangwu\",\"score\":\"300\"}"
        ))
    
        val name: DataFrame = spark.read.json(nameRDD)
        val score: DataFrame = spark.read.json(scoreRDD)
    
        //注册成临时表使用
        name.createOrReplaceTempView("nameTable");
        score.createOrReplaceTempView("scoreTable");
        val result: DataFrame = spark.sql(
          """
            |select nameTable.name,nameTable.age,scoreTable.score
            |from nameTable join scoreTable
            |on nameTable.name = scoreTable.name
            |""".stripMargin)
        result.show()
    
        spark.stop()
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    +--------+---+-----+
    |    name|age|score|
    +--------+---+-----+
    |  wangwu| 20|  300|
    |zhangsan| 18|  100|
    |    lisi| 19|  200|
    +--------+---+-----+
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    3.非 json 格式的 RDD 创建 Dataset

    3.1 反射

      通过反射的方式将非 json 格式的 RDD 转换成 Dataset。实际上就是先将数据转换成自定义类对象,变成RDD,在底层通过反射的方式解析Person.class获得Person的所有schema信息(field),结合RDD本身,就生成了Dataset

    • 自定义类要可序列化
    • 自定义类的访问级别是 Public
    • RDD 转成 Dataset 后会根据映射将字段按 Assci 码排序
    • Dataset转换成RDD时获取字段两种方式,一种是ds.getInt(0)下标获取(不推荐使用),另一种是 ds.getAs(“列名”)获取(推荐使用)
    package com.shsxt.scala_Test.sql.dataset
    
    import org.apache.spark.sql.{DataFrame, Dataset, SQLContext, SparkSession}
    
    case class Person(id: String, name: String, age: Integer)
    
    
    object CreateDFFromRDDWithReflect {
        def main(args: Array[String]): Unit = {
            val spark: SparkSession = SparkSession
                .builder()
                .master("local")
                .appName("jsonRDD")
                .getOrCreate()
    
            import spark.implicits._
            
            //将非json格式RDD隐式转换成Dataset
            val ds: Dataset[Person] = spark.read.textFile("data/person.txt").map { x =>
                 Person(x.split(",")(0), x.split(",")(1), Integer.valueOf(x.split(",")(2)))
            }
    
    
            ds.createOrReplaceTempView("person")
            val result: DataFrame = spark.sql("select name ,id from person ")
            result.show()
            spark.stop()
    
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    +--------+---+
    |    name| id|
    +--------+---+
    |zhangsan|  1|
    |    lisi|  2|
    |  wangwu|  3|
    +--------+---+
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    3.2 动态创建 Schema

      动态创建 Schema 将非 json 格式的 RDD 转换成 Dataset。

    package com.shsxt.scala_Test.sql.dataset
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.{DataFrame, Row, RowFactory, SparkSession}
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
    
    
    object CreateDFFromRDDWithStruct {
        def main(args: Array[String]): Unit = {
            val spark: SparkSession = SparkSession
                .builder()
                .master("local")
                .appName("schema")
                .getOrCreate()
    
            val sc: SparkContext = spark.sparkContext
            val lineRDD: RDD[String] = sc.textFile("data/person.txt")
            val rowRDD: RDD[Row] = lineRDD.map { x => {
                val split: Array[String] = x.split(",")
                RowFactory.create(split(0), split(1), Integer.valueOf(split(2)))
            }
            }
    
            val schema: StructType = StructType(List(
                StructField("id", StringType, true),
                StructField("name", StringType, true),
                StructField("age", IntegerType, true)
            ))
    
            val df: DataFrame = spark.createDataFrame(rowRDD, schema)
            df.show()
            df.printSchema()
            sc.stop()
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    +---+--------+---+
    | id|    name|age|
    +---+--------+---+
    |  1|zhangsan| 18|
    |  2|    lisi| 19|
    |  3|  wangwu| 20|
    +---+--------+---+
    
    root
     |-- id: string (nullable = true)
     |-- name: string (nullable = true)
     |-- age: integer (nullable = true)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    4.读取 parquet 文件创建 Dataset

      读取与保存二进制格式–parquet 文件。

    • 可以将 Dataset 存储成 parquet 文件。保存成 parquet 文件的方式有两种:

      df.write().mode(SaveMode.Overwrite).format("parquet").save("./sparksql/parquet")
      df.write().mode(SaveMode.Overwrite).parquet("./sparksql/parquet")
      
      • 1
      • 2
    • 读取 parquet 文件的方式有两种:

      spark.read.parquet("./sparksql/parquet")
      spark.read.format("parquet").load("./sparksql/parquet")
      
      • 1
      • 2

    SaveMode 指定文件保存时的模式:

    • Overwrite:覆盖
    • Append:追加
    • ErrorIfExists:如果存在就报错
    • Ignore:如果存在就忽略
    package com.shsxt.scala_Test.sql.dataset
    
    import org.apache.spark.sql.{DataFrame, SparkSession}
    
    object CreateDFFromParquet {
      def main(args: Array[String]): Unit = {
        val spark: SparkSession = SparkSession
          .builder()
          .master("local")
          .appName("jsonRDD")
          .getOrCreate()
    
    
        val df: DataFrame = spark.read.json("./data/json")
        df.write.mode(saveMode="Overwrite").format("parquet").save("./data/parquet")
    //    df.write.mode(saveMode="Overwrite").parquet("./data/parquet")
    	
    	val result: DataFrame = spark.read.parquet("./data/parquet")
        result.show()
        
        spark.stop()
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    +----+--------+
    | age|    name|
    +----+--------+
    |  18|zhangsan|
    |null|    lisi|
    |  18|  wangwu|
    |  28|  laoliu|
    |  20|zhangsan|
    |null|    lisi|
    |  18|  wangwu|
    |  28|  laoliu|
    |  28|zhangsan|
    |null|    lisi|
    |  18|  wangwu|
    +----+--------+
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    在这里插入图片描述

    5.读取 JDBC 中的数据创建 Dataset(MySql 为例)

      从MYSQL中读取创建 Dataset与写入。读取JDBC所需参数及书写案例

    • 第一种方式读取MySql数据库表,加载为DataFrame
    • 第二种方式读取MySql数据表加载为Dataset
     package com.shsxt.scala_Test.sql.dataset
    
    import java.util.{HashMap, Properties}
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.{DataFrame, DataFrameReader, SQLContext, SaveMode, SparkSession}
    
    import java.util
    
    object CreateDFFromMysql {
        def main(args: Array[String]): Unit = {
            val spark: SparkSession = SparkSession
                .builder()
                .master("local")
                .appName("jsonRDD")
                .getOrCreate()
    
    
            /**
              * 第一种方式:读取Mysql数据库表创建DS
              */
            val options: util.HashMap[String, String] = new HashMap[String, String]();
            options.put("url", "jdbc:mysql://localhost:3306/spark")
            options.put("driver", "com.mysql.jdbc.Driver")
            options.put("user", "root")
            options.put("password", "root")
            options.put("dbtable", "person")
            val person: DataFrame = spark.read.format("jdbc").options(options).load()
            person.show()
            person.createOrReplaceTempView("person")
            
            /**
              * 第二种方式:读取Mysql数据库表创建DS
              */
            val reader: DataFrameReader = spark.read.format("jdbc")
            reader.option("url", "jdbc:mysql://localhost:3306/spark")
            reader.option("driver", "com.mysql.jdbc.Driver")
            reader.option("user", "root")
            reader.option("password", "root")
            reader.option("dbtable", "score")
            val score: DataFrame = reader.load()
            score.show()
            score.createOrReplaceTempView("score")
    
    
            val result: DataFrame = spark.sql(
                """
                  |select person.id,person.name,person.age,score.score
                  |from person,score
                  |where person.name = score.name  and score.score> 82"
                  |""".stripMargin);
    
            result.show()
            /**
              * 将数据写入到Mysql表中
              */
    
            val properties: Properties = new Properties()
            properties.setProperty("user", "root")
            properties.setProperty("password", "root")
            result.write.mode(SaveMode.Append).jdbc("jdbc:mysql://localhost:3306/spark", "result", properties)
    
            spark.stop()
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64

    6.读取 Hive 中的数据加载成 Dataset

    package com.shsxt.java_Test.sql.dataset;
    
    
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SaveMode;
    import org.apache.spark.sql.SparkSession;
    
    
    public class CreateDSFromHive {
    
    	public static void main(String[] args) {
            SparkSession sparkSession = SparkSession
                    .builder()
                    .appName("hvie")
                    //开启hive的支持,接下来就可以操作hive表了
                    // 前提需要是需要开启hive metastore 服务
                    .enableHiveSupport()
                    .getOrCreate();
    
            sparkSession.sql("USE spark");
            sparkSession.sql("DROP TABLE IF EXISTS student_infos");
    		//在hive中创建student_infos表
            sparkSession.sql("CREATE TABLE IF NOT EXISTS student_infos (name STRING,age INT) row format delimited fields terminated by '\t' ");
            sparkSession.sql("load data local inpath '/root/student_infos' into table student_infos");
            //注意:此种方式,程序需要能读取到数据(如/root/student_infos),同时也要能读取到 metastore服务的配置信息。
    
            sparkSession.sql("DROP TABLE IF EXISTS student_scores");
            sparkSession.sql("CREATE TABLE IF NOT EXISTS student_scores (name STRING, score INT) row format delimited fields terminated by '\t'");
            sparkSession.sql("LOAD DATA "
    				+ "LOCAL INPATH '/root/student_scores'"
    				+ "INTO TABLE student_scores");
    
    
    
    //		Dataset df = hiveContext.table("student_infos");//读取Hive表加载Dataset方式
            /**
             * 查询表生成Dataset
             */
    		Dataset<Row> goodStudentsDF = sparkSession.sql("SELECT si.name, si.age, ss.score "
    				+ "FROM student_infos si "
    				+ "JOIN student_scores ss "
    				+ "ON si.name=ss.name "
    				+ "WHERE ss.score>=80");
    
    		goodStudentsDF.registerTempTable("goodstudent");
            Dataset<Row>  result = sparkSession.sql("select * from goodstudent");
    		result.show();
    		
    		/**
    		 * 将结果保存到hive表 good_student_infos
    		 */
            sparkSession.sql("DROP TABLE IF EXISTS good_student_infos");
    		goodStudentsDF.write().mode(SaveMode.Overwrite).saveAsTable("good_student_infos");
    
            sparkSession.stop();
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58

    三、序列化问题

      序列化是生成对象的一种方式。

    1. 反序列化时serializable版本号不一致时会导致不能反序列化。
      简单来说,Java的序列化机制是通过在运行时判断类的serialVersionUID来验证版本一致性的。在进行反序列化时,JVM会把传来的字节流中的serialVersionUID与本地相应实体(类)的serialVersionUID进行比较,如果相同就认为是一致的, 可以进行反序列化,否则就会出现序列化版本不一致的异常。当实现java.io.Serializable接口的实体(类)没有显式地定义一个名为serialVersionUID,类型为long的变量时,Java序列化机制会根据编译的class自动生成一个serialVersionUID作序列化版本比较用,这种情况下,只有同一次编译生成的class才会生成相同的serialVersionUID 。如果我们不希望通过编译来强制划分软件版本,即实现序列化接口的实体能够兼容先前版本,未作更改的类,就需要显式地定义一个名为serialVersionUID,类型为long的变量,不修改这个变量值的序列化实体都可以相互进行串行化和反串行化。
    2. 子类中实现了serializable接口,父类中没有实现,父类中的变量是不能被序列化,序列化后父类中的变量会得到null。
      一个子类实现了 Serializable 接口,它的父类都没有实现 Serializable 接口,序列化该子类对象,然后反序列化后输出父类定义的某变量的数值,该变量数值与序列化时的数值不同。(需要在父类中是实现默认的构造方法,否则会报异常:no validconstructor)在父类没有实现 Serializable 接口时,虚拟机是不会序列化父对象的,而一个 Java 对象的构造必须先有父对象,才有子对象,反序列化也不例外。所以反序列化时,为了构造父对象,只能调用父类的无参构造函数作为默认的父对象。因此当我们取父对象的变量值时,它的值是调用父类无参构造函数后的值。如果你考虑到这种序列化的情况,在父类无参构造函数中对变量进行初始化,否则的话,父类变量值都是默认声明的值,如 int 型的默认是 0,string 型的默认是 null。
      • 注意:父类实现serializable接口,子类没有实现serializable接口时,子类可以正常序列化(应用:将一些不需要序列化的属性值抽取出来放到父类(未实现序列化接口),子类实现序列化接口)
    3. 被关键字transient修饰的变量不能被序列化。
    4. 静态变量不能被序列化,属于类,不属于方法和对象,所以不能被序列化。
      • 静态变量的值是在jvm中,能获取到不是因为反序列化。

    四、自定义函数 UDF 和 UDAF

    1.自定义函数 UDF

    package com.shsxt.scala_Test.sql.udf_udaf
    
    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.types.{StringType, StructField, StructType}
    import org.apache.spark.sql.{DataFrame, Row, SparkSession}
    
    object UDF {
      def main(args: Array[String]): Unit = {
        val spark: SparkSession = SparkSession
          .builder()
          .master("local")
          .appName("udaf")
          .getOrCreate()
        val sc: SparkContext = spark.sparkContext
    
    
        // 构造模拟数据
        val names: Array[String] = Array("Leo", "Marry", "Jack", "Tom")
        val namesRDD: RDD[String] = sc.parallelize(names, 5)
        val namesRowRDD: RDD[Row] = namesRDD.map { name => Row(name) }
        val structType: StructType = StructType(Array(StructField("name", StringType, true)))
        val namesDF: DataFrame = spark.createDataFrame(namesRowRDD, structType)
    
        // 注册一张names表
        namesDF.createOrReplaceTempView("names")
    
        // 定义和注册自定义函数
        // 定义函数:自己写匿名函数
        // 注册函数:SQLContext.udf.register()
        spark.udf.register("strLen", (str: String) => str.length())
    
        // 使用自定义函数
        val result: DataFrame = spark.sql("select name,strLen(name) from names")
        result.show()
    
      }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    +-----+----------------+
    | name|UDF:strLen(name)|
    +-----+----------------+
    |  Leo|               3|
    |Marry|               5|
    | Jack|               4|
    |  Tom|               3|
    +-----+----------------+
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    2.自定义函数 UDAF

      实现 UDAF 函数,如果要自定义类,要实现UserDefinedAggregateFunction 类。

    package com.shsxt.scala_Test.sql.udf_udaf
    
    import org.apache.spark.sql.{DataFrame, Row, SparkSession}
    import org.apache.spark.sql.expressions.MutableAggregationBuffer
    import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
    import org.apache.spark.sql.types._
    
    
    class MyAverage extends UserDefinedAggregateFunction {
      //指定输入数据的字段名称以及类型
      def inputSchema: StructType = StructType(StructField("inputColumn", LongType) :: Nil)
    
      // 指定buffer字段的字段名称以及类型
      def bufferSchema: StructType = {
        StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil)
      }
    
      // UDAF函数返回值的类型
      def dataType: DataType = DoubleType
    
      // 相同的输入是否输出相同的结果
      def deterministic: Boolean = true
    
      // 初始化buffer中的元素
      def initialize(buffer: MutableAggregationBuffer): Unit = {
        buffer(0) = 0L
        buffer(1) = 0L
      }
    
      // 每个组,有新的值进来的时候,进行分组对应的聚合值的计算(局部更新)
      def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
        if (!input.isNullAt(0)) {
          buffer(0) = buffer.getLong(0) + input.getLong(0)
          buffer(1) = buffer.getLong(1) + 1
        }
      }
    
      // 最后merger的时候,在各个节点上的聚合值,要进行merge,也就是合并(全局更新)
      def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
        buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
        buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
      }
    
      // 最后返回一个最终的聚合值     
      // 要和dataType的类型一一对应
      def evaluate(buffer: Row): Double = {
        buffer.getLong(0).toDouble / buffer.getLong(1)
      }
    }
    
    object UDAF {
      def main(args: Array[String]): Unit = {
        val spark: SparkSession = SparkSession
          .builder()
          .master("local")
          .appName("udaf")
          .getOrCreate()
    
        //注册聚合函数
        spark.udf.register("myAverage", new MyAverage())
    
        val df: DataFrame = spark.read.json("data/json")
        df.show()
        df.createOrReplaceTempView("person")
    
    
        val result: DataFrame = spark.sql("SELECT myAverage(age) as average_age FROM person")
        result.show()
    
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    +---+--------+
    |age|    name|
    +---+--------+
    | 18|zhangsan|
    | 18|    lisi|
    | 18|  wangwu|
    | 28|  laoliu|
    | 20|zhangsan|
    | 18|    lisi|
    | 18|  wangwu|
    | 28|  laoliu|
    | 28|zhangsan|
    | 18|    lisi|
    | 18|  wangwu|
    +---+--------+
    
    +-----------------+
    |      average_age|
    +-----------------+
    |20.90909090909091|
    +-----------------+
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    5.开窗函数

      SQL函数

    • row_number(): 开窗函数是按照某个字段分组,然后取另一字段的前几个的值,相当于分组取 topN

    • 开窗函数格式:相当于增加一列唯一的递增的从一开始的序列。

      row_number() over (partitin by XXX order by XXX)
      
      • 1
    package com.shsxt.scala_Test.sql.windowFun
    
    import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
    
    //定义Sales样例类
    case class Sales(riqi: String, leibie: String, jine: String)
    
    object RowNumberWindowFun {
      def main(args: Array[String]): Unit = {
        val spark: SparkSession = SparkSession
          .builder()
          .master("local")
          .appName("window")
          .enableHiveSupport()
          .getOrCreate()
    
        import spark.implicits._
    
        val ds1: Dataset[String] = spark.read.textFile("./data/sales.txt")
    
        val ds2: Dataset[Sales] = ds1.map(x => {
          Sales(x.split("\t")(0), x.split("\t")(1), x.split("\t")(2))
        })
    
        ds2.createTempView("sales")
    
        /**
         * 开窗函数格式:
         * 【 rou_number() over (partitin by XXX order by XXX) 】
         */
        val result: DataFrame = spark.sql(
          """
            |select riqi,leibie,jine
            |from (select riqi,leibie,jine,
            |      row_number() over (partition by leibie order by jine desc) as rank
            |      from sales) as t
            |where t.rank<=3
            |""".stripMargin);
        result.show();
    
        spark.stop()
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    +----+------+----+
    |riqi|leibie|jine|
    +----+------+----+
    |   6|     F|  96|
    |   9|     F|  87|
    |   9|     F|  84|
    |   7|     E|  97|
    |   4|     E|  94|
    |   9|     E|  90|
    |   8|     B|  98|
    |   9|     B|  82|
    |   7|     B|  67|
    |   3|     D|  93|
    |   1|     D|   9|
    |   8|     D|  79|
    |   5|     C|  95|
    |   9|     C|  86|
    |   9|     C|  81|
    |   9|     A|  99|
    |   2|     A|  92|
    |   9|     A|  88|
    |   1|     G|  91|
    |   9|     G|  89|
    |   8|     G|  75|
    +----+------+----+
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
  • 相关阅读:
    Zookeeper如何实现Leader选举
    (持续更新中!)详解【计算机类&面试真题】军队文职考试 ——第一期(真题+解析)| TCP、UDP的区别;程序局部性;ICMP协议及主要功能;重定位所需的计算机硬件
    Redis-持久化讲解
    【Vue】Vue3 Swiper 插件 loop 无限滚动、并且暂停的问题
    【zookeeper】报错整理 zookeeper Packet len* is out of range
    再拯救一下,语言搞多了总是忘,曲不离口,敲敲基础的fortran77小程序
    引导过程与服务控制
    利用CompletableFuture提高接口的响应速度
    弄清数据库索引的来龙去脉
    1204. 最后一个能进入电梯的人
  • 原文地址:https://blog.csdn.net/liujiesxs/article/details/125969498