• SparkSQL综合案例-省份维度的销售情况统计分析


    一、项目背景

    二、项目需求

            (1)需求

            ①各省销售指标,每个省份的销售额统计

            ②TOP3销售省份中,有多少家店铺日均销售额1000+

            ③TOP3省份中,各个省份的平均单价

            ④TOP3省份中,各个省份的支付类型比例

            (2)要求

            ①将需求结果写出到mysql

            ②将数据写入到Spark On Hive中

    三、代码实现

            (1)需求1:
    1. # cording:utf8
    2. '''
    3. 要求1:各省销售额统计
    4. 要求2:TOP3销售省份中,有多少店铺达到过日销售额1000+
    5. 要求3:TOP3省份中,各省的平均单单价
    6. 要求4:TOP3省份中,各个省份的支付类型比例
    7. '''
    8. from pyspark.sql import SparkSession
    9. from pyspark.sql import functions as F
    10. if __name__ == '__main__':
    11. spark = SparkSession.builder.appName('SQL_text').\
    12. master('local[*]').\
    13. config('spark.sql.shuffle.partitions', '2').\
    14. config('spark.sql.warehouse.dir', 'hdfs://pyspark01/user/hive/warehouse').\
    15. config('hive.metastore.uris', 'thrift://pyspark01:9083').\
    16. enableHiveSupport().\
    17. getOrCreate()
    18. # 1.读取数据
    19. # 省份信息,缺失值过滤,同时省份信息中会有‘null’字符串
    20. # 订单的金额,数据集中有订单的金额是单笔超过10000的,这些事测试数据
    21. # 列值过滤(SparkSQL会自动做这个优化)
    22. df = spark.read.format('json').load('../../input/mini.json').\
    23. dropna(thresh=1, subset=['storeProvince']).\
    24. filter("storeProvince != 'null'").\
    25. filter('receivable < 10000').\
    26. select('storeProvince', 'storeID', 'receivable', 'dateTS', 'payType')
    27. # TODO 1:各省销售额统计
    28. province_sale_df = df.groupBy('storeProvince').sum('receivable').\
    29. withColumnRenamed('sum(receivable)', 'money').\
    30. withColumn('money', F.round('money', 2)).\
    31. orderBy('money', ascending=False)
    32. province_sale_df.show(truncate=False)
    33. # 写出到Mysql
    34. province_sale_df.write.mode('overwrite').\
    35. format('jdbc').\
    36. option('url', 'jdbc:mysql://pyspark01:3306/bigdata?useSSL=False&useUnicode=true&characterEncoding=utf8').\
    37. option('dbtable', 'province_sale').\
    38. option('user', 'root').\
    39. option('password', 'root').\
    40. option('encoding', 'utf8').\
    41. save()
    42. # 写出Hive表 saveAsTable 可以写出表 要求已经配置好spark on hive
    43. # 会将表写入到hive的数据仓库中
    44. province_sale_df.write.mode('overwrite').saveAsTable('default.province_sale', 'parquet')

            结果展示:

            MySQL数据展示:

            Hive数据展示:

            (2)需求2:
    1. # cording:utf8
    2. '''
    3. 要求1:各省销售额统计
    4. 要求2:TOP3销售省份中,有多少店铺达到过日销售额1000+
    5. 要求3:TOP3省份中,各省的平均单单价
    6. 要求4:TOP3省份中,各个省份的支付类型比例
    7. '''
    8. from pyspark.sql import SparkSession
    9. from pyspark.sql import functions as F
    10. from pyspark.storagelevel import StorageLevel
    11. if __name__ == '__main__':
    12. spark = SparkSession.builder.appName('SQL_text').\
    13. master('local[*]').\
    14. config('spark.sql.shuffle.partitions', '2').\
    15. config('spark.sql.warehouse.dir', 'hdfs://pyspark01/user/hive/warehouse').\
    16. config('hive.metastore.uris', 'thrift://pyspark01:9083').\
    17. enableHiveSupport().\
    18. getOrCreate()
    19. # 1.读取数据
    20. # 省份信息,缺失值过滤,同时省份信息中会有‘null’字符串
    21. # 订单的金额,数据集中有订单的金额是单笔超过10000的,这些事测试数据
    22. # 列值过滤(SparkSQL会自动做这个优化)
    23. df = spark.read.format('json').load('../../input/mini.json').\
    24. dropna(thresh=1, subset=['storeProvince']).\
    25. filter("storeProvince != 'null'").\
    26. filter('receivable < 10000').\
    27. select('storeProvince', 'storeID', 'receivable', 'dateTS', 'payType')
    28. # TODO 1:各省销售额统计
    29. province_sale_df = df.groupBy('storeProvince').sum('receivable').\
    30. withColumnRenamed('sum(receivable)', 'money').\
    31. withColumn('money', F.round('money', 2)).\
    32. orderBy('money', ascending=False)
    33. # # 写出到Mysql
    34. # province_sale_df.write.mode('overwrite').\
    35. # format('jdbc').\
    36. # option('url', 'jdbc:mysql://pyspark01:3306/bigdata?useSSL=False&useUnicode=true&characterEncoding=utf8').\
    37. # option('dbtable', 'province_sale').\
    38. # option('user', 'root').\
    39. # option('password', 'root').\
    40. # option('encoding', 'utf8').\
    41. # save()
    42. #
    43. # # 写出Hive表 saveAsTable 可以写出表 要求已经配置好spark on hive
    44. # # 会将表写入到hive的数据仓库中
    45. # province_sale_df.write.mode('overwrite').saveAsTable('default.province_sale', 'parquet')
    46. # TODO 需求2:TOP3销售省份中,有多少店铺达到过日销售额1000+
    47. # 2.1 找到TOP3的销售省份
    48. top3_province_df = province_sale_df.limit(3).select('storeProvince').\
    49. withColumnRenamed('storeProvince', 'top3_province') # 对列名进行重命名,防止与province_sale_df的storeProvince冲突
    50. # 2.2 和原始的DF进行内关联,数据关联后,得到TOP3省份的销售数据
    51. top3_province_df_joined = df.join(top3_province_df, on=df['storeProvince'] == top3_province_df['top3_province'])
    52. # 因为需要多次使用到TOP3省份数据,所有对其进行持久化缓存
    53. top3_province_df_joined.persist(StorageLevel.MEMORY_AND_DISK)
    54. # from_unixtime将秒级的日期数据转换为年月日数据
    55. # from_unixtime的精度是秒级,数据的精度是毫秒级,需要对数据进行进度的裁剪
    56. province_hot_store_count_df = top3_province_df_joined.groupBy("storeProvince", "storeID",
    57. F.from_unixtime(df['dateTS'].substr(0, 10), "yyyy-mm-dd").alias('day')).\
    58. sum('receivable').withColumnRenamed('sum(receivable)', 'money').\
    59. filter('money > 1000 ').\
    60. dropDuplicates(subset=['storeID']).\
    61. groupBy('storeProvince').count()
    62. province_hot_store_count_df.show()
    63. # 写出到Mysql
    64. province_sale_df.write.mode('overwrite'). \
    65. format('jdbc'). \
    66. option('url', 'jdbc:mysql://pyspark01:3306/bigdata?useSSL=False&useUnicode=true&characterEncoding=utf8'). \
    67. option('dbtable', 'province_hot_store_count'). \
    68. option('user', 'root'). \
    69. option('password', 'root'). \
    70. option('encoding', 'utf8'). \
    71. save()
    72. # 写出Hive表 saveAsTable 可以写出表 要求已经配置好spark on hive
    73. # 会将表写入到hive的数据仓库中
    74. province_sale_df.write.mode('overwrite').saveAsTable('default.province_hot_store_count', 'parquet')
    75. top3_province_df_joined.unpersist()

            结果展示:

            MySQL结果展示:

            Hive结果展示:

            (3)需求3:
    1. # cording:utf8
    2. '''
    3. 要求1:各省销售额统计
    4. 要求2:TOP3销售省份中,有多少店铺达到过日销售额1000+
    5. 要求3:TOP3省份中,各省的平均单单价
    6. 要求4:TOP3省份中,各个省份的支付类型比例
    7. '''
    8. from pyspark.sql import SparkSession
    9. from pyspark.sql import functions as F
    10. from pyspark.storagelevel import StorageLevel
    11. if __name__ == '__main__':
    12. spark = SparkSession.builder.appName('SQL_text').\
    13. master('local[*]').\
    14. config('spark.sql.shuffle.partitions', '2').\
    15. config('spark.sql.warehouse.dir', 'hdfs://pyspark01/user/hive/warehouse').\
    16. config('hive.metastore.uris', 'thrift://pyspark01:9083').\
    17. enableHiveSupport().\
    18. getOrCreate()
    19. # 1.读取数据
    20. # 省份信息,缺失值过滤,同时省份信息中会有‘null’字符串
    21. # 订单的金额,数据集中有订单的金额是单笔超过10000的,这些事测试数据
    22. # 列值过滤(SparkSQL会自动做这个优化)
    23. df = spark.read.format('json').load('../../input/mini.json').\
    24. dropna(thresh=1, subset=['storeProvince']).\
    25. filter("storeProvince != 'null'").\
    26. filter('receivable < 10000').\
    27. select('storeProvince', 'storeID', 'receivable', 'dateTS', 'payType')
    28. # TODO 1:各省销售额统计
    29. province_sale_df = df.groupBy('storeProvince').sum('receivable').\
    30. withColumnRenamed('sum(receivable)', 'money').\
    31. withColumn('money', F.round('money', 2)).\
    32. orderBy('money', ascending=False)
    33. # # 写出到Mysql
    34. # province_sale_df.write.mode('overwrite').\
    35. # format('jdbc').\
    36. # option('url', 'jdbc:mysql://pyspark01:3306/bigdata?useSSL=False&useUnicode=true&characterEncoding=utf8').\
    37. # option('dbtable', 'province_sale').\
    38. # option('user', 'root').\
    39. # option('password', 'root').\
    40. # option('encoding', 'utf8').\
    41. # save()
    42. #
    43. # # 写出Hive表 saveAsTable 可以写出表 要求已经配置好spark on hive
    44. # # 会将表写入到hive的数据仓库中
    45. # province_sale_df.write.mode('overwrite').saveAsTable('default.province_sale', 'parquet')
    46. # TODO 需求2:TOP3销售省份中,有多少店铺达到过日销售额1000+
    47. # 2.1 找到TOP3的销售省份
    48. top3_province_df = province_sale_df.limit(3).select('storeProvince').\
    49. withColumnRenamed('storeProvince', 'top3_province') # 对列名进行重命名,防止与province_sale_df的storeProvince冲突
    50. # 2.2 和原始的DF进行内关联,数据关联后,得到TOP3省份的销售数据
    51. top3_province_df_joined = df.join(top3_province_df, on=df['storeProvince'] == top3_province_df['top3_province'])
    52. # 因为需要多次使用到TOP3省份数据,所有对其进行持久化缓存
    53. top3_province_df_joined.persist(StorageLevel.MEMORY_AND_DISK)
    54. # from_unixtime将秒级的日期数据转换为年月日数据
    55. # from_unixtime的精度是秒级,数据的精度是毫秒级,需要对数据进行进度的裁剪
    56. province_hot_store_count_df = top3_province_df_joined.groupBy("storeProvince", "storeID",
    57. F.from_unixtime(df['dateTS'].substr(0, 10), "yyyy-mm-dd").alias('day')).\
    58. sum('receivable').withColumnRenamed('sum(receivable)', 'money').\
    59. filter('money > 1000 ').\
    60. dropDuplicates(subset=['storeID']).\
    61. groupBy('storeProvince').count()
    62. province_hot_store_count_df.show()
    63. # # 写出到Mysql
    64. # province_hot_store_count_df.write.mode('overwrite'). \
    65. # format('jdbc'). \
    66. # option('url', 'jdbc:mysql://pyspark01:3306/bigdata?useSSL=False&useUnicode=true&characterEncoding=utf8'). \
    67. # option('dbtable', 'province_hot_store_count'). \
    68. # option('user', 'root'). \
    69. # option('password', 'root'). \
    70. # option('encoding', 'utf8'). \
    71. # save()
    72. #
    73. # # 写出Hive表 saveAsTable 可以写出表 要求已经配置好spark on hive
    74. # # 会将表写入到hive的数据仓库中
    75. # province_hot_store_count_df.write.mode('overwrite').saveAsTable('default.province_hot_store_count', 'parquet')
    76. # TODO 3:TOP3省份中,各省的平均单单价
    77. top3_province_order_avg_df = top3_province_df_joined.groupBy("storeProvince").\
    78. avg("receivable").\
    79. withColumnRenamed("avg(receivable)", "money").\
    80. withColumn("money", F.round("money", 2)).\
    81. orderBy("money", ascending=False)
    82. top3_province_order_avg_df.show(truncate=False)
    83. # 写出到Mysql
    84. top3_province_order_avg_df.write.mode('overwrite'). \
    85. format('jdbc'). \
    86. option('url', 'jdbc:mysql://pyspark01:3306/bigdata?useSSL=False&useUnicode=true&characterEncoding=utf8'). \
    87. option('dbtable', 'top3_province_order_avg'). \
    88. option('user', 'root'). \
    89. option('password', 'root'). \
    90. option('encoding', 'utf8'). \
    91. save()
    92. # 写出Hive表 saveAsTable 可以写出表 要求已经配置好spark on hive
    93. # 会将表写入到hive的数据仓库中
    94. top3_province_order_avg_df.write.mode('overwrite').saveAsTable('default.top3_province_order_avg', 'parquet')
    95. top3_province_df_joined.unpersist()

            结果展示

            MySQL与Hive结果展示:

            (4)需求4:
    1. # cording:utf8
    2. '''
    3. 要求1:各省销售额统计
    4. 要求2:TOP3销售省份中,有多少店铺达到过日销售额1000+
    5. 要求3:TOP3省份中,各省的平均单单价
    6. 要求4:TOP3省份中,各个省份的支付类型比例
    7. '''
    8. from pyspark.sql import SparkSession
    9. from pyspark.sql import functions as F
    10. from pyspark.storagelevel import StorageLevel
    11. from pyspark.sql.types import StringType
    12. if __name__ == '__main__':
    13. spark = SparkSession.builder.appName('SQL_text').\
    14. master('local[*]').\
    15. config('spark.sql.shuffle.partitions', '2').\
    16. config('spark.sql.warehouse.dir', 'hdfs://pyspark01/user/hive/warehouse').\
    17. config('hive.metastore.uris', 'thrift://pyspark01:9083').\
    18. enableHiveSupport().\
    19. getOrCreate()
    20. # 1.读取数据
    21. # 省份信息,缺失值过滤,同时省份信息中会有‘null’字符串
    22. # 订单的金额,数据集中有订单的金额是单笔超过10000的,这些事测试数据
    23. # 列值过滤(SparkSQL会自动做这个优化)
    24. df = spark.read.format('json').load('../../input/mini.json').\
    25. dropna(thresh=1, subset=['storeProvince']).\
    26. filter("storeProvince != 'null'").\
    27. filter('receivable < 10000').\
    28. select('storeProvince', 'storeID', 'receivable', 'dateTS', 'payType')
    29. # TODO 1:各省销售额统计
    30. province_sale_df = df.groupBy('storeProvince').sum('receivable').\
    31. withColumnRenamed('sum(receivable)', 'money').\
    32. withColumn('money', F.round('money', 2)).\
    33. orderBy('money', ascending=False)
    34. # # 写出到Mysql
    35. # province_sale_df.write.mode('overwrite').\
    36. # format('jdbc').\
    37. # option('url', 'jdbc:mysql://pyspark01:3306/bigdata?useSSL=False&useUnicode=true&characterEncoding=utf8').\
    38. # option('dbtable', 'province_sale').\
    39. # option('user', 'root').\
    40. # option('password', 'root').\
    41. # option('encoding', 'utf8').\
    42. # save()
    43. #
    44. # # 写出Hive表 saveAsTable 可以写出表 要求已经配置好spark on hive
    45. # # 会将表写入到hive的数据仓库中
    46. # province_sale_df.write.mode('overwrite').saveAsTable('default.province_sale', 'parquet')
    47. # TODO 需求2:TOP3销售省份中,有多少店铺达到过日销售额1000+
    48. # 2.1 找到TOP3的销售省份
    49. top3_province_df = province_sale_df.limit(3).select('storeProvince').\
    50. withColumnRenamed('storeProvince', 'top3_province') # 对列名进行重命名,防止与province_sale_df的storeProvince冲突
    51. # 2.2 和原始的DF进行内关联,数据关联后,得到TOP3省份的销售数据
    52. top3_province_df_joined = df.join(top3_province_df, on=df['storeProvince'] == top3_province_df['top3_province'])
    53. # 因为需要多次使用到TOP3省份数据,所有对其进行持久化缓存
    54. top3_province_df_joined.persist(StorageLevel.MEMORY_AND_DISK)
    55. # from_unixtime将秒级的日期数据转换为年月日数据
    56. # from_unixtime的精度是秒级,数据的精度是毫秒级,需要对数据进行进度的裁剪
    57. province_hot_store_count_df = top3_province_df_joined.groupBy("storeProvince", "storeID",
    58. F.from_unixtime(df['dateTS'].substr(0, 10), "yyyy-mm-dd").alias('day')).\
    59. sum('receivable').withColumnRenamed('sum(receivable)', 'money').\
    60. filter('money > 1000 ').\
    61. dropDuplicates(subset=['storeID']).\
    62. groupBy('storeProvince').count()
    63. province_hot_store_count_df.show()
    64. # # 写出到Mysql
    65. # province_hot_store_count_df.write.mode('overwrite'). \
    66. # format('jdbc'). \
    67. # option('url', 'jdbc:mysql://pyspark01:3306/bigdata?useSSL=False&useUnicode=true&characterEncoding=utf8'). \
    68. # option('dbtable', 'province_hot_store_count'). \
    69. # option('user', 'root'). \
    70. # option('password', 'root'). \
    71. # option('encoding', 'utf8'). \
    72. # save()
    73. #
    74. # # 写出Hive表 saveAsTable 可以写出表 要求已经配置好spark on hive
    75. # # 会将表写入到hive的数据仓库中
    76. # province_hot_store_count_df.write.mode('overwrite').saveAsTable('default.province_hot_store_count', 'parquet')
    77. # TODO 3:TOP3省份中,各省的平均单单价
    78. top3_province_order_avg_df = top3_province_df_joined.groupBy("storeProvince").\
    79. avg("receivable").\
    80. withColumnRenamed("avg(receivable)", "money").\
    81. withColumn("money", F.round("money", 2)).\
    82. orderBy("money", ascending=False)
    83. top3_province_order_avg_df.show(truncate=False)
    84. # # 写出到Mysql
    85. # top3_province_order_avg_df.write.mode('overwrite'). \
    86. # format('jdbc'). \
    87. # option('url', 'jdbc:mysql://pyspark01:3306/bigdata?useSSL=False&useUnicode=true&characterEncoding=utf8'). \
    88. # option('dbtable', 'top3_province_order_avg'). \
    89. # option('user', 'root'). \
    90. # option('password', 'root'). \
    91. # option('encoding', 'utf8'). \
    92. # save()
    93. #
    94. # # 写出Hive表 saveAsTable 可以写出表 要求已经配置好spark on hive
    95. # # 会将表写入到hive的数据仓库中
    96. # top3_province_order_avg_df.write.mode('overwrite').saveAsTable('default.top3_province_order_avg', 'parquet')
    97. # TODO 4:TOP3省份中,各个省份的支付类型比例
    98. top3_province_df_joined.createTempView("province_pay")
    99. # 自定义UDF
    100. def udf_func(percent):
    101. return str(round(percent * 100)) + "%"
    102. # 注册UDF
    103. my_udf = F.udf(udf_func, StringType())
    104. pay_type_df = spark.sql('''
    105. SELECT storeProvince, payType, (count(payType) / total) AS percent FROM
    106. (SELECT storeProvince, payType, count(1) OVER(PARTITION BY storeProvince) AS total FROM province_pay) AS sub
    107. GROUP BY storeProvince, payType, total
    108. ''').withColumn('percent', my_udf("percent"))
    109. pay_type_df.show()
    110. # 写出到Mysql
    111. pay_type_df.write.mode('overwrite'). \
    112. format('jdbc'). \
    113. option('url', 'jdbc:mysql://pyspark01:3306/bigdata?useSSL=False&useUnicode=true&characterEncoding=utf8'). \
    114. option('dbtable', 'pay_type'). \
    115. option('user', 'root'). \
    116. option('password', 'root'). \
    117. option('encoding', 'utf8'). \
    118. save()
    119. # 写出Hive表 saveAsTable 可以写出表 要求已经配置好spark on hive
    120. # 会将表写入到hive的数据仓库中
    121. top3_province_order_avg_df.write.mode('overwrite').saveAsTable('default.pay_type', 'parquet')
    122. top3_province_df_joined.unpersist()

           结果展示:

           MySQL结果展示:

            Hive结果展示:

    四、项目运行问题及解决方法

            报错:java.sql.BatchUpdateException: Incorrect string value: '\xE6\xB1\x9F\xE8\xA5\xBF...' for column 'storeProvince' atrow1

            原因:MySQL的UTF-8只支持3个字节的unicode字符,无法支持四个字节的Unicode字符

            解决办法:在MySQL控制台执行下列代码修改编码格式

  • 相关阅读:
    测试工作流程图,你一定要知道的
    【利用Selenium+autoIt实现文件上传】
    好用的读书网站
    Java集合(二):Map集合与Collections工具类
    Spring Cloud Alibaba(四)
    DJ12-2-2 算术运算指令
    OpenCV入门5——OpenCV的算术与位运算
    水管安装过滤器笔记
    C语言——结构体(位段)、联合体、枚举
    论文阅读 Dynamic Network Embedding by Modeling Triadic Closure Process
  • 原文地址:https://blog.csdn.net/2202_75347029/article/details/134083558