• 浅析RocketMQ-MappedFileQueue和MappedFile


    RocketMQ的Commit,Comsumequeue,Index文件的代码实现都是MappedFile,而MappedFileQueue则持有了多个MappedFile,可以理解为对应的文件夹。本文主要分析下其重要的方法。

    一.创建MappedFile

    RocketMQ要向MappedFile中写入数据时,会调用getLastMappedFile获取最新的写入文件

    1.getLastMappedFile
     public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {
            long createOffset = -1;
            // 获取MappedFile集合中末尾的文件,对应的MappedFileQueue的mappedFiles
            MappedFile mappedFileLast = getLastMappedFile();
    		// 1.MappedFile不存在,计算新文件初始偏移量,注意这里不为0
            if (mappedFileLast == null) {
                createOffset = startOffset - (startOffset % this.mappedFileSize);
            }
    		// MappedFile存在但是无法写入了,计算新文件初始偏移量
            if (mappedFileLast != null && mappedFileLast.isFull()) {
                createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;
            }
    		// 创建新文件
            if (createOffset != -1 && needCreate) {
                return tryCreateMappedFile(createOffset);
            }
    
            return mappedFileLast;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    注释1处,计算新文件偏移量为啥直接不为0呢?
    因为MappedFile文件可能被删除了,故需要重新计算

        protected MappedFile tryCreateMappedFile(long createOffset) {
        	// 拼接文件路径,这里获取了下个文件路径和下下的文件路径
            String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
            String nextNextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset
                    + this.mappedFileSize);
            return doCreateMappedFile(nextFilePath, nextNextFilePath);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
        protected MappedFile doCreateMappedFile(String nextFilePath, String nextNextFilePath) {
            MappedFile mappedFile = null;
    		// 正常是allocateMappedFileService在初始化时会创建
            if (this.allocateMappedFileService != null) {
                mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
                        nextNextFilePath, this.mappedFileSize);
            } else {
                try {
                	// 否则直接构建文件
                    mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
                } catch (IOException e) {}
            }
    		
            if (mappedFile != null) {
            	// 设置首次创建标识
                if (this.mappedFiles.isEmpty()) {
                    mappedFile.setFirstCreateInQueue(true);
                }
                // 持有新的mappedFile
                this.mappedFiles.add(mappedFile);
            }
    
            return mappedFile;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    2.putRequestAndReturnMappedFile

    这里有个transientStorePoolEnable参数,等于true时,开启堆外内存配置,表示消息存储时会先存在堆外内存,然后通过Commit线程将数据提交到内存映射的Buffer中,最后通过Flush线程将数据持久化到磁盘中。

    putRequestAndReturnMappedFile会生成下个文件和下下个文件。但提前生成的下下个文件不会返回,留到下次调用直接返回。这样做到每次快人一步,提升了运行效率。

        public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String nextNextFilePath, int fileSize) {
            int canSubmitRequests = 2;
            // 开启了transientStorePoolEnable,默认为false
            if (this.messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
            	// 开启了fastFailIfNoBufferInStorePool配置,默认为false
                if (this.messageStore.getMessageStoreConfig().isFastFailIfNoBufferInStorePool()
                	// 要求是主节点
                    && BrokerRole.SLAVE != this.messageStore.getMessageStoreConfig().getBrokerRole()) {
                    // 分配的堆外空间是有限数量的
                    canSubmitRequests = this.messageStore.getTransientStorePool().availableBufferNums() - this.requestQueue.size();
                }
            }
    		// 生成下个文件,包装一下成一个特定请求
            AllocateRequest nextReq = new AllocateRequest(nextFilePath, fileSize);
            boolean nextPutOK = this.requestTable.putIfAbsent(nextFilePath, nextReq) == null;
    		
            if (nextPutOK) {
                if (canSubmitRequests <= 0) {
                    this.requestTable.remove(nextFilePath);
                    return null;
                }
                // requestQueue是一个堵塞队列 
                // 初始化时AllocateMappedFileService会调用一个线程,不断读取requestQueue数据,生成具体的文件
                boolean offerOK = this.requestQueue.offer(nextReq);
                canSubmitRequests--;
            }
    		// 生成下下个文件,跟生成生成下个文件逻辑一致
            AllocateRequest nextNextReq = new AllocateRequest(nextNextFilePath, fileSize);
            boolean nextNextPutOK = this.requestTable.putIfAbsent(nextNextFilePath, nextNextReq) == null;
            if (nextNextPutOK) {
                if (canSubmitRequests <= 0) {
                    this.requestTable.remove(nextNextFilePath);
                } else {
                    boolean offerOK = this.requestQueue.offer(nextNextReq);
                }
            }
    		// 具体生成文件过程有误,这里拦截方法返回
            if (hasException) {
                return null;
            }
    		// 只获取生成的下个文件结果,下下个文件让线程慢慢运行
            AllocateRequest result = this.requestTable.get(nextFilePath);
            try {
                if (result != null) {
                    boolean waitOK = result.getCountDownLatch().await(waitTimeOut, TimeUnit.MILLISECONDS);
                    if (!waitOK) {
                        return null;
                    } else {
                        this.requestTable.remove(nextFilePath);
                        return result.getMappedFile();
                    }
                }
            } catch (InterruptedException e) {}
    
            return null;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56

    AllocateMappedFileService 调度运行的线程,执行mmapOperation进行创建文件

    private boolean mmapOperation() {
            boolean isSuccess = false;
            AllocateRequest req = null;
            try {
            	// 获取创建文件的请求,获取不到则堵塞住
                req = this.requestQueue.take();
                AllocateRequest expectedRequest = this.requestTable.get(req.getFilePath());
                // 请求对象发生变动,直接返回
                if (null == expectedRequest) {
                    return true;
                }
                if (expectedRequest != req) {
                    return true;
                }
    			// 这里不为null说明,已经提前生成了,对应了生成下下个文件的逻辑
                if (req.getMappedFile() == null) {
                    long beginTime = System.currentTimeMillis();
    
                    MappedFile mappedFile;
                    // 开启了堆外内存
                    if (messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
                        try {
                            mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();
                            // 这里比直接构造,多了一个将堆外内存分配给writeBuffer的操作
                            mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
                        } catch (RuntimeException e) {
                            mappedFile = new MappedFile(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
                        }
                    } else {
                    	// 直接构建对象
                        mappedFile = new MappedFile(req.getFilePath(), req.getFileSize());
                    }
    
    
                    // 文件预热操作
                    if (mappedFile.getFileSize() >= this.messageStore.getMessageStoreConfig()
                        .getMappedFileSizeCommitLog()
                        &&
                        this.messageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
                        // getFlushDiskType为异步刷盘,getFlushLeastPagesWhenWarmMapedFile为4k
                        mappedFile.warmMappedFile(this.messageStore.getMessageStoreConfig().getFlushDiskType(),
                            this.messageStore.getMessageStoreConfig().getFlushLeastPagesWhenWarmMapedFile());
                    }
    
                    req.setMappedFile(mappedFile);
                    this.hasException = false;
                    isSuccess = true;
                }
            } 
            ... // 省略异常捕获处理,主要设置hasException=true
            } finally {
            	// 对应putRequestAndReturnMappedFile的getCountDownLatch.await操作
                if (req != null && isSuccess)
                    req.getCountDownLatch().countDown();
            }
            return true;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    3. warmMappedFile

    为啥要进行文件预热?
    我们知道RocketMQ使用了内存映射技术mmap,它将文件在磁盘位置的地址和的虚拟地址通过映射对应起来。但是操作系统并没有加载到物理内存中。文件预热可以理解为将数据加载到物理内存的操作。

       public void warmMappedFile(FlushDiskType type, int pages) {
       		// 创建一个共享的缓存区
            ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
            int flush = 0;
            // MappedFile.OS_PAGE_SIZE= 4k
            for (int i = 0, j = 0; i < this.fileSize; i += MappedFile.OS_PAGE_SIZE, j++) {
            	// 1.每隔4k,向缓存区写入一个字节
                byteBuffer.put(i, (byte) 0);
                // 同步刷盘
                if (type == FlushDiskType.SYNC_FLUSH) {
                	// 每隔16K,强制刷盘一次,pages=4k,OS_PAGE_SIZE为=4k
                    if ((i / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE) >= pages) {
                        flush = i;
                        mappedByteBuffer.force();
                    }
                }
    
                // 这里是让出CPU,避免长时间占用
                if (j % 1000 == 0) {
                    try {
                        Thread.sleep(0);
                    } catch (InterruptedException e) {}
                }
            }
    
            // 强制刷盘
            if (type == FlushDiskType.SYNC_FLUSH) {
                mappedByteBuffer.force();
            }
            // 将进程使用的部分或全部的地址空间锁定在物理内存中,防止其被交换到swap空间
            this.mlock();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    注释1 为啥每次写入是4k?
    这跟Page Cache 页缓存有关,每一页大小约为4k。系统每次读写数据会先到Page Cache,再到硬盘,如果请求过来,在页缓存上没对应的数据,则会发生缺页中断,磁盘重新加载数据到内存。每隔4k写入,用于保证不会发生缺页。

    二.查找MappedFile

    findMappedFileByOffset 根据偏移量查找文件

        public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) {
            try {
            	// 获取首个文件
                MappedFile firstMappedFile = this.getFirstMappedFile();
                //获取最后一个文件
                MappedFile lastMappedFile = this.getLastMappedFile();
                if (firstMappedFile != null && lastMappedFile != null) {
                	// 偏移量小于现有最小的或者大于现有最大的,直接返回
                    if (offset < firstMappedFile.getFileFromOffset() || offset >= lastMappedFile.getFileFromOffset() + this.mappedFileSize) {
                    } else {
                    	//计算文件索引,这里也是考虑到首个文件偏移量不为0的情况
                        int index = (int) ((offset / this.mappedFileSize) - (firstMappedFile.getFileFromOffset() / this.mappedFileSize));
                        MappedFile targetFile = null;
                        try {
                            targetFile = this.mappedFiles.get(index);
                        } catch (Exception ignored) {
                        }
    					// 正好存在
                        if (targetFile != null && offset >= targetFile.getFileFromOffset()
                            && offset < targetFile.getFileFromOffset() + this.mappedFileSize) {
                            return targetFile;
                        }
    
                        for (MappedFile tmpMappedFile : this.mappedFiles) {
                            if (offset >= tmpMappedFile.getFileFromOffset()
                                && offset < tmpMappedFile.getFileFromOffset() + this.mappedFileSize) {
                                return tmpMappedFile;
                            }
                        }
                    }
    
                    if (returnFirstOnNotFound) {
                        return firstMappedFile;
                    }
                }
            } catch (Exception e) { }
    
            return null;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
  • 相关阅读:
    VirtualBox 安装CentOs6.8 无法上网问题和无法yum 安装文件的问题
    农村当前最大的红利,以及三大赚钱项目
    MySQL的主从复制
    这种动态规划你见过吗——状态机动态规划之股票问题(上)
    Redis学习笔记
    5-1传输层-传输层提供的服务
    Redis缓存(笔记一:缓存介绍和数据库启动)
    第六十三天 p1192
    【JVM】jvm的类加载机制
    Python中使用item()方法遍历字典的例子
  • 原文地址:https://blog.csdn.net/qq_34789577/article/details/126805984