• SparkSQL系列-7、自定义UDF函数?


    版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。

    传送门:大数据系列文章目录

    官方网址http://spark.apache.org/http://spark.apache.org/sql/
    在这里插入图片描述

    Hive的UDF函数和Spark的UDF函数?

    无论Hive还是SparkSQL分析处理数据时,往往需要使用函数, SparkSQL模块本身自带很多实现公共功能的函数,在org.apache.spark.sql.functions中。 SparkSQL与Hive一样支持定义函数: UDF和UDAF,尤其是UDF函数在实际项目中使用最为广泛。

    回顾Hive中自定义函数有三种类型:

    第一种: UDF(User-Defined-Function) 函数

    • 一对一的关系,输入一个值经过函数以后输出一个值;
    • 在Hive中继承UDF类,方法名称为evaluate,返回值不能为void,其实就是实现一个方法;

    第二种: UDAF(User-Defined Aggregation Function) 聚合函数

    • 多对一的关系,输入多个值输出一个值,通常与groupBy联合使用;

    第三种: UDTF(User-Defined Table-Generating Functions) 函数

    • 一对多的关系,输入一个值输出多个值(一行变为多行) ;
    • 用户自定义生成函数,有点像flatMap;

    目前来说Spark 框架各个版本及各种语言对自定义函数的支持:
    在这里插入图片描述

    在SparkSQL中,目前仅仅支持UDF函数和UDAF函数:

    • UDF函数:一对一关系;
    • UDAF函数:聚合函数,通常与group by 分组函数连用,多对一关系;

    由于SparkSQL数据分析有两种方式: DSL编程和SQL编程,所以定义UDF函数也有两种方式,不同方式可以在不同分析中使用。

    SQL 中使用UDF

    使用SparkSession中udf方法定义和注册函数,在SQL中使用,使用如下方式定义:
    在这里插入图片描述

    DSL 中使用UDF

    使用org.apache.sql.functions.udf函数定义和注册函数,在DSL中使用,如下方式:
    在这里插入图片描述

    自定义UDF函数及使用完整范例演示代码

    测试数据

    {"name":"Michael", "salary":3000}
    {"name":"Andy", "salary":4500}
    {"name":"Justin", "salary":3500}
    {"name":"Berta", "salary":4000}
    
    • 1
    • 2
    • 3
    • 4

    范例演示: 将姓名转换为小写,调用String中toLowerCase方法。

    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.sql.expressions.UserDefinedFunction
    import org.apache.spark.sql.{DataFrame, SparkSession}
    
    /**
     * SparkSQL中定义UDF函数,两种方式,需求:将字符串转换为小写字母
     */
    object SparkSQLUdf {
      def main(args: Array[String]): Unit = {
        // 构建SparkSession实例对象
        val spark: SparkSession = SparkSession.builder()
          .master("local[4]")
          .appName(this.getClass.getSimpleName.stripSuffix("$"))
          // TODO: 设置shuffle时分区数目
          .config("spark.sql.shuffle.partitions", "4")
          .getOrCreate()
        Logger.getRootLogger.setLevel(Level.WARN)
        // 导入隐式转换
        import spark.implicits._
        // 导入函数库
        import org.apache.spark.sql.functions._
        // 读取JSON格式数据
        val empDF: DataFrame = spark.read.json("datas/resources/employees.json")
        /*
        root
        |-- name: string (nullable = true)
        |-- salary: long (nullable = true)
        */
        empDF.printSchema()
        /*
        +-------+------+
        |name |salary|
        +-------+------+
        |Michael|3000 |
        |Andy |4500 |
        |Justin |3500 |
        |Berta |4000 |
        +-------+------+
        */
        empDF.show(10, truncate = false)
        // TODO: 定义UDF函数,在SQL中使用
        spark.udf.register(
          "lower_name", // 函数名称
          (name: String) => name.toLowerCase
        )
        // 注册DataFrame为临时视图
        empDF.createOrReplaceTempView("view_tmp_emp")
        spark.sql(
          """
            |SELECT name, lower_name(name) AS new_name FROM view_tmp_emp
    """.stripMargin)
          .show(10, truncate = false)
        println("==================================================")
        // TODO: 定义UDF函数,在DSL中使用
        val lower_udf: UserDefinedFunction = udf(
          (name: String) => name.toLowerCase
        )
        empDF
          .select(
            $"name", //
            lower_udf($"name").as("new_name") //
          )
          .show(10, truncate = false)
        // 应用结束,关闭资源
        spark.stop()
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67

    执行结果:

    root
     |-- name: string (nullable = true)
     |-- salary: long (nullable = true)
    
    +-------+------+
    |name   |salary|
    +-------+------+
    |Michael|3000  |
    |Andy   |4500  |
    |Justin |3500  |
    |Berta  |4000  |
    +-------+------+
    
    +-------+--------+
    |name   |new_name|
    +-------+--------+
    |Michael|michael |
    |Andy   |andy    |
    |Justin |justin  |
    |Berta  |berta   |
    +-------+--------+
    
    ==================================================
    +-------+--------+
    |name   |new_name|
    +-------+--------+
    |Michael|michael |
    |Andy   |andy    |
    |Justin |justin  |
    |Berta  |berta   |
    +-------+--------+
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
  • 相关阅读:
    Linus Torvalds接受来自微软的Linux Hyper-V升级
    实现日期间的运算——C++
    JAVA --- Set
    【竞品分析】竞品分析报告的基本模板
    深度详解 Android R(11.0)Service 启动过程
    西安mPEG-DSPE磷脂聚乙二醇_CAS:178744-28-0供应商价格
    内网配置git代理
    学习ssh配置
    Minifilter过滤驱动与R3程序通讯实现文件保护
    【回眸】牛客网刷刷刷!(八)——中断专题
  • 原文地址:https://blog.csdn.net/l848168/article/details/126464307