• 浅析RocketMQ-broker接收消息


    本文将主要介绍broker消息接收的流程

    1. 生产者发送请求

    开始broker接收前,先看看请求是怎么发送的。
    接着上篇文章的内容,核心逻辑在MQClientAPIImpl.sendMessage()里面

        public SendResult sendMessage(...) {
            long beginStartTime = System.currentTimeMillis();
            RemotingCommand request = null;
            String msgType = msg.getProperty(MessageConst.PROPERTY_MESSAGE_TYPE);
            boolean isReply = msgType != null && msgType.equals(MixAll.REPLY_MESSAGE_FLAG);
            // 1.初始化RemotingCommand对象
            if (isReply) {
                if (sendSmartMsg) {
                    SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
                    request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE_V2, requestHeaderV2);
                } else {
                    request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE, requestHeader);
                }
            } else {
                if (sendSmartMsg || msg instanceof MessageBatch) {
                    SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
                    request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
                } else {
                    request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
                }
            }
            // 持有请求消息
            request.setBody(msg.getBody());
    		// 2.根据请求类型发送对应方法
            switch (communicationMode) {
                case ONEWAY:
                    this.remotingClient.invokeOneway(addr, request, timeoutMillis);
                    return null;
                case ASYNC:
                    this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance,
                        retryTimesWhenSendFailed, times, context, producer);
                    return null;
                case SYNC:
                    return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);
                default:
                    break;
            }
    
            return null;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40

    方法整体主要分成两部分

    • 在注释1处,进行初始化通用的RemotingCommand请求对象,这里有个点要注意,这里设置了RemotingCommand的code类型,broker接收到消息这个code执行具体逻辑
    • 在注释2处,根据请求的类型再区分执行相应方法,主要是执行发送请求前后进行相应的hook过滤之类的操作等

    2. 注册处理器

    1.initialize

    broker 启动时,会调用BrokerContoller的initialize方法,在这个方法内部会进行初始化操作,其中一步就是调用registerProcessor,注册处理器,代码如下:

    public void registerProcessor() {
            /**
             * SendMessageProcessor
             */
            SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
            sendProcessor.registerSendMessageHook(sendMessageHookList);
            sendProcessor.registerConsumeMessageHook(consumeMessageHookList);
    		// 
            this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
            this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
            this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
            this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
    		....
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    会将RequestCode和对应的Processor,执行线程池进行关联,存到一个Map中。这个map称为processorTable,remotingServer对象会持有它。

    2.start

    initialize执行完,接着执行BrokerContoller的start方法。

        public void   start() {
            ....,,
    		// 1.实例化公共的handle
            prepareSharableHandlers();
    		
            ServerBootstrap childHandler =
                this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
                	....
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline()
                                .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
                                .addLast(defaultEventExecutorGroup,
                                    encoder,
                                    new NettyDecoder(),
                                    new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                                    connectionManageHandler,
                                    // 2.将serverHandler添加到pipeline中
                                    serverHandler
                                );
                        }
                    });
      		......
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    这里有两个注意点:

    1. prepareSharableHandlers方法会实例化一些公共的handle,包括了加密的处理器encoder和根据请求编码调用对应处理器的serverHandler等
    2. serverHandler是一个处理输入流的handle,有请求过来会调用channelRead0,最终会执行的NettyRemotingAbstract.processRequestCommand方法,代码如下:
    public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
            final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
            final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
           ...
           
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    这个方法第一步就是根据RequestCode查找Processor,这些处理器都是之前在registerProcessor注册的,都不匹配则使用默认的处理器。在获取Processor之后,调用processRequest执行具体的逻辑。

    3. broker接收消息

    根据上面的分析,一个普通请求过来由SendMessageProcessor处理,并执行processRequest方法。最终流转到asyncSendMessage方法。

            private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,
                                                                    SendMessageContext mqtraceContext,
                                                                    SendMessageRequestHeader requestHeader) {
            // 构造返回对象                                                        
            final RemotingCommand response = preSend(ctx, request, requestHeader);
            final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();
    		... 
    		
            CompletableFuture<PutMessageResult> putMessageResult = null;
            String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
            // 判断是否是事务类型
            if (transFlag != null && Boolean.parseBoolean(transFlag)) {
             	...
                putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
            } else {
            	// 消息存储
                putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
            }
            return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    asyncSendMessage方法分成三部分,上半部分构建返回对象,中间部分构建msgInner参数,并执行asyncPutMessage操作,下半部分根据结果设置返回数据。

    1. asyncPutMessage

    接着跳转来到CommitLog的asyncPutMessage

        public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
    		... 
            AppendMessageResult result = null;
    
            StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
    
            String topic = msg.getTopic();
    
            final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
            if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
                    || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
                // 如果是延迟消息的处理
                if (msg.getDelayTimeLevel() > 0) {
                	// 延迟消息最大有18个级别,超出最大级别则使用最大的
                    if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                        msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
                    }
    				// 延迟消息主题
                    topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
                    // 根据延迟级别获取对应的延迟队列id
                    int queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
    
    				// 保存消息原始的主题和队列id
                    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
                    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
                    msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
    
                    msg.setTopic(topic);
                    msg.setQueueId(queueId);
                }
            }
    
            ...
            // 这里的PutMessageThreadLocal对象是在broker启动时初始化的,所以不会是null
            PutMessageThreadLocal putMessageThreadLocal = this.putMessageThreadLocal.get();
            // 这里会将消息按照一个格式存入ByteBuffer,之后会将ByteBuffer的数据持久到CommitLog文件中
            PutMessageResult encodeResult = putMessageThreadLocal.getEncoder().encode(msg);
            ...
            msg.setEncodedBuff(putMessageThreadLocal.getEncoder().encoderBuffer);
            PutMessageContext putMessageContext = new PutMessageContext(generateKey(putMessageThreadLocal.getKeyBuilder(), msg));
            
            MappedFile unlockMappedFile = null;
    
            putMessageLock.lock();
            try {
            	// 获取最新的CommitLog文件
                MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
               	// ...省略不关注的设置时间
               	// 首次启动或者最新文件没有写入空间,则新建一个CommitLog文件
                if (null == mappedFile || mappedFile.isFull()) {
                	// 这个操作包含两个过程:1.计算要创建的CommitLog的偏移量 2.创建一个CommitLog文件
                    mappedFile = this.mappedFileQueue.getLastMappedFile(0); 
                }
    			// 追加消息
                result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
                switch (result.getStatus()) {
                    case PUT_OK:
                        break;
                    case END_OF_FILE:
                    	// 进入这个分支,说明CommitLog剩余空间写不下了,要新建一个CommitLog写入内容
                        unlockMappedFile = mappedFile;
                        mappedFile = this.mappedFileQueue.getLastMappedFile(0);
                        result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
                        break;
                    case MESSAGE_SIZE_EXCEEDED:
                   	... // 省略其他情况的处理
                }
            } finally {
                putMessageLock.unlock();
            }
    
            ...
            PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);			
    		... 
    		// 这里进行提交刷消息的的操作
            CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);
            // 这是主从同步数据的操作
            CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, msg);
            return flushResultFuture.thenCombine(... );
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    2. appendMessage

    appendMessage 实际调用appendMessagesInner方法。方法内部会生成一个ByteBuffer ,并将msg数据写入到这个ByteBuffer 中

    public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb,
                PutMessageContext putMessageContext) {
            int currentPos = this.wrotePosition.get();
    		// 可写位置是否小于文件大小
            if (currentPos < this.fileSize) {
            	// slice操作生成一个共享的内存区,但是这个ByteBuffer 有单独的position等信息
                ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
                byteBuffer.position(currentPos);
                AppendMessageResult result;
                if (messageExt instanceof MessageExtBrokerInner) {
                	// getFileFromOffset 获取CommitLog文件的偏移量,其数值即为这个文件名大小
                    result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
                            (MessageExtBrokerInner) messageExt, putMessageContext);
                } else if (messageExt instanceof MessageExtBatch) {
                    result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
                            (MessageExtBatch) messageExt, putMessageContext);
                }
                ...
                return result;
            }
            return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
                final MessageExtBrokerInner msgInner, PutMessageContext putMessageContext) {
                // 物理偏移量,可以理解为这个消息在CommitLog中偏移了多少字节位置
                long wroteOffset = fileFromOffset + byteBuffer.position();
    			// 消息id
                Supplier<String> msgIdSupplier = () -> {
                    int sysflag = msgInner.getSysFlag();
                    int msgIdLen = (sysflag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8;
                    ByteBuffer msgIdBuffer = ByteBuffer.allocate(msgIdLen);
                    MessageExt.socketAddress2ByteBuffer(msgInner.getStoreHost(), msgIdBuffer);
                    msgIdBuffer.clear();//because socketAddress2ByteBuffer flip the buffer
                    msgIdBuffer.putLong(msgIdLen - 8, wroteOffset);
                    return UtilAll.bytes2string(msgIdBuffer.array());
                };
    
                ...
                // 下面的操作是补全之前putMessageThreadLocal.getEncoder().encode中缺少的序列化数据
                ByteBuffer preEncodeBuffer = msgInner.getEncodedBuff();
                final int msgLen = preEncodeBuffer.getInt(0);
    
                // 判断剩余空间是否足够写入数据,END_FILE_MIN_BLANK_LENGTH是每个文件结尾的预留空间
                if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
                    this.msgStoreItemMemory.clear();
                    // 1 TOTALSIZE
                    this.msgStoreItemMemory.putInt(maxBlank);
                    // 2 MAGICCODE
                    this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
                    // 3 The remaining space may be any value
                    // 这里是说虽然只写了 8个bytes,但声明它已经写满了
                    final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
                    byteBuffer.put(this.msgStoreItemMemory.array(), 0, 8);
                    return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset,
                            maxBlank,
                            msgIdSupplier, msgInner.getStoreTimestamp(),
                            queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
                }
    			// 下面几步都是把对应数据写到对应的ByteBuffer 位置
                int pos = 4 + 4 + 4 + 4 + 4;
                // 6 QUEUEOFFSET
                preEncodeBuffer.putLong(pos, queueOffset);
                pos += 8;
                // 7 PHYSICALOFFSET
                preEncodeBuffer.putLong(pos, fileFromOffset + byteBuffer.position());
                int ipLen = (msgInner.getSysFlag() & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
                // 8 SYSFLAG, 9 BORNTIMESTAMP, 10 BORNHOST, 11 STORETIMESTAMP
                pos += 8 + 4 + 8 + ipLen;
                // refresh store time stamp in lock
                preEncodeBuffer.putLong(pos, msgInner.getStoreTimestamp());
    
                // 将完整的数据放入byteBuffer中
                byteBuffer.put(preEncodeBuffer);
                msgInner.setEncodedBuff(null);
                AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgIdSupplier,
                    msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
    			...
                return result;
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    3. submitFlushRequest

    上面的操作实际只把msg写入到了byteBuffer中,并没有写入到CommitLog,实际刷消息是通过commitLogService这些线程服务实现的

        public CompletableFuture<PutMessageStatus> submitFlushRequest(AppendMessageResult result, MessageExt messageExt) {
            // 同步刷盘
            if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
                final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
                if (messageExt.isWaitStoreMsgOK()) {
                    GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),
                            this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
                    flushDiskWatcher.add(request);
                    service.putRequest(request);
                    return request.future();
                } else {
                    service.wakeup();
                    return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
                }
            }
            // 异步刷盘
            else {
                if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
                    flushCommitLogService.wakeup();
                } else  {
                    commitLogService.wakeup();
                }
                return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    整体流程下来了,broker接收到msg,会先按顺序放入byteBuffer中,如果是同步刷盘则等待GroupCommitService 等服务将msg存入Commitlog中,异步则提前返回结果,后续由commitLogService等服务继续持久化操作

  • 相关阅读:
    多路转接IO模型(poll和epoll)
    LeetCode热题100——图论
    SSM框架集成
    UDP通信:快速入门
    Linux——基本指令
    火爆全网!用 Pyecharts 就能做出来“迁徙图“和“轮播图“
    【Python】Python 时域到频域的变换方法
    基于springboot实现贸易行业crm系统项目【项目源码+论文说明】
    高效正则匹配工具
    操作系统MIT6.S081:P7->Interrupts
  • 原文地址:https://blog.csdn.net/qq_34789577/article/details/126613055