目录
DWD层设计要点:
(1)DWD层的设计依据是维度建模理论,该层存储维度模型的事实表。
(2)DWD层表名的命名规范为dwd_数据域_表名。
DWD层存放事实表:从Kafka的topic_log和topic_db主题中读取所需的业务过程相关数据,将各业务过程关联起来加工成明细数据,再写回Kafka。
尚硅谷大数据学科全套教程\3.尚硅谷大数据学科--项目实战\尚硅谷大数据项目之在线教育数仓\尚硅谷大数据项目之在线教育数仓-3实时\资料\13.总线矩阵及指标体系
在线教育实时业务总线矩阵.xlsx
9.1.3 图解
- package com.atguigu.edu.realtime.app.dwd.log;
-
- import com.alibaba.fastjson.JSON;
- import com.alibaba.fastjson.JSONArray;
- import com.alibaba.fastjson.JSONObject;
- import com.atguigu.edu.realtime.util.DateFormatUtil;
- import com.atguigu.edu.realtime.util.EnvUtil;
- import com.atguigu.edu.realtime.util.KafkaUtil;
- import org.apache.flink.api.common.eventtime.WatermarkStrategy;
- import org.apache.flink.api.common.state.ValueState;
- import org.apache.flink.api.common.state.ValueStateDescriptor;
- import org.apache.flink.configuration.Configuration;
- import org.apache.flink.streaming.api.datastream.DataStreamSource;
- import org.apache.flink.streaming.api.datastream.KeyedStream;
- import org.apache.flink.streaming.api.datastream.SideOutputDataStream;
- import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
- import org.apache.flink.streaming.api.functions.ProcessFunction;
- import org.apache.flink.util.Collector;
- import org.apache.flink.util.OutputTag;
-
- /**
-  * Flink job that pre-processes the raw application log stream for the DWD layer.
-  *
-  * <p>As written in this listing, it reads lines from the Kafka topic {@code topic_log},
-  * parses each line into a {@link JSONObject}, and forwards lines that cannot be parsed to
-  * the Kafka topic {@code dirty_data} through a side output. Steps 4-6 (visitor-flag repair,
-  * stream splitting, writing the split streams) are still placeholders here.
-  *
-  * @author
-  * @create 2023-04-21 14:01
-  */
- public class BaseLogApp {
-     /**
-      * Builds the job graph and submits it.
-      *
-      * @param args command-line arguments (not used)
-      * @throws Exception if building or running the job fails
-      */
-     public static void main(String[] args) throws Exception {
-         // TODO 1 Create the execution environment and configure the state backend
-         // NOTE(review): the argument 1 is presumably the job parallelism - confirm in EnvUtil.
-         StreamExecutionEnvironment env = EnvUtil.getExecutionEnvironment(1);
-
-         // TODO 2 Read the main stream from Kafka
-         String topicName = "topic_log";
-         String groupId = "base_log_app";
-         DataStreamSource<String> baseLogSource = env.fromSource(
-                 KafkaUtil.getKafkaConsumer(topicName, groupId),
-                 WatermarkStrategy.noWatermarks(),
-                 "base_log_source"
-         );
-
-         // TODO 3 Clean and convert the data
-         // 3.1 Define the side output for dirty records.
-         // The anonymous subclass ({ }) lets Flink capture the generic element type of the tag.
-         OutputTag<String> dirtyStreamTag = new OutputTag<String>("dirtyStream") {
-         };
-         // 3.2 Parse every line as JSON; lines that fail to parse go to the side output
-         SingleOutputStreamOperator<JSONObject> cleanedStream = baseLogSource.process(
-                 new ProcessFunction<String, JSONObject>() {
-                     @Override
-                     public void processElement(String value, Context ctx, Collector<JSONObject> out)
-                             throws Exception {
-                         try {
-                             JSONObject jsonObject = JSON.parseObject(value);
-                             out.collect(jsonObject);
-                         } catch (Exception e) {
-                             // Not valid JSON: keep the raw line instead of failing the job.
-                             ctx.output(dirtyStreamTag, value);
-                         }
-                     }
-                 });
-         // 3.3 Write the dirty records to their own Kafka topic
-         SideOutputDataStream<String> dirtyStream = cleanedStream.getSideOutput(dirtyStreamTag);
-         String dirtyTopicName = "dirty_data";
-         // NOTE(review): "dirty_trans" is presumably a transactional-id prefix - confirm in KafkaUtil.
-         dirtyStream.sinkTo(KafkaUtil.getKafkaProducer(dirtyTopicName, "dirty_trans"));
-
-         // TODO 4 Repair the new/returning visitor flag
-
-         // TODO 5 Split the stream
-
-         // TODO 6 Write the split streams to their Kafka topics
-
-         // TODO 7 Execute the job
-         // Without this call the job graph is only built, never submitted.
-         env.execute();
-     }
- }
KafkaUtil.java
新老访客逻辑介绍
BaseLogApp.java
//TODO 4 新老访客标记修复
- [atguigu@node001 log]$ pwd
- /opt/module/data_mocker/01-onlineEducation/log
- [atguigu@node001 log]$ cat -n 200 app.2023-09-19.log
- {"common":{"ar":"26","ba":"iPhone","ch":"Appstore","is_new":"0","md":"iPhone 8","mid":"mid_188","os":"iOS 13.3.1","sc":"1","sid":"b4d6c8eb-d025-4855-af0a-fe351ff16ef9","uid":"20","vc":"v2.1.134"},"page":{"during_time":901000,"item":"173","item_type":"paper_id","last_page_id":"course_detail","page_id":"exam"},"ts":1645456489411}
- {
- "common":{
- "ar":"26",
- "ba":"iPhone",
- "ch":"Appstore",
- "is_new":"0",
- "md":"iPhone 8",
- "mid":"mid_188",
- "os":"iOS 13.3.1",
- "sc":"1",
- "sid":"b4d6c8eb-d025-4855-af0a-fe351ff16ef9",
- "uid":"20",
- "vc":"v2.1.134"
- },
- "page":{
- "during_time":901000,
- "item":"173",
- "item_type":"paper_id",
- "last_page_id":"course_detail",
- "page_id":"exam"
- },
- "ts":1645456489411
- }
BaseLogApp.java
//TODO 5 数据分流
//TODO 6 写出到kafka不同的主题
启动 hadoop、zookeeper、kafka、flume(f1)。
[atguigu@node001 ~]$ kafka-console-consumer.sh --bootstrap-server node001:9092 --topic page_topic
[atguigu@node001 ~]$ kafka-console-consumer.sh --bootstrap-server node001:9092 --topic action_topic
[atguigu@node001 ~]$ kafka-console-consumer.sh --bootstrap-server node001:9092 --topic display_topic
[atguigu@node001 ~]$ kafka-console-consumer.sh --bootstrap-server node001:9092 --topic start_topic
[atguigu@node001 ~]$ kafka-console-consumer.sh --bootstrap-server node001:9092 --topic error_topic
[atguigu@node001 ~]$ kafka-console-consumer.sh --bootstrap-server node001:9092 --topic appVideo_topic
- [atguigu@node001 ~]$ kafka-console-consumer.sh --bootstrap-server node001:9092 --topic page_topic
- [2023-11-01 14:36:17,581] WARN [Consumer clientId=consumer-console-consumer-7492-1, groupId=console-consumer-7492] Error while fetching metadata with correlation id 2 : {page_topic=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
- [2023-11-01 14:36:18,710] WARN [Consumer clientId=consumer-console-consumer-7492-1, groupId=console-consumer-7492] Error while fetching metadata with correlation id 6 : {page_topic=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
- [2023-11-01 14:36:18,720] WARN [Consumer clientId=consumer-console-consumer-7492-1, groupId=console-consumer-7492] The following subscribed topics are not assigned to any members: [page_topic] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
- [atguigu@node001 ~]$ f1.sh start
- -------- 启动 node001 采集flume启动 -------
- [atguigu@node001 ~]$ cd /opt/module/data
- data/ data_mocker/ datax/
- [atguigu@node001 ~]$ cd /opt/module/data
- data/ data_mocker/ datax/
- [atguigu@node001 ~]$ cd /opt/module/data_mocker/
- [atguigu@node001 data_mocker]$ cd 01-onlineEducation/
- [atguigu@node001 01-onlineEducation]$ ll
- 总用量 30460
- -rw-rw-r-- 1 atguigu atguigu 2223 9月 19 10:43 application.yml
- -rw-rw-r-- 1 atguigu atguigu 4057995 7月 25 10:28 edu0222.sql
- -rw-rw-r-- 1 atguigu atguigu 27112074 7月 25 10:28 edu2021-mock-2022-06-18.jar
- drwxrwxr-x 2 atguigu atguigu 4096 10月 26 14:01 log
- -rw-rw-r-- 1 atguigu atguigu 1156 7月 25 10:44 logback.xml
- -rw-rw-r-- 1 atguigu atguigu 633 7月 25 10:45 path.json
- [atguigu@node001 01-onlineEducation]$ java -jar edu2021-mock-2022-06-18.jar
- SLF4J: Class path contains multiple SLF4J bindings.
- [atguigu@node001 01-onlineEducation]$ jpsall
- ================ node001 ================
- 7840 Kafka
- 11938 ConsoleConsumer
- 10402 ConsoleConsumer
- 12939 Application
- 6124 NameNode
- 8364 ConsoleConsumer
- 6703 NodeManager
- 12272 ConsoleConsumer
- 6897 JobHistoryServer
- 6294 DataNode
- 9623 ConsoleConsumer
- 11511 ConsoleConsumer
- 4730 QuorumPeerMain
- 14047 Jps
- ================ node002 ================
- 5114 NodeManager
- 6748 Jps
- 3868 QuorumPeerMain
- 4957 ResourceManager
- 5965 Kafka
- 4719 DataNode
- ================ node003 ================
- 3697 QuorumPeerMain
- 5349 Kafka
- 4697 NodeManager
- 4526 SecondaryNameNode
- 4399 DataNode
- 6207 Jps
- [atguigu@node001 01-onlineEducation]$
- public class BaseLogApp {
-     public static void main(String[] args) throws Exception {
-         // TODO 6 Write each split stream to its own Kafka topic.
-         // The streams used below come from the earlier steps of main, which this excerpt omits.
-         // Each trailing note repeats the two topic names listed in the original comment.
-
-         // Page logs
-         String pageSinkTopic = "page_topic"; // page_topic / dwd_traffic_page_log
-         pageStream.sinkTo(KafkaUtil.getKafkaProducer(pageSinkTopic, "page_trans"));
-
-         // Start logs
-         String startSinkTopic = "start_topic"; // start_topic / dwd_traffic_start_log
-         startStream.sinkTo(KafkaUtil.getKafkaProducer(startSinkTopic, "start_trans"));
-
-         // Video-play logs
-         String appVideoSinkTopic = "appVideo_topic"; // appVideo_topic / dwd_traffic_play_pre_process
-         appVideoStream.sinkTo(KafkaUtil.getKafkaProducer(appVideoSinkTopic, "appVideo_trans"));
-
-         // Display logs
-         String displaySinkTopic = "display_topic"; // display_topic / dwd_traffic_display_log
-         displayStream.sinkTo(KafkaUtil.getKafkaProducer(displaySinkTopic, "display_trans"));
-
-         // Action logs
-         String actionSinkTopic = "action_topic"; // action_topic / dwd_traffic_action_log
-         actionStream.sinkTo(KafkaUtil.getKafkaProducer(actionSinkTopic, "action_trans"));
-
-         // Error logs
-         String errorSinkTopic = "error_topic"; // error_topic / dwd_traffic_error_log
-         errorStream.sinkTo(KafkaUtil.getKafkaProducer(errorSinkTopic, "error_trans"));
-     }
- }
[atguigu@node001 ~]$ kafka-console-consumer.sh --bootstrap-server node001:9092 --topic page_topic
{"common":{"sc":"2","ar":"16","uid":"40","os":"iOS 13.2.9","ch":"Appstore","is_new":"0","md":"iPhone Xs Max","mid":"mid_12","vc":"v2.1.134","ba":"iPhone","sid":"60afbbd8-0709-4089-a78b-75a05f2ba27b"},"page":{"page_id":"payment","item":"61060","during_time":14992,"item_type":"order_id","last_page_id":"order"},"ts":1645433992155}
{"common":{"sc":"1","ar":"11","uid":"45","os":"iOS 13.3.1","ch":"Appstore","is_new":"0","md":"iPhone Xs","mid":"mid_224","vc":"v2.1.132","ba":"iPhone","sid":"087a4a0b-0d42-464e-8b16-df3769b6f5eb"},"page":{"page_id":"order","item":"61059","during_time":16171,"item_type":"order_id","last_page_id":"course_detail"},"ts":1645419355809}
[atguigu@node001 ~]$ kafka-console-consumer.sh --bootstrap-server node001:9092 --topic action_topic
{"common":{"sc":"1","ar":"13","uid":"38","os":"Android 11.0","ch":"vivo","is_new":"1","md":"Huawei P30","mid":"mid_415","vc":"v2.1.134","ba":"Huawei","sid":"0348e98d-79a1-4768-8f6e-da3d97cf289a"},"action":{"item":"445","action_id":"favor_add","item_type":"course_id","ts":1645442392343},"page":{"page_id":"course_detail","item":"445","during_time":13442,"item_type":"course_id","last_page_id":"course_list"},"ts":1645442392343}
{"common":{"sc":"3","ar":"30","uid":"14","os":"Android 10.0","ch":"360","is_new":"0","md":"Xiaomi 9","mid":"mid_101","vc":"v2.1.134","ba":"Xiaomi","sid":"c88ad527-5ff7-42c3-86df-7bfb4f770ed4"},"action":{"item":"379","action_id":"favor_add","item_type":"course_id","ts":1645410357198},"page":{"page_id":"course_detail","item":"379","during_time":18696,"item_type":"course_id","last_page_id":"course_list"},"ts":1645410357198}
[atguigu@node001 ~]$ kafka-console-consumer.sh --bootstrap-server node001:9092 --topic display_topic
{"common":{"sc":"1","ar":"11","uid":"45","os":"iOS 13.3.1","ch":"Appstore","is_new":"0","md":"iPhone Xs","mid":"mid_224","vc":"v2.1.132","ba":"iPhone","sid":"087a4a0b-0d42-464e-8b16-df3769b6f5eb"},"display":{"display_type":"promotion","item":"10","item_type":"course_id","pos_id":5,"order":3},"page":{"page_id":"order","item":"61059","during_time":16171,"item_type":"order_id","last_page_id":"course_detail"},"ts":1645419355809}
{"common":{"sc":"1","ar":"11","uid":"45","os":"iOS 13.3.1","ch":"Appstore","is_new":"0","md":"iPhone Xs","mid":"mid_224","vc":"v2.1.132","ba":"iPhone","sid":"087a4a0b-0d42-464e-8b16-df3769b6f5eb"},"display":{"display_type":"promotion","item":"2","item_type":"course_id","pos_id":3,"order":4},"page":{"page_id":"order","item":"61059","during_time":16171,"item_type":"order_id","last_page_id":"course_detail"},"ts":1645419355809}
[atguigu@node001 ~]$ kafka-console-consumer.sh --bootstrap-server node001:9092 --topic start_topic
{"common":{"sc":"3","ar":"3","uid":"1","os":"Android 11.0","ch":"web","is_new":"0","md":"Xiaomi Mix2 ","mid":"mid_316","vc":"v2.1.134","ba":"Xiaomi","sid":"e379f985-e000-49a8-b859-8608ca05b7af"},"start":{"entry":"notice","first_open":0,"open_ad_skip_ms":0,"open_ad_ms":1834,"loading_time":11883,"open_ad_id":19},"ts":1645405681354}
{"common":{"sc":"1","ar":"32","uid":"4","os":"Android 11.0","ch":"xiaomi","is_new":"0","md":"Honor 20s","mid":"mid_266","vc":"v2.1.134","ba":"Honor","sid":"f925612e-16ec-4ff8-8066-03dc4bcb5437"},"start":{"entry":"icon","first_open":1,"open_ad_skip_ms":0,"open_ad_ms":8098,"loading_time":14531,"open_ad_id":7},"ts":1645439534755}
[atguigu@node001 ~]$ kafka-console-consumer.sh --bootstrap-server node001:9092 --topic error_topic
{"msg":" Exception in thread \\ java.net.SocketTimeoutException\\n \\tat com.atguigu.edu2021.mock.log.AppError.main(AppError.java:xxxxxx)","error_code":1083}
{"msg":" Exception in thread \\ java.net.SocketTimeoutException\\n \\tat com.atguigu.edu2021.mock.log.AppError.main(AppError.java:xxxxxx)","error_code":3684}
[atguigu@node001 ~]$ kafka-console-consumer.sh --bootstrap-server node001:9092 --topic appVideo_topic
{"common":{"sc":"2","ar":"10","uid":"7","os":"iOS 13.3.1","ch":"Appstore","is_new":"0","md":"iPhone Xs Max","mid":"mid_412","vc":"v2.1.134","ba":"iPhone","sid":"e1340ad0-f297-4fd0-8c91-1747a6cdd391"},"appVideo":{"play_sec":30,"position_sec":600,"video_id":"2660"},"ts":1645446402788}
{"common":{"sc":"2","ar":"10","uid":"7","os":"iOS 13.3.1","ch":"Appstore","is_new":"0","md":"iPhone Xs Max","mid":"mid_412","vc":"v2.1.134","ba":"iPhone","sid":"e1340ad0-f297-4fd0-8c91-1747a6cdd391"},"appVideo":{"play_sec":11,"position_sec":630,"video_id":"2660"},"ts":1645446413788}
9.2 流量域独立访客事务事实表
- package com.atguigu.edu.realtime.app.dwd.log;
-
- import com.alibaba.fastjson.JSON;
- import com.alibaba.fastjson.JSONAware;
- import com.alibaba.fastjson.JSONObject;
- import com.atguigu.edu.realtime.util.DateFormatUtil;
- import com.atguigu.edu.realtime.util.EnvUtil;
- import com.atguigu.edu.realtime.util.KafkaUtil;
- import org.apache.flink.api.common.eventtime.WatermarkStrategy;
- import org.apache.flink.api.common.functions.FlatMapFunction;
- import org.apache.flink.api.common.functions.MapFunction;
- import org.apache.flink.api.common.functions.RichFilterFunction;
- import org.apache.flink.api.common.state.StateTtlConfig;
- import org.apache.flink.api.common.state.ValueState;
- import org.apache.flink.api.common.state.ValueStateDescriptor;
- import org.apache.flink.api.common.time.Time;
- import org.apache.flink.api.java.functions.KeySelector;
- import org.apache.flink.configuration.Configuration;
- import org.apache.flink.streaming.api.datastream.DataStreamSource;
- import org.apache.flink.streaming.api.datastream.KeyedStream;
- import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.util.Collector;
-
- /**
- * @author yhm
- * @create 2023-04-21 16:24
- */
- public class DwdTrafficUniqueVisitorDetail {
- public static void main(String[] args) throws Exception {
- // TODO 1 创建环境设置状态后端
- StreamExecutionEnvironment env = EnvUtil.getExecutionEnvironment(4);
-
- // TODO 2 读取kafka日志主题数据
- String topicName = "dwd_traffic_page_log";
- DataStreamSource
pageLogStream = env.fromSource(KafkaUtil.getKafkaConsumer(topicName, "dwd_traffic_unique_visitor_detail"), WatermarkStrategy.noWatermarks(), "unique_visitor_source"); -
- // TODO 3 转换结构,过滤last_page_id不为空的数据
- SingleOutputStreamOperator
firstPageStream = pageLogStream.flatMap(new FlatMapFunction() { - @Override
- public void flatMap(String value, Collector
out) throws Exception { - try {
- JSONObject jsonObject = JSON.parseObject(value);
- String lastPageID = jsonObject.getJSONObject("page").getString("last_page_id");
- if (lastPageID == null) {
- out.collect(jsonObject);
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- });
-
- // TODO 4 安装mid分组
- KeyedStream
keyedStream = firstPageStream.keyBy(new KeySelector() { - @Override
- public String getKey(JSONObject value) throws Exception {
- return value.getJSONObject("common").getString("mid");
- }
- });
-
- // TODO 5 判断独立访客
- SingleOutputStreamOperator
filteredStream = keyedStream.filter(new RichFilterFunction() { - ValueState
lastVisitDtState; -
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
- ValueStateDescriptor
stringValueStateDescriptor = new ValueStateDescriptor<>("last_visit_dt", String.class); - // 设置状态的存活时间
- stringValueStateDescriptor.enableTimeToLive(StateTtlConfig
- .newBuilder(Time.days(1L))
- // 设置状态的更新模式为创建及写入
- // 每次重新写入的时候记录时间 到1天删除状态
- .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
- .build());
- lastVisitDtState = getRuntimeContext().getState(stringValueStateDescriptor);
- }
-
- @Override
- public boolean filter(JSONObject jsonObject) throws Exception {
- String visitDt = DateFormatUtil.toDate(jsonObject.getLong("ts"));
- String lastVisitDt = lastVisitDtState.value();
- // 对于迟到的数据,last日期会大于visit日期,数据也不要
- if (lastVisitDt == null || (DateFormatUtil.toTs(lastVisitDt) < DateFormatUtil.toTs(visitDt))) {
- lastVisitDtState.update(visitDt);
- return true;
- }
- return false;
- }
- });
-
- // TODO 6 将独立访客数据写出到对应的kafka主题
- String targetTopic = "dwd_traffic_unique_visitor_detail";
- SingleOutputStreamOperator
sinkStream = filteredStream.map((MapFunction) JSONAware::toJSONString); - sinkStream.sinkTo(KafkaUtil.getKafkaProducer(targetTopic, "unique_visitor_trans"));
-
- // TODO 7 运行任务
- env.execute();
- }
- }
- [atguigu@node001 ~]$ kafka-console-consumer.sh --bootstrap-server node001:9092 --topic dwd_traffic_unique_visitor_detail
- [atguigu@node001 ~]$ kafka-console-consumer.sh --bootstrap-server node001:9092 --topic dwd_traffic_page_log
-
- [atguigu@node001 01-onlineEducation]$ cd /opt/module/data_mocker/01-onlineEducation/
- [atguigu@node001 01-onlineEducation]$ java -jar edu2021-mock-2022-06-18.jar