RocketMQ因为有高可靠性的要求(宕机不丢失数据),所以数据要进行持久化存储。所以RocketMQ采用文件进行存储。
RocketMQ的数据默认存储在${user.home}/store目录下,可以通过修改broker.conf中的参数storePathRootDir的值进行设置。
物理目录结构大致如下:
├── abort
├── checkpoint
├── commitlog
│ ├── 00000000000000000000
│ └── 00000000001073741824
├── config
│ ├── consumerFilter.json
│ ├── consumerFilter.json.bak
│ ├── consumerOffset.json
│ ├── consumerOffset.json.bak
│ ├── delayOffset.json
│ ├── delayOffset.json.bak
│ ├── subscriptionGroup.json
│ ├── topics.json
│ └── topics.json.bak
├── consumequeue
│ ├── TopicTest
│ │ ├── 0
│ │ │ └── 00000000000000000000
│ │ ├── 1
│ │ │ └── 00000000000000000000
│ │ ├── 2
│ │ │ └── 00000000000000000000
│ │ └── 3
│ │ └── 00000000000000000000
│ └── TopicTest1
│ ├── 0
│ │ └── 00000000000000000000
│ └── 1
│ └── 00000000000000000000
├── index
│ └── 20220706092335158
└── lock
目录结构说明:

RocketMQ消息的存储是由ConsumeQueue和CommitLog配合完成的,消息真正的物理存储文件是CommitLog,ConsumeQueue是消息的逻辑队列,类似数据库的索引文件,存储的是指向物理存储的地址。每个Topic下的每个Message Queue都有一个对应的ConsumeQueue文件。

