• 尚硅谷大数据项目《在线教育之实时数仓》笔记005


    视频地址:尚硅谷大数据项目《在线教育之实时数仓》_哔哩哔哩_bilibili

    目录

    第9章 数仓开发之DWD层

    P031

    P032

    P033

    P034

    P035

    P036

    P037

    P038

    P039

    P040


    第9章 数仓开发之DWD层

    P031

    DWD层设计要点:

    (1)DWD层的设计依据是维度建模理论,该层存储维度模型的事实表。

    (2)DWD层表名的命名规范为dwd_数据域_表名

    DWD层存放事实表:从Kafka的topic_log和topic_db主题中读取所需的业务流程相关数据,将业务流程关联起来做成明细数据,再写回Kafka。

    尚硅谷大数据学科全套教程\3.尚硅谷大数据学科--项目实战\尚硅谷大数据项目之在线教育数仓\尚硅谷大数据项目之在线教育数仓-3实时\资料\13.总线矩阵及指标体系

    在线教育实时业务总线矩阵.xlsx

    9.1.3 图解

    P032

    1. package com.atguigu.edu.realtime.app.dwd.log;
    2. import com.alibaba.fastjson.JSON;
    3. import com.alibaba.fastjson.JSONArray;
    4. import com.alibaba.fastjson.JSONObject;
    5. import com.atguigu.edu.realtime.util.DateFormatUtil;
    6. import com.atguigu.edu.realtime.util.EnvUtil;
    7. import com.atguigu.edu.realtime.util.KafkaUtil;
    8. import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    9. import org.apache.flink.api.common.state.ValueState;
    10. import org.apache.flink.api.common.state.ValueStateDescriptor;
    11. import org.apache.flink.configuration.Configuration;
    12. import org.apache.flink.streaming.api.datastream.DataStreamSource;
    13. import org.apache.flink.streaming.api.datastream.KeyedStream;
    14. import org.apache.flink.streaming.api.datastream.SideOutputDataStream;
    15. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    16. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    17. import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    18. import org.apache.flink.streaming.api.functions.ProcessFunction;
    19. import org.apache.flink.util.Collector;
    20. import org.apache.flink.util.OutputTag;
    21. /**
    22. * @author
    23. * @create 2023-04-21 14:01
    24. */
    25. public class BaseLogApp {
    26. public static void main(String[] args) throws Exception {
    27. //TODO 1 创建环境设置状态后端
    28. StreamExecutionEnvironment env = EnvUtil.getExecutionEnvironment(1);
    29. //TODO 2 从kafka中读取主流数据
    30. String topicName = "topic_log";
    31. String groupId = "base_log_app";
    32. DataStreamSource baseLogSource = env.fromSource(KafkaUtil.getKafkaConsumer(topicName, groupId),
    33. WatermarkStrategy.noWatermarks(),
    34. "base_log_source"
    35. );
    36. //TODO 3 对数据进行清洗转换
    37. // 3.1 定义侧输出流
    38. OutputTag dirtyStreamTag = new OutputTag("dirtyStream") {
    39. };
    40. // 3.2 清洗转换
    41. SingleOutputStreamOperator cleanedStream = baseLogSource.process(new ProcessFunction() {
    42. @Override
    43. public void processElement(String value, Context ctx, Collector out) throws Exception {
    44. try {
    45. JSONObject jsonObject = JSON.parseObject(value);
    46. out.collect(jsonObject);
    47. } catch (Exception e) {
    48. ctx.output(dirtyStreamTag, value);
    49. }
    50. }
    51. });
    52. // 3.3 将脏数据写出到kafka对应的主题
    53. SideOutputDataStream dirtyStream = cleanedStream.getSideOutput(dirtyStreamTag);
    54. String dirtyTopicName = "dirty_data";
    55. dirtyStream.sinkTo(KafkaUtil.getKafkaProducer(dirtyTopicName, "dirty_trans"));
    56. //TODO 4 新老访客标记修复
    57. //TODO 5 数据分流
    58. //TODO 6 写出到kafka不同的主题
    59. //TODO 7 执行任务
    60. }
    61. }

    P033

    KafkaUtil.java

    P034

    新老访客逻辑介绍

    P035

    BaseLogApp.java

    //TODO 4 新老访客标记修复

    1. [atguigu@node001 log]$ pwd
    2. /opt/module/data_mocker/01-onlineEducation/log
    3. [atguigu@node001 log]$ cat -n 200 app.2023-09-19.log
    4. {"common":{"ar":"26","ba":"iPhone","ch":"Appstore","is_new":"0","md":"iPhone 8","mid":"mid_188","os":"iOS 13.3.1","sc":"1","sid":"b4d6c8eb-d025-4855-af0a-fe351ff16ef9","uid":"20","vc":"v2.1.134"},"page":{"during_time":901000,"item":"173","item_type":"paper_id","last_page_id":"course_detail","page_id":"exam"},"ts":1645456489411}
    5. {
    6. "common":{
    7. "ar":"26",
    8. "ba":"iPhone",
    9. "ch":"Appstore",
    10. "is_new":"0",
    11. "md":"iPhone 8",
    12. "mid":"mid_188",
    13. "os":"iOS 13.3.1",
    14. "sc":"1",
    15. "sid":"b4d6c8eb-d025-4855-af0a-fe351ff16ef9",
    16. "uid":"20",
    17. "vc":"v2.1.134"
    18. },
    19. "page":{
    20. "during_time":901000,
    21. "item":"173",
    22. "item_type":"paper_id",
    23. "last_page_id":"course_detail",
    24. "page_id":"exam"
    25. },
    26. "ts":1645456489411
    27. }

    P036

    BaseLogApp.java

    //TODO 5 数据分流

    P037

    //TODO 6 写出到kafka不同的主题

    启动 hadoop、zookeeper、kafka、flume(f1)

    1. [atguigu@node001 ~]$ kafka-console-consumer.sh --bootstrap-server node001:9092 --topic page_topic

    2. [atguigu@node001 ~]$ kafka-console-consumer.sh --bootstrap-server node001:9092 --topic action_topic

    3. [atguigu@node001 ~]$ kafka-console-consumer.sh --bootstrap-server node001:9092 --topic display_topic

    4. [atguigu@node001 ~]$ kafka-console-consumer.sh --bootstrap-server node001:9092 --topic start_topic

    5. [atguigu@node001 ~]$ kafka-console-consumer.sh --bootstrap-server node001:9092 --topic error_topic

    6. [atguigu@node001 ~]$ kafka-console-consumer.sh --bootstrap-server node001:9092 --topic appVideo_topic

    1. [atguigu@node001 ~]$ kafka-console-consumer.sh --bootstrap-server node001:9092 --topic page_topic
    2. [2023-11-01 14:36:17,581] WARN [Consumer clientId=consumer-console-consumer-7492-1, groupId=console-consumer-7492] Error while fetching metadata with correlation id 2 : {page_topic=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
    3. [2023-11-01 14:36:18,710] WARN [Consumer clientId=consumer-console-consumer-7492-1, groupId=console-consumer-7492] Error while fetching metadata with correlation id 6 : {page_topic=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
    4. [2023-11-01 14:36:18,720] WARN [Consumer clientId=consumer-console-consumer-7492-1, groupId=console-consumer-7492] The following subscribed topics are not assigned to any members: [page_topic] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
    1. [atguigu@node001 ~]$ f1.sh start
    2. -------- 启动 node001 采集flume启动 -------
    3. [atguigu@node001 ~]$ cd /opt/module/data
    4. data/ data_mocker/ datax/
    5. [atguigu@node001 ~]$ cd /opt/module/data
    6. data/ data_mocker/ datax/
    7. [atguigu@node001 ~]$ cd /opt/module/data_mocker/
    8. [atguigu@node001 data_mocker]$ cd 01-onlineEducation/
    9. [atguigu@node001 01-onlineEducation]$ ll
    10. 总用量 30460
    11. -rw-rw-r-- 1 atguigu atguigu 2223 9月 19 10:43 application.yml
    12. -rw-rw-r-- 1 atguigu atguigu 4057995 7月 25 10:28 edu0222.sql
    13. -rw-rw-r-- 1 atguigu atguigu 27112074 7月 25 10:28 edu2021-mock-2022-06-18.jar
    14. drwxrwxr-x 2 atguigu atguigu 4096 10月 26 14:01 log
    15. -rw-rw-r-- 1 atguigu atguigu 1156 7月 25 10:44 logback.xml
    16. -rw-rw-r-- 1 atguigu atguigu 633 7月 25 10:45 path.json
    17. [atguigu@node001 01-onlineEducation]$ java -jar edu2021-mock-2022-06-18.jar
    18. SLF4J: Class path contains multiple SLF4J bindings.
    1. [atguigu@node001 01-onlineEducation]$ jpsall
    2. ================ node001 ================
    3. 7840 Kafka
    4. 11938 ConsoleConsumer
    5. 10402 ConsoleConsumer
    6. 12939 Application
    7. 6124 NameNode
    8. 8364 ConsoleConsumer
    9. 6703 NodeManager
    10. 12272 ConsoleConsumer
    11. 6897 JobHistoryServer
    12. 6294 DataNode
    13. 9623 ConsoleConsumer
    14. 11511 ConsoleConsumer
    15. 4730 QuorumPeerMain
    16. 14047 Jps
    17. ================ node002 ================
    18. 5114 NodeManager
    19. 6748 Jps
    20. 3868 QuorumPeerMain
    21. 4957 ResourceManager
    22. 5965 Kafka
    23. 4719 DataNode
    24. ================ node003 ================
    25. 3697 QuorumPeerMain
    26. 5349 Kafka
    27. 4697 NodeManager
    28. 4526 SecondaryNameNode
    29. 4399 DataNode
    30. 6207 Jps
    31. [atguigu@node001 01-onlineEducation]$
    // Excerpt of BaseLogApp.main() showing only step 6. Step 5 (not shown here) presumably
    // produces pageStream, startStream, appVideoStream, displayStream, actionStream and
    // errorStream, so this fragment does not compile on its own.
    // The short topic names match the console consumers used in this section. The
    // unique-visitor job later in these notes reads dwd_traffic_page_log, so switch the page
    // topic to that name before running that job.
    1. public class BaseLogApp {
    2. public static void main(String[] args) throws Exception {
    3. //TODO 6 write each split stream to its own Kafka topic
    4. String pageTopic = "page_topic";//test name page_topic; final DWD name dwd_traffic_page_log
    5. String startTopic = "start_topic";//test name start_topic; final DWD name dwd_traffic_start_log
    6. String appVideoTopic = "appVideo_topic";//test name appVideo_topic; final DWD name dwd_traffic_play_pre_process
    7. String displayTopic = "display_topic";//test name display_topic; final DWD name dwd_traffic_display_log
    8. String actionTopic = "action_topic";//test name action_topic; final DWD name dwd_traffic_action_log
    9. String errorTopic = "error_topic";//test name error_topic; final DWD name dwd_traffic_error_log
    // The second argument of getKafkaProducer differs per sink. It looks like a transactional-id
    // prefix; confirm against KafkaUtil.
    10. pageStream.sinkTo(KafkaUtil.getKafkaProducer(pageTopic, "page_trans"));
    11. startStream.sinkTo(KafkaUtil.getKafkaProducer(startTopic, "start_trans"));
    12. appVideoStream.sinkTo(KafkaUtil.getKafkaProducer(appVideoTopic, "appVideo_trans"));
    13. displayStream.sinkTo(KafkaUtil.getKafkaProducer(displayTopic, "display_trans"));
    14. actionStream.sinkTo(KafkaUtil.getKafkaProducer(actionTopic, "action_trans"));
    15. errorStream.sinkTo(KafkaUtil.getKafkaProducer(errorTopic, "error_trans"));
    16. }
    17. }

    1. [atguigu@node001 ~]$ kafka-console-consumer.sh --bootstrap-server node001:9092 --topic page_topic

      1. {"common":{"sc":"2","ar":"16","uid":"40","os":"iOS 13.2.9","ch":"Appstore","is_new":"0","md":"iPhone Xs Max","mid":"mid_12","vc":"v2.1.134","ba":"iPhone","sid":"60afbbd8-0709-4089-a78b-75a05f2ba27b"},"page":{"page_id":"payment","item":"61060","during_time":14992,"item_type":"order_id","last_page_id":"order"},"ts":1645433992155}

      2. {"common":{"sc":"1","ar":"11","uid":"45","os":"iOS 13.3.1","ch":"Appstore","is_new":"0","md":"iPhone Xs","mid":"mid_224","vc":"v2.1.132","ba":"iPhone","sid":"087a4a0b-0d42-464e-8b16-df3769b6f5eb"},"page":{"page_id":"order","item":"61059","during_time":16171,"item_type":"order_id","last_page_id":"course_detail"},"ts":1645419355809}

    2. [atguigu@node001 ~]$ kafka-console-consumer.sh --bootstrap-server node001:9092 --topic action_topic

      1. {"common":{"sc":"1","ar":"13","uid":"38","os":"Android 11.0","ch":"vivo","is_new":"1","md":"Huawei P30","mid":"mid_415","vc":"v2.1.134","ba":"Huawei","sid":"0348e98d-79a1-4768-8f6e-da3d97cf289a"},"action":{"item":"445","action_id":"favor_add","item_type":"course_id","ts":1645442392343},"page":{"page_id":"course_detail","item":"445","during_time":13442,"item_type":"course_id","last_page_id":"course_list"},"ts":1645442392343}

      2. {"common":{"sc":"3","ar":"30","uid":"14","os":"Android 10.0","ch":"360","is_new":"0","md":"Xiaomi 9","mid":"mid_101","vc":"v2.1.134","ba":"Xiaomi","sid":"c88ad527-5ff7-42c3-86df-7bfb4f770ed4"},"action":{"item":"379","action_id":"favor_add","item_type":"course_id","ts":1645410357198},"page":{"page_id":"course_detail","item":"379","during_time":18696,"item_type":"course_id","last_page_id":"course_list"},"ts":1645410357198}

    3. [atguigu@node001 ~]$ kafka-console-consumer.sh --bootstrap-server node001:9092 --topic display_topic

      1. {"common":{"sc":"1","ar":"11","uid":"45","os":"iOS 13.3.1","ch":"Appstore","is_new":"0","md":"iPhone Xs","mid":"mid_224","vc":"v2.1.132","ba":"iPhone","sid":"087a4a0b-0d42-464e-8b16-df3769b6f5eb"},"display":{"display_type":"promotion","item":"10","item_type":"course_id","pos_id":5,"order":3},"page":{"page_id":"order","item":"61059","during_time":16171,"item_type":"order_id","last_page_id":"course_detail"},"ts":1645419355809}

      2. {"common":{"sc":"1","ar":"11","uid":"45","os":"iOS 13.3.1","ch":"Appstore","is_new":"0","md":"iPhone Xs","mid":"mid_224","vc":"v2.1.132","ba":"iPhone","sid":"087a4a0b-0d42-464e-8b16-df3769b6f5eb"},"display":{"display_type":"promotion","item":"2","item_type":"course_id","pos_id":3,"order":4},"page":{"page_id":"order","item":"61059","during_time":16171,"item_type":"order_id","last_page_id":"course_detail"},"ts":1645419355809}

    4. [atguigu@node001 ~]$ kafka-console-consumer.sh --bootstrap-server node001:9092 --topic start_topic

      1. {"common":{"sc":"3","ar":"3","uid":"1","os":"Android 11.0","ch":"web","is_new":"0","md":"Xiaomi Mix2 ","mid":"mid_316","vc":"v2.1.134","ba":"Xiaomi","sid":"e379f985-e000-49a8-b859-8608ca05b7af"},"start":{"entry":"notice","first_open":0,"open_ad_skip_ms":0,"open_ad_ms":1834,"loading_time":11883,"open_ad_id":19},"ts":1645405681354}

      2. {"common":{"sc":"1","ar":"32","uid":"4","os":"Android 11.0","ch":"xiaomi","is_new":"0","md":"Honor 20s","mid":"mid_266","vc":"v2.1.134","ba":"Honor","sid":"f925612e-16ec-4ff8-8066-03dc4bcb5437"},"start":{"entry":"icon","first_open":1,"open_ad_skip_ms":0,"open_ad_ms":8098,"loading_time":14531,"open_ad_id":7},"ts":1645439534755}

    5. [atguigu@node001 ~]$ kafka-console-consumer.sh --bootstrap-server node001:9092 --topic error_topic

      1. {"msg":" Exception in thread \\  java.net.SocketTimeoutException\\n \\tat com.atguigu.edu2021.mock.log.AppError.main(AppError.java:xxxxxx)","error_code":1083}

      2. {"msg":" Exception in thread \\  java.net.SocketTimeoutException\\n \\tat com.atguigu.edu2021.mock.log.AppError.main(AppError.java:xxxxxx)","error_code":3684}

    6. [atguigu@node001 ~]$ kafka-console-consumer.sh --bootstrap-server node001:9092 --topic appVideo_topic

      1. {"common":{"sc":"2","ar":"10","uid":"7","os":"iOS 13.3.1","ch":"Appstore","is_new":"0","md":"iPhone Xs Max","mid":"mid_412","vc":"v2.1.134","ba":"iPhone","sid":"e1340ad0-f297-4fd0-8c91-1747a6cdd391"},"appVideo":{"play_sec":30,"position_sec":600,"video_id":"2660"},"ts":1645446402788}

      2. {"common":{"sc":"2","ar":"10","uid":"7","os":"iOS 13.3.1","ch":"Appstore","is_new":"0","md":"iPhone Xs Max","mid":"mid_412","vc":"v2.1.134","ba":"iPhone","sid":"e1340ad0-f297-4fd0-8c91-1747a6cdd391"},"appVideo":{"play_sec":11,"position_sec":630,"video_id":"2660"},"ts":1645446413788}

    P038

    9.2 流量域独立访客事务事实表

    P039

    1. package com.atguigu.edu.realtime.app.dwd.log;
    2. import com.alibaba.fastjson.JSON;
    3. import com.alibaba.fastjson.JSONAware;
    4. import com.alibaba.fastjson.JSONObject;
    5. import com.atguigu.edu.realtime.util.DateFormatUtil;
    6. import com.atguigu.edu.realtime.util.EnvUtil;
    7. import com.atguigu.edu.realtime.util.KafkaUtil;
    8. import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    9. import org.apache.flink.api.common.functions.FlatMapFunction;
    10. import org.apache.flink.api.common.functions.MapFunction;
    11. import org.apache.flink.api.common.functions.RichFilterFunction;
    12. import org.apache.flink.api.common.state.StateTtlConfig;
    13. import org.apache.flink.api.common.state.ValueState;
    14. import org.apache.flink.api.common.state.ValueStateDescriptor;
    15. import org.apache.flink.api.common.time.Time;
    16. import org.apache.flink.api.java.functions.KeySelector;
    17. import org.apache.flink.configuration.Configuration;
    18. import org.apache.flink.streaming.api.datastream.DataStreamSource;
    19. import org.apache.flink.streaming.api.datastream.KeyedStream;
    20. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    21. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    22. import org.apache.flink.util.Collector;
    23. /**
    24. * @author yhm
    25. * @create 2023-04-21 16:24
    26. */
    27. public class DwdTrafficUniqueVisitorDetail {
    28. public static void main(String[] args) throws Exception {
    29. // TODO 1 创建环境设置状态后端
    30. StreamExecutionEnvironment env = EnvUtil.getExecutionEnvironment(4);
    31. // TODO 2 读取kafka日志主题数据
    32. String topicName = "dwd_traffic_page_log";
    33. DataStreamSource pageLogStream = env.fromSource(KafkaUtil.getKafkaConsumer(topicName, "dwd_traffic_unique_visitor_detail"), WatermarkStrategy.noWatermarks(), "unique_visitor_source");
    34. // TODO 3 转换结构,过滤last_page_id不为空的数据
    35. SingleOutputStreamOperator firstPageStream = pageLogStream.flatMap(new FlatMapFunction() {
    36. @Override
    37. public void flatMap(String value, Collector out) throws Exception {
    38. try {
    39. JSONObject jsonObject = JSON.parseObject(value);
    40. String lastPageID = jsonObject.getJSONObject("page").getString("last_page_id");
    41. if (lastPageID == null) {
    42. out.collect(jsonObject);
    43. }
    44. } catch (Exception e) {
    45. e.printStackTrace();
    46. }
    47. }
    48. });
    49. // TODO 4 安装mid分组
    50. KeyedStream keyedStream = firstPageStream.keyBy(new KeySelector() {
    51. @Override
    52. public String getKey(JSONObject value) throws Exception {
    53. return value.getJSONObject("common").getString("mid");
    54. }
    55. });
    56. // TODO 5 判断独立访客
    57. SingleOutputStreamOperator filteredStream = keyedStream.filter(new RichFilterFunction() {
    58. ValueState lastVisitDtState;
    59. @Override
    60. public void open(Configuration parameters) throws Exception {
    61. super.open(parameters);
    62. ValueStateDescriptor stringValueStateDescriptor = new ValueStateDescriptor<>("last_visit_dt", String.class);
    63. // 设置状态的存活时间
    64. stringValueStateDescriptor.enableTimeToLive(StateTtlConfig
    65. .newBuilder(Time.days(1L))
    66. // 设置状态的更新模式为创建及写入
    67. // 每次重新写入的时候记录时间 到1天删除状态
    68. .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    69. .build());
    70. lastVisitDtState = getRuntimeContext().getState(stringValueStateDescriptor);
    71. }
    72. @Override
    73. public boolean filter(JSONObject jsonObject) throws Exception {
    74. String visitDt = DateFormatUtil.toDate(jsonObject.getLong("ts"));
    75. String lastVisitDt = lastVisitDtState.value();
    76. // 对于迟到的数据,last日期会大于visit日期,数据也不要
    77. if (lastVisitDt == null || (DateFormatUtil.toTs(lastVisitDt) < DateFormatUtil.toTs(visitDt))) {
    78. lastVisitDtState.update(visitDt);
    79. return true;
    80. }
    81. return false;
    82. }
    83. });
    84. // TODO 6 将独立访客数据写出到对应的kafka主题
    85. String targetTopic = "dwd_traffic_unique_visitor_detail";
    86. SingleOutputStreamOperator sinkStream = filteredStream.map((MapFunction) JSONAware::toJSONString);
    87. sinkStream.sinkTo(KafkaUtil.getKafkaProducer(targetTopic, "unique_visitor_trans"));
    88. // TODO 7 运行任务
    89. env.execute();
    90. }
    91. }

    P040

    1. [atguigu@node001 ~]$ kafka-console-consumer.sh --bootstrap-server node001:9092 --topic dwd_traffic_unique_visitor_detail
    2. [atguigu@node001 ~]$ kafka-console-consumer.sh --bootstrap-server node001:9092 --topic dwd_traffic_page_log
    3. [atguigu@node001 01-onlineEducation]$ cd /opt/module/data_mocker/01-onlineEducation/
    4. [atguigu@node001 01-onlineEducation]$ java -jar edu2021-mock-2022-06-18.jar

  • 相关阅读:
    LeetCode --- 1534. Count Good Triplets 解题报告
    spark知识点总结(1)
    总结CSS常用的知识点以及SCSS语法
    什么是SSL/TLS/HTTPS,对称非对称加密+证书+CA
    but it set boost_system_FOUND to FALSE so package “boost_system“ is
    eslint 警告处理合集
    js 同步、异步的概念,async 和 await 的说明
    【JavaScript】DOM查询(子节点、父节点、兄弟节点)源码详解
    43特征01——特征值和特征向量基本性质
    k8s,30分钟部署一个kubernetes集群
  • 原文地址:https://blog.csdn.net/weixin_44949135/article/details/134139313