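This article walks through how the broker receives a message.
Before looking at the broker side, let's first see how the request is sent.
Continuing from the previous article, the core logic lives in MQClientAPIImpl.sendMessage():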
public SendResult sendMessage(...) {
long beginStartTime = System.currentTimeMillis();
RemotingCommand request = null;
String msgType = msg.getProperty(MessageConst.PROPERTY_MESSAGE_TYPE);
boolean isReply = msgType != null && msgType.equals(MixAll.REPLY_MESSAGE_FLAG);
// 1. Initialize the RemotingCommand object
if (isReply) {
if (sendSmartMsg) {
SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE_V2, requestHeaderV2);
} else {
request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE, requestHeader);
}
} else {
if (sendSmartMsg || msg instanceof MessageBatch) {
SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
} else {
request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
}
}
// Attach the message body to the request
request.setBody(msg.getBody());
// 2. Call the send method that matches the communication mode
switch (communicationMode) {
case ONEWAY:
this.remotingClient.invokeOneway(addr, request, timeoutMillis);
return null;
case ASYNC:
this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance,
retryTimesWhenSendFailed, times, context, producer);
return null;
case SYNC:
return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);
default:
break;
}
return null;
}
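The method has two main parts, marked 1 and 2 in the comments: it first builds the RemotingCommand, choosing the request code from the message type, and then sends it with the method that matches the communication mode (one-way, async, or sync).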
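The request-code choice made while building the RemotingCommand can be condensed into a small decision function. This is only a sketch of the branching shown in the excerpt: the `Code` enum and `select` method are hypothetical stand-ins, not part of the RocketMQ API.

```java
class RequestCodeSketch {
    // Hypothetical stand-ins for the RequestCode constants seen in the excerpt.
    enum Code {
        SEND_MESSAGE, SEND_MESSAGE_V2, SEND_BATCH_MESSAGE,
        SEND_REPLY_MESSAGE, SEND_REPLY_MESSAGE_V2
    }

    // Mirrors the if/else structure of sendMessage(): reply messages are
    // checked first, then batch messages, then the "smart" (V2) header flag.
    static Code select(boolean isReply, boolean sendSmartMsg, boolean isBatch) {
        if (isReply) {
            return sendSmartMsg ? Code.SEND_REPLY_MESSAGE_V2 : Code.SEND_REPLY_MESSAGE;
        }
        if (isBatch) {
            return Code.SEND_BATCH_MESSAGE;
        }
        return sendSmartMsg ? Code.SEND_MESSAGE_V2 : Code.SEND_MESSAGE;
    }

    public static void main(String[] args) {
        System.out.println(select(false, false, false));
        System.out.println(select(false, true, false));
        System.out.println(select(false, false, true));
        System.out.println(select(true, true, false));
    }
}
```

Note that in the excerpt a batch message always travels with the V2 header, whatever the value of sendSmartMsg.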
When the broker starts, it calls BrokerController's initialize method, which performs the initialization work. One of its steps is to call registerProcessor to register the request processors:
public void registerProcessor() {
/**
* SendMessageProcessor
*/
SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
sendProcessor.registerSendMessageHook(sendMessageHookList);
sendProcessor.registerConsumeMessageHook(consumeMessageHookList);
//
this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
....
}
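Each RequestCode is associated with its Processor and the thread pool that executes it, and the association is stored in a Map. This map is called processorTable and is held by the remotingServer object.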
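Once initialize finishes, BrokerController's start method runs. The excerpt below shows the start logic of the Netty-based remoting server that it brings up (judging by the serverBootstrap and pipeline setup):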
public void start() {
....
// 1. Instantiate the shared handlers
prepareSharableHandlers();
ServerBootstrap childHandler =
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
....
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
.addLast(defaultEventExecutorGroup,
encoder,
new NettyDecoder(),
new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
connectionManageHandler,
// 2. Add serverHandler to the pipeline
serverHandler
);
}
});
......
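There are two points to note here, matching the numbered comments above: the shared handlers are instantiated once in prepareSharableHandlers(), and serverHandler is added to each channel's pipeline. Requests that reach serverHandler end up in processRequestCommand: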
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
...
}
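The first step of this method is to look up the Processor by RequestCode. These processors were all registered earlier in registerProcessor; if none matches, the default processor is used. Once the Processor is obtained, its processRequest is called to run the actual logic.
Per the analysis above, an ordinary send request is handled by SendMessageProcessor, whose processRequest method eventually reaches asyncSendMessage.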
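The register-then-look-up pattern described above can be sketched in a few lines. Everything here is a simplified assumption: `Entry` stands in for the Pair of processor and executor, processors are modeled as plain functions on strings, the numeric codes are illustrative, and the "not supported" fallback is this sketch's own choice.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

class ProcessorTableSketch {
    static final int SEND_MESSAGE = 10; // illustrative request code

    // Hypothetical pair of (processor, executor), mirroring processorTable's values.
    static final class Entry {
        final Function<String, String> processor;
        final ExecutorService executor;
        Entry(Function<String, String> processor, ExecutorService executor) {
            this.processor = processor;
            this.executor = executor;
        }
    }

    private final Map<Integer, Entry> processorTable = new HashMap<>();
    private Entry defaultRequestProcessor;

    void registerProcessor(int code, Function<String, String> p, ExecutorService ex) {
        processorTable.put(code, new Entry(p, ex));
    }

    void registerDefaultProcessor(Function<String, String> p, ExecutorService ex) {
        defaultRequestProcessor = new Entry(p, ex);
    }

    // Look up by request code, fall back to the default, run on the paired executor.
    String dispatch(int code, String body) throws Exception {
        Entry matched = processorTable.get(code);
        Entry pair = matched == null ? defaultRequestProcessor : matched;
        if (pair == null) {
            return "REQUEST_CODE_NOT_SUPPORTED"; // sketch-only fallback
        }
        Callable<String> task = () -> pair.processor.apply(body);
        return pair.executor.submit(task).get();
    }

    static String demo(int code) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            ProcessorTableSketch server = new ProcessorTableSketch();
            server.registerProcessor(SEND_MESSAGE, body -> "send:" + body, executor);
            server.registerDefaultProcessor(body -> "default:" + body, executor);
            return server.dispatch(code, "hello");
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }
}
```

Pairing each processor with its own executor lets different request types be isolated on different thread pools, which is why the excerpt passes sendMessageExecutor when registering the send-related codes.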
private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,
SendMessageContext mqtraceContext,
SendMessageRequestHeader requestHeader) {
// Build the response object
final RemotingCommand response = preSend(ctx, request, requestHeader);
final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();
...
CompletableFuture<PutMessageResult> putMessageResult = null;
String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
// Check whether this is a transactional message
if (transFlag != null && Boolean.parseBoolean(transFlag)) {
...
putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
} else {
// Store the message
putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
}
return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);
}
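asyncSendMessage has three parts: the first builds the response object, the middle builds the msgInner argument and calls asyncPutMessage, and the last fills in the response according to the result.
Next we jump into CommitLog's asyncPutMessage: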
public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
...
AppendMessageResult result = null;
StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
String topic = msg.getTopic();
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
// Handling for delayed messages
if (msg.getDelayTimeLevel() > 0) {
// There are at most 18 delay levels; a level above the maximum is clamped to the maximum
if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}
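// Topic used for delayed messages
topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
// Map the delay level to the id of the corresponding delay queue
int queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
// Save the message's original topic and queue id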
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
msg.setTopic(topic);
msg.setQueueId(queueId);
}
}
...
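// The PutMessageThreadLocal object is initialized when the broker starts, so it is never null here
PutMessageThreadLocal putMessageThreadLocal = this.putMessageThreadLocal.get();
// Encode the message into a ByteBuffer in the storage format; the buffer's data is later persisted to the CommitLog file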
PutMessageResult encodeResult = putMessageThreadLocal.getEncoder().encode(msg);
...
msg.setEncodedBuff(putMessageThreadLocal.getEncoder().encoderBuffer);
PutMessageContext putMessageContext = new PutMessageContext(generateKey(putMessageThreadLocal.getKeyBuilder(), msg));
MappedFile unlockMappedFile = null;
putMessageLock.lock();
try {
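// Get the latest CommitLog file
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
// ...timestamp bookkeeping omitted
// On first start, or when the latest file has no room left, create a new CommitLog file
if (null == mappedFile || mappedFile.isFull()) {
// This call does two things: 1. computes the starting offset of the new CommitLog file 2. creates the file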
mappedFile = this.mappedFileQueue.getLastMappedFile(0);
}
// Append the message
result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
switch (result.getStatus()) {
case PUT_OK:
break;
case END_OF_FILE:
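// Reaching this branch means the remaining space in the CommitLog file cannot hold the message, so a new file is created and the message is written there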
unlockMappedFile = mappedFile;
mappedFile = this.mappedFileQueue.getLastMappedFile(0);
result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
break;
case MESSAGE_SIZE_EXCEEDED:
... // handling of the other statuses omitted
}
} finally {
putMessageLock.unlock();
}
...
PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
...
// Submit the flush-to-disk request
CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);
// Submit the master-slave replication request
CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, msg);
return flushResultFuture.thenCombine(... );
}
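The delayed-message branch in asyncPutMessage is easier to follow in isolation. The sketch below is a simplified model, not the broker's code: `Msg` is a hypothetical stand-in for MessageExtBrokerInner, the topic and property-name strings are assumed values for the constants referenced in the excerpt, and the mapping "queue id = delay level - 1" is likewise an assumption about delayLevel2QueueId.

```java
import java.util.HashMap;
import java.util.Map;

class DelayRewriteSketch {
    // Assumed values of the constants referenced in the excerpt.
    static final String SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX";
    static final String PROPERTY_REAL_TOPIC = "REAL_TOPIC";
    static final String PROPERTY_REAL_QUEUE_ID = "REAL_QID";

    // Hypothetical, stripped-down message type.
    static final class Msg {
        String topic;
        int queueId;
        int delayLevel;
        final Map<String, String> properties = new HashMap<>();
        Msg(String topic, int queueId, int delayLevel) {
            this.topic = topic;
            this.queueId = queueId;
            this.delayLevel = delayLevel;
        }
    }

    // Assumed mapping: one delay queue per level, queue ids starting at 0.
    static int delayLevel2QueueId(int delayLevel) {
        return delayLevel - 1;
    }

    static void rewriteIfDelayed(Msg msg, int maxDelayLevel) {
        if (msg.delayLevel <= 0) {
            return; // not a delayed message, store under its own topic
        }
        if (msg.delayLevel > maxDelayLevel) {
            msg.delayLevel = maxDelayLevel; // clamp to the highest level
        }
        // Remember where the message really belongs...
        msg.properties.put(PROPERTY_REAL_TOPIC, msg.topic);
        msg.properties.put(PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.queueId));
        // ...then park it in the schedule topic until the delay expires.
        msg.topic = SCHEDULE_TOPIC;
        msg.queueId = delayLevel2QueueId(msg.delayLevel);
    }

    static Msg demo() {
        Msg msg = new Msg("OrderTopic", 3, 25);
        rewriteIfDelayed(msg, 18);
        return msg;
    }
}
```

Keeping the original topic and queue id in the message properties is what allows the message to be restored to its real destination later.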
appendMessage actually delegates to appendMessagesInner. Inside, the method creates a ByteBuffer and the msg data is written into it:
public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb,
PutMessageContext putMessageContext) {
int currentPos = this.wrotePosition.get();
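// Check that the write position is still below the file size
if (currentPos < this.fileSize) {
// slice() creates a view over the same memory, but the resulting ByteBuffer has its own position and related state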
ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
byteBuffer.position(currentPos);
AppendMessageResult result;
if (messageExt instanceof MessageExtBrokerInner) {
// getFileFromOffset returns this CommitLog file's starting offset, which is also the number used as its file name
result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
(MessageExtBrokerInner) messageExt, putMessageContext);
} else if (messageExt instanceof MessageExtBatch) {
result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
(MessageExtBatch) messageExt, putMessageContext);
}
...
return result;
}
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
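The slice() behavior that appendMessagesInner relies on can be checked with a plain heap buffer. This is a minimal sketch using only java.nio; the 16-byte `file` buffer is a stand-in for the mapped CommitLog buffer.

```java
import java.nio.ByteBuffer;

class SliceSketch {
    // Returns { position of the original buffer, position of the slice,
    //           int read back from the original buffer at index 4 }.
    static int[] demo() {
        ByteBuffer file = ByteBuffer.allocate(16); // stand-in for mappedByteBuffer
        ByteBuffer view = file.slice();            // shares the bytes, separate position
        view.position(4);                          // like byteBuffer.position(currentPos)
        view.putInt(0xCAFE);                       // write through the view
        return new int[] { file.position(), view.position(), file.getInt(4) };
    }

    public static void main(String[] args) {
        int[] r = demo();
        System.out.println(r[0] + " " + r[1] + " " + Integer.toHexString(r[2]));
    }
}
```

Because the write goes through the view, the original buffer's position stays untouched, while the written bytes are visible through both buffers.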
public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
final MessageExtBrokerInner msgInner, PutMessageContext putMessageContext) {
// Physical offset: think of it as how many bytes into the CommitLog this message starts
long wroteOffset = fileFromOffset + byteBuffer.position();
// Message id
Supplier<String> msgIdSupplier = () -> {
int sysflag = msgInner.getSysFlag();
int msgIdLen = (sysflag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8;
ByteBuffer msgIdBuffer = ByteBuffer.allocate(msgIdLen);
MessageExt.socketAddress2ByteBuffer(msgInner.getStoreHost(), msgIdBuffer);
msgIdBuffer.clear();//because socketAddress2ByteBuffer flip the buffer
msgIdBuffer.putLong(msgIdLen - 8, wroteOffset);
return UtilAll.bytes2string(msgIdBuffer.array());
};
...
// The steps below fill in the fields that putMessageThreadLocal.getEncoder().encode left out of the serialized data
ByteBuffer preEncodeBuffer = msgInner.getEncodedBuff();
final int msgLen = preEncodeBuffer.getInt(0);
// Check whether the remaining space can hold the message; END_FILE_MIN_BLANK_LENGTH is the space reserved at the end of each file
if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
this.msgStoreItemMemory.clear();
// 1 TOTALSIZE
this.msgStoreItemMemory.putInt(maxBlank);
// 2 MAGICCODE
this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
// 3 The remaining space may be any value
// Only 8 bytes are actually written, but the result reports maxBlank bytes so that the file counts as full
final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
byteBuffer.put(this.msgStoreItemMemory.array(), 0, 8);
return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset,
maxBlank,
msgIdSupplier, msgInner.getStoreTimestamp(),
queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
}
// The next few steps write each value at its position in the pre-encoded ByteBuffer
int pos = 4 + 4 + 4 + 4 + 4;
// 6 QUEUEOFFSET
preEncodeBuffer.putLong(pos, queueOffset);
pos += 8;
// 7 PHYSICALOFFSET
preEncodeBuffer.putLong(pos, fileFromOffset + byteBuffer.position());
int ipLen = (msgInner.getSysFlag() & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
// 8 SYSFLAG, 9 BORNTIMESTAMP, 10 BORNHOST, 11 STORETIMESTAMP
pos += 8 + 4 + 8 + ipLen;
// refresh store time stamp in lock
preEncodeBuffer.putLong(pos, msgInner.getStoreTimestamp());
// Copy the completed data into byteBuffer
byteBuffer.put(preEncodeBuffer);
msgInner.setEncodedBuff(null);
AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgIdSupplier,
msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
...
return result;
}
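The physical offset and the message id built in doAppend can be reproduced outside the broker. The sketch below follows the layout suggested by the msgIdSupplier lambda (store host IP, then port as a 4-byte int, then the 8-byte offset, rendered as hex); the helper names and the sample address, port, and offsets are made up for illustration.

```java
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;

class MsgIdSketch {
    // Physical offset = starting offset of the file + position inside the file.
    static long physicalOffset(long fileFromOffset, int positionInFile) {
        return fileFromOffset + positionInFile;
    }

    // Assumed layout: IP (4 bytes for IPv4, 16 for IPv6) + port (4) + offset (8).
    static String buildMsgId(InetSocketAddress storeHost, long wroteOffset) {
        byte[] ip = storeHost.getAddress().getAddress();
        ByteBuffer buf = ByteBuffer.allocate(ip.length + 4 + 8);
        buf.put(ip);
        buf.putInt(storeHost.getPort());
        buf.putLong(wroteOffset);
        StringBuilder hex = new StringBuilder();
        for (byte b : buf.array()) {
            hex.append(String.format("%02X", b)); // two uppercase hex digits per byte
        }
        return hex.toString();
    }

    static String demo() {
        // Hypothetical store host and a message 100 bytes into the second 1 GiB file.
        InetSocketAddress storeHost = new InetSocketAddress("10.0.0.1", 10911);
        long offset = physicalOffset(1073741824L, 100);
        return buildMsgId(storeHost, offset);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Since the id embeds both the store host and the physical offset, the message can be located again from the id alone, which is presumably why it is built this way.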
The steps above only write msg into byteBuffer; nothing has been flushed to the CommitLog file yet. The actual flushing is done by thread services such as commitLogService:
public CompletableFuture<PutMessageStatus> submitFlushRequest(AppendMessageResult result, MessageExt messageExt) {
// Synchronous flush
if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
if (messageExt.isWaitStoreMsgOK()) {
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),
this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
flushDiskWatcher.add(request);
service.putRequest(request);
return request.future();
} else {
service.wakeup();
return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
}
}
// Asynchronous flush
else {
if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
flushCommitLogService.wakeup();
} else {
commitLogService.wakeup();
}
return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
}
}
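To sum up the whole flow: when the broker receives a msg, it first appends it to byteBuffer in order. With synchronous flushing, it waits for a service such as GroupCommitService to persist the msg to the CommitLog; with asynchronous flushing, it returns the result early and services such as commitLogService complete the persistence afterwards.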
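The difference between the two flush modes comes down to which future is handed back to the caller. The following is a single-threaded sketch under simplifying assumptions: `FlushRequest`, `onFlushed`, and the `Status` enum are hypothetical stand-ins for GroupCommitRequest, the flush service's loop, and PutMessageStatus, and timeouts are left out.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

class FlushSketch {
    enum Status { PUT_OK }

    // Hypothetical counterpart of GroupCommitRequest: completes once the
    // log has been flushed up to nextOffset.
    static final class FlushRequest {
        final long nextOffset;
        final CompletableFuture<Status> future = new CompletableFuture<>();
        FlushRequest(long nextOffset) { this.nextOffset = nextOffset; }
    }

    private final Queue<FlushRequest> pending = new ArrayDeque<>();

    CompletableFuture<Status> submitFlushRequest(long wroteOffset, int wroteBytes, boolean syncFlush) {
        if (syncFlush) {
            // Sync: the caller gets a future that stays open until the flush happens.
            FlushRequest request = new FlushRequest(wroteOffset + wroteBytes);
            pending.add(request);
            return request.future;
        }
        // Async: report success right away; flushing happens in the background.
        return CompletableFuture.completedFuture(Status.PUT_OK);
    }

    // Stand-in for one pass of the flush service after data reached the disk.
    void onFlushed(long flushedOffset) {
        while (!pending.isEmpty() && pending.peek().nextOffset <= flushedOffset) {
            pending.poll().future.complete(Status.PUT_OK);
        }
    }

    // Returns { async done immediately, sync done before flush, sync done after flush }.
    static boolean[] demo() {
        FlushSketch log = new FlushSketch();
        CompletableFuture<Status> async = log.submitFlushRequest(0, 100, false);
        CompletableFuture<Status> sync = log.submitFlushRequest(100, 50, true);
        boolean syncDoneBeforeFlush = sync.isDone();
        log.onFlushed(150);
        return new boolean[] { async.isDone(), syncDoneBeforeFlush, sync.isDone() };
    }
}
```

In the excerpt the flush future is further combined with the replication future via thenCombine, so the final status reflects both steps.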