目录
MappedFile是一个文件的映射类,MappedFileQueue则是MappedFile的列表,例如RocketMQ中一个CommitLog文件大小默认是1G,文件名是起始偏移量,首个文件如下

文件写满后,会自动生成下一个文件,MappedFileQueue就是代表这一种文件的列表

- public class MappedFileQueue {
-
- private final String storePath;//文件路径
-
- private final int mappedFileSize;// 每个文件的大小
-
- private final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();// 多个文件的列表
-
- private final AllocateMappedFileService allocateMappedFileService;// 创建新文件的异步线程
-
- private long flushedWhere = 0;
- private long committedWhere = 0;
-
- private volatile long storeTimestamp = 0;
-
- public MappedFileQueue(final String storePath, int mappedFileSize,
- AllocateMappedFileService allocateMappedFileService) {
- this.storePath = storePath;
- this.mappedFileSize = mappedFileSize;
- this.allocateMappedFileService = allocateMappedFileService;
- }
- }
映射路径下的文件列表成List<MappedFile>
- public boolean load() {
- File dir = new File(this.storePath);
- File[] files = dir.listFiles();
- if (files != null) {
- // ascending order
- Arrays.sort(files);
- for (File file : files) {
-
- if (file.length() != this.mappedFileSize) {
- log.warn(file + "\t" + file.length()
- + " length not matched message store config value, please check it manually");
- return false;
- }
-
- try {
- // 映射文件到MappedFile,设置初始的position
- // 如果文件还没写满,在truncateDirtyFiles()方法中会将position重置,从哪里调用的在truncateDirtyFiles()暂不关注
- MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize);
-
- mappedFile.setWrotePosition(this.mappedFileSize);
- mappedFile.setFlushedPosition(this.mappedFileSize);
- mappedFile.setCommittedPosition(this.mappedFileSize);
- this.mappedFiles.add(mappedFile);
- log.info("load " + file.getPath() + " OK");
- } catch (IOException e) {
- log.error("load file " + file + " error", e);
- return false;
- }
- }
- }
-
- return true;
- }
清除offset之后的脏数据
1、重置offset所在的MappedFile的position
2、清理offset所在的MappedFile之后的文件
- public void truncateDirtyFiles(long offset) {
- List<MappedFile> willRemoveFiles = new ArrayList<MappedFile>();
-
- for (MappedFile file : this.mappedFiles) {
- long fileTailOffset = file.getFileFromOffset() + this.mappedFileSize;
- if (fileTailOffset > offset) {
- if (offset >= file.getFileFromOffset()) {
- file.setWrotePosition((int) (offset % this.mappedFileSize));
- file.setCommittedPosition((int) (offset % this.mappedFileSize));
- file.setFlushedPosition((int) (offset % this.mappedFileSize));
- } else {
- file.destroy(1000);
- willRemoveFiles.add(file);
- }
- }
- }
-
- this.deleteExpiredFile(willRemoveFiles);
- }
获取最小的offset,也就是这一系列文件的最小起始位置,第一个文件的名
- public long getMinOffset() {
-
- if (!this.mappedFiles.isEmpty()) {
- try {
- return this.mappedFiles.get(0).getFileFromOffset();
- } catch (IndexOutOfBoundsException e) {
- //continue;
- } catch (Exception e) {
- log.error("getMinOffset has exception.", e);
- }
- }
- return -1;
- }
获取最后一个文件的映射对象(MappedFile), 如果最后一个文件已经写满了,重新创建一个新文件
- // 根据给出的偏移量,获取所在的MappedFile
- // 如果文件写满了,创建新的
- public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {
- // 判断是否需要新创建文件,如果不等于-1,表示需要创建一个文件名是createOffset的新文件
- long createOffset = -1;
- // 列表最后一个MappedFile
- MappedFile mappedFileLast = getLastMappedFile();
-
- // 如果没有文件,需要新键,文件名即偏移量
- // 文件都必须是相同大小(mappedFileSize)
- if (mappedFileLast == null) {
- createOffset = startOffset - (startOffset % this.mappedFileSize);
- }
-
- // 如果已经写满(文件大小==已写入的大小)
- // 当前最后的文件大小+mappedFileSize就是下个文件的名
- if (mappedFileLast != null && mappedFileLast.isFull()) {
- createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;
- }
-
- // 需要创建新文件
- if (createOffset != -1 && needCreate) {
- String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
- String nextNextFilePath = this.storePath + File.separator
- + UtilAll.offset2FileName(createOffset + this.mappedFileSize);
- MappedFile mappedFile = null;
-
- // allocateMappedFileService不为null,是异步创建文件
- // putRequestAndReturnMappedFile把创建新文件请求写入一个请求队列,然后countDownLatch.await等待创建完成
- // allocateMappedFileService.run中从队列取请求创建文件,创建完成countDownLatch.countdown
- if (this.allocateMappedFileService != null) {
- mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
- nextNextFilePath, this.mappedFileSize);
- } else {
- try {
- mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
- } catch (IOException e) {
- log.error("create mappedFile exception", e);
- }
- }
-
- if (mappedFile != null) {
- if (this.mappedFiles.isEmpty()) {
- mappedFile.setFirstCreateInQueue(true);
- }
- this.mappedFiles.add(mappedFile);
- }
-
- return mappedFile;
- }
-
- return mappedFileLast;
- }
根据当前记录的刷新和提交标志位获取对应的MappedFile,调用MappedFile.flush/commit
- private long flushedWhere = 0;
- private long committedWhere = 0;
-
- public boolean flush(final int flushLeastPages) {
- boolean result = true;
- MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);
- if (mappedFile != null) {
- long tmpTimeStamp = mappedFile.getStoreTimestamp();
- int offset = mappedFile.flush(flushLeastPages);
- long where = mappedFile.getFileFromOffset() + offset;
- result = where == this.flushedWhere;
- this.flushedWhere = where;
- if (0 == flushLeastPages) {
- this.storeTimestamp = tmpTimeStamp;
- }
- }
-
- return result;
- }
-
- public boolean commit(final int commitLeastPages) {
- boolean result = true;
- MappedFile mappedFile = this.findMappedFileByOffset(this.committedWhere, this.committedWhere == 0);
- if (mappedFile != null) {
- int offset = mappedFile.commit(commitLeastPages);
- long where = mappedFile.getFileFromOffset() + offset;
- result = where == this.committedWhere;
- this.committedWhere = where;
- }
-
- return result;
- }