• Flink之状态TTL机制


    在Flink状态使用过程中有时需要清除State中不许需要的数据,否则State中的数据会越来越多,既增加了内存压力,也降低了计算效率.而TTL机制可以很好的帮我们解决这个分体,利用TTL机制可以将状态中的冷热数据分离,将使用率很低的冷数据及时清除.
    这里以Operator State为例子

    class StateMapFunc2 implements MapFunction<String, List<Tuple2<String, String>>>, CheckpointedFunction {
        private ListState<Tuple2<String, String>> listState;
    
        @Override
        public List<Tuple2<String, String>> map(String s) throws Exception {
            // ...
        }
    
        @Override
        public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
        }
    
        @Override
        public void initializeState(FunctionInitializationContext ctx) throws Exception {
            OperatorStateStore operatorStateStore = ctx.getOperatorStateStore();
            // 配置State TTL
            StateTtlConfig ttlConfig = new StateTtlConfig.Builder(Time.seconds(10)) // 设置数据存活时长,当该数据在State中存活时间超过10s时删除该数据
                    // 这个方法也是设置数据存活时长,和StateTtlConfig.Builder(Time.seconds(10))的作用一样,可以不用这个方法,如果用了会覆盖上面设置的时长
                    .setTtl(Time.seconds(10))
                    /**
                     * updateTtlOnCreateAndWrite和updateTtlOnReadAndWrite二选一即可, 这两个方法的主要作用就是配合setTtl方法将冷热数据进行分离
                     **/
                    // 当该条数据在State中插入或者更新的时候,刷新计时
                    .updateTtlOnCreateAndWrite()
                    // 读或写都刷新该数据的TTL计时
                    .updateTtlOnReadAndWrite()
                    /**
                     * setStateVisibility就是设置状态的可见性,前面setTtl方法是设置删除过期数据,删除过期数据实际上是由另一个异步线程周期性(定时器)的完成,也就是说超过10s的数据不一定会马上被删除,但是
                     * 获取数据的时候底层会将超过存活时间的数据进行判断过滤,setStateVisibility就是可以设置是否可以查询到这些过期的数据,NeverReturnExpired和ReturnExpiredIfNotCleanedUp二选一.
                     **/
                    // 不返回过期数据,这个也是默认策略
                    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                    // 返回还没有被清除的过期数据
                    .setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)
                    // 指定TTL计时时间语义(默认处理时间)
                    .setTtlTimeCharacteristic(StateTtlConfig.TtlTimeCharacteristic.ProcessingTime)
                    .build();
            // 配置状态描述,在ListStateDescriptor构造器中声明数据类型,简单类型可以使用xxx.class,符合类型需要使用到TypeInformation.of()
            ListStateDescriptor descriptor = new ListStateDescriptor("MapState", TypeInformation.of(new TypeHint<Tuple2<String, String>>() {
            }));
            // 状态描述器加载TTL配置
            descriptor.enableTimeToLive(ttlConfig);
            listState = operatorStateStore.getListState(descriptor);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45

    代码中只需要关注initializeState()方法即可,里面列出了有关TTL常用的API,注释中也进行了相关的介绍.

  • 相关阅读:
    学生信息管理系统
    趣味C语言——【猜数字】小游戏
    【Linux】【网络】传输层协议:UDP
    Java面向对象-包-权限修饰符-final-常量-枚举-抽象类-接口
    Bika LIMS 开源LIMS集——实验室检验流程概述及主页、面板
    MySQL8 NDB Cluster安装部署
    YOLOv8中训练参数中文解释
    【Go】十九、网络连接与请求发送
    Monaco Editor教程(五): 实现同时多文件编辑,tab切换
    如何查看centos7中启动了几个nginx
  • 原文地址:https://blog.csdn.net/AnameJL/article/details/134524594