• kafka 3.5 生产者请求中的acks,在服务端如何处理源码


    一、生产者客户端配置参数acks说明

    首先,客户端需要配置一个acks参数,默认值是1,下面是acks各个值的说明
    acks=-1,太慢,acks=0,有风险,acks=1,则是推荐,所以也是默认值的原因

    1、acks=1

    这意味着至少要等待leader已经成功将数据写入本地log,但是并没有等待所有follower是否成功写入。如果follower没有成功备份数据,而此时leader又无法提供服务,则消息会丢失。

    2、acks=0

    表示producer不需要等待任何确认收到的信息,副本将立即加到socket buffer并认为已经发送。没有任何保障可以保证此种情况下server已经成功接收数据,同时重试配置不会发生作用(因为客户端不知道是否失败)回馈的offset会总是设置为-1

    3、acks=-1

    这意味着leader需要等待ISR中所有备份都成功写入日志。只要任何一个备份存活,数据都不会丢失。min.insync.replicas指定必须确认写入才能被认为成功的副本的最小数量。

    二、请求在写入Leader的数据管道之前,则会验证Leader的ISR副本数量和配置中的最小ISR数量

    def appendRecordsToLeader(records: MemoryRecords, origin: AppendOrigin, requiredAcks: Int,
                                requestLocal: RequestLocal): LogAppendInfo = {
        //函数首先获取leaderIsrUpdateLock的读锁,以确保对Leader和ISR(In-Sync Replica)的更新操作是同步的。
        val (info, leaderHWIncremented) = inReadLock(leaderIsrUpdateLock) {
          //然后检查当前是否有Leader日志,
          leaderLogIfLocal match {
            //如果存在Leader日志,
            case Some(leaderLog) =>
              //则获取最小ISR(MinInSyncReplicas)的配置和ISR的大小。
              val minIsr = leaderLog.config.minInSyncReplicas
              val inSyncSize = partitionState.isr.size
    
              // Avoid writing to leader if there are not enough insync replicas to make it safe,如果没有足够的不同步副本来使其安全,请避免写入领导者
              //如果ISR的大小小于最小ISR要求,并且requiredAcks的值为-1(表示不需要确认),则抛出NotEnoughReplicasException异常。
              if (inSyncSize < minIsr && requiredAcks == -1) {
                throw new NotEnoughReplicasException(s"The size of the current ISR ${partitionState.isr} " +
                  s"is insufficient to satisfy the min.isr requirement of $minIsr for partition $topicPartition")
              }
              //调用Leader日志的appendAsLeader方法将记录作为Leader追加到日志中,并传递相关参数。
              val info = leaderLog.appendAsLeader(records, leaderEpoch = this.leaderEpoch, origin,
                interBrokerProtocolVersion, requestLocal)
    
              // we may need to increment high watermark since ISR could be down to 1,
              // 我们可能需要增加高水位线,因为 ISR 可能降至 1
              (info, maybeIncrementLeaderHW(leaderLog))
            //如果没有,则抛出NotLeaderOrFollowerException异常。
            case None =>
              throw new NotLeaderOrFollowerException("Leader not local for partition %s on broker %d"
                .format(topicPartition, localBrokerId))
          }
        }
        //返回追加记录的信息,并根据是否增加了Leader高水位线,将LeaderHwChange.INCREASED或LeaderHwChange.SAME复制给返回信息的副本。
        info.copy(if (leaderHWIncremented) LeaderHwChange.INCREASED else LeaderHwChange.SAME)
      }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    1、Leader的ISR小于配置文件中minInSyncReplicas,并且acks=-1,则抛异常

    会验证acks=-1并且当前Leader的ISR副本数量小于配置中规定的最小值

    val minIsr = leaderLog.config.minInSyncReplicas
     val inSyncSize = partitionState.isr.size
     if (inSyncSize < minIsr && requiredAcks == -1) {
                throw new NotEnoughReplicasException(s"The size of the current ISR ${partitionState.isr} " +
                  s"is insufficient to satisfy the min.isr requirement of $minIsr for partition $topicPartition")
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    2、如果acks不等于-1,则就算Leader的ISR小于配置,也会正常执行写入数据管道操作

     //调用Leader日志的appendAsLeader方法将记录作为Leader追加到日志中,并传递相关参数。
      val info = leaderLog.appendAsLeader(records, leaderEpoch = this.leaderEpoch, origin,
       interBrokerProtocolVersion, requestLocal)
    
    • 1
    • 2
    • 3

    三、请求把数据写入到Leader的数据管道后,acks=-1和非-1,有不同的逻辑

    这里不从头开始,如果想知道推送的数据怎么到下面方法的,可以看kafka 3.5 kafka服务端接收生产者发送的数据源码

    /**
      
       *将消息附加到分区的领导副本,并等待它们复制到其他副本;当超时或满足所需的 ACK 时,将触发回调函数;如果回调函数本身已经在某个对象上同步,则传递此对象以避免死锁
       * 请注意,所有挂起的延迟检查操作都存储在队列中。所有 ReplicaManager.appendRecords() 的调用方都应为所有受影响的分区调用 ActionQueue.tryCompleteActions,而不会保留任何冲突的锁。
       */
      def appendRecords(timeout: Long,
                        requiredAcks: Short,
                        internalTopicsAllowed: Boolean,
                        origin: AppendOrigin,
                        entriesPerPartition: Map[TopicPartition, MemoryRecords],
                        responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
                        delayedProduceLock: Option[Lock] = None,
                        recordConversionStatsCallback: Map[TopicPartition, RecordConversionStats] => Unit = _ => (),
                        requestLocal: RequestLocal = RequestLocal.NoCaching): Unit = {
        //省略代码                
        //把数据中加入到本地Log                
        val localProduceResults = appendToLocalLog(internalTopicsAllowed = internalTopicsAllowed,
            origin, entriesPerPartition, requiredAcks, requestLocal)
         //省略代码      
         //调用recordConversionStatsCallback方法,将每个分区的记录转换统计信息传递给回调函数。
          recordConversionStatsCallback(localProduceResults.map { case (k, v) => k -> v.info.recordConversionStats })
          //通过 delayedProduceRequestRequired 方法判断是否需要等待其它副本完成写入,如果 acks = -1,则需要等待所有副本的回应
          if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, localProduceResults)) {
            //根据条件判断是否需要创建延迟的produce操作。如果需要,创建一个DelayedProduce对象,并将它添加到delayedProducePurgatory中。
            val produceMetadata = ProduceMetadata(requiredAcks, produceStatus)
            val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback, delayedProduceLock)
            //创建(主题、分区)对的列表,以用作此延迟生成操作的键
            val producerRequestKeys = entriesPerPartition.keys.map(TopicPartitionOperationKey(_)).toSeq
            // 再一次尝试完成该延时请求
            //  如果暂时无法完成,则将对象放入到相应的Purgatory中等待后续处理
            delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)
    
          } else {
            //如果不需要延迟操作,直接将produce的结果返回给回调函数。
            val produceResponseStatus = produceStatus.map { case (k, status) => k -> status.responseStatus }
            responseCallback(produceResponseStatus)
          }else{
    		 //如果不需要延迟操作,直接将produce的结果返回给回调函数。
            // we can respond immediately
            val produceResponseStatus = produceStatus.map { case (k, status) => k -> status.responseStatus }
            responseCallback(produceResponseStatus)
    	}
          //省略代码
      }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44

    1、如果acks=-1,则会创建延迟Produce请求,等待ISR中所有副本的响应

      //它用于判断是否需要延迟发送生产请求并等待复制完成
      // 1. required acks = -1 判断requiredAcks是否等于-1,即是否需要等待所有副本的确认。
      // 2. there is data to append 判断entriesPerPartition是否不为空,即是否有要追加的数据。
       // 3. at least one partition append was successful (fewer errors than partitions) 计算localProduceResults中异常定义的数量,判断其是否小于entriesPerPartition的大小,即是否至少有一个分区的追加操作成功(即比分区数少的错误,如果全错,就应该直接返回)。
      private def delayedProduceRequestRequired(requiredAcks: Short,
                                                entriesPerPartition: Map[TopicPartition, MemoryRecords],
                                                localProduceResults: Map[TopicPartition, LogAppendResult]): Boolean = {
        requiredAcks == -1 &&
        entriesPerPartition.nonEmpty &&
        localProduceResults.values.count(_.exception.isDefined) < entriesPerPartition.size
      }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
     /**
       *检查操作是否可以完成,如果没有,则根据给定的监视键进行监视
       *请注意,可以在多个密钥上监视延迟操作。对于某些键(但不是所有键),操作可能会在添加到监视列表后完成。
       * 在这种情况下,操作被视为已完成,不会添加到其余键的监视列表中。过期收割线程将从存在该操作的任何观察程序列表中删除此操作。
       * @param operation the delayed operation to be checked 要检查的延迟操作
       * @param watchKeys keys for bookkeeping the operation 用于监视的键
       * @return true iff the delayed operations can be completed by the caller 如果延迟操作可以由调用方完成,则为 true
       */
      def tryCompleteElseWatch(operation: T, watchKeys: Seq[Any]): Boolean = {
        assert(watchKeys.nonEmpty, "The watch key list can't be empty")
        //尝试完成操作,如果操作不能立即完成,则将操作添加到所有观察键的观察列表中,并递增estimatedTotalOperations计数器的值
        if (operation.safeTryCompleteOrElse {
          watchKeys.foreach(key => watchForOperation(key, operation))
          if (watchKeys.nonEmpty) estimatedTotalOperations.incrementAndGet()
        }) return true
        //如果操作仍未完成,则根据条件执行以下操作:
        if (!operation.isCompleted) {
          //如果启用了定时器(timerEnabled为真),则将操作添加到超时定时器中。
          if (timerEnabled)
            timeoutTimer.add(operation)
          //如果操作已完成,则取消定时器任务。
          if (operation.isCompleted) {
            // cancel the timer task
            operation.cancel()
          }
        }
        //返回false表示操作未完成。
        false
      }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    2、如果acks不等于-1,写入到Leader的数据管道后,则直接执行回调函数返回结果

    	   //如果不需要延迟操作,直接将produce的结果返回给回调函数。
            // we can respond immediately
            val produceResponseStatus = produceStatus.map { case (k, status) => k -> status.responseStatus }
            responseCallback(produceResponseStatus)
    
    • 1
    • 2
    • 3
    • 4

    四、在返回response时,回调函数会遍历分区异常信息

    
        //用于发送 produce 响应的回调 ProduceResponse 的构造能够接受自动生成的协议数据,因此 KafkaApishandleProduceRequest 应应用自动生成的协议以避免额外的转换
        @nowarn("cat=deprecation")
        def sendResponseCallback(responseStatus: Map[TopicPartition, PartitionResponse]): Unit = {
          val mergedResponseStatus = responseStatus ++ unauthorizedTopicResponses ++ nonExistingTopicResponses ++ invalidRequestResponses
          var errorInResponse = false
    
          mergedResponseStatus.forKeyValue { (topicPartition, status) =>
            if (status.error != Errors.NONE) {
              errorInResponse = true
              debug("Produce request with correlation id %d from client %s on partition %s failed due to %s".format(
                request.header.correlationId,
                request.header.clientId,
                topicPartition,
                status.error.exceptionName))
            }
          }
          //记录带宽和请求配额特定的值,并在违反任何配额时通过静音通道来限制。如果违反了两个配额,请使用两个配额之间的最大限制时间。请注意,如果 acks == 0,则不会强制执行请求配额。
          val timeMs = time.milliseconds()
          val requestSize = request.sizeInBytes
          val bandwidthThrottleTimeMs = quotas.produce.maybeRecordAndGetThrottleTimeMs(request, requestSize, timeMs)
          val requestThrottleTimeMs =
            if (produceRequest.acks == 0) 0
            else quotas.request.maybeRecordAndGetThrottleTimeMs(request, timeMs)
          val maxThrottleTimeMs = Math.max(bandwidthThrottleTimeMs, requestThrottleTimeMs)
          if (maxThrottleTimeMs > 0) {
            request.apiThrottleTimeMs = maxThrottleTimeMs
            if (bandwidthThrottleTimeMs > requestThrottleTimeMs) {
              requestHelper.throttle(quotas.produce, request, bandwidthThrottleTimeMs)
            } else {
              requestHelper.throttle(quotas.request, request, requestThrottleTimeMs)
            }
          }
          //如果produceRequest.acks等于0,表示不需要响应。
          if (produceRequest.acks == 0) {
            //如果生产者请求,则无需操作;
            //但是,如果在处理请求时出现任何错误,由于生产者期望没有响应,服务器将关闭套接字服务器,以便生产者客户端知道发生了一些错误并刷新其元数据
            if (errorInResponse) {
              //如果errorInResponse为true,则关闭连接并发送包含错误信息的ProduceResponse响应
              val exceptionsSummary = mergedResponseStatus.map { case (topicPartition, status) =>
                topicPartition -> status.error.exceptionName
              }.mkString(", ")
              info(
                s"Closing connection due to error during produce request with correlation id ${request.header.correlationId} " +
                  s"from client id ${request.header.clientId} with ack=0\n" +
                  s"Topic and partition to exceptions: $exceptionsSummary"
              )
              requestChannel.closeConnection(request, new ProduceResponse(mergedResponseStatus.asJava).errorCounts)
            } else {
              //如果没有异常,发送无操作的响应。
              requestHelper.sendNoOpResponseExemptThrottle(request)
            }
          } else {
            //如果produceRequest.acks不等于0,将mergedResponseStatus和maxThrottleTimeMs作为参数构造ProduceResponse响应,并通过requestChannel发送响应。
            requestChannel.sendResponse(request, new ProduceResponse(mergedResponseStatus.asJava, maxThrottleTimeMs), None)
          }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57

    1、如果acks=0,则关闭套接字服务器

    		//如果生产者请求,则无需操作;
            //但是,如果在处理请求时出现任何错误,由于生产者期望没有响应,服务器将关闭套接字服务器,以便生产者客户端知道发生了一些错误并刷新其元数据
            if (errorInResponse) {
              //如果errorInResponse为true,则关闭连接并发送包含错误信息的ProduceResponse响应
              val exceptionsSummary = mergedResponseStatus.map { case (topicPartition, status) =>
                topicPartition -> status.error.exceptionName
              }.mkString(", ")
              info(
                s"Closing connection due to error during produce request with correlation id ${request.header.correlationId} " +
                  s"from client id ${request.header.clientId} with ack=0\n" +
                  s"Topic and partition to exceptions: $exceptionsSummary"
              )
              requestChannel.closeConnection(request, new ProduceResponse(mergedResponseStatus.asJava).errorCounts)
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    2、如果acks不等0,会返回异常信息

     requestChannel.sendResponse(request, new ProduceResponse(mergedResponseStatus.asJava, maxThrottleTimeMs), None)
    
    • 1
  • 相关阅读:
    魔兽世界服务端源码各个重要文件详细情况说明开服一条龙
    Redis 核心面试题归纳
    使用c++实现输出爱心(软件:visual Studio)
    ZIP压缩文件的打开密码和自动加密有什么不同?
    SpringJDBC模板类JdbcTemplate
    蓝桥杯决赛PREV-392试题3:信用卡号的验证(第一届)
    IDE安装后配置项--类/方法注释
    离散傅里叶变换(DFT)
    关于串口服务器及转接线的一些基础知识笔记
    【图形学】17 光照模型(二、漫反射的Shader实现)
  • 原文地址:https://blog.csdn.net/weixin_43113679/article/details/132893182