• Flink之OperatorState


    Flink中状态主要分为三种:

    • Operator State(算子状态)
    • Keyed State(键控状态)
    • Broadcast State(广播状态)

    这里简单介绍一下Operator State的使用,说到使用State就必然要使用到Flink的容错机制也就是Checkpoint.具体内容见代码注解

    • 数据源
      这里选用Socket作为Source输入,便于测试
      ➜  ~ nc -lk 8888
      a
      b
      c
      k
      k
      k
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
    • 状态算子代码
      /**
      * @Description TODO 自定义状态MapFunc
      **/
      // 状态算子必须要实现对应的算子接口和CheckpointFunction接口
      class StateMapFunc implements MapFunction<String, String>, CheckpointedFunction{
        private ListState<String> strListState;
      
        /**
         * @Param o
         * @return String
         * @Description TODO map方法的正常处理逻辑
        **/
        @Override
        public String map(String s) throws Exception {
            // 模拟Task失败
            if (s.equals("k") && RandomUtils.nextInt(0, 5) == 3) {
                throw new Exception("Task 异常");
            }
            // 将数据添加到状态存储器中
            strListState.add(s);
      
            Iterable<String> strings = strListState.get();
            StringBuilder builder = new StringBuilder();
            for (String string : strings) {
                builder.append(string);
            }
            return builder.toString();
        }
      
        /**
         * @Param functionSnapshotContext
         * @return void
         * @Description TODO 系统对状态数据做快照(持久化)会调用此方法, 用户使用此方法在持久化前对状态数据可以做一些操控
        **/
        @Override
        public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
            System.out.println("快照生成, checkpointId: " + functionSnapshotContext.getCheckpointId());
        }
      
        /**
         * @Param functionInitializationContext
         * @return void
         * @Description TODO 算子任务在启动前会调用此方法,未用户状态数据进行初始化
        **/
        @Override
        public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
            // 获取算子状态存储器
            OperatorStateStore operatorStateStore = functionInitializationContext.getOperatorStateStore();
      
            /**
             * ListStateDescriptor状态描述
             * 参数1:一个自定义名称
             * 参数2:存储的数据类型
            **/
            ListStateDescriptor<String> stateDescriptor = new ListStateDescriptor<>("demo", String.class);
            /**
             * 算子状态存储器, 只提供ListSate的形式(和Java中的List不是一回事)来存储状态数据
             * getListSate方法,会在Task失败后,task自动重启时,会帮助用户加载最近一次的快照数据,如果是job重启则不会加载
            **/
            strListState = operatorStateStore.getListState(stateDescriptor);
        }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50
      • 51
      • 52
      • 53
      • 54
      • 55
      • 56
      • 57
      • 58
      • 59
      • 60
      • 61
      • 62
      要注意代码注释中的内容,getListState只作用于Task的自动重启,如果是整个Job重启时不生效的,如果是想Job重启后从重启前的State获取数据需要在Job提交时就指定checkpoint镜像文件.
    • 业务代码
        public class FlinkOperatorState {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // 设置并行度1
            env.setParallelism(1);
            // 开启Checkpoint, 8秒一个周期并开启一次性语义
            env.enableCheckpointing(8000, CheckpointingMode.EXACTLY_ONCE);
            // 指定checkpoint持久化路径
            env.getCheckpointConfig().setCheckpointStorage("file:///Users/xxx/data/testData/checkpoint");
            // 开启Task级别故障自动failover,通过fixedDelayRestart设置Task重启上限和重启间隔,这里设置的重启次数为2次,一旦Task重启次数超过这个次数,整个job也会停止
            env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, Time.seconds(5)));
            // 获取Socket数据源
            DataStreamSource<String> socketSource = env.socketTextStream("localhost", 8888);
            // 将自定义的StateOperator传入
            SingleOutputStreamOperator<String> map = socketSource.map(new StateMapFunc());
            // 打印结果
            map.print();
            env.execute("Operator State");
        }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20

    具体的代码模板和API的介绍大概就这些内容,具体实践要根据业务逻辑而定.

  • 相关阅读:
    随机密码生成器(Python)
    frp篇---frp-notify + Gotify 实现 FRP 用户上线通知
    十大网站助力人工智能学习之路
    html-docx-js网页转为word格式框架
    工具分享:Springboot+Netty+Xterm搭建一个网页版的SSH终端
    申请全国400电话的步骤及注意事项
    springboot215基于springboot技术的美食烹饪互动平台的设计与实现
    Java算法-力扣leetcode-125. 验证回文串
    计算机网络面试常问问题--保研及考研复试
    Python requests库(爬虫和接口测试)
  • 原文地址:https://blog.csdn.net/AnameJL/article/details/134445693