• 大数据-玩转数据-Flink页面广告点击量统计


    一、应用场景

    电商网站的市场营销商业指标中,除了自身的APP推广,还会考虑到页面上的广告投放(包括自己经营的产品和其它网站的广告)。所以广告相关的统计分析,也是市场营销的重要指标。
    对于广告的统计,最简单也最重要的就是页面广告的点击量,网站往往需要根据广告点击量来制定定价策略和调整推广方式,而且也可以借此收集用户的偏好信息。更加具体的应用是,我们可以根据用户的地理位置进行划分,从而总结出不同省份用户对不同广告的偏好,这样更有助于广告的精准投放。
    在之PV,PU统计中,已经统计的广告的点击次数总和,但是没有实现窗口操作,并且也未增加排名处理.

    二、实现代码及解析

    package com.lyh.flink11;
    
    import com.lyh.bean.AdsClickLog;
    import com.mysql.cj.x.protobuf.MysqlxDatatypes;
    import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.functions.AggregateFunction;
    import org.apache.flink.api.common.state.ListState;
    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.tuple.Tuple4;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
    import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;
    import org.apache.flink.util.OutputTag;
    
    import java.time.Duration;
    import java.util.ArrayList;
    import java.util.Comparator;
    
    public class pro_High_Ads {
        public static void main(String[] args) throws Exception {
            //创建流式执行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            //运行环境的默认并发数
            env.setParallelism(2);
            // 创建WatermarkStrategy水印策略
            WatermarkStrategy<AdsClickLog> wms = WatermarkStrategy
                    //指定Watermark生成策略,最大延迟长度20秒
                    .<AdsClickLog>forBoundedOutOfOrderness(Duration.ofSeconds(20))
                    //SerializableTimestampAssigner接口中实现了extractTimestamp方法来指定如何从事件数据中抽取时间戳
                    .withTimestampAssigner(new SerializableTimestampAssigner<AdsClickLog>() {
                        @Override
                        public long extractTimestamp(AdsClickLog element, long recordTimestamp) {
                            return element.getTimestamp() * 1000L;
                        }
                    });
    
            // 读取文件,源数据读取
            env.readTextFile("input/AdClickLog.csv")
                    // map算子,将数据流中的数据进行转换, 形成新的数据流
                    .map(line -> {
                        String[] data = line.split(",");
                        return new AdsClickLog(
                          Long.valueOf(data[0]),
                          Long.valueOf(data[1]),
                          data[2],
                          data[3],
                          Long.valueOf(data[4]));
                        //  指定水印和时间戳
                    }).assignTimestampsAndWatermarks(wms)
                    // 数据处理,按广告ID,省份 分组,输入AdsClickLog格式,输出Tuple2格式,从而获取省份、广告ID
            .keyBy(new KeySelector<AdsClickLog, Tuple2<String,Long>>() {
                @Override
                public Tuple2<String, Long> getKey(AdsClickLog log) throws Exception {
                    return Tuple2.of(log.getProvince(),log.getAdId());
                }
                // 按照EventTime分配窗口,SlidingEventTimeWindows 时间滑动窗口,窗口大小3秒,步长1秒
            }).window(SlidingEventTimeWindows.of(Time.seconds(3),Time.seconds(1)))
            //allowedLateness就是针对event time而言,
            // 对于watermark超过end-of-window之后,还允许有一段时间(也是以event time来衡量)来等待之前的数据到达,以便再次处理这些数据
                    .allowedLateness(Time.seconds(2))
                    //sideOutputLateData() 侧输出流:是一个兜底方案,数据延迟严重,可以保证数据不丢失
            .sideOutputLateData(new OutputTag<AdsClickLog>("ads_late"){
            })
             //聚合出当前点击量在时间窗口内的统计数量 参数1:统计函数  参数2:窗口函数
            .aggregate(new AggregateFunction<AdsClickLog, Long, Long>() {
                           // 创建一个新的累加器,开始一个新的聚合。累加器是正在运行的聚合的状态。累计器初始值为0
                           @Override
                           public Long createAccumulator() {
                               return 0L;
                           }
    
                           // 将给定的输入添加到给定的累加器,并返回新的累加器值。
                           @Override
                           public Long add(AdsClickLog value, Long acc) {
                               return acc + 1L;
                           }
                            //从累加器获取聚合结果。
                           @Override
                           public Long getResult(Long acc) {
                               return acc;
                           }
    
                           // 如果有两个分区 合并两个分区的数据.合并两个累加器,返回合并后的累加器的状态
                           @Override
                           public Long merge(Long a, Long b) {
                               return a + b;
                           }
                       },
                        // 输入、输出、key、窗口
                    new ProcessWindowFunction<Long, Tuple4<String,Long,Long,Long>, Tuple2<String, Long>, TimeWindow>() {
    
                           @Override
                           public void process(Tuple2<String, Long> key,
                                               Context ctx,
                                               Iterable<Long> elements,
                                               Collector<Tuple4<String, Long, Long, Long>> out) throws Exception {
                               out.collect(Tuple4.of(key.f0,key.f1,elements.iterator().next(),ctx.window().getEnd()));
    
                           }
                       }).keyBy(t -> t.f3)
                    // 参数1:key 类型 , 参数2:输入 参数3:输出
                    //KeyedProcessFunction是用来处理KeyedStream的。每有一个数据进入算子,则会触发一次processElement()的处理。
                    // 它还提供了计时器的功能,在特定场景下,非常适合
                    //KeyedProcessFunction继承AbstractRichFunction,它和ProcessFunction类似,
                    // 都有processElement()、onTimer(),且都是富函数,自然有open()和close()方法
            .process(new KeyedProcessFunction<Long, Tuple4<String, Long, Long, Long>, Object>() {
                private ValueState<Long> windowEnd;
                private ListState<Tuple4<String, Long, Long, Long>> datas;
                @Override
                public void open(Configuration parameters) throws Exception {
                    datas = getRuntimeContext().getListState(new ListStateDescriptor<Tuple4<String, Long, Long, Long>>("datas", TypeInformation.of(new TypeHint<Tuple4<String, Long, Long, Long>>() {
                    })));
                    windowEnd = getRuntimeContext().getState(new ValueStateDescriptor<Long>("windowEed", Long.class));
                }
    
                @Override
                public void processElement(Tuple4<String, Long, Long, Long> value,
                                           Context ctx,
                                           Collector<Object> out) throws Exception {
                    datas.add(value);
                    if (windowEnd.value() == null) {
                        ctx.timerService().registerProcessingTimeTimer(value.f3 + 10L);
                        windowEnd.update(value.f3);
                    }
                }
    
                @Override
                public void onTimer(long timestamp,
                                    OnTimerContext ctx,
                                    Collector<Object> out) throws Exception {
                    ArrayList<Tuple4<String,Long,Long,Long>>  result =  new ArrayList<>();
                    for (Tuple4<String, Long, Long, Long> t : datas.get()) {
                        result.add(t);
                    }
                    // 清空状态
                    windowEnd.clear();
                    datas.clear();
                    //排序, 取top3
                    result.sort(new Comparator<Tuple4<String, Long, Long, Long>>() {
                        @Override
                        public int compare(Tuple4<String, Long, Long, Long> o1, Tuple4<String, Long, Long, Long> o2) {
                            return (int) (o2.f2 - o1.f2);
                        }
                    });
                    StringBuilder sb = new StringBuilder();
                    sb.append("窗口时间结束").append(timestamp - 10L).append("\n");
                    sb.append("------------------------------");
                    for (int i = 0 ; i < Math.min(3,result.size()); i++){
                        sb.append(result.get(i)).append("\n");
                    }
                    sb.append("------------------");
                    out.collect(sb.toString());
                }
            }).print();
    
            env.execute();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
  • 相关阅读:
    (学习日记)2022.8.11
    2023-2028年中国硫酸铝钾市场发展态势及未来发展趋势报告
    Web前端大作业——基于HTML+CSS+JavaScript仿英雄联盟LOL游戏网站
    Python 潮流周刊#21:如何提升及测量 Python 代码的性能?
    BLE广播事件包解析&空口事例
    mysql源码分析——聚簇索引
    day01 Linux
    C51--串口发送中断请求
    多点DMALL × Apache Kyuubi:构建统一SQL Proxy探索实践
    使用xmake配合arm-none-eabi-gcc构建stm32工程
  • 原文地址:https://blog.csdn.net/s_unbo/article/details/132922520