• 广播状态实现注意事项


    背景:

    日常我们事件流总要关联上其他的静态数据来组成一条完整的记录,例如事件流+规则表来组合出一条完整的记录流,这个时候规则表就要设置成广播状态的形式来支持快速流操作

    技术实现

    // 广播处理函数
            new KeyedBroadcastProcessFunction<Color, Item, Rule, String>() {
    
                // 键值分区状态
                private final MapStateDescriptor<String, List<Item>> mapStateDesc =
                        new MapStateDescriptor<>(
                                "items",
                                BasicTypeInfo.STRING_TYPE_INFO,
                                new ListTypeInfo<>(Item.class));
    
                // 广播状态
                private final MapStateDescriptor<String, Rule> ruleStateDescriptor =
                        new MapStateDescriptor<>(
                                "RulesBroadcastState",
                                BasicTypeInfo.STRING_TYPE_INFO,
                                TypeInformation.of(new TypeHint<Rule>() {}));
    
                @Override
                public void processBroadcastElement(Rule value,
                        Context ctx,
                        Collector<String> out) throws Exception {
                    // 更新广播状态
                    ctx.getBroadcastState(ruleStateDescriptor).put(value.name, value);
                    // 这里不能访问单个键值分区状态,因为广播元素没有对应的键值key,但是这里提供一个函数可以对所有的键值key进行处理
                    ctx.applyToKeyedState(mapStateDesc, new KeyedStateFunction(){
    
                        @Override
                        public void process(Object key, State state) throws Exception {
                            // key是键值, state是状态
                            Color colorKey = (Color) key;
                            final MapState<String, List<Item>> kvMapState = (MapState<String, List<Item>>) state);
                            // 可以对每个键值进行处理
                        }
                    });
                }
    
                @Override
                public void processElement(Item value,
                        ReadOnlyContext ctx,
                        Collector<String> out) throws Exception {
                    // 操作键值分区状态
                    final MapState<String, List<Item>> state = getRuntimeContext().getMapState(mapStateDesc);
                    final Shape shape = value.getShape();
                    // 操作广播状态
                    for (Map.Entry<String, Rule> entry :
                            ctx.getBroadcastState(ruleStateDescriptor).immutableEntries()) {
                        final String ruleName = entry.getKey();
                        final Rule rule = entry.getValue();
    
                        List<Item> stored = state.get(ruleName);
                        if (stored == null) {
                            stored = new ArrayList<>();
                        }
    
                        if (shape == rule.second && !stored.isEmpty()) {
                            for (Item i : stored) {
                                out.collect("MATCH: " + i + " - " + value);
                            }
                            stored.clear();
                        }
    
                        // there is no else{} to cover if rule.first == rule.second
                        if (shape.equals(rule.first)) {
                            stored.add(value);
                        }
    
                        if (stored.isEmpty()) {
                            state.remove(ruleName);
                        } else {
                            state.put(ruleName, stored);
                        }
                    }
                }
    
                //可以注册定时器
                public void onTimer(final long timestamp, final OnTimerContext ctx, final Collector<OUT> out)
                        throws Exception {
                    // the default implementation does nothing.
                }
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80

    要点总结:

    1.在处理广播元素的方法processBroadcastElement中是没法访问单个键值分区状态的,因为广播元素并没有对应某个键值,但是在该方法中可以对所有的键值状态进行处理,也就是对键值状态进行统一的处理,此时你可以只处理对应的键值(不建议这么做)

    2.处理广播元素的方法processBroadcastElement中更新广播状态时不要依赖于广播元素到达的顺序,当上游算子的并行度大于1时,下游处理广播元素的算子收到的广播元素的顺序有可能不一样

    3.KeyedBroadcastProcessFunction也可以注册计时器,这个计时器是和对应的键值的key关联的

    4.广播状态也可以应用于DataStream,也就是使用BroadcastProcessFunction应用于普通的数据流,而不一定是KeyStreamed

    5.广播状态和代码中使用executor执行器定时更新内存记录的区别是广播状态可以持久化,而使用executor执行器定时更新内存记录可以不依赖于flink的状态管理,比如定时加载配置表到内存中也可以实现类似广播想要达到的效果

  • 相关阅读:
    【C++】详解 eventpp 事件调度程序和回调列表库
    用ChatGPT,快速设计一个真实的账号系统
    Mysql 内外链接,索引,事务,用户管理以及用C语言链接Mysql
    CSDN编程竞赛第八期 | 参赛经历分享
    036、目标检测-锚框
    解决VSCode下载速度特别慢的问题
    多环境与多数据源切换
    18_C++_面向对象_explicit/new/delete关键字_静态成员_this指针_常函数和常对象_友元_单例模式
    C++QT实现压缩文件、文件夹和解压缩操作
    绿色信贷数据合集(更新至2021年)
  • 原文地址:https://blog.csdn.net/lixia0417mul2/article/details/133690771