截止目前,Rocket MQ的官方文档尚未明确表示自身具备的Crash-Safe的能力。所以这个概念是笔者根据自身理解提出的。指明这个事实是因为想象到了有的读者如若遇到“咬文嚼字”老学究式的交流对象可能不太愿意认可官方之外的内容。那我们只需要清楚背后的实现原理,设计思想即可,不必过分较真概念与名词。
MySQL中Crash-Safe能力是指宕机、异常重启之后提供的强数据一致性保证:
Rocket MQ中也有类似的机制保证数据的一致性:
其实这个很好理解,ConsumeQueue、IndexFile相当于都是索引文件,并不负责存储任何消息的具体数据。所以即使索引文件已经构建完备,但是如果没有CommitLog的加持也将一文不值。
Crash-Safe与存储息息相关,Rocket MQ中涉及到持久化的功能绝大部分集中在Broker节点(鲜有例外,但是客观上确实存在,比如广播模式下的消费进度管理)。况且上文提到的三个文件全部都是Broker节点负责管理,因此本文只关注ta。
Broker启动之初已经注册了JVM ShutdownHook,虚拟机停止运行之前会尽可能的走完漫长的BrokerController#shutdown过程。
- public class BrokerStartup {
-
- /* 启动入口 */
- public static void main(String[] args) {
- start(createBrokerController(args));
- }
-
- /* 创建BrokerController对象 */
- public static BrokerController createBrokerController(String[] args) {
- ......
- BrokerController controller = new BrokerController(
- brokerConfig, nettyServerConfig,
- nettyClientConfig, messageStoreConfig
- );
- ......
-
- /* 注册shutdown函数 */
- Runtime.getRuntime().addShutdownHook(new Thread(new Runnable()
- private volatile boolean hasShutdown = false;
- private final AtomicInteger shutdownTimes = new AtomicInteger(0);
-
- @Override
- public void run() {
- synchronized (this) {
- if (!this.hasShutdown) {
- this.hasShutdown = true;
- controller.shutdown();
- }
- }
- }
- }, "ShutdownHook")
- );
-
- return controller;
- }
-
- }
- 复制代码
Broker shutdown时需要关闭的资源非常多,包含但不仅限于多维度数据统计结束、主从高可用数据同步结束、多个文件进行刷盘、持久化消费进度、关闭网络连接、关闭线程资源......笔者只分析跟本文相关的部分。对CommitLog、ConsumeQueue、IndexFile三个文件的善后处理,全部集中在DefaultMessageStore#shutdown,同样该方法也很繁琐,笔者截取部分关键代码。
- public void shutdown() {
- ......
- if (!this.shutdown) {
- this.indexService.shutdown();
- this.commitLog.shutdown();
- this.reputMessageService.shutdown();
- this.flushConsumeQueueService.shutdown();
- }
- ......
- }
- 复制代码
退出之前Rocket MQ会将三个文件,尽可能的全部刷盘持久化。笔者认为除非存在代码缺陷,否则可以认为这三个关键文件是完整的、可以信任的。日后重启,获得FileChannel而后map进内存即可,不需要复杂的恢复逻辑。但Rocket MQ面对正常退出的情景,依然会有check过程。
虽然都会进行文件修复工作,但是两者逻辑有较大区别,随之而来的问题是我们如何来断定当时服务所处的情形呢?总不会每次重启都要人工判断吧,既不快捷,也不可靠。
作为一个高效、自洽的中间件,Rocket MQ坚持最小化外部依赖,所以当时的状态绝对不可以保存在第三方应用中。假如你是Rocket MQ的作者你会怎么设计呢?本来Broker就可以看作是一个优秀的文件管理系统,难道不能设计一个文件来保存当时的状态吗。 实不相瞒Rocket MQ真的就是这么做的。只不过他不在文件中记录信息,而是以abort文件是否存在来辨别。
保存日志数据的根目录:
- public class MessageStoreConfig {
-
- /* The root directory in which the log data is kept */
- @ImportantField
- private String storePathRootDir =
- System.getProperty("user.home")
- + File.separator
- + "store";
-
- }
- 复制代码

