• Flink之KeyedState


    前面的文章中介绍过Operator State,这里介绍一下Keyed State.
    在使用Operator State时必须要实现CheckpointFunction接口,而Keyed State则不需要,在使用keyBy(...)分组分组后,调用的函数必须是实现RichFuntion接口的函数才可以使用Keyed State.同样使用Keyed State也必须开启Checkpoint.

    • 需求
      将接收到的Socket数据源中的字符串进行拼接
      在命令行开启socket命令:
      nc -lk 8888
      
      • 1
    • 业务代码
      public class FlinkKeyedState {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // 设置并行度为1,便于观察
            env.setParallelism(1);
            // 开启Checkpoint, 8秒一个周期并开启一次性语义
            env.enableCheckpointing(8000, CheckpointingMode.EXACTLY_ONCE);
            // 指定checkpoint持久化路径
            env.getCheckpointConfig().setCheckpointStorage("file:///Users/xxx/data/testData/checkpoint");
            // 开启Task级别故障自动failover,通过fixedDelayRestart设置Task重启上限和重启间隔,这里设置的重启次数为2次,一旦Task重启次数超过这个次数,整个job也会停止
            env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, Time.seconds(5)));
            // 获取Socket数据源
            DataStreamSource<String> socketSource = env.socketTextStream("localhost", 8888);
            // 将数据进行分组,将分组key给一个常量值
            SingleOutputStreamOperator<String> map = socketSource.keyBy(s -> "1")
                    // 使用Keyed State的算子必须实现RichFunction接口,如RichMapFunction,ProcessFunction等
                    .map(new RichMapFunction<String, String>() {
                        ListState<String> listState;
      
                        // open方法可以理解为和Operator State中的initializeState方法一样,需要在这个方法中构造和获取状态存储器
                        @Override
                        public void open(Configuration parameters) throws Exception {
                            // 获取上下文
                            RuntimeContext ctx = getRuntimeContext();
                            // 获取ListState,不同于Operator State的是在这里有更多的选择,如ListState,MapState等
                            listState = ctx.getListState(new ListStateDescriptor<>("demo", String.class));
                        }
      
                        // 在map方法中正常编写业务逻辑
                        @Override
                        public String map(String s) throws Exception {
                            // 模拟Task失败
                            if (s.equals("k") && RandomUtils.nextInt(0, 5) == 3) {
                                throw new Exception("Task 异常");
                            }
                            // 将数据添加到状态存储器中
                            listState.add(s);
      
                            Iterable<String> strings = listState.get();
                            StringBuilder builder = new StringBuilder();
                            for (String string : strings) {
                                builder.append(string);
                            }
                            return builder.toString();
                        }
                    });
            map.print();
            env.execute("Keyed State");
        }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50
      API的使用大概就这些内容,不过在使用Keyed Sate时首先要对keyBy的特性有所了解,才能得到最终想要的结果数据,如使用keyBy时上下游之间的数据分发模式、所设置的默认并行度上下游算子的并行度是否一致等问题,这些都是需要注意的,然后根据实际业务需求开发对应的逻辑就可以了.
  • 相关阅读:
    keepalived+nginx高可用集群
    3D模型怎么贴法线贴图?
    Go基础学习笔记(二):错误处理和资源管理、Goroutine、Channel、迷宫的广度优先搜索、http及其他标准库
    高阶运维管理,这个工具和思路值得看看
    《Head First HTML5 javascript》第6章 函数
    【Linux】常用工具(上)
    决定迭代次数的两种效应
    Vue多级路由的实现
    【Gazebo入门教程】第三讲 SDF文件的静/动态编程建模
    LeetCode 15. 三数之和
  • 原文地址:https://blog.csdn.net/AnameJL/article/details/134463782