CommitLog以物理文件的方式存放,每台Broker上的CommitLog被本机器所有ConsumeQueue共享。在CommitLog中,一个消息的存储长度是不固定的, RocketMQ采取一些机制,尽量向CommitLog中顺序写,但是随机读。commitlog文件默认大小为lG ,可通过在broker配置文件中设置mappedFileSizeCommitLog属性来改变默认大小。
CommitLog作为消息主体以及元数据的存储主体,存储Producer端写入的消息主体内容,消息内容不是定长的。
单个文件大小默认1G, 文件名长度为20位,左边补零,剩余为起始偏移量,比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1073741824;当第一个文件写满了,第二个文件为00000000001073741824,起始偏移量为1073741824,以此类推。
每个Rocket实例只会往一个commitlog文件中写,写完一个接着写下一个。indexFile和ComsumerQueue中都有消息对应的物理偏移量,通过物理偏移量就可以计算出该消息位于哪个CommitLog文件上。
CommitLog目前存储的MappedFile文件有两种内容类型:
| 第几位 | 字段 | 字节数 | 数据类型 | 说明 |
|---|---|---|---|---|
| 1 | MsgLen | 4 | Int | 消息总长度 |
| 2 | MagicCode | 4 | Int | MESSAGE_MAGIC_CODE |
| 3 | BodyCRC | 4 | Int | 消息内容CRC |
| 4 | QueueId | 4 | Int | 消息队列编号 |
| 5 | Flag | 4 | Int | flag |
| 6 | QueueOffset | 8 | Long | 消息队列位置 |
| 7 | PhysicalOffset | 8 | Long | 物理位置。在 CommitLog 的顺序存储位置。 |
| 8 | SysFlag | 4 | Int | MessageSysFlag |
| 9 | BornTimestamp | 8 | Long | 生成消息时间戳 |
| 10 | BornHost | 8 | Long | 生效消息的地址+端口 |
| 11 | StoreTimestamp | 8 | Long | 存储消息时间戳 |
| 12 | StoreHost | 8 | Long | 存储消息的地址+端口 |
| 13 | ReconsumeTimes | 4 | Int | 重新消费消息次数 |
| 14 | PreparedTransationOffset | 8 | Long | |
| 15 | BodyLength + Body | 4 + bodyLength | Int + Bytes | 内容长度 + 内容 |
| 16 | TopicLength + Topic | 1 + topicLength | Byte + Bytes | Topic长度 + Topic |
| 17 | PropertiesLength + Properties | 2 + PropertiesLength | Short + Bytes | 拓展字段长度 + 拓展字段 |
| 第几位 | 字段 | 字节数 | 数据类型 | 说明 |
|---|---|---|---|---|
| 1 | maxBlank | 4 | Int | 空白长度 |
| 2 | MagicCode | 4 | Int | BLANK_MAGIC_CODE |
ConsumeQueue是消息的逻辑队列,类似数据库的索引文件,存储的是指向物理存储的地址。每个Topic下的每个Message Queue都有一个对应的 ConsumeQueue文件。
为了加速ConsumeQueue消息条目的检索速度与节省磁盘空间,每一个 Consumequeue条目不会存储消息的全量信息,存储结构如下:
| 第几位 | 字段 | 字节数 | 数据类型 | 说明 |
|---|---|---|---|---|
| 1 | offset | 8 | Long | 在commitLog中的偏移量 |
| 2 | size | 4 | Int | 消息的大小 |
| 3 | tagsCode | 8 | Long | tag标签 |
ConsumeQueue是消息的逻辑队列,相当于字典的目录,用来指定消息在物理文件CommitLog上的位置。
ConsumeQueue对应一个topic的一个队列结构,由topic和queueId可以唯一创建一个ConsumeQueue结构,同样ConsumeQueue包含一个MappedFileQueue结构,而MappedFileQueue结构由多个MappedFile文件组成,每个文件的大小为30000020(300000 ConsumeQueue.CQ_STORE_UNIT_SIZE),其中20是queue中每个存储单元的大小。
consumequeue文件夹的组织方式如下:topic/queue/file三层组织结构,具体存储路径为:${user.home}/store/consumequeue/{topic}/{queueId}/{fileName}。
ConsumeQueue即为CommitLog文件的索引文件, 其构建机制是当消息到达Commitlog文件后 由专门的线程产生消息转发任务,从而构建消息消费队列文件(ConsumeQueue )与索引文件(Index File)。
存储机制这样设计有以下几个好处:
RocketMQ还支持通过MessageID或者MessageKey来查询消息;使用ID查询时,因为ID就是用broker+offset生成的(这里msgId指的是服务端的),所以很容易就找到对应的commitLog文件来读取消息。但是对于用MessageKey来查询消息,RocketMQ则通过构建一个index来提高读取速度。
index存的是索引文件,这个文件用来加快消息查询的速度。消息消费队列RocketMQ专门为消息订阅构建的索引文件,提高根据主题与消息检索消息的速度 ,使用Hash索引机制,具体是Hash槽与Hash冲突的链表结构。
IndexFile 也是定长的,从单个文件的数据结构来说,这是实现了一种简单原生的哈希拉链机制。当一条新的消息索引进来时,首先使用 hash 算法命中黄色部分 500w 个 slot 中的一个,如果存在冲突就使用拉链解决,将最新索引数据的 next 指向上一条索引位置。同时将消息的索引数据 append 至文件尾部(绿色部分),这样便形成了一条当前 slot 按照时间存入的倒序的链表。
| 第几位 | 字段 | 字节数 | 数据类型 | 说明 |
|---|---|---|---|---|
| 1 | beginTimestamp | 8 | Long | 开始插入时间 |
| 2 | endTimestamp | 8 | Long | 最后插入时间 |
| 3 | beginPhyOffset | 8 | Long | 第一个索引对应CommitLog的偏移量 |
| 4 | endPhyOffset | 8 | Long | 最后一个索引对应CommitLog的偏移量 |
| 5 | hashSlotCount | 4 | Integer | 槽位使用数 |
| 6 | indexCount | 4 | Integer | 索引总数 |
| 第几位 | 字段 | 字节数 | 数据类型 | 说明 |
|---|---|---|---|---|
| 1 | absSlotPos | 4 | int | 索引在Content中的位置 |
| 第几位 | 字段 | 字节数 | 数据类型 | 说明 |
|---|---|---|---|---|
| 1 | keyHash | 4 | Int | key的hashcode |
| 2 | phyOffset | 8 | Long | CommitLog中的偏移量 |
| 3 | timeDiff | 8 | Long | 消息的延迟时间 |
| 4 | indexCount | 8 | Long | 上一次槽内的indexCount |
config文件夹中存储着Topic和Consumer等相关信息。主题和消费者群组相关的信息就存在在此。
由于RocketMQ操作CommitLog,ConsumeQueue文件是基于内存映射机制并在启动的时候会加载CommitLog,ConsumeQueue目录下的所有文件,为了避免内存与磁盘的浪费,不可能将消息永久存储在消息服务器上,所以需要引入一种机制来删除己过期的文件。
删除过程分别执行清理消息存储文件(Commitlog)与消息消费队列文件(ConsumeQueue文件),消息消费队列文件与消息存储文件共用一套过期文件机制。
RocketMQ清除过期文件的方法是:如果非当前写文件在一定时间间隔内没有再次被更新,则认为是过期文件,可以被删除,RocketMQ不会关注这个文件上的消息是否全部被消费。默认每个文件的过期时间为72小时(不同版本的默认值不同,这里以4.9.4为例),通过在Broker配置文件中设置 fileReservedTime来改变过期时间,单位为小时。
触发文件清除操作的是一个定时任务,而且只有定时任务,文件过期删除定时任务的周期由该删除决定,默认每10s执行一次。
文件删除主要是由这个配置属性:fileReservedTime,文件保留时间。也就是从最后一次更新时间到现在,如果超过了该时间,则认为是过期文件,可以删除。
另外还有其他两个配置参数:
另外还有RocketMQ的磁盘配置参数: