• 8.Flink实时项目之CEP计算访客跳出


    1.访客跳出明细介绍

    首先要识别哪些是跳出行为,要把这些跳出的访客最后一个访问的页面识别出来。那么就要抓住几个特征:

    该页面是用户近期访问的第一个页面,这个可以通过该页面是否有上一个页面(last_page_id)来判断,如果这个表示为空,就说明这是这个访客这次访问的第一个页面。

    首次访问之后很长一段时间(自己设定),用户没继续再有其他页面的访问

    这第一个特征的识别很简单,保留 last_page_id 为空的就可以了。但是第二个访问的判断,其实有点麻烦,首先这不是用一条数据就能得出结论的,需要组合判断,要用一条存在的数据和不存在的数据进行组合判断。而且要通过一个不存在的数据求得一条存在的数据。更麻烦的他并不是永远不存在,而是在一定时间范围内不存在。那么如何识别有一定失效的组合行为呢?

    最简单的办法就是 Flink 自带的 CEP 技术。这个 CEP 非常适合通过多条数据组合来识别某个事件。

    用户跳出事件,本质上就是一个条件事件加一个超时事件的组合。

    • 流程图

    2.代码实现

    创建任务类UserJumpDetailApp.java,从kafka读取页面日志

    import com.zhangbao.gmall.realtime.utils.MyKafkaUtil;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    ​
    /**
     * @author zhangbao
     * @date 2021/10/17 10:38
     * @desc
     */
    public class UserJumpDetailApp {
        public static void main(String[] args) {
            //webui模式,需要添加pom依赖
            StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
    //        StreamExecutionEnvironment env1 = StreamExecutionEnvironment.createLocalEnvironment();
            //设置并行度
            env.setParallelism(4);
            //设置检查点
    //        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
    //        env.getCheckpointConfig().setCheckpointTimeout(60000);
    //        env.setStateBackend(new FsStateBackend("hdfs://hadoop101:9000/gmall/flink/checkpoint/userJumpDetail"));
    //        //指定哪个用户读取hdfs文件
    //        System.setProperty("HADOOP_USER_NAME","zhangbao");
    ​
            //从kafka读取数据源
            String sourceTopic = "dwd_page_log";
            String group = "user_jump_detail_app_group";
            String sinkTopic = "dwm_user_jump_detail";
            FlinkKafkaConsumer<String> kafkaSource = MyKafkaUtil.getKafkaSource(sourceTopic, group);
            DataStreamSource<String> kafkaDs = env.addSource(kafkaSource);
    ​
            kafkaDs.print("user jump detail >>>");
    ​
            try {
                env.execute("user jump detail task");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    3. flink CEP编程

    官方文档:https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/libs/cep.html

    处理流程

    1.从kafka读取日志数据

    2.设定时间语义为事件时间并指定事件时间字段ts

    3.按照mid分组

    4.配置CEP表达式

    • 1.第一次访问的页面:last_page_id == null

    • 2.第一次访问的页面在10秒内,没有进行其他操作,没有访问其他页面

    5.根据表达式筛选流

    6.提取命中的数据

    • 设定超时时间标识 timeoutTag

    • flatSelect 方法中,实现 PatternFlatTimeoutFunction 中的 timeout 方法。

    • 所有 out.collect 的数据都被打上了超时标记

    • 本身的 flatSelect 方法因为不需要未超时的数据所以不接受数据。

    • 通过 SideOutput 侧输出流输出超时数据

    7.将跳出数据写回到kafka

    package com.zhangbao.gmall.realtime.app.dwm;
    ​
    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONObject;
    import com.zhangbao.gmall.realtime.utils.MyKafkaUtil;
    import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
    import org.apache.flink.api.common.eventtime.WatermarkGenerator;
    import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.cep.CEP;
    import org.apache.flink.cep.PatternFlatSelectFunction;
    import org.apache.flink.cep.PatternFlatTimeoutFunction;
    import org.apache.flink.cep.PatternStream;
    import org.apache.flink.cep.pattern.Pattern;
    import org.apache.flink.cep.pattern.conditions.SimpleCondition;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.KeyedStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    import org.apache.flink.util.Collector;
    import org.apache.flink.util.OutputTag;
    ​
    import java.util.List;
    import java.util.Map;
    ​
    /**
     * @author zhangbao
     * @date 2021/10/17 10:38
     * @desc
     */
    public class UserJumpDetailApp {
        public static void main(String[] args) {
            //webui模式,需要添加pom依赖
            StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
    //        StreamExecutionEnvironment env1 = StreamExecutionEnvironment.createLocalEnvironment();
            //设置并行度
            env.setParallelism(4);
            //设置检查点
    //        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
    //        env.getCheckpointConfig().setCheckpointTimeout(60000);
    //        env.setStateBackend(new FsStateBackend("hdfs://hadoop101:9000/gmall/flink/checkpoint/userJumpDetail"));
    //        //指定哪个用户读取hdfs文件
    //        System.setProperty("HADOOP_USER_NAME","zhangbao");
    ​
            //从kafka读取数据源
            String sourceTopic = "dwd_page_log";
            String group = "user_jump_detail_app_group";
            String sinkTopic = "dwm_user_jump_detail";
            FlinkKafkaConsumer<String> kafkaSource = MyKafkaUtil.getKafkaSource(sourceTopic, group);
            DataStreamSource<String> jsonStrDs = env.addSource(kafkaSource);
    ​
            /*//测试数据
            DataStream<String> jsonStrDs = env
             .fromElements(
                    "{\"common\":{\"mid\":\"101\"},\"page\":{\"page_id\":\"home\"},\"ts\":10000} ",
                    "{\"common\":{\"mid\":\"102\"},\"page\":{\"page_id\":\"home\"},\"ts\":12000}",
    ​
                    "{\"common\":{\"mid\":\"102\"},\"page\":{\"page_id\":\"good_list\",\"last_page_id\":" +
                            "\"home\"},\"ts\":15000} ",
    ​
                    "{\"common\":{\"mid\":\"102\"},\"page\":{\"page_id\":\"good_list\",\"last_page_id\":" +
                            "\"detail\"},\"ts\":30000} "
            );
            dataStream.print("in json:");*/
    ​
            //对读取到的数据进行结构转换
            SingleOutputStreamOperator<JSONObject> jsonObjDs = jsonStrDs.map(jsonStr -> JSON.parseObject(jsonStr));
    ​
    //        jsonStrDs.print("user jump detail >>>");
            //从flink1.12开始,时间语义默认是事件时间,不需要额外指定,如果是之前的版本,则要按以下方式指定事件时间语义
            //env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    ​
            //指定事件时间字段
            SingleOutputStreamOperator<JSONObject> jsonObjWithTSDs = jsonObjDs.assignTimestampsAndWatermarks(
                    WatermarkStrategy.<JSONObject>forMonotonousTimestamps().withTimestampAssigner(
                            new SerializableTimestampAssigner<JSONObject>() {
                                @Override
                                public long extractTimestamp(JSONObject jsonObject, long l) {
                                    return jsonObject.getLong("ts");
                                }
                            }
            ));
    ​
            //按照mid分组
            KeyedStream<JSONObject, String> ketByDs = jsonObjWithTSDs.keyBy(
                    jsonObject -> jsonObject.getJSONObject("common").getString("mid")
            );
    ​
            /**
             * flink CEP表达式
             * 跳出规则,满足两个条件:
             *  1.第一次访问的页面:last_page_id == null
             *  2.第一次访问的页面在10秒内,没有进行其他操作,没有访问其他页面
             */
            Pattern<JSONObject, JSONObject> pattern = Pattern.<JSONObject>begin("first")
                    .where( // 1.第一次访问的页面:last_page_id == null
                        new SimpleCondition<JSONObject>() {
                            @Override
                            public boolean filter(JSONObject jsonObject) throws Exception {
                                String lastPageId = jsonObject.getJSONObject("page").getString("last_page_id");
                                System.out.println("first page >>> "+lastPageId);
                                if (lastPageId == null || lastPageId.length() == 0) {
                                    return true;
                                }
                                return false;
                            }
                        }
                    ).next("next")
                    .where( //2.第一次访问的页面在10秒内,没有进行其他操作,没有访问其他页面
                            new SimpleCondition<JSONObject>() {
                                @Override
                                public boolean filter(JSONObject jsonObject) throws Exception {
                                    String pageId = jsonObject.getJSONObject("page").getString("page_id");
                                    System.out.println("next page >>> "+pageId);
                                    if(pageId != null && pageId.length()>0){
                                        return true;
                                    }
                                    return false;
                                }
                            }
                    //时间限制模式,10S
                    ).within(Time.milliseconds(10000));
    ​
            //将cep表达式运用到流中,筛选数据
            PatternStream<JSONObject> patternStream = CEP.pattern(ketByDs, pattern);
    ​
            //从筛选的数据中再提取数据超时数据,放到侧输出流中
            OutputTag<String> timeOutTag = new OutputTag<String>("timeOut"){};
            SingleOutputStreamOperator<Object> outputStreamDS = patternStream.flatSelect(
                    timeOutTag,
                    //获取超时数据
                    new PatternFlatTimeoutFunction<JSONObject, String>() {
                        @Override
                        public void timeout(Map<String, List<JSONObject>> map, long l, Collector<String> collector) throws Exception {
                            List<JSONObject> first = map.get("first");
                            for (JSONObject jsonObject : first) {
                                System.out.println("time out date >>> "+jsonObject.toJSONString());
                                //所有 out.collect 的数据都被打上了超时标记
                                collector.collect(jsonObject.toJSONString());
                            }
                        }
                    },
                    //获取未超时数据
                    new PatternFlatSelectFunction<JSONObject, Object>() {
                        @Override
                        public void flatSelect(Map<String, List<JSONObject>> map, Collector<Object> collector) throws Exception {
                            //不超时的数据不提取,所以这里不做操作
                        }
                    }
            );
    ​
            //获取侧输出流的超时数据
            DataStream<String> timeOutDs = outputStreamDS.getSideOutput(timeOutTag);
            timeOutDs.print("jump >>> ");
            
            //将跳出数据写回到kafka
            timeOutDs.addSink(MyKafkaUtil.getKafkaSink(sinkTopic));
    ​
            try {
                env.execute("user jump detail task");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    ​

    测试数据

    将从kafka读取数据的方式切换成固定数据内容,如下:

    //测试数据
            DataStream<String> jsonStrDs = env
             .fromElements(
                    "{\"common\":{\"mid\":\"101\"},\"page\":{\"page_id\":\"home\"},\"ts\":10000} ",
                    "{\"common\":{\"mid\":\"102\"},\"page\":{\"page_id\":\"home\"},\"ts\":12000}",
    ​
                    "{\"common\":{\"mid\":\"102\"},\"page\":{\"page_id\":\"good_list\",\"last_page_id\":" +
                            "\"home\"},\"ts\":15000} ",
    ​
                    "{\"common\":{\"mid\":\"102\"},\"page\":{\"page_id\":\"good_list\",\"last_page_id\":" +
                            "\"detail\"},\"ts\":30000} "
            );
            dataStream.print("in json:");

    然后从dwm_user_jump_detail主题消费数据

    ./kafka-console-consumer.sh --bootstrap-server hadoop101:9092,hadoop102:9092,hadoop103:9092 --topic dwm_user_jump_detail

  • 相关阅读:
    记录因Sharding Jdbc批量操作引发的一次fullGC
    MVVM与Vue响应式的实现
    【Spring boot 导入 spring XML 配置文件】
    【Unity】【VR】详解Oculus Integration输入
    TensorFlow Lite Micro简介与使用
    【DRAM存储器五】DRAM存储器的架构演进-part2
    中英文说明书丨Abbkine细胞迁移分析试剂盒
    微服务系列:分布式文件存储之 MinIO 入门指南
    UOS_arm64安装qt5开发环境
    使用Python从零实现多分类SVM
  • 原文地址:https://www.cnblogs.com/zhangbaohpu/p/15987524.html