kafka 3.5 topic partition ISR shrink/expand source code


    ISR (In-sync Replicas): the replicas that are in sync with the leader.
    OSR (Out-of-sync Replicas): the replicas that are not in sync. Initially all replicas are in the ISR; while Kafka is running, if a replica falls behind by more than the threshold set by replica.lag.time.max.ms it is removed from the ISR and moved into the OSR, and it can return to the ISR once it catches up again.
    AR (Assigned Replicas): all replicas of the partition, i.e. AR = ISR + OSR.
    If these concepts are new to you, see the earlier post on Kafka's replica (Replica) mechanism.
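    As a minimal illustration of the relationship (the replica ids are hypothetical, not taken from the Kafka source):

    object ArIsrOsrExample extends App {
      // AR = ISR + OSR, so the OSR is simply the assigned replicas minus the in-sync ones
      val ar  = Set(0, 1, 2)   // all assigned replicas of the partition
      val isr = Set(0, 1)      // replicas currently keeping up with the leader
      val osr = ar -- isr      // Set(2): replicas lagging beyond replica.lag.time.max.ms
      println(s"AR=$ar, ISR=$isr, OSR=$osr")
    }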

    I. When a topic partition is initialized, all replicas of the partition are placed in the leader replica's maximalIsr

    If you are not familiar with the becomeLeaderOrFollower method, see the earlier post on how, in kafka 3.5, a partition follower creates a fetcher thread to pull data from the leader.

    def becomeLeaderOrFollower(correlationId: Int,
                                 leaderAndIsrRequest: LeaderAndIsrRequest,
                                 onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit): LeaderAndIsrResponse = {
         // code omitted
    	val partitionsBecomeLeader = if (partitionsToBeLeader.nonEmpty)
                makeLeaders(controllerId, controllerEpoch, partitionsToBeLeader, correlationId, responseMap,
                  highWatermarkCheckpoints, topicIdFromRequest)
        // code omitted
    }
    
    private def makeLeaders(controllerId: Int,
                              controllerEpoch: Int,
                              partitionStates: Map[Partition, LeaderAndIsrPartitionState],
                              correlationId: Int,
                              responseMap: mutable.Map[TopicPartition, Errors],
                              highWatermarkCheckpoints: OffsetCheckpoints,
                              topicIds: String => Option[Uuid]): Set[Partition] = {
    			// code omitted
               // update the partition's state to become leader; returns true on success
              if (partition.makeLeader(partitionState, highWatermarkCheckpoints, topicIds(partitionState.topicName))) {
                // add the partition that successfully became leader to partitionsToMakeLeaders
                partitionsToMakeLeaders += partition
              }
              // code omitted
     }
    

    1. First acquire the leaderIsrUpdateLock write lock; everything below happens inside the lock

    def makeLeader(partitionState: LeaderAndIsrPartitionState,
                     highWatermarkCheckpoints: OffsetCheckpoints,
                     topicId: Option[Uuid]): Boolean = {
        // acquire the leaderIsrUpdateLock write lock so concurrent modifications are synchronized
        val (leaderHWIncremented, isNewLeader) = inWriteLock(leaderIsrUpdateLock) {
       	  // code omitted
          controllerEpoch = partitionState.controllerEpoch
    	 // code omitted
          val currentTimeMs = time.milliseconds
          // if isLeader is currently false, this broker is becoming a new leader
          val isNewLeader = !isLeader
          // convert the fields of partitionState into Scala collections and prepare to update the assignment and ISR
          val isNewLeaderEpoch = partitionState.leaderEpoch > leaderEpoch
          val replicas = partitionState.replicas.asScala.map(_.toInt)
          // the isr from partitionState; on a fresh partition it contains every replica, leader and followers alike
          val isr = partitionState.isr.asScala.map(_.toInt).toSet
          val addingReplicas = partitionState.addingReplicas.asScala.map(_.toInt)
          val removingReplicas = partitionState.removingReplicas.asScala.map(_.toInt)
    	  // code omitted
          // if the partition epoch is greater than or equal to the current one, update the assignment and ISR
          updateAssignmentAndIsr(
            replicas = replicas,
            isLeader = true,
            isr = isr,
            addingReplicas = addingReplicas,
            removingReplicas = removingReplicas,
            LeaderRecoveryState.RECOVERED
          )
        // code omitted
        isNewLeader
      }
    

    updateAssignmentAndIsr is where the ISR gets initialized

      def updateAssignmentAndIsr(
        replicas: Seq[Int],
        isLeader: Boolean,
        isr: Set[Int],
        addingReplicas: Seq[Int],
        removingReplicas: Seq[Int],
        leaderRecoveryState: LeaderRecoveryState
      ): Unit = {
        if (isLeader) {
          // from replicas, keep only the ids that are not the local broker; these are the followers
          val followers = replicas.filter(_ != localBrokerId)
          // any replica currently in remoteReplicasMap that is no longer a follower needs to be removed
          val removedReplicas = remoteReplicasMap.keys.filterNot(followers.contains(_))
          // add each follower to remoteReplicasMap; if it is already present, nothing changes
          followers.foreach(id => remoteReplicasMap.getAndMaybePut(id, new Replica(id, topicPartition)))
          remoteReplicasMap.removeAll(removedReplicas)
        } else {
          // not the leader: clear remoteReplicasMap
          remoteReplicasMap.clear()
        }
        assignmentState = if (addingReplicas.nonEmpty || removingReplicas.nonEmpty)
          OngoingReassignmentState(addingReplicas, removingReplicas, replicas)
        else
          SimpleAssignmentState(replicas)

        partitionState = CommittedPartitionState(isr, leaderRecoveryState)
      }
    

    The ISR (the field in the code is maximalIsr) is assigned by constructing CommittedPartitionState

    2. Initialize the ISR (all replicas are simply stored into maximalIsr; this is also the moment when maximalIsr is at its largest)

    case class CommittedPartitionState(
      isr: Set[Int],
      leaderRecoveryState: LeaderRecoveryState
    ) extends PartitionState {
      val maximalIsr = isr
      val isInflight = false
    
      override def toString: String = {
        s"CommittedPartitionState(isr=$isr" +
        s", leaderRecoveryState=$leaderRecoveryState" +
        ")"
      }
    }
    

    As for why the value goes into maximalIsr: look at the definition of PartitionState in section V.1 below and it becomes clear that the ISR has not formally taken effect yet

    II. The scheduled task that shrinks the ISR

    1. Two ways it gets started

    (1) ZK mode

    The startup method in KafkaServer.scala calls replicaManager.startup()

    (2) KRaft mode

    The startup method in BrokerServer.scala ------>
    sharedServer.loader.installPublishers(metadataPublishers) -------->
    scheduleInitializeNewPublishers(0); ------------->
    initializeNewPublishers ------------->
    publisher.onMetadataUpdate(delta, image, manifest); the implementation is onMetadataUpdate in BrokerMetadataPublisher.scala -------------->
    initializeManagers() ----------------->
    replicaManager.startup()

    2. The scheduled task itself

    Start with the scheduled task, defined in ReplicaManager.scala

     def startup(): Unit = {
        // start the ISR expiration thread
        // a follower can lag behind the leader for at most about replicaLagTimeMaxMs x 1.5 before it is
        // removed from the ISR, because the check itself only runs every replicaLagTimeMaxMs / 2
        scheduler.schedule("isr-expiration", () => maybeShrinkIsr(), 0L, config.replicaLagTimeMaxMs / 2)

      }
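    To make the 1.5x bound concrete, here is the worst-case arithmetic as a small sketch (the 30000 ms value is only the default of replica.lag.time.max.ms, used for illustration):

    object IsrShrinkTimingExample extends App {
      // Worst case: a follower falls behind right after a check has just run.
      // It must then accumulate more than replicaLagTimeMaxMs of lag, and may still have to
      // wait up to one more scheduling interval before maybeShrinkIsr runs again.
      val replicaLagTimeMaxMs = 30000L                                // replica.lag.time.max.ms (default)
      val checkIntervalMs     = replicaLagTimeMaxMs / 2               // the schedule period used above
      val worstCaseMs         = replicaLagTimeMaxMs + checkIntervalMs // 45000 ms = 1.5 x replicaLagTimeMaxMs
      println(s"worst case stay in ISR while lagging: $worstCaseMs ms")
    }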
    

    The method executed on each tick is maybeShrinkIsr

     private def maybeShrinkIsr(): Unit = {
        trace("Evaluating ISR list of partitions to see which replicas can be removed from the ISR")

        // Shrink ISRs for non offline partitions
        // i.e. walk the ISR of every online partition
        allPartitions.keys.foreach { topicPartition =>
          onlinePartition(topicPartition).foreach(_.maybeShrinkIsr())
        }
      }
    

    (1) Take the leaderIsrUpdateLock read lock and decide whether the ISR needs to shrink

      // check whether the ISR (In-Sync Replica) list needs to be updated, and perform the update if so
    def maybeShrinkIsr(): Unit = {
        def needsIsrUpdate: Boolean = {
          // check that partitionState.isInflight is false, then call needsShrinkIsr() while holding the leaderIsrUpdateLock read lock
          !partitionState.isInflight && inReadLock(leaderIsrUpdateLock) {
            needsShrinkIsr()
          }
        }

        if (needsIsrUpdate) {
          val alterIsrUpdateOpt = inWriteLock(leaderIsrUpdateLock) {
            leaderLogIfLocal.flatMap { leaderLog =>
              // the ids of the replicas that have been out of sync for longer than the allowed lag time
              val outOfSyncReplicaIds = getOutOfSyncReplicas(replicaLagTimeMaxMs)
              partitionState match {
                case currentState: CommittedPartitionState if outOfSyncReplicaIds.nonEmpty =>
                  // code omitted
                  // prepare the ISR update
                  Some(prepareIsrShrink(currentState, outOfSyncReplicaIds))
                case _ =>
                  None
              }
            }
          }
          // submitAlterPartition sends the AlterPartition request outside of the LeaderAndIsr lock, because the
          // completion logic may advance the high watermark and complete delayed operations
          alterIsrUpdateOpt.foreach(submitAlterPartition)
        }
      }
    

    The result of needsShrinkIsr decides whether the ISR-modification path below runs

     private def needsShrinkIsr(): Boolean = {
        leaderLogIfLocal.exists { _ => getOutOfSyncReplicas(replicaLagTimeMaxMs).nonEmpty }
      }
       /**
       * A follower that already has the same LEO as the leader is never considered out of sync.
       * 1. Stuck follower: if the replica's LEO has not been updated for maxLagMs, the follower is stuck and should be removed from the ISR.
       * 2. Slow follower: if the replica has not read up to the LEO within the last maxLagMs, the follower is lagging and should be removed from the ISR.
       * Both cases are handled by checking lastCaughtUpTimeMs, the last time the replica was fully caught up.
       * If either condition is violated, the replica is considered out of sync.
       * If an ISR update is already in flight, an empty set is returned here.
       **/
      def getOutOfSyncReplicas(maxLagMs: Long): Set[Int] = {
        val current = partitionState
        if (!current.isInflight) {
          val candidateReplicaIds = current.isr - localBrokerId
          val currentTimeMs = time.milliseconds()
          val leaderEndOffset = localLogOrException.logEndOffset
          candidateReplicaIds.filter(replicaId => isFollowerOutOfSync(replicaId, leaderEndOffset, currentTimeMs, maxLagMs))
        } else {
          Set.empty
        }
      }
    private def isFollowerOutOfSync(replicaId: Int,
                                      leaderEndOffset: Long,
                                      currentTimeMs: Long,
                                      maxLagMs: Long): Boolean = {
        getReplica(replicaId).fold(true) { followerReplica =>
        // note the exclamation mark: the result of isCaughtUp is negated
          !followerReplica.stateSnapshot.isCaughtUp(leaderEndOffset, currentTimeMs, maxLagMs)
        }
    }  
    def isCaughtUp(
        leaderEndOffset: Long,
        currentTimeMs: Long,
        replicaMaxLagMs: Long
      ): Boolean = {
      // true if the replica's logEndOffset equals the leader's LEO, or the time since the replica last caught up is within replicaMaxLagMs
        leaderEndOffset == logEndOffset || currentTimeMs - lastCaughtUpTimeMs <= replicaMaxLagMs
      }
    }  
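    To see how the isCaughtUp rule separates the "stuck" and "slow" cases, here is a minimal, self-contained restatement with made-up numbers (not taken from the Kafka source):

    object IsrLagExample extends App {
      // same predicate as isCaughtUp above, written as a free function for illustration
      def isCaughtUp(leaderEndOffset: Long, logEndOffset: Long,
                     currentTimeMs: Long, lastCaughtUpTimeMs: Long, maxLagMs: Long): Boolean =
        leaderEndOffset == logEndOffset || currentTimeMs - lastCaughtUpTimeMs <= maxLagMs

      val now = 100000L
      println(isCaughtUp(100, 100, now, 50000, 30000)) // true: same LEO as the leader, never out of sync
      println(isCaughtUp(100,  80, now, 90000, 30000)) // true: behind, but caught up 10s ago, within maxLagMs
      println(isCaughtUp(100,  80, now, 50000, 30000)) // false: last caught up 50s ago, so it will be shrunk out of the ISR
    }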
    

    (2) Take the leaderIsrUpdateLock write lock and start modifying the ISR

    The operation performed is the prepareIsrShrink method

      // When shrinking the ISR we cannot assume the update will succeed, because a failed
      // AlterPartition could otherwise incorrectly advance the HW.
      // Therefore the "maximal ISR" of PendingShrinkIsr is the current ISR.
     private[cluster] def prepareIsrShrink(
        currentState: CommittedPartitionState,
        outOfSyncReplicaIds: Set[Int]
      ): PendingShrinkIsr = {
     
        // remove the out-of-sync replicas from the ISR
        val isrToSend = partitionState.isr -- outOfSyncReplicaIds
        // build the new ISR, attaching a broker epoch to each member
        val isrWithBrokerEpoch = addBrokerEpochToIsr(isrToSend.toList)
        val newLeaderAndIsr = LeaderAndIsr(
          localBrokerId,
          leaderEpoch,
          partitionState.leaderRecoveryState,
          isrWithBrokerEpoch,
          partitionEpoch
        )
        val updatedState = PendingShrinkIsr(
          outOfSyncReplicaIds,
          newLeaderAndIsr,
          currentState
        )
        partitionState = updatedState
        updatedState
      }
    

    (3) During a shrink, isr and maximalIsr both keep the pre-shrink ISR; the shrunken ISR is only carried in sentLeaderAndIsr until the controller confirms it

    PendingShrinkIsr is the state that holds these values

    case class PendingShrinkIsr(
      outOfSyncReplicaIds: Set[Int],
      sentLeaderAndIsr: LeaderAndIsr,
      lastCommittedState: CommittedPartitionState
    ) extends PendingPartitionChange  {
      val isr = lastCommittedState.isr
      val maximalIsr = isr
      val isInflight = true
    
      def notifyListener(alterPartitionListener: AlterPartitionListener): Unit = {
        alterPartitionListener.markIsrShrink()
      }
    
      override def toString: String = {
        s"PendingShrinkIsr(outOfSyncReplicaIds=$outOfSyncReplicaIds" +
        s", sentLeaderAndIsr=$sentLeaderAndIsr" +
        s", leaderRecoveryState=$leaderRecoveryState" +
        s", lastCommittedState=$lastCommittedState" +
        ")"
      }
    }
    

    III. When a follower fetches data from the leader, the leader checks whether the follower should join the ISR

    In the fetch-request handling that starts in KafkaApis.scala there is a check on whether the request comes from a follower or from a consumer; for the consumer side, see the earlier post on how the kafka 3.5 broker handles consumer fetch requests.

     def fetchRecords(
        fetchParams: FetchParams,
        fetchPartitionData: FetchRequest.PartitionData,
        fetchTimeMs: Long,
        maxBytes: Int,
        minOneMessage: Boolean,
        updateFetchState: Boolean
      ): LogReadInfo = {
        def readFromLocalLog(log: UnifiedLog): LogReadInfo = {
          readRecords(
            log,
            fetchPartitionData.lastFetchedEpoch,
            fetchPartitionData.fetchOffset,
            fetchPartitionData.currentLeaderEpoch,
            maxBytes,
            fetchParams.isolation,
            minOneMessage
          )
        }
        // is this fetch request coming from a follower?
        if (fetchParams.isFromFollower) {
          // Check that the request is from a valid replica before doing the read
          val (replica, logReadInfo) = inReadLock(leaderIsrUpdateLock) {
            val localLog = localLogWithEpochOrThrow(
              fetchPartitionData.currentLeaderEpoch,
              fetchParams.fetchOnlyLeader
            )
            val replica = followerReplicaOrThrow(
              fetchParams.replicaId,
              fetchPartitionData
            )
            val logReadInfo = readFromLocalLog(localLog)
            (replica, logReadInfo)
          }
          // after the follower replica has fetched, update some of its state
          if (updateFetchState && !logReadInfo.divergingEpoch.isPresent) {
            // if the fetch comes from a broker's replica fetcher, update the follower's log end offset and related state
            updateFollowerFetchState(
              replica,
              followerFetchOffsetMetadata = logReadInfo.fetchedData.fetchOffsetMetadata,
              followerStartOffset = fetchPartitionData.logStartOffset,
              followerFetchTimeMs = fetchTimeMs,
              leaderEndOffset = logReadInfo.logEndOffset,
              fetchParams.replicaEpoch
            )
          }
          logReadInfo
        } 
        // code omitted
      }
    

    updateFollowerFetchState does the bookkeeping after the follower has fetched

    def updateFollowerFetchState(
        replica: Replica,
        followerFetchOffsetMetadata: LogOffsetMetadata,
        followerStartOffset: Long,
        followerFetchTimeMs: Long,
        leaderEndOffset: Long,
        brokerEpoch: Long
      ): Unit = {
        // only compute the low watermark (lowWatermarkIfLeader) if there are delayed DeleteRecordsRequests pending; otherwise use -1
        val oldLeaderLW = if (delayedOperations.numDelayedDelete > 0) lowWatermarkIfLeader else -1L
        // the follower's log end offset before this update
        val prevFollowerEndOffset = replica.stateSnapshot.logEndOffset
        // update the replica's fetch state: fetch offset metadata, start offset, fetch time, the leader's end offset and the broker epoch
        replica.updateFetchState(
          followerFetchOffsetMetadata,
          followerStartOffset,
          followerFetchTimeMs,
          leaderEndOffset,
          brokerEpoch
        )
        // recompute the low watermark after the update (again -1 if there are no delayed DeleteRecordsRequests)
        val newLeaderLW = if (delayedOperations.numDelayedDelete > 0) lowWatermarkIfLeader else -1L
        // did the partition's low watermark increase, i.e. is newLeaderLW greater than oldLeaderLW?
        val leaderLWIncremented = newLeaderLW > oldLeaderLW
        // check whether this now-in-sync replica should be added to the ISR (In-Sync Replicas)
        maybeExpandIsr(replica)

        // check whether the partition's high watermark can be advanced, i.e. whether the replica's log end offset changed
        val leaderHWIncremented = if (prevFollowerEndOffset != replica.stateSnapshot.logEndOffset) {
          // try to advance the high watermark (maybeIncrementLeaderHW), under the leaderIsrUpdateLock read lock
          inReadLock(leaderIsrUpdateLock) {
            leaderLogIfLocal.exists(leaderLog => maybeIncrementLeaderHW(leaderLog, followerFetchTimeMs))
          }
        } else {
          false
        }
        // if either the low watermark or the high watermark moved, try to complete delayed requests
        if (leaderLWIncremented || leaderHWIncremented)
          tryCompleteDelayedRequests()
      }
    

    1. After taking the leaderIsrUpdateLock read lock, check whether the follower qualifies for the ISR

    maybeExpandIsr tries to add the replica to the ISR; its structure mirrors the maybeShrinkIsr triggered by the scheduled task above.

     /**
       * Check and possibly expand the ISR of the partition.
       * A replica is added to the ISR if its LEO >= current HW and it has caught up to an offset within the current leader epoch.
       * The replica must have caught up to the current leader epoch before it can join the ISR;
       * otherwise, if there is committed data between the current leader's HW and LEO, the replica could become leader
       * before fetching that committed data, and the data would be lost.
       */
      private def maybeExpandIsr(followerReplica: Replica): Unit = {
        // partitionState must not be in flight and the ISR must not already contain this follower; only then take the leaderIsrUpdateLock read lock
        val needsIsrUpdate = !partitionState.isInflight && canAddReplicaToIsr(followerReplica.brokerId) && inReadLock(leaderIsrUpdateLock) {
          // check once more whether the follower meets the conditions for joining the ISR
          needsExpandIsr(followerReplica)
        }
        if (needsIsrUpdate) {
          // needsIsrUpdate confirmed the follower is eligible; now take the leaderIsrUpdateLock write lock and do the work
          val alterIsrUpdateOpt = inWriteLock(leaderIsrUpdateLock) {
            // check if this replica needs to be added to the ISR
            partitionState match {
              case currentState: CommittedPartitionState if needsExpandIsr(followerReplica) =>
                // prepareIsrExpand performs the addition
                Some(prepareIsrExpand(currentState, followerReplica.brokerId))
              case _ =>
                None
            }
          }
          // Send the AlterPartition request outside of the LeaderAndIsr lock since the completion logic
          // may increment the high watermark (and consequently complete delayed operations).
          alterIsrUpdateOpt.foreach(submitAlterPartition)
        }
      }
    
     private def needsExpandIsr(followerReplica: Replica): Boolean = {
        // isFollowerInSync checks whether the follower's LEO has reached the current leader's HW
        canAddReplicaToIsr(followerReplica.brokerId) && isFollowerInSync(followerReplica)
      }
      // condition 1
      private def canAddReplicaToIsr(followerReplicaId: Int): Boolean = {
        val current = partitionState
        !current.isInflight &&
          !current.isr.contains(followerReplicaId) &&
          isReplicaIsrEligible(followerReplicaId)
      }
      // is the replica eligible to become part of the ISR (In-Sync Replicas)?
      private def isReplicaIsrEligible(followerReplicaId: Int): Boolean = {
        metadataCache match {
          // for the KRaft metadata cache:
          // 1. the replica is not fenced
          // 2. the replica is not in controlled shutdown
          // 3. the broker epoch in the metadata cache matches the broker epoch of its Fetch request, or the Fetch request's broker epoch is -1 (which bypasses the epoch check)
          case kRaftMetadataCache: KRaftMetadataCache =>
            val storedBrokerEpoch = remoteReplicasMap.get(followerReplicaId).stateSnapshot.brokerEpoch
            val cachedBrokerEpoch = kRaftMetadataCache.getAliveBrokerEpoch(followerReplicaId)
            !kRaftMetadataCache.isBrokerFenced(followerReplicaId) &&
              !kRaftMetadataCache.isBrokerShuttingDown(followerReplicaId) &&
              isBrokerEpochIsrEligible(storedBrokerEpoch, cachedBrokerEpoch)

          // for the ZK metadata cache it is enough that the replica is an alive broker; shutting-down brokers are not
          // checked here, but the controller prevents them from joining the ISR
          case zkMetadataCache: ZkMetadataCache =>
            zkMetadataCache.hasAliveBroker(followerReplicaId)

          case _ => true
        }
      } 
      // condition 2
     private def isFollowerInSync(followerReplica: Replica): Boolean = {
        leaderLogIfLocal.exists { leaderLog =>
          val followerEndOffset = followerReplica.stateSnapshot.logEndOffset
          followerEndOffset >= leaderLog.highWatermark && leaderEpochStartOffsetOpt.exists(followerEndOffset >= _)
        }
      }  
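    For a feel of the isFollowerInSync check, here is a tiny restatement with hypothetical offsets (the numbers are invented, not from the Kafka source):

    object IsrExpandExample extends App {
      // same predicate as isFollowerInSync above, with the values passed in explicitly
      def inSync(followerEndOffset: Long, highWatermark: Long, leaderEpochStartOffset: Long): Boolean =
        followerEndOffset >= highWatermark && followerEndOffset >= leaderEpochStartOffset

      println(inSync(100, 95, 90)) // true: the follower's LEO has passed both the HW and the leader epoch start offset
      println(inSync(92, 95, 90))  // false: the follower's LEO is still below the leader's HW, so it cannot join the ISR yet
    }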
    

    2. Take the leaderIsrUpdateLock write lock and perform the update

    The method is prepareIsrExpand

      // When expanding the ISR we assume the new replica will be in the ISR before we receive the acknowledgement.
      // This ensures the HW already reflects the updated ISR even if there is a delay before the acknowledgement arrives.
      // Alternatively, if the update fails no harm is done, since the expanded ISR only places a stricter requirement on advancing the HW.
      private def prepareIsrExpand(
        currentState: CommittedPartitionState,
        newInSyncReplicaId: Int
      ): PendingExpandIsr = {
        // combine the current ISR with the new in-sync replica id to get the ISR to send, isrToSend
        val isrToSend = partitionState.isr + newInSyncReplicaId
        // addBrokerEpochToIsr attaches a broker epoch to each member of the ISR; the result is stored in isrWithBrokerEpoch
        val isrWithBrokerEpoch = addBrokerEpochToIsr(isrToSend.toList)
        // use localBrokerId as the leader, take the remaining parameters from the current partition state and build a new LeaderAndIsr
        val newLeaderAndIsr = LeaderAndIsr(
          localBrokerId,
          leaderEpoch,
          partitionState.leaderRecoveryState,
          isrWithBrokerEpoch,
          partitionEpoch
        )
        // build a PendingExpandIsr holding the new in-sync replica id, the new LeaderAndIsr and the current state
        val updatedState = PendingExpandIsr(
          newInSyncReplicaId,
          newLeaderAndIsr,
          currentState
        )
        // replace partitionState with updatedState and return it
        partitionState = updatedState
        updatedState
      }
    

    3. The update first adds the new follower replica to maximalIsr; isr keeps the pre-expansion value

    case class PendingExpandIsr(
      newInSyncReplicaId: Int,
      sentLeaderAndIsr: LeaderAndIsr,
      lastCommittedState: CommittedPartitionState
    ) extends PendingPartitionChange {
      // the ISR that is currently in effect
      val isr = lastCommittedState.isr
      // maximalIsr also contains the member that has not formally taken effect yet, so a failed update cannot break the flow
      val maximalIsr = isr + newInSyncReplicaId
      val isInflight = true

      def notifyListener(alterPartitionListener: AlterPartitionListener): Unit = {
        alterPartitionListener.markIsrExpand()
      }

      override def toString: String = {
        s"PendingExpandIsr(newInSyncReplicaId=$newInSyncReplicaId" +
        s", sentLeaderAndIsr=$sentLeaderAndIsr" +
        s", leaderRecoveryState=$leaderRecoveryState" +
        s", lastCommittedState=$lastCommittedState" +
        ")"
      }
    }
    

    IV. After maximalIsr has been modified, the change has to be propagated so the other replicas (via the controller) learn about it

    Whether it is maybeShrinkIsr from the scheduled task or maybeExpandIsr from the fetch path, both eventually execute the function below

    alterIsrUpdateOpt.foreach(submitAlterPartition)
    
    private def submitAlterPartition(proposedIsrState: PendingPartitionChange): CompletableFuture[LeaderAndIsr] = {
        debug(s"Submitting ISR state change $proposedIsrState")
        // alterIsrManager.submit submits the ISR state change; ZK mode and KRaft mode use different implementations:
        // ZK mode uses submit in ZkAlterPartitionManager
        // KRaft mode uses submit in DefaultAlterPartitionManager
        val future = alterIsrManager.submit(
          new TopicIdPartition(topicId.getOrElse(Uuid.ZERO_UUID), topicPartition),
          proposedIsrState.sentLeaderAndIsr,
          controllerEpoch
        )
        future.whenComplete { (leaderAndIsr, e) =>
          var hwIncremented = false
          var shouldRetry = false

          inWriteLock(leaderIsrUpdateLock) {
            if (partitionState != proposedIsrState) {
              // this means partitionState was updated through leader election or some other mechanism before we got the
              // AlterPartition response; we do not know what exactly happened on the controller, but this response is stale, so ignore it
              // code omitted
            } else if (leaderAndIsr != null) {
              // commit the ISR change and return whether the high watermark was advanced
              hwIncremented = handleAlterPartitionUpdate(proposedIsrState, leaderAndIsr)
            } else {
              shouldRetry = handleAlterPartitionError(proposedIsrState, Errors.forException(e))
            }
          }
          // was the high watermark advanced?
          if (hwIncremented) {
            tryCompleteDelayedRequests()
          }
          if (shouldRetry) {
            // on retry the method simply calls itself
            submitAlterPartition(proposedIsrState)
          }
        }
      }
    

    1. ZK mode

    // write the given leaderAndIsr to ZooKeeper and return a LeaderAndIsr object
      override def submit(
        topicIdPartition: TopicIdPartition,
        leaderAndIsr: LeaderAndIsr,
        controllerEpoch: Int
      ): CompletableFuture[LeaderAndIsr]= {
        debug(s"Writing new ISR ${leaderAndIsr.isr} to ZooKeeper with version " +
          s"${leaderAndIsr.partitionEpoch} for partition $topicIdPartition")
        // ReplicationUtils.updateLeaderAndIsr updates the leaderAndIsr in ZooKeeper and returns whether the update succeeded (updateSucceeded) together with the new version (newVersion)
        val (updateSucceeded, newVersion) = ReplicationUtils.updateLeaderAndIsr(zkClient, topicIdPartition.topicPartition,
          leaderAndIsr, controllerEpoch)
        val future = new CompletableFuture[LeaderAndIsr]()
        if (updateSucceeded) {
          // access to isrChangeSet is synchronized
          // Track which partitions need to be propagated to the controller
          // isrChangeSet is drained by a scheduled task
          isrChangeSet synchronized {
            // add topicIdPartition.topicPartition to isrChangeSet
            isrChangeSet += topicIdPartition.topicPartition
            // record the time of the last ISR change in lastIsrChangeMs
            lastIsrChangeMs.set(time.milliseconds())
          }
          // update the partition epoch of leaderAndIsr with leaderAndIsr.withPartitionEpoch(newVersion) and complete the future with it
          future.complete(leaderAndIsr.withPartitionEpoch(newVersion))
        } else {
         	// code omitted
        }
        future
      }
    

    A scheduled task propagates the change by updating a ZK node

    The startup function in KafkaServer.scala runs the following

     alterPartitionManager.start()
    

    Here the alterPartitionManager implementation is ZkAlterPartitionManager,
    and what actually runs is the following code, which creates the scheduled task

    override def start(): Unit = {
        scheduler.schedule("isr-change-propagation", () => maybePropagateIsrChanges(), 0L,
          isrChangeNotificationConfig.checkIntervalMs)
      }
    
    /**
       * This function runs periodically to see whether ISR changes need to be propagated. It propagates them when:
       * 1. there are ISR changes that have not been propagated yet, and
       * 2. there has been no ISR change in the last 5 seconds, or more than 60 seconds have passed since the last propagation.
       * This lets occasional ISR changes be propagated within a few seconds while avoiding overwhelming the controller
       * and the other brokers when there is a burst of ISR changes.
       */
      private[server] def maybePropagateIsrChanges(): Unit = {
        val now = time.milliseconds()
        isrChangeSet synchronized {
          if (isrChangeSet.nonEmpty &&
            (lastIsrChangeMs.get() + isrChangeNotificationConfig.lingerMs < now ||
              lastIsrPropagationMs.get() + isrChangeNotificationConfig.maxDelayMs < now)) {
            zkClient.propagateIsrChanges(isrChangeSet)
            isrChangeSet.clear()
            lastIsrPropagationMs.set(now)
          }
        }
      }
    

    2. KRaft mode

    override def submit(
        topicIdPartition: TopicIdPartition,
        leaderAndIsr: LeaderAndIsr,
        controllerEpoch: Int
      ): CompletableFuture[LeaderAndIsr] = {
        val future = new CompletableFuture[LeaderAndIsr]()
        val alterPartitionItem = AlterPartitionItem(topicIdPartition, leaderAndIsr, future, controllerEpoch)
        // put the LeaderAndIsr to be modified into the map of unsent updates
        val enqueued = unsentIsrUpdates.putIfAbsent(alterPartitionItem.topicIdPartition.topicPartition, alterPartitionItem) == null
        if (enqueued) {
          maybePropagateIsrChanges()
        } else {
          future.completeExceptionally(new OperationNotAttemptedException(
            s"Failed to enqueue ISR change state $leaderAndIsr for partition $topicIdPartition"))
        }
        future
      }

      private[server] def maybePropagateIsrChanges(): Unit = {
        // send all pending items if there is no request in flight yet
        if (!unsentIsrUpdates.isEmpty && inflightRequest.compareAndSet(false, true)) {
          // copy the currently unsent ISR updates without removing them from the map; they are cleared in the response handler
          val inflightAlterPartitionItems = new ListBuffer[AlterPartitionItem]()
          unsentIsrUpdates.values.forEach(item => inflightAlterPartitionItems.append(item))
          sendRequest(inflightAlterPartitionItems.toSeq)
        }
      }
    
    

    Notification happens by sending a request through controllerChannelManager

    controllerChannelManager is started when BrokerServer.scala initializes and calls alterPartitionManager.start(); the implementation class is DefaultAlterPartitionManager, and its start method internally calls controllerChannelManager.start()

      private def sendRequest(inflightAlterPartitionItems: Seq[AlterPartitionItem]): Unit = {
        val brokerEpoch = brokerEpochSupplier()
        // build an AlterPartition request, returning the request object and a topicNamesByIds mapping
        val (request, topicNamesByIds) = buildRequest(inflightAlterPartitionItems, brokerEpoch)
        debug(s"Sending AlterPartition to controller $request")

        // The AlterPartition request is never timed out; it is retried indefinitely until a response arrives,
        // or until a new LeaderAndIsr overwrites the existing isrState, in which case the response for those partitions is ignored.
        // controllerChannelManager.sendRequest sends the request to the controller with a ControllerRequestCompletionHandler callback.
        controllerChannelManager.sendRequest(request,
          new ControllerRequestCompletionHandler {
            override def onComplete(response: ClientResponse): Unit = {
              debug(s"Received AlterPartition response $response")
              val error = try {
                if (response.authenticationException != null) {
                  // For now we treat authentication errors as retriable. We use the
                  // `NETWORK_EXCEPTION` error code for lack of a good alternative.
                  // Note that `BrokerToControllerChannelManager` will still log the
                  // authentication errors so that users have a chance to fix the problem.
                  Errors.NETWORK_EXCEPTION
                } else if (response.versionMismatch != null) {
                  Errors.UNSUPPORTED_VERSION
                } else {
                  // handle the response
                  handleAlterPartitionResponse(
                    response.requestHeader,
                    response.responseBody.asInstanceOf[AlterPartitionResponse],
                    brokerEpoch,
                    inflightAlterPartitionItems,
                    topicNamesByIds
                  )
                }
              } finally {
                // clear the flag so future requests can proceed
                clearInFlightRequest()
              }
    			// code omitted
           			
            }
    			// code omitted
          })
      }
    
    

    handleAlterPartitionResponse is the function that processes the response

    def handleAlterPartitionResponse(
        requestHeader: RequestHeader,
        alterPartitionResp: AlterPartitionResponse,
        sentBrokerEpoch: Long,
        inflightAlterPartitionItems: Seq[AlterPartitionItem],
        topicNamesByIds: mutable.Map[Uuid, String]
      ): Errors = {
        val data = alterPartitionResp.data

        Errors.forCode(data.errorCode) match {
          // code omitted
          case Errors.NONE =>
            // a mutable hash map, partitionResponses, holds the partition-level responses
            val partitionResponses = new mutable.HashMap[TopicPartition, Either[Errors, LeaderAndIsr]]()
            data.topics.forEach { topic =>
             	// code omitted
                topic.partitions.forEach { partition =>
                  // build a TopicPartition from the topic name and the partition index
                  val tp = new TopicPartition(topicName, partition.partitionIndex)
                  val apiError = Errors.forCode(partition.errorCode)
                  debug(s"Controller successfully handled AlterPartition request for $tp: $partition")
                  if (apiError == Errors.NONE) {
                    // parse the partition's leaderRecoveryState; if it is valid, store the partition's response in partitionResponses
                    LeaderRecoveryState.optionalOf(partition.leaderRecoveryState).asScala match {
                      case Some(leaderRecoveryState) =>
                        partitionResponses(tp) = Right(
                          LeaderAndIsr(
                            partition.leaderId,
                            partition.leaderEpoch,
                            partition.isr.asScala.toList.map(_.toInt),
                            leaderRecoveryState,
                            partition.partitionEpoch
                          )
                        )
                      // code omitted
                    }
                  } else {
                    partitionResponses(tp) = Left(apiError)
                  }
                }
            }
            // walk the inflightAlterPartitionItems passed in and match each one to its response
            inflightAlterPartitionItems.foreach { inflightAlterPartition =>
              partitionResponses.get(inflightAlterPartition.topicIdPartition.topicPartition) match {
                case Some(leaderAndIsrOrError) =>
                  // a response was found: remove the partition from unsentIsrUpdates and complete inflightAlterPartition.future according to the response type
                  unsentIsrUpdates.remove(inflightAlterPartition.topicIdPartition.topicPartition)
                  leaderAndIsrOrError match {
                    case Left(error) => inflightAlterPartition.future.completeExceptionally(error.exception)
                    case Right(leaderAndIsr) => inflightAlterPartition.future.complete(leaderAndIsr)
                  }
               // code omitted
              }
            }
    		// code omitted
        }
       // code omitted
      }
    

    V. maximalIsr and isr

    1. The definitions of the isr and maximalIsr fields in PartitionState, and why only maximalIsr was modified above rather than isr

    sealed trait PartitionState {
      /**
       * Includes only the in-sync replicas that have been committed to ZK.
       */
      def isr: Set[Int]

      /**
       * This set may include un-committed ISR members following an expansion. This "effective" ISR is used
       * for advancing the high watermark and for determining which replicas are required for acks=all produce requests.
       */
      def maximalIsr: Set[Int]

      /**
       * The leader recovery state. See the description for LeaderRecoveryState for details on the different values.
       */
      def leaderRecoveryState: LeaderRecoveryState

      /**
       * Indicates whether there is an AlterPartition request in flight.
       */
      def isInflight: Boolean
    }
    

    Take maybeShrinkIsr as the example:

    maybeShrinkIsr does not overwrite the isr field directly; it only moves the partition into a pending state (PendingShrinkIsr). isr keeps the last ISR that was committed to ZK / the controller, while maximalIsr is the "effective" ISR used for advancing the high watermark and for acks=all produce requests. The committed isr is only replaced after the AlterPartition request has been acknowledged; if the request fails, the pending change is simply dropped, so a failed update can never incorrectly advance the high watermark, and other code paths always see a stable committed ISR.
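    A highly simplified model (not the real Kafka classes) of the committed/pending transition, just to make the two fields concrete:

    object IsrStateExample extends App {
      // last ISR confirmed by the controller / ZK
      val committedIsr = Set(1)
      // while an AlterPartition request for an expansion is in flight:
      val pendingMaximalIsr = committedIsr + 2   // used for HW advancement and acks=all
      println(s"in flight: isr=$committedIsr, maximalIsr=$pendingMaximalIsr")
      // only once the controller acknowledges the request does the committed isr catch up
      val newCommittedIsr = pendingMaximalIsr
      println(s"committed: isr=$newCommittedIsr, maximalIsr=$newCommittedIsr")
    }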

    2. When does maximalIsr get committed into isr?

    A few cases I know of so far:
    1. isr is modified during leader election.
    2. Heartbeats between brokers can lead to isr being modified.
    3. Producers sending data to the broker can lead to isr being modified.

    This tormented me for two days, and I still have not pinned down exactly where isr gets updated from maximalIsr. Online material does not cover it either; articles only say it happens "at the appropriate time" without saying where, or they stop at the maximalIsr update and treat the isr change as done. I even went through the unit tests; one of them is analyzed below. If you know the answer, please leave it in the comments.

    @ParameterizedTest                                                                                         
    @ValueSource(strings = Array("zk", "kraft"))                                                               
    def testIsrNotExpandedIfReplicaIsFencedOrShutdown(quorum: String): Unit = {                                
      val kraft = quorum == "kraft"                                                                            
                                                                                                               
      val log = logManager.getOrCreateLog(topicPartition, topicId = None)                                      
      seedLogData(log, numRecords = 10, leaderEpoch = 4)                                                       
                                                                                                               
      val controllerEpoch = 0                                                                                  
      val leaderEpoch = 5                                                                                      
      val remoteBrokerId = brokerId + 1                                                                        
      val replicas = List(brokerId, remoteBrokerId)                                                            
      val isr = Set(brokerId)                                                                                  
                                                                                                               
      val metadataCache: MetadataCache = if (kraft) mock(classOf[KRaftMetadataCache]) else mock(classOf[ZkMetadataCache])
      if (kraft) {                                                                                             
        addBrokerEpochToMockMetadataCache(metadataCache.asInstanceOf[KRaftMetadataCache], replicas)            
      }                                                                                                        
                                                                                                               
      // Mark the remote broker as eligible or ineligible in the metadata cache of the leader.                 
      // When using kraft, we can make the broker ineligible by fencing it.                                    
      // In ZK mode, we must mark the broker as alive for it to be eligible.                                   
      def markRemoteReplicaEligible(eligible: Boolean): Unit = {                                               
        if (kraft) {                                                                                           
          when(metadataCache.asInstanceOf[KRaftMetadataCache].isBrokerFenced(remoteBrokerId)).thenReturn(!eligible)
        } else {                                                                                               
          when(metadataCache.hasAliveBroker(remoteBrokerId)).thenReturn(eligible)                              
        }                                                                                                      
      }                                                                                                        
      // initialize the partition
      val partition = new Partition(                                                                           
        topicPartition,                                                                                        
        replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,                                                    
        interBrokerProtocolVersion = MetadataVersion.latest,                                                   
        localBrokerId = brokerId,                                                                              
        () => defaultBrokerEpoch(brokerId),                                                                    
        time,                                                                                                  
        alterPartitionListener,                                                                                
        delayedOperations,                                                                                     
        metadataCache,                                                                                         
        logManager,                                                                                            
        alterPartitionManager                                                                                  
      )                                                                                                        
                                                                                                               
      partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)          
      assertTrue(partition.makeLeader(                                                                         
        new LeaderAndIsrPartitionState()                                                                       
          .setControllerEpoch(controllerEpoch)                                                                 
          .setLeader(brokerId)                                                                                 
          .setLeaderEpoch(leaderEpoch)                                                                         
          .setIsr(isr.toList.map(Int.box).asJava)                                                              
          .setPartitionEpoch(1)                                                                                
          .setReplicas(replicas.map(Int.box).asJava)                                                           
          .setIsNew(true),                                                                                     
        offsetCheckpoints, None), "Expected become leader transition to succeed")                              
      assertEquals(isr, partition.partitionState.isr)                                                          
      assertEquals(isr, partition.partitionState.maximalIsr)                                                   
                                                                                                               
      markRemoteReplicaEligible(true)                                                                          
                                                                                                               
      // Fetch to let the follower catch up to the log end offset and
      // to check if an expansion is possible.
      fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = log.logEndOffset)

      // Follower fetches and catches up to the log end offset.
      assertReplicaState(partition, remoteBrokerId,
        lastCaughtUpTimeMs = time.milliseconds(),
        logStartOffset = 0L,
        logEndOffset = log.logEndOffset
      )

      // Expansion is triggered.
      assertEquals(isr, partition.partitionState.isr)
      assertEquals(replicas.toSet, partition.partitionState.maximalIsr)
      assertEquals(1, alterPartitionManager.isrUpdates.size)

      // Controller rejects the expansion because the broker is fenced or offline.
      alterPartitionManager.failIsrUpdate(Errors.INELIGIBLE_REPLICA)
                                                                                                                                           
                                                                                                               
      // The leader reverts back to the previous ISR.                                                          
      assertEquals(isr, partition.partitionState.isr)                                                          
      assertEquals(isr, partition.partitionState.maximalIsr)                                                   
      assertFalse(partition.partitionState.isInflight)                                                         
      assertEquals(0, alterPartitionManager.isrUpdates.size)                                                   
                                                                                                               
      // The leader eventually learns about the fenced or offline broker.                                      
      markRemoteReplicaEligible(false)                                                                         
                                                                                                               
      // The follower fetches again.                                                                           
      fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = log.logEndOffset)                     
                                                                                                               
      // Expansion is not triggered because the follower is fenced.                                            
      assertEquals(isr, partition.partitionState.isr)                                                          
      assertEquals(isr, partition.partitionState.maximalIsr)                                                   
      assertFalse(partition.partitionState.isInflight)                                                         
      assertEquals(0, alterPartitionManager.isrUpdates.size)                                                   
                                                                                                               
      // The broker is eventually unfenced or brought back online.                                             
      markRemoteReplicaEligible(true)                                                                          
                                                                                                               
      // The follower fetches again.                                                                           
      fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = log.logEndOffset)                     
                                                                                                               
      // Expansion is triggered.                                                                               
      assertEquals(isr, partition.partitionState.isr)                                                          
      assertEquals(replicas.toSet, partition.partitionState.maximalIsr)                                        
      assertTrue(partition.partitionState.isInflight)                                                          
      assertEquals(1, alterPartitionManager.isrUpdates.size)                                                   
                                                                                                               
      // Expansion succeeds.                                                                                   
      alterPartitionManager.completeIsrUpdate(newPartitionEpoch = 1)                                           
                                                                                                               
      // ISR is committed.                                                                                     
      assertEquals(replicas.toSet, partition.partitionState.isr)                                               
      assertEquals(replicas.toSet, partition.partitionState.maximalIsr)                                        
      assertFalse(partition.partitionState.isInflight)                                                         
      assertEquals(0, alterPartitionManager.isrUpdates.size)                                                   
    } 
    

    Note alterPartitionManager.completeIsrUpdate(newPartitionEpoch = 1) above: before that call, maximalIsr is already the new set while isr is still the old one; after it has run, isr and maximalIsr are identical and both up to date.
    alterPartitionManager.completeIsrUpdate runs the following method of the mock in the TestUtils class:

    class MockAlterPartitionManager extends AlterPartitionManager {
        val isrUpdates: mutable.Queue[AlterPartitionItem] = new mutable.Queue[AlterPartitionItem]()
        val inFlight: AtomicBoolean = new AtomicBoolean(false)

    	// this is reached from inside fetchFollower; the call chain is
    	// fetchFollower -> fetchRecords -> updateFollowerFetchState -> maybeExpandIsr -> submitAlterPartition -> submit
    	// its main job is to stash the item into isrUpdates
        override def submit(
          topicPartition: TopicIdPartition,
          leaderAndIsr: LeaderAndIsr,
          controllerEpoch: Int
        ): CompletableFuture[LeaderAndIsr]= {
          val future = new CompletableFuture[LeaderAndIsr]()
          if (inFlight.compareAndSet(false, true)) {
            isrUpdates += AlterPartitionItem(
              topicPartition,
              leaderAndIsr,
              future,
              controllerEpoch
            )
          } else {
            future.completeExceptionally(new OperationNotAttemptedException(
              s"Failed to enqueue AlterIsr request for $topicPartition since there is already an inflight request"))
          }
          future
        }
        def completeIsrUpdate(newPartitionEpoch: Int): Unit = {
          if (inFlight.compareAndSet(true, false)) {
            val item = isrUpdates.dequeue()
            // in section IV, KRaft mode completes the future via inflightAlterPartition.future.complete
            // in section IV, ZK mode completes it via future.complete(leaderAndIsr.withPartitionEpoch(newVersion))
            item.future.complete(item.leaderAndIsr.withPartitionEpoch(newPartitionEpoch))
          } else {
            fail("Expected an in-flight ISR update, but there was none")
          }
        }
     }   
    

    What isrUpdates.dequeue() hands back is the AlterPartitionItem; once item.future.complete runs, isr has been updated, which at first looks baffling.
    I compared this with the code in section IV that plays the same role: the mock itself has no follow-up future.whenComplete handling, yet isr still changes. The missing piece is presumably that the whenComplete callback was registered earlier, on this very future, by Partition.submitAlterPartition (section IV); completing the future here fires that callback, which calls handleAlterPartitionUpdate and commits the ISR change.
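    As a minimal illustration (not Kafka code) of why completing the future in the mock is enough to trigger the commit logic, assuming the callback was registered earlier on the same future:

    import java.util.concurrent.CompletableFuture

    object FutureCallbackExample extends App {
      // stands in for the future returned by alterIsrManager.submit(...)
      val future = new CompletableFuture[String]()

      // stands in for the whenComplete registered inside Partition.submitAlterPartition (section IV)
      future.whenComplete { (leaderAndIsr, e) =>
        if (e == null) println(s"commit the new state: $leaderAndIsr") // handleAlterPartitionUpdate would run here
      }

      // much later, the test's completeIsrUpdate completes the very same future object,
      // which is what makes the callback above run and the ISR change become committed
      future.complete("LeaderAndIsr(isr=List(0, 1))")
    }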

  • Original article: https://blog.csdn.net/weixin_43113679/article/details/132747339