• 从源码分析RocketMq消息的存储原理


    rocketmq在存储消息的时候,最终是通过mmap映射成磁盘文件进行存储的,本文就消息的存储流程作一个整理。源码版本是4.9.2
    主要的存储组件有如下4个:
    CommitLog:存储的业务层,接收“保存消息”的请求
    MappedFile:存储的最底层对象,一个MappedFile对象就对应了一个实际的文件
    MappedFileQueue:管理MappedFile的容器
    AllocateMappedFileService:异步创建mappedFile的服务
    对于rocketmq来说,存储消息的主要文件被称为CommitLog,因此就从该类入手。处理存储请求的入口方法是asyncPutMessage,主要流程如下:

    public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
        ...
        //可能会有多个线程并发请求,虽然支持集群,但是对于每个单独的broker都是本地存储,所以内存锁就足够了
        putMessageLock.lock();
        try {
        	//获取最新的文件
            MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
            ...
            //如果文件为空,或者已经存满,则创建一个新的commitlog文件
            if (null == mappedFile || mappedFile.isFull()) {
                mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
            }
            ...
            //调用底层的mappedFile进行出处,但是注意此时还没有刷盘
            result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
            ...
        } finally {
            putMessageLock.unlock();
        }
        PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
    	...
    }
    
    

    因此对于Commitlog.asyncPutMessage来说,主要的工作就是2步:
    1.获取或者创建一个MappedFile
    2.调用appendMessage进行存储

    接下去我们先看MappedFile的创建,查看mappedFileQueue.getLastMappedFile方法,最终会调用到doCreateMappedFile方法,调用流如下:
    getLastMappedFile-->tryCreateMappedFile-->doCreateMappedFile

    protected MappedFile doCreateMappedFile(String nextFilePath, String nextNextFilePath) {
        MappedFile mappedFile = null;
    	//如果异步服务对象不为空,那么就采用异步创建文件的方式
        if (this.allocateMappedFileService != null) {
            mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
                    nextNextFilePath, this.mappedFileSize);
        } else {
        //否则就同步创建
            try {
                mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
            } catch (IOException e) {
                log.error("create mappedFile exception", e);
            }
        }
        ...
        return mappedFile;
    }
    

    因此对于MappedFileQueue来说,主要工作就2步:
    1.如果有异步服务,那么就异步创建mappedFile
    2.否则就同步创建

    接下去主要看异步创建的流程,查看allocateMappedFileService.putRequestAndReturnMappedFile

    public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String nextNextFilePath, int fileSize) {
        ...
        //创建mappedFile的请求,
        AllocateRequest nextReq = new AllocateRequest(nextFilePath, fileSize);
        //将其放入ConcurrentHashMap中,主要用于并发判断,保证不会创建重复的mappedFile
        boolean nextPutOK = this.requestTable.putIfAbsent(nextFilePath, nextReq) == null;
    	//如果map添加成功,就可以将request放入队列中,实际创建mappedFile的线程也是从该queue中获取request
        if (nextPutOK) {
            boolean offerOK = this.requestQueue.offer(nextReq);
        }
    	
        AllocateRequest result = this.requestTable.get(nextFilePath);
        try {
            if (result != null) {
            	//因为是异步创建,所以这里需要await,等待mappedFile被异步创建成功
                boolean waitOK = result.getCountDownLatch().await(waitTimeOut, TimeUnit.MILLISECONDS);
                //返回创建好的mappedFile
                return result.getMappedFile();
            }
        } catch (InterruptedException e) {
            log.warn(this.getServiceName() + " service has exception. ", e);
        }
        return null;
    }
    

    因此对于AllocateMappedFileService.putRequestAndReturnMappedFile,主要工作也是2步:
    1.将“创建mappedFile”的请求放入队列中
    2.等待异步线程实际创建完mappedFile

    接下去看异步线程是如何具体创建mappedFile的。既然AllocateMappedFileService本身就是负责创建mappedFile的,并且其本身也实现了Runnable接口,我们查看其run方法,其中会调用mmapOperation,这就是最终执行创建mappedFile的方法

    private boolean mmapOperation() {
        boolean isSuccess = false;
        AllocateRequest req = null;
        try {
            //从队列中拿request
            req = this.requestQueue.take();
            ...
            if (req.getMappedFile() == null) {
                MappedFile mappedFile;
                //判断是否采用堆外内存
                if (messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
                    try {
                        //如果开启了堆外内存,rocketmq允许外部注入自定义的MappedFile实现
                        mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();
                        mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
                    } catch (RuntimeException e) {
                    	//如果没有自定义实现,那么就采用默认的实现
                        log.warn("Use default implementation.");
                        mappedFile = new MappedFile(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
                    }
                } else {
                    //如果未采用堆外内存,那么就直接采用默认实现
                    mappedFile = new MappedFile(req.getFilePath(), req.getFileSize());
                }
    			...
                //这里会预热文件,这里涉及到了系统的底层调用
                mappedFile.warmMappedFile(this.messageStore.getMessageStoreConfig().getFlushDiskType(),
                        this.messageStore.getMessageStoreConfig().getFlushLeastPagesWhenWarmMapedFile());
                req.setMappedFile(mappedFile);
            }
            ...
        } finally {
            if (req != null && isSuccess)
                //无论是否创建成功,都要唤醒putRequestAndReturnMappedFile方法中的等待线程
                req.getCountDownLatch().countDown();
        }
        return true;
    }
    
    

    因此对于mmapOperation创建mappedFile,主要工作为4步:
    1.从队列中获取putRequestAndReturnMappedFile方法存放的request
    2.根据是否启用对外内存,分支创建mappedFile
    3.预热mappedFile
    4.唤醒putRequestAndReturnMappedFile方法中的等待线程

    接下去查看mappedFile内部的具体实现,我们可以发现在构造函数中,也会调用内部的init方法,这就是主要实现mmap的方法

    private void init(final String fileName, final int fileSize) throws IOException {
        ...
        //创建文件对象
        this.file = new File(fileName);
        try {
            //获取fileChannel
            this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
            //进行mmap操作,将磁盘空间映射到内存
            this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
            ...
        } finally {
            ...
        }
    }
    

    因此对于init执行mmap,主要工作分为2步:
    1.获取文件的fileChannel
    2.执行mmap映射

    而如果采用了堆外内存,那么除了上述的mmap操作,还会额外分配对外内存

    this.writeBuffer = transientStorePool.borrowBuffer();
    

    到这里,CommitLog.asyncPutMessage方法中的获取或创建mappedFile就完成了。

    接下去需要查看消息具体是符合被写入文件中的。查看mappedFile的appendMessage方法,最终会调用到appendMessagesInner方法:

    public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb,
                                                   PutMessageContext putMessageContext) {
        //如果是启用了对外内存,那么会优先写入对外内存,否则直接写入mmap内存
        ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
        byteBuffer.position(currentPos);
        AppendMessageResult result;
        ...
        //调用外部的callback执行实际的写入操作
        result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
               (MessageExtBrokerInner) messageExt, putMessageContext);
        ...
        return result;
    }
    

    因此对于appendMessage方法,主要工作分为2步:
    1.判断是否启用对外内存,从而选择对应的buffer对象
    2.调用传入的callback方法进行实际写入

    接下去查看外部传入的callback方法,是由CommitLog.asyncPutMessage传入

    result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
    

    而this.appendMessageCallback则是在CommitLog的构造函数中初始化的

    this.appendMessageCallback = new DefaultAppendMessageCallback(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
    

    查看DefaultAppendMessageCallback.doAppend方法,因为本文不关心消息的具体结构,所以省略了大部分构造buffer的代码:

    public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
        final MessageExtBrokerInner msgInner, PutMessageContext putMessageContext) {
        ...
        //获取消息编码后的buffer
        ByteBuffer preEncodeBuffer = msgInner.getEncodedBuff();
        ...
        //写入buffer中,如果启用了对外内存,那么就会写入外部传入的writerBuffer,否则直接写入mappedByteBuffer中
        byteBuffer.put(preEncodeBuffer);
        ...
        return result;
    }
    

    因此对于doAppend方法,主要工作分为2步:
    1.将消息编码
    2.将编码后的消息写入buffer中,可以是writerBuffer或者mappedByteBuffer

    此时虽然字节流已经写入了buffer中,但是对于堆外内存,此时数据还仅存在于内存中,而对于mappedByteBuffer,虽然会有系统线程定时刷数据落盘,但是这并非我们可以控,因此也只能假设还未落盘。为了保证数据能落盘,rocketmq还有一个异步刷盘的线程,接下去再来看下异步刷盘是如何处理的。
    查看CommitLog的构造函数,其中有3个service,分别负责同步刷盘、异步刷盘和堆外内存写入fileChannel

    public CommitLog(final DefaultMessageStore defaultMessageStore) {
        ...
        //同步刷盘
        if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
            this.flushCommitLogService = new GroupCommitService();
        } else {
            //异步刷盘
            this.flushCommitLogService = new FlushRealTimeService();
        }
        //将对外内存的数据写入fileChannel
        this.commitLogService = new CommitRealTimeService();
        ...
    }
    

    先看CommitRealTimeService.run方法,其中最关键的代码如下:

    boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);
    

    查看mappedFileQueue.commit方法,关键如下:

    int offset = mappedFile.commit(commitLeastPages);
    

    查看mappedFile.commit方法:

    public int commit(final int commitLeastPages) {
    	//如果为空,说明不是堆外内存,就不需要任何操作,只需等待刷盘即可
        if (writeBuffer == null) {
            return this.wrotePosition.get();
        }
        if (this.isAbleToCommit(commitLeastPages)) {
            if (this.hold()) {
                //如果是堆外内存,那么需要做commit
                commit0();
                this.release();
            }
            ...
        }
        return this.committedPosition.get();
    }
    

    查看commit0方法:

    protected void commit0() {
        ...
        //获取堆外内存
        ByteBuffer byteBuffer = writeBuffer.slice();
        //写入fileChannel
        this.fileChannel.write(byteBuffer);
        ...
    }
    

    因此对于CommitRealTimeService,工作主要分2步:
    1.判断是否是对外内存,如果不是那就不需要处理
    2.如果是对外内存,则写入fileChannel

    最后查看同步刷盘的GroupCommitService和异步刷盘FlushRealTimeService,查看其run方法,会发现其本质都是调用了如下方法:

    CommitLog.this.mappedFileQueue.flush
    

    当然在处理的逻辑上还有计算position等等逻辑,但这不是本文所关心的,所以就省略了。
    同步和异步的区别体现在了执行刷盘操作的时间间隔,对于同步刷盘,固定间隔10ms:

    this.waitForRunning(10);
    

    而对于异步刷盘,时间间隔为配置值,默认500ms:

    int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();
    ...
    if (flushCommitLogTimed) {
        Thread.sleep(interval);
    } else {
        this.waitForRunning(interval);
    }
    

    最后查看mappedFileQueue.flush是如何刷盘的。最终会调用到mappedFile的flush方法:

    public int flush(final int flushLeastPages) {
    	...
        //如果是使用了堆外内存,那么调用的是fileChannel的刷盘
        if (writeBuffer != null || this.fileChannel.position() != 0) {
            this.fileChannel.force(false);
        } else {
        //如果非堆外内存,那么调用的是mappedByteBuffer的刷盘
            this.mappedByteBuffer.force();
        }
        ...       
        return this.getFlushedPosition();
    }
    

    因此最终的刷盘,工作主要分2步,正和前面的CommitRealTimeService工作对应:
    1.如果是使用了堆外内存,那么调用fileChannel的刷盘
    2.如果非堆外内存,那么调用mappedByteBuffer的刷盘

    至此,整个rocketmq消息落盘的流程就完成了,接下去重新整理下整个流程:
    1.CommitLog:存储的业务层,接收“保存消息”的请求,主要有2个功能:创建mappedFile、异步写入消息。
    2.AllocateMappedFileService:异步创建mappedFile的服务,通过构建AllocateRequest对象和队列进行线程间的通讯。虽然MappedFile的实际创建是通过异步线程执行的,但是当前线程会等待创建完成后再返回,所以实际上是异步阻塞的。
    3.MappedFile:存储的最底层对象,一个MappedFile对象就对应了一个实际的文件。在init方法中创建了fileChannel,并完成了mmap操作。如果启用了堆外内存,则会额外初始化writeBuffer,实现读写分离。
    4.MappedFileQueue:管理MappedFile的容器。
    5.写入消息的时候,会根据是否启用堆外内存,写入writeBuffer或者mappedByteBuffer。
    6.实际落盘是通过异步的线程实现的,分为名义上的同步(GroupCommitService)和异步(FlushRealTimeService),不过主要区别在于执行落盘方法的时间间隔不同,最终都是调用mappedFile的flush方法
    7.落盘会根据是否启用对外内存,分别调用fileChannel.force或者mappedByteBuffer.force

  • 相关阅读:
    2.5 整理了3种小红书笔记爆文写作文案【玩赚小红书】
    AI编程案例002/ 根据草图设计小红书封面
    网络知识:内网、外网、宽带、带宽、流量、网速之间的联系?
    leetcode:1662. 检查两个字符串数组是否相等(python3解法)
    01背包详解
    CVE-2024-27199 JetBrains TeamCity 身份验证绕过漏洞2
    车辆调度算法
    Linux 查找
    Redis 主从同步原理
    操作系统4小时速成:文件管理,文件结构,属性,基本操作,逻辑有无结构,目录结构,文件系统
  • 原文地址:https://www.cnblogs.com/tera/p/16035291.html