• RocketMQ Source Code Analysis (13): ConsumeQueue


    Version

    1. Based on rocketmq-all-4.3.1

    ConsumeQueue

    1. Each ConsumeQueue has a queueId, whose value ranges from 0 up to the queue count configured in TopicConfig minus one. For example, if a topic has 4 consume queues, their queueIds are 0, 1, 2 and 3.

    2. The ConsumeQueue is effectively an index into the CommitLog: when consuming, a consumer first looks up the message's offset in the ConsumeQueue, then reads the raw message data from the CommitLog at that offset. A message that exists only in the CommitLog and has no ConsumeQueue entry cannot be consumed.

    3. The ConsumeQueue class corresponds to all the files under a given topic and queueId. The default storage path is **$HOME/store/consumequeue/{topic}/{queueId}/{fileName}**; each file holds 300,000 entries, so a single file is **300,000 × 20 bytes** (about 5.72 MB).

    4. Each ConsumeQueue store unit has the format commitLogOffset (8B) + size (4B) + tagsHashCode (8B), 20 bytes in total. The tag hash is stored so that, after the consumer fetches an entry, it can filter by tag first; only the entries that pass the filter require reading the full message from the CommitLog.
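      The 20-byte layout above can be sketched with a plain JDK ByteBuffer. This is an illustration only; the class and method names here are mine, not RocketMQ's.

```java
import java.nio.ByteBuffer;

// Illustrative only: encode/decode one ConsumeQueue store unit
// (commitLogOffset 8B + size 4B + tagsHashCode 8B = 20B).
public class CqUnitDemo {
    static final int CQ_STORE_UNIT_SIZE = 20;

    static byte[] encode(long commitLogOffset, int size, long tagsCode) {
        ByteBuffer buf = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);
        buf.putLong(commitLogOffset); // 8 bytes: physical offset in the CommitLog
        buf.putInt(size);             // 4 bytes: message size
        buf.putLong(tagsCode);        // 8 bytes: tag hash code used for filtering
        return buf.array();
    }

    static long[] decode(byte[] unit) {
        ByteBuffer buf = ByteBuffer.wrap(unit);
        return new long[] {buf.getLong(), buf.getInt(), buf.getLong()};
    }

    public static void main(String[] args) {
        byte[] unit = encode(1024L, 256, 9876L);
        long[] fields = decode(unit);
        System.out.println(unit.length);                              // 20
        System.out.println(fields[0] + " " + fields[1] + " " + fields[2]); // 1024 256 9876
        // A 300,000-entry file is therefore 300000 * 20 = 6,000,000 bytes.
        System.out.println(300_000L * CQ_STORE_UNIT_SIZE);            // 6000000
    }
}
```

      Because every unit is exactly 20 bytes, an entry's logical index translates directly into a byte position, which is what makes the offset arithmetic later in this article possible.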

    5. Core fields of ConsumeQueue

      public class ConsumeQueue {
         private final DefaultMessageStore defaultMessageStore;
         //mapped file queue that manages this ConsumeQueue's files
         private final MappedFileQueue mappedFileQueue;
         //message topic
         private final String topic;
         //message queue id
         private final int queueId;
         //fixed-size buffer holding one store unit (20 bytes)
         private final ByteBuffer byteBufferIndex;
         //storage path
         private final String storePath;
         //size of each mapped file
         private final int mappedFileSize;
         //physical offset (in the CommitLog) of the last appended message
         private long maxPhysicOffset = -1;
         //minimum logical offset within this ConsumeQueue
         private volatile long minLogicOffset = 0;
         //extension file of the ConsumeQueue
         private ConsumeQueueExt consumeQueueExt = null;
      }
      
      • 21
    6. Constructor

      public ConsumeQueue(
          final String topic,
          final int queueId,
          final String storePath,
          final int mappedFileSize,
          final DefaultMessageStore defaultMessageStore) {
          this.storePath = storePath;
          this.mappedFileSize = mappedFileSize;
          this.defaultMessageStore = defaultMessageStore;
      
          this.topic = topic;
          this.queueId = queueId;
          //storage path: ${this.storePath}/{topic}/{queueId}/{fileName}
          String queueDir = this.storePath
              + File.separator + topic
              + File.separator + queueId;
      
          this.mappedFileQueue = new MappedFileQueue(queueDir, mappedFileSize, null);
          //allocate a buffer the size of one store unit (20 bytes)
          this.byteBufferIndex = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);
          //whether extension storage for the consume queue is enabled
          if (defaultMessageStore.getMessageStoreConfig().isEnableConsumeQueueExt()) {
              this.consumeQueueExt = new ConsumeQueueExt(
                  topic,
                  queueId,
                  StorePathConfigHelper.getStorePathConsumeQueueExt(defaultMessageStore.getMessageStoreConfig().getStorePathRootDir()),
                  defaultMessageStore.getMessageStoreConfig().getMappedFileSizeConsumeQueueExt(),
                  defaultMessageStore.getMessageStoreConfig().getBitMapLengthConsumeQueueExt()
              );
          }
      }
      

    Loading (load)

    1. When the Broker starts, it calls load to load the ConsumeQueue. Loading is delegated directly to the mappedFileQueue:

      public boolean load() {
          boolean result = this.mappedFileQueue.load();
          log.info("load consume queue " + this.topic + "-" + this.queueId + " " + (result ? "OK" : "Failed"));
          //load the extension storage too, if present
          if (isExtReadEnable()) {
              result &= this.consumeQueueExt.load();
          }
          return result;
      }
      

    Recovery (recover)

    1. When the Broker starts, it tries to recover the ConsumeQueue files:

      • If there are at least three files, recovery starts from the third-to-last file; otherwise it starts from the first
      • Each file is scanned unit by unit, 20 bytes at a time, until all data has been read
      • Files beyond the last valid offset are deleted
    2. Source

      public void recover() {
          final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
          if (!mappedFiles.isEmpty()) {
              //if there are at least three files, start from the third-to-last; otherwise start from the first
              int index = mappedFiles.size() - 3;
              if (index < 0)
                  index = 0;
              //size of a single ConsumeQueue file
              int mappedFileSizeLogics = this.mappedFileSize;
              //mapped file at the starting index
              MappedFile mappedFile = mappedFiles.get(index);
              ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
              //start offset of the mapped file (also its file name)
              long processOffset = mappedFile.getFileFromOffset();
              long mappedFileOffset = 0;
              long maxExtAddr = 1;
              while (true) {
                  //walk every unit in the file, 20 bytes at a time
                  for (int i = 0; i < mappedFileSizeLogics; i += CQ_STORE_UNIT_SIZE) {
                      long offset = byteBuffer.getLong();
                      int size = byteBuffer.getInt();
                      long tagsCode = byteBuffer.getLong();
                      //sequential parse, one unit every 20 bytes; offset >= 0 and size > 0 means the unit is valid
                      if (offset >= 0 && size > 0) {
                          //length of the valid data so far
                          mappedFileOffset = i + CQ_STORE_UNIT_SIZE;
                          //record the maximum physical offset
                          this.maxPhysicOffset = offset;
                          if (isExtAddr(tagsCode)) {
                              maxExtAddr = tagsCode;
                          }
                      } else {
                          log.info("recover current consume queue file over,  " + mappedFile.getFileName() + " "
                              + offset + " " + size + " " + tagsCode);
                          break;
                      }
                  }
                  //this file has been fully scanned
                  if (mappedFileOffset == mappedFileSizeLogics) {
                      index++;
                      // all files scanned, exit the loop
                      if (index >= mappedFiles.size()) {

                          log.info("recover last consume queue file over, last mapped file "
                              + mappedFile.getFileName());
                          break;
                      } else {
                          //move on to the next file and keep scanning
                          mappedFile = mappedFiles.get(index);
                          byteBuffer = mappedFile.sliceByteBuffer();
                          processOffset = mappedFile.getFileFromOffset();
                          mappedFileOffset = 0;
                          log.info("recover next consume queue file, " + mappedFile.getFileName());
                      }
                  } else {
                      log.info("recover current consume queue queue over " + mappedFile.getFileName() + " "
                          + (processOffset + mappedFileOffset));
                      break;
                  }
              }
              // start offset of the last file + length of its valid data
              processOffset += mappedFileOffset;
              //set the flush and commit positions
              this.mappedFileQueue.setFlushedWhere(processOffset);
              this.mappedFileQueue.setCommittedWhere(processOffset);
              //truncate files past the last valid offset (everything after it is dirty)
              this.mappedFileQueue.truncateDirtyFiles(processOffset);
              //recover the extension file if present
              if (isExtReadEnable()) {
                  this.consumeQueueExt.recover();
                  log.info("Truncate consume queue extend file by max {}", maxExtAddr);
                  this.consumeQueueExt.truncateByMaxAddress(maxExtAddr);
              }
          }
      }
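      The scan at the heart of recover() can be sketched over an in-memory buffer: step through 20-byte units and stop at the first invalid one (offset < 0 or size <= 0). This is a simplification for illustration, not RocketMQ's mapped-file code; the names are mine.

```java
import java.nio.ByteBuffer;

// Illustrative sketch of the recover() scan: walk a buffer in 20-byte
// steps and stop at the first invalid unit.
public class RecoverScanDemo {
    static final int UNIT = 20;

    // Returns the number of valid bytes at the head of the buffer
    // (the analogue of mappedFileOffset after the for-loop).
    static int scanValidBytes(ByteBuffer buf) {
        int validBytes = 0;
        for (int i = 0; i + UNIT <= buf.capacity(); i += UNIT) {
            long offset = buf.getLong(i);       // commitLogOffset field
            int size = buf.getInt(i + 8);       // size field
            if (offset >= 0 && size > 0) {
                validBytes = i + UNIT;          // this unit is valid, advance
            } else {
                break;                          // first invalid unit ends the scan
            }
        }
        return validBytes;
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(3 * UNIT);
        // two valid units; the third unit stays zeroed, and a zero size is invalid
        buf.putLong(100L).putInt(64).putLong(7L);
        buf.putLong(164L).putInt(128).putLong(8L);
        System.out.println(scanValidBytes(buf)); // 40
    }
}
```

      In the real method this count, added to the file's start offset, becomes processOffset, which then drives both the flush/commit positions and the truncation of dirty files.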
      

    Appending dispatched entries (putMessagePositionInfoWrapper)

    1. Dispatched messages are appended through the putMessagePositionInfoWrapper method:

      • Check whether the message queue is writable, and whether writing the ConsumeQueue extension file is enabled (default: false)
      • Write the entry into the MappedFile buffer
      • If the append succeeds, update the checkpoint timestamp
    2. putMessagePositionInfoWrapper source

      public void putMessagePositionInfoWrapper(DispatchRequest request) {
          final int maxRetries = 30;
          //check whether the ConsumeQueue is writable
          boolean canWrite = this.defaultMessageStore.getRunningFlags().isCQWriteable();
          for (int i = 0; i < maxRetries && canWrite; i++) {
              long tagsCode = request.getTagsCode();
              //whether writing the ConsumeQueue extension file is enabled, default false
              //the bloom filter records the message's bitmap first, so a consumer can check the filter for matching messages before reading
              if (isExtWriteEnable()) {
                  ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
                  cqExtUnit.setFilterBitMap(request.getBitMap());
                  cqExtUnit.setMsgStoreTime(request.getStoreTimestamp());
                  cqExtUnit.setTagsCode(request.getTagsCode());
      
                  long extAddr = this.consumeQueueExt.put(cqExtUnit);
                  if (isExtAddr(extAddr)) {
                      tagsCode = extAddr;
                  } else {
                      log.warn("Save consume queue extend fail, So just save tagsCode! {}, topic:{}, queueId:{}, offset:{}", cqExtUnit,
                          topic, queueId, request.getCommitLogOffset());
                  }
              }
              //write into the buffer
              boolean result = this.putMessagePositionInfo(request.getCommitLogOffset(),
                  request.getMsgSize(), tagsCode, request.getConsumeQueueOffset());
              //if the append succeeded, update the checkpoint file
              if (result) {
                  this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp());
                  return;
              } else {
                  // XXX: warn and notify me
                  log.warn("[BUG]put commit log position info to " + topic + ":" + queueId + " " + request.getCommitLogOffset()
                      + " failed, retry " + i + " times");
      
                  try {
                      Thread.sleep(1000);
                  } catch (InterruptedException e) {
                      log.warn("", e);
                  }
              }
          }
      
          // XXX: warn and notify me
          log.error("[BUG]consume queue can not write, {} {}", this.topic, this.queueId);
          this.defaultMessageStore.getRunningFlags().makeLogicsQueueError();
      }
      
      private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
          final long cqOffset) {
      
          //maxPhysicOffset records the CommitLog offset of the message most recently applied to this ConsumeQueue
          //if the message has already been processed, return true directly
          if (offset <= this.maxPhysicOffset) {
              return true;
          }
      
          this.byteBufferIndex.flip();
          //20 bytes
          this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
          this.byteBufferIndex.putLong(offset);//write the CommitLog offset
          this.byteBufferIndex.putInt(size);//write the message size
          this.byteBufferIndex.putLong(tagsCode);//write the tag hash code
          //cqOffset is the logical index of this entry within the ConsumeQueue
          final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;
          //get the current MappedFile; the ConsumeQueue itself is also managed through a MappedFileQueue
          MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
          if (mappedFile != null) {
              //for a newly created file whose first entry does not start at position 0, pre-fill blank units
              if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) {
                  this.minLogicOffset = expectLogicOffset;
                  this.mappedFileQueue.setFlushedWhere(expectLogicOffset);
                  this.mappedFileQueue.setCommittedWhere(expectLogicOffset);
                  this.fillPreBlank(mappedFile, expectLogicOffset);
                  log.info("fill pre blank space " + mappedFile.getFileName() + " " + expectLogicOffset + " "
                      + mappedFile.getWrotePosition());
              }
      
              if (cqOffset != 0) {
                  long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();
                  //already processed
                  if (expectLogicOffset < currentLogicOffset) {
                      log.warn("Build  consume queue repeatedly, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
                          expectLogicOffset, currentLogicOffset, this.topic, this.queueId, expectLogicOffset - currentLogicOffset);
                      return true;
                  }
      
                  if (expectLogicOffset != currentLogicOffset) {
                      LOG_ERROR.warn(
                          "[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
                          expectLogicOffset,
                          currentLogicOffset,
                          this.topic,
                          this.queueId,
                          expectLogicOffset - currentLogicOffset
                      );
                  }
              }
              //update the physical offset and append to the MappedFile; a failed append is retried on the next pass,
              //so maxPhysicOffset can be assigned here without checking the append result
              this.maxPhysicOffset = offset;
              return mappedFile.appendMessage(this.byteBufferIndex.array());
          }
          return false;
      }
      
      private void fillPreBlank(final MappedFile mappedFile, final long untilWhere) {
          ByteBuffer byteBuffer = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);
          byteBuffer.putLong(0L);
          byteBuffer.putInt(Integer.MAX_VALUE);
          byteBuffer.putLong(0L);
      
          int until = (int) (untilWhere % this.mappedFileQueue.getMappedFileSize());
          for (int i = 0; i < until; i += CQ_STORE_UNIT_SIZE) {
              mappedFile.appendMessage(byteBuffer.array());
          }
      }
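      The offset arithmetic in putMessagePositionInfo can be illustrated in isolation. Assuming the default file size of 300,000 units (6,000,000 bytes), the sketch below shows how a logical queue index (cqOffset) maps to a byte position, which file holds it, and the in-file position that fillPreBlank would pad up to. The names are mine, for illustration only.

```java
// Illustrative arithmetic for ConsumeQueue logical offsets.
public class CqOffsetMathDemo {
    static final int UNIT = 20;
    static final int MAPPED_FILE_SIZE = 300_000 * UNIT; // 6,000,000 bytes per file

    // Byte position in the logical queue for the cqOffset-th entry
    // (expectLogicOffset in the source above).
    static long expectLogicOffset(long cqOffset) {
        return cqOffset * UNIT;
    }

    // Start offset of the mapped file holding that position
    // (the file name is this start offset).
    static long fileStartOffset(long logicOffset) {
        return logicOffset - (logicOffset % MAPPED_FILE_SIZE);
    }

    // Position inside that file; fillPreBlank pads blank units up to this
    // point when a queue's first entry does not land at position 0.
    static int positionInFile(long logicOffset) {
        return (int) (logicOffset % MAPPED_FILE_SIZE);
    }

    public static void main(String[] args) {
        long logic = expectLogicOffset(300_001L); // one unit into the second file
        System.out.println(logic);                // 6000020
        System.out.println(fileStartOffset(logic)); // 6000000
        System.out.println(positionInFile(logic));  // 20
    }
}
```

      This is also why the source compares expectLogicOffset with currentLogicOffset: since every unit is fixed-size, any mismatch means entries were built out of order or duplicated.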
      

    ReputMessageService

    1. ReputMessageService is a service thread, implemented as an inner class of DefaultMessageStore, that drives ConsumeQueue persistence. Its main work is the doReput method, executed once every 1 ms:

      public void run() {
          DefaultMessageStore.log.info(this.getServiceName() + " service started");
      
          while (!this.isStopped()) {
              try {
                  Thread.sleep(1);
                  this.doReput();
              } catch (Exception e) {
                  DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
              }
          }
      
          DefaultMessageStore.log.info(this.getServiceName() + " service end");
      }
      
      
    2. doReput in outline:

      • Fetch the messages stored in the CommitLog from reputFromOffset onward that are ready for processing (they may span multiple MappedFiles)
      • Iterate over these messages and convert each one into a DispatchRequest
      • Hand each DispatchRequest to the CommitLogDispatchers for execution
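      The fan-out in the last step can be sketched as a loop over registered dispatchers. The types below are simplified stand-ins for illustration, not the real RocketMQ classes (the real DispatchRequest carries topic, queueId, offsets and more).

```java
import java.util.List;

// Minimal sketch of the dispatch fan-out: every request built from the
// CommitLog is handed to each registered dispatcher in turn.
public class DoReputSketch {
    interface Dispatcher { void dispatch(String request); }

    static void doDispatch(List<Dispatcher> dispatchers, String request) {
        for (Dispatcher d : dispatchers) {
            d.dispatch(request); // e.g. build ConsumeQueue entry, IndexFile entry, bitmap
        }
    }

    public static void main(String[] args) {
        StringBuilder trace = new StringBuilder();
        List<Dispatcher> dispatchers = List.of(
            r -> trace.append("consumeQueue:").append(r).append(";"),
            r -> trace.append("index:").append(r).append(";"));
        doDispatch(dispatchers, "msg-1");
        System.out.println(trace); // consumeQueue:msg-1;index:msg-1;
    }
}
```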
    3. doReput flow diagram (figure omitted)

    CommitLogDispatcher

    1. CommitLogDispatcher is the dispatcher for CommitLog messages, used mainly to build the ConsumeQueue and IndexFile. The interface has three implementations:

      • CommitLogDispatcherBuildConsumeQueue builds the ConsumeQueue
      • CommitLogDispatcherBuildIndex builds the IndexFile
      • CommitLogDispatcherCalcBitMap computes the filter bitmap
    2. CommitLogDispatcherBuildConsumeQueue source. It only handles TRANSACTION_NOT_TYPE and TRANSACTION_COMMIT_TYPE messages: the former are ordinary non-transactional messages, the latter are transactional messages that have been definitively committed.

      class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {
      
          @Override
          public void dispatch(DispatchRequest request) {
              final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());
              switch (tranType) {
                  case MessageSysFlag.TRANSACTION_NOT_TYPE:
                  case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                      DefaultMessageStore.this.putMessagePositionInfo(request);
                      break;
                  case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
                  case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                      break;
              }
          }
      }
      public void putMessagePositionInfo(DispatchRequest dispatchRequest) {
          //look up the ConsumeQueue for this topic and queueId
          ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId());
          cq.putMessagePositionInfoWrapper(dispatchRequest);
      }
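      The lookup performed by findConsumeQueue can be sketched as a two-level concurrent map keyed by topic and then queueId, created lazily on first dispatch. This is a simplified illustration under my own names; the real DefaultMessageStore also handles persistence and configuration when creating a queue.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative two-level lookup: topic -> (queueId -> queue).
public class FindQueueSketch {
    static class Queue {
        final String topic; final int queueId;
        Queue(String topic, int queueId) { this.topic = topic; this.queueId = queueId; }
    }

    private final Map<String, Map<Integer, Queue>> tables = new ConcurrentHashMap<>();

    // Returns the existing queue, creating it lazily on first access.
    Queue findConsumeQueue(String topic, int queueId) {
        return tables
            .computeIfAbsent(topic, t -> new ConcurrentHashMap<>())
            .computeIfAbsent(queueId, q -> new Queue(topic, q));
    }

    public static void main(String[] args) {
        FindQueueSketch store = new FindQueueSketch();
        Queue a = store.findConsumeQueue("TopicTest", 0);
        Queue b = store.findConsumeQueue("TopicTest", 0);
        System.out.println(a == b); // true: repeated lookups return the same instance
    }
}
```

      Using computeIfAbsent keeps the lookup lock-free on the hot path while still guaranteeing that each (topic, queueId) pair maps to a single queue instance.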
      
  • Original article: https://blog.csdn.net/usagoole/article/details/126394788