• # 消息中间件 RocketMQ 高级功能和源码分析(九)


    消息中间件 RocketMQ 高级功能和源码分析(九)

    一、消息中间件 RocketMQ 源码分析: 同步刷盘分析

    1、刷盘机制

    RocketMQ 的存储是基于 JDK NIO 的内存映射机制(MappedByteBuffer)的,消息存储首先将消息追加到内存,再根据配置的刷盘策略在不同时间进行刷写磁盘。

    2、同步刷盘

    消息追加到内存后,立即将数据刷写到磁盘文件

    3、同步刷盘流程 示例图:

    在这里插入图片描述

    4、 代码:CommitLog#handleDiskFlush

    
    //刷盘服务
    final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
    if (messageExt.isWaitStoreMsgOK()) {
        //封装刷盘请求
        GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
        //提交刷盘请求
        service.putRequest(request);
        //线程阻塞5秒,等待刷盘结束
        boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
        if (!flushOK) {
            putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
        }
    

    5、 GroupCommitRequest

    在这里插入图片描述

    
    long nextOffset;	//刷盘点偏移量
    CountDownLatch countDownLatch = new CountDownLatch(1);	//倒计树锁存器
    volatile boolean flushOK = false;	//刷盘结果;默认为false
    

    6、 代码:GroupCommitService#run

    public void run() {
        CommitLog.log.info(this.getServiceName() + " service started");
    
        while (!this.isStopped()) {
            try {
                //线程等待10ms
                this.waitForRunning(10);
                //执行提交
                this.doCommit();
            } catch (Exception e) {
                CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
            }
        }
    	...
    }
    

    7、 代码:GroupCommitService#doCommit

    
    private void doCommit() {
        //加锁
        synchronized (this.requestsRead) {
            if (!this.requestsRead.isEmpty()) {
                //遍历requestsRead
                for (GroupCommitRequest req : this.requestsRead) {
                    // There may be a message in the next file, so a maximum of
                    // two times the flush
                    boolean flushOK = false;
                    for (int i = 0; i < 2 && !flushOK; i++) {
                        flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
    					//刷盘
                        if (!flushOK) {
                            CommitLog.this.mappedFileQueue.flush(0);
                        }
                    }
    				//唤醒发送消息客户端
                    req.wakeupCustomer(flushOK);
                }
    			
                //更新刷盘监测点
                long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
                if (storeTimestamp > 0) {               CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
                }
    			
                this.requestsRead.clear();
            } else {
                // Because of individual messages is set to not sync flush, it
                // will come to this process
                CommitLog.this.mappedFileQueue.flush(0);
            }
        }
    }
    

    二、消息中间件 RocketMQ 源码分析:异步刷盘说明

    1、异步刷盘

    在消息追加到内存后,立即返回给消息发送端。如果开启 transientStorePoolEnable,RocketMQ 会单独申请一个与目标物理文件(commitLog)同样大小的堆外内存,该堆外内存将使用内存锁定,确保不会被置换到虚拟内存中去,消息首先追加到堆外内存,然后提交到物理文件的内存映射中,然后刷写到磁盘。如果未开启 transientStorePoolEnable,消息直接追加到物理文件直接映射文件中,然后刷写到磁盘中。

    2、异步刷盘流程 示例图:

    在这里插入图片描述

    3、开启 transientStorePoolEnable 后异步刷盘步骤:

    
    - 1. 将消息直接追加到 ByteBuffer(堆外内存)。
    - 2. CommitRealTimeService 线程每隔 200ms 将 ByteBuffer 新追加内容提交到 MappedByteBuffer 中。
    - 3. MappedByteBuffer在内存中追加提交的内容,wrotePosition 指针向后移动。
    - 4. commit 操作成功返回,将committedPosition 位置恢复。
    - 5. FlushRealTimeService 线程默认每 500ms 将 MappedByteBuffer 中新追加的内存刷写到磁盘。
    

    4、 代码:CommitLog$CommitRealTimeService#run

    提交线程工作机制

    
    //间隔时间,默认200ms
    int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog();
    
    //一次提交的至少页数
    int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages();
    
    //两次真实提交的最大间隔,默认200ms
    int commitDataThoroughInterval =
    CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval();
    
    //上次提交间隔超过commitDataThoroughInterval,则忽略提交commitDataThoroughInterval参数,直接提交
    long begin = System.currentTimeMillis();
    if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) {
        this.lastCommitTimestamp = begin;
        commitDataLeastPages = 0;
    }
    
    //执行提交操作,将待提交数据提交到物理文件的内存映射区
    boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);
    long end = System.currentTimeMillis();
    if (!result) {
        this.lastCommitTimestamp = end; // result = false means some data committed.
        //now wake up flush thread.
        //唤醒刷盘线程
        flushCommitLogService.wakeup();
    }
    
    if (end - begin > 500) {
        log.info("Commit data to file costs {} ms", end - begin);
    }
    this.waitForRunning(interval);
    

    5、 代码:CommitLog$FlushRealTimeService#run

    刷盘线程工作机制

    
    //表示await方法等待,默认false
    boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();
    //线程执行时间间隔
    int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();
    //一次刷写任务至少包含页数
    int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();
    //两次真实刷写任务最大间隔
    int flushPhysicQueueThoroughInterval =
    CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();
    ...
    //距离上次提交间隔超过flushPhysicQueueThoroughInterval,则本次刷盘任务将忽略flushPhysicQueueLeastPages,直接提交
    long currentTimeMillis = System.currentTimeMillis();
    if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {
        this.lastFlushTimestamp = currentTimeMillis;
        flushPhysicQueueLeastPages = 0;
        printFlushProgress = (printTimes++ % 10) == 0;
    }
    ...
    //执行一次刷盘前,先等待指定时间间隔
    if (flushCommitLogTimed) {
        Thread.sleep(interval);
    } else {
        this.waitForRunning(interval);
    }
    ...
    long begin = System.currentTimeMillis();
    //刷写磁盘
    CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
    long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
    if (storeTimestamp > 0) {
    //更新存储监测点文件的时间戳
    CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
    
    

    三、消息中间件 RocketMQ 源码分析:删除过期文件机制分析

    1、过期文件删除机制

    由于 RocketMQ 操作 CommitLog、ConsumerQueue文 件是基于内存映射机制并在启动的时候回加载 CommitLog、ConsumerQueue 目录下的所有文件,为了避免内存与磁盘的浪费,不可能将消息永久存储在消息服务器上,所以要引入一种机制来删除已过期的文件。RocketMQ 顺序写 CommitLog、ConsumerQueue 文件,所有写操作全部落在最后一个 CommitLog 或者 ConsumerQueue 文件上,之前的文件在下一个文件创建后将不会再被更新。RocketMQ 清除过期文件的方法时:如果当前文件在在一定时间间隔内没有再次被消费,则认为是过期文件,可以被删除,RocketMQ 不会关注这个文件上的消息是否全部被消费。默认每个文件的过期时间为72小时,通过在 Broker 配置文件中设置 fileReservedTime 来改变过期时间,单位为小时。

    2、 代码:DefaultMessageStore#addScheduleTask

    
    private void addScheduleTask() {
    	//每隔10s调度一次清除文件
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                DefaultMessageStore.this.cleanFilesPeriodically();
            }
        }, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS);
    	...
    }
    

    3、 代码:DefaultMessageStore#cleanFilesPeriodically

    
    private void cleanFilesPeriodically() {
        //清除存储文件
        this.cleanCommitLogService.run();
        //清除消息消费队列文件
        this.cleanConsumeQueueService.run();
    }
    

    4、 代码:DefaultMessageStore#deleteExpiredFiles

    
    private void deleteExpiredFiles() {
        //删除的数量
        int deleteCount = 0;
        //文件保留的时间
        long fileReservedTime = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime();
        //删除物理文件的间隔
        int deletePhysicFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval();
        //线程被占用,第一次拒绝删除后能保留的最大时间,超过该时间,文件将被强制删除
        int destroyMapedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();
    
    boolean timeup = this.isTimeToDelete();
    boolean spacefull = this.isSpaceToDelete();
    boolean manualDelete = this.manualDeleteFileSeveralTimes > 0;
    if (timeup || spacefull || manualDelete) {
    	...执行删除逻辑
    }else{
        ...无作为
    }
    

    5、删除文件操作的条件

      1. 指定删除文件的时间点,RocketMQ 通过 deleteWhen 设置一天的固定时间执行一次删除过期文件操作,默认零晨4点
      1. 磁盘空间如果不充足,删除过期文件
      1. 预留,手工触发。

    6、 代码:CleanCommitLogService#isSpaceToDelete

    当磁盘空间不足时执行删除过期文件

    
    private boolean isSpaceToDelete() {
        //磁盘分区的最大使用量
        double ratio = DefaultMessageStore.this.getMessageStoreConfig().getDiskMaxUsedSpaceRatio() / 100.0;
    	//是否需要立即执行删除过期文件操作
        cleanImmediately = false;
    
        {
            String storePathPhysic = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog();
            //当前CommitLog目录所在的磁盘分区的磁盘使用率
            double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);
            //diskSpaceWarningLevelRatio:磁盘使用率警告阈值,默认0.90
            if (physicRatio > diskSpaceWarningLevelRatio) {
                boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();
                if (diskok) {
                    DefaultMessageStore.log.error("physic disk maybe full soon " + physicRatio + ", so mark disk full");
                }
    			//diskSpaceCleanForciblyRatio:强制清除阈值,默认0.85
                cleanImmediately = true;
            } else if (physicRatio > diskSpaceCleanForciblyRatio) {
                cleanImmediately = true;
            } else {
                boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK();
                if (!diskok) {
                DefaultMessageStore.log.info("physic disk space OK " + physicRatio + ", so mark disk ok");
            }
        }
    
        if (physicRatio < 0 || physicRatio > ratio) {
            DefaultMessageStore.log.info("physic disk maybe full soon, so reclaim space, " + physicRatio);
            return true;
        }
    }
    

    7、 代码:MappedFileQueue#deleteExpiredFileByTime

    执行文件销毁和删除

    
    for (int i = 0; i < mfsLength; i++) {
        //遍历每隔文件
        MappedFile mappedFile = (MappedFile) mfs[i];
        //计算文件存活时间
        long liveMaxTimestamp = mappedFile.getLastModifiedTimestamp() + expiredTime;
        //如果超过72小时,执行文件删除
        if (System.currentTimeMillis() >= liveMaxTimestamp || cleanImmediately) {
            if (mappedFile.destroy(intervalForcibly)) {
                files.add(mappedFile);
                deleteCount++;
    
                if (files.size() >= DELETE_FILES_BATCH_MAX) {
                    break;
                }
    
                if (deleteFilesInterval > 0 && (i + 1) < mfsLength) {
                    try {
                        Thread.sleep(deleteFilesInterval);
                    } catch (InterruptedException e) {
                    }
                }
            } else {
                break;
            }
        } else {
            //avoid deleting files in the middle
            break;
        }
    }
    

    四、消息中间件 RocketMQ 源码分析: 消息存储总结

    1、RocketMQ 的存储文件包括消息文件(Commitlog)、消息消费队列文件(ConsumerQueue)、Hash 索引文件(IndexFile)、监测点文件(checkPoint)、abort(关闭异常文件)。

    单个消息存储文件、消息消费队列文件、Hash索引文件长度固定以便使用内存映射机制进行文件的读写操作。RocketMQ组织文件以文件的起始偏移量来命令文件,这样根据偏移量能快速定位到真实的物理文件。RocketMQ基于内存映射文件机制提供了同步刷盘和异步刷盘两种机制,异步刷盘是指在消息存储时先追加到内存映射文件,然后启动专门的刷盘线程定时将内存中的文件数据刷写到磁盘。

    2、CommitLog,消息存储文件

    RocketMQ 为了保证消息发送的高吞吐量,采用单一文件存储所有主题消息,保证消息存储是完全的顺序写,但这样给文件读取带来了不便,为此 RocketMQ 为了方便消息消费构建了消息消费队列文件,基于主题与队列进行组织,同时 RocketMQ 为消息实现了 Hash 索引,可以为消息设置索引键,根据所以能够快速从 CommitLog 文件中检索消息。

    3、ReputMessageService 线程实时地将消息转发给消息消费队列文件与索引文件。abort 文件引入。

    当消息达到 CommitLog 后,会通过 ReputMessageService 线程接近实时地将消息转发给消息消费队列文件与索引文件。为了安全起见,RocketMQ 引入 abort 文件,记录 Broker 的停机是否是正常关闭还是异常关闭,在重启Broker时为了保证 CommitLog 文件,消息消费队列文件与Hash索引文件的正确性,分别采用不同策略来恢复文件。

    4、RocketMQ 文件过期机制

    RocketMQ 不会永久存储消息文件、消息消费队列文件,而是启动文件过期机制并在磁盘空间不足或者默认凌晨4点删除过期文件,文件保存72小时并且在删除文件时并不会判断该消息文件上的消息是否被消费。

    上一节关联链接请点击:
    # 消息中间件 RocketMQ 高级功能和源码分析(八)

  • 相关阅读:
    ssm在线教学质量评价系统毕业设计源码141550
    OpenCV4 :并行计算cv::parallel_for_
    Netty之protobuf服务端、nodejs客户端
    【pytorch笔记】第三篇 Tensorboard使用
    SQL笔记(非DQL语句)
    【深度学习-第4篇】使用MATLAB快速实现CNN多变量回归预测
    VSCode自定义代码块详解
    【python的输入】sys.stdin与sys.argv
    asp.net毕业设计家电维修保养信息系统
    YOLO对于检测目标不全也被检测到了,如何改进?
  • 原文地址:https://blog.csdn.net/qfyh_djh/article/details/139824630