目录
9.3 流量域用户跳出事务事实表
DwdTrafficUserJumpDetail
// TODO 1 创建环境设置状态后端
// TODO 2 从kafka的page主题读取数据
// TODO 3 过滤加转换数据
// TODO 4 添加水位线
// TODO 5 按照mid分组
- package com.atguigu.edu.realtime.app.dwd.log;
-
- import com.alibaba.fastjson.JSON;
- import com.alibaba.fastjson.JSONObject;
- import com.atguigu.edu.realtime.util.EnvUtil;
- import com.atguigu.edu.realtime.util.KafkaUtil;
- import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
- import org.apache.flink.api.common.eventtime.WatermarkStrategy;
- import org.apache.flink.api.common.functions.FlatMapFunction;
- import org.apache.flink.api.java.functions.KeySelector;
- import org.apache.flink.cep.CEP;
- import org.apache.flink.cep.PatternFlatSelectFunction;
- import org.apache.flink.cep.PatternFlatTimeoutFunction;
- import org.apache.flink.cep.PatternStream;
- import org.apache.flink.cep.pattern.Pattern;
- import org.apache.flink.cep.pattern.conditions.IterativeCondition;
- import org.apache.flink.streaming.api.datastream.*;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.streaming.api.windowing.time.Time;
- import org.apache.flink.util.Collector;
- import org.apache.flink.util.OutputTag;
-
- import java.util.List;
- import java.util.Map;
-
- /**
- * @author yhm
- * @create 2023-04-21 17:54
- */
- public class DwdTrafficUserJumpDetail {
- public static void main(String[] args) throws Exception {
- // TODO 1 创建环境设置状态后端
- StreamExecutionEnvironment env = EnvUtil.getExecutionEnvironment(4);
-
- // TODO 2 从kafka的page主题读取数据
- String topicName = "dwd_traffic_page_log";
- DataStreamSource
logDS = env.fromSource(KafkaUtil.getKafkaConsumer(topicName, "dwd_traffic_user_jump_detail"), WatermarkStrategy.noWatermarks(), "user_jump_source"); -
- // 测试数据
- DataStream
kafkaDS = env - .fromElements(
- "{\"common\":{\"mid\":\"101\"},\"page\":{\"page_id\":\"home\"},\"ts\":10000} ",
- "{\"common\":{\"mid\":\"102\"},\"page\":{\"page_id\":\"home\"},\"ts\":12000}",
- "{\"common\":{\"mid\":\"102\"},\"page\":{\"page_id\":\"good_list\"},\"ts\":15000} ",
- "{\"common\":{\"mid\":\"102\"},\"page\":{\"page_id\":\"good_list\",\"last_page_id\":" +
- "\"detail\"},\"ts\":30000} "
- );
-
- // TODO 3 过滤加转换数据
- SingleOutputStreamOperator
jsonObjStream = kafkaDS.flatMap(new FlatMapFunction() { - @Override
- public void flatMap(String value, Collector
out) throws Exception { - try {
- JSONObject jsonObject = JSON.parseObject(value);
- out.collect(jsonObject);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- });
-
- // TODO 4 添加水位线
- SingleOutputStreamOperator
withWatermarkStream = jsonObjStream.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps() - .withTimestampAssigner(new SerializableTimestampAssigner
() { - @Override
- public long extractTimestamp(JSONObject element, long recordTimestamp) {
- return element.getLong("ts");
- }
- }));
-
- // TODO 5 按照mid分组
- KeyedStream
keyedStream = withWatermarkStream.keyBy(new KeySelector() { - @Override
- public String getKey(JSONObject jsonObject) throws Exception {
- return jsonObject.getJSONObject("common").getString("mid");
- }
- });
-
- // TODO 6 定义cep匹配规则
- Pattern
pattern = Pattern.begin("first").where(new IterativeCondition() { - @Override
- public boolean filter(JSONObject jsonObject, Context
ctx) throws Exception { - // 一个会话的开头 -> last_page_id 为空
- String lastPageId = jsonObject.getJSONObject("page").getString("last_page_id");
- return lastPageId == null;
- }
- }).next("second").where(new IterativeCondition
() { - @Override
- public boolean filter(JSONObject jsonObject, Context
ctx) throws Exception { - // 满足匹配的条件
- // 紧密相连,又一个会话的开头
- String lastPageId = jsonObject.getJSONObject("page").getString("last_page_id");
- return lastPageId == null;
- }
- }).within(Time.seconds(10L));
-
- // TODO 7 将CEP作用到流上
- PatternStream
patternStream = CEP.pattern(keyedStream, pattern); -
- // TODO 8 提取匹配数据和超时数据
- OutputTag
timeoutTag = new OutputTag("timeoutTag") { - };
- SingleOutputStreamOperator
flatSelectStream = patternStream.flatSelect(timeoutTag, new PatternFlatTimeoutFunction() { - @Override
- public void timeout(Map
> pattern, long timeoutTimestamp, Collector out) throws Exception { - JSONObject first = pattern.get("first").get(0);
- out.collect(first.toJSONString());
- }
- }, new PatternFlatSelectFunction
() { - @Override
- public void flatSelect(Map
> pattern, Collector out) throws Exception { - JSONObject first = pattern.get("first").get(0);
- out.collect(first.toJSONString());
- }
- });
-
- SideOutputDataStream
timeoutStream = flatSelectStream.getSideOutput(timeoutTag); -
- // TODO 9 合并数据写出到kafka
- DataStream
unionStream = flatSelectStream.union(timeoutStream); - String targetTopic = "dwd_traffic_user_jump_detail";
- unionStream.sinkTo(KafkaUtil.getKafkaProducer(targetTopic, "user_jump_trans"));
-
- // TODO 10 执行任务
- env.execute();
- }
- }
超时数据
9.4 学习域播放事务事实表
DwdLearnPlay、DwdLearnPlayBean
//TODO 1 创建环境设置状态后端
//TODO 2 读取kafka播放日志数据
//TODO 3 清洗转换
//TODO 4 添加水位线
- package com.atguigu.edu.realtime.app.dwd.log;
-
- import com.alibaba.fastjson.JSON;
- import com.alibaba.fastjson.JSONObject;
- import com.atguigu.edu.realtime.bean.DwdLearnPlayBean;
- import com.atguigu.edu.realtime.util.EnvUtil;
- import com.atguigu.edu.realtime.util.KafkaUtil;
- import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
- import org.apache.flink.api.common.eventtime.WatermarkStrategy;
- import org.apache.flink.api.common.functions.FlatMapFunction;
- import org.apache.flink.api.common.functions.ReduceFunction;
- import org.apache.flink.api.java.functions.KeySelector;
- import org.apache.flink.streaming.api.datastream.DataStreamSource;
- import org.apache.flink.streaming.api.datastream.KeyedStream;
- import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
- import org.apache.flink.streaming.api.datastream.WindowedStream;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
- import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
- import org.apache.flink.streaming.api.windowing.time.Time;
- import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
- import org.apache.flink.util.Collector;
-
- import java.time.Duration;
-
- /**
- * @author yhm
- * @create 2023-04-23 14:21
- */
- public class DwdLearnPlay {
- public static void main(String[] args) throws Exception {
- //TODO 1 创建环境设置状态后端
- StreamExecutionEnvironment env = EnvUtil.getExecutionEnvironment(1);
-
- //TODO 2 读取kafka播放日志数据
- String topicName = "dwd_traffic_play_pre_process";
- String groupId = "dwd_learn_play";
- DataStreamSource
playSource = env.fromSource(KafkaUtil.getKafkaConsumer(topicName, groupId), WatermarkStrategy.noWatermarks(), "learn_play"); -
- //TODO 3 清洗转换
- SingleOutputStreamOperator
learnBeanStream = playSource.flatMap(new FlatMapFunction() { - @Override
- public void flatMap(String value, Collector
out) throws Exception { - try {
- JSONObject jsonObject = JSON.parseObject(value);
- JSONObject common = jsonObject.getJSONObject("common");
- JSONObject appVideo = jsonObject.getJSONObject("appVideo");
- Long ts = jsonObject.getLong("ts");
- DwdLearnPlayBean learnPlayBean = DwdLearnPlayBean.builder()
- .provinceId(common.getString("ar"))
- .brand(common.getString("ba"))
- .channel(common.getString("ch"))
- .isNew(common.getString("is_new"))
- .model(common.getString("md"))
- .machineId(common.getString("mid"))
- .operatingSystem(common.getString("os"))
- .sourceId(common.getString("sc"))
- .sessionId(common.getString("sid"))
- .userId(common.getString("uid"))
- .versionCode(common.getString("vc"))
- .playSec(appVideo.getInteger("play_sec"))
- .videoId(appVideo.getString("video_id"))
- .positionSec(appVideo.getInteger("position_sec"))
- .ts(ts)
- .build();
- out.collect(learnPlayBean);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- });
-
- //TODO 4 添加水位线
- SingleOutputStreamOperator
withWatermarkStream = learnBeanStream.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner( - new SerializableTimestampAssigner
() { - @Override
- public long extractTimestamp(DwdLearnPlayBean element, long recordTimestamp) {
- return element.getTs();
- }
- }
- ));
-
- //TODO 5 按照会话id分组
- KeyedStream
keyedStream = withWatermarkStream.keyBy(new KeySelector() { - @Override
- public String getKey(DwdLearnPlayBean value) throws Exception {
- return value.getSessionId();
- }
- });
-
- //TODO 6 聚合统计
- WindowedStream
windowStream = keyedStream.window(EventTimeSessionWindows.withGap(Time.seconds(3L))); - SingleOutputStreamOperator
reducedStream = windowStream.reduce( - new ReduceFunction
() { - @Override
- public DwdLearnPlayBean reduce(DwdLearnPlayBean value1, DwdLearnPlayBean value2) throws Exception {
- value1.setPlaySec(value1.getPlaySec() + value2.getPlaySec());
- if (value2.getTs() > value1.getTs()) {
- value1.setPositionSec(value2.getPositionSec());
- }
- return value1;
- }
- }, new ProcessWindowFunction
() { - @Override
- public void process(String key, Context context, Iterable
elements, Collector out) throws Exception { - for (DwdLearnPlayBean element : elements) {
- out.collect(element);
- }
- }
- }
- );
-
- //TODO 7 转换结构
- SingleOutputStreamOperator
jsonStrStream = reducedStream.map(JSON::toJSONString); -
- //TODO 8 输出到kafka主题Kafka dwd_learn_play
- String targetTopic = "dwd_learn_play";
- jsonStrStream.sinkTo(KafkaUtil.getKafkaProducer(targetTopic,"learn_pay_trans"));
-
- //TODO 9 执行任务
- env.execute();
- }
- }
先启动消费者DwdLearnPlay,再mock数据。
如果 Kafka 消费者没有消费到数据,可以在 DwdLearnPlay 中将并行度改为 1(TODO 1),并将会话窗口的间隔改为 3s(TODO 6),即把窗口和并行度都调小一些。
同一个人看的同一个视频,时间不一样,看的位置也不一样。
[atguigu@node001 ~]$ kafka-console-consumer.sh --bootstrap-server node001:9092 --topic dwd_learn_play
- [atguigu@node001 ~]$ cd /opt/module/data_mocker/01-onlineEducation/
- [atguigu@node001 01-onlineEducation]$ ll
- 总用量 30460
- -rw-rw-r-- 1 atguigu atguigu 2223 9月 19 10:43 application.yml
- -rw-rw-r-- 1 atguigu atguigu 4057995 7月 25 10:28 edu0222.sql
- -rw-rw-r-- 1 atguigu atguigu 27112074 7月 25 10:28 edu2021-mock-2022-06-18.jar
- drwxrwxr-x 2 atguigu atguigu 4096 11月 2 11:13 log
- -rw-rw-r-- 1 atguigu atguigu 1156 7月 25 10:44 logback.xml
- -rw-rw-r-- 1 atguigu atguigu 633 7月 25 10:45 path.json
- [atguigu@node001 01-onlineEducation]$ java -jar edu2021-mock-2022-06-18.jar
- SLF4J: Class path contains multiple SLF4J bindings.
- SLF4J: Found binding in [jar:file:/opt/module/data_mocker/01-onlineEducation/edu2021-mock-2022-06-18.jar!/BOOT-INF/lib/logback-classic-1.2.3.jar!/org/slf4j/impl/StaticLoggerBinder.class]
- SLF4J: Found binding in [jar:file:/opt/module/data_mocker/01-onlineEducation/edu2021-mock-2022-06-18.jar!/BOOT-INF/lib/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
- {"brand":"Xiaomi","channel":"xiaomi","isNew":"0","machineId":"mid_293","model":"Xiaomi Mix2 ","operatingSystem":"Android 10.0","playSec":30,"positionSec":690,"provinceId":"18","sessionId":"a1fb6d22-f8ef-40e6-89c2-262cd5a351be","sourceId":"1","ts":1645460612085,"userId":"46","versionCode":"v2.1.134","videoId":"108"}
- {"brand":"Xiaomi","channel":"xiaomi","isNew":"0","machineId":"mid_293","model":"Xiaomi Mix2 ","operatingSystem":"Android 10.0","playSec":30,"positionSec":720,"provinceId":"18","sessionId":"a1fb6d22-f8ef-40e6-89c2-262cd5a351be","sourceId":"1","ts":1645460642085,"userId":"46","versionCode":"v2.1.134","videoId":"108"}
- {
- "brand":"Xiaomi",
- "channel":"xiaomi",
- "isNew":"0",
- "machineId":"mid_293",
- "model":"Xiaomi Mix2 ",
- "operatingSystem":"Android 10.0",
- "playSec":30,
- "positionSec":690,
- "provinceId":"18",
- "sessionId":"a1fb6d22-f8ef-40e6-89c2-262cd5a351be",
- "sourceId":"1",
- "ts":1645460612085,
- "userId":"46",
- "versionCode":"v2.1.134",
- "videoId":"108"
- }
9.5 用户域用户登录事务事实表
9.5.1 主要任务
读取页面日志数据,筛选用户登录记录,写入 Kafka 用户登录主题。
9.5.2 思路分析
9.5.3 图解
DwdUserUserLogin
//TODO 1 创建环境设置状态后端
//TODO 2 读取kafka的dwd_traffic_page_log主题数据
//TODO 3 过滤及转换
//TODO 4 添加水位线
//TODO 5 按照会话id分组
DwdUserUserLogin、DwdUserUserLoginBean
- package com.atguigu.edu.realtime.app.dwd.log;
-
- import com.alibaba.fastjson.JSON;
- import com.alibaba.fastjson.JSONObject;
- import com.atguigu.edu.realtime.bean.DwdUserUserLoginBean;
- import com.atguigu.edu.realtime.util.DateFormatUtil;
- import com.atguigu.edu.realtime.util.EnvUtil;
- import com.atguigu.edu.realtime.util.KafkaUtil;
- import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
- import org.apache.flink.api.common.eventtime.WatermarkStrategy;
- import org.apache.flink.api.common.functions.FlatMapFunction;
- import org.apache.flink.api.common.functions.MapFunction;
- import org.apache.flink.api.common.state.StateTtlConfig;
- import org.apache.flink.api.common.state.ValueState;
- import org.apache.flink.api.common.state.ValueStateDescriptor;
- import org.apache.flink.api.common.time.Time;
- import org.apache.flink.api.java.functions.KeySelector;
- import org.apache.flink.configuration.Configuration;
- import org.apache.flink.streaming.api.datastream.DataStreamSource;
- import org.apache.flink.streaming.api.datastream.KeyedStream;
- import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
- import org.apache.flink.util.Collector;
-
- import java.time.Duration;
-
- /**
- * @author yhm
- * @create 2023-04-23 16:02
- */
- public class DwdUserUserLogin {
- public static void main(String[] args) throws Exception {
- //TODO 1 创建环境设置状态后端
- StreamExecutionEnvironment env = EnvUtil.getExecutionEnvironment(1);
-
- //TODO 2 读取kafka的dwd_traffic_page_log主题数据
- String topicName = "dwd_traffic_page_log";
- String groupId = "dwd_user_user_login";
- DataStreamSource
pageStream = env.fromSource(KafkaUtil.getKafkaConsumer(topicName, groupId), WatermarkStrategy.noWatermarks(), "user_login"); -
- //TODO 3 过滤及转换
- SingleOutputStreamOperator
jsonObjStream = pageStream.flatMap(new FlatMapFunction() { - @Override
- public void flatMap(String value, Collector
out) throws Exception { - try {
- JSONObject jsonObject = JSON.parseObject(value);
- if (jsonObject.getJSONObject("common").getString("uid") != null) {
- out.collect(jsonObject);
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- });
-
- //TODO 4 添加水位线
- SingleOutputStreamOperator
withWaterMarkStream = jsonObjStream.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5L)).withTimestampAssigner(new SerializableTimestampAssigner() { - @Override
- public long extractTimestamp(JSONObject element, long recordTimestamp) {
- return element.getLong("ts");
- }
- }));
-
- //TODO 5 按照会话id分组
- KeyedStream
keyedStream = withWaterMarkStream.keyBy(new KeySelector() { - @Override
- public String getKey(JSONObject value) throws Exception {
- return value.getJSONObject("common").getString("mid");
- }
- });
-
- //TODO 6 使用状态找出每个会话第一条数据
- SingleOutputStreamOperator
firstStream = keyedStream.process(new KeyedProcessFunction() { - ValueState
firstLoginDtState; -
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
- ValueStateDescriptor
valueStateDescriptor = new ValueStateDescriptor<>("first_login_dt", JSONObject.class); - // 添加状态存活时间
- valueStateDescriptor.enableTimeToLive(StateTtlConfig
- .newBuilder(Time.days(1L))
- .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
- .build());
- firstLoginDtState = getRuntimeContext().getState(valueStateDescriptor);
- }
-
- @Override
- public void processElement(JSONObject jsonObject, Context ctx, Collector
out) throws Exception { - // 处理数据
- // 获取状态
- JSONObject firstLoginDt = firstLoginDtState.value();
- Long ts = jsonObject.getLong("ts");
- if (firstLoginDt == null) {
- firstLoginDtState.update(jsonObject);
- // 第一条数据到的时候开启定时器
- ctx.timerService().registerEventTimeTimer(ts + 10 * 1000L);
- } else {
- Long lastTs = firstLoginDt.getLong("ts");
- if (ts < lastTs) {
- firstLoginDtState.update(jsonObject);
- }
- }
- }
-
- @Override
- public void onTimer(long timestamp, OnTimerContext ctx, Collector
out) throws Exception { - super.onTimer(timestamp, ctx, out);
- out.collect(firstLoginDtState.value());
- }
- });
-
- //TODO 7 转换结构
- SingleOutputStreamOperator
mapStream = firstStream.map(new MapFunction() { - @Override
- public String map(JSONObject jsonObj) throws Exception {
- JSONObject common = jsonObj.getJSONObject("common");
- Long ts = jsonObj.getLong("ts");
- String loginTime = DateFormatUtil.toYmdHms(ts);
- String dateId = loginTime.substring(0, 10);
-
- DwdUserUserLoginBean dwdUserUserLoginBean = DwdUserUserLoginBean.builder()
- .userId(common.getString("uid"))
- .dateId(dateId)
- .loginTime(loginTime)
- .channel(common.getString("ch"))
- .provinceId(common.getString("ar"))
- .versionCode(common.getString("vc"))
- .midId(common.getString("mid"))
- .brand(common.getString("ba"))
- .model(common.getString("md"))
- .sourceId(common.getString("sc"))
- .operatingSystem(common.getString("os"))
- .ts(ts)
- .build();
- return JSON.toJSONString(dwdUserUserLoginBean);
- }
- });
-
- //TODO 8 输出数据
- String sinkTopic = "dwd_user_user_login";
- mapStream.sinkTo(KafkaUtil.getKafkaProducer(sinkTopic, "user_login_trans"));
-
- //TODO 9 执行任务
- env.execute();
- }
- }
[atguigu@node001 ~]$ kafka-console-consumer.sh --bootstrap-server node001:9092 --topic dwd_user_user_login
- [atguigu@node001 ~]$ cd /opt/module/data_mocker/01-onlineEducation/
- [atguigu@node001 01-onlineEducation]$ java -jar edu2021-mock-2022-06-18.jar