检查点的保存离不开 JobManager 和 TaskManager,以及外部存储系统的协调。在应用进行检查点保存时,首先会由 JobManager 向所有 TaskManager 发出触发检查点的命令;TaskManger 收到之后,将当前任务的所有状态进行快照保存,持久化到远程的存储介质中;完成之后向JobManager 返回确认信息。这个过程是分布式的,当 JobManger 收到所有TaskManager 的返回信息后,就会确认当前检查点成功保存,而这一切工作的协调,就需要一个“专职人员”状态后端来完成。
有状态流应用中的检查点(checkpoint),其实就是所有任务的状态在某个时间点的一个快 照(一份拷贝)。简单来讲,就是一次“存盘”,让我们之前处理数据的进度不要丢掉。在一个 流应用程序运行时,Flink 会定期保存检查点,在检查点中会记录每个算子的 id 和状态;如果 发生故障,Flink 就会用最近一次成功保存的检查点来恢复应用的状态,重新启动处理流程, 就如同“读档”一样。
- // 启用检查点,间隔时间 1 秒
- env.enableCheckpointing(1000);
- CheckpointConfig checkpointConfig = env.getCheckpointConfig();
- // 设置精确一次模式
- checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
- // 最小间隔时间 500 毫秒
- checkpointConfig.setMinPauseBetweenCheckpoints(500);
- // 超时时间 1 分钟
- checkpointConfig.setCheckpointTimeout(60000);
- // 同时只能有一个检查点
- checkpointConfig.setMaxConcurrentCheckpoints(1);
- // 开启检查点的外部持久化保存,作业取消后依然保留
- checkpointConfig.enableExternalizedCheckpoints(
- ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
- // 启用不对齐的检查点保存方式
- checkpointConfig.enableUnalignedCheckpoints();
- 291
- // 设置检查点存储,可以直接传入一个 String,指定文件系统的路径
- checkpointConfig.setCheckpointStorage("hdfs://my/checkpoint/dir")
存点在原理和形式上跟检查点完全一样,也是状态持久化保存的一个快照;区别在于,保存点是自定义的镜像保存,所以不会由 Flink 自动创建,而需要用户手动触发。这在有计划地停止、重启应用时非常有用。