• 「RocketMQ」消息的刷盘机制


    刷盘策略

    CommitLog的asyncPutMessage方法中可以看到在写入消息之后,调用了submitFlushRequest方法执行刷盘策略:

    1. public class CommitLog {
    2. public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
    3. // ...
    4. try {
    5. // 获取上一次写入的文件
    6. MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
    7. // ...
    8. // 写入消息
    9. result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
    10. // ...
    11. } finally {
    12. beginTimeInLock = 0;
    13. putMessageLock.unlock();
    14. }
    15. // ...
    16. // 执行刷盘
    17. CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);
    18. // ...
    19. }
    20. }

    刷盘有两种策略:

    • 同步刷盘,表示消息写入到内存之后需要立刻刷到磁盘文件中。
    • 同步刷盘会构建GroupCommitRequest组提交请求并设置本次刷盘后的位置偏移量的值(写入位置偏移量+写入数据字节数),然后将请求添加到flushDiskWatcher和GroupCommitService中进行刷盘。
    • 异步刷盘,表示消息写入内存成功之后就返回,由MQ定时将数据刷入到磁盘中,会有一定的数据丢失风险。
    1. public class CommitLog {
    2. // 监控刷盘
    3. private final FlushDiskWatcher flushDiskWatcher;
    4. public CompletableFuture<PutMessageStatus> submitFlushRequest(AppendMessageResult result, MessageExt messageExt) {
    5. // 是否是同步刷盘
    6. if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
    7. // 获取GroupCommitService
    8. final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
    9. // 是否等待
    10. if (messageExt.isWaitStoreMsgOK()) {
    11. // 构建组提交请求,传入本次刷盘后位置的偏移量:写入位置偏移量+写入数据字节数
    12. GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),
    13. this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
    14. // 添加到wather中
    15. flushDiskWatcher.add(request);
    16. // 添加到service
    17. service.putRequest(request);
    18. // 返回
    19. return request.future();
    20. } else {
    21. service.wakeup();
    22. return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
    23. }
    24. }
    25. // 如果是异步刷盘
    26. else {
    27. if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
    28. flushCommitLogService.wakeup();
    29. } else {
    30. commitLogService.wakeup();
    31. }
    32. return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
    33. }
    34. }
    35. }

    同步刷盘

    如果使用的是同步刷盘,首先获取了GroupCommitService,然后构建GroupCommitRequest组提交请求,将请求添加到flushDiskWatcher和GroupCommitService中,其中flushDiskWatcher用于监控刷盘是否超时,GroupCommitService用于提交刷盘数据。

    构建GroupCommitRequest提交请求

    GroupCommitRequest是CommitLog的内部类:

    • nextOffset:写入位置偏移量+写入数据字节数,也就是本次刷盘成功后应该对应的flush偏移量
    • flushOKFuture:刷盘结果
    • deadLine:刷盘的限定时间,值为当前时间 + 传入的超时时间,超过限定时间还未刷盘完毕会被认为超时
    1. public class CommitLog {
    2. public static class GroupCommitRequest {
    3. private final long nextOffset;
    4. // 刷盘状态
    5. private CompletableFuture flushOKFuture = new CompletableFuture<>();
    6. private final long deadLine;// 刷盘的限定时间,超过限定时间还未刷盘完毕会被认为超时
    7. public GroupCommitRequest(long nextOffset, long timeoutMillis) {
    8. this.nextOffset = nextOffset;
    9. // 设置限定时间:当前时间 + 超时时间
    10. this.deadLine = System.nanoTime() + (timeoutMillis * 1_000_000);
    11. }
    12. public void wakeupCustomer(final PutMessageStatus putMessageStatus) {
    13. // 结束刷盘,设置刷盘状态
    14. this.flushOKFuture.complete(putMessageStatus);
    15. }
    16. public CompletableFuture future() {
    17. // 返回刷盘状态
    18. return flushOKFuture;
    19. }
    20. }
    21. }

    GroupCommitService处理刷盘

    GroupCommitService是CommitLog的内部类,从继承关系中可知它实现了Runnable接口,在run方法调用waitForRunning等待刷盘请求的提交,然后处理刷盘,不过这个线程是在什么时候启动的呢?

    1. public class CommitLog {
    2. /**
    3. * GroupCommit Service
    4. */
    5. class GroupCommitService extends FlushCommitLogService {
    6. // ...
    7. // run方法
    8. public void run() {
    9. CommitLog.log.info(this.getServiceName() + " service started");
    10. while (!this.isStopped()) {
    11. try {
    12. // 等待刷盘请求的到来
    13. this.waitForRunning(10);
    14. // 处理刷盘
    15. this.doCommit();
    16. } catch (Exception e) {
    17. CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
    18. }
    19. }
    20. // ...
    21. }
    22. }
    23. }

    刷盘线程的启动

    在BrokerController的启动方法中,可以看到调用了messageStore的start方法,前面可知使用的是DefaultMessageStore,进入到DefaultMessageStore的start方法,它又调用了commitLog的start方法,在CommitLog的start方法中,启动了刷盘的线程和监控刷盘的线程:

    1. public class BrokerController {
    2. public void start() throws Exception {
    3. if (this.messageStore != null) {
    4. // 启动
    5. this.messageStore.start();
    6. }
    7. // ...
    8. }
    9. }
    10. public class DefaultMessageStore implements MessageStore {
    11. /**
    12. * @throws Exception
    13. */
    14. public void start() throws Exception {
    15. // ...
    16. this.flushConsumeQueueService.start();
    17. // 调用CommitLog的启动方法
    18. this.commitLog.start();
    19. this.storeStatsService.start();
    20. // ...
    21. }
    22. }
    23. public class CommitLog {
    24. private final FlushCommitLogService flushCommitLogService; // 刷盘
    25. private final FlushDiskWatcher flushDiskWatcher; // 监控刷盘
    26. private final FlushCommitLogService commitLogService; // commitLogService
    27. public void start() {
    28. // 启动刷盘的线程
    29. this.flushCommitLogService.start();
    30. flushDiskWatcher.setDaemon(true);
    31. // 启动监控刷盘的线程
    32. flushDiskWatcher.start();
    33. if (defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
    34. this.commitLogService.start();
    35. }
    36. }
    37. }

    刷盘请求的处理

    既然知道了线程在何时启动的,接下来详细看一下GroupCommitService是如何处理刷盘提交请求的。

    前面知道在GroupCommitService的run方法中,调用了waitForRunning方法等待刷盘请求,waitForRunning在GroupCommitService父类ServiceThread中实现。ServiceThread是一个抽象类,实现了Runnable接口,里面使用了CountDownLatch进行线程间的通信,大小设为1。

    waitForRunning方法在进入的时候先判断hasNotified是否为true(已通知),并尝试将其更新为false(未通知),由于hasNotified的初始化值为false,所以首次进入的时候条件不成立,不会进入到这个处理逻辑,会继续执行后面的代码。

    接着调用 waitPoint的reset方法将其重置为1,并调用waitPoint的await方法进行等待:

    1. // ServiceThread
    2. public abstract class ServiceThread implements Runnable {
    3. // 是否通知,初始化为false
    4. protected volatile AtomicBoolean hasNotified = new AtomicBoolean(false);
    5. // CountDownLatch用于线程间的通信
    6. protected final CountDownLatch2 waitPoint = new CountDownLatch2(1);
    7. // 等待运行
    8. protected void waitForRunning(long interval) {
    9. // 判断hasNotified是否为true,并尝试将其更新为false
    10. if (hasNotified.compareAndSet(true, fa
  • 相关阅读:
    计算机视觉的相机选型
    DX-11Q信号继电器
    内网综合扫描工具-fscan的安装和使用
    Flink(五)【DataStream 转换算子(上)】
    如何用python高效摸鱼?日常办公小技巧GET!
    目标检测——UCF50动作识别数据集
    MySQL高级:函数(二)自定义函数
    阿里云体验有奖:使用PolarDB-X与Flink搭建实时数据大屏
    记一次阿里云oss文件上传服务假死
    web实现usb扫码枪读取二维码数据功能
  • 原文地址:https://blog.csdn.net/Java_zhujia/article/details/128146349