• Atguigu Big Data Project "Online Education Real-Time Data Warehouse", Notes 006


    Video: "Atguigu Big Data Project: Online Education Real-Time Data Warehouse" on Bilibili

    Contents

    Chapter 9: DWD Layer Development

    P041

    P042

    P043

    P044

    P045

    P046

    P047

    P048

    P049

    P050

    P051

    P052


    Chapter 9: DWD Layer Development

    P041

    9.3 Traffic Domain: User Jump-Out Transaction Fact Table

    P042

    DwdTrafficUserJumpDetail

    // TODO 1 Create the execution environment and set the state backend

    // TODO 2 Read data from the Kafka page-log topic

    // TODO 3 Filter and convert the data

    // TODO 4 Assign watermarks

    // TODO 5 Key by mid

    P043

package com.atguigu.edu.realtime.app.dwd.log;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.edu.realtime.util.EnvUtil;
import com.atguigu.edu.realtime.util.KafkaUtil;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternFlatSelectFunction;
import org.apache.flink.cep.PatternFlatTimeoutFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.util.List;
import java.util.Map;

/**
 * @author yhm
 * @create 2023-04-21 17:54
 */
public class DwdTrafficUserJumpDetail {
    public static void main(String[] args) throws Exception {
        // TODO 1 Create the execution environment and set the state backend
        StreamExecutionEnvironment env = EnvUtil.getExecutionEnvironment(4);

        // TODO 2 Read data from the Kafka page-log topic
        String topicName = "dwd_traffic_page_log";
        DataStreamSource<String> logDS = env.fromSource(KafkaUtil.getKafkaConsumer(topicName, "dwd_traffic_user_jump_detail"), WatermarkStrategy.noWatermarks(), "user_jump_source");

        // Test data (used instead of the Kafka source while debugging)
        DataStream<String> kafkaDS = env
                .fromElements(
                        "{\"common\":{\"mid\":\"101\"},\"page\":{\"page_id\":\"home\"},\"ts\":10000} ",
                        "{\"common\":{\"mid\":\"102\"},\"page\":{\"page_id\":\"home\"},\"ts\":12000}",
                        "{\"common\":{\"mid\":\"102\"},\"page\":{\"page_id\":\"good_list\"},\"ts\":15000} ",
                        "{\"common\":{\"mid\":\"102\"},\"page\":{\"page_id\":\"good_list\",\"last_page_id\":" +
                                "\"detail\"},\"ts\":30000} "
                );

        // TODO 3 Filter and convert the data
        SingleOutputStreamOperator<JSONObject> jsonObjStream = kafkaDS.flatMap(new FlatMapFunction<String, JSONObject>() {
            @Override
            public void flatMap(String value, Collector<JSONObject> out) throws Exception {
                try {
                    JSONObject jsonObject = JSON.parseObject(value);
                    out.collect(jsonObject);
                } catch (Exception e) {
                    // Drop records that are not valid JSON
                    e.printStackTrace();
                }
            }
        });

        // TODO 4 Assign watermarks
        SingleOutputStreamOperator<JSONObject> withWatermarkStream = jsonObjStream.assignTimestampsAndWatermarks(
                WatermarkStrategy.<JSONObject>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<JSONObject>() {
                            @Override
                            public long extractTimestamp(JSONObject element, long recordTimestamp) {
                                return element.getLong("ts");
                            }
                        }));

        // TODO 5 Key by mid
        KeyedStream<JSONObject, String> keyedStream = withWatermarkStream.keyBy(new KeySelector<JSONObject, String>() {
            @Override
            public String getKey(JSONObject jsonObject) throws Exception {
                return jsonObject.getJSONObject("common").getString("mid");
            }
        });

        // TODO 6 Define the CEP pattern
        Pattern<JSONObject, JSONObject> pattern = Pattern.<JSONObject>begin("first").where(new IterativeCondition<JSONObject>() {
            @Override
            public boolean filter(JSONObject jsonObject, Context<JSONObject> ctx) throws Exception {
                // Start of a session -> last_page_id is null
                String lastPageId = jsonObject.getJSONObject("page").getString("last_page_id");
                return lastPageId == null;
            }
        }).next("second").where(new IterativeCondition<JSONObject>() {
            @Override
            public boolean filter(JSONObject jsonObject, Context<JSONObject> ctx) throws Exception {
                // Match condition: the immediately following event is again the start of a session
                String lastPageId = jsonObject.getJSONObject("page").getString("last_page_id");
                return lastPageId == null;
            }
        }).within(Time.seconds(10L));

        // TODO 7 Apply the pattern to the keyed stream
        PatternStream<JSONObject> patternStream = CEP.pattern(keyedStream, pattern);

        // TODO 8 Extract matched data and timed-out data
        OutputTag<String> timeoutTag = new OutputTag<String>("timeoutTag") {
        };
        SingleOutputStreamOperator<String> flatSelectStream = patternStream.flatSelect(timeoutTag, new PatternFlatTimeoutFunction<JSONObject, String>() {
            @Override
            public void timeout(Map<String, List<JSONObject>> pattern, long timeoutTimestamp, Collector<String> out) throws Exception {
                JSONObject first = pattern.get("first").get(0);
                out.collect(first.toJSONString());
            }
        }, new PatternFlatSelectFunction<JSONObject, String>() {
            @Override
            public void flatSelect(Map<String, List<JSONObject>> pattern, Collector<String> out) throws Exception {
                JSONObject first = pattern.get("first").get(0);
                out.collect(first.toJSONString());
            }
        });
        SideOutputDataStream<String> timeoutStream = flatSelectStream.getSideOutput(timeoutTag);

        // TODO 9 Union the two streams and write to Kafka
        DataStream<String> unionStream = flatSelectStream.union(timeoutStream);
        String targetTopic = "dwd_traffic_user_jump_detail";
        unionStream.sinkTo(KafkaUtil.getKafkaProducer(targetTopic, "user_jump_trans"));

        // TODO 10 Execute the job
        env.execute();
    }
}
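
    EnvUtil and KafkaUtil are project helper classes that are not listed in this note. The following is only a minimal sketch of what they presumably provide; the method names and return types are taken from the calls above, while the state backend, checkpoint interval, bootstrap servers, offsets, and delivery guarantee are assumptions, not the course's exact implementation.

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Hypothetical sketch of EnvUtil: create the environment, set parallelism and the state backend (TODO 1 above).
class EnvUtil {
    static StreamExecutionEnvironment getExecutionEnvironment(int parallelism) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(parallelism);
        env.setStateBackend(new HashMapStateBackend()); // assumption: HashMap state backend
        env.enableCheckpointing(60 * 1000L);            // assumption: 1-minute checkpoint interval
        return env;
    }
}

// Hypothetical sketch of KafkaUtil: build the KafkaSource / KafkaSink used by env.fromSource(...) and sinkTo(...).
class KafkaUtil {
    static final String BOOTSTRAP_SERVERS = "node001:9092"; // assumption: matches the console-consumer commands below

    static KafkaSource<String> getKafkaConsumer(String topic, String groupId) {
        return KafkaSource.<String>builder()
                .setBootstrapServers(BOOTSTRAP_SERVERS)
                .setTopics(topic)
                .setGroupId(groupId)
                .setStartingOffsets(OffsetsInitializer.earliest()) // assumption
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
    }

    static KafkaSink<String> getKafkaProducer(String topic, String transIdPrefix) {
        return KafkaSink.<String>builder()
                .setBootstrapServers(BOOTSTRAP_SERVERS)
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic(topic)
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) // assumption; with exactly-once, transIdPrefix would be the transactional id prefix
                .setTransactionalIdPrefix(transIdPrefix)
                .build();
    }
}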

    P044

    Timeout data: a session-start event (last_page_id == null) that is not followed by another session-start event within 10 s never completes the pattern, so when the watermark passes its timestamp plus 10 s, CEP emits it from the timeout side output; these timed-out events are exactly the jump-out records. With the test data above, mid 101's home view (ts 10000) comes out of the timeout side output, while mid 102's home view (ts 12000) is immediately followed by another session start and is emitted by the matched-pattern select function.

    P045

    9.4 Learning Domain: Video Play Transaction Fact Table

    P046

    DwdLearnPlay, DwdLearnPlayBean

    //TODO 1 Create the execution environment and set the state backend

    //TODO 2 Read the play log data from Kafka

    //TODO 3 Clean and convert

    //TODO 4 Assign watermarks

    P047

package com.atguigu.edu.realtime.app.dwd.log;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.edu.realtime.bean.DwdLearnPlayBean;
import com.atguigu.edu.realtime.util.EnvUtil;
import com.atguigu.edu.realtime.util.KafkaUtil;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;

/**
 * @author yhm
 * @create 2023-04-23 14:21
 */
public class DwdLearnPlay {
    public static void main(String[] args) throws Exception {
        //TODO 1 Create the execution environment and set the state backend
        StreamExecutionEnvironment env = EnvUtil.getExecutionEnvironment(1);

        //TODO 2 Read the play log data from Kafka
        String topicName = "dwd_traffic_play_pre_process";
        String groupId = "dwd_learn_play";
        DataStreamSource<String> playSource = env.fromSource(KafkaUtil.getKafkaConsumer(topicName, groupId), WatermarkStrategy.noWatermarks(), "learn_play");

        //TODO 3 Clean and convert
        SingleOutputStreamOperator<DwdLearnPlayBean> learnBeanStream = playSource.flatMap(new FlatMapFunction<String, DwdLearnPlayBean>() {
            @Override
            public void flatMap(String value, Collector<DwdLearnPlayBean> out) throws Exception {
                try {
                    JSONObject jsonObject = JSON.parseObject(value);
                    JSONObject common = jsonObject.getJSONObject("common");
                    JSONObject appVideo = jsonObject.getJSONObject("appVideo");
                    Long ts = jsonObject.getLong("ts");
                    DwdLearnPlayBean learnPlayBean = DwdLearnPlayBean.builder()
                            .provinceId(common.getString("ar"))
                            .brand(common.getString("ba"))
                            .channel(common.getString("ch"))
                            .isNew(common.getString("is_new"))
                            .model(common.getString("md"))
                            .machineId(common.getString("mid"))
                            .operatingSystem(common.getString("os"))
                            .sourceId(common.getString("sc"))
                            .sessionId(common.getString("sid"))
                            .userId(common.getString("uid"))
                            .versionCode(common.getString("vc"))
                            .playSec(appVideo.getInteger("play_sec"))
                            .videoId(appVideo.getString("video_id"))
                            .positionSec(appVideo.getInteger("position_sec"))
                            .ts(ts)
                            .build();
                    out.collect(learnPlayBean);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });

        //TODO 4 Assign watermarks
        SingleOutputStreamOperator<DwdLearnPlayBean> withWatermarkStream = learnBeanStream.assignTimestampsAndWatermarks(
                WatermarkStrategy.<DwdLearnPlayBean>forBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner(
                        new SerializableTimestampAssigner<DwdLearnPlayBean>() {
                            @Override
                            public long extractTimestamp(DwdLearnPlayBean element, long recordTimestamp) {
                                return element.getTs();
                            }
                        }
                ));

        //TODO 5 Key by session id
        KeyedStream<DwdLearnPlayBean, String> keyedStream = withWatermarkStream.keyBy(new KeySelector<DwdLearnPlayBean, String>() {
            @Override
            public String getKey(DwdLearnPlayBean value) throws Exception {
                return value.getSessionId();
            }
        });

        //TODO 6 Aggregate within session windows
        WindowedStream<DwdLearnPlayBean, String, TimeWindow> windowStream = keyedStream.window(EventTimeSessionWindows.withGap(Time.seconds(3L)));
        SingleOutputStreamOperator<DwdLearnPlayBean> reducedStream = windowStream.reduce(
                new ReduceFunction<DwdLearnPlayBean>() {
                    @Override
                    public DwdLearnPlayBean reduce(DwdLearnPlayBean value1, DwdLearnPlayBean value2) throws Exception {
                        // Sum the played seconds and keep the latest playback position
                        value1.setPlaySec(value1.getPlaySec() + value2.getPlaySec());
                        if (value2.getTs() > value1.getTs()) {
                            value1.setPositionSec(value2.getPositionSec());
                        }
                        return value1;
                    }
                }, new ProcessWindowFunction<DwdLearnPlayBean, DwdLearnPlayBean, String, TimeWindow>() {
                    @Override
                    public void process(String key, Context context, Iterable<DwdLearnPlayBean> elements, Collector<DwdLearnPlayBean> out) throws Exception {
                        for (DwdLearnPlayBean element : elements) {
                            out.collect(element);
                        }
                    }
                }
        );

        //TODO 7 Convert to JSON strings
        SingleOutputStreamOperator<String> jsonStrStream = reducedStream.map(JSON::toJSONString);

        //TODO 8 Write to the Kafka topic dwd_learn_play
        String targetTopic = "dwd_learn_play";
        jsonStrStream.sinkTo(KafkaUtil.getKafkaProducer(targetTopic, "learn_pay_trans"));

        //TODO 9 Execute the job
        env.execute();
    }
}
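
    DwdLearnPlayBean itself is not listed in this note. Its field names can be read off the builder calls above and the JSON output below; the following is only a minimal sketch under the assumption that the project uses Lombok, not the course's exact class.

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

// Minimal sketch of the play-log bean; field names follow the builder calls above.
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class DwdLearnPlayBean {
    private String provinceId;       // common.ar
    private String brand;            // common.ba
    private String channel;          // common.ch
    private String isNew;            // common.is_new
    private String model;            // common.md
    private String machineId;        // common.mid
    private String operatingSystem;  // common.os
    private String sourceId;         // common.sc
    private String sessionId;        // common.sid
    private String userId;           // common.uid
    private String versionCode;      // common.vc
    private Integer playSec;         // appVideo.play_sec
    private String videoId;          // appVideo.video_id
    private Integer positionSec;     // appVideo.position_sec
    private Long ts;
}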

    P048

    Start the DwdLearnPlay consumer first, then mock the data.

    Kafka consumed no data at first. Fix in DwdLearnPlay: change the parallelism to 1 (TODO 1) and shorten the session gap to 3 s (TODO 6); that is, make the window gap and the parallelism smaller for testing.

    In the output, the same user watching the same video appears with different timestamps and different playback positions.

    [atguigu@node001 ~]$ kafka-console-consumer.sh --bootstrap-server node001:9092 --topic dwd_learn_play
    [atguigu@node001 ~]$ cd /opt/module/data_mocker/01-onlineEducation/
    [atguigu@node001 01-onlineEducation]$ ll
    总用量 30460
    -rw-rw-r-- 1 atguigu atguigu 2223 9月 19 10:43 application.yml
    -rw-rw-r-- 1 atguigu atguigu 4057995 7月 25 10:28 edu0222.sql
    -rw-rw-r-- 1 atguigu atguigu 27112074 7月 25 10:28 edu2021-mock-2022-06-18.jar
    drwxrwxr-x 2 atguigu atguigu 4096 11月 2 11:13 log
    -rw-rw-r-- 1 atguigu atguigu 1156 7月 25 10:44 logback.xml
    -rw-rw-r-- 1 atguigu atguigu 633 7月 25 10:45 path.json
    [atguigu@node001 01-onlineEducation]$ java -jar edu2021-mock-2022-06-18.jar
    SLF4J: Class path contains multiple SLF4J bindings.
    SLF4J: Found binding in [jar:file:/opt/module/data_mocker/01-onlineEducation/edu2021-mock-2022-06-18.jar!/BOOT-INF/lib/logback-classic-1.2.3.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: Found binding in [jar:file:/opt/module/data_mocker/01-onlineEducation/edu2021-mock-2022-06-18.jar!/BOOT-INF/lib/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    1. {"brand":"Xiaomi","channel":"xiaomi","isNew":"0","machineId":"mid_293","model":"Xiaomi Mix2 ","operatingSystem":"Android 10.0","playSec":30,"positionSec":690,"provinceId":"18","sessionId":"a1fb6d22-f8ef-40e6-89c2-262cd5a351be","sourceId":"1","ts":1645460612085,"userId":"46","versionCode":"v2.1.134","videoId":"108"}
    2. {"brand":"Xiaomi","channel":"xiaomi","isNew":"0","machineId":"mid_293","model":"Xiaomi Mix2 ","operatingSystem":"Android 10.0","playSec":30,"positionSec":720,"provinceId":"18","sessionId":"a1fb6d22-f8ef-40e6-89c2-262cd5a351be","sourceId":"1","ts":1645460642085,"userId":"46","versionCode":"v2.1.134","videoId":"108"}
    3. {
    4. "brand":"Xiaomi",
    5. "channel":"xiaomi",
    6. "isNew":"0",
    7. "machineId":"mid_293",
    8. "model":"Xiaomi Mix2 ",
    9. "operatingSystem":"Android 10.0",
    10. "playSec":30,
    11. "positionSec":690,
    12. "provinceId":"18",
    13. "sessionId":"a1fb6d22-f8ef-40e6-89c2-262cd5a351be",
    14. "sourceId":"1",
    15. "ts":1645460612085,
    16. "userId":"46",
    17. "versionCode":"v2.1.134",
    18. "videoId":"108"
    19. }

    P049

    9.5 User Domain: User Login Transaction Fact Table

    9.5.1 Main Task

    Read the page log data, filter out the user login records, and write them to the Kafka user-login topic.

    9.5.2 Approach

    Keep only page-log records whose common.uid is not null, key the stream by mid, and store the earliest such record in keyed state (with a one-day TTL). When the first record arrives, register an event-time timer 10 s later; when the timer fires, emit the stored record, map it to a DwdUserUserLoginBean, and write it to Kafka.

    9.5.3 Diagram

    P050

    DwdUserUserLogin

    //TODO 1 Create the execution environment and set the state backend

    //TODO 2 Read the dwd_traffic_page_log topic from Kafka

    //TODO 3 Filter and convert

    //TODO 4 Assign watermarks

    //TODO 5 Key by mid

    P051

    DwdUserUserLogin, DwdUserUserLoginBean

package com.atguigu.edu.realtime.app.dwd.log;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.edu.realtime.bean.DwdUserUserLoginBean;
import com.atguigu.edu.realtime.util.DateFormatUtil;
import com.atguigu.edu.realtime.util.EnvUtil;
import com.atguigu.edu.realtime.util.KafkaUtil;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.time.Duration;

/**
 * @author yhm
 * @create 2023-04-23 16:02
 */
public class DwdUserUserLogin {
    public static void main(String[] args) throws Exception {
        //TODO 1 Create the execution environment and set the state backend
        StreamExecutionEnvironment env = EnvUtil.getExecutionEnvironment(1);

        //TODO 2 Read the dwd_traffic_page_log topic from Kafka
        String topicName = "dwd_traffic_page_log";
        String groupId = "dwd_user_user_login";
        DataStreamSource<String> pageStream = env.fromSource(KafkaUtil.getKafkaConsumer(topicName, groupId), WatermarkStrategy.noWatermarks(), "user_login");

        //TODO 3 Filter and convert: keep only records that carry a uid
        SingleOutputStreamOperator<JSONObject> jsonObjStream = pageStream.flatMap(new FlatMapFunction<String, JSONObject>() {
            @Override
            public void flatMap(String value, Collector<JSONObject> out) throws Exception {
                try {
                    JSONObject jsonObject = JSON.parseObject(value);
                    if (jsonObject.getJSONObject("common").getString("uid") != null) {
                        out.collect(jsonObject);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });

        //TODO 4 Assign watermarks
        SingleOutputStreamOperator<JSONObject> withWaterMarkStream = jsonObjStream.assignTimestampsAndWatermarks(
                WatermarkStrategy.<JSONObject>forBoundedOutOfOrderness(Duration.ofSeconds(5L))
                        .withTimestampAssigner(new SerializableTimestampAssigner<JSONObject>() {
                            @Override
                            public long extractTimestamp(JSONObject element, long recordTimestamp) {
                                return element.getLong("ts");
                            }
                        }));

        //TODO 5 Key by mid
        KeyedStream<JSONObject, String> keyedStream = withWaterMarkStream.keyBy(new KeySelector<JSONObject, String>() {
            @Override
            public String getKey(JSONObject value) throws Exception {
                return value.getJSONObject("common").getString("mid");
            }
        });

        //TODO 6 Use keyed state to find the first record of each session
        SingleOutputStreamOperator<JSONObject> firstStream = keyedStream.process(new KeyedProcessFunction<String, JSONObject, JSONObject>() {
            ValueState<JSONObject> firstLoginDtState;

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                ValueStateDescriptor<JSONObject> valueStateDescriptor = new ValueStateDescriptor<>("first_login_dt", JSONObject.class);
                // Give the state a time-to-live of one day
                valueStateDescriptor.enableTimeToLive(StateTtlConfig
                        .newBuilder(Time.days(1L))
                        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                        .build());
                firstLoginDtState = getRuntimeContext().getState(valueStateDescriptor);
            }

            @Override
            public void processElement(JSONObject jsonObject, Context ctx, Collector<JSONObject> out) throws Exception {
                // Read the stored first record
                JSONObject firstLoginDt = firstLoginDtState.value();
                Long ts = jsonObject.getLong("ts");
                if (firstLoginDt == null) {
                    firstLoginDtState.update(jsonObject);
                    // Register an event-time timer when the first record arrives
                    ctx.timerService().registerEventTimeTimer(ts + 10 * 1000L);
                } else {
                    Long lastTs = firstLoginDt.getLong("ts");
                    if (ts < lastTs) {
                        // An earlier, out-of-order record replaces the stored one
                        firstLoginDtState.update(jsonObject);
                    }
                }
            }

            @Override
            public void onTimer(long timestamp, OnTimerContext ctx, Collector<JSONObject> out) throws Exception {
                super.onTimer(timestamp, ctx, out);
                out.collect(firstLoginDtState.value());
            }
        });

        //TODO 7 Convert to the output bean
        SingleOutputStreamOperator<String> mapStream = firstStream.map(new MapFunction<JSONObject, String>() {
            @Override
            public String map(JSONObject jsonObj) throws Exception {
                JSONObject common = jsonObj.getJSONObject("common");
                Long ts = jsonObj.getLong("ts");
                String loginTime = DateFormatUtil.toYmdHms(ts);
                String dateId = loginTime.substring(0, 10);
                DwdUserUserLoginBean dwdUserUserLoginBean = DwdUserUserLoginBean.builder()
                        .userId(common.getString("uid"))
                        .dateId(dateId)
                        .loginTime(loginTime)
                        .channel(common.getString("ch"))
                        .provinceId(common.getString("ar"))
                        .versionCode(common.getString("vc"))
                        .midId(common.getString("mid"))
                        .brand(common.getString("ba"))
                        .model(common.getString("md"))
                        .sourceId(common.getString("sc"))
                        .operatingSystem(common.getString("os"))
                        .ts(ts)
                        .build();
                return JSON.toJSONString(dwdUserUserLoginBean);
            }
        });

        //TODO 8 Write to Kafka
        String sinkTopic = "dwd_user_user_login";
        mapStream.sinkTo(KafkaUtil.getKafkaProducer(sinkTopic, "user_login_trans"));

        //TODO 9 Execute the job
        env.execute();
    }
}
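
    Likewise, DwdUserUserLoginBean is only used through its builder here, and DateFormatUtil.toYmdHms is a project helper that presumably formats the epoch-millisecond ts as yyyy-MM-dd HH:mm:ss (which is what the substring(0, 10) date id relies on). A minimal sketch of the bean, assuming Lombok and taking the field names from the builder calls above:

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

// Minimal sketch of the login bean; field names follow the builder calls above.
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class DwdUserUserLoginBean {
    private String userId;           // common.uid
    private String dateId;           // substring(0, 10) of loginTime, i.e. the yyyy-MM-dd part
    private String loginTime;        // DateFormatUtil.toYmdHms(ts)
    private String channel;          // common.ch
    private String provinceId;       // common.ar
    private String versionCode;      // common.vc
    private String midId;            // common.mid
    private String brand;            // common.ba
    private String model;            // common.md
    private String sourceId;         // common.sc
    private String operatingSystem;  // common.os
    private Long ts;
}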

    P052

    [atguigu@node001 ~]$ kafka-console-consumer.sh --bootstrap-server node001:9092 --topic dwd_user_user_login
    [atguigu@node001 ~]$ cd /opt/module/data_mocker/01-onlineEducation/
    [atguigu@node001 01-onlineEducation]$ java -jar edu2021-mock-2022-06-18.jar
  • Original post: https://blog.csdn.net/weixin_44949135/article/details/134178909