• Kafka ProducerRecord如何写入到RecordAccumulator



    Kafka ProducerRecord通过Partitioner组件得到分区号之后,就要将消息写入到RecordAccumulator中

    // 将消息追加到内存缓冲中去
    RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs);
    
    
    public RecordAppendResult append(TopicPartition tp,
                                     long timestamp,
                                     byte[] key,
                                     byte[] value,
                                     Callback callback,
                                     long maxTimeToBlock) throws InterruptedException {
    
        // 因为KafkaProducer是线程安全的,有可能在同一时刻会有多个线程调用该方法将消息写入Kafka
        // 这一步就是通过AtomicInteger,看看当前有多少个线程正常尝试将消息写入内存缓冲区
        appendsInProgress.incrementAndGet();
        try {
            // 根据TopicPartition从ConcurrentHashMap中取出对应的Deque队列,没有就new(第一次肯定new)
            // 有可能是多个线程并发调用该方法!!!
            Deque<RecordBatch> dq = getOrCreateDeque(tp);
            // 对刚刚得到的Deque加锁
            synchronized (dq) {
                if (closed)
                    throw new IllegalStateException("Cannot send after the producer is closed.");
                // 第1次尝试:尝试将消息放到Deque中
                // 第一次进来会失败,因为虽然有了Deque,但是Deque队列中并没有RecordBatch
                RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
                // appendResult不为null,说明已经将消息放到Deque中了,该方法到此结束
                if (appendResult != null)
                    // 返回消息追加操作的结果
                    return appendResult;
            }
            // 能走到这,说明是第一次进入,虽然有了Deque但是队列中还没有对应的Batch,
            // 所以将消息放到Deque的操作才没有结果。因此,下面才要申请ByteBuffer用来创建RecordBatch
    
            // 确定即将要创建的RecordBatch size:默认的batchSize 16k和消息的size之间取MAX
            int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
            log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
            // 让BufferPool给Batch分配一块内存空间,内存空间的来源来自于2部分:availableMemory和Deque
            // Deque内缓存了一块一块的、可复用的、固定大小的ByteBuffer,对于不足16k的消息可以直接复用
            // availableMemory是自由发挥的内存,可以针对“超大”消息定制ByteBuffer。如果availableMemory不够用,还可以“拆东墙补西墙”
            ByteBuffer buffer = free.allocate(size, maxTimeToBlock);
            // 再次锁住Deque(这回Batch不缺了,再次尝试写入)
            synchronized (dq) {
                if (closed)
                    throw new IllegalStateException("Cannot send after the producer is closed.");
    
                // 第2次尝试:这回有了Batch,再次尝试将消息放入Deque中
                RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
                // 如果第2次尝试写入的操作成功,就完成了
                // 这里利用了double-check思想
                if (appendResult != null) {
                    // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
                    // 因为并发,会申请到多余的ByteBuffer。只有一块ByteBuffer用来构建RecordBatch并add到Deque
                    // 其余多申请的ByteBuffer,会根据ByteBuffer大小,决定将其交给Deque or availableMemory
                    free.deallocate(buffer);
                    // 返回消息追加操作的结果
                    return appendResult;
                }
                // 如果是第一次写入消息,上面经历了double-check后,创建了空的Deque和ByteBuffer,
                // 现在要利用申请到的ByteBuffer,构建出RecordBatch,并将其添加到空的Deque
                // 先将“停留在内存中的消息”包装成MemoryRecords
                MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
                // 搞一个新的RecordBatch出来
                RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
                // 将包装好的MemoryRecords放到RecordBatch中
                FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, callback, time.milliseconds()));
    
                // 消息已经放到Batch中了,就把这个Batch添加到Deque
                dq.addLast(batch);
                // 将Batch也添加到IncompleteRecordBatches中,IncompleteRecordBatches表示当前还没有将Batch发送出去的列表
                incomplete.add(batch);
                // 返回消息追加操作的结果
                return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);
            }
        } finally {
            // 将当前正在执行append操作的线程数量,递减
            appendsInProgress.decrementAndGet();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78

    1.从CopyOnWriteMap中获取Deque队列

    多线程并发的调用以下方法从CopyOnWriteMap中获取Deque,如果get不到就new新的Deque出来,这里可能会并发的new出好几个Deque出来。

    /**
     * 根据TopicPartition从ConcurrentHashMap中取出对应的Deque队列
     * ConcurrentMap是基于CopyOnWriteMap实现的,适合读多写少的场景。update时会先copy出来一个副本,更新副本。
     * 好处:读写之间不会长时间的锁互斥,写的时候不会阻塞读。  坏处:copy副本会占用大量内存空间
     * Deque正是利用了读多写少的特点,因为一个Partition对应一个Deque,写操作本就是很少的。
     * 主要操作(大量的)还是从ConcurrentMap中将Deque读出来,后面频繁更新的就只是Deque了,跟ConcurrentMap没关系了
     */
    private Deque<RecordBatch> getOrCreateDeque(TopicPartition tp) {
        // 最核心的数据结构:ConcurrentMap>
        // 一个TopicPartition对应一个Deque,Deque中放的就是RecordBatch
        // 这里就是直接去volatile修饰的map中将Deque取出来
        Deque<RecordBatch> d = this.batches.get(tp);
        // 如果能拿到Deque,就直接返回
        if (d != null)
            return d;
        // 多个线程并发的创建出多个ArrayDeque,但是CopyOnWriteMap#putIfAbsent()方法是线程安全的,且没有key的时候才会put
        // 因此虽然这里new了很多Deque但是最终put到CopyOnWriteMap中的只有1个
        d = new ArrayDeque<>();
        // CopyOnWriteMap#putIfAbsent()方法被synchronized修饰,是线程安全的
        Deque<RecordBatch> previous = this.batches.putIfAbsent(tp, d);
        if (previous == null)
            return d;
        else
            return previous;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    各个Thread各自new了Deque后,就会将其put到CopyOnWriteMap中。由于CopyOnWriteMap的大部分方法都是由关键字synchronized修饰的线程安全的,因此同一时间只会有一个Thread争抢到锁,从而执行put操作。巧的是在put时还会判断是否containsKey,因此虽然有多个(因为并发而new出来的)Deque等着put到CopyOnWriteMap中,但是最终只会有1个Deque被put到CopyOnWriteMap中。

    // 用关键字volatile修饰的原因:如果有人put了新元素,put完毕后,立马就能被读到
    private volatile Map<K, V> map;
    
    
    @Override
    public V get(Object k) {
        // 直接从volatile修饰的map中读,如果有人put了新元素进来,volatile就能立马感知到
        return map.get(k);
    }
    
    
    /**
     * 上一步在ConcurrentMap中没找到Deque,于是new了一个新的ArrayDeque
     * 让TopicPartition作为Key,让新ArrayDeque作为Value,put到ConcurrentMap中
     * 注意:该方法被synchronized修饰,是线程安全的!!!
     */
    @Override
    public synchronized V putIfAbsent(K k, V v) {
        // 由于本方法是线程安全的,同一时间只会有一个线程争抢到锁、执行本方法。
        // 如果ConcurrentMap中没有才会put,而put也是被synchronized修饰的线程安全的方法。
        // 如果Thread-1先手抢到锁执行了put操作,随后Thread-1释放锁、Thread-2抢到锁,
        // 经过判断ConcurrentMap中已经有了对应的键值对,直接get就行
        if (!containsKey(k))
            return put(k, v);
        else
            return get(k);
    }
    
    
    @Override
    public synchronized V put(K k, V v) {
        // 将内部的(用volatile修饰的)map,copy出来一份副本
        Map<K, V> copy = new HashMap<K, V>(this.map);
        // 基于副本执行put操作
        V prev = copy.put(k, v);
        // 将副本内的数据,写回到(volatile修饰的)map中,保证写完之后立马就能读
        this.map = Collections.unmodifiableMap(copy);
        return prev;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39

    put操作是基于copy出来的副本进行的,这充分体现了COW思想。因为COW适合读多写少的场景,创建Deque并将其put到CopyOnWriteMap中就是写,获取Deque就是读。写的时候先copy一个副本,更新到副本中。好处:读写之间不会相互阻塞;坏处:内存占用大;

    最终,copy副本会被写入到由关键字volatile修饰的map变量中。volatile可以保证内存可见性,一旦有人基于copy副本机制更新了这个引用变量对应的实际的map对象的地址,它立马就会被别人看到。所以在get()的时候完全不需要加锁。即使在同一时刻多线程并发读,也没有锁的阻塞。

    2.尝试写入消息

    由于消息是第一次写入,因此到目前为止,刚刚创建的Deque< Record>队列还是空的。因此锁住Deque后的tryAppend操作一定会失败,也就是返回null

    private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, Deque<RecordBatch> deque) {
        // Deque中最后的RecordBatch,之所以取最后的RecordBatch是因为前面的RecordBatch都装满了,
        // 而最后的这个是新add的,还是新的
        RecordBatch last = deque.peekLast();
        if (last != null) {
            // 直接调用RecordBatch#tryAppend()方法,将消息写入到这个RecordBatch对应的ByteBuffer中
            FutureRecordMetadata future = last.tryAppend(timestamp, key, value, callback, time.milliseconds());
            // 如果某个RecordBatch的剩余可用内存空间,不足以将手里的这条消息写入,这个future一定为null
            if (future == null)
                // 将MemoryRecords关闭掉,这辆大巴车就得关门了
                last.records.close();
            else
                // 如果写入成功,返回结果信息
                return new RecordAppendResult(future, deque.size() > 1 || last.records.isFull(), false);
        }
        // 第一次写入消息,只有Deque,RecordBatch肯定没有。因此写入失败,返回null
        return null;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    外层append()方法在收到RecordAppendResult为null后,就得开始着手准备申请ByteBuffer以便能创建RecordBatch了。

    // 确定即将要创建的RecordBatch size:默认的batchSize 16k和消息的size之间取MAX
    int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
    log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
    // 让BufferPool给Batch分配一块内存空间,内存空间的来源来自于2部分:availableMemory和Deque
    // Deque内缓存了一块一块的、可复用的、固定大小的ByteBuffer,对于不足16k的消息可以直接复用
    // availableMemory是自由发挥的内存,可以针对“超大”消息定制ByteBuffer。如果availableMemory不够用,还可以“拆东墙补西墙”
    ByteBuffer buffer = free.allocate(size, maxTimeToBlock);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    先计算出需要用到的RecordBatch的size,然后再去申请ByteBuffer。

    2.1基于NIO ByteBuffer分配内存

    BufferPool中负责管理ByteBuffer内存分配工作的,是由两部分组成:Deque< ByteBuffer >和availableMemory 。

    /**
     * 为即将要被创建出来的RecordBatch分配内存空间
     */
    public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {
        if (size > this.totalMemory)
            throw new IllegalArgumentException("Attempt to allocate " + size
                                               + " bytes, but there is a hard limit of "
                                               + this.totalMemory
                                               + " on memory allocations.");
    
        // 使用ReentrantLock锁住以下代码块,选它是因为它更灵活的加锁、释放锁
        this.lock.lock();
        try {
            // 如果我需要的内存大小正好等于BufferPool内“中规中矩”的RecordBatch的大小(poolableSize也就是默认的batch size,16k),并且BufferPool内的Deque不为空
            // 这一步是为了复用BufferPool内的内存块,第一次进来肯定全是空的,没有可复用的ByteBuffer
            if (size == poolableSize && !this.free.isEmpty())
                // 将Deque队列中的first取出来复用即可
                return this.free.pollFirst();
    
            // 能往下走,说明Deque提供的的可复用的内存满足不了需求,因此必须找availableMemory申请
    
            // 计算出Deque的大小 = Deque内ByteBuffer的个数 * “中规中矩”的16k
            int freeListSize = this.free.size() * this.poolableSize;
            // 如果创建这个RecordBatch所需的内存 < “自由创建”的内存 + “中规中矩”复用的内存
            if (this.availableMemory + freeListSize >= size) {
                // “拆东墙补西墙”,Deque就是东墙,availableMemory就是西墙
                // 这里是说availableMemory不够用,就从Deque中一块一块的拆,直到availableMemory满足使用需求
                freeUp(size);
                // 从“剩余可用的内存”中扣除本次创建RecordBatch要用到的内存大小
                this.availableMemory -= size;
                // 解除锁
                lock.unlock();
                // 本次内存申请已成功,内存来源于availableMemory
                return ByteBuffer.allocate(size);
            } else {
                // 可用内存严重不足,不能再允许申请ByteBuffer了(需要16k,但此时就剩15.9k可用了)
                // 其他RecordBatch因为发送成功而空闲出来的ByteBuffer的大小
                int accumulated = 0;
                ByteBuffer buffer = null;
                // 获取到正在使用的ReentrantLock的Condition
                Condition moreMemory = this.lock.newCondition();
                // 剩余(因等待可用内存而)阻塞的时间
                long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
                // 将这个Condition添加到Deque中,排队等待“可用内存恢复可用容量”
                this.waiters.addLast(moreMemory);
                // 等待RecordBatch因为发送成功而空闲出来的ByteBuffer恢复可用内存空间
                while (accumulated < size) {
                    long startWaitNs = time.nanoseconds();
                    long timeNs;
                    boolean waitingTimeElapsed;
                    try {
                        // 调用Condition.await()方法,让当前线程进入休眠等待状态
                        waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);
                    } catch (InterruptedException e) {
                        // 出现异常,就将本次等待的Condition从等待队列中移除
                        this.waiters.remove(moreMemory);
                        throw e;
                    } finally {
                        long endWaitNs = time.nanoseconds();
                        timeNs = Math.max(0L, endWaitNs - startWaitNs);
                        this.waitTime.record(timeNs, time.milliseconds());
                    }
    
                    // 如果某个Condition在“有限的规定时间”内没等来可用内存,那就只能放弃了
                    if (waitingTimeElapsed) {
                        // 将这个Condition从Deque等待队列中移除
                        this.waiters.remove(moreMemory);
                        // 抛出超时异常
                        throw new TimeoutException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms.");
                    }
    
                    // 刷新“剩余阻塞等待时间”
                    remainingTimeToBlockNs -= timeNs;
                    if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {
                        // 把Deque中的第一个ByteBuffer弹出来
                        buffer = this.free.pollFirst();
                        // 可用内存容量“恢复”了size大小
                        accumulated = size;
                    } else {
                        freeUp(size - accumulated);
                        int got = (int) Math.min(size - accumulated, this.availableMemory);
                        this.availableMemory -= got;
                        accumulated += got;
                    }
                }
    
                // 一旦RecordBatch因为发送成功而空闲出来的ByteBuffer,大于要申请的内存大小,
                // 比如等了一会ByteBuffer空闲了32k出来,此时我只需要申请16k,就会跳出while循环
    
                // 将Deque中的第一个Condition移除掉,唤醒第一个等待的人,开始为他分配ByteBuffer
                Condition removed = this.waiters.removeFirst();
                if (removed != moreMemory)
                    throw new IllegalStateException("Wrong condition: this shouldn't happen.");
    
                if (this.availableMemory > 0 || !this.free.isEmpty()) {
                    if (!this.waiters.isEmpty())
                        // 唤醒因调用Condition.await()方法而进入休眠等待状态的线程
                        this.waiters.peekFirst().signal();
                }
    
                // 释放锁
                lock.unlock();
                // 直接返回ByteBuffer
                if (buffer == null)
                    // 分配ByteBuffer
                    return ByteBuffer.allocate(size);
                else
                    return buffer;
            }
        } finally {
            if (lock.isHeldByCurrentThread())
                lock.unlock();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113

    **Deque< ByteBuffer >可以理解为“中规中矩”,它缓存了一堆大小固定(等于batch size的默认 16k)、可以复用的ByteBuffer **。如果接下来需要创建的这个RecordBatch要求的内存大小不超过默认的16k,那么完全可以复用Deque< ByteBuffer >中的First ByteBuffer。

    availableMemory 可以理解为“自由发挥”,当Deque< ByteBuffer >中没有可复用的ByteBuffer,就得需要让availableMemory 来“自由、灵活的”分配内存。availableMemory的初始值在RecordAccumulator构造时就已经指定,默认大小为:32M

    当然,第一次进来,这个Deque< ByteBuffer >是空的。那就得找availableMemory(初始值为默认的32M)要内存了!

    availableMemory负责分配内存时,也得结合Deque。因为availableMemory毕竟空间有限,万一不够用呢?所以,但凡不是简单的内存复用就能解决的,都得两者结合。

    一旦availableMemory自己手里的内存也满足不了使用需求,那就得找Deque帮忙,“拆东墙补西墙”,(while循环中)将Deque中复用的ByteBuffer,从后往前一块一块的补充给availableMemory,直到availableMemory的内存空间恢复到满足使用需求为止!

    /**
     * 将Deque中的内存块,从后往前一块一块的补充给availableMemory,
     * 这样一来这条“特殊大小”的消息就能利用availableMemory创建RecordBatch了。
     */
    private void freeUp(int size) {
        // 如果“剩余可以用来自由发挥”的内存不足以创建这个RecordBatch,
        // 同时"缓存好用来复用的中规中矩"的Deque还有足够的内存空间(n * 16k),
        // 那就从Deque中,从最后一个ByteBuffer开始,“拆东墙补西墙”的方式一块一块的将ByteBuffer补充给“剩余可以用来自由发挥”的内存
        while (!this.free.isEmpty() && this.availableMemory < size)
            this.availableMemory += this.free.pollLast().capacity();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    当然了,ByteBuffer的分配方法不是线程安全的,多线程并发执行时,会“额外”的申请多余而不用的ByteBuffer。例如3个Thread同一时间各自分别申请了一个16k大小的ByteBuffer。没关系,在外层消息写入经历double-check的tryAppend时,会通过关键字synchronized锁住Deque。同一时间只会有一个Thread,允许它将自己申请到的ByteBuffer用于构建RecordBatch并将其添加到Deque< RecordBatch >中。

    其他线程虽然手里也攥着各自申请到的ByteBuffer,但是在tryAppend时,由于Deque< RecordBatch >中的最后一个RecordBatch就是新创建的。因此这些“多余”的ByteBuffer就会被释放掉。根据ByteBuffer的大小,决定将其交给Deque or availableMemory。

    这一套处理逻辑,正式基于double-check模式保证的!

    2.2尝试写入消息

    经历了double-check以后,Deque< RecordBatch >有了,用来构建ByteBuffer也有了,这回可以往RecordBatch中写入消息了。

    // 如果是第一次写入消息,上面经历了double-check后,创建了空的Deque和ByteBuffer,
    // 现在要利用申请到的ByteBuffer,构建出RecordBatch,并将其添加到空的Deque
    // 先将“停留在内存中的消息”包装成MemoryRecords
    MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
    // 搞一个新的RecordBatch出来
    RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
    // 将包装好的MemoryRecords放到RecordBatch中
    FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, callback, time.milliseconds()));
    
    // 消息已经放到Batch中了,就把这个Batch添加到Deque
    dq.addLast(batch);
    // 将Batch也添加到IncompleteRecordBatches中,IncompleteRecordBatches表示当前还没有将Batch发送出去的列表
    incomplete.add(batch);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    如果是第一次写入的消息,首先会将消息包装成MemoryRecords,用MemoryRecords创建RecordBatch。用RecordBatch提供的tryAppend()方法将消息“放到”RecordBatch中。

    public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, long now) {
        // 如果这个RecordBatch已经没有足够的空间来容纳这个MemoryRecords了,那就返回null。
        // 外层判空方法收到null,会抛出NullPointerException
        if (!this.records.hasRoomFor(key, value)) {
            return null;
        } else {
            // 调用MemoryRecords#append()方法通过Compressor将消息放到RecordBatch中,得到crc值
            long checksum = this.records.append(offsetCounter++, timestamp, key, value);
            // 当前RecordBatch中最大的那条消息的size
            this.maxRecordSize = Math.max(this.maxRecordSize, Record.recordSize(key, value));
            // 最后一次写入时间设为:now
            this.lastAppendTime = now;
            FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
                                                                   timestamp, checksum,
                                                                   key == null ? -1 : key.length,
                                                                   value == null ? -1 : value.length);
            if (callback != null)
                thunks.add(new Thunk(callback, future));
            this.recordCount++;
            return future;
        }
    }
    
    
    public boolean hasRoomFor(byte[] key, byte[] value) {
        if (!this.writable)
            return false;
    
        // Compressor组件已经写了多少条消息了
        return this.compressor.numRecordsWritten() == 0 ?
            this.initialCapacity >= Records.LOG_OVERHEAD + Record.recordSize(key, value) :
        // ByteBuffer的大小 >= Compressor组件已经写入进去的消息字节数量(估算值) + 消息的大小(消息头 + 消息本身大小)
        // 假设ByteBuffer 16k,Compressor组件已经写了15.9k,目前还有一条消息 1k,此时一定为false,
        // 说明当前RecordBatch的剩余可用内存空间已经不足以写入这一条消息了,就得申请新的ByteBuffer构建新的RecordBatch了
        this.writeLimit >= this.compressor.estimatedBytesWritten() + Records.LOG_OVERHEAD + Record.recordSize(key, value);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36

    3.底层写入

    不管是第一次写入消息,还是第N次写入消息,还是在double-check的哪个阶段写入消息,最终都得通过RecordBatch#tryAppend()方法将消息按照Kafka约定的二进制协议,通过Compressor组件将消息以流的形式写入到RecordBatch中。

    如果是第一次写入消息,经过double-check后,会先后获取到Deque< RecordBatch >和ByteBuffer,因为double-check中的两次tryAppend是RecordAccumulator执行的操作,它首先会将Deque< RecordBatch >中的最后的那个RecordBatch拿出来,通过RecordBatch#tryAppend()方法将消息“放到”RecordBatch中。

    但是对于第一次写入,只是申请到了ByteBuffer,并没有构建相应的RecordBatch,因此double-check中的2次尝试写入都会失败。这样一来,就会将“停留在内存的消息”包装成MemoryRecords,并利用MemoryRecords实例化RecordBatch对象。最后通过RecordBatch#tryAppend()方法完成消息写入的任务。对了,这个RecordBatch最后还得添加到Deque< RecordBatch >中,因为后续的消息得往里写。

    如果不是第1次写入,那就再double-check模式中RecordAccumulator提供的2次尝试写入操作中,取Deque的最后一个RecordBatch,将消息按照二进制协议写入。

    3.1 如果RecordBatch剩余可用内存空间充足,可以写入

    不管是经历了九九八十一难中的哪一难,只要消息顺利抵达RecordBatch,就得先判断一下当前RecordBatch剩余的可用内存是否还能将眼前的这条消息写入其中。

    /**
     * 不管是第一次写入,还是第N次写入;不管是经历double-check的哪个tryAppend,
     * 最终都得调用RecordBatch#tryAppend()方法,将消息“放到”RecordBatch中。
     * 写入之前要经过判断,看这个RecordBatch是否还能足够的剩余空间写下这一条消息。如果有,就通过Compressor组件写入;否则,就return null
     */
    public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, long now) {
        // 如果这个RecordBatch已经没有足够的空间来容纳这个MemoryRecords了,那就返回null。
        // 外层判空方法收到null,会抛出NullPointerException
        if (!this.records.hasRoomFor(key, value)) {
            return null;
        } else {
            // 核心写入操作:调用MemoryRecords#append()方法通过Compressor将消息放到RecordBatch中,得到crc值
            long checksum = this.records.append(offsetCounter++, timestamp, key, value);
            // 当前RecordBatch中最大的那条消息的size
            this.maxRecordSize = Math.max(this.maxRecordSize, Record.recordSize(key, value));
            // 最后一次写入时间设为:now
            this.lastAppendTime = now;
            FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
                                                                   timestamp, checksum,
                                                                   key == null ? -1 : key.length,
                                                                   value == null ? -1 : value.length);
            if (callback != null)
                thunks.add(new Thunk(callback, future));
            this.recordCount++;
            return future;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    如果是第一次写入的话,那么这个RecordBatch是刚刚才添加到Deque< RecordBatch >中的,RecordBatch的剩余可用空间绝对能写下这条消息,那就让MemoryRecords将消息按照约定好的二进制协议进行转换,完事通过Compressor组件将消息写入到RecordBatch中

    /**
     * MemoryRecords先将消息按照Kafka约定好的二进制协议,将消息进行转换。
     * 最后,通过Compressor组件将消息写入到RecordBatch中
     */
    public long append(long offset, long timestamp, byte[] key, byte[] value) {
        if (!writable)
            throw new IllegalStateException("Memory records is not writable");
    
        // 计算出这个MemoryRecords的大小
        int size = Record.recordSize(key, value);
        // 设置消息对应的offset(利用Java的DataOutputStream实现)
        compressor.putLong(offset);
        // 设置消息大小
        compressor.putInt(size);
        // 计算出crc的值
        long crc = compressor.putRecord(timestamp, key, value);
        // 通过Compressor完成写入操作
        compressor.recordWritten(size + Records.LOG_OVERHEAD);
        return crc;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    通过Compressor组件的初始化可以看出,Compressor.putxxx方法实际上是借助Java BIO实现的。这个appendStream就是Java里的DataOutputStream

    public Compressor(ByteBuffer buffer, CompressionType type) {
        
       // 省略部分代码...
    
        // 由ByteBufferOutputStream包裹着ByteBuffer,也就是持有了一个对ByteBuffer的输出流
        bufferStream = new ByteBufferOutputStream(buffer);
        // (视压缩情况,决定如何包裹)再让DataOutputStream包裹着ByteBufferOutputStream
        appendStream = wrapForOutput(bufferStream, type, COMPRESSION_DEFAULT_BUFFER_SIZE);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    数据由Java的DataOutputStream接入,当执行DataOutputStream#writeLong()等方法时会将数据转换成二进制的字节数组。(如果开启了压缩,字节数组先进压缩流的缓冲区,按照相应的压缩算法压缩后)然后再将其写入到ByteBufferOutputStream中

    3.2 如果RecordBatch剩余可用内存空间不足

    如果RecordBatch判断当前剩余可用的内存空间不足以写下这一条消息,比如经过MemoryRecords的判断,发现当前ByteBuffer的大小为16k,估算了一下已经Compressor组件已经写了15.9k了,现在还有一条1K大小的消息待写入,写不进了。那么就会返回null

    public boolean hasRoomFor(byte[] key, byte[] value) {
        if (!this.writable)
            return false;
    
        // Compressor组件已经写了多少条消息了
        return this.compressor.numRecordsWritten() == 0 ?
            this.initialCapacity >= Records.LOG_OVERHEAD + Record.recordSize(key, value) :
        // ByteBuffer的大小 >= Compressor组件已经写入进去的消息字节数量(估算值) + 消息的大小(消息头 + 消息本身大小)
        // 假设ByteBuffer 16k,Compressor组件已经写了15.9k,目前还有一条消息 1k,此时一定为false,
        // 说明当前RecordBatch的剩余可用内存空间已经不足以写入这一条消息了,就得申请新的ByteBuffer构建新的RecordBatch了
        this.writeLimit >= this.compressor.estimatedBytesWritten() + Records.LOG_OVERHEAD + Record.recordSize(key, value);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    RecordAccumulator收到null后,就得调用这个已经濒临写满状态的RecordBatch对应的MemoryRecords#close()方法,将当前RecordBatch对应的ByteBuffer的Compressor组件关掉,将针对ByteBuffer的I/O流也关掉,将当前的MemoryRecords里的“写标志位”设为false:不能再写了。然后第1次尝试的结果返就是null

    // 直接调用RecordBatch#tryAppend()方法,将消息写入到这个RecordBatch对应的ByteBuffer中
    FutureRecordMetadata future = last.tryAppend(timestamp, key, value, callback, time.milliseconds());
    // 如果某个RecordBatch的剩余可用内存空间,不足以将手里的这条消息写入,这个future一定为null
    if (future == null)
        // 剩余内存空间不够写这条消息了,就将MemoryRecords关闭掉,这辆大巴车就得关门了
        last.records.close();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    由于第1次尝试的写入结果得到的就是个null,所以继续往下走,申请新的ByteBuffer,使用synchronized锁住Deque后再一次的尝试写入,重复double-check的写入流程…

  • 相关阅读:
    export default 导出的对象,不能解构问题,和module.exports的区别
    转行软件测试我后悔了
    Android开发知识学习——从Retrofit原理来看HTTP
    交换机和路由器技术-17-生成树协议配置
    终于有人把Java面试高分Guide总结得如此系统,堪称傻瓜式笔记总结
    02 python基本数据结构
    写给 MMSegmentation 工具箱新手的避坑指南
    通讯网关软件029——利用CommGate X2MQTT实现MQTT访问DDE数据源
    如何用Python优雅的合并两个Dict
    Redis 过期删除策略和内存淘汰策略
  • 原文地址:https://blog.csdn.net/qq_36299025/article/details/128096274