Checkpoint
在 Flink 中,检查点的保存是周期性触发的,间隔时间可以进行设置,当每隔一段时间检查点保存操作被触发时,就把每个任务当前的状态复制一份,按照一定的逻辑结构放在一起持久化保存起来,就构成了检查点。
当检查点保存操作被触发时,保存状态的时机是当所有任务都恰好处理完一个相同的输入数据的时候。首先,这样避免了除状态之外其他额外信息的存储,提高了检查点保存的效率。其次,一个数据要么就是被所有任务完整地处理完,状态得到了保存;要么就是没处理完,状态全部没保存,相当于构建了一个“事务”(transaction。
以 WordCount 为例,具体的检查点保存流程为:源(Source)任务从外部数据源读取数据,并记录当前的偏移量,作为算子状态(Operator State)保存下来;Sum 算子会把当前求和的结果作为按键分区状态(Keyed State)保存下来;当触发了检查点保存时,所有任务都已经处理了 3 条数据 (“hello”,“world”,“hello”),此时就会将 Source 算子的偏移量 3 和 sum 算子的状态 (“hello” -> 2,“world” -> 1) 保存成一个检查点,写入外部存储中。具体的存储系统由状态后端的配置项 “检查点存储” (CheckpointStorage) 来决定的,有作业管理器的堆内存(JobManagerCheckpointStorage)和文件系统(FileSystemCheckpointStorage)两种选择。一般情况下,会将检查点写入持久化的分布式文件系统。
当 Flink 任务发生故障时,会使用最近的检查点来一致恢复应用程序的状态,并重新启动处理流程,发生故障时正在处理的所有数据都需要重新处理,所以需要让源(source)任务向数据源重新提交偏移量、请求重放数据,因此需要外部数据源能够重置偏移量,如 Kafka。
以 WordCount 为例,具体的检查点故障恢复流程为:
程序之前在处理完 (“hello”,“world”,“hello”) 三个数据后保存了一个检查点,之后又正常处理了一个数据 (“flink”),但在处理第五个数据 (“hello”) 时发生了故障,此时 Source 任务已经处理完毕且偏移量为 5,Map 任务也处理完成,而 sum 任务在处理中发生了故障,此时状态并未保存
第一步是重启发生故障的应用,此时所有任务的状态会清空
第二步是找到最近一次保存的检查点,从中读出每个算子任务状态的快照,分别填充到对应的状态中。此时,Flink 内部所有任务的状态,都恢复到了保存检查点的那一时刻,即刚好处理完 (“hello”,“world”,“hello”) 三个数据的时候;注意,虽然故障发生前 (“flink”) 数据已经处理完,但由于没保存在检查点中,所以恢复后的任务中并没有 (“flink”) 的处理结果
第三步是通过 Source 任务向外部数据源重新提交偏移量(offset)来实现从保存检查点后开始重新读取数据,由于从检查点恢复状态时之前所有在检查点保存后到发生故障前的这段时间内的数据 (“flink”,“hello”) 都丢失了,为了重新处理这些丢失的数据,必须从数据源重放这些数据。
第四步是重放第 4、5 个数据后,然后继续读取后面的数据正常处理,此时,既没有丢掉数据也没有重复计算数据,这就保证了计算结果的正确性。在分布式系统中,这叫作实现了“精确一次”(exactly-once)的状态一致性保证。
Flink 采用了基于 Chandy-Lamport 算法的分布式快照保存检查点
检查点分界线或检查点屏障
案例:一个有两个输入流的应用程序,用并行的两个 Source 任务来读取自然数,然后按照奇偶数进行分组后进行 sum 操作,最终输出结果
在 JobManager 中有一个“检查点协调器”(checkpoint coordinator),专门用来协调处理检查点的相关工作。检查点协调器会周期性地向每个 TaskManager 发送一条带有新检查点 ID 的消息,通过这种方式来启动检查点
收到指令后,TaskManager 会将带有检查点 ID 的分界线(barrier)插入到当前的数据流中并向下游传递,同时让所有的 Source 任务把自己的偏移量(算子状态)保存起来,状态后端在状态存入检查点之后,会返回通知给 source 任务,source 任务就会向 JobManager 确认检查点完成
带有检查点 ID 的分界线(barrier)会被上游的 Source 任务广播给下游所有的 Sum 分区任务,sum 任务会等待所有上游输入分区的 barrier 到达,即分界线对齐;对于 barrier 已经到达的分区,在 barrier 之后继续到达的数据会被缓存;对于 barrier 尚未到达的分区,其所有到达数据会被继续正常 sum 处理,直到该分区的 barrier 到达
当一个 sum 任务收到所有上游输入分区的 barrier 时,该任务就将其结果状态保存到状态后端的检查点中,然后将 barrier 继续向下游转发
该任务向下游转发检查点 barrier 后,会继续处理之前缓存或到达的数据
Sink 任务向 JobManager 确认状态保存到 checkpoint 完毕,当所有任务都确认已成功将状态保存到检查点时,检查点就真正完成了
为了防止 Flink 任务在保存检查点时占据大量时间导致数据处理性能降低,可以在代码中对检查点进行相关配置
public class TestFlinkCheckpointSet {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//1.开启checkpoint功能,checkpoint默认是关闭的
//env.enableCheckpointing(); //已过时,默认500毫秒间隔保存一次检查点
env.enableCheckpointing(1000L); //推荐方法,参数为Long类型的毫秒数,表示保存检查点间隔时间
//2.配置检查点存储
//需要传入一个CheckpointStorage的实现类
//2.1 配置存储检查点到 JobManager 堆内存
env.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage());
//2.2 配置存储检查点到文件系统
env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("hdfs://namenode:40010/flink/checkpoints"));
//3.其他高级配置
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
//3.1 设置检查点一致性的保证级别
checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);//精确一次模式
//checkpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);//至少一次模式
//3.2 配置检查点保存的超时时间,超时没完成就会被丢弃掉
checkpointConfig.setCheckpointTimeout(60000L); //传入Long型毫秒数
//3.3 配置最大并发检查点数量
//指定运行中的检查点最多可以有多少个。由于每个任务的处理进度不同,完全可能出现后面的任务还没完成前一个检查点的保存、前面任务已经开始保存下一个检查点了。这个参数就是限制同时进行的最大数量
checkpointConfig.setMaxConcurrentCheckpoints(1);
//3.4 配置两次检查点保存的最小间隔时间
//指定在上一个检查点完成之后,检查点协调器最快等多久可以出发保存下一个检查点的指令。即使已经达到了触发检查点保存的周期间隔时间点,只要距离上一个检查点完成的时间小于最小间隔时间,就依然不能开启下一次检查点的保存。这为正常处理数据留下了充足的间隙。
//指定了最小间隔时间,则 maxConcurrentCheckpoints 的值强制为 1
checkpointConfig.setMinPauseBetweenCheckpoints(500L); //传入Long型毫秒数
//3.5 配置是否只从检查点恢复
//默认值是 false,即从最近的检查点或保存点恢复
checkpointConfig.setPreferCheckpointForRecovery(true); //即使有最近的保存点,也只从检查点恢复
//3.6 配置容许检查点保存的失败次数
checkpointConfig.setTolerableCheckpointFailureNumber(0);
env.execute();
}
}
public class TestRestartStrategy {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//1.固定延迟重启
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3, //尝试重启次数
100000L //尝试重启的时间间隔
));
//2.失败率重启
env.setRestartStrategy(RestartStrategies.failureRateRestart(
3, //失败次数
Time.minutes(10), //重启时间范围
Time.minutes(1) //重启时间间隔
));
env.execute();
}
}
对于 Flink 流处理器内部来说,所谓的状态一致性,其实就是计算结果要保证准确。在流式计算正常的处理过程中,由于数据是来一个计算处理一个,所以结果肯定是正确的;但当发生故障后需要恢复状态进行回滚时也要保证发生故障后的数据既不丢失也不被重复计算,恢复以后的重新计算,结果应该也是完全正确的。
对于 Flink 内部流处理器来说,检查点机制可以保证故障恢复后数据不丢(在能够重放的前提下),并且只处理一次,所以已经可以做到 exactly-once 的一致性语义了。
Flink 的 checkpoint 加上可重放数据的输入数据源就可以保证 At_Least_Once 级别一致性,但最终要实现 Exactly_Once 一致性则还需要保证数据不能重复输出写入到外部系统
不可重放 Source | 可重放 Source | |
---|---|---|
任意 Sink | At-most-once | At-least-once |
幂等写入 Sink | At-most-once | Exactly-once(故障恢复时会出现暂时不一致) |
预写入日志 Sink | At-most-once | At-least-once |
两阶段提交 Sink | At-most-once | Exactly-once |
在Flink 流式处理程序中,一条数据由 Source 从数据源读取,经过 transform 操作过程,最终由 Sink 写入外部存储系统,当第一条数据从 Sink 写入 Kafka 时就会开启一个 kafka 的事务(transaction1),正常写入 Kafka 分区日志但标记为“未确认”(uncommitted),这就是“预提交”
jobmanager 触发 checkpoint 操作,会将检查点分界线(barrier1)注入数据流从 source 开始向下传递
Source 任务会将当前读取数据的偏移量作为状态写入检查点,存入状态后端;每个内部的 transform 任务遇到 barrier 时,都会把状态存到 checkpoint 里并告知 JobManager
Sink 任务收到 barrier1 后,首先会保存自己当前的状态,存入 checkpoint 并通知 jobmanager,同时开启下一阶段的 kafka 的事务(transaction2),用于提交下一个检查点的数据,此时由于 barrier1 的检查点保存还未完成,所以上一个 Kafka 事务(transaction1)还未提交关闭
当 JobManager 收到所有算子任务完成 barrier1 的 checkpoint 保存时,JobManager 会向所有任务发通知确认这次 checkpoint 的完成,Sink 任务收到确认通知后,正式提交 transaction1 事务中的数据,kafka 关闭 transaction1 事务并将分区中未确认数据改为“已确认“,数据可以被正常消费