• Spark SQL函数


    内置函数

    ​     Spark SQL内置了大量的函数,位于API org.apache.spark.sql.functions

    中。其中大部分函数与Hive中的相同。

    ​     使用内置函数有两种方式:一种是通过编程的方式使用;另一种是在SQL

    语句中使用。

    - 以编程的方式使用lower()函数将用户姓名转为小写/大写,代码如下:

    ```

    df.select(lower(col("name")).as("greet")).show()

    df.select(upper(col("name")).as("greet")).show()

    ```

    ​     上述代码中,df指的是DataFrame对象,使用select()方法传入需要查询的列,使用as()方法指定列的别名。代码col("name")指定要查询的列,也可以使用$"name"代替,代码如下:

    ```

    df.select(lower($"name").as("greet")).show()

    ```

    - 以SQL语句的方式使用lower()函数,代码如下:

    ```

    df.createTempView("temp")

    spark.sql("select upper(name) as greet from temp").show()

    ```

    ​     除了可以使用select()方法查询指定的列外,还可以直接使用filter()、groupBy()等方法对DataFrame数据进行过滤和分组,例如以下代码:

    ```

    df.printSchema()  # 打印Schema信息

    df.select("name").show()  # 查询name列

    # 查询name列和age列,其中将age列的值增加1

    df.select($"name",$"age"+1).show()

    df.filter($"age">25).show() # 查询age>25的所有数据

    # 根据age进行分组,并求每一组的数量

    df.groupBy("age").count().show()

    ```

    自定义函数

    ​     当Spark SQL提供的内置函数不能满足查询需求时,用户可以根据需求编写自定义函数(User Defined Functions, UDF),然后在Spark SQL中调用。

    ​    例如有这样一个需求:为了保护用户的隐私,当查询数据的时候,需要将用户手机号的中间4位数字用星号(*)代替,比如手机号180****2688。这时就可以编写一个自定义函数来实现这个需求,实现代码如下:

    ​    

    ```

    package spark.demo.sql

    import org.apache.spark.rdd.RDD

    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    import org.apache.spark.sql.{Row, SparkSession}

    /**

     * 用户自定义函数,隐藏手机号中间4位

     */

    object SparkSQLUDF {

      def main(args: Array[String]): Unit = {

        //创建或得到SparkSession

        val spark = SparkSession.builder()

          .appName("SparkSQLUDF")

          .master("local[*]")

          .getOrCreate()

        //第一步:创建测试数据(或直接从文件中读取)

        //模拟数据

        val arr=Array("18001292080","13578698076","13890890876")

        //将数组数据转为RDD

        val rdd: RDD[String] = spark.sparkContext.parallelize(arr)

        //将RDD[String]转为RDD[Row]

        val rowRDD: RDD[Row] = rdd.map(line=>Row(line))

        //定义数据的schema

        val schema=StructType(

          List{

            StructField("phone",StringType,true)

          }

        )

        //将RDD[Row]转为DataFrame

        val df = spark.createDataFrame(rowRDD, schema)

        //第二步:创建自定义函数(phoneHide)

        val phoneUDF=(phone:String)=>{

          var result = "手机号码错误!"

          if (phone != null && (phone.length==11)) {

            val sb = new StringBuffer

            sb.append(phone.substring(0, 3))

            sb.append("****")

            sb.append(phone.substring(7))

            result = sb.toString

          }

          result

        }

        //注册函数(第一个参数为函数名称,第二个参数为自定义的函数)

        spark.udf.register("phoneHide",phoneUDF)

        //第三步:调用自定义函数

        df.createTempView("t_phone")    //创建临时视图

        spark.sql("select phoneHide(phone) as phone from t_phone").show()

        // +-----------+

        // |      phone|

        // +-----------+

        // |180****2080|

        // |135****8076|

        // |138****0876|

        // +-----------+

      }

    }

    ```

    窗口(开窗)函数

    ​    开窗函数是为了既显示聚合前的数据,又显示聚合后的数据,即在每一行的最后一列添加聚合函数的结果。开窗口函数有以下功能:

    - 同时具有分组和排序的功能

    - 不减少原表的行数

    - 开窗函数语法:

    聚合类型开窗函数

    ```

    sum()/count()/avg()/max()/min() OVER([PARTITION BY XXX] [ORDER BY XXX [DESC]])

    ```

    排序类型开窗函数

    ```

    ROW_NUMBER() OVER([PARTITION BY XXX] [ORDER BY XXX [DESC]])

    ```

    - 以row_number()开窗函数为例:

    ​        开窗函数row_number()是Spark SQL中常用的一个窗口函数,使用该函数可以在查询结果中对每个分组的数据,按照其排列的顺序添加一列行号(从1开始),根据行号可以方便地对每一组数据取前N行(分组取TopN)。row_number()函数的使用格式如下:

    ```

    row_number() over (partition by 列名 order by 列名 desc) 行号列别名

    ```

    上述格式说明如下:

    partition by:按照某一列进行分组;

    order by:分组后按照某一列进行组内排序;

    desc:降序,默认升序。

    例如,统计每一个产品类别的销售额前3名,代码如下:

    ```

    package spark.demo.sql

    import org.apache.spark.sql.types._

    import org.apache.spark.sql.{Row, SparkSession}

    /**

     * 统计每一个产品类别的销售额前3名(相当于分组求TOPN)

     */

    object SparkSQLWindowFunctionDemo {

      def main(args: Array[String]): Unit = {

        //创建或得到SparkSession

        val spark = SparkSession.builder()

          .appName("SparkSQLWindowFunctionDemo")

          .master("local[*]")

          .getOrCreate()

        //第一步:创建测试数据(字段:日期、产品类别、销售额)

        val arr=Array(

          "2019-06-01,A,500",

          "2019-06-01,B,600",

          "2019-06-01,C,550",

          "2019-06-02,A,700",

          "2019-06-02,B,800",

          "2019-06-02,C,880",

          "2019-06-03,A,790",

          "2019-06-03,B,700",

          "2019-06-03,C,980",

          "2019-06-04,A,920",

          "2019-06-04,B,990",

          "2019-06-04,C,680"

        )

        //转为RDD[Row]

        val rowRDD=spark.sparkContext

          .makeRDD(arr)

          .map(line=>Row(

            line.split(",")(0),

            line.split(",")(1),

            line.split(",")(2).toInt

          ))

        //构建DataFrame元数据

        val structType=StructType(Array(

          StructField("date",StringType,true),

          StructField("type",StringType,true),

          StructField("money",IntegerType,true)

        ))

        //将RDD[Row]转为DataFrame

        val df=spark.createDataFrame(rowRDD,structType)

        //第二步:使用开窗函数取每一个类别的金额前3名

        df.createTempView("t_sales")    //创建临时视图

        //执行SQL查询

        spark.sql(

          "select date,type,money,rank from " +

            "(select date,type,money," +

            "row_number() over (partition by type order by money desc) rank "+

            "from t_sales) t " +

            "where t.rank<=3"

        ).show()

      }

    }

    ```


     

  • 相关阅读:
    QT多项目工程管理及编译输出设置
    力扣每日一题56:合并区间
    【大型电商项目开发】性能压测-优化-中间件对性能的影响-40
    位深度/像素/分辨率/图像大小的计算/帧率/刷新率
    三个pdf工具和浏览软件(pdftk,muppdf,epdfview)
    [论文笔记]Root Mean Square Layer Normalization
    C++——STL容器【map和set】
    IPSG应用在网络中的位置
    十三水中各种牌型判断LUA版
    影视广告创意与制作(一)
  • 原文地址:https://blog.csdn.net/shijiuhuaR/article/details/139765013