CommitLog的asyncPutMessage方法中可以看到在写入消息之后,调用了submitFlushRequest方法执行刷盘策略:
- public class CommitLog {
- public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
- // ...
- try {
- // 获取上一次写入的文件
- MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
- // ...
- // 写入消息
- result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
- // ...
- } finally {
- beginTimeInLock = 0;
- putMessageLock.unlock();
- }
- // ...
- // 执行刷盘
- CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);
- // ...
- }
- }
刷盘有两种策略:
- public class CommitLog {
- // 监控刷盘
- private final FlushDiskWatcher flushDiskWatcher;
- public CompletableFuture<PutMessageStatus> submitFlushRequest(AppendMessageResult result, MessageExt messageExt) {
- // 是否是同步刷盘
- if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
- // 获取GroupCommitService
- final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
- // 是否等待
- if (messageExt.isWaitStoreMsgOK()) {
- // 构建组提交请求,传入本次刷盘后位置的偏移量:写入位置偏移量+写入数据字节数
- GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),
- this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
- // 添加到wather中
- flushDiskWatcher.add(request);
- // 添加到service
- service.putRequest(request);
- // 返回
- return request.future();
- } else {
- service.wakeup();
- return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
- }
- }
- // 如果是异步刷盘
- else {
- if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
- flushCommitLogService.wakeup();
- } else {
- commitLogService.wakeup();
- }
- return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
- }
- }
- }
如果使用的是同步刷盘,首先获取了GroupCommitService,然后构建GroupCommitRequest组提交请求,将请求添加到flushDiskWatcher和GroupCommitService中,其中flushDiskWatcher用于监控刷盘是否超时,GroupCommitService用于提交刷盘数据。
构建GroupCommitRequest提交请求
GroupCommitRequest是CommitLog的内部类:
- public class CommitLog {
- public static class GroupCommitRequest {
- private final long nextOffset;
- // 刷盘状态
- private CompletableFuture
flushOKFuture = new CompletableFuture<>(); - private final long deadLine;// 刷盘的限定时间,超过限定时间还未刷盘完毕会被认为超时
-
- public GroupCommitRequest(long nextOffset, long timeoutMillis) {
- this.nextOffset = nextOffset;
- // 设置限定时间:当前时间 + 超时时间
- this.deadLine = System.nanoTime() + (timeoutMillis * 1_000_000);
- }
-
- public void wakeupCustomer(final PutMessageStatus putMessageStatus) {
- // 结束刷盘,设置刷盘状态
- this.flushOKFuture.complete(putMessageStatus);
- }
-
- public CompletableFuture
future() { - // 返回刷盘状态
- return flushOKFuture;
- }
-
- }
- }
GroupCommitService处理刷盘
GroupCommitService是CommitLog的内部类,从继承关系中可知它实现了Runnable接口,在run方法调用waitForRunning等待刷盘请求的提交,然后处理刷盘,不过这个线程是在什么时候启动的呢?
- public class CommitLog {
- /**
- * GroupCommit Service
- */
- class GroupCommitService extends FlushCommitLogService {
- // ...
- // run方法
- public void run() {
- CommitLog.log.info(this.getServiceName() + " service started");
- while (!this.isStopped()) {
- try {
- // 等待刷盘请求的到来
- this.waitForRunning(10);
- // 处理刷盘
- this.doCommit();
- } catch (Exception e) {
- CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
- }
- }
- // ...
- }
- }
- }
在BrokerController的启动方法中,可以看到调用了messageStore的start方法,前面可知使用的是DefaultMessageStore,进入到DefaultMessageStore的start方法,它又调用了commitLog的start方法,在CommitLog的start方法中,启动了刷盘的线程和监控刷盘的线程:
- public class BrokerController {
- public void start() throws Exception {
- if (this.messageStore != null) {
- // 启动
- this.messageStore.start();
- }
- // ...
- }
- }
-
- public class DefaultMessageStore implements MessageStore {
- /**
- * @throws Exception
- */
- public void start() throws Exception {
- // ...
- this.flushConsumeQueueService.start();
- // 调用CommitLog的启动方法
- this.commitLog.start();
- this.storeStatsService.start();
- // ...
- }
- }
-
- public class CommitLog {
- private final FlushCommitLogService flushCommitLogService; // 刷盘
- private final FlushDiskWatcher flushDiskWatcher; // 监控刷盘
- private final FlushCommitLogService commitLogService; // commitLogService
- public void start() {
- // 启动刷盘的线程
- this.flushCommitLogService.start();
- flushDiskWatcher.setDaemon(true);
- // 启动监控刷盘的线程
- flushDiskWatcher.start();
- if (defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
- this.commitLogService.start();
- }
- }
- }
既然知道了线程在何时启动的,接下来详细看一下GroupCommitService是如何处理刷盘提交请求的。
前面知道在GroupCommitService的run方法中,调用了waitForRunning方法等待刷盘请求,waitForRunning在GroupCommitService父类ServiceThread中实现。ServiceThread是一个抽象类,实现了Runnable接口,里面使用了CountDownLatch进行线程间的通信,大小设为1。
waitForRunning方法在进入的时候先判断hasNotified是否为true(已通知),并尝试将其更新为false(未通知),由于hasNotified的初始化值为false,所以首次进入的时候条件不成立,不会进入到这个处理逻辑,会继续执行后面的代码。
接着调用 waitPoint的reset方法将其重置为1,并调用waitPoint的await方法进行等待:
- // ServiceThread
- public abstract class ServiceThread implements Runnable {
- // 是否通知,初始化为false
- protected volatile AtomicBoolean hasNotified = new AtomicBoolean(false);
-
- // CountDownLatch用于线程间的通信
- protected final CountDownLatch2 waitPoint = new CountDownLatch2(1);
-
- // 等待运行
- protected void waitForRunning(long interval) {
- // 判断hasNotified是否为true,并尝试将其更新为false
- if (hasNotified.compareAndSet(true, fa