目录
MappedFile是一个文件的映射,MappedFileQueue是一组文件的列表,而CommitLog类描述的是整个CommitLog文件目录
以下源码基于 RocketMQ 4.8.0 版本
- public CommitLog(final DefaultMessageStore defaultMessageStore) { // Builds the file queue, flush/commit services, append callback and put-message lock from the store config
- // Queue of mapped files, created from the configured CommitLog store path, per-file size and allocate service
- this.mappedFileQueue = new MappedFileQueue(defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(),
- defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(), defaultMessageStore.getAllocateMappedFileService());
- this.defaultMessageStore = defaultMessageStore;
-
- // Flush strategy: GroupCommitService when SYNC_FLUSH is configured, FlushRealTimeService otherwise
- if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
- this.flushCommitLogService = new GroupCommitService();
- } else {
- this.flushCommitLogService = new FlushRealTimeService();
- }
-
- // Commit service is always created here; start() only launches it when isTransientStorePoolEnable() is true
- this.commitLogService = new CommitRealTimeService();
-
- // Callback passed to MappedFile.appendMessage when a message is written; constructed with the configured max message size
- this.appendMessageCallback = new DefaultAppendMessageCallback(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
- batchEncoderThreadLocal = new ThreadLocal<MessageExtBatchEncoder>() { // one batch encoder per thread, created on first use via initialValue()
- @Override
- protected MessageExtBatchEncoder initialValue() {
- return new MessageExtBatchEncoder(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
- }
- };
-
- // Lock taken around the append in asyncPutMessage, since concurrent writers target the same file
- // ReentrantLock-based or spin-based, chosen by isUseReentrantLockWhenPutMessage() (the notes say spin is the default - confirm against MessageStoreConfig)
- this.putMessageLock = defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() : new PutMessageSpinLock();
- }
加载文件列表(将所有文件映射为MappedFile)
启动刷盘线程;如果开启了 transientStorePool(isTransientStorePoolEnable 为 true),还会启动提交线程(CommitRealTimeService)
- // Loads the mapped-file queue and logs the outcome.
- // Returns whatever mappedFileQueue.load() reported.
- public boolean load() {
- final boolean loaded = this.mappedFileQueue.load();
- final String outcome = loaded ? "OK" : "Failed";
- log.info("load commit log " + outcome);
- return loaded;
- }
-
- // Starts the flush service; the commit service is started as well,
- // but only when the transient store pool is enabled in the store config.
- public void start() {
- this.flushCommitLogService.start();
-
- final boolean transientPoolEnabled = defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable();
- if (!transientPoolEnabled) {
- return;
- }
- this.commitLogService.start();
- }
异步写入消息
流程:加写锁(自旋或ReentrantLock)→ 调用MappedFile.appendMessage在文件末尾append消息 → 解锁 → 提交刷盘请求和同步slave的请求
- public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) { // Appends msg to the last mapped file under the put-message lock, then returns a future combining the flush and replica outcomes
- // Set the storage time
- msg.setStoreTimestamp(System.currentTimeMillis());
- // Set the message body BODY CRC (consider the most appropriate setting
- // on the client)
- msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
- // Back to Results
- AppendMessageResult result = null;
-
- StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
-
- String topic = msg.getTopic();
- int queueId = msg.getQueueId();
-
- // Transaction type from the sys flag; the delay handling below applies only to non-transactional or commit messages
- final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
- if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
- || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
- // Delay Delivery
- if (msg.getDelayTimeLevel() > 0) {
- if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) { // clamp to the maximum delay level
- msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
- }
-
- topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
- queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
-
- // Backup real topic, queueId
- MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
- MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
- msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
-
- msg.setTopic(topic); // from here on the message carries the schedule topic and the delay-level queue id
- msg.setQueueId(queueId);
- }
- }
-
- long elapsedTimeInLock = 0;
- MappedFile unlockMappedFile = null;
- // Take the last mapped file in the queue as the write target
- MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
-
- // Serialize appends with the put-message lock
- putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
- try {
- long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
- this.beginTimeInLock = beginLockTimestamp;
-
- // Here settings are stored timestamp, in order to ensure an orderly
- // global
- msg.setStoreTimestamp(beginLockTimestamp);
-
- // No file yet, or the last one is full: request one via getLastMappedFile(0) (presumably creating a new file)
- if (null == mappedFile || mappedFile.isFull()) {
- mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
- }
- if (null == mappedFile) {
- log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
- beginTimeInLock = 0;
- return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));
- }
-
- // Append through the callback (per the notes: into mappedByteBuffer, or into the writeBuffer staging area - not visible here)
- result = mappedFile.appendMessage(msg, this.appendMessageCallback);
- switch (result.getStatus()) {
- case PUT_OK:
- break;
- case END_OF_FILE:
- // The append reported END_OF_FILE (per the notes: the space left in this file cannot hold the message)
- // 1. Remember the exhausted file in unlockMappedFile so it can be handed to unlockMappedFile(...) after the lock is released
- // 2. Obtain the next file via getLastMappedFile(0) and append the message again
- unlockMappedFile = mappedFile;
- // Create a new file, re-write the message
- mappedFile = this.mappedFileQueue.getLastMappedFile(0);
- if (null == mappedFile) {
- // XXX: warn and notify me
- log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
- beginTimeInLock = 0;
- return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result));
- }
- result = mappedFile.appendMessage(msg, this.appendMessageCallback); // NOTE(review): the status of this second append is not inspected before PUT_OK is reported below
- break;
- case MESSAGE_SIZE_EXCEEDED:
- case PROPERTIES_SIZE_EXCEEDED:
- beginTimeInLock = 0;
- return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result));
- case UNKNOWN_ERROR:
- beginTimeInLock = 0;
- return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
- default:
- beginTimeInLock = 0;
- return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
- }
-
- elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
- beginTimeInLock = 0;
- } finally {
- // Release the put-message lock on every path, including the early returns above
- putMessageLock.unlock();
- }
-
- if (elapsedTimeInLock > 500) {
- log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, result);
- }
-
- // When isWarmMapedFileEnable() is on and a file hit END_OF_FILE above, hand that file to defaultMessageStore.unlockMappedFile
- // (per the notes this releases the memory lock taken on the file when warm-up is enabled - not visible here)
- if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
- this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
- }
-
- PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
-
- // Statistics: per-topic put count and bytes written
- storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
- storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());
-
- // Submit the flush request; the returned future carries the flush status
- CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);
- // Submit the replication request (per the notes: SYNC_MASTER syncs to the slave immediately, ASYNC_MASTER wakes the slave sync periodically - handled inside submitReplicaRequest)
- CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, msg);
- return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {
- if (flushStatus != PutMessageStatus.PUT_OK) {
- putMessageResult.setPutMessageStatus(flushStatus);
- }
- if (replicaStatus != PutMessageStatus.PUT_OK) { // a non-OK replica status overwrites a non-OK flush status
- putMessageResult.setPutMessageStatus(replicaStatus);
- if (replicaStatus == PutMessageStatus.FLUSH_SLAVE_TIMEOUT) {
- log.error("do sync transfer other node, wait return, but failed, topic: {} tags: {} client address: {}",
- msg.getTopic(), msg.getTags(), msg.getBornHostNameString());
- }
- }
- return putMessageResult;
- });
- }
同步刷盘线程,有两个列表,读写分离
消息写入时,刷盘请求被添加到 requestsWrite;GroupCommitService 准备刷盘时先将两个列表互换,再对 requestsRead 中的请求执行刷盘,requestsWrite 则继续接收新的请求。
- class GroupCommitService extends FlushCommitLogService { // Flush service selected by the CommitLog constructor when SYNC_FLUSH is configured
- private volatile List<GroupCommitRequest> requestsWrite = new ArrayList<GroupCommitRequest>(); // list that putRequest() adds to
- private volatile List<GroupCommitRequest> requestsRead = new ArrayList<GroupCommitRequest>(); // list that doCommit() iterates and clears; exchanged with requestsWrite by swapRequests()
- }
- public void run() { // Service loop: wait briefly, then process the pending flush requests, until stopped; does one last swap and flush on exit
- CommitLog.log.info(this.getServiceName() + " service started");
-
- while (!this.isStopped()) {
- try {
- // Wait up to 10 ms (putRequest() calls wakeup(), presumably ending the wait early); onWaitEnd() swaps the lists
- this.waitForRunning(10);
-
- // Flush for the requests now in the read list
- this.doCommit();
- } catch (Exception e) {
- CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
- }
- }
-
- // Before the thread exits, perform one more flush
- // Under normal circumstances shutdown, wait for the arrival of the
- // request, and then flush
- try {
- Thread.sleep(10);
- } catch (InterruptedException e) {
- CommitLog.log.warn("GroupCommitService Exception, ", e);
- }
-
- synchronized (this) { // same monitor as the synchronized putRequest(), so no request is being added during this swap
- this.swapRequests();
- }
-
- this.doCommit();
-
- CommitLog.log.info(this.getServiceName() + " service end");
- }
- protected void waitForRunning(long interval) { // Waits up to interval ms unless already notified, then always runs onWaitEnd()
- // Already notified (per the notes, wakeup() sets hasNotified to true): clear the flag and skip the wait
- if (hasNotified.compareAndSet(true, false)) {
- // Post-wait hook
- this.onWaitEnd();
- return;
- }
-
- //entry to wait
- // Reset waitPoint before waiting (per the notes, a CountDownLatch variant that adds reset())
- waitPoint.reset();
-
- try {
- // Block for at most interval milliseconds
- waitPoint.await(interval, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- log.error("Interrupted", e);
- } finally {
- // Whether the wait timed out, ended early or was interrupted: clear the flag and run the post-wait hook
- hasNotified.set(false);
- this.onWaitEnd();
- }
- }
互换两个列表
- protected void onWaitEnd() { // Post-wait hook invoked by waitForRunning(); here it exchanges the two request lists
- this.swapRequests();
- }
-
- private void swapRequests() { // Exchanges the requestsWrite and requestsRead references; takes no lock itself
- List<GroupCommitRequest> tmp = this.requestsWrite;
- this.requestsWrite = this.requestsRead;
- this.requestsRead = tmp;
- }
有新的刷盘请求时,将其加入 requestsWrite,并调用 wakeup() 唤醒在 waitForRunning 中等待的 run() 循环
- public synchronized void putRequest(final GroupCommitRequest request) { // Queues a flush request on the write list and wakes the service loop
- synchronized (this.requestsWrite) { // NOTE(review): onWaitEnd() swaps the lists without holding this monitor or the service's, so the list locked here may have become the read list - confirm this is acceptable
- this.requestsWrite.add(request);
- }
- this.wakeup();
- }
刷盘方法
- private void doCommit() { // Flushes until each queued request's offset is covered (at most two flushes per request), reports the outcome to each request, then clears the read list
- synchronized (this.requestsRead) {
- if (!this.requestsRead.isEmpty()) {
- for (GroupCommitRequest req : this.requestsRead) {
- // There may be a message in the next file, so a maximum of
- // two times the flush
- // The requests may span two files (e.g. one message at the end of file1, the next at the start of file2),
- // so while getFlushedWhere() is still behind the request's next offset, flush again
- boolean flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
- for (int i = 0; i < 2 && !flushOK; i++) {
- CommitLog.this.mappedFileQueue.flush(0);
- flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
- }
- // Report the outcome to the request: PUT_OK or FLUSH_DISK_TIMEOUT (presumably unblocks whoever waits on the request, i.e. the submitter rather than a message consumer)
- req.wakeupCustomer(flushOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_DISK_TIMEOUT);
- }
-
- // Record the queue's store timestamp in the store checkpoint
- long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
- if (storeTimestamp > 0) {
- CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
- }
-
- this.requestsRead.clear();
- } else {
- // Because of individual messages is set to not sync flush, it
- // will come to this process
- CommitLog.this.mappedFileQueue.flush(0);
- }
- }
- }
异步策略的提交线程,定时将暂存区的数据提交到fileChannel
- class CommitRealTimeService extends FlushCommitLogService { // Periodically calls mappedFileQueue.commit(...) and wakes the flush service when data was committed
-
- private long lastCommitTimestamp = 0;
-
- @Override
- public String getServiceName() {
- return CommitRealTimeService.class.getSimpleName();
- }
-
- @Override
- public void run() {
- CommitLog.log.info(this.getServiceName() + " service started");
- while (!this.isStopped()) {
- // Interval between commit attempts (the notes give 200 ms as the default - not visible here)
- int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog();
-
- // Minimum number of pages per commit (the notes give 4 as the default - not visible here)
- int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages();
-
- // Per the notes, a backlog below the least-pages threshold (e.g. 3 pages against 4) is not committed;
- // commitDataThoroughInterval: once this many ms have passed since lastCommitTimestamp, the threshold is dropped to 0 for this pass
- int commitDataThoroughInterval =
- CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval();
-
- long begin = System.currentTimeMillis();
- // Thorough interval elapsed: commit regardless of page count
- if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) {
- this.lastCommitTimestamp = begin;
- commitDataLeastPages = 0;
- }
-
- try {
- // Commit through the mapped-file queue (per the notes, into the fileChannel)
- boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);
- long end = System.currentTimeMillis();
- if (!result) {
- // false means some data was committed: refresh the timestamp and wake the flush service
- this.lastCommitTimestamp = end; // result = false means some data committed.
- //now wake up flush thread.
- flushCommitLogService.wakeup();
- }
-
- if (end - begin > 500) {
- log.info("Commit data to file costs {} ms", end - begin);
- }
- // Wait for the configured interval before the next attempt
- this.waitForRunning(interval);
- } catch (Throwable e) {
- CommitLog.log.error(this.getServiceName() + " service has exception. ", e);
- }
- }
-
- // After the service is stopped: retry commit(0) up to RETRY_TIMES_OVER times until it returns true
- boolean result = false;
- for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
- result = CommitLog.this.mappedFileQueue.commit(0);
- CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
- }
- CommitLog.log.info(this.getServiceName() + " service end");
- }
- }
异步策略的刷盘线程,代码结构与CommitRealTimeService类似
- class FlushRealTimeService extends FlushCommitLogService { // Flush service selected by the CommitLog constructor when the flush type is not SYNC_FLUSH
- private long lastFlushTimestamp = 0;
- private long printTimes = 0;
-
- public void run() { // Loop: wait or sleep for the interval, flush with a page threshold, update the checkpoint; retries a full flush on shutdown
- CommitLog.log.info(this.getServiceName() + " service started");
-
- while (!this.isStopped()) {
- // true: plain Thread.sleep(interval); false: waitForRunning(interval), which the commit service's wakeup() call can presumably cut short (the notes say false is the default)
- boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();
-
- // Interval between flushes (the notes give 500 ms as the default - not visible here)
- int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();
- // Minimum number of pages per flush (the notes give 4 as the default - not visible here)
- int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();
-
- // As in the commit service: once this many ms have passed since lastFlushTimestamp, the page threshold is dropped to 0
- int flushPhysicQueueThoroughInterval =
- CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();
-
- boolean printFlushProgress = false;
-
- // Print flush progress
- long currentTimeMillis = System.currentTimeMillis();
- if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {
- this.lastFlushTimestamp = currentTimeMillis;
- flushPhysicQueueLeastPages = 0;
- printFlushProgress = (printTimes++ % 10) == 0; // print on every 10th thorough pass
- }
-
- try {
- // Wait for the interval; only the waitForRunning branch can be woken early
- if (flushCommitLogTimed) {
- Thread.sleep(interval);
- } else {
- this.waitForRunning(interval);
- }
- // Periodic progress print
- if (printFlushProgress) {
- this.printFlushProgress();
- }
- // Flush, then record the queue's store timestamp in the store checkpoint
- long begin = System.currentTimeMillis();
- CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
- long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
- if (storeTimestamp > 0) {
- CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
- }
- long past = System.currentTimeMillis() - begin;
- if (past > 500) {
- log.info("Flush data to disk costs {} ms", past);
- }
- } catch (Throwable e) {
- CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
- this.printFlushProgress();
- }
- }
-
- // After shutdown: retry flush(0) up to RETRY_TIMES_OVER times until it returns true
- // Normal shutdown, to ensure that all the flush before exit
- boolean result = false;
- for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
- result = CommitLog.this.mappedFileQueue.flush(0);
- CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
- }
-
- this.printFlushProgress();
-
- CommitLog.log.info(this.getServiceName() + " service end");
- }
向mappedFile写数据时使用的写入锁有两种实现:基于ReentrantLock的PutMessageReentrantLock,以及基于AtomicBoolean的CAS、用do..while循环自旋的PutMessageSpinLock。默认使用自旋锁:因为默认是异步刷盘,冲突不大,所以自旋性能更好。
- public class PutMessageReentrantLock implements PutMessageLock { // PutMessageLock implementation that delegates to a java.util.concurrent ReentrantLock
- private ReentrantLock putMessageNormalLock = new ReentrantLock(); // NonfairSync
-
- @Override
- public void lock() { // blocks until the underlying lock is acquired
- putMessageNormalLock.lock();
- }
-
- @Override
- public void unlock() {
- putMessageNormalLock.unlock();
- }
- }
-
- // Spin lock: a do..while loop that retries a CAS on an AtomicBoolean until it succeeds
- public class PutMessageSpinLock implements PutMessageLock {
- //true: Can lock, false : in lock.
- private AtomicBoolean putMessageSpinLock = new AtomicBoolean(true);
-
- @Override
- public void lock() { // busy-waits: keeps retrying the true -> false CAS until this thread wins it
- boolean flag;
- do {
- flag = this.putMessageSpinLock.compareAndSet(true, false);
- }
- while (!flag);
- }
-
- @Override
- public void unlock() { // flips the flag false -> true; has no effect if the flag is already true
- this.putMessageSpinLock.compareAndSet(false, true);
- }
- }