• RocketMQ消息消费(Consumer)源码解析


    RocketMQ中消息消费以消费组的模式开展,一个消费组内可以包含多个消费者,每一个消费者组可订阅多个主题,消费组内消费者之间有集群模式和广播模式两种消费模式。

    集群模式,主题下的同一条消息只允许被其中一个消费者消费。

    广播模式,主题下的同一条消息,将被集群内的所有消费者消费一次。

    消息服务器与消费者之间的消息传递也有两种模式:推模式、拉模式。

    所谓的拉模式即PullConsumer,是消费端主动拉起拉消息请求,
    而推模式即PushConsumer,是消息达到消息服务器后,推送给消息消费者。

    下面以PushConsumer模式为例,说明RocketMQ中消息消费的具体过程。

    Consumer 例子

    1. public class Consumer {
    2. public static void main(String[] args) throws InterruptedException, MQClientException {
    3. // Instantiate with specified consumer group name.
    4. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
    5. // Specify name server addresses.
    6. consumer.setNamesrvAddr("localhost:9876");
    7. // Subscribe one more more topics to consume.
    8. consumer.subscribe("TopicTest", "*");
    9. // Register callback to execute on arrival of messages fetched from brokers.
    10. consumer.registerMessageListener(new MessageListenerConcurrently() {
    11. @Override
    12. public ConsumeConcurrentlyStatus consumeMessage(List msgs,
    13. ConsumeConcurrentlyContext context) {
    14. System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
    15. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    16. }
    17. });
    18. //Launch the consumer instance.
    19. consumer.start();
    20. System.out.printf("Consumer Started.%n");
    21. }
    22. }

    首先,初始化一个DefaultMQPushConsumer,并设置group name。然后设置订阅的topic,一个consumer可以订阅多个topic。
    设置Listener,当新消息到来时会回调consumeMessage()方法,用户通过实现这个方法来做业务逻辑处理。
    最后启动consumer,开始接收消息。
    下面我们看下Consumer的启动过程的代码。

    Consumer启动

    DefaultPushConsumerDefaultPushConsumerImpl的Facade类,启动直接调用DefaultPushConsumerImpl.start()方法

    1. public void start() throws MQClientException {
    2. switch (this.serviceState) {
    3. case CREATE_JUST:
    4. log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
    5. this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
    6. this.serviceState = ServiceState.START_FAILED;
    7. // 检查配置
    8. this.checkConfig();
    9. // Rebalance负载均衡 复制订阅数据
    10. this.copySubscription();
    11. // 设置instanceName,为一个字符串化的数字,比如10072
    12. if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
    13. this.defaultMQPushConsumer.changeInstanceNameToPID();
    14. }
    15. // 获取MQClient对象,clientId为ip@instanceName,比如192.168.0.1@10072
    16. this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
    17. // 设置负载均衡器
    18. this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
    19. //默认这是消费模式为集群模式,每条消息被同一组的消费者中的一个消费
    20. //还可以设置为广播模式,每条消息被同一个组的所有消费者都消费一次
    21. this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
    22. //默认是AllocateMessageQueueAveragely,均分策略
    23. this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
    24. this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
    25. // 拉取API封装
    26. this.pullAPIWrapper = new PullAPIWrapper(mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
    27. this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
    28. //生成消费进度处理器,集群模式下消费进度保存在Broker上,因为同一组内的消费者要共享进度;广播模式下进度保存在消费者端
    29. if (this.defaultMQPushConsumer.getOffsetStore() != null) {
    30. this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
    31. } else {
    32. switch (this.defaultMQPushConsumer.getMessageModel()) {
    33. case BROADCASTING:
    34. this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
    35. break;
    36. case CLUSTERING:
    37. this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
    38. break;
    39. default:
    40. break;
    41. }
    42. }
    43. this.offsetStore.load(); //若是广播模式,加载本地的消费进度文件
    44. // 根据监听是顺序模式还是并发模式来生成相应的ConsumerService
    45. if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
    46. this.consumeOrderly = true;
    47. this.consumeMessageService = new ConsumeMessageOrderlyService(this, (MessageListenerOrderly)this.getMessageListenerInner());
    48. } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
    49. this.consumeOrderly = false;
    50. this.consumeMessageService = new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently)this.getMessageListenerInner());
    51. }
    52. this.consumeMessageService.start();
    53. // 设置MQClient对象
    54. boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
    55. if (!registerOK) {
    56. this.serviceState = ServiceState.CREATE_JUST;
    57. this.consumeMessageService.shutdown();
    58. throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
    59. + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
    60. null);
    61. }
    62. mQClientFactory.start();
    63. log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
    64. // 设置服务状态
    65. this.serviceState = ServiceState.RUNNING;
    66. break;
    67. case RUNNING:
    68. case START_FAILED:
    69. case SHUTDOWN_ALREADY:
    70. throw new MQClientException("The PushConsumer service state not OK, maybe started once, "//
    71. + this.serviceState//
    72. + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
    73. null);
    74. default:
    75. break;
    76. }
    77. // 从Namesrv获取TopicRouteData,更新TopicPublishInfo和MessageQueue (在Consumer start时马上调用,之后每隔一段时间调用一次)
    78. this.updateTopicSubscribeInfoWhenSubscriptionChanged();
    79. // 向TopicRouteData里的所有Broker发送心跳,注册Consumer/Producer信息到Broker上 (在Consumer start时马上调用,之后每隔一段时间调用一次)
    80. this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
    81. // 唤醒MessageQueue均衡服务,负载均衡后马上开启第一次拉取消息
    82. this.mQClientFactory.rebalanceImmediately();
    83. }

    1、函数copySubscription:将订阅信息跟RebalanceImpl同步,这个类是consumer的核心逻辑实现类。如果consumer是cluster模式,并且订阅了TopicA的消息,那客户端会自动订阅%RETRY%TopicA。那这个%RETRY%开头的topic是做什么的呢?我们知道consumer消费消息处理失败的话,broker是会延时一定的时间重新推送的,重新推送不是跟其它新消息一起过来,而是通过单独的%RETRY%的topic过来。

    2、getAndCreateMQClientInstance:初始化一个MQClientInstance,这个跟producer共用一个实现。

    3、rebalanceImpl:设置负载均衡器

    4、pullApiWrapper:封装实现类,封装了broker的通信接口

    MQClientInstance启动

    首先看下初始化的部分

    1. public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) {
    2. //前面的逻辑跟Producer相同
    3. ...
    4. ...
    5. //1、Pull请求服务,异步发送请求到broker并负责将返回结果放到缓存队列
    6. this.pullMessageService = new PullMessageService(this);
    7. //2、定时或者被触发做subscribe queue的re-balance
    8. this.rebalanceService = new RebalanceService(this);
    9. //3、初始化一个自用的producer,`CLIENT_INNER_PRODUCER`
    10. this.defaultMQProducer = new DefaultMQProducer(MixAll.CLIENT_INNER_PRODUCER_GROUP);
    11. this.defaultMQProducer.resetClientConfig(clientConfig);
    12. ...
    13. }

    其中第三步中的自用producer,主要用于在消费失败或者超时后发送重试的消息给broker。
    下面看下启动的过程中Consumer相关的部分:

    1. public void start() throws MQClientException {
    2. synchronized (this) {
    3. switch (this.serviceState) {
    4. case CREATE_JUST:
    5. ...
    6. // 1、Start various schedule tasks
    7. this.startScheduledTask();
    8. // 2、Start pull service,开始处理PullRequest
    9. this.pullMessageService.start();
    10. // 3、Start rebalance service
    11. this.rebalanceService.start();
    12. // 4、Start push service,consumer预留的producer,发送要求重新的消息
    13. this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
    14. ...
    15. }
    16. }
    17. }

    函数startScheduledTask启动的定时任务中,consumer相关的任务有两个

    1. //保存消费进度,广播消息存在本地,集群消息上传到所有的broker
    2. this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    3. @Override
    4. public void run() {
    5. try {
    6. MQClientInstance.this.persistAllConsumerOffset();
    7. } catch (Exception e) {
    8. log.error("ScheduledTask persistAllConsumerOffset exception", e);
    9. }
    10. }
    11. }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
    12. //对于`PushConsumer`,根据负载调整本地处理消息的线程池corePool大小
    13. this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    14. @Override
    15. public void run() {
    16. try {
    17. MQClientInstance.this.adjustThreadPool();
    18. } catch (Exception e) {
    19. log.error("ScheduledTask adjustThreadPool exception", e);
    20. }
    21. }
    22. }, 1, 1, TimeUnit.MINUTES);

    第3步,RebalanceService的任务主要是调用RebalanceImpl,来给consumer重新调整和分配queue。

    • 定时触发(20sec)做rebalance
    • 接口触发,1)收到broker的consumer list发生变化通知后需要重新做负载均衡,比如同一个group中新加入了consumer或者有consumer下线;2)consumer启动的时候

    从以上的PushConsumer启动逻辑可以看出,主要的消息读取逻辑都是由RebalanceImpl完成的,通过调用doRebalance()来触发,下面看下具体实现。

    RebalanceImpl触发Pull消息

    1. public void doRebalance(final boolean isOrder) {
    2. //获取该consumer的订阅信息
    3. Map subTable = this.getSubscriptionInner();
    4. if (subTable != null) {
    5. for (final Map.Entry entry : subTable.entrySet()) {
    6. final String topic = entry.getKey();
    7. try {
    8. //循环针对所有订阅的topic,做rebalance
    9. this.rebalanceByTopic(topic, isOrder);
    10. } catch (Throwable e) {
    11. if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
    12. log.warn("rebalanceByTopic Exception", e);
    13. }
    14. }
    15. }
    16. }
    17. //做完rebalance后,检查是否有的queue已经不归自己负责消费,是的话就释放缓存message的queue
    18. this.truncateMessageQueueNotMyTopic();
    19. }

    主要的逻辑都是在rebalanceByTopic()中实现的:

    1. private void rebalanceByTopic(final String topic, final boolean isOrder) {
    2. switch (messageModel) {
    3. case BROADCASTING: {
    4. ...
    5. ...
    6. break;
    7. }
    8. case CLUSTERING: {
    9. //1、从路由信息中获取topic对应所有的Queue
    10. Set mqSet = this.topicSubscribeInfoTable.get(topic);
    11. //2、从broker获取所有同一个group的所有Consumer ID
    12. List cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
    13. if (null == mqSet) {
    14. if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
    15. log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
    16. }
    17. }
    18. if (null == cidAll) {
    19. log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
    20. }
    21. if (mqSet != null && cidAll != null) {
    22. List mqAll = new ArrayList();
    23. mqAll.addAll(mqSet);
    24. //3、将MQ和cid都排好序
    25. Collections.sort(mqAll);
    26. Collections.sort(cidAll);
    27. AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
    28. //4、按照初始化是指定的分配策略,获取分配的MQ列表
    29. List allocateResult = null;
    30. try {
    31. allocateResult = strategy.allocate(
    32. this.consumerGroup,
    33. this.mQClientFactory.getClientId(),
    34. mqAll,
    35. cidAll);
    36. } catch (Throwable e) {
    37. log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
    38. e);
    39. return;
    40. }
    41. Set allocateResultSet = new HashSet();
    42. if (allocateResult != null) {
    43. allocateResultSet.addAll(allocateResult);
    44. }
    45. //5、更新rebalanceImpl中的processQueue用来缓存收到的消息,对于新加入的Queue,提交一次PullRequest
    46. boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
    47. if (changed) {
    48. ...
    49. ...
    50. //6、同步数据到broker,通过发送一次心跳实现
    51. this.messageQueueChanged(topic, mqSet, allocateResultSet);
    52. }
    53. }
    54. break;
    55. }
    56. default:
    57. break;
    58. }
    59. }

    第4步,同一个topic的消息会分布于集群内的多个broker的不同queue上。同一个group下面会有多个consumer,分配策略AllocateMessageQueueStrategy的作用就是计算当前consumer应该消费哪几个queue的消息。

    第5步,根据分配策略分配到queue之后,会查看是否是新增的queue,如果是则提交一次PullRequest去broker拉取消息。

    下面来看下分配策略和Pull请求的提交过程。

    Queue分配策略AllocateMessageQueueStrategy

    系统默认使用AVG策略(AllocateMessageQueueAveragely),就是将该topic所有Queue按照broker和queueId从小到大做排列,按照consumer的数量平均分成几份。然后每个consumer分到一份,按照consumer排序后的顺序来领取。代码实现如下:

    1. @Override
    2. public List allocate(String consumerGroup, String currentCID, List mqAll,
    3. List cidAll) {
    4. int index = cidAll.indexOf(currentCID);
    5. int mod = mqAll.size() % cidAll.size();
    6. //AVG size计算方法,mq数量<=consumer数量,size=1,这种情况是很少的
    7. //否则size=mq数量/consumer数量,余数是几则前几个consumer的size+1,这样所有的queue都会有consumer消费
    8. int averageSize =
    9. mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
    10. + 1 : mqAll.size() / cidAll.size());
    11. int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
    12. int range = Math.min(averageSize, mqAll.size() - startIndex);
    13. //从第一个consumer开始分配,每个分avgSize个连续的Queue,
    14. for (int i = 0; i < range; i++) {
    15. result.add(mqAll.get((startIndex + i) % mqAll.size()));
    16. }
    17. return result;
    18. }

    RocketMQ提供其它的queue分配策略:

    • AVG_BY_CIRCLE, 跟AVG类似,只是分到的queue不是连续的。比如一共12个Queue,3个consumer,则第一个consumer接收queue1,4,7,9的消息。
    • CONSISTENT_HASH,使用一致性hash算法来分配Queue,用户需自定义虚拟节点的数量
    • MACHINE_ROOM,将queue先按照broker划分几个computer room,不同的consumer只消费某几个broker上的消息
    • CONFIG,用户启动时指定消费哪些Queue的消息

    提交Pull请求

    通过上面的策略分配到queue之后,RebalanceImpl通过updateProcessQueueTableInRebalance()方法来检查新加入queue并提交pull请求。

    1. private boolean updateProcessQueueTableInRebalance(final String topic, final Set mqSet,
    2. final boolean isOrder) {
    3. boolean changed = false;
    4. Iterator> it = this.processQueueTable.entrySet().iterator();
    5. while (it.hasNext()) {
    6. Entry next = it.next();
    7. MessageQueue mq = next.getKey();
    8. ProcessQueue pq = next.getValue();
    9. if (mq.getTopic().equals(topic)) {
    10. if (!mqSet.contains(mq)) {//不再消费这个Queue的消息
    11. pq.setDropped(true);
    12. if (this.removeUnnecessaryMessageQueue(mq, pq)) {//保存offset并
    13. it.remove();
    14. changed = true;
    15. log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
    16. }
    17. } else if (pq.isPullExpired()) {//超过max idle时间
    18. switch (this.consumeType()) {
    19. case CONSUME_ACTIVELY:
    20. break;
    21. case CONSUME_PASSIVELY:
    22. pq.setDropped(true);
    23. if (this.removeUnnecessaryMessageQueue(mq, pq)) {
    24. it.remove();
    25. changed = true;
    26. log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
    27. consumerGroup, mq);
    28. }
    29. break;
    30. default:
    31. break;
    32. }
    33. }
    34. }
    35. }
    36. List pullRequestList = new ArrayList();
    37. for (MessageQueue mq : mqSet) {
    38. if (!this.processQueueTable.containsKey(mq)) {//如果是新加入的Queue
    39. if (isOrder && !this.lock(mq)) {
    40. log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
    41. continue;
    42. }
    43. //从offset store中移除过时的数据
    44. this.removeDirtyOffset(mq);
    45. ProcessQueue pq = new ProcessQueue();
    46. //获取起始消费offset
    47. long nextOffset = this.computePullFromWhere(mq);
    48. if (nextOffset >= 0) {
    49. //为新的Queue初始化一个ProcessQueue,用来缓存收到的消息
    50. ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
    51. if (pre != null) {
    52. log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
    53. } else {
    54. log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
    55. //对新加的queue初始化一个PullRequest
    56. PullRequest pullRequest = new PullRequest();
    57. pullRequest.setConsumerGroup(consumerGroup);
    58. pullRequest.setNextOffset(nextOffset);
    59. pullRequest.setMessageQueue(mq);
    60. pullRequest.setProcessQueue(pq);
    61. pullRequestList.add(pullRequest);
    62. changed = true;
    63. }
    64. } else {
    65. log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
    66. }
    67. }
    68. }
    69. //分发pull request到PullMessageService,拉取消息
    70. this.dispatchPullRequest(pullRequestList);
    71. return changed;
    72. }

    从以上的代码可以看出,RebalanceImpl每次都会检查分配到的queue列表,如果发现有新的queue加入,就会给这个queue初始化一个缓存队列,然后新发起一个PullRequestPullMessageService执行。由此可见,新增的queue只有第一次Pull请求时RebalanceImpl发起的,后续请求是在broker返回数据后,处理线程发起的。

    消息拉取服务PullMessageService

    这个服务就是一个单独运行的线程,在收到Pull请求后异步执行。

    1. private void pullMessage(final PullRequest pullRequest) {
    2. final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
    3. if (consumer != null) {
    4. DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
    5. impl.pullMessage(pullRequest);
    6. } else {
    7. log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
    8. }
    9. }

    上面的真正的实现是在DefaultMQPushConsumerImpl.pullMessage()里面。

    1. public void pullMessage(final PullRequest pullRequest) {
    2. final ProcessQueue processQueue = pullRequest.getProcessQueue();
    3. ...
    4. ...
    5. pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());
    6. ...
    7. ...
    8. long cachedMessageCount = processQueue.getMsgCount().get();
    9. long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
    10. //1、如果堆积未处理的消息过多,则扔回PullMessageService,延时执行(默认50ms)
    11. if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
    12. this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
    13. if ((queueFlowControlTimes++ % 1000) == 0) {
    14. log.warn(...);
    15. }
    16. return;
    17. }
    18. //2、如果堆积消息的size过大,同上面的逻辑
    19. if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
    20. this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
    21. if ((queueFlowControlTimes++ % 1000) == 0) {
    22. log.warn(...);
    23. }
    24. return;
    25. }
    26. //3、无序消息,消息offset跨度过大,同上面的流控逻辑
    27. if (!this.consumeOrderly) {
    28. if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
    29. this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
    30. if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
    31. log.warn(...);
    32. }
    33. return;
    34. }
    35. } else {
    36. if (processQueue.isLocked()) {
    37. if (!pullRequest.isLockedFirst()) {
    38. final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
    39. boolean brokerBusy = offset < pullRequest.getNextOffset();
    40. log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
    41. pullRequest, offset, brokerBusy);
    42. if (brokerBusy) {
    43. log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",
    44. pullRequest, offset);
    45. }
    46. pullRequest.setLockedFirst(true);
    47. pullRequest.setNextOffset(offset);
    48. }
    49. } else {
    50. this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
    51. log.info("pull message later because not locked in broker, {}", pullRequest);
    52. return;
    53. }
    54. }
    55. //4、检查订阅关系有没有变化,有可能在延时期间,topic或者consumer的配置都发生了变化
    56. final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
    57. if (null == subscriptionData) {
    58. this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
    59. log.warn("find the consumer's subscription failed, {}", pullRequest);
    60. return;
    61. }
    62. final long beginTimestamp = System.currentTimeMillis();
    63. //5、Pull Command发送后,返回结果处理
    64. PullCallback pullCallback = new PullCallback() {
    65. @Override
    66. public void onSuccess(PullResult pullResult) {
    67. if (pullResult != null) {
    68. //6、消息预处理,客户端再次过滤,set minOffset和maxOffset
    69. pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
    70. subscriptionData);
    71. switch (pullResult.getPullStatus()) {
    72. case FOUND:
    73. long prevRequestOffset = pullRequest.getNextOffset();
    74. pullRequest.setNextOffset(pullResult.getNextBeginOffset());
    75. long pullRT = System.currentTimeMillis() - beginTimestamp;
    76. DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
    77. pullRequest.getMessageQueue().getTopic(), pullRT);
    78. long firstMsgOffset = Long.MAX_VALUE;
    79. //7、如果获取到的消息数为0,则立即发起下一次pull
    80. if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
    81. DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
    82. } else {
    83. firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
    84. DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
    85. pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
    86. //8、消息放入ProcessQueue
    87. boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
    88. //9、消费消息,调用messageListener处理,处理完成会通知ProcessQueue
    89. DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
    90. pullResult.getMsgFoundList(),
    91. processQueue,
    92. pullRequest.getMessageQueue(),
    93. dispatchToConsume);
    94. //10、再次提交pull request
    95. if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
    96. DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
    97. DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
    98. } else {
    99. DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
    100. }
    101. }
    102. if (pullResult.getNextBeginOffset() < prevRequestOffset
    103. || firstMsgOffset < prevRequestOffset) {
    104. log.warn(
    105. "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
    106. pullResult.getNextBeginOffset(),
    107. firstMsgOffset,
    108. prevRequestOffset);
    109. }
    110. break;
    111. case NO_NEW_MSG:
    112. pullRequest.setNextOffset(pullResult.getNextBeginOffset());
    113. DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
    114. DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
    115. break;
    116. case NO_MATCHED_MSG:
    117. pullRequest.setNextOffset(pullResult.getNextBeginOffset());
    118. DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
    119. DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
    120. break;
    121. case OFFSET_ILLEGAL://Queue已经不存在了
    122. log.warn("the pull request offset illegal, {} {}",
    123. pullRequest.toString(), pullResult.toString());
    124. pullRequest.setNextOffset(pullResult.getNextBeginOffset());
    125. pullRequest.getProcessQueue().setDropped(true);
    126. DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {
    127. //存储消费offset,从rebalance中移除ProcessQueue
    128. @Override
    129. public void run() {
    130. try {
    131. DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
    132. pullRequest.getNextOffset(), false);
    133. DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());
    134. DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());
    135. log.warn("fix the pull request offset, {}", pullRequest);
    136. } catch (Throwable e) {
    137. log.error("executeTaskLater Exception", e);
    138. }
    139. }
    140. }, 10000);
    141. break;
    142. default:
    143. break;
    144. }
    145. }
    146. }
    147. @Override
    148. public void onException(Throwable e) {
    149. if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
    150. log.warn("execute the pull request exception", e);
    151. }
    152. DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
    153. }
    154. };
    155. boolean commitOffsetEnable = false;
    156. long commitOffsetValue = 0L;
    157. if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
    158. commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
    159. if (commitOffsetValue > 0) {
    160. commitOffsetEnable = true;
    161. }
    162. }
    163. String subExpression = null;
    164. boolean classFilter = false;
    165. SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
    166. if (sd != null) {
    167. if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {
    168. subExpression = sd.getSubString();
    169. }
    170. classFilter = sd.isClassFilterMode();
    171. }
    172. int sysFlag = PullSysFlag.buildSysFlag(
    173. commitOffsetEnable, // commitOffset
    174. true, // suspend
    175. subExpression != null, // subscription
    176. classFilter // class filter
    177. );
    178. try {
    179. this.pullAPIWrapper.pullKernelImpl(
    180. pullRequest.getMessageQueue(),
    181. subExpression,
    182. subscriptionData.getExpressionType(),
    183. subscriptionData.getSubVersion(),
    184. pullRequest.getNextOffset(),
    185. this.defaultMQPushConsumer.getPullBatchSize(),
    186. sysFlag,
    187. commitOffsetValue,
    188. BROKER_SUSPEND_MAX_TIME_MILLIS,
    189. CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
    190. CommunicationMode.ASYNC,
    191. pullCallback
    192. );
    193. } catch (Exception e) {//错误处理,延时重试(默认3sec)
    194. log.error("pullKernelImpl exception", e);
    195. this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
    196. }
    197. }

    以上逻辑中主要关注第5步,在消息返回后,会将消息放入ProcessQueue,然后通知ConsumeMessageService来异步处理消息,然后再次提交Pull请求。这样对于用户端来说,只有ConsumeMessageService回调listener这一步是可见的,其它都是透明的。

    消息处理ConsumeMessageService

    消息处理的逻辑比较简单,就是回调Consumer启动时注册的Listener。无论Listener是否处理成功,消息都会从ProcessQueue中移除掉。我们看下对于Listener返回结果的处理方法。
    ConsumeMessageConcurrentlyService.processConsumeResult()

    1. /**
    2. * 处理消费结果
    3. *
    4. * @param status 消费结果
    5. * @param context 消费Context
    6. * @param consumeRequest 提交请求
    7. */
    8. public void processConsumeResult(
    9. final ConsumeConcurrentlyStatus status,
    10. final ConsumeConcurrentlyContext context,
    11. final ConsumeRequest consumeRequest
    12. ) {
    13. int ackIndex = context.getAckIndex();
    14. if (consumeRequest.getMsgs().isEmpty())
    15. return;
    16. ...
    17. ...
    18. switch (this.defaultMQPushConsumer.getMessageModel()) {
    19. //broadcast模式,处理失败,不做处理
    20. case BROADCASTING:
    21. for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
    22. MessageExt msg = consumeRequest.getMsgs().get(i);
    23. log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
    24. }
    25. break;
    26. case CLUSTERING:
    27. List msgBackFailed = new ArrayList(consumeRequest.getMsgs().size());
    28. for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
    29. MessageExt msg = consumeRequest.getMsgs().get(i);
    30. //Cluster模式,将消息发回broker重新发送
    31. boolean result = this.sendMessageBack(msg, context);
    32. if (!result) {
    33. msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
    34. msgBackFailed.add(msg);
    35. }
    36. }
    37. if (!msgBackFailed.isEmpty()) {
    38. consumeRequest.getMsgs().removeAll(msgBackFailed);
    39. //发回broker失败,则再次尝试本地消费
    40. this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
    41. }
    42. break;
    43. default:
    44. break;
    45. }
    46. //将消费前缓存的消息清除
    47. long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
    48. //更新offset
    49. if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
    50. this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
    51. }
    52. }

    消息处理失败后,consumer会将消息发给broker,broker会根据重试次数来重新投递消息。sendback方法的实现如下

    1. public boolean sendMessageBack(final MessageExt msg, final ConsumeConcurrentlyContext context) {
    2. int delayLevel = context.getDelayLevelWhenNextConsume();
    3. try {
    4. this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, context.getMessageQueue().getBrokerName());
    5. return true;
    6. } catch (Exception e) {
    7. log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + msg.toString(), e);
    8. }
    9. return false;
    10. }

    首先会根据这是第几次消费失败,设置延时多长时间重新投递,然后调用DefaultMQPushConsumerImpl.sendMessageBack()的方法。默认设置下,最多会重新投递16次。

    1. //consumer把没有消费的消息提交给broker,broker会延时一段时间后重新发送
    2. public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName)
    3. throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
    4. try {
    5. String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName)
    6. : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
    7. //首先尝试直接发送CONSUMER_SEND_MSG_BACK命令给broker
    8. this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg,
    9. this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());
    10. } catch (Exception e) {
    11. log.error("sendMessageBack Exception, " + this.defaultMQPushConsumer.getConsumerGroup(), e);
    12. //如果发送失败,则把消息发送到%RETRY%topic,重新发送
    13. Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody());
    14. String originMsgId = MessageAccessor.getOriginMessageId(msg);
    15. MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);
    16. newMsg.setFlag(msg.getFlag());
    17. MessageAccessor.setProperties(newMsg, msg.getProperties());
    18. MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());
    19. MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes() + 1));
    20. MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes()));
    21. newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());
    22. this.mQClientFactory.getDefaultMQProducer().send(newMsg);
    23. }
    24. }

    一共有两种方式让broker重发,先尝试给broker发送send_msg_back的命令,如果失败了,则通过consumer预留的producer给%RETRY%topic发送消息,前面consumer启动的时候已经讲过,所有consumer都订阅%RETRY%topic,所以等于是自己给自己发一条消息。

  • 相关阅读:
    短时间内防止多次点击
    【Python/Pytorch - 网络模型】-- 高阶SVD算法
    60页5G+智慧农业大数据 2022
    基于node.js+Vue学院会议纪要管理系统 element
    Python configparser模块详解:配置文件解析利器
    这份阿里强推的并发编程知识点笔记,将是你拿大厂offer的突破口
    【Head First 设计模式】-- 观察者模式
    高通平台Android 蓝牙调试和配置手册-- Bluetooth ON/OFF Failures
    CP&FT测试介绍
    Kratos战神微服务框架(二)
  • 原文地址:https://blog.csdn.net/bao2901203013/article/details/126474661