KafkaConsumer consumption logic


    Version: kafka-clients-2.0.1.jar

    I wanted to write a plugin that modifies KafkaConsumer's consumption logic to filter out some messages based on their headers. To do that, I needed to understand how KafkaConsumer actually pulls and consumes messages, and confirm whether filtering messages before they are consumed would cause any problems.
    The relevant source code follows, with explanations added as comments.

    Conclusion first: the offset KafkaConsumer uses to pull messages is kept locally, and messages are fetched according to that offset. With auto-commit enabled, the consumer commits the offset to the broker automatically (in some code paths it explicitly checks whether a commit is due), so the offset is not lost on restart or rebalance. The locally stored offset (the position) is updated as soon as messages are fetched, so with auto-commit enabled, filtering messages away before consuming them has no effect on offset management.
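    This conclusion can be illustrated with a small self-contained sketch (plain Java, no Kafka dependency; `PositionModel` and its methods are made-up names for illustration, not the real client API): the local position advances when records are fetched, and auto-commit sends that position, so dropping records after the fetch changes nothing about what gets committed.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the conclusion above (made-up names, no Kafka dependency):
// the local position advances when records are fetched, and auto-commit
// sends that position, so filtering records after the fetch changes nothing.
class PositionModel {
    long position = 0;    // next offset to fetch, kept locally
    long committed = -1;  // last position committed to the "broker"

    // fetching n records advances the local position immediately
    List<Long> fetch(int n) {
        List<Long> offsets = new ArrayList<>();
        for (int i = 0; i < n; i++) offsets.add(position++);
        return offsets;
    }

    // auto-commit sends the current position, regardless of what the
    // application did with the fetched records
    void autoCommit() { committed = position; }
}
```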

    Fetching messages

    KafkaConsumer#poll

    private ConsumerRecords<K, V> poll(final long timeoutMs, final boolean includeMetadataInTimeout) {
        // note: acquire a lightweight lock to detect multi-threaded access, and verify the consumer is still open (it can be closed)
        acquireAndEnsureOpen();
        try {
            if (timeoutMs < 0) throw new IllegalArgumentException("Timeout must not be negative");
    
            // note: subscriptions (SubscriptionState) tracks the state of the topics this consumer subscribes to (group, offsets, etc.)
            //   this check fails if no topics are subscribed and no partitions are assigned
            if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) {
                throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
            }
    
            // poll for new data until the timeout expires
            long elapsedTime = 0L;
            do {
                // note: check whether wakeup() was called on this consumer; if so, exit this method by throwing an exception
                //   (we are inside a while loop that may keep trying to fetch when there are no new messages)
                client.maybeTriggerWakeup();
    
                final long metadataEnd;
                if (includeMetadataInTimeout) {
                    final long metadataStart = time.milliseconds();
                    // note: update partition-assignment metadata and offsets; "remaining" computes the time left
                    // internal logic:
                    //  1 ConsumerCoordinator.poll processes coordinator events (sending heartbeats and auto-committing along the way)
                    //  2 updateFetchPositions refreshes positions (positions already cached locally are kept as-is;
                    //    any still missing afterwards fall back to the reset strategy, and finally positions are set asynchronously)
                    if (!updateAssignmentMetadataIfNeeded(remainingTimeAtLeastZero(timeoutMs, elapsedTime))) {
                        return ConsumerRecords.empty();
                    }
                    metadataEnd = time.milliseconds();
                    elapsedTime += metadataEnd - metadataStart;
                } else {
                    while (!updateAssignmentMetadataIfNeeded(Long.MAX_VALUE)) {
                        log.warn("Still waiting for metadata");
                    }
                    metadataEnd = time.milliseconds();
                }
                
                // note: here the actual fetch finally happens; pollForFetches is covered separately below
                final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(remainingTimeAtLeastZero(timeoutMs, elapsedTime));
    
                if (!records.isEmpty()) {
                    // before returning the fetched records, we can send off the next round of fetches
                    // and avoid block waiting for their responses to enable pipelining while the user
                    // is handling the fetched records.
                    //
                    // NOTE: since the consumed position has already been updated, we must not allow
                    // wakeups or any other errors to be triggered prior to returning the fetched records.
                    if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) {
                        client.pollNoWakeup();
                    }
    
                    // note: interceptors run here; they can modify or filter the records,
                    //   but be mindful of the implications for committing offsets
                    return this.interceptors.onConsume(new ConsumerRecords<>(records));
                }
                final long fetchEnd = time.milliseconds();
                elapsedTime += fetchEnd - metadataEnd;
    
            } while (elapsedTime < timeoutMs);
    
            return ConsumerRecords.empty();
        } finally {
            release();
        }
    }
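    The `interceptors.onConsume` call near the end is the natural hook for the header-based filtering I had in mind. A minimal sketch of that filtering logic (plain Java stand-ins; `Msg` and its header map are made-up types, not the real ConsumerRecord/Headers API):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Sketch of header-based filtering in the style of an interceptor's onConsume
// (Msg is a made-up stand-in, not the real ConsumerRecord API).
class HeaderFilter {
    static class Msg {
        final long offset;
        final Map<String, String> headers;
        Msg(long offset, Map<String, String> headers) {
            this.offset = offset;
            this.headers = headers;
        }
    }

    // keep only messages whose "skip" header is absent or not "true"
    static List<Msg> onConsume(List<Msg> batch) {
        List<Msg> kept = new ArrayList<>();
        for (Msg m : batch) {
            if (!"true".equals(m.headers.get("skip"))) kept.add(m);
        }
        return kept;
    }
}
```

    Since the position has already advanced by the time the interceptor runs, dropping a message here simply means it is never handed to the application, while commits proceed as usual.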
    

    The logic of pollForFetches

    pollForFetches

    private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(final long timeoutMs) {
        final long startMs = time.milliseconds();
        long pollTimeout = Math.min(coordinator.timeToNextPoll(startMs), timeoutMs);
    
        // note: first check for records that were already fetched; if any exist, return them immediately
        //  the fetcher keeps a completedFetches queue of prefetched responses, which are parsed into
        //  nextInLineRecords holding the prefetched records
        //    when taking records from nextInLineRecords, the partition state is checked first
        //    (assigned, paused, position), and once records are taken, the position in subscriptions
        //    is advanced (to the offset of the next record) -- note that nothing has been committed yet
        
        // if data is available already, return it immediately
        final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
        if (!records.isEmpty()) {
            return records;
        }
    
        // note: no prefetched records, so prepare fetch requests (nothing actually goes on the wire here)
        //  for each fetchable partition: find the leader, check that it is available and has no pending
        //  request, read the position from subscriptions, build a ClientRequest and buffer it,
        //  and register a listener (on success the result is enqueued into completedFetches)
        
        // send any new fetches (won't resend pending fetches)
        fetcher.sendFetches();
    
        // We do not want to be stuck blocking in poll if we are missing some positions
        // since the offset lookup may be backing off after a failure
    
        // NOTE: the use of cachedSubscriptionHashAllFetchPositions means we MUST call
        // updateAssignmentMetadataIfNeeded before this method.
        if (!cachedSubscriptionHashAllFetchPositions && pollTimeout > retryBackoffMs) {
            pollTimeout = retryBackoffMs;
        }
    
        // note: poll and wait; details below
    
        client.poll(pollTimeout, startMs, () -> {
            // since a fetch might be completed by the background thread, we need this poll condition
            // to ensure that we do not block unnecessarily in poll()
            return !fetcher.hasCompletedFetches();
        });
    
        // after the long poll, we should check whether the group needs to rebalance
        // prior to returning data so that the group can stabilize faster
        if (coordinator.rejoinNeededOrPending()) {
            return Collections.emptyMap();
        }
    
        return fetcher.fetchedRecords();
    }
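    The prefetch behaviour described in the comments (completedFetches buffering, position advancing on drain) can be sketched as a toy pipeline (invented names; the real Fetcher is far more involved):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy prefetch pipeline (made-up names, illustrative only):
// sendFetches buffers a "response", fetchedRecords drains one batch and
// advances the local position -- without committing anything.
class ToyFetcher {
    final Deque<List<Long>> completedFetches = new ArrayDeque<>();
    long fetchOffset = 0;   // next offset to request from the "broker"
    long position = 0;      // next offset the application will see

    // pretend a fetch response carrying n records arrived and was buffered
    void sendFetches(int n) {
        List<Long> batch = new ArrayList<>();
        for (int i = 0; i < n; i++) batch.add(fetchOffset++);
        completedFetches.add(batch);
    }

    // drain one buffered batch; the position moves past it immediately
    List<Long> fetchedRecords() {
        List<Long> batch = completedFetches.poll();
        if (batch == null) return List.of();
        position = batch.get(batch.size() - 1) + 1;
        return batch;
    }
}
```

    Pipelining falls out of this shape: while the application processes one drained batch, the next fetch response can already be sitting in the queue.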
    

    ConsumerNetworkClient#poll

    /**
     * Poll for any network IO.
     * @param timeout timeout in milliseconds
     * @param now current time in milliseconds
     * @param disableWakeup If TRUE disable triggering wake-ups
     */
    public void poll(long timeout, long now, PollCondition pollCondition, boolean disableWakeup) {
        
        // note: fire the completion handlers of finished requests (there is a pendingCompletion queue)
        // there may be handlers which need to be invoked if we woke up the previous call to poll
        firePendingCompletedRequests();
    
        lock.lock();
        try {
            // note: handle disconnected nodes (the pendingDisconnects queue)
            // Handle async disconnects prior to attempting any sends
            handlePendingDisconnects();
    
            // note: this is where requests actually go out on the wire; the fetcher earlier only built them
            //  the prepared ClientRequests sit in an UnsentRequests structure (internally a map of Node -> requests);
            //  they are taken out here and sent: kafkaClient.ready -> send
            // send all the requests we can send now
            long pollDelayMs = trySend(now);
            timeout = Math.min(timeout, pollDelayMs);
    
            // note: decide whether this poll should block (i.e. whether timeout is 0): block only when
            //  nothing is pending completion and the caller says blocking is fine (completedFetches is empty);
            //  client.poll itself reads and writes data on the sockets
            
            // check whether the poll is still needed by the caller. Note that if the expected completion
            // condition becomes satisfied after the call to shouldBlock() (because of a fired completion
            // handler), the client will be woken up.
            if (pendingCompletion.isEmpty() && (pollCondition == null || pollCondition.shouldBlock())) {
                // if there are no requests in flight, do not block longer than the retry backoff
                if (client.inFlightRequestCount() == 0)
                    timeout = Math.min(timeout, retryBackoffMs);
                client.poll(Math.min(maxPollTimeoutMs, timeout), now);
                now = time.milliseconds();
            } else {
                client.poll(0, now);
            }
    
            // note: check for broken connections: if a node has disconnected, its requests are taken
            //  out of unsent and completed with disconnect responses so their handlers can fire
            
            // handle any disconnects by failing the active requests. note that disconnects must
            // be checked immediately following poll since any subsequent call to client.ready()
            // will reset the disconnect status
            checkDisconnects(now);
            if (!disableWakeup) {
                // trigger wakeups after checking for disconnects so that the callbacks will be ready
                // to be fired on the next call to poll()
                maybeTriggerWakeup();
            }
            // throw InterruptException if this thread is interrupted
            maybeThrowInterruptException();
    
            // note: try sending again; some node connections may not have been ready the first time
            //  (ready() starts connecting and returns false until the connection is established)
            // try again to send requests since buffer space may have been
            // cleared or a connect finished in the poll
            trySend(now);
    
            // fail requests that couldn't be sent if they have expired
            failExpiredRequests(now);
    
            // clean unsent requests collection to keep the map from growing indefinitely
            unsent.clean();
        } finally {
            lock.unlock();
        }
    
        // called without the lock to avoid deadlock potential if handlers need to acquire locks
        firePendingCompletedRequests();
    }
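    The timeout-clamping logic in the blocking branch (block only while the caller has nothing to process, and never longer than the retry backoff when nothing is in flight) can be condensed into a pure function. This is a stripped-down sketch with invented names, not the real client code:

```java
// Minimal sketch of the PollCondition timeout logic (invented names):
// decide how long poll may block given whether the caller still needs
// to wait and how many requests are in flight.
class PollTimeouts {
    static long effectiveTimeout(long requested, boolean shouldBlock,
                                 int inFlight, long retryBackoffMs, long maxPollTimeoutMs) {
        if (!shouldBlock) return 0;  // data is ready: just service IO and return
        long t = requested;
        // nothing outstanding: don't block longer than the retry backoff
        if (inFlight == 0) t = Math.min(t, retryBackoffMs);
        return Math.min(t, maxPollTimeoutMs);
    }
}
```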
    

    Auto-commit

    Offsets are committed so that after a restart or rebalance, the lost local position does not prevent the consumer from fetching subsequent messages correctly.

    The entry point is ConsumerCoordinator#maybeAutoCommitOffsetsAsync

    The trigger path is:

    • KafkaConsumer#poll fetches messages
    • -> KafkaConsumer#updateAssignmentMetadataIfNeeded
    • -> ConsumerCoordinator#poll -> maybeAutoCommitOffsetsAsync (this too first builds the request and stores it in unsent; it actually goes on the wire during the next fetch poll)
        public void maybeAutoCommitOffsetsAsync(long now) {
            // check whether the auto-commit interval has elapsed
            if (autoCommitEnabled && now >= nextAutoCommitDeadline) {
                this.nextAutoCommitDeadline = now + autoCommitIntervalMs;
                doAutoCommitOffsetsAsync();
            }
        }
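    The deadline check above is easy to verify with a deterministic clock. A sketch with invented names that mirrors just the interval logic, not the real class:

```java
// Sketch of interval-based auto-commit scheduling with an explicit clock
// (invented names; mirrors only the deadline arithmetic above).
class AutoCommitTimer {
    final long intervalMs;
    long nextDeadline;
    int commits = 0;

    AutoCommitTimer(long intervalMs, long startMs) {
        this.intervalMs = intervalMs;
        this.nextDeadline = startMs + intervalMs;
    }

    // called on every poll; "commits" only when the deadline has passed
    void maybeAutoCommit(long now) {
        if (now >= nextDeadline) {
            nextDeadline = now + intervalMs;
            commits++;  // stand-in for doAutoCommitOffsetsAsync()
        }
    }
}
```

    Note that the next deadline is computed from `now`, not from the previous deadline, so a delayed poll pushes the following commit out rather than triggering a burst of catch-up commits.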
        
    
    Original article: https://blog.csdn.net/Wu_Shang001/article/details/134340445