• 19. Thoughts on Kafka Consumers: Does a Partition Leader Switch Trigger a Consumer Rebalance?


    Recently, while a friend was asking me about consumer group rebalances, he mentioned that a partition leader switch is one of the causes of a rebalance. The causes people usually cite are the following three:
    1. The group membership changes: a member joins or leaves the group
    2. The set of subscribed topics changes
    3. The topicPartitions being subscribed to change
    This was the first time I had heard anyone say that a partition leader switch triggers a consumer rebalance, so let's analyze from this angle whether it really happens, and what the consumer actually does in this situation.

    Sample Analysis

    We verify this with the following steps:
    1. Prepare topic_1 with three partitions, use the groupId "mykafka-group_4", and prepare three brokers.
    2. The internal __consumer_offsets partition for this groupId works out to partition 37 (see the sketch after this list), and in my cluster the leader of partition 37 is node1, so we proceed as follows:
    Start the three brokers and arrange for topic_1's partition leaders to sit on node2 as far as possible, then start consume1 and consume2.
    After the rebalance assignment completes, shut down node2, at which point the partition leaders on node2 switch over to node1; then watch the consumer logs.
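
    As a side note, here is a minimal sketch of how that coordinator partition can be estimated from the groupId. It assumes the default of 50 partitions for the internal __consumer_offsets topic and mirrors the broker's hashing scheme (a sign-masked hashCode of the groupId modulo the partition count); the class name here is just for illustration, and the exact result depends on your offsets.topic.num.partitions setting.

    public class CoordinatorPartition {
        public static void main(String[] args) {
            // Assumption: offsets.topic.num.partitions is left at its default of 50.
            int offsetsTopicPartitionCount = 50;
            String groupId = "mykafka-group_4";

            // Mask the sign bit rather than using Math.abs so Integer.MIN_VALUE cannot slip through.
            int partition = (groupId.hashCode() & 0x7fffffff) % offsetsTopicPartitionCount;

            // The article's cluster resolved this groupId to partition 37.
            System.out.println("Coordinator partition for " + groupId + ": " + partition);
        }
    }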

    The assignment logs after starting the consumers are as follows:

    We can see that after the assignment succeeds, both consume1 and consume2 talk to node2 (broker id 2, 127.0.0.1:9093), because all of topic_1's partition leaders are on node2.

    • consume1:

    [main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=mykafka-group_4_1, groupId=mykafka-group_4] Successfully joined group with generation 34
    [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=mykafka-group_4_1, groupId=mykafka-group_4] Adding newly assigned partitions: topic_1-0
    ……
    [main] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=mykafka-group_4_1, groupId=mykafka-group_4] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(), implied=(topic_1-0)) to broker 127.0.0.1:9093 (id: 2 rack: null)

    • consume2:

    [main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=mykafka-group_4_2, groupId=mykafka-group_4] Successfully joined group with generation 34
    [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=mykafka-group_4_2, groupId=mykafka-group_4] Adding newly assigned partitions: topic_1-2, topic_1-1
    ……
    [main] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=mykafka-group_4_2, groupId=mykafka-group_4] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(), implied=(topic_1-2, topic_1-1)) to broker 127.0.0.1:9093 (id: 2 rack: null)

    The logs after shutting down node2 are as follows:

    We can see that after node2 goes down, the consumers do not rejoin the group; they simply switch over to node1 (broker id 1, 127.0.0.1:9092).

    • consume1:

    [main] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=mykafka-group_4_1, groupId=mykafka-group_4] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(), implied=(topic_1-0)) to broker 127.0.0.1:9093 (id: 2 rack: null)
    [main] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=mykafka-group_4_1, groupId=mykafka-group_4] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(), implied=(topic_1-0)) to broker 127.0.0.1:9093 (id: 2 rack: null)
    [main] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=mykafka-group_4_1, groupId=mykafka-group_4] Sending READ_UNCOMMITTED FullFetchRequest(topic_1-0) to broker 127.0.0.1:9092 (id: 1 rack: null)
    [main] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=mykafka-group_4_1, groupId=mykafka-group_4] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(), implied=(topic_1-0)) to broker 127.0.0.1:9092 (id: 1 rack: null)

    Result Analysis

    From the demo above we can see that when the node hosting a topicPartition's leader goes offline and the leader switches, no consumer rebalance is triggered. In this demo we shut down node2, which raises another question: would shutting down node1 give a different result, given that node1 is, after all, the node hosting the groupCoordinator? The answer is that the result is the same; you are welcome to verify it yourself.

    Flow Recap

    Figure 1

    Source Code Analysis

    The consumer sends a fetch request to the broker

    The entry point is org.apache.kafka.clients.consumer.KafkaConsumer#pollForFetches. This code is fairly simple: it first checks whether there are already fetched records that have not been processed; if so, it returns them immediately, otherwise it sends a FetchRequest to the broker and processes the data once it comes back.

    private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(Timer timer) {
            long pollTimeout = coordinator == null ? timer.remainingMs() :
                    Math.min(coordinator.timeToNextPoll(timer.currentTimeMs()), timer.remainingMs());
    
            // if data is available already, return it immediately
            final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
            if (!records.isEmpty()) {
                return records;
            }
    
            // send any new fetches (won't resend pending fetches)
            fetcher.sendFetches();
    
            // We do not want to be stuck blocking in poll if we are missing some positions
            // since the offset lookup may be backing off after a failure
    
            // NOTE: the use of cachedSubscriptionHashAllFetchPositions means we MUST call
            // updateAssignmentMetadataIfNeeded before this method.
            if (!cachedSubscriptionHashAllFetchPositions && pollTimeout > retryBackoffMs) {
                pollTimeout = retryBackoffMs;
            }
    
            Timer pollTimer = time.timer(pollTimeout);
            client.poll(pollTimer, () -> {
                // since a fetch might be completed by the background thread, we need this poll condition
                // to ensure that we do not block unnecessarily in poll()
                return !fetcher.hasAvailableFetches();
            });
            timer.update(pollTimer.currentTimeMs());
    
            return fetcher.fetchedRecords();
        }
    

    How the broker handles the FetchRequest

    The entry point is again in the familiar class, kafka.server.KafkaApis#handleFetchRequest. The call chain is shown below.

    Figure 2

    The call eventually reaches kafka.cluster.Partition#checkCurrentLeaderEpoch, shown below. It validates the leader epoch carried in the request: if the broker's local epoch is greater than the one in the request, the leader has already switched and the epoch has been bumped, so the broker returns Errors.FENCED_LEADER_EPOCH, prompting the consumer to refresh its leader information.

    private def checkCurrentLeaderEpoch(remoteLeaderEpochOpt: Optional[Integer]): Errors = {
       if (!remoteLeaderEpochOpt.isPresent) {
         Errors.NONE
       } else {
         val remoteLeaderEpoch = remoteLeaderEpochOpt.get
         val localLeaderEpoch = leaderEpoch
         if (localLeaderEpoch > remoteLeaderEpoch)
           Errors.FENCED_LEADER_EPOCH
         else if (localLeaderEpoch < remoteLeaderEpoch)
           Errors.UNKNOWN_LEADER_EPOCH
         else
           Errors.NONE
       }
     }
    

    How the consumer handles the broker's response

    The code is in org.apache.kafka.clients.consumer.internals.Fetcher#fetchedRecords.

        public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
            Map<TopicPartition, List<ConsumerRecord<K, V>>> fetched = new HashMap<>();
            Queue<CompletedFetch> pausedCompletedFetches = new ArrayDeque<>();
            int recordsRemaining = maxPollRecords;
    
            try {
                while (recordsRemaining > 0) {
                    if (nextInLineFetch == null || nextInLineFetch.isConsumed) {
                        CompletedFetch records = completedFetches.peek();
                        if (records == null) {
                            log.info("record is null !!!!");
                            break;
                        }
    
                        if (records.notInitialized()) {
                            try {
                                // if the returned fetch has not been initialized yet, initialize it first
                                nextInLineFetch = initializeCompletedFetch(records);
                            } catch (Exception e) {
                                // Remove a completedFetch upon a parse with exception if (1) it contains no records, and
                                // (2) there are no fetched records with actual content preceding this exception.
                                // The first condition ensures that the completedFetches is not stuck with the same completedFetch
                                // in cases such as the TopicAuthorizationException, and the second condition ensures that no
                                // potential data loss due to an exception in a following record.
                                FetchResponse.PartitionData partition = records.partitionData;
                                if (fetched.isEmpty() && (partition.records == null || partition.records.sizeInBytes() == 0)) {
                                    completedFetches.poll();
                                }
                                throw e;
                            }
                        } else {
                            nextInLineFetch = records;
                        }
                        completedFetches.poll();
                    } else if (subscriptions.isPaused(nextInLineFetch.partition)) {
                        // when the partition is paused we add the records back to the completedFetches queue instead of draining
                        // them so that they can be returned on a subsequent poll if the partition is resumed at that time
                        log.debug("Skipping fetching records for assigned partition {} because it is paused", nextInLineFetch.partition);
                        pausedCompletedFetches.add(nextInLineFetch);
                        nextInLineFetch = null;
                    } else {
                        List<ConsumerRecord<K, V>> records = fetchRecords(nextInLineFetch, recordsRemaining);
    
                        if (!records.isEmpty()) {
                            TopicPartition partition = nextInLineFetch.partition;
                            List<ConsumerRecord<K, V>> currentRecords = fetched.get(partition);
                            if (currentRecords == null) {
                                fetched.put(partition, records);
                            } else {
                                // this case shouldn't usually happen because we only send one fetch at a time per partition,
                                // but it might conceivably happen in some rare cases (such as partition leader changes).
                                // we have to copy to a new list because the old one may be immutable
                                List<ConsumerRecord<K, V>> newRecords = new ArrayList<>(records.size() + currentRecords.size());
                                newRecords.addAll(currentRecords);
                                newRecords.addAll(records);
                                fetched.put(partition, newRecords);
                            }
                            recordsRemaining -= records.size();
                        }
                    }
                }
            } catch (KafkaException e) {
                if (fetched.isEmpty())
                    throw e;
            } finally {
                // add any polled completed fetches for paused partitions back to the completed fetches queue to be
                // re-evaluated in the next poll
                completedFetches.addAll(pausedCompletedFetches);
            }
    
            return fetched;
        }
    

    In org.apache.kafka.clients.consumer.internals.Fetcher#initializeCompletedFetch, the error code in the response is examined; if it is Errors.FENCED_LEADER_EPOCH, a metadata update request is scheduled.

    // ... omitted
    else if (error == Errors.NOT_LEADER_FOR_PARTITION ||
               error == Errors.REPLICA_NOT_AVAILABLE ||
               error == Errors.KAFKA_STORAGE_ERROR ||
               error == Errors.FENCED_LEADER_EPOCH ||
               error == Errors.OFFSET_NOT_AVAILABLE) {
        log.info("Error in fetch for partition {}: {}", tp, error.exceptionName());
        this.metadata.requestUpdate();
    }
    

    Summary

    After a partition leader switch on the broker side, no consumer rebalance is triggered. When the consumer sends a FetchRequest and the leader has already switched, the broker returns an Errors.FENCED_LEADER_EPOCH error; on receiving it, the consumer requests a metadata update, learns which broker now hosts the new leader, and from then on communicates with that broker.
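
    To reproduce the observation, a minimal consumer sketch like the one below can be used: it registers a ConsumerRebalanceListener, so if a leader switch really did trigger a rebalance, the revoke/assign callbacks would fire again while the broker is being stopped. The bootstrap address, topic name, and groupId simply mirror the demo setup and are otherwise arbitrary.

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;

    import java.time.Duration;
    import java.util.Collection;
    import java.util.Collections;
    import java.util.Properties;

    public class LeaderSwitchRebalanceCheck {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); // demo cluster
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "mykafka-group_4");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                // The listener only logs; a partition leader switch alone should not make these fire again.
                consumer.subscribe(Collections.singletonList("topic_1"), new ConsumerRebalanceListener() {
                    @Override
                    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                        System.out.println("Rebalance: revoked " + partitions);
                    }

                    @Override
                    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                        System.out.println("Rebalance: assigned " + partitions);
                    }
                });

                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                    records.forEach(r -> System.out.println(r.topic() + "-" + r.partition() + "@" + r.offset()));
                }
            }
        }
    }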

  • Original article: https://blog.csdn.net/qq_34306010/article/details/125473066