• Flink 的 Checkpoint配置详解


    Flink 的 Checkpoint 总结
    1、简介
    1)概述

    Flink中的每个函数和运算符都可以有状态,状态中存储计算的中间结果。

    状态可以用于容错,在任务被动失败或者主动重启时,可以通过 Checkpoint 或 Savepoint 从先前的状态中恢复计算数据,以保证数据计算的 ExactlyOnec(精准一次)或 AtleastOnce (至少一次)。

    2)检查点算法

    1.Barrier对齐: 一个Task 收到 所有上游 同一个编号的 barrier之后,才会对自己的本地状态做 备份
    精准一次: 在barrier对齐过程中,barrier后面的数据 阻塞等待(不会越过barrier)
    至少一次: 在barrier对齐过程中,先到的barrier,其后面的数据 不阻塞 接着计算

    2.非Barrier对齐: 一个Task 收到 第一个 barrier 时,就开始 执行备份,能保证 精准一次
    先到的barrier,将 本地状态 备份, 其后面的数据接着计算输出
    未到的barrier,其 前面的数据 接着计算输出,同时 也保存到 备份中
    最后一个barrier到达 该Task时,这个Task的备份结束

    2、前提
    1) ExactlyOnec(精准一次)

    上游:可以重发数据(如:消息队列:Kafka\分布式文件系统:HDFS)

    下游:支持幂等性(如:Doris 支持去重)

    2) AtleastOnce(至少一次)

    上游:可以重发数据(如:消息队列:Kafka\分布式文件系统:HDFS)

    3、启用检查点
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
    // 启用 Checkpoint 每 5 秒 一次,模式为 EXACTLY_ONCE
    env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
    
    • 1
    • 2
    • 3
    • 4
    4、常用配置参数
    1)最终检查点
    // 最终检查点,1.15开始,默认是true
    configuration.set(ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, false);
    
    • 1
    • 2
    2)开启 Changelog
    // 要求checkpoint的最大并发必须为1
    env.enableChangelogStateBackend(true);
    
    • 1
    • 2
    3)代码中用到HDFS,需要导入hadoop依赖、指定访问HDFS的用户名
    System.setProperty("HADOOP_USER_NAME", "HADOOP");
    
    • 1
    4)开启非对齐检查点(barrier非对齐)
    // 开启的要求: Checkpoint模式必须是精准一次,最大并发必须设为1
    checkpointConfig.enableUnalignedCheckpoints();
    
    // 开启非对齐检查点才生效: 默认0,表示一开始就直接用 非对齐的检查点
    // 如果大于0,一开始用 对齐的检查点(barrier对齐),对齐的时间超过这个参数,自动切换成 非对齐检查点(barrier非对齐)
    checkpointConfig.setAlignedCheckpointTimeout(Duration.ofMinutes(4));
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    5)检查点常用配置
    // 1、启用检查点: 默认是barrier对齐的,周期为5s, 精准一次
    env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
    CheckpointConfig checkpointConfig = env.getCheckpointConfig();
            
    // 2、指定检查点的存储位置
    checkpointConfig.setCheckpointStorage("hdfs:///ip:port/dir");
    
    // 3、checkpoint的超时时间: 默认10分钟
    checkpointConfig.setCheckpointTimeout(60000);
    
    // 4、同时运行中的checkpoint的最大数量
    checkpointConfig.setMaxConcurrentCheckpoints(1);
    
    // 5、最小等待间隔: 上一轮checkpoint结束 到 下一轮checkpoint开始 之间的间隔
    checkpointConfig.setMinPauseBetweenCheckpoints(1000);
    
    // 6、取消作业时,checkpoint的数据 是否保留在外部系统
    // DELETE_ON_CANCELLATION:主动cancel时,删除存在外部系统的chk-xx目录 (如果是程序突然挂掉,不会删)
    // RETAIN_ON_CANCELLATION:主动cancel时,外部系统的chk-xx目录会保存下来
            checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    
    // 7、允许 checkpoint 连续失败的次数,默认 0 表示 checkpoint 一失败,job 就挂掉
    checkpointConfig.setTolerableCheckpointFailureNumber(10);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    6)其它配置参数
    KeyDefaultTypeDescription
    state.backend.incrementalfalseBoolean是否开启增量检查点
    state.backend.local-recoveryfalseBoolean是否开启本地恢复(支持 EmbeddedRocksDBStateBackend 和 HashMapStateBackend).
    state.checkpoints.num-retained1Integer要保留的已完成检查点的最大数量。
    state.savepoints.dir(none)Stringsavepoint保存的地址
    state.storage.fs.memory-threshold20 kbMemorySize状态数据文件的最小大小。所有小于此状态块的状态块都内联存储在根检查点元数据文件中。此配置的最大内存阈值为1MB。
    state.storage.fs.write-buffer-size4096Integer写入文件系统的检查点流的写入缓冲区的默认大小。实际的写入缓冲区大小被确定为此选项和选项“state.storage.fs.memory-threshold”的最大值。
    taskmanager.state.local.root-dirs(none)String定义了用于存储基于文件的状态以进行本地恢复的根目录。
  • 相关阅读:
    k8s学习-CKA真题-网络策略NetworkPolicy
    PostgreSQL 创建数据库、创建用户、赋予权限、创建表、主键总结
    PGRouting导航规划-AStar算法
    Hystrix的原理及应用:构建微服务容错体系的利器(一)
    UE5加载websocket模块为空
    设计模式 - 观察者模式
    JSON对象相互转换
    Java-NIO之Buffer(缓冲区)
    基于前馈式模糊控制的公路隧道通风系统研究
    实验室专利书写指南
  • 原文地址:https://blog.csdn.net/m0_50186249/article/details/132969226