根据Rocket MQ自身约定,storePathRootDir下入如果存在名为"abort"的文件,则说明异常退出,该判断逻辑会在Broker服务初始化DefaultMessageStore#load这一环节触发。
- public class DefaultMessageStore implements MessageStore {
-
- /* 检查abort文件是否存在 */
- private boolean isTempFileExist() {
- String fileName = StorePathConfigHelper
- .getAbortFile(
- this.messageStoreConfig.getStorePathRootDir()
- );
- File file = new File(fileName);
-
- return file.exists();
- }
-
- }
-
-
- public class StorePathConfigHelper {
-
- /* 获取abort文件路径,通过根目录与文件名拼接而成 */
- public static String getAbortFile(String rootDir) {
- return rootDir + File.separator + "abort";
- }
-
- }
- 复制代码
那么abort文件是什么时候删除的呢?其实DefaultMessageStore#shutdown中如果符合条件,会删除这个文件,只不过刚刚害怕剧透,笔者没有展示。
- public void shutdown() {
- ......
- if (this.runningFlags.isWriteable() && dispatchBehindBytes() == 0) {
- this.deleteFile(
- StorePathConfigHelper.getAbortFile(
- this.messageStoreConfig.getStorePathRootDir()
- )
- );
-
- shutDownNormal = true;
- } else {
- log.warn("the store may be wrong, so shutdown abnormally, and keep abort file.");
- }
- ......
- }
- 复制代码
经过上面isTempFileExist方法的检查,已经可以明确得知上次服务退出的状况。如果上一次是正常退出,意味着此时abort文件是不存在的,下一次重启的时候就没有办法确定是否正常退出了,因此启动之初我们肯定要创建这个文件,确实如我们所料,在Broker启动之时会触发此逻辑。
- private void createTempFile() throws IOException {
- String fileName = StorePathConfigHelper.getAbortFile(
- this.messageStoreConfig.getStorePathRootDir()
- );
-
- File file = new File(fileName);
- MappedFile.ensureDirOK(file.getParent());
- /* 上一次是异常退出,则文件已经存在,不用重新创建 */
- boolean result = file.createNewFile();
- log.info(fileName + (result ? " create OK" : " already exists"));
- }
- 复制代码
前面提到的一切都无法解决异常退出时候的问题,完全有可能三个重要文件的刷盘进度都不一致,那我应该怎么确定可信范围呢?不可能我将文件全部丢弃,那样损失的消息实在太多。任何业务系统都不允许这么沉重的代价。所以Rocket MQ也引入了CheckPoint机制,尽可能的减少消息丢失。
不同之处在于Rocket MQ并没有MySQL那么复杂的机制,而且CheckPoint是以文件的形式存在,里面包含多种文件的最后刷盘点。但是有一点是高度相似的,即check point之前的数据是一定可信的。
- public class StorePathConfigHelper {
-
- /**
- * 获取checkpoint的文件路径
- *
- * 存储内容:
- * CommitLog文件最后一次刷盘时间戳、
- * ConsumerQueue最后一次刷盘时间戳、
- * IndexFile索引最后一次刷盘时间戳
- *
- * @see DefaultMessageStore#load
- * @see DefaultMessageStore#destroy
- *
- * @param rootDir ${ROCKET_HOME}/store
- */
- public static String getStoreCheckpoint(String rootDir) {
- return rootDir + File.separator + "checkpoint";
- }
-
- }
- 复制代码
与abort文件相同,checkpoint文件也是DefaultMessageStore#load之时初始化,二者所在的目录也相同,不同的是abort的价值仅仅在于存在与否,checkpoint则需要时时刻刻准备同步刷盘点。为了性能考虑,该文件也使用了内存映射技术。
- public static final int OS_PAGE_SIZE = 1024 * 4;
-
- public StoreCheckpoint(String scpPath) throws IOException {
- File file = new File(scpPath);
- MappedFile.ensureDirOK(file.getParent());
- boolean fileExists = file.exists();
-
- /* "rw"标记,表示文件权限可读、可写 */
- this.randomAccessFile = new RandomAccessFile(file, "rw");
- this.fileChannel = this.randomAccessFile.getChannel();
- /* 内存映射 */
- this.mappedByteBuffer = fileChannel.map(
- MapMode.READ_WRITE,
- 0,
- MappedFile.OS_PAGE_SIZE
- );
-
- if (fileExists) {
- log.info("store checkpoint file exists, " + scpPath);
- /* !!!!!! */
- this.physicMsgTimestamp = this.mappedByteBuffer.getLong(0);
- this.logicsMsgTimestamp = this.mappedByteBuffer.getLong(8);
- this.indexMsgTimestamp = this.mappedByteBuffer.getLong(16);
- /* !!!!!! */
- } else {
- log.info("store checkpoint file not exists, " + scpPath);
- }
- }
- 复制代码
着重标出三行代码,每行代码都表示获取一个long型整数,分别取自文件中的0-7、8-15、16-23字节。也就是说虽然消耗了4kb的硬盘空间,4kb的内存空间,但是该文件发挥作用的只有前24byte。

