• Flink算子通用状态应用测试样例


    Flink算子通用状态应用测试样例

    1. 获取Flink执行环境
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
        
    
    • 1
    • 2
    • 3

    2. 创建数据源,生成随机数
            DataStream<Map<String, String>> source = env.addSource(new SourceFunction<Map<String, String>>() {
                @Override
                public void run(SourceContext<Map<String, String>> ctx) throws Exception {
                    while (true) {
                        HashMap<String, String> hashMap = new HashMap<>();
                        hashMap.put("ID", new Random().nextInt(3) + 1 + "");
                        hashMap.put("AMT", "1");
                        System.out.println("------");
                        System.out.println("生产数据:" + hashMap);
                        ctx.collect(hashMap);
                        Thread.sleep(1000);
                    }
                }
                @Override
                public void cancel() {}
            });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    3. 按照ID和星期几进行分组
            KeyedStream<Map<String, String>, String> keyedStream = source.keyBy(new KeySelector<Map<String, String>, String>() {
                @Override
                public String getKey(Map<String, String> value) throws Exception {
                    return value.get("ID") + LocalDate.now().getDayOfWeek();
                }
            });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    4. 处理函数,实现状态初始化和元素处理逻辑
            SingleOutputStreamOperator<Map<String, String>> process = keyedStream.process(new KeyedProcessFunction<String, Map<String, String>, Map<String, String>>() {
                private AggregatingState<Map<String, String>, Map<String, String>> aggState;
    
                @Override
                public void open(Configuration parameters) throws Exception {
                    // 配置状态的TTL
                    StateTtlConfig ttlConfig = StateTtlConfig
                            .newBuilder(Time.days(1))
                            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) // 仅在创建和写入时清除,另一个读和写时清除
                            .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) // 不退回过期值
                            .build();
                    // 初始化状态
                    AggregatingStateDescriptor<Map<String, String>, Map<String, String>, Map<String, String>> aggRes = new AggregatingStateDescriptor<>("aggRes", new AggregateFunction<Map<String, String>, Map<String, String>, Map<String, String>>() {
                        @Override
                        public Map<String, String> createAccumulator() {
                            return new HashMap<>();
                        }
    
                        @Override
                        public Map<String, String> add(Map<String, String> in, Map<String, String> acc) {
                            String amt = acc.get("AMT");
                            if (amt == null) {
                                acc.put("ID", in.get("ID"));
                                acc.put("AMT", in.get("AMT"));
                            } else {
                                acc.put("AMT", Integer.valueOf(in.get("AMT")) + Integer.valueOf(amt) + "");
                            }
                            return acc;
                        }
    
                        @Override
                        public Map<String, String> getResult(Map<String, String> acc) {
                            return acc;
                        }
    
                        @Override
                        public Map<String, String> merge(Map<String, String> a, Map<String, String> b) {
                            return null;
                        }
                    }, TypeInformation.of(new TypeHint<Map<String, String>>() {
                    }));
                    aggRes.enableTimeToLive(ttlConfig);
                    aggState = getRuntimeContext().getAggregatingState(aggRes);
                }
    
                @Override
                public void processElement(Map<String, String> value, KeyedProcessFunction<String, Map<String, String>, Map<String, String>>.Context ctx, Collector<Map<String, String>> out) throws Exception {
                    aggState.add(value);
                    out.collect(aggState.get());
                }
            });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51

    5. 打印聚合结果
            process.map((MapFunction<Map<String, String>, Object>) value -> {
                System.out.println("聚合结果:" + value);
                return null;
            });
    
    • 1
    • 2
    • 3
    • 4

    6. 执行作业
            env.execute("Flink Common State Test");
    
    • 1

    7. 执行结果
    ------
    生产数据:{AMT=1, ID=2}
    聚合结果:{AMT=1, ID=2}
    ------
    生产数据:{AMT=1, ID=3}
    聚合结果:{AMT=1, ID=3}
    ------
    生产数据:{AMT=1, ID=3}
    聚合结果:{AMT=2, ID=3}
    ------
    生产数据:{AMT=1, ID=1}
    聚合结果:{AMT=1, ID=1}
    ------
    生产数据:{AMT=1, ID=1}
    聚合结果:{AMT=2, ID=1}
    ------
    生产数据:{AMT=1, ID=1}
    聚合结果:{AMT=3, ID=1}
    ...
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    这段代码实现了一个 Flink 作业,生成随机数据并对数据进行状态聚合处理。其中包括数据源生成、按键分区、状态初始化、元素聚合处理和结果输出。可以作为多场景下通用的实时数据处理模型。

  • 相关阅读:
    3款超实用的电脑软件,免费又良心,内存满了也绝不卸载
    低代码助力生产管理:ERP生产管理系统
    18亿欧元大动作,法国瞄准实现量子飞跃
    Android8.1 MTK 去掉锁屏功能
    废液收集系统物联网远程监控解决方案
    maven exclusion 理解
    vim 从嫌弃到依赖(23)——最后的闲扯
    KoTime:v2.3.6-新增当前Java程序占用的内存统计以及页面刷新功能
    Stable Diffusion 秋葉aaaki整合包远程访问设置
    【大数据】-- dataworks 创建odps 的 hudi 外表
  • 原文地址:https://blog.csdn.net/qq_36382892/article/details/136378860