• Flink的算子列表状态的使用


    背景

    算子的列表状态是平时比较常见的一种状态,本文通过官方的例子来看一下怎么使用算子列表状态

    算子列表状态

    算子列表状态支持应用的并行度扩缩容,如下所示:
    在这里插入图片描述
    使用方法参见官方示例,我加了几个注解:

    public class BufferingSink
            implements SinkFunction<Tuple2<String, Integer>>,
                       CheckpointedFunction {//要实现CheckpointedFunction接口
    
        private final int threshold;
    
       //算子操作状态对象--算子级别的
        private transient ListState<Tuple2<String, Integer>> checkpointedState;
        //本地变量,保存这个算子任务的本地变量--任务级别的 
        private List<Tuple2<String, Integer>> bufferedElements;
    
        public BufferingSink(int threshold) {
            this.threshold = threshold;
            this.bufferedElements = new ArrayList<>();
        }
    
    //invoke方法中一般都是操作本地变量bufferedElements,不会直接操作算子列表状态
        @Override
        public void invoke(Tuple2<String, Integer> value, Context contex) throws Exception {
            bufferedElements.add(value);
            if (bufferedElements.size() >= threshold) {
                for (Tuple2<String, Integer> element: bufferedElements) {
                    // send it to the sink
                }
                bufferedElements.clear();
            }
        }
    
        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            checkpointedState.clear();
            for (Tuple2<String, Integer> element : bufferedElements) {
                // 把本地变量的值设置到算子列表状态中,算子列表状态会自动会被持久化
                checkpointedState.add(element);
            }
        }
    
        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            ListStateDescriptor<Tuple2<String, Integer>> descriptor =
                new ListStateDescriptor<>(
                    "buffered-elements",
                    TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));
            // 定义算子列表状态
            checkpointedState = context.getOperatorStateStore().getListState(descriptor);
    
            if (context.isRestored()) {
            // 算子列表状态的值设置到本地变量中
                for (Tuple2<String, Integer> element : checkpointedState.get()) {
                    bufferedElements.add(element);
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
  • 相关阅读:
    Java SSM Spring概述+IOC概念和作用+Spring IOC解决程序耦合
    详解卡尔曼滤波原理
    某60区块链安全之重入漏洞实战记录
    众和策略:题材股什么意思?
    jmeter+nmon+crontab简单的执行接口定时压测
    LinkedIn领英怎么避免封号?封号怎么解决?(建议收藏)
    张量-算术操作函数
    cesium 重点区域大屏展示效果(加载行政区划)
    Python+Selenium自动化测试项目实战
    RL 从敲门到入门
  • 原文地址:https://blog.csdn.net/lixia0417mul2/article/details/133870358