• Flink的算子列表状态的使用


    背景

    算子的列表状态是平时比较常见的一种状态,本文通过官方的例子来看一下怎么使用算子列表状态

    算子列表状态

    算子列表状态支持应用的并行度扩缩容,如下所示:
    在这里插入图片描述
    使用方法参见官方示例,我加了几个注解:

    public class BufferingSink
            implements SinkFunction<Tuple2<String, Integer>>,
                       CheckpointedFunction {//要实现CheckpointedFunction接口
    
        private final int threshold;
    
       //算子操作状态对象--算子级别的
        private transient ListState<Tuple2<String, Integer>> checkpointedState;
        //本地变量,保存这个算子任务的本地变量--任务级别的 
        private List<Tuple2<String, Integer>> bufferedElements;
    
        public BufferingSink(int threshold) {
            this.threshold = threshold;
            this.bufferedElements = new ArrayList<>();
        }
    
    //invoke方法中一般都是操作本地变量bufferedElements,不会直接操作算子列表状态
        @Override
        public void invoke(Tuple2<String, Integer> value, Context contex) throws Exception {
            bufferedElements.add(value);
            if (bufferedElements.size() >= threshold) {
                for (Tuple2<String, Integer> element: bufferedElements) {
                    // send it to the sink
                }
                bufferedElements.clear();
            }
        }
    
        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            checkpointedState.clear();
            for (Tuple2<String, Integer> element : bufferedElements) {
                // 把本地变量的值设置到算子列表状态中,算子列表状态会自动会被持久化
                checkpointedState.add(element);
            }
        }
    
        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            ListStateDescriptor<Tuple2<String, Integer>> descriptor =
                new ListStateDescriptor<>(
                    "buffered-elements",
                    TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));
            // 定义算子列表状态
            checkpointedState = context.getOperatorStateStore().getListState(descriptor);
    
            if (context.isRestored()) {
            // 算子列表状态的值设置到本地变量中
                for (Tuple2<String, Integer> element : checkpointedState.get()) {
                    bufferedElements.add(element);
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
  • 相关阅读:
    格林公式的理解
    Flume实践案例
    springboot启动自动配置原理分析
    创新型智慧农业信息化系统建设方案
    2022.10.9-10.16 AI行业周刊(第119期):相信坚持的力量
    计算机网络课程设计——中小型网络工程设计
    Algorithm Review 4
    贪心算法与DFS:九度OJ1030
    【JavaScript】浏览器调试控制台console的功能有了解多少
    小白的Python+Anaconda+vscode安装使用教程(win11系统手把手教学)(1)
  • 原文地址:https://blog.csdn.net/lixia0417mul2/article/details/133870358