• 4. DWD and ADS Layers


    7. DWD Layer

    To join a stream table with a dimension table, use a lookup join. When the dimension table stored in HBase or MySQL changes, the join picks up those changes dynamically, because each incoming record probes the external table at processing time.
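    For reference, a lookup join uses the FOR SYSTEM_TIME AS OF syntax on a processing-time attribute; a minimal sketch using the payment fact table and region dimension table that appear later in this section:

    -- Each payment record probes the HBase dimension table at its processing time,
    -- so later changes to the dimension table are picked up by subsequent records.
    SELECT
        a.id,
        b.info.pro_name
    FROM gma_dwd.dwd_kafka_payment_info AS a
    LEFT JOIN gma_dim.dim_hbase_region FOR SYSTEM_TIME AS OF a.proc_time AS b
        ON a.province_id = b.pro_id;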

    1. Payment fact table

    Data warehouse modeling notes:

    Note: the Upsert Kafka connector supports reading data from a Kafka topic and writing data to a Kafka topic in upsert fashion. Here it is used both to source from Kafka and to sink back to Kafka.

    A new function to learn: LISTAGG(): concatenates the values of string expressions, placing a separator between them. The separator is not appended at the end of the string, and its default value is ','.
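    A small example of the pattern used below, collecting every sku_id of an order into one string (an optional second argument, e.g. LISTAGG(expr, '|'), overrides the default separator):

    -- Collect all sku_id values for each order into a single comma-separated string
    SELECT
        order_id,
        LISTAGG(CAST(sku_id AS STRING)) AS skus
    FROM gma_ods.ods_mysql_kafka_order_detail
    GROUP BY order_id;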

    -- 1. Create the Kafka sink table
    CREATE TABLE gma_dwd.dwd_kafka_payment_info (
        id BIGINT,
        user_id BIGINT,
        payment_time STRING,
        payment_type STRING,
        province_id BIGINT,
        skus STRING,
        payment_price DECIMAL(10,2),
        sku_num BIGINT,
        proc_time AS PROCTIME(),
        PRIMARY KEY (id) NOT ENFORCED -- declare a unique primary key
    ) WITH (
        'connector' = 'upsert-kafka',
        'topic' = 'dwd_payment_info',
        'properties.bootstrap.servers' = 'master:9092',
        'key.format' = 'json',
        'value.format' = 'json'
    );
    -- 2. Join the payment flow, order, and order detail tables to build the payment fact table
    INSERT INTO gma_dwd.dwd_kafka_payment_info
    SELECT
        id,
        CAST(user_id AS BIGINT) AS user_id,
        payment_time,
        payment_type,
        CAST(province_id AS BIGINT) AS province_id,
        LISTAGG(CAST(sku_id AS STRING)) AS skus,
        SUM(order_price * sku_num) AS payment_price,
        SUM(sku_num) AS sku_num
    FROM (
        SELECT
            a.id, a.user_id, a.payment_time, a.payment_type,
            b.province_id, c.sku_id, c.order_price,
            CAST(c.sku_num AS BIGINT) AS sku_num
        FROM gma_ods.ods_mysql_kafka_payment_info
            /*+ OPTIONS('scan.startup.mode'='earliest-offset') */ AS a
        INNER JOIN gma_ods.ods_mysql_kafka_order_info
            /*+ OPTIONS('scan.startup.mode'='earliest-offset') */ AS b
            ON CAST(a.order_id AS BIGINT) = b.id
        INNER JOIN gma_ods.ods_mysql_kafka_order_detail
            /*+ OPTIONS('scan.startup.mode'='earliest-offset') */ AS c
            ON b.id = c.order_id
    ) AS d
    GROUP BY id, user_id, payment_time, payment_type, province_id;
    -- Consume the sink topic from Kafka
    kafka-console-consumer.sh --bootstrap-server master:9092,node2:9092,node2:9092 --from-beginning --topic dwd_payment_info
    -- Query the data in Flink
    SELECT * FROM gma_dwd.dwd_kafka_payment_info /*+ OPTIONS('scan.startup.mode'='earliest-offset') */;

    2. Order fact table

    -- 1. Create the Kafka sink table
    CREATE TABLE gma_dwd.dwd_kafka_order_info (
        id BIGINT,
        consignee STRING,
        consignee_tel STRING,
        delivery_address STRING,
        order_status STRING,
        user_id BIGINT,
        payment_way STRING,
        create_time TIMESTAMP(3),
        operate_time TIMESTAMP(3),
        expire_time TIMESTAMP(3),
        province_id BIGINT,
        skus STRING,
        total_amount DECIMAL(10,2),
        proc_time AS PROCTIME(),
        PRIMARY KEY (id) NOT ENFORCED -- declare a unique primary key
    ) WITH (
        'connector' = 'upsert-kafka',
        'topic' = 'dwd_order_info',
        'properties.bootstrap.servers' = 'master:9092',
        'key.format' = 'json',
        'value.format' = 'json'
    );
    -- 2. Execute the SQL
    INSERT INTO gma_dwd.dwd_kafka_order_info
    SELECT
        a.id,
        a.consignee,
        a.consignee_tel,
        a.delivery_address,
        a.order_status,
        a.user_id,
        a.payment_way,
        a.create_time,
        a.operate_time,
        a.expire_time,
        CAST(a.province_id AS BIGINT),
        b.skus,
        a.total_amount
    FROM gma_ods.ods_mysql_kafka_order_info
        /*+ OPTIONS('scan.startup.mode'='earliest-offset') */ AS a
    JOIN (
        SELECT order_id, LISTAGG(CAST(sku_id AS STRING)) AS skus
        FROM gma_ods.ods_mysql_kafka_order_detail
            /*+ OPTIONS('scan.startup.mode'='earliest-offset') */
        GROUP BY order_id
    ) AS b
        ON a.id = b.order_id;
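    As with the payment fact table, the result can be checked by consuming the sink topic or querying the table in Flink, for example:

    -- Consume the sink topic from Kafka
    kafka-console-consumer.sh --bootstrap-server master:9092 --from-beginning --topic dwd_order_info
    -- Query the data in Flink
    SELECT * FROM gma_dwd.dwd_kafka_order_info /*+ OPTIONS('scan.startup.mode'='earliest-offset') */;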

    8. ADS Layer

    • Methodology for statistical metrics

      Atomic metrics: order amount, payment amount

      Derived metric = atomic metric + statistical period + business qualifier + statistical dimension (for example, "each user's daily payment amount" below combines the atomic metric payment amount with a one-day period and the user dimension)

    First, create the database gma_ads in MySQL.
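    For example (charset chosen to match the table DDLs below):

    -- Run once in MySQL before creating the Flink JDBC sink tables
    CREATE DATABASE IF NOT EXISTS gma_ads DEFAULT CHARACTER SET utf8;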

    1. Payment amount

    • Compute each user's daily payment amount in real time
    • Compute each province's daily payment amount in real time
    • Compute the daily payment amount per payment type in real time
    • Compute each region's daily payment amount in real time
    • Compute the daily payment amount per gender in real time
    1. Compute each user's daily payment amount in real time
    -- 1. Create the MySQL sink table (Flink SQL JDBC sink)
    CREATE TABLE gma_ads.ads_mysql_user_day_sum_payment_price (
        user_id BIGINT,
        day_id STRING,
        sum_payment_price DECIMAL(10,2),
        PRIMARY KEY (user_id, day_id) NOT ENFORCED -- rows are updated by primary key
    ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:mysql://master:3306/gma_ads?useUnicode=true&characterEncoding=UTF-8',
        'table-name' = 'ads_mysql_user_day_sum_payment_price', -- the table must be created manually in MySQL
        'username' = 'root',
        'password' = '123456'
    );
    -- 2. Create the table in MySQL
    CREATE TABLE `ads_mysql_user_day_sum_payment_price` (
        `user_id` BIGINT NOT NULL,
        `day_id` varchar(255) NOT NULL,
        `sum_payment_price` decimal(10,2),
        PRIMARY KEY (`user_id`,`day_id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
    -- 3. Real-time aggregation
    INSERT INTO gma_ads.ads_mysql_user_day_sum_payment_price
    SELECT
        user_id,
        SUBSTR(payment_time, 1, 10) AS day_id,
        SUM(payment_price) AS sum_payment_price
    FROM gma_dwd.dwd_kafka_payment_info
    GROUP BY user_id, SUBSTR(payment_time, 1, 10);
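    The result can be checked directly in MySQL; the same (user_id, day_id) row is updated in place as new payments arrive:

    -- Run in MySQL
    SELECT * FROM gma_ads.ads_mysql_user_day_sum_payment_price ORDER BY day_id, user_id;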
    2. Compute each province's daily payment amount in real time
    -- 1. Create the MySQL sink table (Flink SQL JDBC sink)
    CREATE TABLE gma_ads.ads_mysql_proc_day_sum_payment_price (
        pro_name STRING,
        day_id STRING,
        sum_payment_price DECIMAL(10,2),
        PRIMARY KEY (pro_name, day_id) NOT ENFORCED -- rows are updated by primary key
    ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:mysql://master:3306/gma_ads?useUnicode=true&characterEncoding=UTF-8',
        'table-name' = 'ads_mysql_proc_day_sum_payment_price', -- the table must be created manually in MySQL
        'username' = 'root',
        'password' = '123456'
    );
    -- 2. Create the table in MySQL
    CREATE TABLE `ads_mysql_proc_day_sum_payment_price` (
        `pro_name` varchar(255) NOT NULL,
        `day_id` varchar(255) NOT NULL,
        `sum_payment_price` decimal(10,2),
        PRIMARY KEY (`pro_name`,`day_id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
    -- 3. Real-time aggregation
    -- Look up the dimension table in real time to get the province name
    INSERT INTO gma_ads.ads_mysql_proc_day_sum_payment_price
    SELECT
        b.info.pro_name AS pro_name,
        SUBSTR(payment_time, 1, 10) AS day_id,
        SUM(payment_price) AS sum_payment_price
    FROM gma_dwd.dwd_kafka_payment_info AS a
    LEFT JOIN gma_dim.dim_hbase_region FOR SYSTEM_TIME AS OF a.proc_time AS b
        ON a.province_id = b.pro_id
    GROUP BY b.info.pro_name, SUBSTR(payment_time, 1, 10);
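    These ADS queries look up gma_dim.dim_hbase_region, which is defined in the DIM layer of this series. A rough sketch of its assumed shape, inferred from how it is used here (column family and connector options are assumptions and may differ from the actual DDL):

    -- Assumed sketch only; the real DDL lives in the DIM-layer section of this series.
    CREATE TABLE gma_dim.dim_hbase_region (
        pro_id BIGINT,                                    -- HBase row key: province id
        info ROW<pro_name STRING, region_name STRING>,    -- column family holding province and region names
        PRIMARY KEY (pro_id) NOT ENFORCED
    ) WITH (
        'connector' = 'hbase-2.2',                        -- assumption: match your HBase connector version
        'table-name' = 'dim_region',                      -- assumption
        'zookeeper.quorum' = 'master:2181'                -- assumption
    );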

    2. Number of orders

    • The number of orders per province per day, in real time
    • The number of orders per region per day, in real time
    • The number of orders per brand per day, in real time
    • The number of orders per user per day, in real time
    1. The number of orders per province per day, in real time
    -- 1. Create the MySQL sink table
    CREATE TABLE gma_ads.ads_mysql_proc_day_order_num (
        pro_name STRING,
        day_id STRING,
        num BIGINT,
        PRIMARY KEY (pro_name, day_id) NOT ENFORCED -- rows are updated by primary key
    ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:mysql://master:3306/gma_ads?useUnicode=true&characterEncoding=UTF-8',
        'table-name' = 'ads_mysql_proc_day_order_num', -- the table must be created manually in MySQL
        'username' = 'root',
        'password' = '123456'
    );
    -- 2. Create the table in MySQL
    CREATE TABLE `ads_mysql_proc_day_order_num` (
        `pro_name` varchar(255) NOT NULL,
        `day_id` varchar(255) NOT NULL,
        `num` bigint,
        PRIMARY KEY (`pro_name`,`day_id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
    2. The number of orders per region per day, in real time
    -- 1. Create the MySQL sink table
    CREATE TABLE gma_ads.ads_mysql_region_day_order_num (
        region_name STRING,
        day_id STRING,
        num BIGINT,
        PRIMARY KEY (region_name, day_id) NOT ENFORCED -- rows are updated by primary key
    ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:mysql://master:3306/gma_ads?useUnicode=true&characterEncoding=UTF-8',
        'table-name' = 'ads_mysql_region_day_order_num', -- the table must be created manually in MySQL
        'username' = 'root',
        'password' = '123456'
    );
    -- 2. Create the table in MySQL
    CREATE TABLE `ads_mysql_region_day_order_num` (
        `region_name` varchar(255) NOT NULL,
        `day_id` varchar(255) NOT NULL,
        `num` bigint,
        PRIMARY KEY (`region_name`,`day_id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
    3. The number of orders per brand per day, in real time
    -- Load the Hive module so Hive built-in functions (split, explode) can be used
    LOAD MODULE hive WITH ('hive-version' = '1.2.1');
    -- Explode the skus string into one row per sku_id, then look up the brand name
    SELECT c.info.tm_name AS tm_name, day_id, COUNT(1) AS num
    FROM (
        SELECT
            sku_id,
            DATE_FORMAT(create_time, 'yyyy-MM-dd') AS day_id,
            PROCTIME() AS proc_time
        FROM gma_dwd.dwd_kafka_order_info AS a,
        LATERAL TABLE(explode(split(skus, ','))) t(sku_id)
    ) AS b
    LEFT JOIN gma_dim.dim_hbase_item_info FOR SYSTEM_TIME AS OF b.proc_time AS c
        ON CAST(b.sku_id AS BIGINT) = c.sku_id
    GROUP BY c.info.tm_name, day_id;
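    The brand statistic above is shown only as a query; to persist it, a JDBC sink could be added following the same pattern as the other ADS tables. The table name ads_mysql_tm_day_order_num below is an assumption, not from the original:

    -- Hypothetical sink table, mirroring the other ADS sinks
    CREATE TABLE gma_ads.ads_mysql_tm_day_order_num (
        tm_name STRING,
        day_id STRING,
        num BIGINT,
        PRIMARY KEY (tm_name, day_id) NOT ENFORCED -- rows are updated by primary key
    ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:mysql://master:3306/gma_ads?useUnicode=true&characterEncoding=UTF-8',
        'table-name' = 'ads_mysql_tm_day_order_num', -- create this table manually in MySQL first
        'username' = 'root',
        'password' = '123456'
    );
    -- Then prefix the query above with: INSERT INTO gma_ads.ads_mysql_tm_day_order_num ...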
    To run multiple INSERT statements in a single job, wrap them in a statement set:
    EXECUTE STATEMENT SET
    BEGIN
    -- The number of orders per province per day, in real time
    INSERT INTO gma_ads.ads_mysql_proc_day_order_num
    SELECT
        b.info.pro_name,
        DATE_FORMAT(create_time, 'yyyy-MM-dd') AS day_id,
        COUNT(1) AS num
    FROM gma_dwd.dwd_kafka_order_info AS a
    LEFT JOIN gma_dim.dim_hbase_region FOR SYSTEM_TIME AS OF a.proc_time AS b
        ON a.province_id = b.pro_id
    GROUP BY b.info.pro_name, DATE_FORMAT(create_time, 'yyyy-MM-dd');
    -- The number of orders per region per day, in real time
    INSERT INTO gma_ads.ads_mysql_region_day_order_num
    SELECT
        b.info.region_name,
        DATE_FORMAT(create_time, 'yyyy-MM-dd') AS day_id,
        COUNT(1) AS num
    FROM gma_dwd.dwd_kafka_order_info AS a
    LEFT JOIN gma_dim.dim_hbase_region FOR SYSTEM_TIME AS OF a.proc_time AS b
        ON a.province_id = b.pro_id
    GROUP BY b.info.region_name, DATE_FORMAT(create_time, 'yyyy-MM-dd');
    END;

    9. Issues

    1. Data loss when Flink writes updated data to HBase in batches (bug)

    • Workaround: switch from batched writes to per-record writes, as in the sketch below
      • 'sink.buffer-flush.max-rows' = '0'
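    A sketch of where the option goes, using a hypothetical HBase sink table (the table name, connector version, and other options are assumptions):

    -- Hypothetical HBase sink; per the workaround above, 'sink.buffer-flush.max-rows' = '0'
    -- disables row-based buffering so records are written individually instead of in batches.
    CREATE TABLE dim_hbase_example (
        rowkey BIGINT,
        info ROW<name STRING>,
        PRIMARY KEY (rowkey) NOT ENFORCED
    ) WITH (
        'connector' = 'hbase-2.2',                 -- assumption: match your HBase connector version
        'table-name' = 'dim_example',              -- assumption
        'zookeeper.quorum' = 'master:2181',        -- assumption
        'sink.buffer-flush.max-rows' = '0'         -- per-record writes instead of batched writes
    );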