• Flink算子通用状态应用测试样例


    Flink算子通用状态应用测试样例

    1. 获取Flink执行环境
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
        
    
    • 1
    • 2
    • 3

    2. 创建数据源,生成随机数
            DataStream<Map<String, String>> source = env.addSource(new SourceFunction<Map<String, String>>() {
                @Override
                public void run(SourceContext<Map<String, String>> ctx) throws Exception {
                    while (true) {
                        HashMap<String, String> hashMap = new HashMap<>();
                        hashMap.put("ID", new Random().nextInt(3) + 1 + "");
                        hashMap.put("AMT", "1");
                        System.out.println("------");
                        System.out.println("生产数据:" + hashMap);
                        ctx.collect(hashMap);
                        Thread.sleep(1000);
                    }
                }
                @Override
                public void cancel() {}
            });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    3. 按照ID和星期几进行分组
            KeyedStream<Map<String, String>, String> keyedStream = source.keyBy(new KeySelector<Map<String, String>, String>() {
                @Override
                public String getKey(Map<String, String> value) throws Exception {
                    return value.get("ID") + LocalDate.now().getDayOfWeek();
                }
            });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    4. 处理函数,实现状态初始化和元素处理逻辑
            SingleOutputStreamOperator<Map<String, String>> process = keyedStream.process(new KeyedProcessFunction<String, Map<String, String>, Map<String, String>>() {
                private AggregatingState<Map<String, String>, Map<String, String>> aggState;
    
                @Override
                public void open(Configuration parameters) throws Exception {
                    // 配置状态的TTL
                    StateTtlConfig ttlConfig = StateTtlConfig
                            .newBuilder(Time.days(1))
                            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) // 仅在创建和写入时清除,另一个读和写时清除
                            .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) // 不退回过期值
                            .build();
                    // 初始化状态
                    AggregatingStateDescriptor<Map<String, String>, Map<String, String>, Map<String, String>> aggRes = new AggregatingStateDescriptor<>("aggRes", new AggregateFunction<Map<String, String>, Map<String, String>, Map<String, String>>() {
                        @Override
                        public Map<String, String> createAccumulator() {
                            return new HashMap<>();
                        }
    
                        @Override
                        public Map<String, String> add(Map<String, String> in, Map<String, String> acc) {
                            String amt = acc.get("AMT");
                            if (amt == null) {
                                acc.put("ID", in.get("ID"));
                                acc.put("AMT", in.get("AMT"));
                            } else {
                                acc.put("AMT", Integer.valueOf(in.get("AMT")) + Integer.valueOf(amt) + "");
                            }
                            return acc;
                        }
    
                        @Override
                        public Map<String, String> getResult(Map<String, String> acc) {
                            return acc;
                        }
    
                        @Override
                        public Map<String, String> merge(Map<String, String> a, Map<String, String> b) {
                            return null;
                        }
                    }, TypeInformation.of(new TypeHint<Map<String, String>>() {
                    }));
                    aggRes.enableTimeToLive(ttlConfig);
                    aggState = getRuntimeContext().getAggregatingState(aggRes);
                }
    
                @Override
                public void processElement(Map<String, String> value, KeyedProcessFunction<String, Map<String, String>, Map<String, String>>.Context ctx, Collector<Map<String, String>> out) throws Exception {
                    aggState.add(value);
                    out.collect(aggState.get());
                }
            });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51

    5. 打印聚合结果
            process.map((MapFunction<Map<String, String>, Object>) value -> {
                System.out.println("聚合结果:" + value);
                return null;
            });
    
    • 1
    • 2
    • 3
    • 4

    6. 执行作业
            env.execute("Flink Common State Test");
    
    • 1

    7. 执行结果
    ------
    生产数据:{AMT=1, ID=2}
    聚合结果:{AMT=1, ID=2}
    ------
    生产数据:{AMT=1, ID=3}
    聚合结果:{AMT=1, ID=3}
    ------
    生产数据:{AMT=1, ID=3}
    聚合结果:{AMT=2, ID=3}
    ------
    生产数据:{AMT=1, ID=1}
    聚合结果:{AMT=1, ID=1}
    ------
    生产数据:{AMT=1, ID=1}
    聚合结果:{AMT=2, ID=1}
    ------
    生产数据:{AMT=1, ID=1}
    聚合结果:{AMT=3, ID=1}
    ...
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    这段代码实现了一个 Flink 作业,生成随机数据并对数据进行状态聚合处理。其中包括数据源生成、按键分区、状态初始化、元素聚合处理和结果输出。可以作为多场景下通用的实时数据处理模型。

  • 相关阅读:
    磐基2.0搭建es集群
    RibbitMQ学习笔记延迟队列
    项目管理范围(上)
    Git与IDEA: 解决`dev`分支切换问题及其背后原因 为何在IDEA中无法切换到`dev`分支?全面解析!
    Spring Cloud Alibaba 学习笔记
    java运算操作符示例大全
    Mybatis基础支持层-反射模块:ObjectFactory/Property工具类
    Playwright中page.locator快速查找网页元素和对象交互操作
    低代码招投标应用:全程不见面,一次都不跑,快速优化招投标流程
    牛客《算法入门》链表(题解C++)
  • 原文地址:https://blog.csdn.net/qq_36382892/article/details/136378860