• 源码分析RocketMQ之broker-文件恢复


    源码分析RocketMQ之broker-文件恢复
    broker启动时会进行文件恢复检查,通过调用DefaultMessageStore.load
    1、判断 ${ROCKET_HOME}/storepath/abort 文件是否存在,如果文件存在,则返回true,否则返回false,
      abort在DefaultMessageStore 启动时创建,在 shutdown 时删除,也就是如果该文件存在,说明不是正常的关闭
    2、判断是否延迟消息启动,如果是完成延迟消息消费队列消息进度的加载 delayOffset.json,完成
      delayLevelTable 数据结构的构造
    3、MappedFileQueue.load 加载 storePath 路径下所有commit log 文件,初始化时会重置
      wrotePosition、flushedPosition、committedPosition 为mappedFileSize 最大值,在后面流程recover 文件检测恢复时,
      会调用 mappedFileQueue.truncateDirtyFiles重置指针,把mappedFile添加到mappedFiles 中
    4、加载 Consume Queue loadConsumeQueue 获取topic下所有的队列,创建ConsumeQueue,添加到
      ConcurrentMap> consumeQueueTable 中
    5、文件存储检测点 checkpoint, 创建 RandomAccessFile,创建 FileChannel,通过FileChannel map mappedByteBuffer
       读取 physicMsgTimestamp logicsMsgTimestamp indexMsgTimestamp 信息
    6、索引文件加载
       通过storePath ,默认为/rocket_home/store/index 加载索引文件,
       创建IndexFile 构建MappedFile fileChannel  mappedByteBuffer 构建indexHeader
       假如索引文件超过IndexMsgTimestamp文件最大失效时间默认48小时,并最终执行File#delete()方法清除文件
       将IndexFile 加入 indexFileList 集合
    7、文件检测恢复
      1、恢复消息队列,遍历 consumeQueueTable 执行ConsumeQueue.recover
        1、获取该消息队列的所有内存映射文件列表
        2、只从倒数第3个文件开始,这应该是一个经验值
        3、循环验证 consumeque 包含条目的有效性(如果offset大于0并且size大于0,则表示是一个有效的条目)
        4、设置 consumequeue 中有效的 mappedFileOffset ,继续下一个条目的验证,如果发现不正常的条目,则跳出循环
        5、如果该 consumeque 文件中所有条目全部有效,则继续验证下一个文件,
          (index++),如果发现条目不合法,后面的文件不需要再检测
        6、设置 flushedWhere,committedWhere 为当前有效的偏移量
        7、truncateDirtyFiles 截断无效的consumeque文件
           1、如果无效的 offset 小于该文件最大的偏移量,如果 consumequeue 的 offset 大于失效的 offset
             则该文件整个删除,如果否,则设置 wrotePosition,commitedPosition,flushedPoisition 的值即可
      2、如果是正常退出,则按照正常修复;如果是异常退出,则走异常修复逻辑
        1、从最后一个文件开始检测,先找到第一个正常的 commitlog 文件,然后从该文件开始去恢复
           1、isMappedFileMatchedRecover 判断一个文件是否正常,主要是如下几个条件
             1、魔数正确
             2、消息存储时间不为0
             3、存储时间小于等于检测点(checkpoint)
        2、创建转发对象,该方法在消费队列 DispatchRequest 转发给对象,会同步更新consumequeue,index文件
        3、检测到非法的 commitlog 时,停止恢复,然后设置 commitlog 的检测点
        4、然后删除多于的不符合格式的文件
      3、修复主题队列,遍历consumeQueueTable 设置minPhyOffset

  • 相关阅读:
    Double 4 VR智能互动系统在轨道交通实训教学中的应用
    HMS Core音频编辑服务音源分离与空间音频渲染,助力快速进入3D音频的世界
    C++:用函数模板排列数组
    有了Composition API后,有些场景或许你不需要pinia了
    抖音小店无货源蓝海选品分享,月销十万+的玩法,强烈推荐
    @vue/cli3--使用命令创建项目--方法/实例
    Python+Pytest+Request【第一章】接口框架介绍
    linux环境验证c++程序库间调用
    c和c++的反汇编方法
    Elasticsearch 主副分片切换过程中对业务写入有影响吗
  • 原文地址:https://blog.csdn.net/liangshf520/article/details/126553988