• RocketMQ源码分析(十二)之CommitLog同步与异步刷盘


    版本

    1. 基于rocketmq-all-4.3.1版本

    简介

    1. RocketMQ消息存储是首先将消息追加到内存中,然后根据刷盘策略在不同时间刷盘。

      • 同步刷盘,消息追加到内存,调用**MappedByteBuffer.force()**方法实现刷盘
      • 异步刷盘,消息追加到内存后,立即返回给Producer。使用单独的异步线程按照一定的频率执行刷盘操作
    2. Index文件的刷盘并不是采取定时刷盘机制,而是每更新一次Index文件就会将上一次的改动写入磁盘

    3. 刷盘代码CommitLog#handleDiskFlush,可以看到同步刷盘由GroupCommitService完成

      public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
          //刷盘策略,同步刷盘阻塞等待,异步刷盘唤醒commitLogService
          // Synchronization flush
          if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
              final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
              if (messageExt.isWaitStoreMsgOK()) {
                  //构建刷盘请求放入GroupCommitService队列中(List中)
                  GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
                  service.putRequest(request);
                  //GroupCommitService线程在broker启动时会启动,阻塞,等待线程刷盘完成,默认超时时间5s,如果超时返回false
                  //即如果超时,响应给生产者的是FLUSH_DISK_TIMEOUT
                  boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
                  if (!flushOK) {
                      log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
                          + " client address: " + messageExt.getBornHostString());
                      putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
                  }
              } else {
                  service.wakeup();
              }
          }
          // Asynchronous flush
          else {
              if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
                  //使用MappedByteBuffer,默认策略
                  flushCommitLogService.wakeup();
              } else {
                  //异步刷盘,如果开启TransientStorePool,使用写入缓冲区+FileChannel
                  commitLogService.wakeup();
              }
          }
      }
      
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33

    FlushCommitLogService

    1. UML图 在这里插入图片描述

    2. 实现类

      • CommitRealTimeService:异步刷盘并且transientStorePoolEnable设置为true
      • FlushRealTimeService:异步刷盘并且transientStorePoolEnable设置为false
      • GroupCommitService:同步刷盘
    3. FlushCommitLogService没有任何实现,只是定义了一个常量

      abstract class FlushCommitLogService extends ServiceThread {
          protected static final int RETRY_TIMES_OVER = 10;
      }
      
      • 1
      • 2
      • 3

    同步刷盘

    1. 同步刷盘指的是在消息追加到内存映射文件(MappedByteBuffer)的内存中后,立即将数据从内存写入磁盘文件(MappedByteBuffer.force()

    GroupCommitService

    1. 同步刷盘由GroupCommitService完成

      • 第一步:构建刷盘请求对象GroupCommitRequest,并将对象添加到requestsWrite队列中
      • 第二步:默认等待5s,如果返回false,响应给生产者的是FLUSH_DISK_TIMEOUT
    2. GroupCommitService有一个写队列和一个读队列,即将请求和刷盘进行读写分离。请求提交到写列表,刷盘时处理读列表,刷盘结束交换列表,循环往复

      在这里插入图片描述

    3. GroupCommitRequest

      public static class GroupCommitRequest {
          private final long nextOffset;
          private final CountDownLatch countDownLatch = new CountDownLatch(1);
          //刷盘结果
          private volatile boolean flushOK = false;
      
          public GroupCommitRequest(long nextOffset) {
              this.nextOffset = nextOffset;
          }
      
          public long getNextOffset() {
              return nextOffset;
          }
      
          //唤醒阻塞等待的线程
          //FIXME by jannal 此处有并发问题,this.flushOK = flushOK不是原子操作。正常需要加同步
          //由于只有一个线程操作,所以即使不是原子性也问题不大
          public void wakeupCustomer(final boolean flushOK) {
              this.flushOK = flushOK;
              this.countDownLatch.countDown();
          }
          //等待刷盘
          public boolean waitForFlush(long timeout) {
              try {
                  this.countDownLatch.await(timeout, TimeUnit.MILLISECONDS);
                  return this.flushOK;
              } catch (InterruptedException e) {
                  log.error("Interrupted", e);
                  return false;
              }
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
    4. GroupCommitService源码分析

      class GroupCommitService extends FlushCommitLogService {
          //读写容器
          private volatile List<GroupCommitRequest> requestsWrite = new ArrayList<GroupCommitRequest>();
          private volatile List<GroupCommitRequest> requestsRead = new ArrayList<GroupCommitRequest>();
      
          public synchronized void putRequest(final GroupCommitRequest request) {
              //FIXME by jannal 思考:既然方法已经加锁,为什么此处需要再次加锁?
              //swapRequests可能在其他线程并发执行,所以需要给requestsWrite单独加锁
              //swapRequests导致requestsWrite的引用变化,会不会出现问题?
              //可以将swapRequests加一个与操作requestsWrite的锁,来优化此处代码,避免不好理解
              synchronized (this.requestsWrite) {
                  this.requestsWrite.add(request);
              }
              // 通知服务线程已经接收到GroupCommitRequest
              //FIXME 直接调用父类的this.wakeUp()多好?
              if (hasNotified.compareAndSet(false, true)) {
                  waitPoint.countDown(); // notify
              }
          }
      
          private void swapRequests() {
              // volatile可以保证可见性,requestsWrite写入时加锁了,所以此处无需加锁,通过volatile可以实现低开销的读
              List<GroupCommitRequest> tmp = this.requestsWrite;
              this.requestsWrite = this.requestsRead;
              this.requestsRead = tmp;
          }
      
          private void doCommit() {
              synchronized (this.requestsRead) {
                  if (!this.requestsRead.isEmpty()) {
                      for (GroupCommitRequest req : this.requestsRead) {
                          // There may be a message in the next file, so a maximum of
                          // two times the flush
                          boolean flushOK = false;
                          /**
                           * 两个MappedFile(写第N个消息时,MappedFile 已满,创建了一个新的),所以需要有循环2次。
                           */
                          for (int i = 0; i < 2 && !flushOK; i++) {
                              //请求的offset超过已经flushed的offset,则强制刷盘
                              flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
      
                              if (!flushOK) {
                                  CommitLog.this.mappedFileQueue.flush(0);
                              }
                          }
      
                          req.wakeupCustomer(flushOK);
                      }
      
                      long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
                      if (storeTimestamp > 0) {
                        	//更新刷盘检测点StoreCheckpoint中的physicMsg Timestamp
                          //刷盘检测点的刷盘操作将在刷写消息队列文件时触发
                          CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
                      }
      
                      this.requestsRead.clear();
                  } else {
                      // Because of individual messages is set to not sync flush, it
                      // will come to this process
                      CommitLog.this.mappedFileQueue.flush(0);
                  }
              }
          }
      
          public void run() {
              CommitLog.log.info(this.getServiceName() + " service started");
      
              while (!this.isStopped()) {
                  try {
                      //调用swapRequests=>doCommit
                      this.waitForRunning(10);
                      this.doCommit();
                  } catch (Exception e) {
                      CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
                  }
              }
      
              // Under normal circumstances shutdown, wait for the arrival of the
              // request, and then flush
              try {
                  Thread.sleep(10);
              } catch (InterruptedException e) {
                  CommitLog.log.warn("GroupCommitService Exception, ", e);
              }
              //FIXME by jannal 上面没有加锁,这里为啥加锁?
              synchronized (this) {
                  this.swapRequests();
              }
      
              this.doCommit();
      
              CommitLog.log.info(this.getServiceName() + " service end");
          }
      
          @Override
          protected void onWaitEnd() {
              this.swapRequests();
          }
      
          @Override
          public String getServiceName() {
              return GroupCommitService.class.getSimpleName();
          }
      
          @Override
          public long getJointime() {
              return 1000 * 60 * 5;
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50
      • 51
      • 52
      • 53
      • 54
      • 55
      • 56
      • 57
      • 58
      • 59
      • 60
      • 61
      • 62
      • 63
      • 64
      • 65
      • 66
      • 67
      • 68
      • 69
      • 70
      • 71
      • 72
      • 73
      • 74
      • 75
      • 76
      • 77
      • 78
      • 79
      • 80
      • 81
      • 82
      • 83
      • 84
      • 85
      • 86
      • 87
      • 88
      • 89
      • 90
      • 91
      • 92
      • 93
      • 94
      • 95
      • 96
      • 97
      • 98
      • 99
      • 100
      • 101
      • 102
      • 103
      • 104
      • 105
      • 106
      • 107
      • 108
      • 109
      • 110

    异步刷盘

    1. CommitLog#handleDiskFlush中异步刷盘代码如下。异步刷盘有两种方式

      • 开启transientStorePoolEnable=true机制则启动CommitRealTimeService异步刷盘方式。
      • 如果没有开启transientStorePoolEnable=false,则启动FlushRealTimeService
      • CommitRealTimeService在commit成功后,会执行flushCommitLogService.wakeup();也就是让FlushRealTimeService将Page Cache中的数据同步至磁盘。
      if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
          //使用MappedByteBuffer,默认策略
          flushCommitLogService.wakeup();
      } else {
          //异步刷盘,如果开启TransientStorePool,使用写入缓冲区+FileChannel
          commitLogService.wakeup();
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
    2. 异步刷盘流程
      在这里插入图片描述

    CommitRealTimeService

    1. 如果transientStorePoolEnable=true,Broker会申请一个与CommitLog同样大小的堆外内存,该堆外内存会使用内存锁定(mlock),将其变为常驻内存,避免被操作系统调到swap空间中。

      • 消息追加到堆外内存
      • 提交到内存映射文件中
      • 使用flush刷盘
    2. CommitRealTimeService服务线程执行逻辑

      • 默认每200ms将ByteBuffer新追加的数据(新追加的数据=wrotePosition-commitedPosition)提交到FileChannel中

    FlushRealTimeService

    1. 无论是否开启写入缓冲池,刷盘最终都由FlushRealTimeService来执行,CommitRealTimeService在commit成功后,会执行flushCommitLogService.wakeup();也就是让FlushRealTimeService将Page Cache中的数据同步至磁盘。
    2. 将内存(Page Cache)中的数据同步至磁盘(flush)有一些前提条件
      • 若当前时间距离上次实际刷盘时间已经超过10S,则会忽略其他所有前提,确定刷盘,这样即使服务器宕机了最多也仅丢失10S的数据,提高了消息队列的可靠性。
      • 正常情况下刷盘需要满足持久化数据大于配置的最小页数,默认4,也就是新写入内存中的数据大于或等于16KB(4*4KB)
        • 当开启写入缓冲,也就是追加到fileChannel的数据大于或等于16KB
        • 未开启写入缓冲则是追加到mappedByteBuffer的数据大于或等于16KB
  • 相关阅读:
    【编程题 动态规划】把数字翻译成字符串(详细注释 易懂)
    vue中计算属性computed的特性和应用
    Redis缓存的雪崩、穿透、击穿
    中国智能音箱市场销量下降,百度稳居第一 /中国即评出10个大模型创新案例 |魔法半周报
    图像分割数据集的相关操作(二)—— albumentations 数据增强
    .NET 6.0 Web API Hangfire
    性能测试知多少---性能分析与调优的原理
    PostgreSQL 与 Oracle 访问分区表执行计划差异
    ZCU102 Zynq MPSoC IP设置与说明
    [C#,Unity面试题]本期主要针对C#跟Unity基础(二)
  • 原文地址:https://blog.csdn.net/usagoole/article/details/126394722