• SparkSQL项目实战


    1 准备数据

    我们这次Spark-sql操作所有的数据均来自Hive,首先在Hive中创建表,并导入数据。一共有3张表:1张用户行为表,1张城市表,1张产品表。

    1)将city_info.txt、product_info.txt、user_visit_action.txt上传到/opt/module/data

    [atguigu@hadoop102 module]$ mkdir data

    2)将创建对应的三张表

    1. hive (default)>
    2. CREATE TABLE `user_visit_action`(
    3.   `date` string,
    4.   `user_id` bigint,
    5.   `session_id` string,
    6.   `page_id` bigint,
    7.   `action_time` string,
    8.   `search_keyword` string,
    9.   `click_category_id` bigint,
    10.   `click_product_id` bigint, --点击商品id,没有商品用-1表示。
    11.   `order_category_ids` string,
    12.   `order_product_ids` string,
    13.   `pay_category_ids` string,
    14.   `pay_product_ids` string,
    15.   `city_id` bigint --城市id
    16. )
    17. row format delimited fields terminated by '\t';
    18. CREATE TABLE `city_info`(
    19.   `city_id` bigint, --城市id
    20.   `city_name` string, --城市名称
    21.   `area` string --区域名称
    22. )
    23. row format delimited fields terminated by '\t';
    24. CREATE TABLE `product_info`(
    25.   `product_id` bigint, -- 商品id
    26.   `product_name` string, --商品名称
    27.   `extend_info` string
    28. )
    29. row format delimited fields terminated by '\t';

    3)并加载数据

    1. hive (default)>
    2. load data local inpath '/opt/module/data/user_visit_action.txt' into table user_visit_action;
    3. load data local inpath '/opt/module/data/product_info.txt' into table product_info;
    4. load data local inpath '/opt/module/data/city_info.txt' into table city_info;

    4)测试一下三张表数据是否正常

    1. hive (default)>
    2. select * from user_visit_action limit 5;
    3. select * from product_info limit 5;
    4. select * from city_info limit 5;

    2 需求:各区域热门商品Top3

    2.1 需求简介

    这里的热门商品是从点击量的维度来看的,计算各个区域前三大热门商品,并备注上每个商品在主要城市中的分布比例,超过两个城市用其他显示。

    例如:

    地区

    商品名称

    点击次数

    城市备注

    华北

    商品A

    100000

    北京21.2%,天津13.2%,其他65.6%

    华北

    商品P

    80200

    北京63.0%,太原10%,其他27.0%

    华北

    商品M

    40000

    北京63.0%,太原10%,其他27.0%

    东北

    商品J

    92000

    大连28%,辽宁17.0%,其他 55.0%

    2.2 思路分析

    1. CREATE TABLE `user_visit_action`(
    2.   `datestring,
    3.   `user_id` bigint,
    4.   `session_id` string,
    5.   `page_id` bigint,
    6.   `action_timestring,
    7.   `search_keyword` string,
    8.   `click_category_id` bigint,
    9.   `click_product_id` bigint, --点击商品id,没有商品用-1表示。
    10.   `order_category_ids` string,
    11.   `order_product_ids` string,
    12.   `pay_category_ids` string,
    13.   `pay_product_ids` string,
    14.   `city_id` bigint --城市id
    15. )
    16. CREATE TABLE `city_info`(
    17.   `city_id` bigint, --城市id
    18.   `city_name` string, --城市名称
    19.   `areastring --区域名称
    20. )
    21. CREATE TABLE `product_info`(
    22.   `product_id` bigint, -- 商品id
    23.   `product_name` string, --商品名称
    24.   `extend_info` string
    25. )
    26. city_remark
    27. IN: 城市名称 String
    28. BUFF: totalcnt总点击量,Map[(cityName, 点击数量)]
    29. OUT:城市备注 String
    30. select
    31.    c.area, --地区
    32.    c.city_name, -- 城市
    33.    p.product_name, -- 商品名称
    34.    v.click_product_id -- 点击商品id
    35. from user_visit_action v
    36. join city_info c
    37. on v.city_id = c.city_id
    38. join product_info p
    39. on v.click_product_id = p.product_id
    40. where click_product_id > -1
    41. select
    42.     t1.area, --地区
    43.     t1.product_name, -- 商品名称
    44. count(*) click_count, -- 商品点击次数
    45. city_remark(t1.city_name) --城市备注
    46. from t1
    47. group by t1.area, t1.product_name
    48. select
    49. *,
    50. rank() over(partition by t2.area order by t2.click_count desc) rank -- 每个区域内按照点击量,倒序排行
    51. from t2
    52. select
    53. *
    54. from t3
    55. where rank <= 3

    使用Spark-SQL来完成复杂的需求,可以使用UDF或UDAF

    (1)查询出来所有的点击记录,并与city_info表连接,得到每个城市所在的地区,与 Product_info表连接得到商品名称

    (2)按照地区和商品名称分组,统计出每个商品在每个地区的总点击次数

    (3)每个地区内按照点击次数降序排列

    (4)只取前三名,并把结果保存在数据库中

    (5)城市备注需要自定义UDAF函数

    2.3 代码实现

    1. package com.atguigu.sparksql.demo;
    2. import lombok.Data;
    3. import org.apache.spark.SparkConf;
    4. import org.apache.spark.sql.*;
    5. import org.apache.spark.sql.expressions.Aggregator;
    6. import java.io.Serializable;
    7. import java.util.ArrayList;
    8. import java.util.HashMap;
    9. import java.util.TreeMap;
    10. import java.util.function.BiConsumer;
    11. import static org.apache.spark.sql.functions.udaf;
    12. public class Test01_Top3 {
    13. public static void main(String[] args) {
    14. // 1. 创建sparkConf配置对象
    15. SparkConf conf = new SparkConf().setAppName("sql").setMaster("local[*]");
    16. // 2. 创建sparkSession连接对象
    17. SparkSession spark = SparkSession.builder().enableHiveSupport().config(conf).getOrCreate();
    18. // 3. 编写代码
    19. // 将3个表格数据join在一起
    20. Dataset<Row> t1DS = spark.sql("select \n" +
    21. "\tc.area,\n" +
    22. "\tc.city_name,\n" +
    23. "\tp.product_name\n" +
    24. "from\n" +
    25. "\tuser_visit_action u\n" +
    26. "join\n" +
    27. "\tcity_info c\n" +
    28. "on\n" +
    29. "\tu.city_id=c.city_id\n" +
    30. "join\n" +
    31. "\tproduct_info p\n" +
    32. "on\n" +
    33. "\tu.click_product_id=p.product_id");
    34.         t1DS.createOrReplaceTempView("t1");
    35.         spark.udf().register("cityMark",udaf(new CityMark(),Encoders.STRING()));
    36. // 将区域内的产品点击次数统计出来
    37. Dataset<Row> t2ds = spark.sql("select \n" +
    38. "\tarea,\n" +
    39. "\tproduct_name,\n" +
    40. "\tcityMark(city_name) mark,\n" +
    41. "\tcount(*) counts\n" +
    42. "from\t\n" +
    43. "\tt1\n" +
    44. "group by\n" +
    45. "\tarea,product_name");
    46. //        t2ds.show(false);
    47.         t2ds.createOrReplaceTempView("t2");
    48. // 对区域内产品点击的次数进行排序  找出区域内的top3
    49.         spark.sql("select\n" +
    50. "\tarea,\n" +
    51. "\tproduct_name,\n" +
    52. "\tmark,\n" +
    53. "\trank() over (partition by area order by counts desc) rk\n" +
    54. "from \n" +
    55. "\tt2").createOrReplaceTempView("t3");
    56. // 使用过滤  取出区域内的top3
    57.         spark.sql("select\n" +
    58. "\tarea,\n" +
    59. "\tproduct_name,\n" +
    60. "\tmark \n" +
    61. "from\n" +
    62. "\tt3\n" +
    63. "where \n" +
    64. "\trk < 4").show(50,false);
    65. // 4. 关闭sparkSession
    66.         spark.close();
    67. }
    68.     @Data
    69. public static class Buffer implements Serializable {
    70. private Long totalCount;
    71. private HashMap<String,Long> map;
    72. public Buffer() {
    73. }
    74. public Buffer(Long totalCount, HashMap<String, Long> map) {
    75.             this.totalCount = totalCount;
    76.             this.map = map;
    77. }
    78. }
    79. public static class CityMark extends Aggregator<String, Buffer, String> {
    80. public static class CityCount {
    81. public String name;
    82. public Long count;
    83. public CityCount(String name, Long count) {
    84.                 this.name = name;
    85.                 this.count = count;
    86. }
    87. public CityCount() {
    88. }
    89. }
    90. public static class CompareCityCount implements Comparator<CityCount> {
    91. /**
    92.              * 默认倒序
    93.              * @param o1
    94.              * @param o2
    95.              * @return
    96.              */
    97.             @Override
    98. public int compare(CityCount o1, CityCount o2) {
    99. if (o1.count > o2.count) {
    100. return -1;
    101. } else return o1.count.equals(o2.count) ? 0 : 1;
    102. }
    103. }
    104.         @Override
    105. public Buffer zero() {
    106. return new Buffer(0L, new HashMap<String, Long>());
    107. }
    108. /**
    109.          * 分区内的预聚合
    110.          *
    111.          * @param b map(城市,sum)
    112.          * @param a 当前行表示的城市
    113.          * @return
    114.          */
    115.         @Override
    116. public Buffer reduce(Buffer b, String a) {
    117. HashMap<String, Long> hashMap = b.getMap();
    118. // 如果map中已经有当前城市  次数+1
    119. // 如果map中没有当前城市    0+1
    120.             hashMap.put(a, hashMap.getOrDefault(a, 0L) + 1);
    121.             b.setTotalCount(b.getTotalCount() + 1L);
    122. return b;
    123. }
    124. /**
    125.          * 合并多个分区间的数据
    126.          *
    127.          * @param b1 (北京,100),(上海,200)
    128.          * @param b2 (天津,100),(上海,200)
    129.          * @return
    130.          */
    131.         @Override
    132. public Buffer merge(Buffer b1, Buffer b2) {
    133.             b1.setTotalCount(b1.getTotalCount() + b2.getTotalCount());
    134. HashMap<String, Long> map1 = b1.getMap();
    135. HashMap<String, Long> map2 = b2.getMap();
    136. // 将map2中的数据放入合并到map1
    137.             map2.forEach(new BiConsumer<String, Long>() {
    138.                 @Override
    139. public void accept(String s, Long aLong) {
    140.                     map1.put(s, aLong + map1.getOrDefault(s, 0L));
    141. }
    142. });
    143. return b1;
    144. }
    145. /**
    146.          * map => {(上海,200),(北京,100),(天津,300)}
    147.          *
    148.          * @param reduction
    149.          * @return
    150.          */
    151.         @Override
    152. public String finish(Buffer reduction) {
    153. Long totalCount = reduction.getTotalCount();
    154. HashMap<String, Long> map = reduction.getMap();
    155. // 需要对map中的value次数进行排序
    156. ArrayList<CityCount> cityCounts = new ArrayList<>();
    157. // 将map中的数据放入到treeMap中 进行排序
    158. map.forEach(new BiConsumer<String, Long>() {
    159.                 @Override
    160. public void accept(String s, Long aLong) {
    161.                     cityCounts.add(new CityCount(s, aLong));
    162. }
    163. });
    164.             cityCounts.sort(new CompareCityCount());
    165. ArrayList<String> resultMark = new ArrayList<>();
    166. Double sum = 0.0;
    167. // 当前没有更多的城市数据  或者  已经找到两个城市数据了  停止循环
    168. while (!(cityCounts.size() == 0) && resultMark.size() < 2) {
    169. CityCount cityCount = cityCounts.get(0);
    170.                 resultMark.add(cityCount.name + String.format("%.2f",cityCount.count.doubleValue() / totalCount * 100) + "%");
    171.                 cityCounts.remove(0);
    172. }
    173. // 拼接其他城市
    174. if (cityCounts.size() > 0) {
    175.                 resultMark.add("其他" + String.format("%.2f", 100sum) + "%");
    176. }
    177. StringBuilder cityMark = new StringBuilder();
    178. for (String s : resultMark) {
    179.                 cityMark.append(s).append(",");
    180. }
    181. return cityMark.substring(0, cityMark.length() - 1);
    182. }
    183.         @Override
    184. public Encoder<Buffer> bufferEncoder() {
    185. return Encoders.javaSerialization(Buffer.class);
    186. }
    187.         @Override
    188. public Encoder<String> outputEncoder() {
    189. return Encoders.STRING();
    190. }
    191. }
    192. }

  • 相关阅读:
    python3-python中的GUI,Tkinter的使用,抓取小米应用商店应用列表名称
    2023年十大零日漏洞攻击
    古玩交易NFT数字藏品平台
    SpringBoot +MyBatis批量插入数据
    Ubuntu安装cuda
    res_config_settings.py文件详解
    677. 键值映射-字符树算法应用
    sublime text3安装SublimeREPL实现python中交互输入input,配置过程
    【C++】入门篇-关键字&&命名空间&&输入输出&&缺省参数
    C和指针 第14章 预处理器 14.1 预定义符号
  • 原文地址:https://blog.csdn.net/shangjg03/article/details/134361083