• Flink的CheckPoint机制


    这里已经是Flink的第三篇原创啦。第一篇:Flink入门讲解了Flink的基础和相关概念,第二篇:压背原理,讲解了什么是背压,在Flink背压大概的流程是怎么样的。

    这篇来讲Flink另一个比较重要的知识,就是它的容错机制checkpoint原理。

    所谓的CheckPoint其实就是Flink会在指定的时间段上保存状态的信息,如果Flink挂了可以将上一次状态信息再捞出来,重放还没保存的数据来执行计算,最终可以实现exactly once

    状态只持久化一次最终的存储介质中(本地数据库/HDFS),在Flink下就叫做exactly once(计算的数据可能会重复(无法避免),但状态在存储介质上只会存储一次)。

    开胃菜(复习)

    作为用户,我们写好Flink的程序,上管理平台提交,Flink就跑起来了(只要程序代码没有问题),细节对用户都是屏蔽的。

    0dad9b6857744b0086bbd5db948732d9.png 

    实际上大致的流程是这样的:

    1. Flink会根据我们所写代码,会生成一个StreamGraph的图出来,来代表我们所写程序的拓扑结构。

    2. 然后在提交的之前会将StreamGraph这个图优化一把(可以合并的任务进行合并),变成JobGraph

    3. JobGraph提交给JobManager

    4. JobManager收到之后JobGraph之后会根据JobGraph生成ExecutionGraphExecutionGraph 是 JobGraph 的并行化版本)

    5. TaskManager接收到任务之后会将ExecutionGraph生成为真正的物理执行图

    1543bfdd13be42ac9c99a965ed237413.png 

    可以看到物理执行图真正运行在TaskManagerTransformSink之间都会有ResultPartitionInputGate这俩个组件,ResultPartition用来发送数据,而InputGate用来接收数据。

    155fdf7d46b64f38bbd780d412f2970f.png 

    屏蔽掉这些Graph,可以发现Flink的架构是:Client->JobManager->TaskManager

    4ce60a34d961486487dd383fcffe9f01.png 

    从名字就可以看出,JobManager是干「管理」,而TaskManager是真正干活的。回到我们今天的主题,checkpoint就是由JobManager发出。

    9131e1c9de614acc93346856f4a93e88.png 

    Flink本身就是有状态的,Flink可以让你选择执行过程中的数据保存在哪里,目前有三个地方,在Flink的角度称作State Backends

    • MemoryStateBackend(内存)

    • FsStateBackend(文件系统,一般是HSFS)

    • RocksDBStateBackend(RocksDB数据库)

    同样地,checkpoint信息就是保存在State Backends

    71c5d14a38a748bb8df5ecec318d6314.png 

    先来简单描述一下checkpoint的实现流程:

    checkpoint的实现大致就是插入barrier,每个operator收到barrier就上报给JobManager,等到所有的operator都上报了barrier,那JobManager 就去完成一次checkpointi

    305afdc2d76a4de8905c43093506c98e.png 

    因为checkpoint机制是Flink实现容错机制的关键,我们在实际使用中,往往都要配置checkpoint相关的配置,例如有以下的配置:

    1. final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    2. env.enableCheckpointing(5000);
    3. env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    4. env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
    5. env.getCheckpointConfig().setCheckpointTimeout(60000);
    6. env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    7. env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

    简单铺垫过后,我们就来撸源码了咯?

    Checkpoint(原理)

    JobManager发送checkpoint

    从上面的图我们可以发现 checkpoint是由JobManager发出的,并且JobManager收到的是JobGraph,会将JobGraph转换成ExecutionGraph

    这块在JobMaster的构造器就能体现出来:

    1. public JobMaster(...) throws Exception {
    2.   // 创建ExecutionGraph
    3.   this.executionGraph = createAndRestoreExecutionGraph(jobManagerJobMetricGroup);
    4.  }

    我们点击进去createAndRestoreExecutionGraph看下:

    b80b5581ba6e4691be681a93f54903b1.png 

    CheckpointCoordinator这个名字,就觉得他很重要,有木有?它从ExecutionGraph来,我们就进去createExecutionGraph里边看看呗。

    点了两层buildGraph()方法,可以看到在方法的末尾处有checkpoint相关的信息:

    1. executionGraph.enableCheckpointing(
    2.     chkConfig.getCheckpointInterval(),
    3.     chkConfig.getCheckpointTimeout(),
    4.     chkConfig.getMinPauseBetweenCheckpoints(),
    5.     chkConfig.getMaxConcurrentCheckpoints(),
    6.     chkConfig.getCheckpointRetentionPolicy(),
    7.     triggerVertices,
    8.     ackVertices,
    9.     confirmVertices,
    10.     hooks,
    11.     checkpointIdCounter,
    12.     completedCheckpoints,
    13.     rootBackend,
    14.     checkpointStatsTracker);

    前面的几个参数就是我们在配置checkpoint参数的时候指定的,而triggerVertices/confirmVertices/ackVertices我们溯源看了一下,在源码中注释也写得清清楚楚的。

    1. // collect the vertices that receive "trigger checkpoint" messages.
    2. // currently, these are all the sources 
    3. List<JobVertexID> triggerVertices = new ArrayList<>();
    4. // collect the vertices that need to acknowledge the checkpoint
    5. // currently, these are all vertices
    6. List<JobVertexID> ackVertices = new ArrayList<>(jobVertices.size());
    7. // collect the vertices that receive "commit checkpoint" messages
    8. // currently, these are all vertices
    9. List<JobVertexID> commitVertices = new ArrayList<>(jobVertices.size());

    下面还是进去enableCheckpointing()看看大致做了些什么吧:

    1. // 将上面的入参分别封装成ExecutionVertex数组
    2. ExecutionVertex[] tasksToTrigger = collectExecutionVertices(verticesToTrigger);
    3. ExecutionVertex[] tasksToWaitFor = collectExecutionVertices(verticesToWaitFor);
    4. ExecutionVertex[] tasksToCommitTo = collectExecutionVertices(verticesToCommitTo);
    5. // 创建触发器
    6. checkpointStatsTracker = checkNotNull(statsTracker, "CheckpointStatsTracker");
    7. // 创建checkpoint协调器
    8. checkpointCoordinator = new CheckpointCoordinator(
    9.   jobInformation.getJobId(),
    10.   interval,
    11.   checkpointTimeout,
    12.   minPauseBetweenCheckpoints,
    13.   maxConcurrentCheckpoints,
    14.   retentionPolicy,
    15.   tasksToTrigger,
    16.   tasksToWaitFor,
    17.   tasksToCommitTo,
    18.   checkpointIDCounter,
    19.   checkpointStore,
    20.   checkpointStateBackend,
    21.   ioExecutor,
    22.   SharedStateRegistry.DEFAULT_FACTORY);
    23. // 设置触发器
    24. checkpointCoordinator.setCheckpointStatsTracker(checkpointStatsTracker);
    25. // 状态变更监听器
    26. // job status changes (running -> onall other states -> off)
    27. if (interval != Long.MAX_VALUE) {
    28.   registerJobStatusListener(checkpointCoordinator.createActivatorDeactivator());
    29. }

    值得一提的是,点进去CheckpointCoordinator()构造方法可以发现有状态后端StateBackend的身影(因为checkpoint就是保存在所配置的状态后端)

    6cf4a2f415f145a7b0c30d044ca9abbe.png 

    如果Job的状态变更了,CheckpointCoordinatorDeActivator是能监听到的。

    4c9639d7e6a24da09186c0a6b14a2a36.png 

    当我们的Job启动的时候,又简单看看startCheckpointScheduler()里边究竟做了些什么操作:

    e7f22064c4274bfeb960dc1a28513f2d.png 

    它会启动一个定时任务,我们具体看看定时任务具体做了些什么ScheduledTrigger,然后看到比较重要的方法:triggerCheckpoint()

    这块代码的逻辑有点多,我们简单来总结一下

    1. 前置检查(是否可以触发checkpoint,距离上一次checkpoint的间隔时间是否符合...)

    2. 检查是否所有的需要做checkpoint的Task都处于running状态

    3. 生成checkpointId,然后生成PendingCheckpoint对象来代表待处理的检查点

    4. 注册一个定时任务,如果checkpoint超时后取消checkpoint

    34d43aacfa12451da6e788c6af31b906.png 

    注:检查task的任务状态时,只会把sourcetask封装给进Execution[]数组

    da0fe19026f44a52b0573ffa779650bf.png 

    JobManager侧只会发给sourcetask发送checkpoint

    d113933538024e968728f85adcb51918.png 

    JobManager发送总结

    贴的图有点多,最后再来简单总结一波,顺便画个流程图,你就会发现还是比较清晰的。

    1. JobManager 收到client提交的JobGraph

    2. JobManger 需要通过JobGraph生成ExecutionGraph

    3. 在生成ExcutionGraph的过程中实际上就会触发checkpoint的逻辑

      1. 定时任务会前置检查(其实就是你实际上配置的各种参数是否符合)

      2. 判断checkpoint相关的task是否都是running状态,将source的任务封装到Execution数组中

      3. 创建checkpointID/checkpointStorageLocation(checkpoint保存的地方)/PendingCheckpoint(待处理的checkpoint)

      4. 创建定时任务(如果当checkpoint超时,会将相关状态清除,重新触发)

      5. 真正触发checkPointTaskManager(只会发给sourcetask)

      6. 找出所有source和需要ack的Task

      7. 创建checkpointCoordinator 协调器

      8. 创建CheckpointCoordinatorDeActivator监听器,监听Job状态的变更

      9. Job启动时,会触发ScheduledTrigger 定时任务

    682be6251ffa42da910081fea86172fa.png 

    TaskManager(source Task接收)

    前面提到了,JobManager 在生成ExcutionGraph时,会给所有的source 任务发送checkpoint,那么source收到barrier又是怎么处理的呢?会到TaskExecutor这里进行处理。

    TaskExecutor有个triggerCheckpoint()方法对接收到的checkpoint进行处理:

    233052514d624f41832283260b679dbf.png 

    进入triggerCheckpointBarrier()看看:

    47f1304628ff46d88a7cd288741403f1.png 

    再想点进去triggerCheckpoint()看实现时,我们会发现走到performCheckpoint()这个方法上:

    cea1b46aa17243bcabb9ecd250a1d643.png 

    从实现的注释我们可以很方便看出方法大概做了什么:

    4d9ae195f1ca4d7d9aaec4c9caf710cd.png   

    这块我们先在这里放着,知道Source的任务接收到Checkpoint会广播到下游,然后会做快照处理就好。

    下面看看非Source 的任务接收到checkpoint是怎么处理的。

    TaskManager(非source Task接收)

    在上一篇《压背原理》又或是这篇的基础铺垫上,其实我们可以看到在Flink接收数据用的是InputGate,所以我们还是回到org.apache.flink.streaming.runtime.io.StreamInputProcessor#processInput这个方法上

    随后定位到处理数据的逻辑:

    final BufferOrEvent bufferOrEvent = barrierHandler.getNextNonBlocked();
    

    想点击进去,发现有两个实现类:

    • BarrierBuffer

    • BarrierTracker

    d7f68d3da5f243f39202289d6be29eba.png 

    这两个实现类其实就是对应着AT_LEAST_ONCEEXACTLY_ONCE这两种模式。

    1. /**
    2.  * The BarrierTracker keeps track of what checkpoint barriers have been received from
    3.  * which input channels. Once it has observed all checkpoint barriers for a checkpoint ID,
    4.  * it notifies its listener of a completed checkpoint.
    5.  *
    6.  * <p>Unlike the {@link BarrierBuffer}, the BarrierTracker does not block the input
    7.  * channels that have sent barriers, so it cannot be used to gain "exactly-once" processing
    8.  * guarantees. It can, however, be used to gain "at least once" processing guarantees.
    9.  *
    10.  * <p>NOTE: This implementation strictly assumes that newer checkpoints have higher checkpoint IDs.
    11.  */
    12. /**
    13.  * The barrier buffer is {@link CheckpointBarrierHandler} that blocks inputs with barriers until
    14.  * all inputs have received the barrier for a given checkpoint.
    15.  *
    16.  * <p>To avoid back-pressuring the input streams (which may cause distributed deadlocks), the
    17.  * BarrierBuffer continues receiving buffers from the blocked channels and stores them internally until
    18.  * the blocks are released.
    19.  */

    简单翻译下就是:

    • BarrierTrackerat least once模式,只要inputChannel接收到barrier,就直接通知完成处理checkpoint

    • BarrierBufferexactly-once模式,当所有的inputChannel接收到barrier才通知完成处理checkpoint,如果有的inputChannel还没接收到barrier,那已接收到barrierinputChannel会读数据到缓存中,直到所有的inputChannel都接收到barrier,这有可能会造成反压。

    说白了,就是BarrierBuffer会有对齐barrier的处理。

    这里又提到exactly-onceat least once了。在文章开头也说过Flink是可以实现exactly-once的,含义就是:状态只持久化一次最终的存储介质中(本地数据库/HDFS)。

    在这里我还是画个图和举个例子配合BarrierBuffer/BarrierTracker来解释一下。

    现在我有一个Topic,假定这个Topic有两个分区partition(又或者你可以理解我设置消费的并行度是2)。现在要拉取Kafka这两个分区的数据,由算子Map进行消费转换,期间在转化的时候可能会存储些信息到State(Flink给我们提供的存储,你就当做是会存到HDFS上就好了),最终输出到Sink

    efd637d6dd074a55a33efb8aa20643c4.png 

    从上面的知识点我们应该可以知道, 在Flinkcheckpoint的时候JobManager往每个Source任务(简单对应图上的两个paritiion) 发送checkpointId,然后做快照存储。

    显然,source任务存储最主要的内容就是消费分区的offset嘛。比如现在source 1offerset100,而source2offset105

    d4fe73fe02ee48d1a7acfa99308a1c59.png 

    目前看来source2的数据会比source1的数据先到达Map

    假定我们用的是BarrierBuffer exactly-once模式,那么source2barrier到达Map算子的后,source2之后的数据只能停下来,放到buffer上,不做处理。等source1 的barrier来了以后,再真正处理source2放在buffer的数据。

    这就是所谓的barrier对齐

    5046b5d974b94527b93dad7fe4dc5655.png 

    假定我们用的是BarrierTracker at least once模式,那么source2barrier到达Map算子的后,source2之后的数据不会停下来等待source1,后面的数据会继续处理。

    deee250d0e9e4cd5bd10c52e712e9b56.png 

    现在问题就来了,那对不对齐的区别是什么呢?

    依照上面图的的运行状态(无论是BarrierTracker at least once模式还是BarrierBuffer exactly-once模式),现在我们的checkpoint都没做,因为source1barrier还没到sink端呢。现在Flink挂了,那显然会重新拉取source 1offerset小于100,而source2offset小于105的数据,State的最终信息也不会保存。

    c4ead777d6ae4d6ca39dd01c6b681aba.png 

    checkpoint从没做过的时候,对数据不会产生任何的影响(所以这里在Flink的内部是没啥区别的)

    而假设我们现在是BarrierTracker at least once模式,没有任何问题,程序继续执行。现在source1barrier也走到了slink,最后完成了一次checkpoint

    65801965a5154ad386cd1a3f0b0d7656.png 

    由于source2barriersource1barrier要快,那么source1所处理的State的数据实际是包括offset>105的数据的,自然Flink保存的时候也会把这部分保存进去。

    程序继续运行,刚好保存完checkpoint后,此时系统出了问题,挂了。因为checkpoint已经做完了,所以Flink会从source 1offerset100,而source2offset105重新消费。

    但是,由于我们是BarrierTracker at least once模式,所以State里边的保存状态实际上有过source2offset 大于105 的记录了。那source2重新从offset105开始消费,那就是会重复消费!

    0ca281faef404d0fad7b448f547c9640.png 

    理解了上面所讲的话,那再看BarrierBuffer exactly-once模式应该就不难理解了(各位大哥大嫂你也要经过这个operator处理保存吗?我们一起吧?有问题,我们一起重来,没问题我们一起保存

    41bdaef7c00442d6aa21bd8baeed7cdc.png 

    无论是BarrierTracker还是BarrierBuffer也好,在处理checkpoint的时候都需要调用notifyCheckpoint() 方法,而notifyCheckpoint()方法最终调用的是triggerCheckpointOnBarrier

    ca7f9f02d92f4310b8c04d60d363eecc.png 

    triggerCheckpointOnBarrier()最终还是会调用performCheckpoint()方法,所以无论是source接收到checkpoint还是operator接收到checkpoint,最终还是会调用performCheckpoint()方法。

    a36b350a040141ddad10aba7a822fc61.png 

    大家有兴趣可以进去checkpointState()方法里边详细看看,里边会对State状态信息进行写入,完成后上报给TaskManager

    066e4319584b48c2beddd2673e314ef8.png 

    TaskManager总结

    99c27ab8304f4afca31e7d489f77cfe5.png 

    • TaskExecutor接收到JobManager下发的checkpoint,由triggerCheckpoint方法进行处理

      • triggerCheckpoint方法最终会调用org.apache.flink.streaming.runtime.tasks.StreamTask#triggerCheckpoint,而最主要的就是performCheckpoint方法

      • performCheckpoint方法会对checkpoint做前置处理,barrier广播到下游,处理State状态做快照,最后回到成功消息给JobManager

    • 普通算子由org.apache.flink.streaming.runtime.io.StreamInputProcessor#processInput这个方法读取数据,具体处理逻辑在getNextNonBlocked方法上。

      • 该方法有两个实例,分别是BarrierBufferBarrierTracker,这两个实例对应着checkpoint不同的模式(至少一次和精确一次)。精确一次需要对barrier对齐,有可能导致反压的情况

      • 最后处理完,会调用notifyCheckpoint方法,实际上还是会调performCheckpoint方法

    所以说,最终处理checkpoint的逻辑是一致的,只是会source会直接通过TaskExecutor处理,而普通算子会根据不同的配置在接收到后有不同的实例处理:BarrierTracker/BarrierBuffer

    JobManager接收回应

    前面提到了,无论是source还是普通算子,都会调用performCheckpoint方法进行处理。

    performCheckpoint方法里边处理完State快照的逻辑,会调用reportCompletedSnapshotStates告诉JobManager快照已经处理完了。

    reportCompletedSnapshotStates方法里边又会调用acknowledgeCheckpoint方法通过RPC去通知JobManager

    cc802ccda80146e995be9c9c02ca77e5.png 

    兜兜转转,最后还是会回到checkpointCoordinator上,调用receiveAcknowledgeMessage进行处理

    e7b239b867fc467d8b37facdee4bae29.png 

    进入到receiveAcknowledgeMessage上,主要就是下面图的逻辑:处理完返回不同的状态,根据不同的状态进行处理

    8e4a9ae991554b89a5ca826d893a0809.png 

    主要我们看的其实就是acknowledgeTask方法里边做了些什么。

    在 PendingCheckpoint维护了两个Map:

    1. //  已经接收到 Ack 的算子的状态句柄
    2. private final Map<OperatorID, OperatorState> operatorStates;
    3. // 需要 Ack 但还没有接收到的 Task
    4. private final Map<ExecutionAttemptID, ExecutionVertex> notYetAcknowledgedTasks;

    然后我们进去acknowledgeTask简单了解一下可以发现就是在处理operatorStatesnotYetAcknowledgedTasks

    1. synchronized (lock) {
    2.    if (discarded) {
    3.     return TaskAcknowledgeResult.DISCARDED;
    4.    }
    5.    
    6.     // 接收到Task了,从notYetAcknowledgedTasks移除
    7.    final ExecutionVertex vertex = notYetAcknowledgedTasks.remove(executionAttemptId);
    8.    if (vertex == null) {
    9.     if (acknowledgedTasks.contains(executionAttemptId)) {
    10.      return TaskAcknowledgeResult.DUPLICATE;
    11.     } else {
    12.      return TaskAcknowledgeResult.UNKNOWN;
    13.     }
    14.    } else {
    15.     acknowledgedTasks.add(executionAttemptId);
    16.    }
    17.     // ...
    18.    if (operatorSubtaskStates != null) {
    19.     for (OperatorID operatorID : operatorIDs) {
    20.      // ...
    21.      OperatorState operatorState = operatorStates.get(operatorID);
    22.      // 新来的operatorID,添加到operatorStates
    23.      if (operatorState == null) {
    24.       operatorState = new OperatorState(
    25.        operatorID,
    26.        vertex.getTotalNumberOfParallelSubtasks(),
    27.        vertex.getMaxParallelism());
    28.       operatorStates.put(operatorID, operatorState);
    29.      }
    30.           //....
    31.     }
    32.    }

    等到所有的Task都到齐以后,就会调用isFullyAcknowledged进行处理。

    2c995fe5abd84f0a9a53a1cbae2706bd.png 

    最后调用completedCheckpoint = pendingCheckpoint.finalizeCheckpoint();来实现最终的存储,所有完毕以后会通知所有的Task 现在checkpoint已经完成了。

    f6ace152ea304192b18af0031af300d2.png 

    最后

    总的来说,这篇文章带着大家走马观花撸了下Checkpoint,很多细节我也没去深入,但我认为这篇文章可以让你大概了解到Checkpoint的实现过程。

    最后再来看看官网的图,看完应该大概就能看得懂啦:

    80a5c30877e9480796bf4c7ba34ad672.png 

    相信我,或许你现在还没用到Flink,但等你真正去用Flink的时候,checkpoint是肯定得搞搞的(:现在可能有的同学还没看懂,没关系,先点个赞👍,收藏起来,后面就用得上了。

  • 相关阅读:
    数据结构 第七章(查找算法)
    基于opencv的手指静脉识别(附源码)
    【ArcGIS Pro微课1000例】0020:ArcGIS Pro中河流(曲线)、湖泊(水体色)图例制作案例教程
    【DOCKER】显示带UI的软件
    扬帆际海:如何度过shopee销量瓶颈期?
    C++核心编程:P19->STL----常用算法(下)
    Servlet 与Spring对比!
    基于python的多种图像增强算法实现
    【MySQL日志与备份篇】其他数据库日志
    Python接口自动化测试之post请求详解
  • 原文地址:https://blog.csdn.net/m0_72088858/article/details/126519159