• Flink入门系列08-State


    引言

    flink 提供了内置的状态淑君管理机制,包括故障发生后的状态一致性维护、以及状态数据的高效存储和访问。用户不用担心状态数据在程序失败及恢复时所引入的一系列问题,从而使得开发人员可以专注于应用程序的逻辑开发。

    算子状态和键控状态

    • 算子状态
      是每个subtask自己持有一份独立的状态数据,一旦job重启且待状态算子的并行度发生了变化,则之前的状态数据将在新的 subtask 间均匀分配。
      算子函数实现 CheckpointedFunction后,即可使用算子状态。
      算子状态通常用于source算子,其他场景下建议使用KeyedState(键控状态)
    • 键控状态
      只能应用于 KeyedStream 的算子中(keyby后的处理算子)
      算子为每一个 key 绑定一份独立的状态数据。

    算子状态测试代码

    task级别的失败重启策略

    1. 开启Checkpoint
    2. 设置自动重启策略
    // 开启  task级别故障自动 failover
    // env.setRestartStrategy(RestartStrategies.noRestart()); // 默认是,不会自动failover;一个task故障了,整个job就失败了
    // 使用的重启策略是: 固定重启上限和重启时间间隔
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,1000));
    
    // 需要使用map算子来达到一个效果:
    // 每来一条数据(字符串),输出 该条字符串拼接此前到达过的所有字符串
    source.map(new StateMapFunction()).print();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    /**
     * 要使用 operator state (算子状态),需要让用户自己的 Function类去实现 CheckpointedFunction
     * 然后在其中的方法 initializeState 中,去拿到 operator state 存储器
     */
    class StateMapFunction implements  MapFunction<String,String> , CheckpointedFunction{
    
        ListState<String> listState;
    
        /**
         * 正常的MapFunction的处理逻辑方法
         */
        @Override
        public String map(String value) throws Exception {
    
            /**
             * 故意埋一个异常,来测试 task级别自动容错效果
             */
            if(value.equals("x") && RandomUtils.nextInt(1,15)% 4 == 0)
                throw new Exception("哈哈哈哈,出错了");
    
            // 将本条数据,插入到状态存储器中
            listState.add(value);
    
            // 然后拼接历史以来的字符串
            Iterable<String> strings = listState.get();
            StringBuilder sb = new StringBuilder();
            for (String string : strings) {
                sb.append(string);
            }
    
            return sb.toString();
        }
    
        /**
         * 系统对状态数据做快照(持久化)时会调用的方法,用户利用这个方法,在持久化前,对状态数据做一些操控
         * @param context the context for drawing a snapshot of the operator
         * @throws Exception
         */
        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            // System.out.println("checkpoint 触发了,checkpointId : " +context.getCheckpointId());
        }
    
        /**
         * 算子任务在启动之初,会调用下面的方法,来为用户进行状态数据初始化
         * @param context the context for initializing the operator
         * @throws Exception
         */
        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
    
            // 从方法提供的context中拿到一个算子状态存储器
            OperatorStateStore operatorStateStore = context.getOperatorStateStore();
    
            // 算子状态存储器,只提供List数据结构来为用户存储数据
            ListStateDescriptor<String> stateDescriptor = new ListStateDescriptor<>("strings", String.class); // 定义一个状态存储结构描述器
    
            // getListState方法,在task失败后,task自动重启时,会帮用户自动加载最近一次的快照状态数据
            // 如果是job重启,则不会自动加载此前的快照状态数据
            listState = operatorStateStore.getListState(stateDescriptor);  // 在状态存储器上调用get方法,得到具体结构的状态管理器
    
    
            /**
             * unionListState 和普通 ListState的区别:
             * unionListState的快照存储数据,在系统重启后,list数据的重分配模式为: 广播模式; 在每个subtask上都拥有一份完整的数据
             * ListState的快照存储数据,在系统重启后,list数据的重分配模式为: round-robin; 轮询平均分配
             */
            //ListState unionListState = operatorStateStore.getUnionListState(stateDescriptor);
    
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71

    键控状态测试代码

    // 开启 task 级别故障自动 failover
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,1000));
    
    // 需要使用map算子来达到一个效果:
    // 没来一条数据(字符串),输出 该条字符串拼接此前到达过的所有字符串
    source
       .keyBy(s->"0")
       .map(new RichMapFunction<String, String>() {
    
           ListState<String> lstState;
           @Override
           public void open(Configuration parameters) throws Exception {
               RuntimeContext runtimeContext = getRuntimeContext();
               // 获取一个List结构的状态存储器
               lstState = runtimeContext.getListState(new ListStateDescriptor<String>("lst", String.class));
           }
    
           @Override
           public String map(String value) throws Exception {
    
               // 将本条数据,装入状态存储器
               lstState.add(value);
    
               // 遍历所有的历史字符串,拼接结果
               StringBuilder sb = new StringBuilder();
               for (String s : lstState.get()) {
                   sb.append(s);
               }
               return sb.toString();
           }
       }).print();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    状态数据结构介绍

    算子状态提供的数据结构

    • ListState
    • UnionListState

    unionListState 和普通 ListState的区别:

    1. unionListState 的快照存储数据,在系统重启后,list数据的重分配模式为: 广播模式; 在每个subtask上都拥有一份完整的数据。
    2. ListState 的快照存储数据,在系统重启后,list数据的重分配模式为: round-robin; 轮询平均分配。

    键控状态提供的数据结构

    • ValueState
    • ListState
    • MapState
    • ReducingState
    • AggregateState
    // 开启  task级别故障自动 failover
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,1000));
    
    
    DataStreamSource<String> source = env.socketTextStream("localhost", 9999);
    
    // 需要使用map算子来达到一个效果:
    // 没来一条数据(字符串),输出 该条字符串拼接此前到达过的所有字符串
    source
            .keyBy(s->"0")
            .map(new RichMapFunction<String, String>() {
    
                // 单值状态
                ValueState<String> valueState;
                // list状态
                ListState<String> lstState;
                // map状态
                MapState<String, String> mapState;
                // reducing状态
                ReducingState<Integer> reduceState;
                // aggregate状态
                AggregatingState<Integer, Double> aggState;
    
                @Override
                public void open(Configuration parameters) throws Exception {
                    RuntimeContext runtimeContext = getRuntimeContext();
    
                    // 获取一个 单值 结构的状态存储器
                    valueState = runtimeContext.getState(new ValueStateDescriptor<String>("vstate", String.class));
    
    
                    // 获取一个List结构的状态存储器
                    lstState = runtimeContext.getListState(new ListStateDescriptor<String>("lst", String.class));
    
    
    
                    // 获取一个 Map 结构的状态存储器
                    mapState = runtimeContext.getMapState(new MapStateDescriptor<String, String>("xx", String.class, String.class));
    
    
                    // 获取一个reduce聚合状态
                    reduceState = runtimeContext.getReducingState(new ReducingStateDescriptor<Integer>("reduceState", new ReduceFunction<Integer>() {
                        @Override
                        public Integer reduce(Integer value1, Integer value2) throws Exception {
                            return value1 + value2;
                        }
                    }, Integer.class));
    
    
                    // 获取一个aggregate聚合状态
                    // 比如,我们要插入整数,返回平均值
                    aggState = runtimeContext.getAggregatingState(new AggregatingStateDescriptor<Integer, Tuple2<Integer, Integer>, Double>("aggState", new AggregateFunction<Integer, Tuple2<Integer, Integer>, Double>() {
                        @Override
                        public Tuple2<Integer, Integer> createAccumulator() {
                            return Tuple2.of(0, 0);
                        }
    
                        @Override
                        public Tuple2<Integer, Integer> add(Integer value, Tuple2<Integer, Integer> accumulator) {
                            return Tuple2.of(accumulator.f0 + 1, accumulator.f1 + value);
                        }
    
                        @Override
                        public Double getResult(Tuple2<Integer, Integer> accumulator) {
    
                            return accumulator.f1 / (double) accumulator.f0;
                        }
    
                        @Override
                        public Tuple2<Integer, Integer> merge(Tuple2<Integer, Integer> a, Tuple2<Integer, Integer> b) {
    
                            return Tuple2.of(a.f0 + b.f0, a.f1 + a.f1);
                        }
    
                    }, TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>() {
                    })));
    
    
                }
    
                @Override
                public String map(String value) throws Exception {
    
                    /**
                     * ValueState的数据操作api
                     */
                    valueState.update("xxx");  // 更新掉状态中的值
                    String str = valueState.value();  // 获取状态中的值
    
    
                    /**
                     * listState的数据操作api
                     */
                    Iterable<String> strings = lstState.get(); // 拿到整个liststate的数据迭代器
    
                    lstState.add("a"); // 添加一个元素到liststate中
    
                    lstState.addAll(Arrays.asList("b","c","d"));  // 一次性放入多个元素到liststate中
    
                    lstState.update(Arrays.asList("1","2","3")); // 一次性将liststate中的数据替换为传入的元素
    
                    /**
                     * mapState的数据操作api
                     */
                    String v = mapState.get("a");  // 从mapstate中根据一个key来获取它的value
    
                    boolean contain = mapState.contains("a");  // 判断mapstate中是否包含指定的key
    
                    Iterator<Map.Entry<String, String>> entryIterator = mapState.iterator(); // 拿到mapstate的entry迭代器
                    Iterable<Map.Entry<String, String>> entryIterable = mapState.entries(); // 拿到mapstate的entry的 Iterable(内含迭代器)
    
                    mapState.put("a","100");  // 往mapstate中插入一对KV
    
                    boolean isEmpty = mapState.isEmpty();  // 判断mapstate中是否没有元素(是否为空)
    
                    HashMap<String, String> dataMap = new HashMap<>();
                    dataMap.put("a","1");
                    dataMap.put("b","2");
    
                    mapState.putAll(dataMap);  // 通过一个hashmap对象,来一次性放入多对KV到mapstate中
    
                    Iterable<String> keys = mapState.keys();  // 拿到mapstate中所有key
                    Iterable<String> values = mapState.values(); // 拿到mapstate中的所有value
    
                    mapState.remove("a"); // 从mapstate移除key=“a"的条目
    
                    /**
                     * reduce 状态使用
                     */
                    reduceState.add(10);  // 往聚合状态中添加数据  ,此刻状态中的数据是10
                    reduceState.add(20);  // 往聚合状态中添加数据  ,此刻状态中的数据是30
                    Integer stateValue = reduceState.get();  // 从聚合状态中获取数据值
    
    
                    /**
                     * aggreate 状态使用
                     */
                    aggState.add(10);
                    aggState.add(20);
    
                    Double avgDouble = aggState.get(); // 获取状态值 : 15.0
    
    
                    return null;
                }
            }).setParallelism(2)
            .print().setParallelism(2);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147

    状态后端

    所谓状态后端,就是状态数据的存储管理实现,包含状态数据的本地读写、快照远端存储功能;
    状态后端是可插拔替换的,它对上层屏蔽了底层的差异,因为在更换状态后端时,用户的代码不需要做任何修改。

    可用的状态后端类型

    • HashMapStateBackend
    • EmbeddedRocksDBStateBackend
      新版本中,FsStateBackend 和 MemoryStateBackend整合了HashMapStateBackend,而且 HashMapStateBackend 和 EmbeddedRocksDBStateBackend 所生成的快照文件也统一了格式,因而在job重新部署或者版本升级时,可以任意替换 statebackend
      如果不设置,默认使用 HashMapStateBackend。

    状态后端的配置代码

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
    /**
     * 设置要使用的状态后端
    */
    HashMapStateBackend hashMapStateBackend = new HashMapStateBackend(); // 默认配置
    env.setStateBackend(hashMapStateBackend); // 使用HashMapStateBackend  作为状态后端
    
    EmbeddedRocksDBStateBackend embeddedRocksDBStateBackend = new EmbeddedRocksDBStateBackend();
    //env.setStateBackend(embeddedRocksDBStateBackend);  // 设置 EmbeddedRocksDBStateBackend 作为状态后端
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    HashMapStateBackend 状态后端

    • 状态数据是以 java 对象形式存储在 heap 内存中;
    • 内存空间不够时,也会溢出一部分数据到本地磁盘文件;
    • 可以支撑大规模的状态数据。

    EmbeddedRocksDBStateBackend 状态后端

    • 状态数据时交给 rocksdb 来管理;
    • Rocksdb 中的数据是以序列化的 kv 字节进行存储;
    • Rocksdb 中的数据,有内存缓存的部分,也有磁盘文件的部分;
    • Rocksdb 的磁盘文件数据读写速度相对较快,所以在支持超大规模状态数据时,数据的读写效率不太有太大的降低。
  • 相关阅读:
    vscode 通过ssh 连接虚拟机vmware(ubuntu)
    【进阶C语言】数据在内存中的存储
    低代码开发那些事儿
    WEB攻防-ASP安全-ASP后门植入连接
    JDK、eclipse软件的安装
    记录:2022-9-10 完美数 快乐数 括号生成 请求调页 页面置换 写时复制 页面缓冲算法
    java计算机毕业设计网课系统源码+系统+mysql数据库+LW文档+部署文件
    静态和默认路由配置-----计算机网络
    Appium
    java-net-php-python-ssmA公司运维管理系统计算机毕业设计程序
  • 原文地址:https://blog.csdn.net/qq_17310871/article/details/126547951