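How does a RocketMQ message consumer start up? One very important step is starting the message listener: the consumer keeps pulling messages in a loop, and that is how listening is implemented.
Let's follow the source code to see how this actually works.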

There is a lot of code in this part, so below is my own summary of the key points. (The diagram I drew isn't great.)

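Entry point: this.pullMessageService.start();
org.apache.rocketmq.client.impl.consumer.PullMessageService#run
A blocking queue is declared to hold PullRequest objects.
A PullRequest represents a message-pull task. If pullRequestQueue is empty, take() blocks until a pull task is put into the queue.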
- private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<PullRequest>();
The stopped flag is declared volatile and is checked on every iteration of the loop, so as soon as one thread changes it, the other threads see the new value.
- protected volatile boolean stopped = false;
- @Override
- public void run() {
- log.info(this.getServiceName() + " service started");
- // Keep looping until the service is stopped
- while (!this.isStopped()) {
- try {
- // Take one PullRequest from the queue (blocks while the queue is empty)
- PullRequest pullRequest = this.pullRequestQueue.take();
- this.pullMessage(pullRequest);
- } catch (InterruptedException ignored) {
- } catch (Exception e) {
- log.error("Pull Message Service Run Method exception", e);
- }
- }
-
- log.info(this.getServiceName() + " service end");
- }
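To make the loop above easier to picture, here is a minimal, self-contained sketch of the same pattern: a worker thread that blocks on a LinkedBlockingQueue and exits once a volatile flag is set. The class name PullLoopSketch, the String "requests", and the CountDownLatch are my own stand-ins for illustration, not RocketMQ code.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for PullMessageService; a "request" is just a String here.
class PullLoopSketch implements Runnable {
    private final LinkedBlockingQueue<String> requestQueue = new LinkedBlockingQueue<>();
    private final List<String> handled = new CopyOnWriteArrayList<>();
    private final CountDownLatch allHandled;
    private volatile boolean stopped = false;

    PullLoopSketch(int expectedRequests) {
        this.allHandled = new CountDownLatch(expectedRequests);
    }

    void submit(String request) {
        requestQueue.add(request); // unbounded queue, so add() does not block
    }

    @Override
    public void run() {
        while (!stopped) {
            try {
                String request = requestQueue.take(); // blocks while the queue is empty
                handled.add(request);
                allHandled.countDown();
            } catch (InterruptedException ignored) {
                // Woken up while waiting; fall through and re-check the stop flag.
            }
        }
    }

    // Starts the worker, feeds it three requests, then stops it and returns what it handled.
    static List<String> demo() {
        PullLoopSketch service = new PullLoopSketch(3);
        Thread worker = new Thread(service, "pull-loop-sketch");
        worker.start();
        service.submit("request-1");
        service.submit("request-2");
        service.submit("request-3");
        try {
            service.allHandled.await(5, TimeUnit.SECONDS);
            service.stopped = true; // volatile write, visible to the worker thread
            worker.interrupt();     // unblock take() so the loop can exit
            worker.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return service.handled;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Setting the flag alone would not be enough to end the thread, because the worker may be parked inside take(); the interrupt is what wakes it up so that it can re-check the flag.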
By default PullMessageService pulls up to 32 messages at a time from the broker and stores them in the ProcessQueue, ordered by message offset.
- final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
Entry point: org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage
- // Get the ProcessQueue, the consumer-side snapshot of the message queue
- final ProcessQueue processQueue = pullRequest.getProcessQueue();
- if (processQueue.isDropped()) {
- log.info("the pull request[{}] is dropped.", pullRequest.toString());
- return;
- }
-
- // Record the time of the most recent pull
- pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());
- // Check the service state
- this.makeSureStateOK();
- private void makeSureStateOK() throws MQClientException {
- if (this.serviceState != ServiceState.RUNNING) {
- throw new MQClientException("The consumer service state not OK, "
- + this.serviceState
- + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
- null);
- }
- }
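If the consumer is not in the RUNNING state, an exception is thrown. A scheduled thread pool then puts the PullRequest back into the queue after a delay, so that pullMessageService wakes up and tries the pull again. This re-queue logic is used in several places.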
- public void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
- if (!isStopped()) {
- this.scheduledExecutorService.schedule(new Runnable() {
- @Override
- public void run() {
- PullMessageService.this.executePullRequestImmediately(pullRequest);
- }
- }, timeDelay, TimeUnit.MILLISECONDS);
- } else {
- log.warn("PullMessageServiceScheduledThread has shutdown");
- }
- }
- public void executePullRequestImmediately(final PullRequest pullRequest) {
- try {
- // Finally, put the pullRequest back into pullRequestQueue
- this.pullRequestQueue.put(pullRequest);
- } catch (InterruptedException e) {
- log.error("executePullRequestImmediately pullRequestQueue.put", e);
- }
- }
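The two methods above amount to "re-queue now" and "re-queue after a delay". A minimal sketch of that pattern follows, using only java.util.concurrent. The class DelayedRequeueSketch, its method names, and the 50 ms delay are assumptions made for the example and are not taken from RocketMQ.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: a request is handed back to the work queue after a delay.
class DelayedRequeueSketch {
    private final LinkedBlockingQueue<String> requestQueue = new LinkedBlockingQueue<>();
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    // Re-queue the request once delayMillis has elapsed.
    void executeLater(String request, long delayMillis) {
        scheduler.schedule(() -> executeImmediately(request), delayMillis, TimeUnit.MILLISECONDS);
    }

    // Re-queue the request right away; a thread blocked in take()/poll() will wake up.
    void executeImmediately(String request) {
        try {
            requestQueue.put(request);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Wait up to timeoutMillis for the next request; returns null on timeout.
    String awaitNext(long timeoutMillis) {
        try {
            return requestQueue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    void shutdown() {
        scheduler.shutdownNow();
    }

    static String demo() {
        DelayedRequeueSketch sketch = new DelayedRequeueSketch();
        sketch.executeLater("request-1", 50);
        String next = sketch.awaitNext(5000);
        sketch.shutdown();
        return next;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because the delayed path simply calls the immediate path when the timer fires, every retry ends up going through the same queue that the pull loop is blocked on.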
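If flow control is triggered, the pull is delayed: the PullRequest is re-queued after a delay so that pullMessageService wakes up and tries the pull again.
- // Number of cached messages
- long cachedMessageCount = processQueue.getMsgCount().get();
- // Size of the cached messages, in MiB
- long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
- // If the number of cached messages exceeds the configured threshold, delay the pull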
- if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
- this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
- if ((queueFlowControlTimes++ % 1000) == 0) {
- log.warn(
- "the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
- this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
- }
- return;
- }
- if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
- this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
- if ((queueFlowControlTimes++ % 1000) == 0) {
- log.warn(
- "the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
- this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
- }
- return;
- }
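The two checks above boil down to a pair of comparisons plus a log throttle. Here is a small sketch of that logic. The thresholds of 1000 messages and 100 MiB are what I believe the client defaults to, but treat them as assumptions; FlowControlSketch and its method names are hypothetical.

```java
// Hypothetical sketch of the per-queue flow-control decision.
class FlowControlSketch {
    static final int COUNT_THRESHOLD = 1000;   // assumed default for getPullThresholdForQueue()
    static final int SIZE_THRESHOLD_MIB = 100; // assumed default for getPullThresholdSizeForQueue()

    private long flowControlTimes = 0;

    // True when the pull should be postponed because too much is cached locally.
    static boolean shouldDelayPull(long cachedCount, long cachedBytes) {
        long cachedMiB = cachedBytes / (1024 * 1024); // integer division truncates partial MiB
        return cachedCount > COUNT_THRESHOLD || cachedMiB > SIZE_THRESHOLD_MIB;
    }

    // Mirrors (queueFlowControlTimes++ % 1000) == 0: log the 1st, 1001st, 2001st... hit only.
    boolean shouldLog() {
        return (flowControlTimes++ % 1000) == 0;
    }

    public static void main(String[] args) {
        System.out.println(shouldDelayPull(1001, 0));
        System.out.println(shouldDelayPull(0, 101L * 1024 * 1024));
    }
}
```

Note that both comparisons are strict, and the size check works on whole MiB, so a queue sitting exactly at a threshold is still allowed to pull.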
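Here the subscriptionInner map is queried by topic to get the matching subscription data. If no subscription is found, the pull is delayed: the PullRequest is re-queued after a delay so that pullMessageService wakes up and tries the pull again.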
- protected final ConcurrentMap<String /* topic */, SubscriptionData> subscriptionInner =
- new ConcurrentHashMap<String, SubscriptionData>();
- final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
- if (null == subscriptionData) {
- this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
- log.warn("find the consumer's subscription failed, {}", pullRequest);
- return;
- }
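From the consumer startup module we know that RocketMQ stores consume progress in different places depending on the message model:
Broadcasting mode: consume progress is stored in a local file
Clustering mode: consume progress is stored on the broker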
- boolean commitOffsetEnable = false;
- long commitOffsetValue = 0L;
- if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
- // Read the current offset from memory
- commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
- if (commitOffsetValue > 0) {
- commitOffsetEnable = true;
- }
- }
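The decision in the snippet above is small enough to isolate. The sketch below restates it as a pure function: an offset is only worth sending along with the pull request when the consumer is in clustering mode and a positive offset is held in memory. The enum and method names are hypothetical stand-ins, not RocketMQ types.

```java
// Hypothetical sketch of the "should this pull carry a commit offset" decision.
class CommitOffsetSketch {
    enum Model { BROADCASTING, CLUSTERING }

    // Only clustering mode reports progress to the broker, and only when an offset is known.
    static boolean commitOffsetEnabled(Model model, long offsetInMemory) {
        return model == Model.CLUSTERING && offsetInMemory > 0;
    }

    public static void main(String[] args) {
        System.out.println(commitOffsetEnabled(Model.CLUSTERING, 42L));
        System.out.println(commitOffsetEnabled(Model.BROADCASTING, 42L));
    }
}
```

This matches the storage split described above: in broadcasting mode progress stays in a local file, so there is nothing to report to the broker.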
Entry point: org.apache.rocketmq.client.impl.consumer.PullAPIWrapper#pullKernelImpl
- public PullResult pullKernelImpl(
- final MessageQueue mq,
- final String subExpression,
- final String expressionType,
- final long subVersion,
- final long offset,
- final int maxNums,
- final int sysFlag,
- final long commitOffset,
- final long brokerSuspendMaxTimeMillis,
- final long timeoutMillis,
- final CommunicationMode communicationMode,
- final PullCallback pullCallback
- ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {}
As we can see, it takes a lot of parameters.

- // Step 1: find the broker by its BrokerName
- FindBrokerResult findBrokerResult =
- this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
- this.recalculatePullFromWhichNode(mq), false);
- // Step 2: if no broker was found, update the route info and look it up again
- if (null == findBrokerResult) {
- this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
- findBrokerResult =
- this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
- this.recalculatePullFromWhichNode(mq), false);
- }
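Steps 1 and 2 follow a common "look up, refresh on a miss, look up once more" pattern. A rough sketch with plain maps is shown below. BrokerLookupSketch, the nameServer map, and the address format are all made up for the example; the real client talks to a NameServer over the network.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: a local address cache that is refreshed from a route source on a miss.
class BrokerLookupSketch {
    private final Map<String, String> brokerAddrTable = new ConcurrentHashMap<>();
    private final Map<String, String> nameServer; // stand-in for the remote route source
    private int refreshCount = 0;

    BrokerLookupSketch(Map<String, String> nameServer) {
        this.nameServer = new HashMap<>(nameServer);
    }

    // Returns the broker address, or null if it is still unknown after one refresh.
    String findBrokerAddr(String brokerName) {
        String addr = brokerAddrTable.get(brokerName);
        if (addr == null) {
            refreshFromNameServer();
            addr = brokerAddrTable.get(brokerName);
        }
        return addr;
    }

    private void refreshFromNameServer() {
        refreshCount++;
        brokerAddrTable.putAll(nameServer);
    }

    int refreshCount() {
        return refreshCount;
    }

    public static void main(String[] args) {
        Map<String, String> routes = new HashMap<>();
        routes.put("broker-a", "10.0.0.1:10911");
        BrokerLookupSketch lookup = new BrokerLookupSketch(routes);
        System.out.println(lookup.findBrokerAddr("broker-a"));
    }
}
```

The refresh is only paid for on a cache miss, so the common case stays a single map lookup.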
- // check version
- if (!ExpressionType.isTagType(expressionType)
- && findBrokerResult.getBrokerVersion() < MQVersion.Version.V4_1_0_SNAPSHOT.ordinal()) {
- throw new MQClientException("The broker[" + mq.getBrokerName() + ", "
- + findBrokerResult.getBrokerVersion() + "] does not upgrade to support for filter message by " + expressionType, null);
- }
- PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
- requestHeader.setConsumerGroup(this.consumerGroup);
- requestHeader.setTopic(mq.getTopic());
- requestHeader.setQueueId(mq.getQueueId());
- requestHeader.setQueueOffset(offset);
- requestHeader.setMaxMsgNums(maxNums);
- requestHeader.setSysFlag(sysFlagInner);
- requestHeader.setCommitOffset(commitOffset);
- requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
- requestHeader.setSubscription(subExpression);
- requestHeader.setSubVersion(subVersion);
- requestHeader.setExpressionType(expressionType);
- PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
- brokerAddr,
- requestHeader,
- timeoutMillis,
- communicationMode,
- pullCallback);
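Because the CommunicationMode passed in is ASYNC, let's focus on this method.
Entry point: org.apache.rocketmq.client.impl.MQClientAPIImpl#pullMessageAsync
It calls this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback()
We won't go into the details here. Once the response comes back, the pull callback processes the result:
- // Process the pullResult data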
- pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
- subscriptionData);
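processPullResult mainly does three things: it decodes the messages, sets some message properties, and stores the messages in msgFoundList.
First, pullResult is cast to PullResultExt and the message bytes are decoded into a List.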
- PullResultExt pullResultExt = (PullResultExt) pullResult;
-
- // Decode the binary payload into a List of messages
- ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary());
- List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer);
Next, the messages are filtered, keeping only those whose tag matches the subscription.
- if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) {
- msgListFilterAgain = new ArrayList<MessageExt>(msgList.size());
- for (MessageExt msg : msgList) {
- if (msg.getTags() != null) {
- if (subscriptionData.getTagsSet().contains(msg.getTags())) {
- msgListFilterAgain.add(msg);
- }
- }
- }
- }
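The client-side tag check can be sketched on its own as below. As far as I understand, the broker has already filtered once (by tag hash code), and this exact string comparison on the client weeds out any hash collisions; take that motivation as my assumption. TagFilterSketch and its Msg class are hypothetical stand-ins for the RocketMQ types.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of re-filtering pulled messages by exact tag match.
class TagFilterSketch {
    static final class Msg {
        final String tags;
        final String body;

        Msg(String tags, String body) {
            this.tags = tags;
            this.body = body;
        }
    }

    // An empty tag set means "subscribe to everything", so nothing is filtered out.
    static List<Msg> filterByTags(List<Msg> msgs, Set<String> subscribedTags) {
        if (subscribedTags.isEmpty()) {
            return msgs;
        }
        List<Msg> kept = new ArrayList<>(msgs.size());
        for (Msg msg : msgs) {
            if (msg.tags != null && subscribedTags.contains(msg.tags)) {
                kept.add(msg);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<Msg> pulled = Arrays.asList(new Msg("TagA", "1"), new Msg("TagB", "2"), new Msg(null, "3"));
        Set<String> tags = new HashSet<>(Arrays.asList("TagA"));
        System.out.println(filterByTags(pulled, tags).size());
    }
}
```

Messages without any tag are dropped whenever the subscription names specific tags, which matches the null check in the snippet above.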
Then each message gets its transactionId, extended properties, and BrokerName set, and the list is stored in the result.
- for (MessageExt msg : msgListFilterAgain) {
- String traFlag = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
- if (Boolean.parseBoolean(traFlag)) {
- msg.setTransactionId(msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
- }
- MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MIN_OFFSET,
- Long.toString(pullResult.getMinOffset()));
- MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MAX_OFFSET,
- Long.toString(pullResult.getMaxOffset()));
- msg.setBrokerName(mq.getBrokerName());
- }
-
- pullResultExt.setMsgFoundList(msgListFilterAgain);
- // Get the queue offset of the first message
- firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
- boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
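putMessage mainly does two things: it loops over the message list and stores each message in msgTreeMap, and it works out the offset information for this pull.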
- public boolean putMessage(final List<MessageExt> msgs) {
- boolean dispatchToConsume = false;
- try {
- // Acquire the write lock
- this.treeMapLock.writeLock().lockInterruptibly();
- try {
- int validMsgCnt = 0;
- // Loop over the message list and store each message in msgTreeMap
- for (MessageExt msg : msgs) {
- MessageExt old = msgTreeMap.put(msg.getQueueOffset(), msg);
- if (null == old) {
- validMsgCnt++;
- this.queueOffsetMax = msg.getQueueOffset();
- msgSize.addAndGet(msg.getBody().length);
- }
- }
- msgCount.addAndGet(validMsgCnt);
-
- if (!msgTreeMap.isEmpty() && !this.consuming) {
- dispatchToConsume = true;
- this.consuming = true;
- }
-
- if (!msgs.isEmpty()) {
- // Get the last message in the batch
- MessageExt messageExt = msgs.get(msgs.size() - 1);
- // Read the max offset property
- String property = messageExt.getProperty(MessageConst.PROPERTY_MAX_OFFSET);
- ...
- }
- } finally {
- this.treeMapLock.writeLock().unlock();
- }
- }
- ...
- }
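Stripped of the offset bookkeeping, putMessage is an offset-keyed TreeMap guarded by a write lock, plus a flag that is flipped by the first batch to arrive. The sketch below shows just that core. ProcessQueueSketch and its methods are hypothetical, and messages are reduced to their offsets.

```java
import java.util.Arrays;
import java.util.List;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical sketch of an offset-ordered local message cache.
class ProcessQueueSketch {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final TreeMap<Long, String> msgTreeMap = new TreeMap<>();
    private final AtomicLong msgCount = new AtomicLong();
    private boolean consuming = false; // guarded by the write lock

    // Stores the messages by offset; returns true only for the batch that starts consumption.
    boolean putMessages(List<Long> offsets) {
        boolean dispatchToConsume = false;
        lock.writeLock().lock();
        try {
            int added = 0;
            for (Long offset : offsets) {
                // put() returns the previous value, so null means this offset is new
                if (msgTreeMap.put(offset, "msg-" + offset) == null) {
                    added++;
                }
            }
            msgCount.addAndGet(added);
            if (!msgTreeMap.isEmpty() && !consuming) {
                dispatchToConsume = true;
                consuming = true;
            }
        } finally {
            lock.writeLock().unlock();
        }
        return dispatchToConsume;
    }

    long count() {
        return msgCount.get();
    }

    // Smallest cached offset; the TreeMap keeps keys sorted regardless of insertion order.
    long firstOffset() {
        lock.readLock().lock();
        try {
            return msgTreeMap.firstKey();
        } finally {
            lock.readLock().unlock();
        }
    }

    public static void main(String[] args) {
        ProcessQueueSketch queue = new ProcessQueueSketch();
        System.out.println(queue.putMessages(Arrays.asList(12L, 10L, 11L)));
        System.out.println(queue.firstOffset());
    }
}
```

Keying by offset is what makes duplicate deliveries harmless here: a message pulled twice overwrites itself and is not counted again.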
- // Submit the consume request; the messages are handed to an internal thread pool
- DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
- pullResult.getMsgFoundList(),
- processQueue,
- pullRequest.getMessageQueue(),
- dispatchToConsume);
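Entry point: org.apache.rocketmq.client.impl.consumer.ConsumeMessageService#submitConsumeRequest
A ConsumeRequest object is created, and the listener for the current topic is obtained.
The listener obtained here is the one we registered when starting the consumer; once messages arrive, it runs the business logic.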
- consumer.registerMessageListener(new MessageListenerConcurrently() {
- @Override
- public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
- ...
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- }
- });
This is where the consumeMessage method we overrode at the start gets called. msgs is wrapped with Collections.unmodifiableList, so the listener receives a read-only List that it cannot modify.
- ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
-
- status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
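A quick way to see what the read-only wrapper means in practice: any mutating call on the wrapped list throws UnsupportedOperationException, while the underlying list is left untouched. The class and method names below are hypothetical and exist only for this demonstration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch: what a listener sees when it is handed an unmodifiable view.
class ReadOnlyViewSketch {
    // Returns true if a listener managed to add an element through the wrapped list.
    static boolean listenerCanMutate(List<String> msgs) {
        List<String> view = Collections.unmodifiableList(msgs);
        try {
            view.add("injected");
            return true;
        } catch (UnsupportedOperationException e) {
            return false; // mutation through the view is rejected
        }
    }

    public static void main(String[] args) {
        List<String> msgs = new ArrayList<>(Arrays.asList("msg-1", "msg-2"));
        System.out.println(listenerCanMutate(msgs));
        System.out.println(msgs.size());
    }
}
```

The wrapper is a view rather than a copy, so it costs almost nothing to create, and the service can keep working with the original list afterwards.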
- ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
- public void processConsumeResult(
- final ConsumeConcurrentlyStatus status,
- final ConsumeConcurrentlyContext context,
- final ConsumeRequest consumeRequest
- ) {
- int ackIndex = context.getAckIndex();
-
- if (consumeRequest.getMsgs().isEmpty())
- return;
-
- switch (status) {
- case CONSUME_SUCCESS:
- if (ackIndex >= consumeRequest.getMsgs().size()) {
- ackIndex = consumeRequest.getMsgs().size() - 1;
- }
- int ok = ackIndex + 1;
- int failed = consumeRequest.getMsgs().size() - ok;
- this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
- this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
- break;
- ...
- }
-
- switch (this.defaultMQPushConsumer.getMessageModel()) {
- ...
- case CLUSTERING:
- List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
- for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
- MessageExt msg = consumeRequest.getMsgs().get(i);
- boolean result = this.sendMessageBack(msg, context);
- if (!result) {
- msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
- msgBackFailed.add(msg);
- }
- }
- // If any messages could not be sent back, resubmit them for consumption after 5 seconds
- if (!msgBackFailed.isEmpty()) {
- consumeRequest.getMsgs().removeAll(msgBackFailed);
-
- this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
- }
- break;
- ...
- }
-
- long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
- // Update the consume offset
- if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
- this.defaultMQPushConsumerImpl.getOffsetStore()
- .updateOffset(consumeRequest.getMessageQueue(), offset, true);
- }
- }
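The ackIndex arithmetic in the CONSUME_SUCCESS branch is easy to misread, so here it is as a standalone function with a few sample inputs. ackIndex is the index of the last message in the batch that was consumed successfully; everything after it counts as failed. I believe the context defaults ackIndex to Integer.MAX_VALUE, which is why it gets clamped, but treat that default as an assumption. AckIndexSketch is a hypothetical name.

```java
// Hypothetical sketch of how a batch is split into succeeded and failed messages.
class AckIndexSketch {
    // Returns {ok, failed} for a batch of batchSize messages.
    static int[] countOkAndFailed(int ackIndex, int batchSize) {
        if (ackIndex >= batchSize) {
            ackIndex = batchSize - 1; // clamp: the whole batch succeeded
        }
        int ok = ackIndex + 1;
        int failed = batchSize - ok;
        return new int[] {ok, failed};
    }

    public static void main(String[] args) {
        int[] all = countOkAndFailed(Integer.MAX_VALUE, 5);
        int[] partial = countOkAndFailed(2, 5);
        System.out.println(all[0] + " ok, " + all[1] + " failed");
        System.out.println(partial[0] + " ok, " + partial[1] + " failed");
    }
}
```

In the CLUSTERING branch above, the loop starting at ackIndex + 1 covers exactly the messages counted as failed here, and those are the ones sent back to the broker.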
Finally, the PullRequest is put back into the queue so that the next pull can start.
Entry point: org.apache.rocketmq.client.impl.consumer.PullMessageService#executePullRequestImmediately
- DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
So far we have only worked through the overall flow of how the consumer listens for messages; there are still many details worth digging into.
