• RocketMQ源码阅读(五)MappedFileQueue


    目录

    load

    truncateDirtyFiles

    getMinOffset

    getLastMappedFile

    flush/commit


    MappedFile是一个文件的映射类,MappedFileQueue则是MappedFile的列表,例如RocketMQ中一个CommitLog文件大小默认是1G,文件名是起始偏移量,首个文件如下

    文件写满后,会自动生成下一个文件,MappedFileQueue就是代表这一种文件的列表

    1. public class MappedFileQueue {
    2. private final String storePath;//文件路径
    3. private final int mappedFileSize;// 每个文件的大小
    4. private final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();// 多个文件的列表
    5. private final AllocateMappedFileService allocateMappedFileService;// 创建新文件的异步线程
    6. private long flushedWhere = 0;
    7. private long committedWhere = 0;
    8. private volatile long storeTimestamp = 0;
    9. public MappedFileQueue(final String storePath, int mappedFileSize,
    10. AllocateMappedFileService allocateMappedFileService) {
    11. this.storePath = storePath;
    12. this.mappedFileSize = mappedFileSize;
    13. this.allocateMappedFileService = allocateMappedFileService;
    14. }
    15. }

    load

    映射路径下的文件列表成List<MappedFile>

    1. public boolean load() {
    2. File dir = new File(this.storePath);
    3. File[] files = dir.listFiles();
    4. if (files != null) {
    5. // ascending order
    6. Arrays.sort(files);
    7. for (File file : files) {
    8. if (file.length() != this.mappedFileSize) {
    9. log.warn(file + "\t" + file.length()
    10. + " length not matched message store config value, please check it manually");
    11. return false;
    12. }
    13. try {
    14. // 映射文件到MappedFile,设置初始的position
    15. // 如果文件还没写满,在truncateDirtyFiles()方法中会将position重置,从哪里调用的在truncateDirtyFiles()暂不关注
    16. MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize);
    17. mappedFile.setWrotePosition(this.mappedFileSize);
    18. mappedFile.setFlushedPosition(this.mappedFileSize);
    19. mappedFile.setCommittedPosition(this.mappedFileSize);
    20. this.mappedFiles.add(mappedFile);
    21. log.info("load " + file.getPath() + " OK");
    22. } catch (IOException e) {
    23. log.error("load file " + file + " error", e);
    24. return false;
    25. }
    26. }
    27. }
    28. return true;
    29. }

    truncateDirtyFiles

    清除offset之后的脏数据
    1、重置offset所在的MappedFile的position
    2、清理offset所在的MappedFile之后的文件

    1. public void truncateDirtyFiles(long offset) {
    2. List<MappedFile> willRemoveFiles = new ArrayList<MappedFile>();
    3. for (MappedFile file : this.mappedFiles) {
    4. long fileTailOffset = file.getFileFromOffset() + this.mappedFileSize;
    5. if (fileTailOffset > offset) {
    6. if (offset >= file.getFileFromOffset()) {
    7. file.setWrotePosition((int) (offset % this.mappedFileSize));
    8. file.setCommittedPosition((int) (offset % this.mappedFileSize));
    9. file.setFlushedPosition((int) (offset % this.mappedFileSize));
    10. } else {
    11. file.destroy(1000);
    12. willRemoveFiles.add(file);
    13. }
    14. }
    15. }
    16. this.deleteExpiredFile(willRemoveFiles);
    17. }

    getMinOffset

    获取最小的offset,也就是这一系列文件的最小起始位置,第一个文件的名 

    1. public long getMinOffset() {
    2. if (!this.mappedFiles.isEmpty()) {
    3. try {
    4. return this.mappedFiles.get(0).getFileFromOffset();
    5. } catch (IndexOutOfBoundsException e) {
    6. //continue;
    7. } catch (Exception e) {
    8. log.error("getMinOffset has exception.", e);
    9. }
    10. }
    11. return -1;
    12. }

    getLastMappedFile

    获取最后一个文件的映射对象(MappedFile), 如果最后一个文件已经写满了,重新创建一个新文件

    1. // 根据给出的偏移量,获取所在的MappedFile
    2. // 如果文件写满了,创建新的
    3. public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {
    4. // 判断是否需要新创建文件,如果不等于-1,表示需要创建一个文件名是createOffset的新文件
    5. long createOffset = -1;
    6. // 列表最后一个MappedFile
    7. MappedFile mappedFileLast = getLastMappedFile();
    8. // 如果没有文件,需要新键,文件名即偏移量
    9. // 文件都必须是相同大小(mappedFileSize)
    10. if (mappedFileLast == null) {
    11. createOffset = startOffset - (startOffset % this.mappedFileSize);
    12. }
    13. // 如果已经写满(文件大小==已写入的大小)
    14. // 当前最后的文件大小+mappedFileSize就是下个文件的名
    15. if (mappedFileLast != null && mappedFileLast.isFull()) {
    16. createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;
    17. }
    18. // 需要创建新文件
    19. if (createOffset != -1 && needCreate) {
    20. String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
    21. String nextNextFilePath = this.storePath + File.separator
    22. + UtilAll.offset2FileName(createOffset + this.mappedFileSize);
    23. MappedFile mappedFile = null;
    24. // allocateMappedFileService不为null,是异步创建文件
    25. // putRequestAndReturnMappedFile把创建新文件请求写入一个请求队列,然后countDownLatch.await等待创建完成
    26. // allocateMappedFileService.run中从队列取请求创建文件,创建完成countDownLatch.countdown
    27. if (this.allocateMappedFileService != null) {
    28. mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
    29. nextNextFilePath, this.mappedFileSize);
    30. } else {
    31. try {
    32. mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
    33. } catch (IOException e) {
    34. log.error("create mappedFile exception", e);
    35. }
    36. }
    37. if (mappedFile != null) {
    38. if (this.mappedFiles.isEmpty()) {
    39. mappedFile.setFirstCreateInQueue(true);
    40. }
    41. this.mappedFiles.add(mappedFile);
    42. }
    43. return mappedFile;
    44. }
    45. return mappedFileLast;
    46. }

    flush/commit

     根据当前记录的刷新和提交标志位获取对应的MappedFile,调用MappedFile.flush/commit

    1. private long flushedWhere = 0;
    2. private long committedWhere = 0;
    3. public boolean flush(final int flushLeastPages) {
    4. boolean result = true;
    5. MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);
    6. if (mappedFile != null) {
    7. long tmpTimeStamp = mappedFile.getStoreTimestamp();
    8. int offset = mappedFile.flush(flushLeastPages);
    9. long where = mappedFile.getFileFromOffset() + offset;
    10. result = where == this.flushedWhere;
    11. this.flushedWhere = where;
    12. if (0 == flushLeastPages) {
    13. this.storeTimestamp = tmpTimeStamp;
    14. }
    15. }
    16. return result;
    17. }
    18. public boolean commit(final int commitLeastPages) {
    19. boolean result = true;
    20. MappedFile mappedFile = this.findMappedFileByOffset(this.committedWhere, this.committedWhere == 0);
    21. if (mappedFile != null) {
    22. int offset = mappedFile.commit(commitLeastPages);
    23. long where = mappedFile.getFileFromOffset() + offset;
    24. result = where == this.committedWhere;
    25. this.committedWhere = where;
    26. }
    27. return result;
    28. }

  • 相关阅读:
    ArrayList 和 LinkedList 到底有哪些区别?
    windows和liunx上端口被占用如何删除
    Selenium —— 网页frame与多窗口处理!
    GID:旷视提出全方位的检测模型知识蒸馏 | CVPR 2021
    驱动开发:内核强制结束进程运行
    vue-router传参的四种方式超详细
    [记忆化dfs]leetcode2400:恰好移动 k 步到达某一位置的方法数目(medium)
    多机局域网办公神器 rustdesk 使用强推!!!
    Hadoop之小文件问题及解决方案
    RocketMQ第二话 -- RocketMQ事务消息、延时消息实现
  • 原文地址:https://blog.csdn.net/xyjy11/article/details/125377041