在分布式架构中,当某个节点出现故障,其他节点基本不受影响。这时只需要重启应用,恢复之前某个时间点的状态继续处理就可以了。这一切看似简单,可是在实时流处理中,我们不仅需要保证故障后能够重启继续运行,还要保证结果的正确性、故障恢复的速度、对处理性能的影响,在 Flink 中,有一套完整的容错机制(fault tolerance)来保证故障后的恢复,其中最重要的就是检查点(checkpoint)和 保存点(Savepoint)
发生故障之后,最简单的想法当然是重启机器、重启应用。由于是分布式的集群,即使一个节点无法恢复,也不会影响应用的重启执行。这里的问题在于,流处理应用中的任务都是有状态的,而为了快速访问这些状态一般会直接放在堆内存里;现在重启应用,内存中的状态已经丢失,就意味着之前的计算全部白费了,需要从头来过。
所以我们需要把之前的计算结果做个保存,这样重启之后就可以继续处理新数据、而不需要重新计算了。将之前某个时间点所有的状态保存下来,这份“存档”就是所谓的“检查点”。遇到故障重启的时候,我们可以从检查点中“读档”,恢复出之前的状态,这样就可以回到当时保存的一刻接着处理数据了。
在Flink中检查点的触发是周期性的。具体来说,当每隔一段时间检查点保存操作被触发时,就把每个任务当前的状态复制一份,按照一定的逻辑结构放在一起持久化保存起来,就构成了检查点。
每一次的触发时间是当所有任务都恰好处理完一个相同的输入数据的时候,将它们的状态保存下来。首先,这样避免了除状态之外其他额外信息的存储,提高了检查点保存的效率。其次,一个数据要么就是被所有任务完整地处理完,状态得到了保存;要么就是没处理完,状态全部没保存:这就相当于构建了一个“事务”(transaction)。如果出现故障,我们恢复到之前保存的状态,故障时正在处理的所有数据都需要重新处理;所以我们只需要让源(source)任务向数据源重新提交偏移量、请求重放数据就可以了。这需要源任务可以把偏移量作为算子状态保存下来,而且外部数据源能够重置偏移量;Kafka 就是满足这些要求的一个最好的例子。
如上图,当“hello”“world”“hello”这批数据被处理完后,触发checkpoint,会将状态数据写入外部存储中。
在运行流处理程序时,Flink 会周期性地保存检查点。当发生故障时,就需要找到最近一
次成功保存的检查点来恢复状态。
如上图,第5条数据hello在sum计算中出错,这里 Source 任务已经处理完毕,所以偏移量为 5;Map 任务也处理完成了。而 Sum 任务在处理中发生了故障,此时状态并未保存。
接下来就需要从检查点来恢复状态了。具体的步骤为:
这样,就好像没有发生过故障一样;我们既没有丢掉数据也没有重复计算数据,这就保证了计算结果的正确性。在分布式系统中,这叫作实现了“精确一次”(exactly-once)的状态一致性保证。
在 JobManager 中有一个“检查点协调器”(checkpoint coordinator),专门用来协调处理检查点的相关工作。检查点协调器会定期向 TaskManager 发出指令,要求保存检查点(带着检查点 ID),TaskManager 会让所有的 Source 任务把自己的偏移量(算子状态)保存起来,并将带有检查点 ID 的分界线(barrier)插入到当前的数据流中,然后像正常的数据一样像下游传递,之后 Source 任务就可以继续读入新的数据了。
Barrier是种特殊的数据形式,把一条流上的数据按照不同的检查点分隔开,所以就叫作检查点的
“分界线”(Checkpoint Barrier)。当收到Barrier这个特殊数据的时候,当前算子就把当前的状态进行快照。所以barrier 可以理解为“之前所有数据的状态更改保存入当前检查点”
通过在流中插入分界线(barrier),我们可以明确地指示触发检查点保存的时间。在一条单一的流上,数据依次进行处理,顺序保持不变,可是对于处理多个分区的传递时数据的顺序就会出现乱序的问题。
算法的核心就是两个原则:
检查点保存的算法具体过程如下:
除了检查点外,Flink 还提供了另一个非常独特的镜像保存功能——保存点。保存点与检查点最大的区别,就是触发的时机。检查点是由 Flink 自动管理的,定期创建,发生故障之后自动读取进行恢复,这是一个“自动存盘”的功能;而保存点不会自动创建,必须由用户明确地手动触发保存操作,所以就是“手动存盘”。因此两者尽管原理一致,但用途就有所差别了:检查点主要用来做故障恢复,是容错机制的核心;保存点则更加灵活,可以用来做有计划的手动备份和恢复。
- //要在命令行中为运行的作业创建一个保存点镜像,只需要执行:
- bin/flink savepoint :jobId [:targetDirectory]
- //这里 jobId 需要填充要做镜像保存的作业 ID,目标路径 targetDirectory 可选,表示保存点
- state.savepoints.dir: hdfs:///flink/savepoints
- //当然对于单独的作业,我们也可以在程序代码中通过执行环境来设置:
- env.setDefaultSavepointDir("hdfs:///flink/savepoints");
- //由于创建保存点一般都是希望更改环境之后重启,所以创建之后往往紧接着就是停掉作业的操作。除了对运行的作业创建保存点,我们也可以在停掉一个作业时直接创建保存点:
- bin/flink stop --savepointPath [:targetDirectory] :jobId
- //我们已经知道,提交启动一个 Flink 作业,使用的命令是 flink run;现在要从保存点重启一个应用,其实本质是一样的:
- bin/flink run -s :savepointPath [:runArgs]
- //这里只要增加一个-s 参数,指定保存点的路径就可以了,其他启动时的参数还是完全一样的。还有一个“Savepoint Path”,这就是从保存点启动应用的配置。