• RocketMQ源码阅读(六)CommitLog


    目录

    CommitLog

    load/start

    asyncPutMessage

    GroupCommitService

    waitForRunning

    onWaitEnd

    putRequest

    doCommit

    CommitRealTimeService

    FlushRealTimeService

    PutMessageLock


    CommitLog

    MappedFile是一个文件的映射,MappedFileQueue是一组文件的列表,而CommitLog类描述的是整个CommitLog文件目录

    源码4.8.0

    1. public CommitLog(final DefaultMessageStore defaultMessageStore) {
    2. // 文件列表
    3. this.mappedFileQueue = new MappedFileQueue(defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(),
    4. defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(), defaultMessageStore.getAllocateMappedFileService());
    5. this.defaultMessageStore = defaultMessageStore;
    6. // 刷盘策略,同步和异步刷盘
    7. if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
    8. this.flushCommitLogService = new GroupCommitService();
    9. } else {
    10. this.flushCommitLogService = new FlushRealTimeService();
    11. }
    12. // 异步刷盘时,定时将直接内存暂存区的数据提交到fileChannel,如果是同步刷盘那没啥用
    13. this.commitLogService = new CommitRealTimeService();
    14. // 用来写入数据,将数据在文件末尾append
    15. this.appendMessageCallback = new DefaultAppendMessageCallback(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
    16. batchEncoderThreadLocal = new ThreadLocal<MessageExtBatchEncoder>() {
    17. @Override
    18. protected MessageExtBatchEncoder initialValue() {
    19. return new MessageExtBatchEncoder(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
    20. }
    21. };
    22. // 因为是单个文件,需要考虑并发,这是写入的时候加的锁
    23. // 有ReentrantLock和自旋两种,默认自旋
    24. this.putMessageLock = defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() : new PutMessageSpinLock();
    25. }

    load/start

    加载文件列表(将所有文件映射为MappedFile)

    启动刷盘线程;如果开启了transientStorePoolEnable(使用直接内存暂存区),再启动提交线程(commitLogService)

    1. public boolean load() {
    2. boolean result = this.mappedFileQueue.load();
    3. log.info("load commit log " + (result ? "OK" : "Failed"));
    4. return result;
    5. }
    6. public void start() {
    7. this.flushCommitLogService.start();
    8. if (defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
    9. this.commitLogService.start();
    10. }
    11. }

    asyncPutMessage

    异步写入消息

    加写锁(自旋或ReentrantLock)、调用MappedFile.appendMessage在文件末尾append消息、解锁、刷盘和同步slave申请

    /**
     * Appends one message to the commit log and returns a future for the final put result.
     *
     * Steps, in order: stamp the store time and body CRC; for non-transactional or
     * commit-type messages with a delay level, redirect the message to the schedule
     * topic/queue while backing up the real topic and queue id in its properties;
     * append to the last mapped file while holding putMessageLock, rolling over to a
     * new file on END_OF_FILE; after unlocking, submit the flush and replica requests
     * and combine their statuses into the returned result.
     *
     * Failures inside the lock return an already-completed future carrying
     * CREATE_MAPEDFILE_FAILED, MESSAGE_ILLEGAL or UNKNOWN_ERROR.
     *
     * @param msg the message to store; its timestamp, CRC and possibly topic/queue id are mutated
     * @return a future completing with PUT_OK, or with the flush/replica status when either is not PUT_OK
     */
    1. public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
    2. // Set the storage time
    3. msg.setStoreTimestamp(System.currentTimeMillis());
    4. // Set the message body BODY CRC (consider the most appropriate setting
    5. // on the client)
    6. msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
    7. // Back to Results
    8. AppendMessageResult result = null;
    9. StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
    10. String topic = msg.getTopic();
    11. int queueId = msg.getQueueId();
    12. // Transaction type taken from the sys flag; the delay handling below only applies to non-transactional or commit messages
    13. final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
    14. if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
    15. || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
    16. // Delay Delivery
    17. if (msg.getDelayTimeLevel() > 0) {
    18. if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
    19. msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
    20. }
    21. topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
    22. queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
    23. // Backup real topic, queueId
    24. MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
    25. MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
    26. msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
    27. msg.setTopic(topic);
    28. msg.setQueueId(queueId);
    29. }
    30. }
    31. long elapsedTimeInLock = 0;
    32. MappedFile unlockMappedFile = null;
    33. // Fetch the last mapped file in the queue as the write target (read before taking the lock)
    34. MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
    35. // Acquire the put-message lock before appending
    36. putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
    37. try {
    38. long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
    39. this.beginTimeInLock = beginLockTimestamp;
    40. // Here settings are stored timestamp, in order to ensure an orderly
    41. // global
    42. msg.setStoreTimestamp(beginLockTimestamp);
    43. // No file yet, or the current one is full: ask the queue again with start offset 0 (presumably creates a new file - confirm in MappedFileQueue)
    44. if (null == mappedFile || mappedFile.isFull()) {
    45. mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
    46. }
    47. if (null == mappedFile) {
    48. log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
    49. beginTimeInLock = 0;
    50. return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));
    51. }
    52. // Append the message to the mapped file through the append callback
    53. result = mappedFile.appendMessage(msg, this.appendMessageCallback);
    54. switch (result.getStatus()) {
    55. case PUT_OK:
    56. break;
    57. case END_OF_FILE:
    58. // END_OF_FILE: the append reported that this mapped file could not take the message
    59. // 1. remember the old file in unlockMappedFile so it can be released after the write lock is dropped
    60. // 2. fetch a new last mapped file via getLastMappedFile(0) and append the message again
    61. unlockMappedFile = mappedFile;
    62. // Create a new file, re-write the message
    63. mappedFile = this.mappedFileQueue.getLastMappedFile(0);
    64. if (null == mappedFile) {
    65. // XXX: warn and notify me
    66. log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
    67. beginTimeInLock = 0;
    68. return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result));
    69. }
    70. result = mappedFile.appendMessage(msg, this.appendMessageCallback);
    71. break;
    72. case MESSAGE_SIZE_EXCEEDED:
    73. case PROPERTIES_SIZE_EXCEEDED:
    74. beginTimeInLock = 0;
    75. return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result));
    76. case UNKNOWN_ERROR:
    77. beginTimeInLock = 0;
    78. return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
    79. default:
    80. beginTimeInLock = 0;
    81. return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
    82. }
    83. elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
    84. beginTimeInLock = 0;
    85. } finally {
    86. // Release the put-message lock on every exit path, including the early returns above
    87. putMessageLock.unlock();
    88. }
    89. if (elapsedTimeInLock > 500) {
    90. log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, result);
    91. }
    92. // Only when the append rolled over to a new file and warmMapedFileEnable is on:
    93. // release the previous file via unlockMappedFile (presumably undoes a memory lock on it - TODO confirm)
    94. if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
    95. this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
    96. }
    97. PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
    98. // Statistics: bump the per-topic put counter and add the written bytes to the per-topic size total
    99. storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
    100. storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());
    101. // Submit the flush request; the future completes with the flush status
    102. CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);
    103. // Submit the replication request; the future completes with the replica status
    104. CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, msg);
    105. return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {
    106. if (flushStatus != PutMessageStatus.PUT_OK) {
    107. putMessageResult.setPutMessageStatus(flushStatus);
    108. }
    109. if (replicaStatus != PutMessageStatus.PUT_OK) {
    110. putMessageResult.setPutMessageStatus(replicaStatus);
    111. if (replicaStatus == PutMessageStatus.FLUSH_SLAVE_TIMEOUT) {
    112. log.error("do sync transfer other node, wait return, but failed, topic: {} tags: {} client address: {}",
    113. msg.getTopic(), msg.getTags(), msg.getBornHostNameString());
    114. }
    115. }
    116. return putMessageResult;
    117. });
    118. }

    GroupCommitService

    同步刷盘线程,有两个列表,读写分离

    消息写入时,刷盘请求被放入requestsWrite;GroupCommitService准备刷盘时将这两个列表互换,然后对requestsRead中的请求逐个刷盘,同时requestsWrite继续接收新的刷盘请求

    1. class GroupCommitService extends FlushCommitLogService {
    2. private volatile List<GroupCommitRequest> requestsWrite = new ArrayList<GroupCommitRequest>();
    3. private volatile List<GroupCommitRequest> requestsRead = new ArrayList<GroupCommitRequest>();
    4. }
    1. public void run() {
    2. CommitLog.log.info(this.getServiceName() + " service started");
    3. while (!this.isStopped()) {
    4. try {
    5. // 等待10ms,中途如果来刷盘请求,会被唤醒
    6. this.waitForRunning(10);
    7. // 刷盘
    8. this.doCommit();
    9. } catch (Exception e) {
    10. CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
    11. }
    12. }
    13. // 终止线程之前在进行一次刷盘
    14. // Under normal circumstances shutdown, wait for the arrival of the
    15. // request, and then flush
    16. try {
    17. Thread.sleep(10);
    18. } catch (InterruptedException e) {
    19. CommitLog.log.warn("GroupCommitService Exception, ", e);
    20. }
    21. synchronized (this) {
    22. this.swapRequests();
    23. }
    24. this.doCommit();
    25. CommitLog.log.info(this.getServiceName() + " service end");
    26. }

    waitForRunning

    1. protected void waitForRunning(long interval) {
    2. // 如果来请求调用了wakeup,hasNotified标志位会设置为true
    3. if (hasNotified.compareAndSet(true, false)) {
    4. // wait结束后的操作
    5. this.onWaitEnd();
    6. return;
    7. }
    8. //entry to wait
    9. // waitPoint就是CountDownLatch,不过增加了个reset方法来重置state标志位
    10. waitPoint.reset();
    11. try {
    12. // 等待
    13. waitPoint.await(interval, TimeUnit.MILLISECONDS);
    14. } catch (InterruptedException e) {
    15. log.error("Interrupted", e);
    16. } finally {
    17. // 等待超时或者被唤醒之后,调用onWaitEnd
    18. hasNotified.set(false);
    19. this.onWaitEnd();
    20. }
    21. }

    onWaitEnd

    互换两个列表 

    1. protected void onWaitEnd() {
    2. this.swapRequests();
    3. }
    4. private void swapRequests() {
    5. List<GroupCommitRequest> tmp = this.requestsWrite;
    6. this.requestsWrite = this.requestsRead;
    7. this.requestsRead = tmp;
    8. }

    putRequest

    来新消息后,写入请求列表,唤醒run()方法 

    1. public synchronized void putRequest(final GroupCommitRequest request) {
    2. synchronized (this.requestsWrite) {
    3. this.requestsWrite.add(request);
    4. }
    5. this.wakeup();
    6. }

    doCommit

    刷盘方法 

    1. private void doCommit() {
    2. synchronized (this.requestsRead) {
    3. if (!this.requestsRead.isEmpty()) {
    4. for (GroupCommitRequest req : this.requestsRead) {
    5. // There may be a message in the next file, so a maximum of
    6. // two times the flush
    7. // 因为这里的requestsRead可能是多条消息,并且可能跨了两个文件(第一条消息在file1末尾,第2条在file2头)
    8. // 所以如果FlushedWhere还没到最大的消息偏移量位置,尝试再次刷新
    9. boolean flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
    10. for (int i = 0; i < 2 && !flushOK; i++) {
    11. CommitLog.this.mappedFileQueue.flush(0);
    12. flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
    13. }
    14. // 唤醒消费者
    15. req.wakeupCustomer(flushOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_DISK_TIMEOUT);
    16. }
    17. // 记录刷盘时间
    18. long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
    19. if (storeTimestamp > 0) {
    20. CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
    21. }
    22. this.requestsRead.clear();
    23. } else {
    24. // Because of individual messages is set to not sync flush, it
    25. // will come to this process
    26. CommitLog.this.mappedFileQueue.flush(0);
    27. }
    28. }
    29. }

    CommitRealTimeService

    异步策略的提交线程,定时将暂存区的数据提交到fileChannel

    1. class CommitRealTimeService extends FlushCommitLogService {
    2. private long lastCommitTimestamp = 0;
    3. @Override
    4. public String getServiceName() {
    5. return CommitRealTimeService.class.getSimpleName();
    6. }
    7. @Override
    8. public void run() {
    9. CommitLog.log.info(this.getServiceName() + " service started");
    10. while (!this.isStopped()) {
    11. // 每次尝试提交的间隔时间,默认200ms
    12. int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog();
    13. // 一次至少提交多少页,默认4
    14. int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages();
    15. // 上边两个参数,如果当前只有3页,它是不提交的
    16. // commitDataThoroughInterval这个参数默认200,是定义了如果超过200ms没有commit,将所有数据都提交一遍
    17. int commitDataThoroughInterval =
    18. CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval();
    19. long begin = System.currentTimeMillis();
    20. // 判断超时
    21. if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) {
    22. this.lastCommitTimestamp = begin;
    23. commitDataLeastPages = 0;
    24. }
    25. try {
    26. // 提交到fileChannel
    27. boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);
    28. long end = System.currentTimeMillis();
    29. if (!result) {
    30. // 返回false表示有数据被提交了,更新时间戳,唤醒刷盘线程
    31. this.lastCommitTimestamp = end; // result = false means some data committed.
    32. //now wake up flush thread.
    33. flushCommitLogService.wakeup();
    34. }
    35. if (end - begin > 500) {
    36. log.info("Commit data to file costs {} ms", end - begin);
    37. }
    38. // 定时等待
    39. this.waitForRunning(interval);
    40. } catch (Throwable e) {
    41. CommitLog.log.error(this.getServiceName() + " service has exception. ", e);
    42. }
    43. }
    44. // 如果线程被停止了,最后尝试提交一波
    45. boolean result = false;
    46. for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
    47. result = CommitLog.this.mappedFileQueue.commit(0);
    48. CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
    49. }
    50. CommitLog.log.info(this.getServiceName() + " service end");
    51. }
    52. }

    FlushRealTimeService

    异步策略的刷盘线程,代码格式类似CommitRealTimeService

    1. class FlushRealTimeService extends FlushCommitLogService {
    2. private long lastFlushTimestamp = 0;
    3. private long printTimes = 0;
    /*
     * Service loop of the real-time flush thread: until stopped, read the flush
     * settings, wait for the configured interval, flush the mapped-file queue and
     * record the store timestamp in the checkpoint. After the loop ends, flush(0)
     * is retried up to RETRY_TIMES_OVER times before the thread exits.
     */
    4. public void run() {
    5. CommitLog.log.info(this.getServiceName() + " service started");
    6. while (!this.isStopped()) {
    7. // true: wait with a plain Thread.sleep; false: wait with waitForRunning (presumably can be woken early by wakeup() - confirm in the base class)
    8. boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();
    9. // Wait interval between flush attempts, read from the store config
    10. int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();
    11. // Minimum number of pages handed to flush()
    12. int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();
    13. // Thorough interval: once this much time has passed since lastFlushTimestamp, the page minimum is dropped to 0
    14. int flushPhysicQueueThoroughInterval =
    15. CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();
    16. boolean printFlushProgress = false;
    17. // Print flush progress
    18. long currentTimeMillis = System.currentTimeMillis();
    19. if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {
    20. this.lastFlushTimestamp = currentTimeMillis;
    21. flushPhysicQueueLeastPages = 0;
    22. printFlushProgress = (printTimes++ % 10) == 0;
    23. }
    24. try {
    25. // Wait for the interval, using sleep or waitForRunning depending on flushCommitLogTimed
    26. if (flushCommitLogTimed) {
    27. Thread.sleep(interval);
    28. } else {
    29. this.waitForRunning(interval);
    30. }
    31. // Progress print, enabled on every 10th thorough flush (see printTimes above)
    32. if (printFlushProgress) {
    33. this.printFlushProgress();
    34. }
    35. // Flush to disk and record the store timestamp in the checkpoint
    36. long begin = System.currentTimeMillis();
    37. CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
    38. long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
    39. if (storeTimestamp > 0) {
    40. CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
    41. }
    42. long past = System.currentTimeMillis() - begin;
    43. if (past > 500) {
    44. log.info("Flush data to disk costs {} ms", past);
    45. }
    46. } catch (Throwable e) {
    47. CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
    48. this.printFlushProgress();
    49. }
    50. }
    51. // After the loop ends (service stopped), retry the flush until it reports success or the retry limit is hit
    52. // Normal shutdown, to ensure that all the flush before exit
    53. boolean result = false;
    54. for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
    55. result = CommitLog.this.mappedFileQueue.flush(0);
    56. CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
    57. }
    58. this.printFlushProgress();
    59. CommitLog.log.info(this.getServiceName() + " service end");
    60. }

    PutMessageLock

    往mappedFile写数据时的写入锁,两种实现,ReentrantLock和do..while循环自旋锁,默认自旋,因为默认是异步刷盘,冲突不大,所以自旋性能更好

    1. public class PutMessageReentrantLock implements PutMessageLock {
    2. private ReentrantLock putMessageNormalLock = new ReentrantLock(); // NonfairSync
    3. @Override
    4. public void lock() {
    5. putMessageNormalLock.lock();
    6. }
    7. @Override
    8. public void unlock() {
    9. putMessageNormalLock.unlock();
    10. }
    11. }
    12. // 自旋,do..while..循环,cas操作
    13. public class PutMessageSpinLock implements PutMessageLock {
    14. //true: Can lock, false : in lock.
    15. private AtomicBoolean putMessageSpinLock = new AtomicBoolean(true);
    16. @Override
    17. public void lock() {
    18. boolean flag;
    19. do {
    20. flag = this.putMessageSpinLock.compareAndSet(true, false);
    21. }
    22. while (!flag);
    23. }
    24. @Override
    25. public void unlock() {
    26. this.putMessageSpinLock.compareAndSet(false, true);
    27. }
    28. }

  • 相关阅读:
    MySQL唯一约束(UNIQUE KEY)
    postgreSQL中将字段从smallint类型改成boolean类型
    在华为云服务器上CentOS 7安装单机版Redis
    聊聊分布式架构02——Http到Https
    kafka权限认证 topic权限认证 权限动态认证-亲测成功
    Java 8 中Stream流的一些用法
    Mysql应用日志时间与系统时间相差八小时
    python学习05协程_2
    【全开源】JAVA打车小程序APP打车顺风车滴滴车跑腿源码微信小程序打车源码
    【python】OpenCV—Color Detection
  • 原文地址:https://blog.csdn.net/xyjy11/article/details/125398699