• 【Kafka源码分析】一、生产者Producer


    一、Kafka生产者线程模型、整体流程

    Kafka生产者在发送消息时主要存在两个线程:主线程Sender线程

    • 主线程即调用KafkaProducer.send方法的线程。当send方法被调用时,消息并没有真正被发送,而是暂存到RecordAccumulator
    • Sender线程在满足一定条件后,会去RecordAccumulator中取消息并发送到Kafka Server端。

    通过这种暂存机制,可以提升Kafka的吞吐量

    1. 可以将多条消息通过一个ProduceRequest批量发送出去;
    2. 提高数据压缩效率(一般压缩算法都是数据量越大越能接近预期的压缩效果);

    Kafka生产者发送消息的整体流程如下:

    在这里插入图片描述

    1. 当接受外部传过来的数据的时候,会先创建一个main线程,在main线程中创建producer对象,然后调用send方法,将数据进行发送
    2. 消息会经过拦截器,对发送的数据进行处理、加工,再经过序列化器,对传输的数据进行序列化
    3. 根据分区器的分区策略决定传输的数据发送至哪个分区
    4. 通过消息暂存器将数据写入暂存区
      1. 消息暂存器维护了一个内存池(默认大小32M),并且未每一个分区维护了一个双端队列,队列中是一个个批次
      2. 写入数据时,会从内存池中取出内存,创建批次(默认大小16k)
    5. 当一个批次的数据大小积累到 batch.size 或者到达了延迟时间 linger.ms唤醒sender线程
    6. Sender线程从分区中拉取数据。拉取数据的方式是以brokerId为key,所有分区的请求为value放到队列中
    7. Sender线程通过selector发送数据,数据发送成功之后,会有应答机制,返回acks,应答级别有3种
      • 如果反馈回来的请求是成功,则会删除发送数据成功的请求以及清理分区中请求中拉取的数据(释放批次的内存,放回到内存池中)
      • 如果失败会进行重试,重试的次数(默认是Int的最大值,可以进行修改,一般是3-5次)
    8. 如果发送数据的第一个请求到达集群中的某一个broker没有应答,允许继续发送请求,默认每个broker节点最多缓存5个请求

    由此,我们可以引出KafkaProducer的几个核心组件:

    • partitionner:分区选择器,决定将消息发送至Topic的哪个分区
    • accumulator:消息暂存器,负责暂存要发送的消息
    • sender:用于处理实际发送消息的业务逻辑,继承自Runnable
    • ioThread:实际负责处理发送消息的io线程,即sender使用的线程对象

    接下来,我们将从主线程和Sender线程两个方面来分析KafkaProducer的消息发送过程

    二、主线程

    我们从该发送消息的方法入口进入阅读源码

    public Future<RecordMetadata> send(ProducerRecord<K,V> record,Callback callback)
    
    • 1

    1.准备工作

    在调用发送消息方法后,会先进行一些准备工作:

    1. 拦截器 ProducerInterceptors 在发送消息前进行拦截
    2. 获取集群元数据Cluster(包括目标Topic下有几个Partition,分别分布在哪些Broker上)
    3. 序列化(使用序列化器对消息的key和value做序列化处理)

    这些准备工作不是我们这里关注的重点,感兴趣的可以自己去看一下源码

    2.确认分区

    做完准备工作后,生产者接下来要开始发送消息了,首先就需要确认消息发送给topic下的哪个partition这

    这里就用到了上文提到过的的一个组件——partitioner

    private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
        ......
        // 确定目标Partition
        int partition = partition(record, serializedKey, serializedValue, cluster);
        ......
    }
    
    private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
        // 若ProducerRecord中强制指定了partition, 则以该值为准
        Integer partition = record.partition();
        // 否则调用Partitioner动态计算对应的partition
        return partition != null ?
                partition :
                partitioner.partition(
                        record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    在创建KafkaProducer时,可以通过"partitioner.class"配置来指定Partitioner的实现类。若未指定,则使用Kafka内置实现类——DefaultPartitioner。来看DefaultPartitioner的具体实现:

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
            return partition(topic, key, keyBytes, value, valueBytes, cluster, cluster.partitionsForTopic(topic).size());
    }
    
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster,
                             int numPartitions) {
            // 如果没指定key,会调用stickyPartitionCache缓存,发送到该topic某个特定的partition上
            if (keyBytes == null) {
                return stickyPartitionCache.partition(topic, cluster);
            }
            // hash the keyBytes to choose a partition
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    StickyPartitionCache实际就是一个缓存,其内部维护了一个 ConcurrentMap,用以保存某topic应该发送到哪个分区

    public class StickyPartitionCache {
        private final ConcurrentMap<String, Integer> indexCache;
        public StickyPartitionCache() {
            this.indexCache = new ConcurrentHashMap<>();
        }
    
        public int partition(String topic, Cluster cluster) {
            Integer part = indexCache.get(topic);
            if (part == null) {
                return nextPartition(topic, cluster, -1);
            }
            return part;
        }
    
        public int nextPartition(String topic, Cluster cluster, int prevPartition) {
            List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
            Integer oldPart = indexCache.get(topic);
            Integer newPart = oldPart;
    
            if (oldPart == null || oldPart == prevPartition) {
                // 从集群的元数据中获取可以使用的分区
                List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
                if (availablePartitions.size() < 1) {
                    Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
                    newPart = random % partitions.size();
                } else if (availablePartitions.size() == 1) {
                	// 只有一个分区则使用当前分区
                    newPart = availablePartitions.get(0).partition();
                } else {
                	// 保证在多线程情况下不会重复创建
                    while (newPart == null || newPart.equals(oldPart)) {
                        // 使用ThreadLocalRandom来随机获取一个可用的分区,
                        // 注意,这里的ThreadLocalRandom都是去使用主线程的seed,因此即使执行了多次也是返回同一个随机数,确保线程安全
                        Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
                        newPart = availablePartitions.get(random % availablePartitions.size()).partition();
                    }
                }
                // Only change the sticky partition if it is null or prevPartition matches the current sticky partition.
                if (oldPart == null) {
                    indexCache.putIfAbsent(topic, newPart);
                } else {
                    indexCache.replace(topic, prevPartition, newPart);
                }
                return indexCache.get(topic);
            }
            return indexCache.get(topic);
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49

    通过以上代码,可以知道DefaultPartitioner的分区策略如下:

    1. 若指定了key,则通过key来hash到一个partition。(只要有key就可以保证消息顺序发送!!!
    2. 若未指定key,则在Topic下多个Partition间随机选取一个,并尽可能一直使用该分区;待该分区的当前batch已满或者当前批次的等待时间到了,Kafka再随机一个分区进行使用(和上一次的分区不同)

    3.将消息写入暂存区RecordAccumulator

    RecordAccumulator作为消息暂存者,其思想是将目的地Partition相同的消息放到一起并按一定的"规格"(由"batch.size"配置指定)划分成多个"批次"(ProducerBatch)然后以批次为单位进行数据压缩&发送。示意图如下:
    在这里插入图片描述
    因此每一个TopicPartition都对应一个Batch队列,RecordAccumulator中使用一个ConcurrentHashMap进行存储:ConcurrentMap batches;

    RecordAccumulator有两个核心方法,分别对应"存"和"取":

    • public RecordAppendResult append():主线程会调用此方法追加消息
    • public Map drain():Sender线程会调用此方法提取消息

    来看看append方法

    public RecordAppendResult append(TopicPartition tp, long timestamp, byte[] key, byte[] value, Header[] headers,
               Callback callback, long maxTimeToBlock, boolean abortOnNewBatch, long nowMs) throws InterruptedException {
            
            // appendsInProgress是一个AtomicInteger,用于记录正在进行存操作的线程数
            appendsInProgress.incrementAndGet();
            ByteBuffer buffer = null;
            if (headers == null) headers = Record.EMPTY_HEADERS;
    
            try {
                // 从batches中获取该partition对应的队列,没有则创建,ConcurrentHashMap会保证多个线程获取的是同一个队列
                Deque<ProducerBatch> dq = getOrCreateDeque(tp);
                // 以 partition 为粒度进行同步的存操作
                synchronized (dq) {
                    if (closed)
                        throw new KafkaException("Producer closed while send in progress");
                    // 尝试将内容写入batch,如果写入成功则直接返回
                    RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs);
                    if (appendResult != null)
                        return appendResult;
                }
    
                // 如果不允许创建新的批次,则先返回,等待下一次append(默认为false,一般都是允许创建新的批次的)
                if (abortOnNewBatch) {
                    // 第二个参数表示批次是否已满,第三个参数表示是否创建了新的批次
                    return new RecordAppendResult(null, false, false, true);
                }
    
                // 为缓冲区分配合适的空间
                byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
                int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
                log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
                buffer = free.allocate(size, maxTimeToBlock);
    
                // 更新当前时间
                nowMs = time.milliseconds();
                synchronized (dq) {
                    if (closed)
                        throw new KafkaException("Producer closed while send in progress");
    
                    // 开始第二次写入,通常情况下还是没有batch空间的
                    RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs);
                    if (appendResult != null) {
                        return appendResult;
                    }
    
                    // 在缓冲区上创建batch,在该batch上append,返回一个future
                    MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
                    ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, nowMs);
                    FutureRecordMetadata future = Objects.requireNonNull(batch.tryAppend(timestamp, key, value, headers,
                            callback, nowMs));
    
                    // 将新创建的批次放入队列中和未完成的批次集合中
                    dq.addLast(batch);
                    incomplete.add(batch);
    
                    buffer = null;
                    // 将该future存入RecordAppendResult中返回
                    return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true, false);
                }
            } finally {
                if (buffer != null)
                    free.deallocate(buffer);
                appendsInProgress.decrementAndGet();
            }
        }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66

    来看 tryAppend 方法,就是先从队列中获取最后一个批次ProducerBatch,只要批次存在,就在这个批次中写入数据,返回一个future,主题逻辑跟上面一样,都是通过ProducerBatch进行写数据

         private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers,
                                             Callback callback, Deque<ProducerBatch> deque, long nowMs) {
            ProducerBatch last = deque.peekLast();
            if (last != null) {
                FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, nowMs);
                if (future == null)
                    last.closeForRecordAppends();
                else
                    return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false, false);
            }
            return null;
        }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    那么具体是如何在这个批次上进行数据写入的呢,我们来看 ProducerBatch 的 tryAppend 方法

    public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, long now) {
            // 首先检查该recordsBuilder是否有充足的空间写入
            if (!recordsBuilder.hasRoomFor(timestamp, key, value, headers)) {
                return null;
            } else {
                // 在recordsBuilder上写入数据(包含数据压缩),并返回写入数据的CRC
                Long checksum = this.recordsBuilder.append(timestamp, key, value, headers);
    
                // 更新最大记录大小和更新时间
                this.maxRecordSize = Math.max(this.maxRecordSize, AbstractRecords.estimateSizeInBytesUpperBound(magic(),
                        recordsBuilder.compressionType(), key, value, headers));
                this.lastAppendTime = now;
                
                // 返回这条写入记录的Future,其中produceFuture为这个ProducerBatch公用的Future,
                // recordCount为这条记录的偏移量,checksum为这条记录的CRC
                // 此外,还包括了这条记录key和value的大小,通过这些数据可以唯一确认和校验一条记录!!!!
                FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
                                                                       timestamp, checksum,
                                                                       key == null ? -1 : key.length,
                                                                       value == null ? -1 : value.length,
                                                                       Time.SYSTEM);
                
                thunks.add(new Thunk(callback, future));
                // 更新偏移量
                this.recordCount++;
                return future;
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    可以看到 ProducerBatch又通过recordsBuilder进行数据写入,这里就不再继续往下看了,内部实际就是对要发送的消息进行了数据压缩,并通过计算得到这批数据的CRC校验值。

    写完后返回了这条数据的RecordMetadata,其中包含了这条数据的偏移量,校验值,内容长度,通过这些数据可以唯一确认和校验一条记录,此外,还包含了各个批次一个公有的produceFuture,这个Future是什么稍后再看

    append完成后,我们再回到 KafkaProducer的 doSend()方法

    // 如果在内部不允许创建新的批次,则在外部重新创建partition再重新写入(默认不会这样)
    if (result.abortForNewBatch) {
                    int prevPartition = partition;
                    partitioner.onNewBatch(record.topic(), cluster, prevPartition);
                    partition = partition(record, serializedKey, serializedValue, cluster);
                    tp = new TopicPartition(record.topic(), partition);
                    if (log.isTraceEnabled()) {
                        log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", record.topic(), partition, prevPartition);
                    }
                    // producer callback will make sure to call both 'callback' and interceptor callback
                    interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
    
                    result = accumulator.append(tp, timestamp, serializedKey,
                        serializedValue, headers, interceptCallback, remainingWaitMs, false, nowMs);
                }
    
                // 处理事务
                if (transactionManager != null && transactionManager.isTransactional())
                    transactionManager.maybeAddPartitionToTransaction(tp);
    
                // 如果发现批次以及满了或者创建了新的批次,则唤醒sender线程!!!!
                if (result.batchIsFull || result.newBatchCreated) {
                    log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
                    this.sender.wakeup();
                }
    
                // 返回该batch的future
                return result.future;
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    可以看到,如果我们这一批写满了或者新的批次被创建出来了(实际就是上一批写满了),就会去唤醒Sender线程,告诉它我们暂存的数据够多了,该把这些数据发送给Server了!

    最后,发送完毕后,返回的是我们上面方法创建的FutureRecordMetadata

    那么这个返回的future到底是什么,它实际就是我们使用消费者线程发送消息后返回的FutureRecordMetadata包含了消息发送的分区,分配的offset和消息的时间戳

    public Future<RecordMetadata> send(ProducerRecord<K,V> record,Callback callback)
    
    • 1

    来看看他内部包含的信息是如何写入的:
    首先,在使用 ProducerBatch 写信息前,会在构建函数中将该Batch对应的topicPartition放入Future中

    public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, long createdMs, boolean isSplitBatch) {
        ...
        this.produceFuture = new ProduceRequestResult(topicPartition);
        ...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    之后,在消息发送完毕后,也会将该消息对应的偏移量、logAppendTime, exception写入到该Future中。

    private void completeFutureAndFireCallbacks(long baseOffset, long logAppendTime, RuntimeException exception) {
            // Set the future before invoking the callbacks as we rely on its state for the `onCompletion` call
            produceFuture.set(baseOffset, logAppendTime, exception);
    }
    
    • 1
    • 2
    • 3
    • 4

    由此,就可以通过该Future获取我们所需要的信息

    三、Sender线程

    以上主线程执行完毕,消息被写入了topicpartition的一个又一个batch中。Sender线程就开始负责发送数据了

    Sender线程需要用到一个基础通信类NetworkClient,NetworkClient中有一个核心属性 Selectable selector,进行网络通讯时的 send和 poll 操作都是通过 selector实现的,org.apache.kafka.common.network.Selector 内部则通过 java.nio.channels.Selector 来实现,因此 Kafka内部本质是通过NIO进行网络通讯的

    在KafkaProducer中,和Sender线程相关的有两个属性:在这里插入图片描述
    他们在 KafkaProducer 构造函数中被创建:

    this.sender = newSender(logContext, kafkaClient, this.metadata);
    String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
    this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
    this.ioThread.start();
    ...
            return new Sender(logContext,
                    client,
                    metadata,
                    this.accumulator,
                    maxInflightRequests == 1,
                    producerConfig.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
                    acks,
                    producerConfig.getInt(ProducerConfig.RETRIES_CONFIG),
                    metricsRegistry.senderMetrics,
                    time,
                    requestTimeoutMs,
                    producerConfig.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),
                    this.transactionManager,
                    apiVersions);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    来看Sender的run方法,实际就是通过sendProducerData方法发送消息

    public void run() {
        log.debug("Starting Kafka producer I/O thread.");
    
        while (running) {
            try {
                    runOnce();
                } catch (Exception e) {
                    log.error("Uncaught error in kafka producer I/O thread: ", e);
                }
            }
        .....
    }
    
    void runOnce() {
        ... ...
        // 1. 发送请求,并确定下一步的阻塞超时时间
        long pollTimeout = sendProducerData(now);
        // 2. 处理端口事件,poll的timeout为上一步计算结果
        client.poll(pollTimeout, now);
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    来看sendProducerData方法:

    ........
            // create produce requests
            Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
            addToInflightBatches(batches);
            if (guaranteeMessageOrder) {
                // Mute all the partitions drained
                for (List<ProducerBatch> batchList : batches.values()) {
                    for (ProducerBatch batch : batchList)
                        this.accumulator.mutePartition(batch.topicPartition);
                }
            }
    
    ........       
            sendProduceRequests(batches, now);
            return pollTimeout;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    可以看到这里通过 accumulator.drain 将刚刚写入到batch中的信息重新取出

    取出后 sendProduceRequests 会将batch中的信息构建成 ClientRequest 通过 KafkaClient 的 send方法将消息发送至指定的partition,在发送完毕后会有一个 callback 来进行异步的处理

    由此一来,Kafka整个消息的发送逻辑就完成了

    参考文献:https://zhuanlan.zhihu.com/p/371361083
    https://blog.csdn.net/wanger61?spm=1000.2115.3001.5343

  • 相关阅读:
    Linux--系统烧写
    项目文章| PBJ(IF:13.8)发表稻曲病菌效应因子Uv1809增强组蛋白去乙酰化抑制水稻免疫的分子机制
    爽。。。一键导出 MySQL 表结构,告别手动梳理表结构文档了。。。
    生产消费者模型的介绍以及其的模拟实现
    Flutter 3.3 正式发布
    酷早报:6月30日Web3元宇宙业界重点消息大汇总
    Spring -Spring之依赖注入源码解析(下)
    IPv6改造方案:协议转换技术
    使用“讯飞星火”快速生成高质量PPT文档
    通过CMD快速安装VNC服务器,无ico
  • 原文地址:https://blog.csdn.net/wanger61/article/details/126037678