Recently I have been doing some Kafka-related environment setup, and a few problems came up while connecting clients to the servers.
That created a need to understand parts of the Kafka code,
which in turn spawned a series of notes analyzing individual topics.
How Kafka stores its records is one of them.
The server here is based on Kafka 2.4.1, the client on 2.2.0.
Let's look at how a ProducerRecord ends up being stored.
- import java.util.Properties;
-
- import org.apache.kafka.clients.producer.KafkaProducer;
- import org.apache.kafka.clients.producer.ProducerRecord;
-
- /**
- * Test06KafkaProducer
- *
- * @author Jerry.X.He <970655147@qq.com>
- * @version 1.0
- * @date 2022-05-28 10:14
- */
- public class Test06KafkaProducer {
-
- // Test06KafkaProducer
- public static void main(String[] args) {
-
- Properties properties = new Properties();
- properties.put("bootstrap.servers", "master:9092");
- properties.put("acks", "all");
- properties.put("retries", 0);
- properties.put("batch.size", 16384);
- properties.put("linger.ms", 1);
- properties.put("buffer.memory", 33554432);
- properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-
- String topic = "test20220528";
- KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
- for (int i = 1; i <= 1; i++) {
- kafkaProducer.send(new ProducerRecord<>(topic, "message" + i));
- System.out.println("message" + i);
- }
- kafkaProducer.close();
-
- }
-
- }
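Running this once appends a single record to partition 0 of test20220528; assuming the stock log.dirs=/tmp/kafka-logs, the data lands under /tmp/kafka-logs/test20220528-0, which is the directory the reader programs below inspect.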
When send is called, the record body is written into a ByteBuffer.
In this case the body occupies offsets 61 - 75 of that buffer.
- # For the scenario here, the body actually contains the following
- bodyLen = 14 : 1 byte
- attribute = 0 : 1 byte
- timestampDelta = 0 : 1 byte
- offsetDelta = 0 : 1 byte
- key = -1 : 1 byte
- valueLen = 8 : 1 byte
- value = message1 : 8 byte
- headerLen = 0 : 1 byte
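The length and delta fields above are zigzag-encoded varints, which is why each of them fits in one byte here: bodyLen 14 is stored as the byte 28, and the key length -1 as the byte 1. Below is a minimal decoder sketch (the class name is made up for illustration; Kafka's own decoder is org.apache.kafka.common.utils.ByteUtils.readVarint). The reader program later in this post takes the shortcut of a plain >> 1 on the raw byte, which only recovers small non-negative values:
- import java.io.DataInput;
- import java.io.IOException;
-
- public class VarintSketch {
-     // decode one zigzag varint: 0 -> 0, 1 -> -1, 2 -> 1, 28 -> 14, ...
-     public static int readVarint(DataInput in) throws IOException {
-         int raw = 0;
-         int shift = 0;
-         byte b;
-         do {
-             b = in.readByte();
-             raw |= (b & 0x7f) << shift;  // 7 payload bits per byte
-             shift += 7;
-         } while ((b & 0x80) != 0);       // high bit set means another byte follows
-         return (raw >>> 1) ^ -(raw & 1); // undo the zigzag mapping
-     }
- }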
After the body is serialized into a ProducerBatch, the batch is appended to the accumulator's batches, and the actual sending is handed off to the background sender thread.
The sender thread pulls the batches to send off the queue, fills in the header information, and sends them.
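A rough sketch of that handoff, with hypothetical names (the real pieces are RecordAccumulator, ProducerBatch and the Sender thread): send() only buffers on the caller's thread, and the background thread drains whatever is ready:
- import java.util.ArrayDeque;
- import java.util.ArrayList;
- import java.util.Deque;
- import java.util.List;
-
- // hypothetical sketch of the accumulator handoff, not Kafka's actual code
- public class MiniAccumulator {
-     private final Deque<List<byte[]>> full = new ArrayDeque<>();
-     private List<byte[]> open = new ArrayList<>();
-
-     // called on the caller's thread from send(): just buffer the serialized body
-     public synchronized void append(byte[] record) {
-         open.add(record);
-         if (open.size() >= 16) { // stand-in for the batch.size / linger.ms checks
-             full.addLast(open);
-             open = new ArrayList<>();
-         }
-     }
-
-     // called on the sender thread: drain every batch that is ready to go out
-     public synchronized List<List<byte[]>> drain() {
-         List<List<byte[]>> ready = new ArrayList<>(full);
-         full.clear();
-         return ready;
-     }
- }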
- # For the scenario here, the header actually contains the following
- baseOffset = 0 : 8 byte
- lengthOffset = 64 : 4 byte
- partitionLeaderEpoch = -1 : 4 byte
- magic = 2 : 1 byte
- crc = 558737589 : 4 byte
- attributes = 0 : 2 byte
- lastOffsetDelta = 0 : 4 byte
- firstTimestamp = 1654310895383 : 8 byte
- maxTimestamp = 1654310895383 : 8 byte
- producerId = -1 : 8 byte
- epoch = -1 : 2 byte
- sequence = -1 : 4 byte
- numRecords = 1 : 4 byte
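A quick sanity check on these sizes: the header fields add up to 8+4+4+1+4+2+4+8+8+8+2+4+4 = 61 bytes, and the body is 14 bytes plus its own 1-byte length varint, so a single-message batch occupies 61 + 15 = 76 bytes. That squares with lengthOffset = 64 (the byte count after baseOffset and the length field themselves: 76 - 12 = 64), with the 228-byte log file seen later holding exactly 3 batches, and with the file.length() / 76 in the reader program below.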
The batch is converted into MemoryRecords when MemoryRecordsBuilder.close is called.
Once the current batch's data has been taken off the queue, a ProduceRequest is built to talk to the server.
When the server receives the Produce request, it goes through handleProduceRequest.
partitionRecords holds what the client sent over, grouped by TopicPartition.
The server then tries to write the data to the log; entriesPerPartition here contains the data of several TopicPartitions.
Drilling down, each TopicPartition corresponds to one Log,
and that TopicPartition's records are written into its log.
A Log in turn maps onto multiple Segments, split by size according to log.segment.bytes,
and at certain byte intervals a sampled baseOffset -> physicalOffset and baseOffset -> timestamp entry is recorded.
So besides the log file holding the data, there are also an index file and a timeindex file to speed up lookups (the size configs involved are sketched right after this listing):
- -rw-r--r-- 1 jerry wheel 8 Jun 5 15:49 00000000000000000000.index
- -rw-r--r-- 1 jerry wheel 228 Jun 5 15:49 00000000000000000000.log
- -rw-r--r-- 1 jerry wheel 12 Jun 5 15:49 00000000000000000000.timeindex
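The two sizes at play here are ordinary broker configs. A minimal server.properties sketch, assuming this setup keeps Kafka's documented defaults (the values below are the stock defaults):
- # roll to a new segment file once the active one reaches this size (default 1 GiB)
- log.segment.bytes=1073741824
- # add an index / timeindex entry roughly every this many bytes of appended data (default 4 KiB)
- log.index.interval.bytes=4096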
One more detail in the server-side handling is how the record's baseOffset is set. Here the record's ByteBuffer offset is 58, so baseOffset is stored at bytes 58 - 64; in the run captured here it is set to 14.
The record delegates to its ByteBuffer to update baseOffset in place.
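A minimal illustration of that delegation (a made-up snippet, not the actual DefaultRecordBatch code): since baseOffset occupies the first 8 bytes of the batch header, updating it boils down to an absolute putLong at the batch's start position in the buffer:
- import java.nio.ByteBuffer;
-
- public class BaseOffsetSketch {
-     public static void main(String[] args) {
-         ByteBuffer buffer = ByteBuffer.allocate(128);
-         int batchStart = 58;             // the batch's offset in the buffer, as above
-         buffer.putLong(batchStart, 14L); // overwrite the 8-byte baseOffset in place
-         System.out.println(buffer.getLong(batchStart)); // prints 14
-     }
- }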
The files come in three kinds: log files, index files, and timeindex files.
The log file holds the actual data, the headerInfo and bodyInfo laid out above.
The index file holds a sampled subset of baseOffset -> physicalOffset mappings, where physicalOffset is the byte position within the log file of the record with that baseOffset.
The timeindex file holds a sampled subset of baseOffset -> maxTimestamp mappings.
- import java.io.DataInputStream;
- import java.io.File;
- import java.io.FileInputStream;
- import java.io.InputStream;
- import java.util.ArrayList;
- import java.util.Date;
- import java.util.List;
-
- import com.alibaba.fastjson.JSON;
- import com.alibaba.fastjson.serializer.SerializerFeature;
-
- import lombok.Data;
-
- /**
- * Test09ReadKafkaLog
- *
- * @author Jerry.X.He <970655147@qq.com>
- * @version 1.0
- * @date 2022-06-04 11:02
- */
- public class Test09ReadKafkaLog {
-
- // Test09ReadKafkaLog
- public static void main(String[] args) throws Exception {
-
- String path = "/tmp/kafka-logs/test20220528-0/00000000000000000000.log";
- // String path = "/tmp/kafka-logs/test20220528-0/00000000000000000003.log";
- File file = new File(path);
- int messageCount = (int) (file.length() / 76); // each single-message batch here is 76 bytes, see the size arithmetic above
- InputStream is = new FileInputStream(path);
- DataInputStream dis = new DataInputStream(is);
-
- long availableBefore = is.available();
- List<RecordInfo> list = new ArrayList<>();
- for (int i = 0; i < messageCount; i++) {
- // header
- long baseOffset = dis.readLong();
- int lengthOffset = dis.readInt();
- int partitionLeaderEpoch = dis.readInt();
- int magic = dis.readByte();
- int crc = dis.readInt();
- int attributeCount = dis.readShort();
- int lastOffsetDelta = dis.readInt();
- Date firstTimestamp = new Date(dis.readLong());
- Date maxTimestamp = new Date(dis.readLong());
- long producerId = dis.readLong();
- int epoch = dis.readShort();
- int sequence = dis.readInt();
- int numRecords = dis.readInt();
-
- // body (the length / delta fields are zigzag varints; a single byte plus
- // a >> 1 only works because the values in this example are small and non-negative)
- int bodyLen = dis.readByte() >> 1;
- int attributeCountInBody = dis.readByte();
- int timestampDelta = dis.readByte() >> 1;
- int offsetDelta = dis.readByte() >> 1;
- int key = dis.readByte(); // raw zigzag byte: 1 here means key length -1, i.e. a null key
- int valueLen = dis.readByte() >> 1;
- byte[] valueBytes = new byte[valueLen];
- dis.readFully(valueBytes); // readFully: a plain read() may return fewer bytes
- String value = new String(valueBytes);
- int headerLen = dis.readByte() >> 1;
-
- HeaderInfo headerInfo = new HeaderInfo(baseOffset, lengthOffset, partitionLeaderEpoch, magic, crc,
- attributeCount, lastOffsetDelta, firstTimestamp, maxTimestamp, producerId, epoch, sequence, numRecords);
- BodyInfo bodyInfo = new BodyInfo(bodyLen, attributeCountInBody, timestampDelta, offsetDelta, key, valueLen, valueBytes, value, headerLen);
- RecordInfo recordInfo = new RecordInfo(headerInfo, bodyInfo);
- list.add(recordInfo);
- }
- long availableAfter = is.available();
-
- System.out.println(String.format(" availBefore : %s, availAfter : %s ", availableBefore, availableAfter));
- for (RecordInfo recordInfo : list) {
- System.out.println(JSON.toJSONString(recordInfo, SerializerFeature.WriteDateUseDateFormat));
- }
-
- }
-
- @Data
- private static class RecordInfo {
- public HeaderInfo headerInfo;
- public BodyInfo bodyInfo;
-
- public RecordInfo(HeaderInfo headerInfo, BodyInfo bodyInfo) {
- this.headerInfo = headerInfo;
- this.bodyInfo = bodyInfo;
- }
- }
-
- @Data
- private static class HeaderInfo {
- public long baseOffset;
- public int lengthOffset;
- public int partitionLeaderEpoch;
- public int magic;
- public int crc;
- public int attributeCount;
- public int lastOffsetDelta;
- public Date firstTimestamp;
- public Date maxTimestamp;
- public long producerId;
- public int epoch;
- public int sequence;
- public int numRecords;
-
- public HeaderInfo(long baseOffset, int lengthOffset, int partitionLeaderEpoch, int magic, int crc,
- int attributeCount, int lastOffsetDelta, Date firstTimestamp, Date maxTimestamp,
- long producerId, int epoch, int sequence, int numRecords) {
- this.baseOffset = baseOffset;
- this.lengthOffset = lengthOffset;
- this.partitionLeaderEpoch = partitionLeaderEpoch;
- this.magic = magic;
- this.crc = crc;
- this.attributeCount = attributeCount;
- this.lastOffsetDelta = lastOffsetDelta;
- this.firstTimestamp = firstTimestamp;
- this.maxTimestamp = maxTimestamp;
- this.producerId = producerId;
- this.epoch = epoch;
- this.sequence = sequence;
- this.numRecords = numRecords;
- }
- }
-
- @Data
- private static class BodyInfo {
- public int bodyLen;
- public int attributeCount;
- public int timestampDelta;
- public int offsetDelta;
- public int key;
- public int valueLen;
- public byte[] valueBytes;
- public String value;
- public int headerLen;
-
- public BodyInfo(int bodyLen, int attributeCount, int timestampDelta, int offsetDelta, int key,
- int valueLen, byte[] valueBytes, String value, int headerLen) {
- this.bodyLen = bodyLen;
- this.attributeCount = attributeCount;
- this.timestampDelta = timestampDelta;
- this.offsetDelta = offsetDelta;
- this.key = key;
- this.valueLen = valueLen;
- this.valueBytes = valueBytes;
- this.value = value;
- this.headerLen = headerLen;
- }
- }
-
- }
The output is as follows
- availBefore : 228, availAfter : 0
- {"bodyInfo":{"attributeCount":0,"bodyLen":14,"headerLen":0,"key":1,"offsetDelta":0,"timestampDelta":0,"value":"message1","valueBytes":"bWVzc2FnZTE=","valueLen":8},"headerInfo":{"attributeCount":0,"baseOffset":0,"crc":458006489,"epoch":-1,"firstTimestamp":"2022-06-05 15:47:32","lastOffsetDelta":0,"lengthOffset":64,"magic":2,"maxTimestamp":"2022-06-05 15:47:32","numRecords":1,"partitionLeaderEpoch":0,"producerId":-1,"sequence":-1}}
- {"bodyInfo":{"attributeCount":0,"bodyLen":14,"headerLen":0,"key":1,"offsetDelta":0,"timestampDelta":0,"value":"message1","valueBytes":"bWVzc2FnZTE=","valueLen":8},"headerInfo":{"attributeCount":0,"baseOffset":1,"crc":-368922856,"epoch":-1,"firstTimestamp":"2022-06-05 15:49:05","lastOffsetDelta":0,"lengthOffset":64,"magic":2,"maxTimestamp":"2022-06-05 15:49:05","numRecords":1,"partitionLeaderEpoch":0,"producerId":-1,"sequence":-1}}
- {"bodyInfo":{"attributeCount":0,"bodyLen":14,"headerLen":0,"key":1,"offsetDelta":0,"timestampDelta":0,"value":"message1","valueBytes":"bWVzc2FnZTE=","valueLen":8},"headerInfo":{"attributeCount":0,"baseOffset":2,"crc":1402645798,"epoch":-1,"firstTimestamp":"2022-06-05 15:49:18","lastOffsetDelta":0,"lengthOffset":64,"magic":2,"maxTimestamp":"2022-06-05 15:49:18","numRecords":1,"partitionLeaderEpoch":0,"producerId":-1,"sequence":-1}}
- import java.io.DataInputStream;
- import java.io.File;
- import java.io.FileInputStream;
- import java.io.InputStream;
- import java.util.ArrayList;
- import java.util.List;
-
- // Pair is assumed to be any simple 2-tuple exposing getFirst / getSecond
- // (e.g. kotlin.Pair or a small local helper class)
-
- /**
- * Test09ReadKafkaIndex
- *
- * @author Jerry.X.He <970655147@qq.com>
- * @version 1.0
- * @date 2022-06-04 11:02
- */
- public class Test09ReadKafkaIndex {
-
- // Test09ReadKafkaIndex
- public static void main(String[] args) throws Exception {
-
- String path = "/tmp/kafka-logs/test20220528-0/00000000000000000000.index";
- File file = new File(path);
- InputStream is = new FileInputStream(path);
- DataInputStream dis = new DataInputStream(is);
-
- long availableBefore = is.available();
- List<Pair<Integer, Integer>> offsetList = new ArrayList<>();
- while (dis.available() > 0) {
- int offset1 = dis.readInt();
- int position1 = dis.readInt();
- offsetList.add(new Pair<>(offset1, position1));
- }
- long availableAfter = is.available();
-
- System.out.println(String.format(" availBefore : %s, availAfter : %s ", availableBefore, availableAfter));
- for (Pair<Integer, Integer> pair : offsetList) {
- System.out.println(String.format("%s -> %s", pair.getFirst(), pair.getSecond()));
- }
-
- }
-
- }
The output is as follows
- availBefore : 8, availAfter : 0
- 2 -> 152
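The single entry reads as: the record at relative offset 2 starts at byte position 152 of the log file, which checks out against the batch size computed earlier: 2 × 76 = 152. Since index entries are sampled, not every offset gets one.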
- import java.io.DataInputStream;
- import java.io.File;
- import java.io.FileInputStream;
- import java.io.InputStream;
- import java.util.ArrayList;
- import java.util.Date;
- import java.util.List;
-
- import org.apache.commons.lang3.time.DateFormatUtils;
-
- // Pair is assumed to be any simple 2-tuple exposing getFirst / getSecond
-
- /**
- * Test09ReadKafkaTimeIndex
- *
- * @author Jerry.X.He <970655147@qq.com>
- * @version 1.0
- * @date 2022-06-04 11:02
- */
- public class Test09ReadKafkaTimeIndex {
-
- // Test09ReadKafkaTimeIndex
- public static void main(String[] args) throws Exception {
-
- String path = "/tmp/kafka-logs/test20220528-0/00000000000000000000.timeindex";
- File file = new File(path);
- InputStream is = new FileInputStream(path);
- DataInputStream dis = new DataInputStream(is);
-
- long availableBefore = is.available();
- List<Pair<Integer, Date>> offsetList = new ArrayList<>();
- while (dis.available() > 0) {
- Date timestamp1 = new Date(dis.readLong());
- int offset1 = dis.readInt();
- offsetList.add(new Pair<>(offset1, timestamp1));
- }
- long availableAfter = is.available();
-
- System.out.println(String.format(" availBefore : %s, availAfter : %s ", availableBefore, availableAfter));
- for (Pair<Integer, Date> pair : offsetList) {
- System.out.println(String.format("%s -> %s", pair.getFirst(), DateFormatUtils.format(pair.getSecond(), "yyyy-MM-dd hh:mm:ss")));
- }
-
- }
-
- }
The output is as follows
- availBefore : 12, availAfter : 0
- 2 -> 2022-06-05 03:49:18
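(The time prints as 03:49:18 rather than 15:49:18 because the format pattern above uses hh, the 12-hour clock; HH would give 15:49:18, which matches the maxTimestamp of the offset-2 batch in the log dump.)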
Done.