• Kafka元数据拉取流程


    元数据采用的数据结构

    public final class Cluster {
    
        private final boolean isBootstrapConfigured;
        // 一个Node就代表一个Broker
        private final List<Node> nodes;
        // 尚未被授权访问的Kafka列表,Kafka是支持权限访问的
        private final Set<String> unauthorizedTopics;
        // 映射关系为:“某个Topic下的某个Partition:Partition的详细信息”
        // TopicPartition指的是Topic1中的Partition1,PartitionInfo为具体某个Partition的详细信息
        private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition;
        // 映射关系为:“某个Topic:这个Topic下的Partition列表”
        private final Map<String, List<PartitionInfo>> partitionsByTopic;
        // 映射关系为:“某个Topic:这个Topic下的可用的Partition列表”
        private final Map<String, List<PartitionInfo>> availablePartitionsByTopic;
        // 映射关系为:“某个Broker ID:这个Broker上的所有Partition”
        // 某个Broker上有哪些Partition(可能来自不同的Topic)
        private final Map<Integer, List<PartitionInfo>> partitionsByNode;
        // broker.id映射到Node的数据结构,映射关系为:“broker.id:Node”
        private final Map<Integer, Node> nodesById;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    KafkaProducer初始化时的拉取流程

    在KafkaProducer初始化时,会构造出集群元数据组件Metadata,且在初始化方法里有一次Metadata#update()方法调用。

    // 构造核心组件:Metadata;用于去Broker集群拉取元数据(有哪些Topic,对应哪些Partition,其中哪个是leader、哪个是follower)
    // 想往Broker发送一条ProducerRecord,就必须知道目标Topic,有哪几个Partition,其中Partition Leader在哪个Broker上
    // 在KafkaProducer初始化时拉取一次元数据;后面每隔一段时间(metadata.max.age.ms,默认:5min)会刷新元数据;发送消息时如果元数不在本地,还得通过Metadata发送请求
    this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG));
    
    // 省略部分代码......
    
    // 会把我们配置的Kafka Broker地址作为参数传入
    this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds());
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    在方法调用中,会传递2个参数,分别是Cluster实例对象和当前时间戳。对于Cluster实例的创建,是利用我们配置的Broker地址,将其包装成Node,并add到List< Node >中。最后利用List< Node >构造出Cluster实例对象。

    public static Cluster bootstrap(List<InetSocketAddress> addresses) {
        List<Node> nodes = new ArrayList<>();
        int nodeId = -1;
        // 遍历传进来的Kafka Broker地址
        for (InetSocketAddress address : addresses)
            // 将Broker地址包装成Node后,添加到List集合中
            nodes.add(new Node(nodeId--, address.getHostString(), address.getPort()));
        // 利用List包装Cluster实例并返回
        return new Cluster(true, nodes, new ArrayList<PartitionInfo>(0), Collections.<String>emptySet());
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    之后再执行update()方法时,会将构造好的Cluster实例和当前时间戳传入。

    /**
     * KafkaProducer初始化时,只是将将配置的Broker地址包装成Node后,添加到List集合中,利用List集合创建Cluster实例。
     * 传进来的参数就是Cluster实例。
     * 在KafkaProducer初始化时,并没有真正的去某个Broker上拉取元数据,只是将配置的Broker地址转换成了Node,
     * 以List的形式存到了Cluster实例中
     *
     * 后面拉取元数据成功后处理响应时再调用该方法,就是更新Cluster了!
     */
    public synchronized void update(Cluster cluster, long now) {
        // 将“是否需要update元数据”的标记设为false,即不需要update
        this.needUpdate = false;
        // 将“最近的刷新时间”和“成功刷新时间”都设为now
        this.lastRefreshMs = now;
        this.lastSuccessfulRefreshMs = now;
        // 每次成功update元数据后,就会对version加1
        this.version += 1;
    
        // 拉取元数据使用的监听器
        for (Listener listener: listeners)
            listener.onMetadataUpdate(cluster);
    
        // Do this after notifying listeners as subscribed topics' list can be changed by listeners
        // needMetadataForAllTopics(默认false)表示:将所有Topic的元数据都刷新一次
        // 于是将刚刚包装好的Cluster实例赋值给this.cluster
        this.cluster = this.needMetadataForAllTopics ? getClusterForCurrentTopics(cluster) : cluster;
    
        // 由于本方法由synchronized修饰,是线程安全的,所以Thread-1抢到了锁,执行该方法,Thread-2就得wait进入休眠状态
        // 此时调用notifyAll()方法就会唤醒(处于休眠状态的)Thread-2,Thread-2就又能争抢锁了
        notifyAll();
        log.debug("Updated cluster metadata version {} to {}", this.version, this.cluster);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    可以看出,用来标记元数据是否需要拉取的标志位默认为false,还把管理元数据的版本号自增,此时很明显就不会去拉取元数据。
    由于update()方法由synchronized修饰,所以在多线程并发执行时,同一时刻只会有一个线程抢占到锁(其他线程进入休眠等待状态),进而执行“更新元数据”操作。等本方法执行完毕后,就会通过notifyAll()唤醒其他处于休眠状态的线程。

    消息发送时如何拉取元数据

    在这里插入图片描述

    在调用Kafka API的doSend()方法生产消息时,会(按需、以同步阻塞的方式)拉取元数据

    // 以同步阻塞等待的方式(传参:同步阻塞的最大时间),去连接Broker拉取元数据:如果想往Topic发送消息,必须知道元数据,这样才能通过Partitioner选择一个Partition,
    // 然后才能跟这个Partition对应的Leader建立连接、发送消息。其中调用本方法最多能够阻塞等待时间是:max.block.ms
    // 返回的是“为了拉取元数据,总共花费的时间” = 元数据拉取的时间 + 一些边边角角
    long waitedOnMetadataMs = waitOnMetadata(record.topic(), this.maxBlockTimeMs);
    
    • 1
    • 2
    • 3
    • 4

    得到的就是元数据拉取流程所花费的时间

    private long waitOnMetadata(String topic, long maxWaitMs) throws InterruptedException {
        // Metadata组件中已经缓存、加载过元数据的Topic,会放到Set集合中。
        // 第一次发送消息到某个Topic,Set集合没有这个Topic,那就准备拉取
        if (!this.metadata.containsTopic(topic))
            this.metadata.add(topic);
    
        // metadata.fetch()得到的就是Cluster实例,这里是判断Cluster中的Map>是否有这个Topic
        // 说明这个Topic的元数据信息在Cluster Map中能查到(已经被缓存了),无需等待拉取
        if (metadata.fetch().partitionsForTopic(topic) != null)
            // 元数据拉取过程中的阻塞等待时间 = 0
            return 0;
    
        long begin = time.milliseconds();
        // 最多能阻塞等待的时间,默认:60s
        long remainingWaitMs = maxWaitMs;
        // 只要Cluster实例中的Map>集合中没有这个Topic,就得触发“元数据拉取操作”
        while (metadata.fetch().partitionsForTopic(topic) == null) {
            log.trace("Requesting metadata update for topic {}.", topic);
            // step 1:将Sender线程拉取元数据的标志位,设为true
            int version = metadata.requestUpdate();
            // step 2:唤醒Sender线程,底层就是唤醒NetworkClient(让它不要阻塞等待了),准备异步拉取元数据
            sender.wakeup();
            // step 3:Metadata准备以同步阻塞的方式,等待元数据的拉取结果
            metadata.awaitUpdate(version, remainingWaitMs);
            // 整个“元数据异步拉取而同步等待”所花费的时间 = 当前时间戳 - 元数据拉取前夕的时间戳
            long elapsed = time.milliseconds() - begin;
            // 如果等待元数据拉取所花费的时间大于默认的60s,抛出超时异常
            if (elapsed >= maxWaitMs)
                throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
            // 某个Topic尚未被授权访问,抛异常
            if (metadata.fetch().unauthorizedTopics().contains(topic))
                throw new TopicAuthorizationException(topic);
            // 剩余等待时间 = 默认的60s - 同步阻塞所花费的时间
            remainingWaitMs = maxWaitMs - elapsed;
        }
        // 以同步阻塞的方式等待元数据拉取成功所花费的时间 = 元数据拉取所花费的时间 + 一些边边角角
        return time.milliseconds() - begin;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38

    首先要判断目标Topic的元数据是否已经缓存,如果没有,那就准备拉取。

    1.更新拉取标志位

    首先将Metadata组件中是否需要拉取元数据的标志位,设为true,表示现在需要拉取元数据。

    public synchronized int requestUpdate() {
        // 是否需要拉取元数据的标志位,设为true
        this.needUpdate = true;
        // 返回“拉取元数据”过程中用到的版本号
        return this.version;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    2.唤醒Sender线程,异步拉取

    元数据拉取工作是由Sender负责完成的,底层就是唤醒NetworkClient,让它不要阻塞等待了,准备异步拉取元数据

    当Sender线程运行时,会触发执行它的run()方法。

    void run(long now) {
    
        // 如果某些做好发送准备的Partition的元数据都还没拉取到(不知道Leader是谁),就标识一下
        if (result.unknownLeadersExist)
            // 将“需要拉取元数据的标志位”更新为:true
            this.metadata.requestUpdate();
    
        
        // 省略部分代码...
    
        // 万能poll()方法
        this.client.poll(pollTimeout, now);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    其中如果哪个Partition的Leader还不知道是谁,就强制刷新一次元数据。最后调用万能poll方法拉取元数据

    public List<ClientResponse> poll(long timeout, long now) {
        // MetadataUpdater组件是专门用来更新元数据的,调用MetadataUpdater#maybeUpdate()拉取元数据
        // 内部会构建专门用于向Broker发送请求的MetadataRequest
        long metadataTimeout = metadataUpdater.maybeUpdate(now);
        
        // 省略部分代码...
        
        List<ClientResponse> responses = new ArrayList<>();
        handleCompletedSends(responses, updatedNow);
        // 发送出去的MetadataRequest,收到了响应,现在处理这些响应
        handleCompletedReceives(responses, updatedNow);
       
       // 省略部分代码...
    
        return responses;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    底层通过MetadataUpdater组件完成拉取动作,本质就是创建拉取元数据的请求–MetadataRequest,将其封装成ClientRequest,最后由Selector将其发送出去

    private void maybeUpdate(long now, Node node) {
        if (node == null) {
            log.debug("Give up sending metadata request since no node is available");
            this.lastNoNodeAvailableMs = now;
            return;
        }
        String nodeConnectionId = node.idString();
    
        if (canSendRequest(nodeConnectionId)) {
            this.metadataFetchInProgress = true;
            // 首先创建好拉取元数据要发送的请求:MetadataRequest
            MetadataRequest metadataRequest;
            if (metadata.needMetadataForAllTopics())
                metadataRequest = MetadataRequest.allTopics();
            else
                metadataRequest = new MetadataRequest(new ArrayList<>(metadata.topics()));
            // 将拉取元数据的请求,封装成ClientRequest
            ClientRequest clientRequest = request(now, nodeConnectionId, metadataRequest);
            log.debug("Sending metadata request {} to node {}", metadataRequest, node.id());
            // 核心:真正发送请求调用的是request()方法
            doSend(clientRequest, now);
        } else if (connectionStates.canConnect(nodeConnectionId, now)) {
            log.debug("Initialize connection to node {} for sending metadata request", node.id());
            initiateConnect(node, now);
    
        } else { 
            this.lastNoNodeAvailableMs = now;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    然后将这个请求发送出去,走的是基于Java NIO封装的KafkaChannel将其发送到

    /**
     * 真正发送请求的方法
     */
    private void doSend(ClientRequest request, long now) {
        request.setSendTimeMs(now);
        this.inFlightRequests.add(request);
        // 通过Selectable组件发起请求,该组件是Kafka中专用于网络I/O操作的
        selector.send(request.request());
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    万能poll()方法将“元数据拉取”的这个ClientRequest 发送出去后,总归是能接收到响应的。于是,调用handleCompletedReceives()方法处理响应

    /**
     *  处理Broker对MetadataRequest的响应
     */
    private void handleCompletedReceives(List<ClientResponse> responses, long now) {
        for (NetworkReceive receive : this.selector.completedReceives()) {
            String source = receive.source();
            ClientRequest req = inFlightRequests.completeNext(source);
            Struct body = parseResponse(receive.payload(), req.request().header());
            // 如果这个请求是一个metadata request,那就立即处理,并返回true
            if (!metadataUpdater.maybeHandleCompletedReceive(req, now, body))
                responses.add(new ClientResponse(req, now, false, body));
        }
    }
    
    @Override
    public boolean maybeHandleCompletedReceive(ClientRequest req, long now, Struct body) {
        short apiKey = req.request().header().apiKey();
        if (apiKey == ApiKeys.METADATA.id && req.isInitiatedByNetworkClient()) {
            handleResponse(req.request().header(), body, now);
            return true;
        }
        return false;
    }
    
    /**
     * 处理响应
    */
    private void handleResponse(RequestHeader header, Struct body, long now) {
        this.metadataFetchInProgress = false;
        MetadataResponse response = new MetadataResponse(body);
        // 从MetadataResponse中取出最新拉取到的元数据
        Cluster cluster = response.cluster();
        // check if any topics metadata failed to get updated
        Map<String, Errors> errors = response.errors();
        if (!errors.isEmpty())
            log.warn("Error while fetching metadata with correlation id {} : {}", header.correlationId(), errors);
    
        // don't update the cluster if there are no valid nodes...the topic we want may still be in the process of being
        // created which means we will get errors and no nodes until it exists
        if (cluster.nodes().size() > 0) {
            // 拉取元数据的请求,最终会得到响应。
            // 现在就是要将响应的Cluster交给Metadata更新,内部会调用notifyAll方法唤醒当初阻塞等待拉取结果的主线程
            this.metadata.update(cluster, now);
        } else {
            log.trace("Ignoring empty metadata response with correlation id {}.", header.correlationId());
            this.metadata.failedUpdate(now);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48

    从获取到的响应MetadataResponse 中,取出最新拉取到的元数据Cluster,将其更新到Metadata组件中去,最后notifyAll()唤醒当初因为wait(60s)而阻塞的线程

    /**
     * 向Broker发请求拉取元数据,得到响应之后,会从响应中取出最新拉取到的Cluster,将其赋值给this.cluster。
     * 然后notifyAll()唤醒当初因为wait(60s)而阻塞的线程
     */
    public synchronized void update(Cluster cluster, long now) {
        // 将“是否需要update元数据”的标记设为false,即现在不需要update
        this.needUpdate = false;
        // 将“最近的刷新时间”和“成功刷新时间”都设为now
        this.lastRefreshMs = now;
        this.lastSuccessfulRefreshMs = now;
        // 每次成功update元数据后,就会对version加1
        this.version += 1;
    
        // 拉取元数据使用的监听器
        for (Listener listener: listeners)
            listener.onMetadataUpdate(cluster);
    
        // Do this after notifying listeners as subscribed topics' list can be changed by listeners
        // needMetadataForAllTopics(默认false)表示:将所有Topic的元数据都刷新一次
        // 于是将刚刚包装好的Cluster实例赋值给this.cluster
        this.cluster = this.needMetadataForAllTopics ? getClusterForCurrentTopics(cluster) : cluster;
    
        // 由于本方法由synchronized修饰,是线程安全的,所以Thread-1抢到了锁,执行该方法,Thread-2就得wait进入休眠状态
        // 此时调用notifyAll()方法就会唤醒(处于休眠状态的)Thread-2,Thread-2就又能争抢锁了
        notifyAll();
        log.debug("Updated cluster metadata version {} to {}", this.version, this.cluster);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    3.同步阻塞,等待拉取结果

    通过wait()方法让所在线程以同步阻塞的方式,等待元数据拉取结果。while循环的判定条件是以元数据版本号version为准,只要元数据拉取成功,必然会更新version,此时也就能跳出while循环了。

    public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException {
        if (maxWaitMs < 0) {
            throw new IllegalArgumentException("Max time to wait for metadata updates should not be < 0 milli seconds");
        }
        long begin = System.currentTimeMillis();
        // 最多能够阻塞等待的时间
        long remainingWaitMs = maxWaitMs;
        // while循环等待元数据拉取结果,啥时候拉取成功了,version就会自增+1,就能跳出while循环
        while (this.version <= lastVersion) {
            // 最多能够阻塞等待的时间也正常
            if (remainingWaitMs != 0)
                // wait释放锁,让业务逻辑所在的线程阻塞等待最长60s
                wait(remainingWaitMs);
            // 已经因为阻塞等待而耗费的时间
            long elapsed = System.currentTimeMillis() - begin;
            // 如果等待元数据拉取结果的的时间超过了默认的60s,就抛出异常
            if (elapsed >= maxWaitMs)
                throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
            // 否则,表示元数据拉取过程并未超时,计算出剩余还需要阻塞等待的时间 = 默认的60s - 已经花费的时间
            remainingWaitMs = maxWaitMs - elapsed;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    如果在默认的阻塞等待时间内,成功拉取到了集群元数据,那就计算出整个拉取流程的耗费时间并return。

    一旦拉取超时,那就得抛出TimeoutException。异常抛出会中断上述while循环,异常信息会传递到waitOnMetadata()方法,于是waitOnMetadata()方法就会抛出InterruptedException。

    最外层的doSend方法捕获到InterruptedException异常后,会专门对其进行处理:

    catch (InterruptedException e) {
        // 如果拉取元数据的过程超过了60s,就会将TimeoutException抛出来,在这里catch住,并通过onSendError回调交给开发者
        this.errors.record();
        if (this.interceptors != null)
            this.interceptors.onSendError(record, tp, e);
        throw new InterruptException(e);
    } 
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    总结

    在KafkaProducer初始化时,并不会拉取集群元数据,仅仅是将Broker包装成了Node,并add到了List中用来构建Cluster实例。

    发送消息时加载元数据,之所以采用“同步阻塞等待 + 异步拉取”的方式,是因为既不想无脑的同步阻塞在那,也不想无限制的等待异步结果。如果60s内拉取成功,wait的线程就会唤醒,正常走以后的逻辑;如果60s内没拉取到,那就主动抛异常让最外层捕获、视情况处理…

  • 相关阅读:
    谷歌爬虫插件webscraper使用详细实操
    利用cmd(命令提示符)安装mysql&&配置环境
    暑假加餐|有钱人和你想的不一样(第14天)+基于改进量子粒子群算法的电力系统经济调度(Matlab代码实现)
    软件设计模式系列之二十——备忘录模式
    Java多线程:waitnotify原理剖析
    virtualBOX网络配置
    【使用教程】在Ubuntu下PMM60系列一体化伺服电机通过SDO跑循环同步位置模式详解
    Node.js最准确历史版本下载
    红黑树(4万字文章超详细,只为一个目的)
    第九章 动态规划 part16(编辑距离专题)583. 两个字符串的删除操作 72. 编辑距离 编辑距离总结篇
  • 原文地址:https://blog.csdn.net/qq_36299025/article/details/128085717