• Kafka源码分析(三) - Server端 - 消息存储


    Kafka源码分析(三) - Server端 - 消息存储 - 知乎

    一. 业务模型

     

    在上一篇文章中,我们分析了生产者的原理。下一步我们来分析下提交上来的消息在Server端时如何存储的。

    1.1 概念梳理

    KafkaTopic将数据划分成内聚性较强的子集,Topic内部又划分成多个Partition。不过这两个都是逻辑概念,真正存储文件的是Partition所对应的一个或多个Replica,即副本。在存储层有个概念和副本一一对应——Log。为了防止Log过大,增加消息过期和数据检索的成本,Log又会按一定大小划分成"段",即LogSegment。用一张图汇总这些概念间的关系:

    概念梳理

    1.2 文件分析

    1.2.1 数据目录

    Kafkap配置文件(server.properties)中有一个配置项——log.dir,其指定了kafka数据文件存放位置。为了研究数据目录的结构,我们先创建一个Topic(lao-zhang-tou-topic)

    kafka-console-producer.sh --topic lao-zhang-tou-topic --bootstrap-server localhost:9092

    然后向其中写几条消息

    1. kafka-console-producer.sh --topic lao-zhang-tou-topic --bootstrap-server localhost:9092
    2. {"message":"This is the first message"}
    3. {"message":"This is the sencond message"}

    接下来我们来看看log.dir指定目录下存放了那些文件

    log.dir目录截图

    该目录下文件分3类:

    1. 数据文件夹 (如截图中的lao-zhang-tou-topic-0)
    2. checkpoint文件
      1. cleaner-offset-checkpoint
      2. log-start-offset-checkpoint
      3. recovery-point-offset-checkpoint
      4. replication-offset-checkpoint
    3. 配置文件 (meta.properties)

    第2、3类文件后续文章会详细分析,本文主要关注截图中lao-zhang-tou-topic-0目录。

    topic目录截图

    实际上,该目录对应上文提到的Log概念,命名规则为 ${Topic}-${PartitionIndex}。该目录下,名称相同的.log文件、.index文件、.timeindex文件构成了一个LogSegment。例如图中的 00000000000000000000.log、00000000000000000000.index、00000000000000000000.timeindex 三个文件。其中.log是数据文件,用于存储消息数据;.index和.timeindex是在.log基础上建立起来的索引文件。

    1.2.2 .log文件

    log文件将消息数据依次排开进行存储

    log文件存储格式

    每个Message内部分为"数据头"(LOG_OVERHEAD)和"数据体"(Record)两部分

    message存储格式

    其中,LOG_OVERHEAD包含两个字段:

    1. offset:每条数据的逻辑偏移量,按插入顺序分别为0、1、2... ... N;每个消息的offset在Partition内部是唯一的;
    2. size:数据体(RECORD)部分的长度;

    RECORD内部格式如下:

    RECORD存储格式

    其中,

    • crc32:校验码,用于验证数据完整性;
    • magic:消息格式的版本号;v0=0,v1=1;本文讲v1格式;
    • timestamp:时间戳,具体业务含义依attributes的值而定;
    • attributes:属性值;其 8bits 的含义如下

    attributes格式

    • keyLength:key值的长度;
    • key:消息数据对应的key;
    • valueLength:value值的长度;
    • value:消息体,承载业务信息;

    1.2.3 .index和.timeindex文件

    .index文件是依offset建立其的稀疏索引,可减少通过offset查找消息时的遍历数据量。.index文件的每个索引条目占8 bytes,有两个字段:relativeOffset 和 position(各占4 bytes)。也就是消息offset到其在文件中偏移量的一个映射。那有人要问了,索引项中保存的明明是一个叫relativeOffset的东西,为什么说是offset到偏移量的映射呢?其实,准确的来讲,relativeOffset指的的相对偏移量,是对LogSegment基准offset而言的。我们注意到,一个LogSegment内的.log文件、.index文件、和.index文件除后缀外的名称都是相同的。其实这个名称就是该LogSegment的基准offset,即LogSegment内保存的第一条消息对应的offset。baseOffset + relativeOffset即可得到offset,所以称索引项是offset到物理偏移量的映射。

    不是所有的消息都对应.index文件内的一个条目。Kafka会每隔一定量的消息才会在.index建立索引条目,间隔大小由"log.index.interval.bytes"配置指定。.index文件布局示意图如下:

    索引文件示意图

    .timeindex文件和.index原理相同,只不过其IndexEntry的两个字段分别为timestamp(8 bytes)和relativeOffset(4 bytes)。用于减少以时间戳查找消息时遍历元素数量。

    1.3 顺序IO

    对于我们常用的机械硬盘,其读取数据分3步:

    1. 寻道;
    2. 寻找扇区;
    3. 读取数据;

    前两个,即寻找数据位置的过程为机械运动。我们常说硬盘比内存慢,主要原因是这两个过程在拖后腿。不过,硬盘比内存慢是绝对的吗?其实不然,如果我们能通过顺序读写减少寻找数据位置时读写磁头的移动距离,硬盘的速度还是相当可观的。一般来讲,IO速度层面,内存顺序IO > 磁盘顺序IO > 内存随机IO > 磁盘随机IO。

    Kafka在顺序IO上的设计分两方面看:

    1. LogSegment创建时,一口气申请LogSegment最大size的磁盘空间,这样一个文件内部尽可能分布在一个连续的磁盘空间内;
    2. .log文件也好,.index和.timeindex也罢,在设计上都是只追加写入,不做更新操作,这样避免了随机IO的场景;

    Kafka是公认的高性能消息中间件,顺序IO在这里占了很大一部分因素。

    不知道大家有没有听过这样一个说法:Kafka集群能承载的Partition数量有上限。很大一部分原因是Partition数量太多会抹杀掉Kafka顺序IO设计带来的优势,相当于自废武功。Why?因为不同Partition在磁盘上的存储位置可不保证连续,当以不同Partition为读写目标并发地向Kafka发送请求时,Server端近似于随机IO。

    1.4 端到端压缩

    一条压缩消息从生产者处发出后,其在消费者处才会被解压。Kafka Server端不会尝试解析消息体,直接原样存储,省掉了Server段压缩&解压缩的成本,这也是Kafka性能喜人的原因之一。

    二. 源码结构

    2.1 核心类

    2.1.1 核心类之间的关系

    Kafka消息存储涉及的核心类有:

    • ReplicaManager
    • Partition
    • Replica
    • Log
    • LogSegment
    • OffsetIndex
    • TimeIndex
    • MemoryRecords
    • FileRecords

    它们之间的关系如下图:

    核心类之间的关系

    2.1.1 数据传递对象

    Kafka消息存储的基本单位不是"一条消息",而是"一批消息"。在生产者文章中提到过,Producer针对每个Partition会攒一批消息,经过压缩后发到Server端。Server端会将对应Partition下的这一"批"消息作为一个整体进行管理。所以在Server端,一个"Record"表示"一批消息",而数据传递对象"XXXRecords"则可以表示一批或多批消息。

    MemoryRecords所表示的消息数据存储于内存。比如Server端从接到生产者消息到将消息存入磁盘的过程就用MemoryRecords来传递数据,因为这期间消息需要暂存于内存,且没有磁盘数据与之对应。MemoryRecords核心属性有两个:

    属性名类型说明
    bufferByteBuffer存储消息数据
    batchesIterable迭代器;用于以批为单位遍历buffer所存储的数据

    FileRecords所表示的消息数据存储于磁盘文件。比如从磁盘读出消息返回给消费者的过程就用FileRecords来传递数据。其核心属性如下:

    属性名类型说明
    fileFile消息数据所存储的文件
    channelFileChannel文件所对应的FileChannel
    startint本FileRecords所表示的数据在文件中的起始偏移量
    endint本FileRecords所表示的数据在文件中的结束偏移量
    sizeAtomicInteger本FileRecords所表示的数据的字节数

    2.1.2 ReplicaManager

    ReplicaManager负责管理本节点存储的所有副本。这个类的属性真的巨多。不过不要慌,对于消息存储原理这块,我们只需要关注下面这一个属性就可以,其他和请求处理以及副本复制相关的属性我们放到后边对应章节慢慢分析。

    属性名类型说明
    allPartitionsPool[TopicPartition, Partition]存储Partition对象,可根据TopicPartition类将其检索出来

    2.1.3 Partition

    Partition对象负责维护本分区下的所有副本,其核心属性如下:

    属性名类型说明
    allReplicasMapPool[Int, Replica]本分区下的所有副本。其中,key为BrokerId,value为Replica对象
    leaderReplicaIdOptOption[Int]Leader副本所在节点的BrokerId
    localBrokerIdInt本节点对应的BrokerId

    2.1.4 Replica

    Replica负责维护Log对象。Replica是业务模型层面"副本"的表示,Log是数据存储层面的"副本"。Replica核心属性如下:

    属性名类型说明
    logOption[Log]Replica对应的Log对象
    topicPartitionTopicPartition标识该副本所属"分区"
    brokerIdInt该副本所在的BrokerId
    highWatermarkMetadataLogOffsetMetadata高水位(后续章节会详细分析)
    logEndOffsetMetadataLogOffsetMetadata该副本中现存最大的Offset(后续章节会详细分析)

    2.1.5 Log

    Log负责维护副本下的LogSegment,其核心属性如下:

    属性名类型说明
    dirFileLog对应的目录,即存储LogSegment的文件夹
    segmentsConcurrentSkipListMap[java.lang.Long, LogSegment]LogSegment集合,其中key为对应LogSegment的起始offset

    2.1.6 LogSegment

    LogSegment则实实在在维护消息数据,其核心属性如下:

    属性名类型说明
    logFileRecords本日志段的消息数据
    baseOffsetLong本日志段的起始offset
    maxSegmentBytesInt本日志段的最大字节数;
    超过后就需要新建一个LogSegment;
    maxSegmentMsLong日志段也可以根据时间来滚动;
    比如待插入消息和日志段第一个消息间隔超过一定时间后,需要开个新的日志段;
    maxSegmentMs便是所指定的间隔大小(segment.ms 配置项);
    rollJitterMsLong为避免当前节点上所有LogSegment同时滚动的情况,需要在maxSegmentMs基础上减去一个随机数值;
    rollJitterMs便是这个随机扰动(segment.jitter.ms 配置项指定该随机数的最大值)
    offsetIndexOffsetIndex偏移量索引,下文分析
    timeIndexTimeIndex时间索引,下文分析

    2.1.7 OffsetIndex和TimeIndex

    首先两个索引都继承于AbstractIndex,那么他们就有一批共同的核心属性:

    属性名类型说明
    fileFile对应的索引文件
    mmapMappedByteBuffer索引文件的内存映射
    maxIndexSizeInt索引文件的最大字节数,
    由 segment.index.bytes 配置项指定
    baseOffsetLong所在日志段的起始offset

    实际上,这些属性已足够表达当前的索引逻辑,OffsetIndex和TimeIndex均未再额外自定义属性。

    2.2 消息写入流程

    消息写入流程时序图如下:

    消息写入流程

    需要提一点,这里不是为了让诸君将这一串流程视为整体记入脑海。面向对象的代码仍然要从面向对象的角度去理解。所以这里重要的是各个类各自内部的逻辑,这有助于进一步明确类所扮演的角色。

    2.2.1 ReplicaManager.appendRecords

    1. def appendRecords(timeout: Long,
    2. requiredAcks: Short,
    3. internalTopicsAllowed: Boolean,
    4. isFromClient: Boolean,
    5. entriesPerPartition: Map[TopicPartition, MemoryRecords],// 各Partition上待插入的消息数据
    6. responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
    7. delayedProduceLock: Option[Lock] = None,
    8. recordConversionStatsCallback: Map[TopicPartition, RecordConversionStats] => Unit = _ => ()) {
    9. ... ...
    10. val localProduceResults = appendToLocalLog(internalTopicsAllowed = internalTopicsAllowed,
    11. isFromClient = isFromClient, entriesPerPartition, requiredAcks)
    12. ... ...
    13. }
    14. private def appendToLocalLog(internalTopicsAllowed: Boolean,
    15. isFromClient: Boolean,
    16. entriesPerPartition: Map[TopicPartition, MemoryRecords],
    17. requiredAcks: Short): Map[TopicPartition, LogAppendResult] = {
    18. ... ...
    19. // step1 reject appending to internal topics if it is not allowed
    20. if (Topic.isInternal(topicPartition.topic) && !internalTopicsAllowed) {
    21. (topicPartition, LogAppendResult(
    22. LogAppendInfo.UnknownLogAppendInfo,
    23. Some(new InvalidTopicException(s"Cannot append to internal topic ${topicPartition.topic}"))))
    24. } else {
    25. try {
    26. //step2 若本Broker节点不承载对应partition的主副本, 这步会抛异常
    27. val (partition, _) = getPartitionAndLeaderReplicaIfLocal(topicPartition)
    28. //step3 将消息写入对应Partition主副本, 并唤醒相关的等待操作(比如, 消费等待)
    29. val info = partition.appendRecordsToLeader(records, isFromClient, requiredAcks)
    30. ... ...
    31. }
    32. }
    33. }
    34. }

    appendRecords直接调用appendToLocalLog,后者才是真正实行逻辑的方法。ReplicaManager的逻辑基本分三步走:

    1. 检查目标Topic是否为Kafka内部Topic,若是的话根据配置决定是否允许写入;
    2. 获取对应的Partition对象;
    3. 调用Partition.appendRecordsToLeader写入消息数据;

    2.2.2 Partition.appendRecordsToLeader

    接下来看看Partition内部的逻辑

    1. def appendRecordsToLeader(records: MemoryRecords, isFromClient: Boolean, requiredAcks: Int = 0): LogAppendInfo = {
    2. val (info, leaderHWIncremented) = inReadLock(leaderIsrUpdateLock) {
    3. leaderReplicaIfLocal match {
    4. //step1 判断Leader副本是否在当前节点
    5. case Some(leaderReplica) =>
    6. //step2 获取Log对象
    7. val log = leaderReplica.log.get
    8. ... ...
    9. //step3 调用Log对象方法写入数据
    10. val info = log.appendAsLeader(records, leaderEpoch = this.leaderEpoch, isFromClient)
    11. ... ...
    12. // 若本节点不是目标Partition的Leader副本, 抛异常
    13. case None =>
    14. throw new NotLeaderForPartitionException("Leader not local for partition %s on broker %d"
    15. .format(topicPartition, localBrokerId))
    16. }
    17. }
    18. ... ...
    19. }

    这里的逻辑也分3步走:

    1. 判断Leader副本是否在当前节点;
    2. 获取Log对象;
    3. 调用Log对象的appendAsLeader方法写入数据;

    这里我们额外看下第1步的原理。leaderReplicaIfLocal是个方法

    1. def leaderReplicaIfLocal: Option[Replica] =
    2. leaderReplicaIdOpt.filter(_ == localBrokerId).flatMap(getReplica)
    3. def getReplica(replicaId: Int = localBrokerId): Option[Replica] = Option(allReplicasMap.get(replicaId))

    其核心思想是那本节点BrokerId和Leader副本所在节点的BrokerId作比较,若相等,则返回对应的Replica对象。

    2.2.3 Log.appendAsLeader

    1. def appendAsLeader(records: MemoryRecords, leaderEpoch: Int, isFromClient: Boolean = true): LogAppendInfo = {
    2. append(records, isFromClient, assignOffsets = true, leaderEpoch)
    3. }
    4. private def append(records: MemoryRecords, isFromClient: Boolean, assignOffsets: Boolean, leaderEpoch: Int): LogAppendInfo = {
    5. ... ...
    6. // maybe roll the log if this segment is full
    7. val segment = maybeRoll(validRecords.sizeInBytes, appendInfo)
    8. ... ...
    9. // 将消息插入segment
    10. segment.append(largestOffset = appendInfo.lastOffset,
    11. largestTimestamp = appendInfo.maxTimestamp,
    12. shallowOffsetOfMaxTimestamp = appendInfo.offsetOfMaxTimestamp,
    13. records = validRecords)
    14. ... ...
    15. }

    appendAsLeader方法直接调用append方法,后者两步走:

    1. 判断是否需要创建一个新的LogSegment,并返回最新的LogSegment;
    2. 调用LogSegment.append方法写入数据;

    这里我们再额外关注下第1步的判断标准。主要还是根据LogSegment.shouldRoll方法的返回值来作决策:

    1. def shouldRoll(messagesSize: Int, maxTimestampInMessages: Long, maxOffsetInMessages: Long, now: Long): Boolean = {
    2. val reachedRollMs = timeWaitedForRoll(now, maxTimestampInMessages) > maxSegmentMs - rollJitterMs
    3. size > maxSegmentBytes - messagesSize ||
    4. (size > 0 && reachedRollMs) ||
    5. offsetIndex.isFull || timeIndex.isFull || !canConvertToRelativeOffset(maxOffsetInMessages)
    6. }

    Kafka的源码很清晰的,这方面值得点赞和学习。从shouldRoll的结果表达式我们可以看到,以下4类场景中,LogSegment需要向前滚动:

    1. 若接受新消息的写入,当前LogSegment将超过最大字节数限制;
    2. 若接受新消息的写入,当前LogSegment将超过最大时间跨度限制;
    3. 当前LogSegment对应的索引已无法写入新数据;
    4. 输入的offset不在当前LogSegment表示范围;

    2.2.4 LogSegment.append

    1. def append(largestOffset: Long,
    2. largestTimestamp: Long,
    3. shallowOffsetOfMaxTimestamp: Long,
    4. records: MemoryRecords): Unit = {
    5. // step1.1 判断输入消息大小
    6. if (records.sizeInBytes > 0) {
    7. trace(s"Inserting ${records.sizeInBytes} bytes at end offset $largestOffset at position ${log.sizeInBytes} " +
    8. s"with largest timestamp $largestTimestamp at shallow offset $shallowOffsetOfMaxTimestamp")
    9. // step1.2 校验offset
    10. val physicalPosition = log.sizeInBytes()
    11. if (physicalPosition == 0)
    12. rollingBasedTimestamp = Some(largestTimestamp)
    13. ensureOffsetInRange(largestOffset)
    14. // step2 append the messages
    15. val appendedBytes = log.append(records)
    16. trace(s"Appended $appendedBytes to ${log.file} at end offset $largestOffset")
    17. // step3 Update the in memory max timestamp and corresponding offset.
    18. if (largestTimestamp > maxTimestampSoFar) {
    19. maxTimestampSoFar = largestTimestamp
    20. offsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp
    21. }
    22. // step4 append an entry to the index (if needed)
    23. if (bytesSinceLastIndexEntry > indexIntervalBytes) {
    24. offsetIndex.append(largestOffset, physicalPosition)
    25. timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp)
    26. bytesSinceLastIndexEntry = 0
    27. }
    28. bytesSinceLastIndexEntry += records.sizeInBytes
    29. }
    30. }

    LogSegment.append大体可以分为4步:

    1. 数据校验
    2. 校验输入消息大小;
    3. 校验offset;
    4. 写入数据(注意: 此步的log对象不是Log类的实例,而是FileRecords的实例);
    5. 更新统计数据;
    6. 处理索引;
  • 相关阅读:
    Spring Bean循环依赖问题及解决
    Git 命令记录
    【uni-app系列】uni-app从0到1开发实例
    预编码ZF,MMSE,THP准则线性预编码误码率仿真
    RK3588S上从训练Minist到部署Minist仓库介绍
    C++ STL - map 与 multimap用法和区别
    外贸客户开发信怎么写?如何撰写营销邮件?
    rust迭代器
    [附源码]java毕业设计会议室会议管理系统
    2023年中国企业营销数字化行业发展趋势分析:企业营销数字化花费增速放缓[图]
  • 原文地址:https://blog.csdn.net/qq_32907195/article/details/126223294