• flink去重(一)flink、flink-sql中常见的去重方案总结


    flink、flink-sql中常见的去重方案

    去重计算是数据分析业务里面常见的指标计算,例如网站一天的访问用户数、广告的点击用户数等等,离线计算是一个全量、一次性计算的过程通常可以通过distinct的方式得到去重结果,而实时计算是一种增量、长期计算过程,在面对不同的场景,例如数据量的大小、计算结果精准度要求等可以使用不同的方案。

    1、利用状态去重

    计算每个广告每小时的点击用户数,广告点击日志包含:广告位ID、用户设备ID、点击时间。

    (1)实现步骤:

    • 为了当天的数据可重现,这里选择事件时间也就是广告点击时间作为每小时的窗口期划分

    • 数据分组使用广告位ID+点击事件所属的小时

    • 选择processFunction来实现,一个状态用来保存数据、另外一个状态用来保存对应的数据量

    • 计算完成之后的数据清理,按照时间进度注册定时器清理

    (2)代码实现

    主程序代码

    package com.yyds.flink_distinct;
    
    import com.yyds.utils.FlinkUtils;
    import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.api.common.time.Time;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.KeyedStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.table.runtime.operators.window.TimeWindow;
    
    import java.time.Duration;
    
    
    /**
     * flink 去重
     *
     * 计算每个广告每小时的点击用户数,广告点击日志包含:广告位ID、用户设备ID(idfa/imei/cookie)、点击时间。
     *
     *
     * 实现步骤:
     * 1、为了当天的数据可重现,这里选择事件时间也就是广告点击时间作为每小时的窗口期划分
     * 2、数据分组使用广告位ID+点击事件所属的小时
     * 3、选择processFunction来实现,一个状态用来保存数据、另外一个状态用来保存对应的数据量
     * 4、计算完成之后的数据清理,按照时间进度注册定时器清理
     */
    public class _01_MapStateDistinct {
        public static void main(String[] args) throws Exception {
    
            // 1、从kafka中读取数据
            ParameterTool parameterTool = ParameterTool.fromPropertiesFile(args[0]);
            FlinkUtils.env.setParallelism(1);
            DataStream<String> kafkaStream = FlinkUtils.createKafkaStream(parameterTool, SimpleStringSchema.class);
    
            // 2、将数据进行切分,转换为javaBean
            SingleOutputStreamOperator<_01_AdvertiseMentData> mapStream = kafkaStream.map(new MapFunction<String, _01_AdvertiseMentData>() {
                @Override
                public _01_AdvertiseMentData map(String value) throws Exception {
                    String[] arr = value.split(",");
                    return new _01_AdvertiseMentData(Integer.parseInt(arr[0]), arr[1], Long.parseLong(arr[2]));
                }
            });
    
            // 3、注册水位线
            SingleOutputStreamOperator<_01_AdvertiseMentData> watermarkStream = mapStream.assignTimestampsAndWatermarks(
                    WatermarkStrategy.<_01_AdvertiseMentData>forBoundedOutOfOrderness(Duration.ofMinutes(1L)) // 设置延迟时间为1min
                            .withTimestampAssigner(new SerializableTimestampAssigner<_01_AdvertiseMentData>() {
                                @Override
                                public long extractTimestamp(_01_AdvertiseMentData element, long recordTimestamp) {
                                    return element.getTime();
                                }
                            })
            );
    
            // 4、分组(广告id、窗口结束时间)
            KeyedStream<_01_AdvertiseMentData, _01_AdKey> keyedStream = watermarkStream.keyBy(new KeySelector<_01_AdvertiseMentData, _01_AdKey>() {
                @Override
                public _01_AdKey getKey(_01_AdvertiseMentData data) throws Exception {
    
                    int id = data.getId();
                    // 时间的转换选择TimeWindow.getWindowStartWithOffset Flink在处理window中自带的方法,使用起来很方便,
                    // 第一个参数 表示数据时间,
                    // 第二个参数offset偏移量,默认为0,正常窗口划分都是整点方式,例如从0开始划分,这个offset就是相对于0的偏移量,
                    // 第三个参数表示窗口大小,得到的结果是数据时间所属窗口的开始时间,这里加上了窗口大小,使用结束时间与广告位ID作为分组的Key。
                    long endTime = TimeWindow.getWindowStartWithOffset(data.getTime(), 0, Time.hours(1).toMilliseconds()) + Time.hours(1).toMilliseconds();
                    return new _01_AdKey(id, endTime);
                }
            });
    
    
            keyedStream.process(new _01_DistinctProcessFunction());
    
    
            FlinkUtils.env.execute("_01_MapStateDistinct");
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81

    udf代码

    package com.yyds.flink_distinct;
    
    import org.apache.flink.api.common.state.*;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;
    
    /**
     * 方便起见使用输出类型使用Void,这里直接使用打印控制台方式查看结果,在实际中可输出到下游做一个批量的处理然后输出
     */
    public class _01_DistinctProcessFunction extends KeyedProcessFunction<_01_AdKey,_01_AdvertiseMentData,Void> {
    
    
        // 定义第一个状态MapState
        MapState<String,Integer> deviceIdState ;
        // 定义第二个状态ValueState
        ValueState<Long> countState ;
    
        @Override
        public void open(Configuration parameters) throws Exception {
            MapStateDescriptor<String, Integer> deviceIdStateDescriptor = new MapStateDescriptor<>("deviceIdState", String.class, Integer.class);
            /*
              MapState,key表示devId, value表示一个随意的值只是为了标识,该状态表示一个广告位在某个小时的设备数据,
                如果我们使用rocksdb作为statebackend, 那么会将mapstate中key作为rocksdb中key的一部分,
                mapstate中value作为rocksdb中的value, rocksdb中value大小是有上限的,这种方式可以减少rocksdb value的大小;
             */
            deviceIdState = getRuntimeContext().getMapState(deviceIdStateDescriptor);
    
            ValueStateDescriptor<Long> countStateDescriptor = new ValueStateDescriptor<>("countState", Long.class);
            /*
              ValueState,存储当前MapState的数据量,是由于mapstate只能通过迭代方式获得数据量大小,每次获取都需要进行迭代,这种方式可以避免每次迭代。
             */
            countState = getRuntimeContext().getState(countStateDescriptor);
    
        }
    
        @Override
        public void processElement(_01_AdvertiseMentData data, Context context, Collector<Void> collector) throws Exception {
            // 主要考虑可能会存在滞后的数据比较严重,会影响之前的计算结果
            long currw = context.timerService().currentWatermark();
            if(context.getCurrentKey().getTime() + 1 <= currw){
                System.out.println("迟到的数据:" + data);
                return;
            }
    
            String devId = data.getDevId();
            Integer i = deviceIdState.get(devId);
            if(i == null){
                i = 0;
            }
    
            if(  i == 1  ){
                // 表示已经存在
            }else {
                // 表示不存在,放入到状态中
                deviceIdState.put(devId,1);
                // 将统计的数据 + 1
                Long count = countState.value();
    
                if(count == null){
                    count = 0L;
                }
                count ++;
                countState.update(count);
                // 注册一个定时器,定期清理状态中的数据
                context.timerService().registerEventTimeTimer(context.getCurrentKey().getTime() + 1);
            }
    
            System.out.println("countState.value() = " + countState.value());
        }
    
    
        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<Void> out) throws Exception {
            System.out.println(timestamp + " exec clean~~~");
            System.out.println("countState.value() = " + countState.value());
            // 清除状态
            deviceIdState.clear();
            countState.clear();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81

    javaBean

    package com.yyds.flink_distinct;
    
    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    import lombok.ToString;
    
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    @ToString
    public class _01_AdKey {
        private int id;
        private Long time;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    2、利用Flink Sql进行去重

    使用编码方式完成去重,但是这种方式开发周期比较长,我们可能需要针对不同的业务逻辑实现不同的编码,
    对于业务开发来说也需要熟悉Flink编码,也会增加相应的成本,我们更多希望能够以sql的方式提供给业务开发完成自己的去重逻辑。

    package com.yyds.flink_distinct;
    
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    
    /**
     * 使用编码方式完成去重,但是这种方式开发周期比较长,我们可能需要针对不同的业务逻辑实现不同的编码,
     * 对于业务开发来说也需要熟悉Flink编码,也会增加相应的成本,我们更多希望能够以sql的方式提供给业务开发完成自己的去重逻辑
     */
    public class _02_DistinctFlinkSQL {
        public static void main(String[] args) {
    
            // 创建表的执行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
    
            // 从kafka中读取数据
            String sourceTable = "CREATE TABLE source_table (\n" +
                    "  `id` string,\n" +
                    "  `devId` string,\n" +
                    "  `ts` bigint,\n" +
                    "   `rt` AS TO_TIMESTAMP_LTZ(ts, 3) ,\n" +
                    "   watermark for rt as rt - interval '60' second \n" +
                    ") WITH (\n" +
                    "  'connector' = 'kafka',\n" +
                    "  'topic' = 'ad',\n" +
                    "  'properties.bootstrap.servers' = 'hadoop01:9092',\n" +
                    "  'properties.group.id' = 'testGroup',\n" +
                    "  'scan.startup.mode' = 'earliest-offset',\n" +
                    "  'format' = 'csv'\n" +
                    ")";
    
            tenv.executeSql(sourceTable);
    
    
            String selectSql = "select \n" +
                    "  window_start,\n" +
                    "  window_end,\n" +
                    "  count(distinct devId) as cnt\n" +
                    "from table (\n" +
                    "  tumble(table source_table,descriptor(rt),interval '60' minute ) --滚动窗口 \n" +
                    ")\n" +
                    "group by window_start,window_end";
    
            tenv.executeSql(selectSql).print();
    
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50

    3、利用HyperLogLog进行去重(或者布隆过滤器,Flink-sql注册udaf函数)

    关于HyperLogLog算法原理可以参考:https://www.jianshu.com/p/55defda6dcd2

    udaf函数的实现

    package com.yyds.flink_distinct;
    
    import com.clearspring.analytics.stream.cardinality.HyperLogLog;
    import org.apache.flink.table.functions.AggregateFunction;
    
    /**
     * 自定义udaf函数
     *
     * 返回类型是long 也就是去重的结果,accumulator是一个HyperLogLog类型的结构
     */
    public class _03_HLLDistinctFunction extends AggregateFunction<Long, HyperLogLog> {
        @Override
        public HyperLogLog createAccumulator() {
            return new HyperLogLog(0.001);
        }
    
    
        public void accumulate(HyperLogLog hll,String id){
            hll.offer(id);
        }
    
        @Override
        public Long getValue(HyperLogLog hll) {
            return hll.cardinality();
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    flink-sql实现

    package com.yyds.flink_distinct;
    
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    
    /**
     * 使用编码方式完成去重,但是这种方式开发周期比较长,我们可能需要针对不同的业务逻辑实现不同的编码,
     * 对于业务开发来说也需要熟悉Flink编码,也会增加相应的成本,我们更多希望能够以sql的方式提供给业务开发完成自己的去重逻辑
     */
    public class _03_HLLFlinkSQL {
        public static void main(String[] args) {
    
            // 创建表的执行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
    
            // 注册udaf函数
            tenv.registerFunction("hllDistinct",new _03_HLLDistinctFunction());
    
            // 从kafka中读取数据
            String sourceTable = "CREATE TABLE source_table (\n" +
                    "  `id` string,\n" +
                    "  `devId` string,\n" +
                    "  `ts` bigint,\n" +
                    "   `rt` AS TO_TIMESTAMP_LTZ(ts, 3) ,\n" +
                    "   watermark for rt as rt - interval '60' second \n" +
                    ") WITH (\n" +
                    "  'connector' = 'kafka',\n" +
                    "  'topic' = 'ad',\n" +
                    "  'properties.bootstrap.servers' = 'hadoop01:9092',\n" +
                    "  'properties.group.id' = 'testGroup',\n" +
                    "  'scan.startup.mode' = 'earliest-offset',\n" +
                    "  'format' = 'csv'\n" +
                    ")";
    
            tenv.executeSql(sourceTable);
    
    
            String selectSql = "select \n" +
                    "  window_start,\n" +
                    "  window_end,\n" +
                    "  hllDistinct(distinct devId) as cnt\n" +
                    "from table (\n" +
                    "  tumble(table source_table,descriptor(rt),interval '1' minute ) --滚动窗口 \n" +
                    ")\n" +
                    "group by window_start,window_end";
    
            tenv.executeSql(selectSql).print();
    
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53

    4、利用HyperLogLog进行去重(优化版本)

    在HyperLogLog去重实现中,如果要求误差在0.001以内,那么就需要1048576个int,
    也就是会消耗4M的存储空间,但是在实际使用中有很多的维度的统计是达不到这个数据量,那么可以在这里做一个优化
    优化方式是:初始HyperLogLog内部使用存储是一个set集合,当set大小达到了指定大小(1048576)就转换为HyperLogLog存储方式。
    这种方式可以有效减小内存消耗。

    package com.yyds.flink_distinct;
    
    import com.clearspring.analytics.hash.MurmurHash;
    import com.clearspring.analytics.stream.cardinality.HyperLogLog;
    
    import java.util.HashSet;
    import java.util.Set;
    
    /**
     * HLL 优化
     *
     *
     * 在HyperLogLog去重实现中,如果要求误差在0.001以内,那么就需要1048576个int,
     * 也就是会消耗4M的存储空间,但是在实际使用中有很多的维度的统计是达不到这个数据量,那么可以在这里做一个优化
     *
     *
     * 优化方式是:初始HyperLogLog内部使用存储是一个set集合,当set大小达到了指定大小(1048576)就转换为HyperLogLog存储方式。这种方式可以有效减小内存消耗。
     */
    public class _04_HLLOptimization {
        //hyperloglog结构
        private HyperLogLog hyperLogLog;
        //初始的一个set
        private Set<Integer> set;
    
        private double rsd;
    
        //hyperloglog的桶个数,主要内存占用
        private int bucket;
    
        public _04_HLLOptimization(double rsd){
            this.rsd=rsd;
    //        this.bucket=1 << HyperLogLog.log2m(rsd);
            this.bucket=1 << (int)(Math.log(1.106D / rsd * (1.106D / rsd)) / Math.log(2.0D));
            set=new HashSet<>();
        }
    
        //插入一条数据
        public void offer(Object object){
            final int x = MurmurHash.hash(object);
            int currSize=set.size();
            if(hyperLogLog==null && currSize+1>bucket){
                //升级为hyperloglog
                hyperLogLog=new HyperLogLog(rsd);
                for(int d: set){
                    hyperLogLog.offerHashed(d);
                }
                set.clear();
            }
    
            if(hyperLogLog!=null){
                hyperLogLog.offerHashed(x);
            }else {
                set.add(x);
            }
        }
    
        //获取大小
        public long cardinality() {
            if(hyperLogLog!=null) return hyperLogLog.cardinality();
            return set.size();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63

    自定义udaf函数

    package com.yyds.flink_distinct;
    
    import org.apache.flink.table.functions.AggregateFunction;
    
    /**
     * 自定义udaf函数
     */
    public class _04_HLLDistinctFunctionPlus extends AggregateFunction<Long,_04_HLLOptimization> {
    
        @Override
        public _04_HLLOptimization createAccumulator() {
            return new _04_HLLOptimization(0.001);
        }
    
    
        public void accumulate(_04_HLLOptimization hll,String id){
            hll.offer(id);
        }
    
    
        @Override
        public Long getValue(_04_HLLOptimization hll) {
            return hll.cardinality();
        }
    
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    主程序

    package com.yyds.flink_distinct;
    
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    
    /**
     * 使用编码方式完成去重,但是这种方式开发周期比较长,我们可能需要针对不同的业务逻辑实现不同的编码,
     * 对于业务开发来说也需要熟悉Flink编码,也会增加相应的成本,我们更多希望能够以sql的方式提供给业务开发完成自己的去重逻辑
     *
     *
     * 在HyperLogLog去重实现中,如果要求误差在0.001以内,那么就需要1048576个int,
     *   也就是会消耗4M的存储空间,但是在实际使用中有很多的维度的统计是达不到这个数据量,那么可以在这里做一个优化
     *
     *   优化方式是:初始HyperLogLog内部使用存储是一个set集合,当set大小达到了指定大小(1048576)就转换为HyperLogLog存储方式。这种方式可以有效减小内存消耗。
     */
    public class _04_HLLFlinkSQLPlus {
        public static void main(String[] args) {
    
            // 创建表的执行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
    
            // 注册udaf函数
            tenv.registerFunction("hllDistinctPlus",new _04_HLLDistinctFunctionPlus());
    
            // 从kafka中读取数据
            String sourceTable = "CREATE TABLE source_table (\n" +
                    "  `id` string,\n" +
                    "  `devId` string,\n" +
                    "  `ts` bigint,\n" +
                    "   `rt` AS TO_TIMESTAMP_LTZ(ts, 3) ,\n" +
                    "   watermark for rt as rt - interval '60' second \n" +
                    ") WITH (\n" +
                    "  'connector' = 'kafka',\n" +
                    "  'topic' = 'ad',\n" +
                    "  'properties.bootstrap.servers' = 'hadoop01:9092',\n" +
                    "  'properties.group.id' = 'testGroup',\n" +
                    "  'scan.startup.mode' = 'earliest-offset',\n" +
                    "  'format' = 'csv'\n" +
                    ")";
    
            tenv.executeSql(sourceTable);
    
    
            String selectSql = "select \n" +
                    "  window_start,\n" +
                    "  window_end,\n" +
                    "  hllDistinctPlus(distinct devId) as cnt\n" +
                    "from table (\n" +
                    "  tumble(table source_table,descriptor(rt),interval '1' minute ) --滚动窗口 \n" +
                    ")\n" +
                    "group by window_start,window_end";
    
            tenv.executeSql(selectSql).print();
    
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59

    5、利用BitMap精确去重进行去重

    在前面提到的精确去重方案都是会保存全量的数据,但是这种方式是以牺牲存储为代价的,
    而hyperloglog方式虽然减少了存储但是损失了精度,
    那么如何能够做到精确去重又能不消耗太多的存储呢,可以使用bitmap做精确去重。

    package com.yyds.flink_distinct;
    
    import org.roaringbitmap.longlong.Roaring64NavigableMap;
    
    
    /**
     * 自定义精确去重计算器
     */
    public class _05_PreciseAccumulator {
    
        private Roaring64NavigableMap bitmap;
    
        public _05_PreciseAccumulator(){
            bitmap=new Roaring64NavigableMap();
        }
    
        public void add(long id){
            bitmap.addLong(id);
        }
    
        public long getCardinality(){
            return bitmap.getLongCardinality();
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    package com.yyds.flink_distinct;
    
    import org.apache.flink.table.functions.AggregateFunction;
    
    /**
     * 自定义udaf函数
     */
    public class _05_BitMapDistinctFunction extends AggregateFunction<Long,_05_PreciseAccumulator> {
    
        @Override
        public _05_PreciseAccumulator createAccumulator() {
            return new _05_PreciseAccumulator();
        }
    
    
        public void accumulate(_05_PreciseAccumulator preciseAccumulator,String id){
            // 注意:此处有问题,用hash可能会存在冲突,需要用别的算法,生成唯一id
            preciseAccumulator.add(Long.parseLong(id.hashCode()+""));
        }
    
    
        @Override
        public Long getValue(_05_PreciseAccumulator preciseAccumulator) {
            return preciseAccumulator.getCardinality();
        }
    
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    package com.yyds.flink_distinct;
    
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    
    /**
     * 在前面提到的精确去重方案都是会保存全量的数据,但是这种方式是以牺牲存储为代价的,
     *
     * 而hyperloglog方式虽然减少了存储但是损失了精度,
     *
     * 那么如何能够做到精确去重又能不消耗太多的存储呢,可以使用bitmap做精确去重。
     */
    public class _05_BitMapDistinct {
        public static void main(String[] args) {
    
    
            // 创建表的执行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
    
            // 注册udaf函数
            tenv.registerFunction("bitmapDistinct",new _05_BitMapDistinctFunction());
    
            // 从kafka中读取数据
            String sourceTable = "CREATE TABLE source_table (\n" +
                    "  `id` string,\n" +
                    "  `devId` string,\n" +
                    "  `ts` bigint,\n" +
                    "   `rt` AS TO_TIMESTAMP_LTZ(ts, 3) ,\n" +
                    "   watermark for rt as rt - interval '60' second \n" +
                    ") WITH (\n" +
                    "  'connector' = 'kafka',\n" +
                    "  'topic' = 'ad',\n" +
                    "  'properties.bootstrap.servers' = 'hadoop01:9092',\n" +
                    "  'properties.group.id' = 'testGroup',\n" +
                    "  'scan.startup.mode' = 'earliest-offset',\n" +
                    "  'format' = 'csv'\n" +
                    ")";
    
            tenv.executeSql(sourceTable);
    
    
            String selectSql = "select \n" +
                    "  window_start,\n" +
                    "  window_end,\n" +
                    "  bitmapDistinct(distinct devId) as cnt\n" +
                    "from table (\n" +
                    "  tumble(table source_table,descriptor(rt),interval '1' minute ) --滚动窗口 \n" +
                    ")\n" +
                    "group by window_start,window_end";
    
            tenv.executeSql(selectSql).print();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
  • 相关阅读:
    【django+vue】项目搭建、解决跨域访问
    mfc140.dll丢失如何修复,分享多种有效的修复方法
    如何系统地自学Python?
    818专业课【考经】—《信号系统》之章节概要:第九章 系统的状态变量分析法
    ai studio的基本操作
    Linux基础操作
    各类经典VRP,车间调度问题,组合优化问题基准测试集Benchmark
    F5.5G落进现实:目标网带来的光之路
    【汽修帮手】数据采集,爬虫,根据pdf文件流合并pdf文件
    使用 Apache Commons Exec 管理外部进程
  • 原文地址:https://blog.csdn.net/qq_44665283/article/details/125980572