• 【Flink实战】新老用户方案优化使用状态与布隆过滤器的方式


    🚀 作者 :“大数据小禅”

    🚀 文章简介 :新老用户方案优化使用状态与布隆过滤器的方式

    🚀 欢迎小伙伴们 点赞👍、收藏⭐、留言💬


    什么是布隆过滤器

    • 布隆过滤器(Bloom Filter)是一种经过哈希函数处理的数据结构,用于快速判断一个元素是否可能存在于一个集合中。它可以用来检索大规模数据集中的元素,过滤掉不存在的元素,从而减少昂贵的磁盘或网络访问操作。

    • 布隆过滤器的核心思想是使用一个位数组(通常由二进制位组成)和多个哈希函数。当将元素添加到布隆过滤器时,通过哈希函数将元素映射到位数组的多个位置,并将这些位置的二进制位设置为1。当需要查询某个元素是否存在时,同样通过哈希函数将元素映射到位数组的相应位置,并检查这些位置的二进制位,如果所有位置的二进制位都为1,则说明元素可能存在;如果有任何一个位置的二进制位为0,则说明元素一定不存在。

    • 由于布隆过滤器的位数组可以被复用,其空间占用相对较小。同时,通过适当的哈希函数设计和位数组大小的选择,可以控制误判率(即判断元素存在时的假阳性率)。

    • 布隆过滤器的优势在于对于大规模数据集的快速查询和判断,具有高效的时间和空间复杂度。但也存在一定的限制,如不能删除元素、存在一定的误判率以及无法提供元素具体的位置等。

    • 布隆过滤器在实际应用中有许多用途,如缓存击穿防护、恶意网址过滤、URL去重、数据同步检查等。但在使用过程中需要根据具体的应用场景和需求,权衡误判率和空间使用,并合理确定哈希函数的个数和位数组大小,以获得最佳的性能和准确性。

    新的需求:使用Flink 新老用户->状态+布隆过滤器标识

    • 使用布隆过滤器的方式 加上状态管理
    • 读取数据后进行keyby根据设备类型 之后使用process窗口函数进行操作
    
    /**
     * @Description 新老用户统计分析
     *          原来是根据数据中的某个字段
     *          现在我们是根据每个 device(设备) 来判断是否是新老用户
     * 思考 => device放在状态里面去呢?
     * 我们的实现:状态 + 布隆过滤器
     */
    public class OsUserCntAppV3 {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStreamSource<String> stream = environment.readTextFile("data/access.json");
            environment.setParallelism(1); //设置并行度为1方便观察
            SingleOutputStreamOperator<Access> filter = stream.map(new MapFunction<String, Access>() {
                @Override
                public Access map(String s) throws Exception {
                    // json 转 Access
                    try {
                        return JSON.parseObject(s, Access.class);
                    } catch (Exception e) {
                        e.printStackTrace();
                        return null;
                    }
                }
                //这里是只要不为空的数据  x != null等于把上面的空的数据过滤掉
            }).filter(x -> x != null).filter(new FilterFunction<Access>() {
                @Override
                public boolean filter(Access access) throws Exception {
                    //只过滤出来 event='startup'的数据
                    return "startup".equals(access.event);
                }
            });
            //根据设别类型进行keyBy
            filter.keyBy(x->x.deviceType)
                    //全窗口 key I O
                    .process(new KeyedProcessFunction<String, Access, Access>() {
    
                        private ValueState<BloomFilter<String>> state;
                        @Override
                        public void open(Configuration parameters) throws Exception {
                            //使用布隆过滤器
                            //import org.apache.flink.shaded.guava18.com.google.common.hash.BloomFilter;  注意路径
                            //状态初始化 使用ValueState描述符 布隆过滤器传入设备编号
                            //初始化传入名字和类型
                            ValueStateDescriptor<BloomFilter<String>> descriptor =
                                    new ValueStateDescriptor<>("s", TypeInformation.of(new TypeHint<BloomFilter<String>>() {}));
                            state = getRuntimeContext().getState(descriptor);
                        }
    
                        @Override
                        public void processElement(Access value, Context context, Collector<Access> collector) throws Exception {
                            //来一条处理一条 "device":"4759947c-cd47-433c-ac8f-ae923a6d38b6" 设备ID
                            String device = value.device;
                            //状态中获取值
                            BloomFilter<String> bloomFilter = state.value();
                            //布隆过滤器固定写法 数据值
                            if(null==bloomFilter){
                                bloomFilter=BloomFilter.create(Funnels.unencodedCharsFunnel(),10000);
                            }
                            //来一条数据判断一次看过滤器是否包含
                            //mightContain() 可能包含    !bloomFilter.mightContain(device) 肯定不包含
                            if(!bloomFilter.mightContain(device)){
                                //不包含放入设备ID
                                bloomFilter.put(device);
                                //1是新用户  这里布直接修改nu字段 应该会造成进来的可能全是 nu=1 这里使用新字段nu2
                                //nu2第一次赋值为1后是新用户  再进来就会给布隆写成0了
                                value.nu2=1;
                                //更新状态
                                state.update(bloomFilter);
                            }
                            //输出
                            collector.collect(value);
                        }
                    }).print();
    
            environment.execute("OsUserCntAppV1");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78

    核心代码代码详解

    • 这段代码实现了一个新老用户统计分析的需求。原始数据中有大量的设备访问记录,代码通过使用状态和布隆过滤器来判断每个设备是否是新用户。

    • 代码首先读取了一个包含访问记录的文本文件,并将每行数据解析为Access对象。然后通过一系列过滤操作,过滤出其中eventType为"startup"的数据。

    • 接下来,代码根据设备类型进行keyBy操作,并使用全窗口处理函数(KeyedProcessFunction)进行处理。在处理过程中,使用一个布隆过滤器保存已经处理过的设备ID,用于判断设备是否是新用户。代码中通过状态(ValueState)来保存和更新布隆过滤器。

    • 对于每条访问记录,代码会先判断布隆过滤器是否包含该设备ID,如果不包含,则将该设备ID添加到布隆过滤器中,并修改Access对象的字段nu2为1,表示该设备是新用户。最后,输出处理过的Access对象。

    • 通过以上的处理,代码可以对大量的设备访问记录进行分析,判断每个设备是否是新用户,并输出结果。通过使用布隆过滤器来保存已处理过的设备ID,可以在大规模数据集中快速判断设备的新旧状态,提高处理效率。

    结果字段截取

    event='startup', net='WiFi', channel='华为商城', uid='user_1', nu=1, nu2=1, ip='171.11.85.21', 
    
    event='startup', net='WiFi', channel='华为商城', uid='user_1', nu=1, nu2=0, ip='171.11.85.21'
    
    • 1
    • 2
    • 3
  • 相关阅读:
    【wpa_supplicant】driver如何告诉supplicant自己做的一些事情以及结果
    win10环境安装kettle&linux环境安装kettle
    Redis如何实现多可用区?
    黄灰色鱼骨流程图图表合集PPT模板
    【数据结构与算法】链表
    CSS 简介
    Intel GPU Gen 9 架构
    【gzoj8.12综合三】银河摆渡【二分+DP】
    【手撕数据结构】(三)顺序表和链表
    BMP文件格式-笔记
  • 原文地址:https://blog.csdn.net/weixin_45574790/article/details/132860059