• Rocket MQ Crash-Safe机制浅析


    截止目前,Rocket MQ的官方文档尚未明确表示自身具备的Crash-Safe的能力。所以这个概念是笔者根据自身理解提出的。指明这个事实是因为想象到了有的读者如若遇到“咬文嚼字”老学究式的交流对象可能不太愿意认可官方之外的内容。那我们只需要清楚背后的实现原理,设计思想即可,不必过分较真概念与名词。

    Crash-Safe

          MySQL中Crash-Safe能力是指宕机、异常重启之后提供的强数据一致性保证:

    • 所有已经提交的事务的数据仍然存在
    • 所有没有提交的事务的数据能够回滚

          Rocket MQ中也有类似的机制保证数据的一致性:

    • CommitLog已持久化的数据ConsumeQueue、IndexFile中的索引数据仍然有效
    • CommitLog未持久化的数据ConsumeQueue、IndexFile中的索引数据必须销毁

          其实这个很好理解,ConsumeQueue、IndexFile相当于都是索引文件,并不负责存储任何消息的具体数据。所以即使索引文件已经构建完备,但是如果没有CommitLog的加持也将一文不值。

          Crash-Safe与存储息息相关,Rocket MQ中涉及到持久化的功能绝大部分集中在Broker节点(鲜有例外,但是客观上确实存在,比如广播模式下的消费进度管理)。况且上文提到的三个文件全部都是Broker节点负责管理,因此本文只关注ta。

    正常退出流程

          Broker启动之初已经注册了JVM ShutdownHook,虚拟机停止运行之前会尽可能的走完漫长的BrokerController#shutdown过程。

    1. public class BrokerStartup {
    2. /* 启动入口 */
    3. public static void main(String[] args) {
    4. start(createBrokerController(args));
    5. }
    6. /* 创建BrokerController对象 */
    7. public static BrokerController createBrokerController(String[] args) {
    8. ......
    9. BrokerController controller = new BrokerController(
    10. brokerConfig, nettyServerConfig,
    11. nettyClientConfig, messageStoreConfig
    12. );
    13. ......
    14. /* 注册shutdown函数 */
    15. Runtime.getRuntime().addShutdownHook(new Thread(new Runnable()
    16. private volatile boolean hasShutdown = false;
    17. private final AtomicInteger shutdownTimes = new AtomicInteger(0);
    18. @Override
    19. public void run() {
    20. synchronized (this) {
    21. if (!this.hasShutdown) {
    22. this.hasShutdown = true;
    23. controller.shutdown();
    24. }
    25. }
    26. }
    27. }, "ShutdownHook")
    28. );
    29. return controller;
    30. }
    31. }
    32. 复制代码

          Broker shutdown时需要关闭的资源非常多,包含但不仅限于多维度数据统计结束、主从高可用数据同步结束、多个文件进行刷盘、持久化消费进度、关闭网络连接、关闭线程资源......笔者只分析跟本文相关的部分。对CommitLog、ConsumeQueue、IndexFile三个文件的善后处理,全部集中在DefaultMessageStore#shutdown,同样该方法也很繁琐,笔者截取部分关键代码。

    1. public void shutdown() {
    2. ......
    3. if (!this.shutdown) {
    4. this.indexService.shutdown();
    5. this.commitLog.shutdown();
    6. this.reputMessageService.shutdown();
    7. this.flushConsumeQueueService.shutdown();
    8. }
    9. ......
    10. }
    11. 复制代码

          退出之前Rocket MQ会将三个文件,尽可能的全部刷盘持久化。笔者认为除非存在代码缺陷,否则可以认为这三个关键文件是完整的、可以信任的。日后重启,获得FileChannel而后map进内存即可,不需要复杂的恢复逻辑。但Rocket MQ面对正常退出的情景,依然会有check过程。

    abort

          虽然都会进行文件修复工作,但是两者逻辑有较大区别,随之而来的问题是我们如何来断定当时服务所处的情形呢?总不会每次重启都要人工判断吧,既不快捷,也不可靠。

          作为一个高效、自洽的中间件,Rocket MQ坚持最小化外部依赖,所以当时的状态绝对不可以保存在第三方应用中。假如你是Rocket MQ的作者你会怎么设计呢?本来Broker就可以看作是一个优秀的文件管理系统,难道不能设计一个文件来保存当时的状态吗。 实不相瞒Rocket MQ真的就是这么做的。只不过他不在文件中记录信息,而是以abort文件是否存在来辨别。

          保存日志数据的根目录:

    1. public class MessageStoreConfig {
    2. /* The root directory in which the log data is kept */
    3. @ImportantField
    4. private String storePathRootDir =
    5. System.getProperty("user.home")
    6. + File.separator
    7. + "store";
    8. }
    9. 复制代码

          根据Rocket MQ自身约定,storePathRootDir下入如果存在名为"abort"的文件,则说明异常退出,该判断逻辑会在Broker服务初始化DefaultMessageStore#load这一环节触发。

    1. public class DefaultMessageStore implements MessageStore {
    2. /* 检查abort文件是否存在 */
    3. private boolean isTempFileExist() {
    4. String fileName = StorePathConfigHelper
    5. .getAbortFile(
    6. this.messageStoreConfig.getStorePathRootDir()
    7. );
    8. File file = new File(fileName);
    9. return file.exists();
    10. }
    11. }
    12. public class StorePathConfigHelper {
    13. /* 获取abort文件路径,通过根目录与文件名拼接而成 */
    14. public static String getAbortFile(String rootDir) {
    15. return rootDir + File.separator + "abort";
    16. }
    17. }
    18. 复制代码

          那么abort文件是什么时候删除的呢?其实DefaultMessageStore#shutdown中如果符合条件,会删除这个文件,只不过刚刚害怕剧透,笔者没有展示。

    1. public void shutdown() {
    2. ......
    3. if (this.runningFlags.isWriteable() && dispatchBehindBytes() == 0) {
    4. this.deleteFile(
    5. StorePathConfigHelper.getAbortFile(
    6. this.messageStoreConfig.getStorePathRootDir()
    7. )
    8. );
    9. shutDownNormal = true;
    10. } else {
    11. log.warn("the store may be wrong, so shutdown abnormally, and keep abort file.");
    12. }
    13. ......
    14. }
    15. 复制代码

          经过上面isTempFileExist方法的检查,已经可以明确得知上次服务退出的状况。如果上一次是正常退出,意味着此时abort文件是不存在的,下一次重启的时候就没有办法确定是否正常退出了,因此启动之初我们肯定要创建这个文件,确实如我们所料,在Broker启动之时会触发此逻辑。

    1. private void createTempFile() throws IOException {
    2. String fileName = StorePathConfigHelper.getAbortFile(
    3. this.messageStoreConfig.getStorePathRootDir()
    4. );
    5. File file = new File(fileName);
    6. MappedFile.ensureDirOK(file.getParent());
    7. /* 上一次是异常退出,则文件已经存在,不用重新创建 */
    8. boolean result = file.createNewFile();
    9. log.info(fileName + (result ? " create OK" : " already exists"));
    10. }
    11. 复制代码

    CheckPoint

          前面提到的一切都无法解决异常退出时候的问题,完全有可能三个重要文件的刷盘进度都不一致,那我应该怎么确定可信范围呢?不可能我将文件全部丢弃,那样损失的消息实在太多。任何业务系统都不允许这么沉重的代价。所以Rocket MQ也引入了CheckPoint机制,尽可能的减少消息丢失。

          不同之处在于Rocket MQ并没有MySQL那么复杂的机制,而且CheckPoint是以文件的形式存在,里面包含多种文件的最后刷盘点。但是有一点是高度相似的,即check point之前的数据是一定可信的。

    1. public class StorePathConfigHelper {
    2. /**
    3. * 获取checkpoint的文件路径
    4. *
    5. * 存储内容:
    6. * CommitLog文件最后一次刷盘时间戳、
    7. * ConsumerQueue最后一次刷盘时间戳、
    8. * IndexFile索引最后一次刷盘时间戳
    9. *
    10. * @see DefaultMessageStore#load
    11. * @see DefaultMessageStore#destroy
    12. *
    13. * @param rootDir ${ROCKET_HOME}/store
    14. */
    15. public static String getStoreCheckpoint(String rootDir) {
    16. return rootDir + File.separator + "checkpoint";
    17. }
    18. }
    19. 复制代码

          与abort文件相同,checkpoint文件也是DefaultMessageStore#load之时初始化,二者所在的目录也相同,不同的是abort的价值仅仅在于存在与否,checkpoint则需要时时刻刻准备同步刷盘点。为了性能考虑,该文件也使用了内存映射技术。

    1. public static final int OS_PAGE_SIZE = 1024 * 4;
    2. public StoreCheckpoint(String scpPath) throws IOException {
    3. File file = new File(scpPath);
    4. MappedFile.ensureDirOK(file.getParent());
    5. boolean fileExists = file.exists();
    6. /* "rw"标记,表示文件权限可读、可写 */
    7. this.randomAccessFile = new RandomAccessFile(file, "rw");
    8. this.fileChannel = this.randomAccessFile.getChannel();
    9. /* 内存映射 */
    10. this.mappedByteBuffer = fileChannel.map(
    11. MapMode.READ_WRITE,
    12. 0,
    13. MappedFile.OS_PAGE_SIZE
    14. );
    15. if (fileExists) {
    16. log.info("store checkpoint file exists, " + scpPath);
    17. /* !!!!!! */
    18. this.physicMsgTimestamp = this.mappedByteBuffer.getLong(0);
    19. this.logicsMsgTimestamp = this.mappedByteBuffer.getLong(8);
    20. this.indexMsgTimestamp = this.mappedByteBuffer.getLong(16);
    21. /* !!!!!! */
    22. } else {
    23. log.info("store checkpoint file not exists, " + scpPath);
    24. }
    25. }
    26. 复制代码

          着重标出三行代码,每行代码都表示获取一个long型整数,分别取自文件中的0-7、8-15、16-23字节。也就是说虽然消耗了4kb的硬盘空间,4kb的内存空间,但是该文件发挥作用的只有前24byte。

    • physicMsgTimestamp: CommitLog文件最后刷盘时间点
    • logicsMsgTimestamp:  ConsumeQueue文件最后刷盘时间点
    • indexMsgTimestamp :  IndexFile文件最后刷盘时间点

          每当单个文件刷盘之后就会更新该文件,但是此时只是在内存中维护进度,并没有持久化。       只有调用StoreCheckpoint#flush方法才会持久化到磁盘。IndexFile、ConsumeQueue刷盘时都会触发。

    1. public void flush() {
    2. this.mappedByteBuffer.putLong(0, this.physicMsgTimestamp);
    3. this.mappedByteBuffer.putLong(8, this.logicsMsgTimestamp);
    4. this.mappedByteBuffer.putLong(16, this.indexMsgTimestamp);
    5. this.mappedByteBuffer.force();
    6. }
    7. 复制代码

    文件恢复

          是否正常退出将直接决定文件恢复的方式。

    1. private void recover(boolean lastExitOK) {
    2. long maxPhyOffsetOfConsumeQueue = this.recoverConsumeQueue();
    3. /* abort文件已经不存在 */
    4. if (lastExitOK) {
    5. /* 正常终止 */
    6. this.commitLog.recoverNormally(maxPhyOffsetOfConsumeQueue);
    7. } else {
    8. /* 异常终止 */
    9. this.commitLog.recoverAbnormally(maxPhyOffsetOfConsumeQueue);
    10. }
    11. this.recoverTopicQueueTable();
    12. }
    13. 复制代码

          但是无论何种方式,都会使用maxPhyOffsetOfConsumeQueue这个关键信息。

    1. private long recoverConsumeQueue() {
    2. long maxPhysicOffset = -1;
    3. for (ConcurrentMap maps : this.consumeQueueTable.values()) {
    4. for (ConsumeQueue logic : maps.values()) {
    5. logic.recover();
    6. if (logic.getMaxPhysicOffset() > maxPhysicOffset) {
    7. maxPhysicOffset = logic.getMaxPhysicOffset();
    8. }
    9. }
    10. }
    11. return maxPhysicOffset;
    12. }
    13. 复制代码

          consumeQueueTable是一个二维的Map结构保留了当前Broker下所有的Queue信息。ConcurrentMap>这么一表示应该就很好理解了。这个方法就是遍历所有的ConsumeQueue文件,拿到最大的偏移量。也就是说maxPhysicOffset偏移量之前的消息都已被构建索引。

    正常退出

          正常退出时的恢复过程

    • 获取checkCRCOnRecover配置决定消息要不要进行CRC32检查,开启的话会有部分性能损失,建议开启
    • 判断映射文件的数量,如果 > 3,则从倒数第三个文件开始恢复,如果不足三个则从正数第一个开始
    • 循环处理文件中的每一条消息,如果消息验证通过则将验证的偏移量往上累加,同时该文件的读指针也往后推动
    • 处理完文件如果发现CommitLog实际落盘的偏移量 < maxPhysicOffset,则将索引文件中多余的信息清除掉
    1. public void recoverNormally(long maxPhyOffsetOfConsumeQueue) {
    2. boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
    3. List mappedFiles = this.mappedFileQueue.getMappedFiles();
    4. if (!mappedFiles.isEmpty()) {
    5. /* 重启时从倒数第三个文件开始恢复 */
    6. int index = mappedFiles.size() - 3;
    7. /* 不足三个则从第一个开始 */
    8. if (index < 0) index = 0;
    9. MappedFile mappedFile = mappedFiles.get(index);
    10. /* byteBuffer与mappedFile内部的mappedByteBuffer是同一份内存,只是独立维护了读指针 */
    11. ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
    12. /* 已确认的物理偏移量,即当前文件起始偏移量 */
    13. long processOffset = mappedFile.getFileFromOffset();
    14. /* 当前文件已经校验通过的offset */
    15. long mappedFileOffset = 0;
    16. while (true) {
    17. DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);
    18. int size = dispatchRequest.getMsgSize();
    19. /* Normal data */
    20. if (dispatchRequest.isSuccess() && size > 0) {
    21. mappedFileOffset += size;
    22. } else if (dispatchRequest.isSuccess() && size == 0) {
    23. /* 说明该文件已经检验完毕 */
    24. index++;
    25. if (index >= mappedFiles.size()) {
    26. /* Current branch can not happen */
    27. break;
    28. } else {
    29. /* 还有其余的文件需要校验 */
    30. mappedFile = mappedFiles.get(index);
    31. byteBuffer = mappedFile.sliceByteBuffer();
    32. processOffset = mappedFile.getFileFromOffset();
    33. mappedFileOffset = 0;
    34. }
    35. }
    36. /* Intermediate file read error */
    37. else if (!dispatchRequest.isSuccess()) {
    38. log.info("recover physics file end, " + mappedFile.getFileName());
    39. break;
    40. }
    41. }
    42. processOffset += mappedFileOffset;
    43. /* 更新当前刷盘指针,表示该指针之前的所有数据已经持久化到磁盘 */
    44. this.mappedFileQueue.setFlushedWhere(processOffset);
    45. /* 当前数据提交指针,内存中ByteBuffer当前的写指针 */
    46. this.mappedFileQueue.setCommittedWhere(processOffset);
    47. /* 截断CommitLog脏文件 */
    48. this.mappedFileQueue.truncateDirtyFiles(processOffset);
    49. /* Clear ConsumeQueue redundant data */
    50. if (maxPhyOffsetOfConsumeQueue >= processOffset) {
    51. /* 截断ConsumeQueue脏文件 */
    52. this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
    53. }
    54. } else {
    55. /* CommitLog case files are deleted */
    56. log.warn("The commitlog files are deleted, and delete the consume queue files");
    57. this.mappedFileQueue.setFlushedWhere(0);
    58. this.mappedFileQueue.setCommittedWhere(0);
    59. this.defaultMessageStore.destroyLogics();
    60. }
    61. }
    62. 复制代码

          其实根据我对Rocket MQ的理解,因为ConsumeQueue、IndexFile信息滞后于CommitLog,本质是ReputMessageService线程异步构建的,故而这两者中的索引信息记录的偏移量小于processOffset是可以理解的,但是我不太理解为什么会出现processOffset < maxPhyOffsetOfConsumeQueue的情况,就是索引信息反而跑到了元信息之前。

    异常宕机

          异常宕机时的恢复过程有一些不同

    • 获取checkCRCOnRecover配置决定消息要不要进行CRC32检查,开启的话会有部分性能损失,建议开启
    • 从最后一个文件开始扫描,直到找到合适的文件,如果没有一个符合条件则选中正数第一个,参见CommitLog#isMappedFileMatchedRecover方法
      • 校验文件的正数第4-7字节是否等于CommitLog#MESSAGE_MAGIC_CODE
      • 根据Flag判断读取第48 + 8或者48 + 20往后的两个字节获得该消息的存储时间戳是否为零
      • 检查messageIndexEnable、messageIndexSafe配置项如果都为true,则取三个文件中最早的刷盘时间点,默认只选取CommitLog、ConsumeQueue两个文件参与计算(checkpoint文件中的时间)
      • 比较当前文件第一条消息的落盘时间如果小于上述时间,则说明可以从该文件开始修复数据。如果大于则选择上一个文件。
    • 经历上述步骤,如果能找到符合要求的文件,则遍历该文件,检查消息是否合法,合法的消息将重新分发给ConsumeQueue、IndexFile文件建立索引。
    • 如果没有找到则设置MappedFileQueue的flushedWhere、committedWhere指针都为零,删除ConsumeQueue文件

          在这期间同一个消息完全可能被构造了两次索引,但是Rocket MQ本来就不保证只投递一次,瑕不掩瑜。

    总结

          如果你愿意牺牲部分性能选择同步发送,同步刷盘,那么只要返回成功,那消息一定是已经持久化,这个无论是否正常退出,你都不用担心消息丢失。

          如果不是上述配置,Rocket MQ本来就不保证不丢失消息,在这基础之上根据checkpoint尽可能的帮助你挽回损失,能保证checkpoint之前的消息一定不会丢失可以说是具备Crash-Safe能力。


     

  • 相关阅读:
    vue 3.0 常用API 的介绍
    (ELK安装part1)Elasticsearch8.4.0集群安装
    Python期末复习题:文件
    2023云栖大会,Salesforce终敲开中国CRM市场
    深入React源码揭开渲染更新流程的面纱
    Spring中事务的传播机制以及REQUIRED、REQUIRES_NEW、NESTED区别以及代码演示
    Greenplum数据库故障排查及修复
    35岁程序员危机,有何破解之法
    力扣记录:动态规划4股票问题——121,122,123,188 ,309,714买卖股票的最佳时机(I,II,III,IV,含冷冻期,含手续费)
    【Matplotlib绘制图像大全】(二十八):Matplotlib使用imshow()可视化图像
  • 原文地址:https://blog.csdn.net/Trouvailless/article/details/126190038