• Spark_SQL函数定义(定义UDF函数、使用窗口函数)


                        一、UDF函数定义

            (1)函数定义

            (2)Spark支持定义函数

            (3)定义UDF函数

                    (4)定义返回Array类型的UDF

            (5)定义返回字典类型的UDF

    二、窗口函数

            (1)开窗函数简述

            (2)窗口函数的语法


    一、UDF函数定义

            (1)函数定义

            无论Hive还是SparkSQL分析处理数据时,往往需要使用函数,SparkSQL模块本身自带很多实现公共功能的函数,在pyspark.sql.functions中。SparkSQL与Hive一样支持定义函数:UDF和UDAF,尤其是UDF函数在实际项目中使用最为广泛。
            Hive中自定义函数有三种类型:

            第一种:UDF(User-Defined_-function)函数

                    · 一对一的关系,输入一个值经过函数以后输出一个值;

                    · 在Hive中继承UDF类,方法名称为evaluate,返回值不能为void,其实就是实现一个方法;

            第二种:UDAF(User-Defined Aggregation Function)聚合函数

                    · 多对一的关系,输入多个值输出一个值,通常于groupBy联合使用;

            第三种:UDTF(User-Defined Table-Generating Functions)函数

                    · 一对多的关系,输入一个值输出多个值(一行变多为行);

                    · 用户自定义生成函数,有点像flatMap;

            (2)Spark支持定义函数

            目前来说Spark框架各个版本及各种语言对自定义函数的支持:在SparkSQL中,目前仅仅支持UDF函数和UDAF函数,目前Python仅支持UDF。

    Spark版本及支持函数定义
    Apache Spark VersionSpark SQL UDF(Python,Java,Scala)Spark SQL UDAF(Java,Scala)Spark SQL UDF(R)Hive UDF,UDAF,UDTF
    1.1-1.4
    1.5experimental
    1.6
    2.0
            (3)定义UDF函数

            ①sparksession.udf.register()

            注册的UDF可以用于DSL和SQL,返回值用于DSL风格,传参内给的名字用于SQL风格。

            ②pyspark.sql.functions.udf

            仅能用于DSL风格

            其中F是:from pyspark.sql import functions as F。其中,被注册为UDF的方法名是指具体的计算方法,如:def add(x, y): x + y  。 add就是将要被注册成UDF的方法名

    1. # cording:utf8
    2. from pyspark.sql import SparkSession
    3. import pyspark.sql.functions as F
    4. from pyspark.sql.types import IntegerType, StringType, StructType
    5. if __name__ == '__main__':
    6. spark = SparkSession.builder.appName('udf_define').master('local[*]').getOrCreate()
    7. sc = spark.sparkContext
    8. # 构建一个RDD
    9. rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7]).map(lambda x:[x])
    10. df = rdd.toDF(['num'])
    11. # TODO 1:方式1 sparksession.udf.register(),DSL和SQL风格均可使用
    12. # UDF的处理函数
    13. def num_ride_10(num):
    14. return num * 10
    15. # 参数1:注册的UDF的名称,这个UDF名称,仅可以用于SQL风格
    16. # 参数2:UDF的处理逻辑,是一个单独定义的方法
    17. # 参数3:声明UDF的返回值类型,注意:UDF注册时候,必要声明返回值类型,并且UDF的真实返回值一定要和声明的返回值一致
    18. # 当前这种方式定义的UDF,可以通过参数1的名称用于SQL风格,通过返回值对象用户的DSL风格
    19. udf2 = spark.udf.register('udf1', num_ride_10, IntegerType())
    20. # SQL风格中使用
    21. # selectExpr 以SELECT的表达式执行,表达式SQL风格的表达式(字符串)
    22. # select方法,接受普通的字符串字段名,或者返回值时Column对象的计算
    23. df.selectExpr('udf1(num)').show()
    24. # DSL 风格使用
    25. # 返回值UDF对象,如果作为方法使用,传入的参数一定是Column对象
    26. df.select(udf2(df['num'])).show()
    27. # TODO 2:方式2注册,仅能用于DSL风格
    28. udf3 = F.udf(num_ride_10, IntegerType())
    29. df.select(udf3(df['num'])).show()

            方式1结果:

            方式2结果:

                    (4)定义返回Array类型的UDF

            注意:数组或者list类型,可以使用spark的ArrayType来描述即可。

            注意:声明ArrayType要类似这样::ArrayType(StringType()),在ArrayType中传入数组内的数据类型。

    1. # cording:utf8
    2. from pyspark.sql import SparkSession
    3. import pyspark.sql.functions as F
    4. from pyspark.sql.types import IntegerType, StringType, StructType, ArrayType
    5. if __name__ == '__main__':
    6. spark = SparkSession.builder.appName('udf_define').master('local[*]').getOrCreate()
    7. sc = spark.sparkContext
    8. # 构建一个RDD
    9. rdd = sc.parallelize([['hadoop spark flink'], ['hadoop flink java']])
    10. df = rdd.toDF(['line'])
    11. # 注册UDF,UDF的执行函数定义
    12. def split_line(data):
    13. return data.split(' ')
    14. # TODO 1:方式1 后见UDF
    15. udf2 = spark.udf.register('udf1', split_line, ArrayType(StringType()))
    16. # DLS 风格
    17. df.select(udf2(df['line'])).show()
    18. # SQL风格
    19. df.createTempView('lines')
    20. spark.sql('SELECT udf1(line) FROM lines').show(truncate=False)
    21. # TODO 2:方式的形式构建UDF
    22. udf3 = F.udf(split_line, ArrayType(StringType()))
    23. df.select(udf3(df['line'])).show(truncate=False)

            

            (5)定义返回字典类型的UDF

            注意:字典类型返回值,可以用StructType来进行描述,StructType是—个普通的Spark支持的结构化类型.
            只是可以用在:
                    · DF中用于描述Schema
                    · UDF中用于描述返回值是字典的数据

    1. # cording:utf8
    2. import string
    3. from pyspark.sql import SparkSession
    4. import pyspark.sql.functions as F
    5. from pyspark.sql.types import IntegerType, StringType, StructType, ArrayType
    6. if __name__ == '__main__':
    7. spark = SparkSession.builder.appName('udf_define').master('local[*]').getOrCreate()
    8. sc = spark.sparkContext
    9. # 假设 有三个数字: 1 2 3 在传入数字,返回数字所在序号对应的 字母 然后和数字结合组成dict返回
    10. # 例:传入1 返回{'num':1, 'letters': 'a'}
    11. rdd = sc.parallelize([[1], [2], [3]])
    12. df = rdd.toDF(['num'])
    13. # 注册UDF
    14. def process(data):
    15. return {'num': data, 'letters': string.ascii_letters[data]}
    16. '''
    17. UDF返回值是字典的话,需要用StructType来接收
    18. '''
    19. udf1 = spark.udf.register('udf1', process, StructType().add('num', IntegerType(), nullable=True).\
    20. add('letters', StringType(), nullable=True))
    21. # SQL风格
    22. df.selectExpr('udf1(num)').show(truncate=False)
    23. # DSL风格
    24. df.select(udf1(df['num'])).show(truncate=False)

            (6)通过RDD构建UDAF函数

    1. # cording:utf8
    2. import string
    3. from pyspark.sql import SparkSession
    4. import pyspark.sql.functions as F
    5. from pyspark.sql.types import IntegerType, StringType, StructType, ArrayType
    6. if __name__ == '__main__':
    7. spark = SparkSession.builder.appName('udf_define').master('local[*]').getOrCreate()
    8. sc = spark.sparkContext
    9. rdd = sc.parallelize([1, 2, 3, 4, 5], 3)
    10. df = rdd.map(lambda x: [x]).toDF(['num'])
    11. # 方法:使用RDD的mapPartitions 算子来完成聚合操作
    12. # 如果用mapPartitions API 完成UDAF聚合,一定要单分区
    13. single_partition_rdd = df.rdd.repartition(1)
    14. def process(iter):
    15. sum = 0
    16. for row in iter:
    17. sum += row['num']
    18. return [sum] # 一定要嵌套list,因为mapPartitions方法要求返回值是list对象
    19. print(single_partition_rdd.mapPartitions(process).collect())

    二、窗口函数

            (1)开窗函数简述

            ●介绍

            开窗函数的引入是为了既显示聚集前的数据又显示聚集后的数据。即在每一行的最后一列添加聚合函数的结果。 开窗用于为行定义一个窗口(这里的窗口是指运算将要操作的行的集合),它对一组值进行操作,不需要使用GROUP BY子句对数据进行分组,能够在同一行中同时返回基础行的列和聚合列。

            ●聚合函数和开窗函数

            聚合函数是将多行变成一行,count,avg...

            开窗函数是将一行变成多行;

            聚合函数如果要显示其他的列必须将列加入到group by中,开窗函数可以不使用group by,直接将所有信息显示出来。

            ●开窗函数分类

            1.聚合开窗函数 聚合函数(列)OVER(选项),这里的选项可以是PARTITION BY子句,但不可以是ORDER BY子句

            2.排序开窗函数 排序函数(列)OVER(选项),这里的选项可以是ORDER BY子句,也可以是OVER(PARTITION BY子句ORDER BY子句),但不可以是PARTITION BY子句。

            3.分区类型NTILE的窗口函数

            (2)窗口函数的语法

            窗口函数的语法:

    1. # cording:utf8
    2. import string
    3. from pyspark.sql import SparkSession
    4. import pyspark.sql.functions as F
    5. from pyspark.sql.types import IntegerType, StringType, StructType, ArrayType
    6. if __name__ == '__main__':
    7. spark = SparkSession.builder.appName('udf_define').master('local[*]').getOrCreate()
    8. sc = spark.sparkContext
    9. rdd = sc.parallelize([
    10. ('张三', 'class_1', 99),
    11. ('王五', 'class_2', 35),
    12. ('王三', 'class_3', 57),
    13. ('王久', 'class_4', 12),
    14. ('王丽', 'class_5', 99),
    15. ('王娟', 'class_1', 90),
    16. ('王军', 'class_2', 91),
    17. ('王俊', 'class_3', 33),
    18. ('王君', 'class_4', 55),
    19. ('王珺', 'class_5', 66),
    20. ('郑颖', 'class_1', 11),
    21. ('郑辉', 'class_2', 33),
    22. ('张丽', 'class_3', 36),
    23. ('张张', 'class_4', 79),
    24. ('黄凯', 'class_5', 90),
    25. ('黄开', 'class_1', 90),
    26. ('黄恺', 'class_2', 90),
    27. ('王凯', 'class_3', 11),
    28. ('王凯杰', 'class_1', 11),
    29. ('王开杰', 'class_2', 3),
    30. ('王景亮', 'class_3', 99)])
    31. schema = StructType().add('name', StringType()).\
    32. add('class', StringType()).\
    33. add('score', IntegerType())
    34. df = rdd.toDF(schema)
    35. # 创建表
    36. df.createTempView('stu')
    37. # TODO 1:聚合窗口函数的演示
    38. spark.sql('''
    39. SELECT *, AVG(score) over() AS avg_socre FROM stu
    40. ''').show()
    41. # TODO 2: 排序相关的窗口函数计算
    42. # RANK over, DENSE_RANK over, ROW_NUMBER over
    43. spark.sql('''
    44. SELECT *, ROW_NUMBER() OVER(ORDER BY score DESC) AS row_number_rank,
    45. DENSE_RANK() OVER(PARTITION BY class ORDER BY score DESC) AS dense_rank,
    46. RANK() OVER(ORDER BY score) AS RANK
    47. FROM stu
    48. ''').show()
    49. # TODO NTILE
    50. spark.sql('''
    51. SELECT *, NTILE(6) OVER(ORDER BY score DESC) FROM stu
    52. ''').show()

            TODO1结果:

            TODO2结果展示:

            TODO3结果展示:

  • 相关阅读:
    Linux-命令大全
    redux、mobx
    jadx 反编译apk
    数学建模插值分析法(附完整代码)python实现(插值&拟合)
    Error inflating class com.baidu.mapapi.map.MapView
    详解module.exports与exports,export与export default,import 与require
    点云从入门到精通技术详解100篇-基于 Kinect v2 相机的柑橘点云分割与配准(续)
    03UEc++【打飞艇:导弹与飞艇的碰撞事件】
    蓝桥杯动态规划集齐图案
    pg 模拟主库down机之pg_kaboom
  • 原文地址:https://blog.csdn.net/2202_75347029/article/details/134019344