- public class CommitLog implements Swappable {
- // Message's MAGIC CODE daa320a7
- //commitLog的魔数
- public final static int MESSAGE_MAGIC_CODE = -626843481;
- protected static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
- // End of file empty MAGIC CODE cbd43194
- public final static int BLANK_MAGIC_CODE = -875286124;
- /**
- * CRC32 Format: [PROPERTY_CRC32 + NAME_VALUE_SEPARATOR + 10-digit fixed-length string + PROPERTY_SEPARATOR]
- */
- public static final int CRC32_RESERVED_LEN = MessageConst.PROPERTY_CRC32.length() + 1 + 10 + 1;
- //核心:即MqppedFilequeue是真正存储消息的地方,可以看出是由多个MappedFile组成
- protected final MappedFileQueue mappedFileQueue;
- //当前commitLog所属的MessageStore组件
- protected final DefaultMessageStore defaultMessageStore;
- //进行flush的组件
- private final FlushManager flushManager;
- //不常用的数据的检查组件
- private final ColdDataCheckService coldDataCheckService;
- //真正写入数据的组件
- private final AppendMessageCallback appendMessageCallback;
- private final ThreadLocal
putMessageThreadLocal; -
- protected volatile long confirmOffset = -1L;
- private volatile long beginTimeInLock = 0;
- protected final PutMessageLock putMessageLock;
- protected final TopicQueueLock topicQueueLock;
- private volatile Set
fullStorePaths = Collections.emptySet(); - //刷新磁盘的监视器
- private final FlushDiskWatcher flushDiskWatcher;
- //commitlog大小,默认为1gb
- protected int commitLogSize;
- private final boolean enabledAppendPropCRC;
- //多路分发器,rocketmq为了支持mutt协议的组件
- protected final MultiDispatch multiDispatch;
- }
- public CommitLog(final DefaultMessageStore defaultMessageStore) {
- String storePath = defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog();
- //初始化messagequeue,这个时候文件会与直接内存建立映射关系,并且完成文件预热
- if (storePath.contains(MessageStoreConfig.MULTI_PATH_SPLITTER)) {
- this.mappedFileQueue = new MultiPathMappedFileQueue(defaultMessageStore.getMessageStoreConfig(),
- defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(),
- defaultMessageStore.getAllocateMappedFileService(), this::getFullStorePaths);
- } else {
- this.mappedFileQueue = new MappedFileQueue(storePath,
- defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(),
- defaultMessageStore.getAllocateMappedFileService());
- }
- this.defaultMessageStore = defaultMessageStore;
- //如果是同步的时候,采用GroupCommitService进行flush
- if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
- this.flushCommitLogService = new GroupCommitService();
- } else {
- //如果是异步刷盘,采用FlushRealTimeService进行flush
- this.flushCommitLogService = new FlushRealTimeService();
- }
- //开启瞬时缓存池技术的话,采用CommitRealTimeService进行commit
- this.commitLogService = new CommitRealTimeService();
- this.appendMessageCallback = new DefaultAppendMessageCallback();
- //每个线程都有一个消息的编码器,MessageExtEncoder负责将消息进行编码到bytebuffer中
- putMessageThreadLocal = new ThreadLocal
() { - @Override
- protected PutMessageThreadLocal initialValue() {
- return new PutMessageThreadLocal(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
- }
- };
- this.putMessageLock = defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() : new PutMessageSpinLock();
- //rocketmq实现,mutt协议的组件,它可以让消息只有一个commitlog但是分发到多个queue中
- this.multiDispatch = new MultiDispatch(defaultMessageStore, this);
- //当采用同步刷盘的时候,调用线程会阻塞等待刷盘结果,FlushDiskWatcher会检测每一个flush请求,如果超过完成时间
- //便会自动的唤醒调用线程,防止调用线程阻塞过久的场景出现
- flushDiskWatcher = new FlushDiskWatcher();
- }
MappedFileQueue 是 CommitLog 的核心,前文已经对 MappedFileQueue 做过详细分析,这里不再赘述。详情请看:《深度解析RocketMq源码-持久化组件(二) MappedFileQueue》(CSDN博客)。
- @Override
- public void run() {
- while (!isStopped()) {
- GroupCommitRequest request = null;
- try {
- //获取提交的flush请求
- request = commitRequests.take();
- } catch (InterruptedException e) {
- log.warn("take flush disk commit request, but interrupted, this may caused by shutdown");
- continue;
- }
- //
- while (!request.future().isDone()) {
- long now = System.nanoTime();
- //如果超过了请求的超时时间
- if (now - request.getDeadLine() >= 0) {
- //返回刷盘超时
- request.wakeupCustomer(PutMessageStatus.FLUSH_DISK_TIMEOUT);
- break;
- }
- // To avoid frequent thread switching, replace future.get with sleep here,
- long sleepTime = (request.getDeadLine() - now) / 1_000_000;
- sleepTime = Math.min(10, sleepTime);
- if (sleepTime == 0) {
- request.wakeupCustomer(PutMessageStatus.FLUSH_DISK_TIMEOUT);
- break;
- }
- try {
- Thread.sleep(sleepTime);
- } catch (InterruptedException e) {
- log.warn(
- "An exception occurred while waiting for flushing disk to complete. this may caused by shutdown");
- break;
- }
- }
- }
- }
- public CompletableFuture
asyncPutMessage(final MessageExtBrokerInner msg) { - // Set the storage time
- //设置存储时间戳
- msg.setStoreTimestamp(System.currentTimeMillis());
- // Set the message body BODY CRC (consider the most appropriate setting
- // on the client)
- //将消息体进行crc编码,并存储到bodyCrc中
- msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
- // Back to Results
- AppendMessageResult result = null;
- StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
- //获取到对应的topic
- String topic = msg.getTopic();
- // int queueId msg.getQueueId();
- //如果是事务消息,获取到事务类型prepared或者commit或者rollback
- final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
- //如果是非事务消息或者commit消息
- if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
- || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
- // Delay Delivery
- if (msg.getDelayTimeLevel() > 0) {
- //如果是延迟消息的话,需要校验延迟消息的最大延迟是否在mq的要求的范围内
- if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
- msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
- }
- topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
- //并且将消息的延迟级别获取到延迟消息需要放入的queueid
- int queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
- // Backup real topic, queueId
- //并且设置消息真正放入的地方也就是SCHEDULE_TOPIC_XXXX中
- MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
- MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
- msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
- msg.setTopic(topic);
- msg.setQueueId(queueId);
- }
- }
- //设置存储的时间戳
- InetSocketAddress bornSocketAddress = (InetSocketAddress) msg.getBornHost();
- if (bornSocketAddress.getAddress() instanceof Inet6Address) {
- msg.setBornHostV6Flag();
- }
- InetSocketAddress storeSocketAddress = (InetSocketAddress) msg.getStoreHost();
- if (storeSocketAddress.getAddress() instanceof Inet6Address) {
- msg.setStoreHostAddressV6Flag();
- }
- PutMessageThreadLocal putMessageThreadLocal = this.putMessageThreadLocal.get();
- updateMaxMessageSize(putMessageThreadLocal);
- if (!multiDispatch.isMultiDispatchMsg(msg)) {
- //获取到消息的编码器,并且编码,因为encode为一个公共类,为了防止锁竞争,所以放入到threadLocal中
- PutMessageResult encodeResult = putMessageThreadLocal.getEncoder().encode(msg);
- if (encodeResult != null) {
- return CompletableFuture.completedFuture(encodeResult);
- }
- msg.setEncodedBuff(putMessageThreadLocal.getEncoder().getEncoderBuffer());
- }
- //构建消息上下文中
- PutMessageContext putMessageContext = new PutMessageContext(generateKey(putMessageThreadLocal.getKeyBuilder(), msg));
- long elapsedTimeInLock = 0;
- MappedFile unlockMappedFile = null;
- //开始写入消息
- putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
- try {
- MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
- long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
- this.beginTimeInLock = beginLockTimestamp;
- // Here settings are stored timestamp, in order to ensure an orderly
- // global
- msg.setStoreTimestamp(beginLockTimestamp);
- if (null == mappedFile || mappedFile.isFull()) {
- //获取到最后一个mappedfile
- mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
- }
- if (null == mappedFile) {
- log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
- return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));
- }
- //调用mappedfile的appendMessage方法写入消息
- result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
- switch (result.getStatus()) {
- case PUT_OK:
- break;
- case END_OF_FILE:
- unlockMappedFile = mappedFile;
- // Create a new file, re-write the message
- mappedFile = this.mappedFileQueue.getLastMappedFile(0);
- if (null == mappedFile) {
- // XXX: warn and notify me
- log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
- return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result));
- }
- result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
- break;
- return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result));
- return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
- default:
- return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
- }
- elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
- } finally {
- beginTimeInLock = 0;
- putMessageLock.unlock();
- }
- if (elapsedTimeInLock > 500) {
- log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, result);
- }
- if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
- this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
- }
- PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
- // Statistics
- storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).add(1);
- storeStatsService.getSinglePutMessageTopicSizeTotal(topic).add(result.getWroteBytes());
- CompletableFuture
flushResultFuture = submitFlushRequest(result, msg); - //设置返回结果
- CompletableFuture
replicaResultFuture = submitReplicaRequest(result, msg); - return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {
- if (flushStatus != PutMessageStatus.PUT_OK) {
- putMessageResult.setPutMessageStatus(flushStatus);
- }
- if (replicaStatus != PutMessageStatus.PUT_OK) {
- putMessageResult.setPutMessageStatus(replicaStatus);
- }
- return putMessageResult;
- });
- }
- /**
- * @param result 向mappedfile中追加消息的结果
- * @param messageExt 具体的消息内容已经协议头等
- * @return
- */
- public CompletableFuture
submitFlushRequest(AppendMessageResult result, MessageExt messageExt) { - // Synchronization flush
- //如果采用同步刷盘
- if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
- //采用的是GroupCommitService进行刷盘
- final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
- //默认是需要等待消息刷盘成功的
- if (messageExt.isWaitStoreMsgOK()) {
- //构建刷盘请求,包括当前消息的在buffer中的写指针
- GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),
- this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
- flushDiskWatcher.add(request);
- //调用GroupCommitService的putRequest推送刷盘请求
- service.putRequest(request);
- //当前线程阻塞等待刷盘结果
- return request.future();
- } else {
- //如果不需要等待刷盘线程成功,便直接唤醒刷盘线程,并且返回成功(此时也可能有丢失数据的风险)
- service.wakeup();
- return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
- }
- }
- // Asynchronous flush
- else {
- //如果采用异步刷盘策略
- if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
- //如果未采用了瞬时缓存池技术,便唤醒flush服务线程
- flushCommitLogService.wakeup();
- } else {
- //如果未采用了瞬时缓存池技术,便唤醒commit服务线程
- commitLogService.wakeup();
- }
- //返回成功
- return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
- }
- }
- public CompletableFuture
submitReplicaRequest(AppendMessageResult result, MessageExt messageExt) { - //如果节点为主节点,并且为同步复制
- if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
- HAService service = this.defaultMessageStore.getHaService();
- //如果消息已经同步完毕
- if (messageExt.isWaitStoreMsgOK()) {
- if (service.isSlaveOK(result.getWroteBytes() + result.getWroteOffset())) {
- GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),
- this.defaultMessageStore.getMessageStoreConfig().getSlaveTimeout());
- //会调用HAserver的putRequest向里面发送复制请求
- service.putRequest(request);
- service.getWaitNotifyObject().wakeupAll();
- return request.future();
- }
- else {
- return CompletableFuture.completedFuture(PutMessageStatus.SLAVE_NOT_AVAILABLE);
- }
- }
- }
- return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
- }
- public void run() {
- + " service started");
- while (!this.isStopped()) {
- try {
- Set
selected = this.selector.selectedKeys(); -
- if (selected != null) {
- for (SelectionKey k : selected) {
- if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
- //监听多路复用的请求
- SocketChannel sc = ((ServerSocketChannel);
- if (sc != null) {
-"HAService receive new connection, "
- + sc.socket().getRemoteSocketAddress());
- try {
- //拿到与从节点的channel过后,并且凑早HAconnection
- HAConnection conn = new HAConnection(HAService.this, sc);
- conn.start();
- //加入到链接池connectionList中
- HAService.this.addConnection(conn);
- } catch (Exception e) {
- log.error("new HAConnection exception", e);
- sc.close();
- }
- }
- } else {
- log.warn("Unexpected ops in select " + k.readyOps());
- }
- }
- selected.clear();
- }
- } catch (Exception e) {
- log.error(this.getServiceName() + " service has exception.", e);
- }
- }
- + " service end");
- }
- public void run() {
- + " service started");
- while (!this.isStopped()) {
- try {
- this.waitForRunning(10);
- this.doWaitTransfer();
- } catch (Exception e) {
- log.warn(this.getServiceName() + " service has exception. ", e);
- }
- }
- + " service end");
- }
- private boolean processReadEvent() {
- int readSizeZeroTimes = 0;
- while (this.byteBufferRead.hasRemaining()) {
- try {
- //从网络中读取消息到byteBufferRead中
- int readSize =;
- if (readSize > 0) {
- readSizeZeroTimes = 0;
- //将bytebuffer中的数据写入到磁盘中
- boolean result = this.dispatchReadRequest();
- if (!result) {
- log.error("HAClient, dispatchReadRequest error");
- return false;
- }
- } else if (readSize == 0) {
- if (++readSizeZeroTimes >= 3) {
- break;
- }
- } else {
-"HAClient, processReadEvent read socket < 0");
- return false;
- }
- } catch (IOException e) {
-"HAClient, processReadEvent read socket exception", e);
- return false;
- }
- }
- return true;
- }
- private boolean dispatchReadRequest() {
- //获取到消息头大小
- final int msgHeaderSize = 8 + 4; // phyoffset + size
- while (true) {
- int diff = this.byteBufferRead.position() - this.dispatchPosition;
- //获取到完整的消息内容
- if (diff >= msgHeaderSize) {
- //获取到master的物理偏移量
- long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPosition);
- //获取到消息体
- int bodySize = this.byteBufferRead.getInt(this.dispatchPosition + 8);
- //当前同步的最大偏移量
- long slavePhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
- if (slavePhyOffset != 0) {
- //如果从节点偏移量和主节点偏移量不想当便直接返回
- if (slavePhyOffset != masterPhyOffset) {
- log.error("master pushed offset not equal the max phy offset in slave, SLAVE: "
- + slavePhyOffset + " MASTER: " + masterPhyOffset);
- return false;
- }
- }
- if (diff >= (msgHeaderSize + bodySize)) {
- byte[] bodyData = byteBufferRead.array();
- //获取到消息体开始的指针
- int dataStart = this.dispatchPosition + msgHeaderSize;
- //调用mappedfile的appendToCommitLog方法直接写入待commitLog中
- HAService.this.defaultMessageStore.appendToCommitLog(
- masterPhyOffset, bodyData, dataStart, bodySize);
- //更新同步偏移量
- this.dispatchPosition += msgHeaderSize + bodySize;
- if (!reportSlaveMaxOffsetPlus()) {
- return false;
- }
- continue;
- }
- }
- if (!this.byteBufferRead.hasRemaining()) {
- this.reallocateByteBuffer();
- }
- break;
- }
- return true;
- }