每当单个文件刷盘之后就会更新该文件,但是此时只是在内存中维护进度,并没有持久化。 只有调用StoreCheckpoint#flush方法才会持久化到磁盘。IndexFile、ConsumeQueue刷盘时都会触发。
- public void flush() {
- this.mappedByteBuffer.putLong(0, this.physicMsgTimestamp);
- this.mappedByteBuffer.putLong(8, this.logicsMsgTimestamp);
- this.mappedByteBuffer.putLong(16, this.indexMsgTimestamp);
- this.mappedByteBuffer.force();
- }
- 复制代码
是否正常退出将直接决定文件恢复的方式。
- private void recover(boolean lastExitOK) {
- long maxPhyOffsetOfConsumeQueue = this.recoverConsumeQueue();
-
- /* abort文件已经不存在 */
- if (lastExitOK) {
- /* 正常终止 */
- this.commitLog.recoverNormally(maxPhyOffsetOfConsumeQueue);
- } else {
- /* 异常终止 */
- this.commitLog.recoverAbnormally(maxPhyOffsetOfConsumeQueue);
- }
-
- this.recoverTopicQueueTable();
- }
- 复制代码
但是无论何种方式,都会使用maxPhyOffsetOfConsumeQueue这个关键信息。
- private long recoverConsumeQueue() {
- long maxPhysicOffset = -1;
-
- for (ConcurrentMap
maps : this.consumeQueueTable.values()) { - for (ConsumeQueue logic : maps.values()) {
- logic.recover();
- if (logic.getMaxPhysicOffset() > maxPhysicOffset) {
- maxPhysicOffset = logic.getMaxPhysicOffset();
- }
- }
- }
-
- return maxPhysicOffset;
- }
- 复制代码
consumeQueueTable是一个二维的Map结构保留了当前Broker下所有的Queue信息。ConcurrentMap
正常退出时的恢复过程
- public void recoverNormally(long maxPhyOffsetOfConsumeQueue) {
- boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
- List
mappedFiles = this.mappedFileQueue.getMappedFiles(); - if (!mappedFiles.isEmpty()) {
- /* 重启时从倒数第三个文件开始恢复 */
- int index = mappedFiles.size() - 3;
- /* 不足三个则从第一个开始 */
- if (index < 0) index = 0;
-
- MappedFile mappedFile = mappedFiles.get(index);
- /* byteBuffer与mappedFile内部的mappedByteBuffer是同一份内存,只是独立维护了读指针 */
- ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
- /* 已确认的物理偏移量,即当前文件起始偏移量 */
- long processOffset = mappedFile.getFileFromOffset();
- /* 当前文件已经校验通过的offset */
- long mappedFileOffset = 0;
- while (true) {
- DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);
- int size = dispatchRequest.getMsgSize();
- /* Normal data */
- if (dispatchRequest.isSuccess() && size > 0) {
- mappedFileOffset += size;
- } else if (dispatchRequest.isSuccess() && size == 0) {
- /* 说明该文件已经检验完毕 */
- index++;
- if (index >= mappedFiles.size()) {
- /* Current branch can not happen */
- break;
- } else {
- /* 还有其余的文件需要校验 */
- mappedFile = mappedFiles.get(index);
- byteBuffer = mappedFile.sliceByteBuffer();
- processOffset = mappedFile.getFileFromOffset();
- mappedFileOffset = 0;
- }
- }
- /* Intermediate file read error */
- else if (!dispatchRequest.isSuccess()) {
- log.info("recover physics file end, " + mappedFile.getFileName());
- break;
- }
- }
-
- processOffset += mappedFileOffset;
- /* 更新当前刷盘指针,表示该指针之前的所有数据已经持久化到磁盘 */
- this.mappedFileQueue.setFlushedWhere(processOffset);
- /* 当前数据提交指针,内存中ByteBuffer当前的写指针 */
- this.mappedFileQueue.setCommittedWhere(processOffset);
- /* 截断CommitLog脏文件 */
- this.mappedFileQueue.truncateDirtyFiles(processOffset);
-
- /* Clear ConsumeQueue redundant data */
- if (maxPhyOffsetOfConsumeQueue >= processOffset) {
- /* 截断ConsumeQueue脏文件 */
- this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
- }
- } else {
- /* CommitLog case files are deleted */
- log.warn("The commitlog files are deleted, and delete the consume queue files");
- this.mappedFileQueue.setFlushedWhere(0);
- this.mappedFileQueue.setCommittedWhere(0);
- this.defaultMessageStore.destroyLogics();
- }
- }
- 复制代码
其实根据我对Rocket MQ的理解,因为ConsumeQueue、IndexFile信息滞后于CommitLog,本质是ReputMessageService线程异步构建的,故而这两者中的索引信息记录的偏移量小于processOffset是可以理解的,但是我不太理解为什么会出现processOffset < maxPhyOffsetOfConsumeQueue的情况,就是索引信息反而跑到了元信息之前。
异常宕机时的恢复过程有一些不同
在这期间同一个消息完全可能被构造了两次索引,但是Rocket MQ本来就不保证只投递一次,瑕不掩瑜。
如果你愿意牺牲部分性能选择同步发送,同步刷盘,那么只要返回成功,那消息一定是已经持久化,这个无论是否正常退出,你都不用担心消息丢失。
如果不是上述配置,Rocket MQ本来就不保证不丢失消息,在这基础之上根据checkpoint尽可能的帮助你挽回损失,能保证checkpoint之前的消息一定不会丢失可以说是具备Crash-Safe能力。