RocketMQ4.x和RocketMQ5.0在CommitLog的设计上面基本上没有太多调整,还是沿用了之前的设计。下面来对CommitLog的设计思想和源码进行分析。
RocketMQ版本5.0.0
CommitLog是对RocketMQ的存储的抽象,示意图如下:

CommitLog 主要由几部分组成:
Tips: 多目录存储参照RIP-7
CommitLog 底层主要是通过FileChannel来实现。其中还有一些RocketMQ的自身优化,例如: TransientStorePool。
MappedFileQueue 是对数据存储文件的一个抽象,将多个数据文件抽象成为一个文件队列。通过这个文件队列对文件进行操作操作。同时保存一些 CommitLog 的属性。来看一下MappedFileQueue中的几个重要的属性:
flushedWhere: 当前刷盘指针,表示该指针之前所有的数据全部持久化到了硬盘上面committedWhere :当前数据提交指针,内存中byteBuffer当前的写指针,该值大于等于flushedWhereTips: 同步刷盘的过程中flushedWhere等于committedWhere
MappedFileQueue 同时提供了一些操作例如:
2.1 MappedFile
MappedFile 是对文件的抽象,包含了对RocketMQ数据文件的整个操作。例如获取文件名称、文件大小、判断文件是否可用、是否已经满了等等的操作。
单个数据文件默认是 1G 。然后按照顺序存储Producer发送的消息。数据格式如下图所示:

数据文件就是由上图所示的一条条的数据组成。通过观察你可能会发现存储Topic的长度只占用了一个字节。这个也是在使用RocketMQ需要注意的一点就是:
由于只用了一个字节保存Topic的长度所以Topic的最大长度是127字符,如果使用中文长度会更短。 在RocketMQ5.0的版本中增加 PutMessageHook 会在Put Message之前去Check一些必要的数据和参数。例如Topic的长度就其中之一。具体可以参照 HookUtils 工具类。

3.1 消息校验
生产者发送消息到 Broker Master 后,在存入 CommitLog 之前首先会经过一个 PutMessageHook 的处理接口类,这个有不同的实现,主要用于校验一些消息的数据如:Message Topic 大小、Message Body大小、以及MessageStore的一些状态等等。
Tips: 通过实现PutMessageHook可以做到更多校验以及一些其他的事情。
3.2 消息处理
CommitLog#asyncPutMessage 在CommitLog有两个分别用来处理单个消息和批量消息。下面以单个消息处理为例
消息的一些属性设置
消息体的CRC、设置消息生产者的IP地址是否为IPV6、设置存储Broker IP地址是否为IPV6
HA处理设置
根据服务的设置判断是否需要处理HA. RocketMQ5.0 HA有两种模式:
这两个模式主要目的是根据返回ACK的Slave数量来判断消息是否同步到Slave成功
Append Message 到 MappedByteBuffer
处理Append的Result
根据Broker的配置进行刷盘
刷盘是根据在启动Broker的时候配置的刷盘模式进行不同的刷盘策略。
处理HA
HA的处理不是必须的,这个看RocketMQ是否配置了HA模式。只有配置了才需要进行处理。
刷盘主要有两种模式:同步刷盘和异步刷盘。刷盘主要由 FlushManager 进行管理。刷盘接口关系:

4.1 同步刷盘解析
同步刷盘是由 GroupCommitService 来处理,同步刷盘的详细流程如下:

在刷盘过程中使用到的两个类:GroupCommitService 和 FlushDiskWatcher 从本质上看都是一个Thread。
GroupCommitService处理主要分为三步:
往链表中写入GroupCommitRequest请求

GroupCommitService执行doCommit方法

执行刷盘操作,将结果写入GroupCommitRequest

到这里就已经基本上完成整个同步的刷盘步骤。细心的可能会发现还有一个 FlushDiskWatcher 这个类。它的功能主要是:处理在规定时间内还没有刷盘成功的请求。
4.2 异步刷盘解析
异步刷盘的服务是**FlushRealTimeService** ,不过当内存缓存池 TransientStorePool 可用时,消息会先提交到TransientStorePool中的WriteBuffer内部,再提交到MappedFile的FileChannel中,此时异步刷盘服务就是CommitRealTimeService。下面看一下 FlushRealTimeService :


而在启用了暂存池的情况下使用的是 CommitRealTimeService 进行刷盘:
