• Kafka RecordAccumulator 一


    Kafka RecordAccumulator 一

    当需要发送消息时,需要封装成批次统一来发,而不是一条一条的发消息

                RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs);
    
    • 1

    RecordAccumulator 方法大概在做这些事情

    在这里插入图片描述

    这里我们看些整个方法的操作步骤流程:

    (1)获取该topicPartition对应的分区,也就是要把数据放到这个对列的批次里面。如果对列已经存在那么就使用该对列,否则创建一个新队列(对应该topicPartition)

    Deque<RecordBatch> dq = getOrCreateDeque(tp);
    
    • 1

    (2) 尝试将数据写入到对列里面的批次中,但是一开始添加数据是失败的,因为当前只是创建了对列,但是数据需要存储在批次对象里面(该批次对象是需要分配内存的),目前尚未分配内存,因此第一次运行到这里是失败的

    RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
    
    • 1

    (3) 计算一个批次的大小

    在消息的大小和批次的大小之间取一个最大值,用这个值作为当前这个批次的大小。

    有可能我们的一个消息的大小比一个设定好的批次的大小还要大。 默认一个批次的大小是16K(可能业务的一条消息几十M都是有可能的)。

    因此我们需要注意:

    如果我们生产者发送数的时候,如果我们的消息的大小都是超过16K, 说明其实就是一条消息就是一个批次,那也就是说消息是一条一条被发送出去的。 那如果是这样的话,批次这个概念的设计就没有意义了 ,而且吞吐也上不去。所以大家一定要根据自定公司的数据大小的情况去设置批次的大小。

                int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
    
    • 1

    (4) 根据批次去申请内存,Kafka自己做了内存池,可以更好地申请释放内存

                ByteBuffer buffer = free.allocate(size, maxTimeToBlock);
    
    • 1

    (5) 再次尝试写入数据

    代码第一次执行到这儿的时候 依然还是失败的(appendResult==null)

    目前虽然已经分配了内存 ,但是还没有创建批次,那我们向往批次里面写数据 ,还是不能写的。

                    RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
    
    • 1

    (6) 根据内存大小封装批次

     MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
                    RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
                    //尝试往这个批次里面写数据,到这个时候 我们的代码会执行成功。
    
                    //线程一,就往批次里面写数据,这个时候就写成功了。
                    FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, callback, time.milliseconds()));
                  
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    (7) 把批次数据放入到队尾

     dq.addLast(batch);
                    incomplete.add(batch);
    
    • 1
    • 2
  • 相关阅读:
    一个非常不错的开源Docker管理工具-DockerUI
    HelpLook AI 升级!一键生成SEO设置关键要素
    板凳-------unix 网络编程 卷1-1简介
    【项目原理】ESP12F作无线网卡
    【算法基础】TOPSIS法
    [2022-12-06]神经网络与深度学习hw11 - 各种优化算法比较
    django运行问题
    js如何定义二位数组然后转josn数据,ajax上传给php,php通过json_decode解析
    使用JMeter创建FTP测试计划
    动手写prometheus的exporter-01-Gauge(仪表盘)
  • 原文地址:https://blog.csdn.net/zhangkai1992/article/details/127723203