• 【Flink状态管理(六)】Checkpoint的触发方式(1)通过CheckpointCoordinator触发算子的Checkpoint操作


    Checkpoint的触发方式有两种

    • 一种是数据源节点中的Checkpoint操作触发,通过CheckpointCoordinator组件进行协调和控制。 CheckpointCoordinator通过注册定时器的方式按照配置的时间间隔触发数据源节点的Checkpoint操作。数据源节点会向下游算子发出Checkpoint Barrier事件,供下游节点使用。
    • 另一种是下游算子节点根据上游发送的Checkpoint Barrier事件控制算子中Checkpoint操作的触发时机,即只有接收到所有上游Barrier事件后,才会触发本节点的Checkpoint操作。

    本文先介绍通过CheckpointCoordinator触发算子的Checkpoint操作

    CheckpointCoordinator在整个作业中扮演了Checkpoint协调者的角色,负责在数据源节点触发Checkpoint以及整个作业的Checkpoint管理,并且CheckpointCoordinator组件会接收TaskMananger在Checkpoint执行完成后返回的Ack消息。

     

    一. 启动CheckpointCoordinator

    当作业的JobStatus转换为Running时,通知CheckpointCoordinatorDeActivator监听器启动CheckpointCoordinator服务。

    如代码CheckpointCoordinatorDeActivator.jobStatusChanges()方法主要包含如下逻辑。

    > 1. 当`newJobStatus == JobStatus.RUNNING`时,立即调用
    > coordinator.startCheckpointScheduler()方法启动整个Job的调度器
    > CheckpointCoordinator,此时Checkpoint的触发依靠CheckpointCoordinator进行协调。
    > 
    > 2.JobStatus为其他类型状态时,调用coordinator.stopCheckpointScheduler()方法,
    > 停止当前Job中的Checkpoint操作。
    
    
    
    public class CheckpointCoordinatorDeActivator implements JobStatusListener {
       private final CheckpointCoordinator coordinator;
       public CheckpointCoordinatorDeActivator(CheckpointCoordinator coordinator) {
          this.coordinator = checkNotNull(coordinator);
       }
       @Override
       public void jobStatusChanges(JobID jobId,JobStatus newJobStatus, long timestamp,
                                  Throwable error) {
          if (newJobStatus == JobStatus.RUNNING) {
             // 启动Checkpoint调度程序
             coordinator.startCheckpointScheduler();
          } else {
             // 直接停止CheckpointScheduler
             coordinator.stopCheckpointScheduler();
          }
       }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

     

    二. 开启CheckpointScheduler线程

    接下来在CheckpointCoordinator.startCheckpointScheduler()方法中调用scheduleTriggerWithDelay()方法进行后续操作,向创建好的checkpointCoordinatorTimer线程池添加定时调度执行的Runnable线程。

    如代码所示:

    在CheckpointCoordinator.scheduleTriggerWithDelay()方法中指定baseInterval参数,设定执行Checkpoint操作的时间间隔,通过定时器周期性地触发ScheduledTrigger线程,Checkpoint的具体操作在ScheduledTrigger线程中实现。

    private ScheduledFuture<?> scheduleTriggerWithDelay(long initDelay) {
       return timer.scheduleAtFixedRate(
          new ScheduledTrigger(),
          initDelay, baseInterval, TimeUnit.MILLISECONDS);
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

     

    三. 触发Checkpoint

    如代码,ScheduledTrigger也是CheckpointCoordinator的内部类,实现了Runnable接口。在ScheduledTrigger.run()方法中调用了CheckpointCoordinator.triggerCheckpoint()方法触发和执行Checkpoint操作。

    private final class ScheduledTrigger implements Runnable {
       @Override
       public void run() {
          try {
             // 调用triggerCheckpoint()方法触发Checkpoint操作
             triggerCheckpoint(System.currentTimeMillis(), true);
          }
          catch (Exception e) {
             LOG.error("Exception while triggering checkpoint for job {}.", job, e);
          }
       }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    CheckpointCoordinator.triggerCheckpoint()方法包含的执行逻辑非常多,这里重点介绍其中的主要逻辑。根据CheckpointCoordinator触发Checkpoint操作的过程分为以下几个部分。

    1. Checkpoint执行前的工作

    1. 首先检查Checkpoint的执行环境和参数,满足条件后触发执行Checkpoint操作。Checkpoint执行过程分为异步和同步两种:

    调用preCheckBeforeTriggeringCheckpoint()方法进行一些前置检查,主要包括检查CheckpointCoordinator当前的状态是否为shutdown、Checkpoint尝试次数是否超过配置的最大值。

    1. 构建执行和触发Checkpoint操作对应的Task节点实例的Execution集合,其中tasksToTrigger数组中存储了触发Checkpoint操作的ExecutionVertex元素,实际上就是所有的数据源节点。

    CheckpointCoordinator仅会触发数据源节点的Checkpoint操作,其他节点则是通过Barrier对齐的方式触发的。

    1. 构建需要发送Ack消息的ExecutionVertex集合,主要是从tasksToWaitFor集合中转换而来。

    tasksToWaitFor中存储了ExecutonGraph中所有的ExecutionVertex,也就是说每个ExecutionVertex节点对应的Task实例都需要向CheckpointCoordinator中汇报Ack消息

    // 主要做前置检查
       synchronized (lock) {
          preCheckBeforeTriggeringCheckpoint(isPeriodic, props.forceCheckpoint());
       }
       // 创建需要执行的Task对应的Execution集合
      Execution[] executions = new Execution[tasksToTrigger.length];
       // 遍历tasksToTrigger集合,构建Execution集合
       for (int i = 0; i < tasksToTrigger.length; i++) {
       //获取Task对应的Execution集合
          Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt();
          if (ee == null) {
          // 如果Task对应的Execution集合为空,代表Task没有被执行,则抛出异常
             LOG.info("Checkpoint triggering task {} of job {} is not being 
                executed at the moment. Aborting checkpoint.", tasksToTrigger[i].
                getTaskNameWithSubtaskIndex(), job);
             throw new CheckpointException(
                CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
          } else if (ee.getState() == ExecutionState.RUNNING) {
             // 如果ExecutionState为RUNNING,则添加到executions集合中
          executions[i] = ee;
          } else {
          // 如果其他ExecutionState不为RUNNING,则抛出异常
             LOG.info("Checkpoint triggering task {} of job {} is not in state {} 
               but {} instead. Aborting checkpoint.",
                 tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
                 job,
                 ExecutionState.RUNNING,
                 ee.getState());
             throw new CheckpointException(
                CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
          }
       }
       // 组装用于需要发送Ack消息的Task集合
       Map<ExecutionAttemptID, ExecutionVertex> ackTasks = 
          new HashMap<>(tasksToWaitFor.length);
       for (ExecutionVertex ev : tasksToWaitFor) {
          Execution ee = ev.getCurrentExecutionAttempt();
          if (ee != null) {
             ackTasks.put(ee.getAttemptId(), ev);
          } else {
             LOG.info("Checkpoint acknowledging task {} of job {} is not being 
                executed at the moment. Aborting checkpoint.", ev.getTaskNameWith
                   SubtaskIndex(), job);
             throw new CheckpointException(
                CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
          }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47

     

    2. 创建PendingCheckpoint

    在执行Checkpoint操作之前,需要构建PendingCheckpoint对象,从字面意思上讲就是挂起Checkpoint操作。

    从开始执行Checkpoint操作直到Task实例返回Ack确认成功消息,Checkpoint会一直处于Pending状态,确保Checkpoint能被成功执行。

    如代码逻辑:

    1. Checkpoint有唯一的checkpointID标记,根据高可用模式选择不同的计数器。

    如果基于ZooKeeper实现了高可用集群,会调用ZooKeeperCheckpointIDCounter实现checkpointID计数;如果是非高可用集群,则会通过StandaloneCheckpointIDCounter完成checkpointID计数。

    1. 创建checkpointStorageLocation,用于定义Checkpoint过程中状态快照数据存放的位置。

    checkpointStorageLocation通过checkpointStorage创建和初始化,不同的checkpointStorage实现创建的checkpointStorageLocation会有所不同。

    1. 创建PendingCheckpoint对象。

    包括checkpointID、ackTasks以及checkpointStorageLocation等参数信息。将创建好的PendingCheckpoint存储在pendingCheckpoints集合中,并异步执行PendingCheckpoint操作。

    final CheckpointStorageLocation checkpointStorageLocation;
    final long checkpointID;
    try {
       //通过checkpointIdCounter获取checkpointID
       checkpointID = checkpointIdCounter.getAndIncrement();
          // 获取checkpointStorageLocation
       checkpointStorageLocation = props.isSavepoint() ?
             checkpointStorage
          .initializeLocationForSavepoint(checkpointID, externalSavepointLocation) :
             checkpointStorage.initializeLocationForCheckpoint(checkpointID);
    }
    // 省略部分代码
    // 创建PendingCheckpoint对象
    final PendingCheckpoint checkpoint = new PendingCheckpoint(
       job,
       checkpointID,
       timestamp,
       ackTasks,
       masterHooks.keySet(),
       props,
       checkpointStorageLocation,
       executor);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

     

    3. Checkpoint的触发与执行

    在CheckpointCoordinator.triggerCheckpoint()方法中,会在synchronized(lock)模块内定义和执行Checkpoint操作的具体逻辑,主要包含如下步骤。

    1. 获取coordinator对象锁,对TriggeringCheckpoint对象进行预检查,主要包括检查CheckpointCoordinator状态和PendingCheckpoint尝试次数等。

    2. 将PendingCheckpoint存储在pendingCheckpoints键值对中,使用定时器创建cancellerHandle对象,cancellerHandle用于清理过期的Checkpoint操作。

    通过checkpoint.setCancellerHandle()方法设置Checkpoint的CancellerHandle,设置成功则返回True,如果失败则返回false,说明当前Checkpoint已经被释放。

    1. 调用并执行MasterHook。可以通过实现MasterHook函数,准备外部系统环境或触发相应的系统操作。

    2. 遍历执行executions集合中的Execution节点,判断props.isSynchronous()方法是否为True,如果为True则调用triggerSynchronousSavepoint()方法同步执行Checkpoint操作。
      其他情况则调用triggerCheckpoint()方法异步执行Checkpoint操作。

    // 获取coordinator-wide lock
    synchronized (lock) {
       // TriggeringCheckpoint检查
       preCheckBeforeTriggeringCheckpoint(isPeriodic, props.forceCheckpoint());
       LOG.info("Triggering checkpoint {} @ {} for job {}.", checkpointID, timestamp, 
          job);
          // 将checkpoint存储在pendingCheckpoints KV集合中
       pendingCheckpoints.put(checkpointID, checkpoint);
          // 调度canceller线程,清理过期的Checkpoint对象
       ScheduledFuture<?> cancellerHandle = timer.schedule(
             canceller,
             checkpointTimeout, TimeUnit.MILLISECONDS);
          // 确定Checkpoint是否已经被释放
       if (!checkpoint.setCancellerHandle(cancellerHandle)) {
          cancellerHandle.cancel(false);
       }
       // 调用MasterHook方法
       for (MasterTriggerRestoreHook<?> masterHook : masterHooks.values()) {
          final MasterState masterState =
             MasterHooks.triggerHook(masterHook, checkpointID, timestamp, executor)
                .get(checkpointTimeout, TimeUnit.MILLISECONDS);
          checkpoint.acknowledgeMasterState(masterHook.getIdentifier(), masterState);
       }
       Preconditions.checkState(checkpoint.areMasterStatesFullyAcknowledged());
    }
    // 创建CheckpointOptions
    final CheckpointOptions checkpointOptions = new CheckpointOptions(
          props.getCheckpointType(),
          checkpointStorageLocation.getLocationReference());
    // 分别执行executions中的Execution节点
    for (Execution execution: executions) {
       if (props.isSynchronous()) {
          // 如果是同步执行,则调用triggerSynchronousSavepoint()方法
          execution.triggerSynchronousSavepoint(checkpointID, timestamp, 
                                                checkpointOptions,
                                                advanceToEndOfTime);
       } else {
          // 其他情况则调用triggerCheckpoint()异步方法执行
          execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
       }
    }
    // 返回Checkpoint中的CompletionFuture对象
    numUnsuccessfulCheckpointsTriggers.set(0);
    return checkpoint.getCompletionFuture();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44

    以上就完成了在CheckpointCoordinator中触发Checkpoint的全部操作,具体的执行过程调用Execution完成。

     

    四. Task节点的Checkpoint操作

    在Execution.triggerCheckpoint()方法中实际上调用triggerCheckpointHelper()方法完成Execution对应的Task节点的Checkpoint操作,并通过Task实例触发数据源节点的Checkpoint操作,如代码所示。

    1. 触发准备

    1. 获取当前Execution分配的LogicalSlot,如果LogicalSlot不为空,说明Execution成功分配到Slot计算资源,否则说明Execution中没有资源,Execution对应的Task实例不会被执行和启动。

    2. 调用TaskManagerGateway.triggerCheckpoint()的RPC方法,触发和执行指定Task的Checkpoint操作。

    3. TaskExecutor收到来自CheckpointCoordinator的Checkpoint触发请求后,会在TaskExecutor实例中完成对应Task实例的Checkpoint操作。

    private void triggerCheckpointHelper(long checkpointId, 
                                         long timestamp, 
                                         CheckpointOptions checkpointOptions, 
                                         boolean advanceToEndOfEventTime) {
       final CheckpointType checkpointType = checkpointOptions.getCheckpointType();
       if (advanceToEndOfEventTime 
           && !(checkpointType.isSynchronous() && checkpointType.isSavepoint())) {
           throw new IllegalArgumentException("Only synchronous savepoints are 
             allowed to advance the watermark to MAX.");
       }
          // 获取当前Execution分配的LogicalSlot资源
       final LogicalSlot slot = assignedResource;
       // 如果LogicalSlot不为空,说明Execution运行正常
       if (slot != null) {
          // 通过slot获取TaskManagerGateway对象
          final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
                // 调用triggerCheckpoint()方法
          taskManagerGateway.triggerCheckpoint(attemptId, getVertex().getJobId(), 
                                               checkpointId, timestamp, 
                                               checkpointOptions,
                                               advanceToEndOfEventTime);
       } else {
          // 否则说明Execution中没有资源,不再执行Execution对应的Task实例
          LOG.debug("The execution has no slot assigned. This indicates that the 
          execution is no longer running.");
       }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

     

    2. 调用TaskExecutor执行Checkpoint操作

    TaskExecutor接收到来自CheckpointCoordinator的Checkpoint触发请求后,立即根据Execution信息确认Task实例线程,并且调用Task实例触发和执行数据源节点的Checkpoint操作。如代码,TaskExecutor.triggerCheckpoint()方法逻辑如下。

    1. 检查CheckpointType的类型,CheckpointType共有三种类型,分别为CHECKPOINT、SAVEPOINT和SYNC_SAVEPOINT,且只有在同步Savepoints操作时才能调整Watermark为MAX。

    2. 从taskSlotTable中获取Execution对应的Task实例,如果Task实例不为空,则调用task.triggerCheckpointBarrier()方法执行Task实例中的Checkpoint操作。

    3. 如果Task实例为空,说明Task目前处于异常,无法执行Checkpoint操作。此时调用FutureUtils.completedExceptionally()方法,并封装CheckpointException异常信息,返回给管理节点的CheckpointCoordinator进行处理。

    public CompletableFuture<Acknowledge> triggerCheckpoint(
          ExecutionAttemptID executionAttemptID,
          long checkpointId,
          long checkpointTimestamp,
          CheckpointOptions checkpointOptions,
          boolean advanceToEndOfEventTime) {
       log.debug("Trigger checkpoint {}@{} for {}.", checkpointId, 
          checkpointTimestamp, executionAttemptID);
          //检查CheckpointType,确保只有同步的savepoint操作才能将Watermark调整为MAX
       final CheckpointType checkpointType = checkpointOptions.getCheckpointType();
       if (advanceToEndOfEventTime && !(checkpointType.isSynchronous() && 
           checkpointType.isSavepoint())) {
          throw new IllegalArgumentException("Only synchronous savepoints are 
             allowed to advance the watermark to MAX.");
       }
          // 从taskSlotTable中获取当前Execution对应的Task
       final Task task = taskSlotTable.getTask(executionAttemptID);
       // 如果task不为空,则调用triggerCheckpointBarrier()方法
       if (task != null) {
          task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp, 
             checkpointOptions, advanceToEndOfEventTime);
       // 返回CompletableFuture对象
          return CompletableFuture.completedFuture(Acknowledge.get());
       } else {
          final String message = "TaskManager received a checkpoint request for 
             unknown task " + executionAttemptID + '.';
          // 如果task为空,则返回CheckpointException异常
          log.debug(message);
          return FutureUtils.completedExceptionally(
              new CheckpointException(message,
    CheckpointFailureReason.TASK_CHECKPOINT_FAILURE));
       }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

     

    五. 在StreamTask中执行Checkpoint操作

    在执行Task.triggerCheckpointBarrier()方法时,会借助AbstractInvokable中提供的triggerCheckpointAsync()方法触发并执行StreamTask中的Checkpoint操作。

    public Future<Boolean> triggerCheckpointAsync(
          CheckpointMetaData checkpointMetaData,
          CheckpointOptions checkpointOptions,
          boolean advanceToEndOfEventTime) {
       // 异步提交Checkpoint操作
       return mailboxProcessor.getMainMailboxExecutor().submit(    
          () -> triggerCheckpoint(checkpointMetaData, 
                                  checkpointOptions, advanceToEndOfEventTime),
          "checkpoint %s with %s",
          checkpointMetaData,
          checkpointOptions);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    StreamTask.triggerCheckpoint()方法主要逻辑如下。

    1. 调用StreamTask.performCheckpoint()方法执行Checkpoint并返回success信息,用于判断Checkpoint操作是否成功执行。
    2. 如果success信息为False,表明Checkpoint操作没有成功执行,此时调用declineCheckpoint()方法回退。
    boolean success = performCheckpoint(checkpointMetaData, checkpointOptions, 
                                        checkpointMetrics, advanceToEndOfEventTime);
    if (!success) {
       declineCheckpoint(checkpointMetaData.getCheckpointId());
    }
    return success;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    在StreamTask.performCheckpoint()方法中,主要执行了Task实例的Checkpoint操作,该方法除了会通过CheckpointCoordinator触发之外,在下游算子通过CheckpointBarrier对齐触发Checkpoint操作时,也会调用该方法执行具体Task的Checkpoint操作。

     
    下篇我们继续看CheckpointBarrier对齐触发Checkpoint的流程,了解StreamTask中performCheckpoint()方法如何执行Checkpoint操作,实现状态数据快照与持久化操作。

     

    参考:《Flink设计与实现:核心原理与源码解析》–张利兵

  • 相关阅读:
    mysql-面试50题-4
    世微 DC-DC降压恒注驱动芯片 LED汽车大灯 过EMC认证 AP2400
    [图解]“敏捷”染色和“额勒金德” 数学
    51单片机循迹小车原理介绍和代码示例
    leetcode-198.打家劫舍
    (七)什么是Vite——vite优劣势、命令
    828 B2B企业节引爆数字化需求 首周惠及中小企业数量同比增长117%
    【Spring-boot】Spring实现策略模式
    SpringBoot项目结合mybatis generator自动生成代码
    OpenAI 再放大招,ChatGPT 正式版 API 就是救世主尼奥,而我们都是愚蠢的人类
  • 原文地址:https://blog.csdn.net/hiliang521/article/details/136187586