• 用户自定义函数_大数据培训


    用户自定义函数

    在Shell窗口中可以通过spark.udf功能用户可以自定义函数。

    1、 UDF

    1)创建DataFrame

    scala> val df = spark.read.json(“examples/src/main/resources/people.json”)

    df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

    2)打印数据

    3)注册UDF,功能为在数据前添加字符串

    scala> spark.udf.register(“addName”, (x:String)=> “Name:”+x)

    res5: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(,StringType,Some(List(StringType)))

    4)创建临时表

    scala> df.createOrReplaceTempView(“people”)

    5)应用UDF

    scala> spark.sql(“Select addName(name), age from people”).show()

    +—————–+—-+

    |UDF:addName(name)| age|

    +—————–+—-+

    |     Name:Michael|null|

    |        Name:Andy|  30|

    |      Name:Justin|  19|

    +—————–+—-+

    2 UDAF

    强类型的Dataset和弱类型的DataFrame都提供了相关的聚合函数, 如 count(),countDistinct(),avg(),max(),min()。除此之外,用户可以设定自己的自定义聚合函数。通过继承UserDefinedAggregateFunction来实现用户自定义聚合函数。

    1)需求:实现求平均工资的自定义聚合函数。

    2)代码实现

    import org.apache.spark.sql.expressions.MutableAggregationBuffer

    import org.apache.spark.sql.expressions.UserDefinedAggregateFunction

    import org.apache.spark.sql.types._

    import org.apache.spark.sql.Row

    import org.apache.spark.sql.SparkSession

    object MyAverage extends UserDefinedAggregateFunction {

    // 聚合函数输入参数的数据类型

    def inputSchema: StructType = StructType(StructField(“inputColumn”, LongType) :: Nil)

    // 聚合缓冲区中值得数据类型

    def bufferSchema: StructType = {

    StructType(StructField(“sum”, LongType) :: StructField(“count”, LongType) :: Nil)

    }

    // 返回值的数据类型

    def dataType: DataType = DoubleType

    // 对于相同的输入是否一直返回相同的输出。

    def deterministic: Boolean = true

    // 初始化

    def initialize(buffer: MutableAggregationBuffer): Unit = {

    // 存工资的总额

    buffer(0) = 0L

    // 存工资的个数

    buffer(1) = 0L

    }

    // 同一个分区数据合并。

    def update(buffer: MutableAggregationBuffer, input: Row): Unit = {

    if (!input.isNullAt(0)) {

    buffer(0) = buffer.getLong(0) + input.getLong(0)

    buffer(1) = buffer.getLong(1) + 1

    }

    }

    // 不同分区间数据合并

    def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {

    buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)

    buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)

    }

    // 计算最终结果

    def evaluate(buffer: Row): Double = buffer.getLong(0).toDouble / buffer.getLong(1)

    }

    3)函数使用

    想要了解跟多关于大数据培训课程内容欢迎关注尚硅谷大数据培训,尚硅谷除了这些技术文章外还有免费的高质量大数据培训课程视频供广大学员下载学习。

  • 相关阅读:
    2024年【N1叉车司机】免费试题及N1叉车司机模拟试题
    安信可 RG-02 LoRaWAN 网关集成网络配置,实现MQTT配置链接至网络服务器,实现数据打通。
    详谈判断点在多边形内的七种方法
    golang for循环append的数据重复
    基础架构之持续发布
    C#进阶06——多线程,预处理器指令
    Android Studio: unrecognized Attribute name MODULE
    1.5.4 HDFS 客户端操作-hadoop-最全最完整的保姆级的java大数据学习资料
    工序解释执行程序--工程师的成长
    适合弱电行业用的项目管理系统,找企智汇项目管理系统!
  • 原文地址:https://blog.csdn.net/zjjcchina/article/details/126401911