01 How Kafka Stores Records


    Preface

    Recently I have been doing some Kafka-related environment setup, and a few problems came up while connecting clients to the server.

    That created a need to understand parts of the Kafka code,

    which in turn produced a series of analyses and notes on individual topics.

    How Kafka stores its records is one of them.

    The server here is based on Kafka 2.4.1, the client on 2.2.0.

    Test case

    Let's look at how this ProducerRecord gets stored.

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    /**
     * Test06KafkaProducer
     *
     * @author Jerry.X.He <970655147@qq.com>
     * @version 1.0
     * @date 2022-05-28 10:14
     */
    public class Test06KafkaProducer {

        public static void main(String[] args) {
            Properties properties = new Properties();
            properties.put("bootstrap.servers", "master:9092");
            properties.put("acks", "all");
            properties.put("retries", 0);
            properties.put("batch.size", 16384);
            properties.put("linger.ms", 1);
            properties.put("buffer.memory", 33554432);
            properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            String topic = "test20220528";
            KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
            for (int i = 1; i <= 1; i++) {
                kafkaProducer.send(new ProducerRecord<>(topic, "message" + i));
                System.out.println("message" + i);
            }
            kafkaProducer.close();
        }
    }

    Client-side processing

    When send is called, the record body is written into a ByteBuffer.

    In this scenario the body is written at offsets 61 - 75 of the ByteBuffer.

    # For this scenario, the actual body layout is as follows
    bodyLen = 14 : 1 byte
    attribute = 0 : 1 byte
    timestampDelta = 0 : 1 byte
    offsetDelta = 0 : 1 byte
    key = -1 : 1 byte
    valueLen = 8 : 1 byte
    value = message1 : 8 byte
    headerLen = 0 : 1 byte
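
    These single-byte fields are varints in zigzag encoding, which is why a value such as bodyLen = 14 fits in one byte, and why the reading code later in this post shifts each byte right by 1. Below is a minimal sketch of the encoding; ZigZagSketch is just an illustrative name, not a Kafka class.

    public class ZigZagSketch {
        // zigzag interleaves negative and positive values so small magnitudes stay small
        static int zigZagEncode(int v) { return (v << 1) ^ (v >> 31); }
        static int zigZagDecode(int v) { return (v >>> 1) ^ -(v & 1); }

        public static void main(String[] args) {
            System.out.println(zigZagEncode(14)); // 28, fits in a single varint byte (bodyLen above)
            System.out.println(zigZagEncode(-1)); // 1, the raw byte behind the key = -1 (null key) marker
            System.out.println(zigZagDecode(28)); // 14
        }
    }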

    After the body is serialized into a ProducerBatch, the batch is appended to the accumulator's batches, and the actual sending is handed off to the sender thread.

    The sender thread pulls the messages to be sent from the queue, writes the header information, and sends them.

    # For this scenario, the actual header layout is as follows
    baseOffset = 0 : 8 byte
    lengthOffset = 64 : 4 byte
    partitionLeaderEpoch = -1 : 4 byte
    magic = 2 : 1 byte
    crc = 558737589 : 4 byte
    attributes = 0 : 2 byte
    lastOffsetDelta = 0 : 4 byte
    firstTimestamp = 1654310895383 : 8 byte
    maxTimestamp = 1654310895383 : 8 byte
    producerId = -1 : 8 byte
    epoch = -1 : 2 byte
    sequence = -1 : 4 byte
    numRecords = 1 : 4 byte
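
    As a sanity check, the field sizes above add up: 8 + 4 + 4 + 1 + 4 + 2 + 4 + 8 + 8 + 8 + 2 + 4 + 4 = 61 bytes of batch header, and the body from the previous section adds 1 + 1 + 1 + 1 + 1 + 1 + 8 + 1 = 15 bytes at offsets 61 - 75, for a 76-byte batch in total. Both figures show up again when reading the log file below.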

    When MemoryRecordsBuilder.close is called, the buffer is turned into MemoryRecords.

    Once the data to send for the current batch has been taken off the queue, a ProduceRequest is constructed to talk to the server.

    Server-side processing

    When the server receives the Produce request, it goes through handleProduceRequest.

    partitionRecords corresponds to the data sent by the client, grouped by TopicPartition.

    It then tries to write the data to the log; the entriesPerPartition here contains data for multiple TopicPartitions.

    Drilling down, each TopicPartition corresponds to one Log.

    The records for that TopicPartition are then written into the log.

    One Log corresponds to multiple Segments, split by size according to log.segment.bytes.

    At certain byte intervals (controlled by log.index.interval.bytes), baseOffset -> physicalOffset and baseOffset -> timestamp entries are sampled and recorded.

    So besides the log file that holds the data, there are also an index file and a timeindex file, used to speed up lookups.

    -rw-r--r-- 1 jerry wheel 8 Jun 5 15:49 00000000000000000000.index
    -rw-r--r-- 1 jerry wheel 228 Jun 5 15:49 00000000000000000000.log
    -rw-r--r-- 1 jerry wheel 12 Jun 5 15:49 00000000000000000000.timeindex
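
    These sizes are consistent with the formats described below: the 228-byte .log holds three 76-byte batches, the 8-byte .index holds a single entry of a 4-byte relative offset plus a 4-byte position, and the 12-byte .timeindex holds a single entry of an 8-byte timestamp plus a 4-byte offset.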

    One more detail in the server-side processing is how the record's baseOffset gets set: here the record batch starts at offset 58 of its ByteBuffer, so the 8-byte baseOffset field occupies bytes 58 - 66; in the case shown, baseOffset is set to 14.

    The record delegates to the ByteBuffer to update baseOffset.
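
    A minimal sketch of what that delegation amounts to; the class and method names here are illustrative, not Kafka's actual code. Since baseOffset sits at a fixed position in the buffer, assigning the real log offset is a single absolute putLong:

    import java.nio.ByteBuffer;

    public class BaseOffsetSketch {
        // batchStart marks where the record batch begins in this buffer (58 in the case above)
        static void setBaseOffset(ByteBuffer buffer, int batchStart, long baseOffset) {
            buffer.putLong(batchStart, baseOffset); // absolute write, the buffer's position is unchanged
        }
    }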

    Kafka's three kinds of log files

    There are three kinds of files: the log file, the index file, and the timeindex file.

    The log file holds the actual data, containing the headerInfo and bodyInfo shown above.

    The index file is a sampled subset of baseOffset -> physicalOffset mappings, where physicalOffset is the byte position in the log file at which the batch for that baseOffset is stored.

    The timeindex file is a sampled subset of baseOffset -> maxTimestamp mappings (each entry physically stores an 8-byte timestamp followed by a 4-byte relative offset, as the reading code below shows).
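
    Because both index files are sampled rather than dense, a lookup only narrows the search down to a starting point in the log file. Below is a minimal sketch of the idea, not Kafka's actual implementation: binary-search the largest sampled offset that is <= the target, then scan the log forward from the recorded position.

    public class IndexLookupSketch {
        // sampledOffsets is sorted ascending; positions[i] is the byte position in the
        // .log file of the batch whose baseOffset is sampledOffsets[i]
        static long scanStartFor(long targetOffset, long[] sampledOffsets, long[] positions) {
            int lo = 0, hi = sampledOffsets.length - 1, best = -1;
            while (lo <= hi) {
                int mid = (lo + hi) >>> 1;
                if (sampledOffsets[mid] <= targetOffset) { best = mid; lo = mid + 1; }
                else { hi = mid - 1; }
            }
            // if the target precedes the first sampled entry, scan from the start of the segment
            return best < 0 ? 0L : positions[best];
        }
    }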

    The log file

    import java.io.DataInputStream;
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.InputStream;
    import java.util.ArrayList;
    import java.util.Date;
    import java.util.List;

    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.serializer.SerializerFeature;
    import lombok.Data;

    /**
     * Test09ReadKafkaLog
     *
     * @author Jerry.X.He <970655147@qq.com>
     * @version 1.0
     * @date 2022-06-04 11:02
     */
    public class Test09ReadKafkaLog {

        public static void main(String[] args) throws Exception {
            String path = "/tmp/kafka-logs/test20220528-0/00000000000000000000.log";
            // String path = "/tmp/kafka-logs/test20220528-0/00000000000000000003.log";
            File file = new File(path);
            // each batch here is 76 bytes: a 61-byte header plus a 15-byte single-record body
            int messageCount = (int) (file.length() / 76);
            InputStream is = new FileInputStream(path);
            DataInputStream dis = new DataInputStream(is);
            long availableBefore = is.available();
            List<RecordInfo> list = new ArrayList<>();
            for (int i = 0; i < messageCount; i++) {
                // header
                long baseOffset = dis.readLong();
                int lengthOffset = dis.readInt();
                int partitionLeaderEpoch = dis.readInt();
                int magic = dis.readByte();
                int crc = dis.readInt();
                int attributeCount = dis.readShort();
                int lastOffsetDelta = dis.readInt();
                Date firstTimestamp = new Date(dis.readLong());
                Date maxTimestamp = new Date(dis.readLong());
                long producerId = dis.readLong();
                int epoch = dis.readShort();
                int sequence = dis.readInt();
                int numRecords = dis.readInt();
                // body, the single-byte varint fields are zigzag encoded, hence the >> 1
                int bodyLen = dis.readByte() >> 1;
                int attributeCountInBody = dis.readByte();
                int timestampDelta = dis.readByte() >> 1;
                int offsetDelta = dis.readByte() >> 1;
                int key = dis.readByte();
                int valueLen = dis.readByte() >> 1;
                byte[] valueBytes = new byte[valueLen];
                dis.readFully(valueBytes);
                String value = new String(valueBytes);
                int headerLen = dis.readByte() >> 1;
                HeaderInfo headerInfo = new HeaderInfo(baseOffset, lengthOffset, partitionLeaderEpoch, magic, crc,
                        attributeCount, lastOffsetDelta, firstTimestamp, maxTimestamp, producerId, epoch, sequence, numRecords);
                BodyInfo bodyInfo = new BodyInfo(bodyLen, attributeCountInBody, timestampDelta, offsetDelta, key, valueLen, valueBytes, value, headerLen);
                list.add(new RecordInfo(headerInfo, bodyInfo));
            }
            long availableAfter = is.available();
            System.out.println(String.format(" availBefore : %s, availAfter : %s ", availableBefore, availableAfter));
            for (RecordInfo recordInfo : list) {
                System.out.println(JSON.toJSONString(recordInfo, SerializerFeature.WriteDateUseDateFormat));
            }
        }

        @Data
        private static class RecordInfo {
            public HeaderInfo headerInfo;
            public BodyInfo bodyInfo;
            public RecordInfo(HeaderInfo headerInfo, BodyInfo bodyInfo) {
                this.headerInfo = headerInfo;
                this.bodyInfo = bodyInfo;
            }
        }

        @Data
        private static class HeaderInfo {
            public long baseOffset;
            public int lengthOffset;
            public int partitionLeaderEpoch;
            public int magic;
            public int crc;
            public int attributeCount;
            public int lastOffsetDelta;
            public Date firstTimestamp;
            public Date maxTimestamp;
            public long producerId;
            public int epoch;
            public int sequence;
            public int numRecords;
            public HeaderInfo(long baseOffset, int lengthOffset, int partitionLeaderEpoch, int magic, int crc,
                              int attributeCount, int lastOffsetDelta, Date firstTimestamp, Date maxTimestamp,
                              long producerId, int epoch, int sequence, int numRecords) {
                this.baseOffset = baseOffset;
                this.lengthOffset = lengthOffset;
                this.partitionLeaderEpoch = partitionLeaderEpoch;
                this.magic = magic;
                this.crc = crc;
                this.attributeCount = attributeCount;
                this.lastOffsetDelta = lastOffsetDelta;
                this.firstTimestamp = firstTimestamp;
                this.maxTimestamp = maxTimestamp;
                this.producerId = producerId;
                this.epoch = epoch;
                this.sequence = sequence;
                this.numRecords = numRecords;
            }
        }

        @Data
        private static class BodyInfo {
            public int bodyLen;
            public int attributeCount;
            public int timestampDelta;
            public int offsetDelta;
            public int key;
            public int valueLen;
            public byte[] valueBytes;
            public String value;
            public int headerLen;
            public BodyInfo(int bodyLen, int attributeCount, int timestampDelta, int offsetDelta, int key,
                            int valueLen, byte[] valueBytes, String value, int headerLen) {
                this.bodyLen = bodyLen;
                this.attributeCount = attributeCount;
                this.timestampDelta = timestampDelta;
                this.offsetDelta = offsetDelta;
                this.key = key;
                this.valueLen = valueLen;
                this.valueBytes = valueBytes;
                this.value = value;
                this.headerLen = headerLen;
            }
        }
    }

    The output is as follows

    availBefore : 228, availAfter : 0
    {"bodyInfo":{"attributeCount":0,"bodyLen":14,"headerLen":0,"key":1,"offsetDelta":0,"timestampDelta":0,"value":"message1","valueBytes":"bWVzc2FnZTE=","valueLen":8},"headerInfo":{"attributeCount":0,"baseOffset":0,"crc":458006489,"epoch":-1,"firstTimestamp":"2022-06-05 15:47:32","lastOffsetDelta":0,"lengthOffset":64,"magic":2,"maxTimestamp":"2022-06-05 15:47:32","numRecords":1,"partitionLeaderEpoch":0,"producerId":-1,"sequence":-1}}
    {"bodyInfo":{"attributeCount":0,"bodyLen":14,"headerLen":0,"key":1,"offsetDelta":0,"timestampDelta":0,"value":"message1","valueBytes":"bWVzc2FnZTE=","valueLen":8},"headerInfo":{"attributeCount":0,"baseOffset":1,"crc":-368922856,"epoch":-1,"firstTimestamp":"2022-06-05 15:49:05","lastOffsetDelta":0,"lengthOffset":64,"magic":2,"maxTimestamp":"2022-06-05 15:49:05","numRecords":1,"partitionLeaderEpoch":0,"producerId":-1,"sequence":-1}}
    {"bodyInfo":{"attributeCount":0,"bodyLen":14,"headerLen":0,"key":1,"offsetDelta":0,"timestampDelta":0,"value":"message1","valueBytes":"bWVzc2FnZTE=","valueLen":8},"headerInfo":{"attributeCount":0,"baseOffset":2,"crc":1402645798,"epoch":-1,"firstTimestamp":"2022-06-05 15:49:18","lastOffsetDelta":0,"lengthOffset":64,"magic":2,"maxTimestamp":"2022-06-05 15:49:18","numRecords":1,"partitionLeaderEpoch":0,"producerId":-1,"sequence":-1}}

    The index file

    import java.io.DataInputStream;
    import java.io.FileInputStream;
    import java.io.InputStream;
    import java.util.ArrayList;
    import java.util.List;

    /**
     * Test09ReadKafkaIndex
     *
     * @author Jerry.X.He <970655147@qq.com>
     * @version 1.0
     * @date 2022-06-04 11:02
     */
    public class Test09ReadKafkaIndex {

        public static void main(String[] args) throws Exception {
            String path = "/tmp/kafka-logs/test20220528-0/00000000000000000000.index";
            InputStream is = new FileInputStream(path);
            DataInputStream dis = new DataInputStream(is);
            long availableBefore = is.available();
            List<Pair<Integer, Integer>> offsetList = new ArrayList<>();
            // each index entry is 8 bytes: a 4-byte relative offset and a 4-byte physical position
            while (dis.available() > 0) {
                int offset1 = dis.readInt();
                int position1 = dis.readInt();
                offsetList.add(new Pair<>(offset1, position1));
            }
            long availableAfter = is.available();
            System.out.println(String.format(" availBefore : %s, availAfter : %s ", availableBefore, availableAfter));
            for (Pair<Integer, Integer> pair : offsetList) {
                System.out.println(String.format("%s -> %s", pair.getFirst(), pair.getSecond()));
            }
        }

        // minimal Pair holder, a stand-in for whichever pair utility the original used
        private static class Pair<A, B> {
            private final A first;
            private final B second;
            public Pair(A first, B second) { this.first = first; this.second = second; }
            public A getFirst() { return first; }
            public B getSecond() { return second; }
        }
    }

    The output is as follows

    availBefore : 8, availAfter : 0
    2 -> 152
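
    This single entry maps relative offset 2 to physical position 152, which checks out against the batch layout above: each batch is 76 bytes, and 2 * 76 = 152.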

    The timeindex file

    import java.io.DataInputStream;
    import java.io.FileInputStream;
    import java.io.InputStream;
    import java.util.ArrayList;
    import java.util.Date;
    import java.util.List;

    import org.apache.commons.lang3.time.DateFormatUtils;

    /**
     * Test09ReadKafkaTimeIndex
     *
     * @author Jerry.X.He <970655147@qq.com>
     * @version 1.0
     * @date 2022-06-04 11:02
     */
    public class Test09ReadKafkaTimeIndex {

        public static void main(String[] args) throws Exception {
            String path = "/tmp/kafka-logs/test20220528-0/00000000000000000000.timeindex";
            InputStream is = new FileInputStream(path);
            DataInputStream dis = new DataInputStream(is);
            long availableBefore = is.available();
            List<Pair<Integer, Date>> offsetList = new ArrayList<>();
            // each timeindex entry is 12 bytes: an 8-byte timestamp and a 4-byte relative offset
            while (dis.available() > 0) {
                Date timestamp1 = new Date(dis.readLong());
                int offset1 = dis.readInt();
                offsetList.add(new Pair<>(offset1, timestamp1));
            }
            long availableAfter = is.available();
            System.out.println(String.format(" availBefore : %s, availAfter : %s ", availableBefore, availableAfter));
            for (Pair<Integer, Date> pair : offsetList) {
                // note: "hh" renders a 12-hour clock, which is why the output below shows 03:49:18
                System.out.println(String.format("%s -> %s", pair.getFirst(), DateFormatUtils.format(pair.getSecond(), "yyyy-MM-dd hh:mm:ss")));
            }
        }

        // minimal Pair holder, a stand-in for whichever pair utility the original used
        private static class Pair<A, B> {
            private final A first;
            private final B second;
            public Pair(A first, B second) { this.first = first; this.second = second; }
            public A getFirst() { return first; }
            public B getSecond() { return second; }
        }
    }

    The output is as follows

    availBefore : 12, availAfter : 0
    2 -> 2022-06-05 03:49:18
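
    This entry maps relative offset 2 to its maxTimestamp. Note that 03:49:18 is the same instant as the 15:49:18 seen in the log dump above: the format string uses hh (12-hour clock) instead of HH, so the afternoon hour prints as 03.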

    The end

  • Original article: https://blog.csdn.net/u011039332/article/details/125126838