• SparkSQL系列-4、数据处理分析


    版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。

    传送门:大数据系列文章目录

    官方网址http://spark.apache.org/http://spark.apache.org/sql/
    在这里插入图片描述

    有几种方式处理?

    在SparkSQL模块中,将结构化数据封装到DataFrame或Dataset集合中后,提供两种方式分析处理数据,正如前面案例【词频统计WordCount】两种方式:

    DSL

    DSL(domain-specific language)编程,调DataFrame/Dataset API(函数),类似RDD中函数;

    SQL

    SQL 编程,将DataFrame/Dataset注册为临时视图或表,编写SQL语句,类似HiveQL;

    两种方式底层转换为RDD操作,包括性能优化完全一致,在实际项目中语句不通的习惯及业务灵活选择。比如机器学习相关特征数据处理,习惯使用DSL编程;比如数据仓库中数据ETL和报表分析,习惯使用SQL编程。无论哪种方式,都是相通的,必须灵活使用掌握。

    基于DSL分析

    调用DataFrame/Dataset中API(函数)分析数据,其中函数包含RDD中转换函数和类似SQL语句函数,部分截图如下:
    在这里插入图片描述
    类似SQL语法函数:调用Dataset中API进行数据分析, Dataset中涵盖很多函数,大致分类如下:

    1、选择函数select:选取某些列的值
    在这里插入图片描述
    2、过滤函数filter/where:设置过滤条件,类似SQL中WHERE语句
    在这里插入图片描述
    3、分组函数groupBy/rollup/cube:对某些字段分组,在进行聚合统计
    在这里插入图片描述
    4、聚合函数agg:通常与分组函数连用,使用一些count、 max、 sum等聚合函数操作
    在这里插入图片描述
    5、排序函数sort/orderBy:按照某列的值进行排序(升序ASC或者降序DESC)
    在这里插入图片描述
    6、限制函数limit:获取前几条数据,类似RDD中take函数
    在这里插入图片描述
    7、重命名函数withColumnRenamed:将某列的名称重新命名
    在这里插入图片描述

    8、删除函数drop:删除某些列
    在这里插入图片描述
    9、增加列函数withColumn:当某列存在时替换值,不存在时添加此列
    在这里插入图片描述

    上述函数在实际项目中经常使用,尤其数据分析处理的时候,其中要注意, 调用函数时,通常指定某个列名称,传递Column对象,通过隐式转换转换字符串String类型为Column对象。
    在这里插入图片描述
    Dataset/DataFrame中转换函数,类似RDD中Transformation函数,使用差不多:
    在这里插入图片描述

    基于SQL分析

    将Dataset/DataFrame注册为临时视图,编写SQL执行分析,分为两个步骤:

    第一步、注册为临时视图
    在这里插入图片描述
    第二步、编写SQL,执行分析
    在这里插入图片描述
    其中SQL语句类似Hive中SQL语句,查看Hive官方文档, SQL查询分析语句语法,官方文档文档:https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Select
    在这里插入图片描述

    案例:电影评分数据分析

    使用电影评分数据进行数据分析,分别使用DSL编程和SQL编程,熟悉数据处理函数及SQL使用,业务需求说明:

    对电影评分数据进行统计分析, 获取Top10电影(电影评分平均值最高,并且每个电影被评分的次数大于2000)

    数据集ratings.dat总共100万条数据,数据格式如下,每行数据各个字段之间使用双冒号分开:

    1::1193::5::978300760
    1::661::3::978302109
    1::914::3::978301968
    1::3408::4::978300275
    1::2355::5::978824291
    1::1197::3::978302268
    1::1287::5::978302039
    1::2804::5::978300719
    1::594::4::978302268
    1::919::4::978301368
    1::595::5::978824268
    1::938::4::978301752
    1::2398::4::978302281
    1::2918::4::978302124
    1::1035::5::978301753
    1::2791::4::978302188
    1::2687::3::978824268
    1::2018::4::978301777
    1::3105::5::978301713
    1::2797::4::978302039
    1::2321::3::978302205
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    数据处理分析步骤如下:
    ⚫ 第一步、读取电影评分数据,从本地文件系统读取
    ⚫ 第二步、转换数据,指定Schema信息,封装到DataFrame
    ⚫ 第三步、基于SQL方式分析
    ⚫ 第四步、基于DSL方式分析

    数据 ETL

    读取电影评分数据,将其转换为DataFrame,使用指定列名方式定义Schema信息,代码如下:

        val spark: SparkSession = SparkSession.builder()
    //    SparkSQL程序不论数据量的多少,在经过聚合shuffle时,RDD分区数会变为200
          .config("spark.sql.shuffle.partitions",2)
          .appName(this.getClass.getSimpleName.stripSuffix("$")).master("local[2]").getOrCreate()
    
        spark.sparkContext.setLogLevel("WARN")
        //引入隐式转换
        import spark.implicits._
        //引入DSL函数库
        import org.apache.spark.sql.functions._
    
        //todo:2-实现处理的逻辑:统计平均评分最高的前10部电影【每部电影至少被评分2000次】
        val inputRDD: RDD[String] = spark.sparkContext.textFile("datas/ml-1m/ratings.dat")
    
        //step2:转换数据
        val rsData: RDD[(String, String, Double, Long)] = inputRDD.map(line => {
          val Array(userId, itemId, rating, timestamp) = line.split("::")
          (userId, itemId, rating.toDouble, timestamp.toLong)
        })
        //所有字段重命名
        val rsdf: DataFrame = rsData.toDF("userId", "itemId", "rating", "timestamp")
        /*
    	root
    	|-- userId: string (nullable = true)
    	|-- movieId: string (nullable = true)
    	|-- rating: double (nullable = false)
    	|-- timestamp: long (nullable = false)
    	*/
    	//ratingsDF.printSchema()
    	/*
    	+------+-------+------+---------+
    	|userId|movieId|rating|timestamp|
    	+------+-------+------+---------+
    	| 1| 1193| 5.0|978300760|
    	| 1| 661| 3.0|978302109|
    	| 1| 594| 4.0|978302268|
    	| 1| 919| 4.0|978301368|
    	+------+-------+------+---------+
    */
    	//ratingsDF.show(4)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40

    使用SQL分析

    首先将DataFrame注册为临时视图,再编写SQL语句,最后使用SparkSession执行,代码如下:

    import model.MovieRating
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{DataFrame, Dataset, SaveMode, SparkSession}
    
    /**
     * @author: lwh
     * @date: 2022/08/15
     *        使用电影评分数据进行数据分析,分别使用 DSL编程和SQL编程,熟悉数据处理函数及SQL使
     *        用,业务需求说明:
     *        对电影评分数据进行统计分析, 获取Top10电影(电影评分平均值最高,并且每个电影被评分
     *        的次数大于2000)。
     **/
    object RatingsSparkSQL {
      def main(args: Array[String]): Unit = {
        val spark: SparkSession = SparkSession.builder().appName(this.getClass.getSimpleName.stripSuffix("$")).master("local[2]").getOrCreate()
    
        spark.sparkContext.setLogLevel("WARN")
    
        import spark.implicits._
    
        val inputRDD: RDD[String] = spark.sparkContext.textFile("datas/ml-1m/ratings.dat")
    
        //RDD反射构建Schema
        val dataRdd: RDD[MovieRating] = inputRDD.map(line => {
          val Array(userId, itemId, rating, timestamp) = line.split("::")
          MovieRating(userId.toInt, itemId.toInt, rating.toInt, timestamp.toInt)
        })
        val df: DataFrame = dataRdd.toDF()
        df.show()
        println("====================")
        val ds: Dataset[MovieRating] = dataRdd.toDS()
        ds.show()
        println("====================")
    
        df.createOrReplaceTempView("data_df_tmp")
        ds.createOrReplaceTempView("data_ds_tmp")
    
        val rsData: DataFrame = spark.sql(
          """
            |select
            | itemId,
            | round(avg(rating),2) as rat,
            | count(1) as cnt
            |from data_df_tmp
            |group by itemId having cnt > 2000
            |order by rat desc,cnt desc
            |limit 10
            |""".stripMargin)
    
        rsData.printSchema()
        rsData.show()
    
        rsData.write.mode(SaveMode.Overwrite).csv("datas/sparksql/output1")
        spark.stop()
      }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58

    运行程序结果如下:

    +------+--------+----+
    |itemId|avg_rate| cnt|
    +------+--------+----+
    |   318|    4.55|2227|
    |   858|    4.52|2223|
    |   527|    4.51|2304|
    |  1198|    4.48|2514|
    |   260|    4.45|2991|
    |  2762|    4.41|2459|
    |   593|    4.35|2578|
    |  2028|    4.34|2653|
    |  2858|    4.32|3428|
    |  2571|    4.32|2590|
    +------+--------+----+
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    使用DSL分析

    调用Dataset中函数,采用链式编程分析数据,核心代码如下:

    import java.util.Properties
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{DataFrame, Dataset, Row, SaveMode, SparkSession}
    
    /**
     * @author: lwh
     * @date: 2022/08/15
     *        使用电影评分数据进行数据分析,分别使用 DSL编程和SQL编程,熟悉数据处理函数及SQL使
     *        用,业务需求说明:
     *        对电影评分数据进行统计分析, 获取Top10电影(电影评分平均值最高,并且每个电影被评分
     *        的次数大于2000)。
     **/
    object RatingsSparkDSL {
      def main(args: Array[String]): Unit = {
        val spark: SparkSession = SparkSession.builder()
    //    SparkSQL程序不论数据量的多少,在经过聚合shuffle时,RDD分区数会变为200
          .config("spark.sql.shuffle.partitions",2)
          .appName(this.getClass.getSimpleName.stripSuffix("$")).master("local[2]").getOrCreate()
    
        spark.sparkContext.setLogLevel("WARN")
        //引入隐式转换
        import spark.implicits._
        //引入DSL函数库
        import org.apache.spark.sql.functions._
    
        //todo:2-实现处理的逻辑:统计平均评分最高的前10部电影【每部电影至少被评分2000次】
        val inputRDD: RDD[String] = spark.sparkContext.textFile("datas/ml-1m/ratings.dat")
    
        //step2:转换数据
        val rsData: RDD[(String, String, Double, Long)] = inputRDD.map(line => {
          val Array(userId, itemId, rating, timestamp) = line.split("::")
          (userId, itemId, rating.toDouble, timestamp.toLong)
        })
        //所有字段重命名
        val rsdf: DataFrame = rsData.toDF("userId", "itemId", "rating", "timestamp")
        //列的过滤,将用到的列过滤:itemId,rating
        val rsData2: Dataset[Row] = rsdf.select($"itemId",$"rating").groupBy($"itemId")
          .agg(
            //统计平均分
            round(avg($"rating"), 2).as("avg_rate"),
            //统计评分次数
            count($"itemId").as("cnt")
          )
          //过滤:评分次数大于2000
          .where($"cnt".gt(2000))
          //排序
          .orderBy($"avg_rate".desc, $"cnt".desc)
          .limit(10)
        //step3:保存结果
        rsData2.printSchema()
        rsData2.show()
    
        //保存到MySQL
        rsData2
          .write
          .mode(SaveMode.Overwrite)
            .option("user","root")
            .option("password","123456")
            .jdbc("jdbc:mysql://localhost:3306/test?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true","test.tb_top10_movies",new Properties())
    
    //    Thread.sleep(1000000000000000L)
    
    
        spark.stop()
      }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69

    其中使用SparkSQL中自带函数库functions,org.apache.spark.sql.functions中,包含常用函数,有些与Hive中函数库类似,但是名称不一样。
    在这里插入图片描述
    使用需要导入函数库: import org.apache.spark.sql.functions._

    保存结果数据

    将分析结果数据保存到外部存储系统中,比如保存到MySQL数据库表中或者CSV文件中。

    // TODO: 将分析的结果数据保存MySQL数据库和CSV文件
    // 结果DataFrame被使用多次,缓存
    resultDF.persist(StorageLevel.MEMORY_AND_DISK)
    // 1. 保存MySQL数据库表汇总
    resultDF
    	.coalesce(1) // 考虑降低分区数目
    	.write
    	.mode("overwrite")
    	.option("driver", "com.mysql.cj.jdbc.Driver")
    	.option("user", "root")
    	.option("password", "123456")
    	.jdbc("jdbc:mysql://localhost:3306/serverTimezone=UTC&characterEncoding=utf8&useUnicode=true","db_test.tb_top10_movies",new Properties())
    // 2. 保存CSV文件:每行数据中个字段之间使用逗号隔开
    resultDF
    	.coalesce(1)
    	.write.mode("overwrite")
    	.csv("datas/top10-movies")
    // 释放缓存数据
    resultDF.unpersist()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    查看数据库中结果表的数据:
    在这里插入图片描述

    案例完整代码

    上面的SQL和DSL已经是完整代码了,再来一份整合在一起的完整代码。

    电影评分数据分析,经过数据ETL、数据分析(SQL分析和DSL分析)及最终保存结果,整套数据处理分析流程,其中涉及到很多数据细节,完整代码如下:

    import java.util.Properties
    import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
    import org.apache.spark.storage.StorageLevel
    
    /**
     * 需求:对电影评分数据进行统计分析,获取Top10电影(电影评分平均值最高,并且每个电影被评分的次数大于2000)
     */
    object SparkTop10Movie {
      def main(args: Array[String]): Unit = {
        // 构建SparkSession实例对象
        val spark: SparkSession = SparkSession.builder()
          .master("local[4]")
          .appName(this.getClass.getSimpleName.stripSuffix("$"))
          // TODO: 设置shuffle时分区数目
          .config("spark.sql.shuffle.partitions", "4")
          .getOrCreate()
        // 导入隐式转换
        import spark.implicits._
        // 1. 读取电影评分数据,从本地文件系统读取
        val rawRatingsDS: Dataset[String] = spark.read.textFile("datas/ml-1m/ratings.dat")
        // 2. 转换数据
        val ratingsDF: DataFrame = rawRatingsDS
          // 过滤数据
          .filter(line => null != line && line.trim.split("::").length == 4)
          // 提取转换数据
          .mapPartitions { iter =>
            iter.map { line =>
              // 按照分割符分割,拆箱到变量中
              val Array(userId, movieId, rating, timestamp) = line.trim.split("::")
              // 返回四元组
              (userId, movieId, rating.toDouble, timestamp.toLong)
            }
          }
          // 指定列名添加Schema
          .toDF("userId", "movieId", "rating", "timestamp")
        /*
        root
        |-- userId: string (nullable = true)
        |-- movieId: string (nullable = true)
        |-- rating: double (nullable = false)
        |-- timestamp: long (nullable = false)
        */
        //ratingsDF.printSchema()
        /*
        +------+-------+------+---------+
        |userId|movieId|rating|timestamp|
        +------+-------+------+---------+
        | 1| 1193| 5.0|978300760|
        | 1| 661| 3.0|978302109|
        | 1| 594| 4.0|978302268|
        | 1| 919| 4.0|978301368|
        +------+-------+------+---------+
        */
        //ratingsDF.show(4)
        // TODO: 基于SQL方式分析
        // 第一步、注册DataFrame为临时视图
        ratingsDF.createOrReplaceTempView("view_temp_ratings")
        // 第二步、编写SQL
        val top10MovieDF: DataFrame = spark.sql(
          """
            |SELECT
            | movieId, ROUND(AVG(rating), 2) AS avg_rating, COUNT(movieId) AS cnt_rating
            |FROM
            | view_temp_ratings
            |GROUP BY
            | movieId
            |HAVING
            | cnt_rating > 2000
            |ORDER BY
            | avg_rating DESC, cnt_rating DESC
            |LIMIT
            | 10
          """.stripMargin)
        //top10MovieDF.printSchema()
        top10MovieDF.show(10, truncate = false)
        println("===============================================================")
        // TODO: 基于DSL=Domain Special Language(特定领域语言) 分析
        import org.apache.spark.sql.functions._
        val resultDF: DataFrame = ratingsDF
          // 选取字段
          .select($"movieId", $"rating")
          // 分组:按照电影ID,获取平均评分和评分次数
          .groupBy($"movieId")
          .agg( //
            round(avg($"rating"), 2).as("avg_rating"), //
            count($"movieId").as("cnt_rating") //
          )
    
          // 过滤:评分次数大于2000
          .filter($"cnt_rating" > 2000)
          // 排序:先按照评分降序,再按照次数降序
          .orderBy($"avg_rating".desc, $"cnt_rating".desc)
          // 获取前10
          .limit(10)
        //resultDF.printSchema()
        resultDF.show(10)
        // TODO: 将分析的结果数据保存MySQL数据库和CSV文件
        // 结果DataFrame被使用多次,缓存
        resultDF.persist(StorageLevel.MEMORY_AND_DISK)
        // 1. 保存MySQL数据库表汇总
        resultDF
          .coalesce(1) // 考虑降低分区数目
          .write
          .mode("overwrite")
          .option("driver", "com.mysql.cj.jdbc.Driver")
          .option("user", "root")
          .option("password", "123456")
          .jdbc(
            "jdbc:mysql://localhost:3306/test?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true",
            "test.tb_top10_movies",
            new Properties()
          )
        // 2. 保存CSV文件:每行数据中个字段之间使用逗号隔开
        resultDF
          .coalesce(1)
          .write.mode("overwrite")
          .csv("datas/top10-movies")
        // 释放缓存数据
        resultDF.unpersist()
        // 应用结束,关闭资源
        Thread.sleep(10000000)
        spark.stop()
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124

    Shuffle 分区数目

    运行上述程序时,查看WEB UI监控页面发现, 某个Stage中有200个Task任务,也就是说RDD有200分区Partition。
    在这里插入图片描述
    原因: 在SparkSQL中当Job中产生Shuffle时,默认的分区数(spark.sql.shuffle.partitions )为200,在实际项目中要合理的设置。 在构建SparkSession实例对象时,设置参数的值:

        // 构建SparkSession实例对象
        val spark: SparkSession = SparkSession.builder()
          .master("local[4]")
          .appName(this.getClass.getSimpleName.stripSuffix("$"))
          // TODO: 设置shuffle时分区数目
          .config("spark.sql.shuffle.partitions", "4")
          .getOrCreate()
        // 导入隐式转换
        import spark.implicits._
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    下回分解

    下篇文章我们来看下DataSet的分析,以及RDD、DF与DS的转换。

  • 相关阅读:
    MDM数据质量应用说明
    FFmpeg实现fmp4+h265 aac切片命令
    C++学习day--24 推箱子游戏图像化开发
    多步验证Odoo功能模块
    OpenGL_Learn07(变换)
    【Python】【selenium】为什么结合selenium+beautiful Soup能够大大提高爬虫效率
    MySQL场景面试,你是如何进行SQL优化的?
    递归时间复杂度分析 && master公式
    DHCP工具分配IDRAC IP
    Web阶段一 静态网页
  • 原文地址:https://blog.csdn.net/l848168/article/details/126340525