• RocketMQ之MappedFileQueue详解


    MappedFileQueue介绍

    RocketMQ消息写入和刷盘由CommitLog控制,CommitLog持有MappedFileQueue对象,MappedFileQueue内部持有一个MappedFile的集合,每个MappedFile对应一个磁盘存储文件。消息写入时就是写入到对应的MappedFile内存中,并根据刷盘策略将MappedFile内存数据写入到文件中,完成持久化操作。

    MappedFileQueue功能详解

    load加载磁盘文件

    public boolean load() {
    		// ①
            File dir = new File(this.storePath);
            File[] files = dir.listFiles();
            if (files != null) {
                // ascending order
                // 按顺序加载到队列中
                Arrays.sort(files);
                for (File file : files) {
    				// ②
                    if (file.length() != this.mappedFileSize) {
                        log.warn(file + "\t" + file.length()
                            + " length not matched message store config value, please check it manually");
                        return false;
                    }
    
                    try {
                    	// ③
                        MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize);
    					
    					// ④
                        mappedFile.setWrotePosition(this.mappedFileSize);
                        mappedFile.setFlushedPosition(this.mappedFileSize);
                        mappedFile.setCommittedPosition(this.mappedFileSize);
                        // ⑤
                        this.mappedFiles.add(mappedFile);
                        log.info("load " + file.getPath() + " OK");
                    } catch (IOException e) {
                        log.error("load file " + file + " error", e);
                        return false;
                    }
                }
            }
    
            return true;
        }
        
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    ①、根据配置的storePath路径加载文件
    ②、如果文件的大小不等于mappedFileSize则退出,一般发生在最后一个,所以在RocketMQ第一次启动之后就不能再修改mappedFileSize大小,复制导致存储的消息错乱无法恢复。
    ③、创建MappedFile对应,将本地文件映射到内存中。
    ④、修改MappedFile的位置信息。
    ⑤、添加到内存文件映射集合中。

    getMappedFileByTime

    根据时间戳获取MappedFile文件,将MappedFile队列排序,并且遍历,直到某个MappedFile文件的最后修改时间大于指定时间戳,则返回该队列,如果每个大于指定时间戳的MappedFile则返回最后一个。

    public MappedFile getMappedFileByTime(final long timestamp) {
            Object[] mfs = this.copyMappedFiles(0);
    
            if (null == mfs)
                return null;
    
            for (int i = 0; i < mfs.length; i++) {
                MappedFile mappedFile = (MappedFile) mfs[i];
                if (mappedFile.getLastModifiedTimestamp() >= timestamp) {
                    return mappedFile;
                }
            }
    
            return (MappedFile) mfs[mfs.length - 1];
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    getLastMappedFile(long, boolean)

    getLastMappedFile方法是根据条件从队列里获取最新的一个MappedFile文件,可以根据时间戳获取,也可以根据偏移量获取,或是直接从队列中第一个和最后一个。我们看一个典型的getLastMappedFile(long, boolean),根据偏移量获取,如果没有则创建一个MappedFile。

    public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {
            long createOffset = -1;
            // ①
            MappedFile mappedFileLast = getLastMappedFile();
    
            if (mappedFileLast == null) {
                // ②
                createOffset = startOffset - (startOffset % this.mappedFileSize);
            }
    
            if (mappedFileLast != null && mappedFileLast.isFull()) {
                // ③
                createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;
            }
    
            if (createOffset != -1 && needCreate) {
                // ④
                String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
                String nextNextFilePath = this.storePath + File.separator
                    + UtilAll.offset2FileName(createOffset + this.mappedFileSize);
                MappedFile mappedFile = null;
    
                //  ⑤
                if (this.allocateMappedFileService != null) {
                    mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
                        nextNextFilePath, this.mappedFileSize);
                } else {
                    try {
                        mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
                    } catch (IOException e) {
                        log.error("create mappedFile exception", e);
                    }
                }
    
                if (mappedFile != null) {
                    if (this.mappedFiles.isEmpty()) {
                    	// ⑥
                        mappedFile.setFirstCreateInQueue(true);
                    }
                    // ⑦
                    this.mappedFiles.add(mappedFile);
                }
    
                return mappedFile;
            }
    
            return mappedFileLast;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48

    ①、获取MappedFile文件队列中最后一个文件,因为是按照顺序写的。
    ②、如果当前MappedFileQueue是空的,则要创建的文件的起始offset为不大于startOffset的最大能被mappedFileSize整除的数。比如[0,99),[100,199),[200,299)队列梯度是这样的,那个我需要拉取230偏移量所在的队列起始队列,那么就是230 - (230 % 100)= 200。
    ③、如果最后一个队列满了,则从最后一个队列的起始偏移位置 + 队列长度作为下一个队列的起始偏移位置。
    ④、如果需要创建,则计算创建MappedFile的文件名,文件名也是以起始偏移位置命名,这样能提升索引效率。
    ⑤、创建MappedFile文件。
    ⑥、如果MappedFile是第一个被创建的,会加上标识,这个标识在put消息的时候会使用到。
    ⑦、将创建的MappedFile加入队列中。

    deleteExpiredFileByTime

    指定过期时间删除MappedFile文件

    public int deleteExpiredFileByTime(final long expiredTime,
            final int deleteFilesInterval,
            final long intervalForcibly,
            final boolean cleanImmediately) {
            Object[] mfs = this.copyMappedFiles(0);
    
            if (null == mfs)
                return 0;
    
            int mfsLength = mfs.length - 1;
            int deleteCount = 0;
            List<MappedFile> files = new ArrayList<MappedFile>();
            if (null != mfs) {
            	// ①
                for (int i = 0; i < mfsLength; i++) {
                    MappedFile mappedFile = (MappedFile) mfs[i];
                    long liveMaxTimestamp = mappedFile.getLastModifiedTimestamp() + expiredTime;
                    // ②
                    if (System.currentTimeMillis() >= liveMaxTimestamp || cleanImmediately) {
                        // ③
                        if (mappedFile.destroy(intervalForcibly)) {
                            files.add(mappedFile);
                            deleteCount++;
                            //一次最多删除10个
                            if (files.size() >= DELETE_FILES_BATCH_MAX) {
                                break;
                            }
    
                            if (deleteFilesInterval > 0 && (i + 1) < mfsLength) {
                                try {
                                    //删除了一个文件之后,等待一段时间再删除下一个
                                    Thread.sleep(deleteFilesInterval);
                                } catch (InterruptedException e) {
                                }
                            }
                        } else {
                            break;
                        }
                    } else {
                        //avoid deleting files in the middle
                        break;
                    }
                }
            }
            //从mappedFiles中删除记录
            deleteExpiredFile(files);
    
            return deleteCount;
        }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    ①、遍历MappedFile队列,满足条件的全部删除。
    ②、如果MappedFile的最后修改时间过期或者是cleanImmediately立即清除,则会执行清除操作。
    ③、调用MappedFile的destroy方法,释放连接通道,并且加入删除文件队列。
    ④、真正删除MappedFile文件的方法。
    
    • 1
    • 2
    • 3
    • 4

    deleteExpiredFileByOffset

    根据消息偏移量删除过期文件

    public int deleteExpiredFileByOffset(long offset, int unitSize) {
            Object[] mfs = this.copyMappedFiles(0);
    
            List<MappedFile> files = new ArrayList<MappedFile>();
            int deleteCount = 0;
            if (null != mfs) {
    
                int mfsLength = mfs.length - 1;
    
                for (int i = 0; i < mfsLength; i++) {
                    boolean destroy;
                    MappedFile mappedFile = (MappedFile) mfs[i];
                    //获取映射文件最后一个位置的索引
                    //如果result == null,表明该映射文件还没有填充完,即不存在下一个位置索引文件
                    //因此无需删除当前的位置索引文件。
                    SelectMappedBufferResult result = mappedFile.selectMappedBuffer(this.mappedFileSize - unitSize);
                    if (result != null) {
                        //该文件最大的offSet
                        long maxOffsetInLogicQueue = result.getByteBuffer().getLong();
                        //调用mappedFile.selectMappedBuffer方法时,持有计数器加1,
                        //因此,查询完后,要释放引用,持有计数器减1.
                        result.release();
                        // 如果队列最大偏移量小于需要删除的位点,则需要进行删除
                        destroy = maxOffsetInLogicQueue < offset;
                        if (destroy) {
                            log.info("physic min offset " + offset + ", logics in current mappedFile max offset "
                                + maxOffsetInLogicQueue + ", delete it");
                        }
                    } else if (!mappedFile.isAvailable()) { // Handle hanged file.
                        log.warn("Found a hanged consume queue file, attempting to delete it.");
                        destroy = true;
                    } else {
                        log.warn("this being not executed forever.");
                        break;
                    }
    
                    if (destroy && mappedFile.destroy(1000 * 60)) {
                        // 需要删除的文件
                        files.add(mappedFile);
                        deleteCount++;
                    } else {
                        break;
                    }
                }
            }
    
            deleteExpiredFile(files);
    
            return deleteCount;
        }
        ```
        
        
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
  • 相关阅读:
    猿创征文|【开发工具-我打辅助的】2022软件开发常用辅助工具
    【算法】快速排序与归并排序
    软件测试常用的功能测试方法
    自动控制原理 - 3 线性系统的时域分析
    爬取某网站计算机类图书
    洛谷千题详解 | P1005 [NOIP2007 提高组] 矩阵取数游戏【C++、 Java、Python语言】
    搭建docker镜像仓库(二):使用harbor搭建本地镜像仓库
    开源知识库软件xwiki在Windows下的安装
    ENVI实现QUAC、简化黑暗像元、FLAASH方法的遥感影像大气校正
    java面向对象(上)
  • 原文地址:https://blog.csdn.net/qq_39408435/article/details/125415915