RocketMQ中消息消费以消费组的模式开展,一个消费组内可以包含多个消费者,每一个消费者组可订阅多个主题,消费组内消费者之间有集群模式和广播模式两种消费模式。
集群模式,主题下的同一条消息只允许被其中一个消费者消费。
广播模式,主题下的同一条消息,将被集群内的所有消费者消费一次。
消息服务器与消费者之间的消息传递也有两种模式:推模式、拉模式。
所谓的拉模式即PullConsumer,是消费端主动拉起拉消息请求,
而推模式即PushConsumer,是消息达到消息服务器后,推送给消息消费者。
下面以PushConsumer模式为例,说明RocketMQ中消息消费的具体过程。
- public class Consumer {
-
- public static void main(String[] args) throws InterruptedException, MQClientException {
-
- // Instantiate with specified consumer group name.
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
-
- // Specify name server addresses.
- consumer.setNamesrvAddr("localhost:9876");
-
- // Subscribe one more more topics to consume.
- consumer.subscribe("TopicTest", "*");
- // Register callback to execute on arrival of messages fetched from brokers.
- consumer.registerMessageListener(new MessageListenerConcurrently() {
-
- @Override
- public ConsumeConcurrentlyStatus consumeMessage(List
msgs, - ConsumeConcurrentlyContext context) {
- System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- }
- });
-
- //Launch the consumer instance.
- consumer.start();
-
- System.out.printf("Consumer Started.%n");
- }
- }
首先,初始化一个DefaultMQPushConsumer,并设置group name。然后设置订阅的topic,一个consumer可以订阅多个topic。
设置Listener,当新消息到来时会回调consumeMessage()方法,用户通过实现这个方法来做业务逻辑处理。
最后启动consumer,开始接收消息。
下面我们看下Consumer的启动过程的代码。
DefaultPushConsumer是DefaultPushConsumerImpl的Facade类,启动直接调用DefaultPushConsumerImpl.start()方法
- public void start() throws MQClientException {
- switch (this.serviceState) {
- case CREATE_JUST:
- log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
- this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
- this.serviceState = ServiceState.START_FAILED;
-
- // 检查配置
- this.checkConfig();
-
- // Rebalance负载均衡 复制订阅数据
- this.copySubscription();
-
- // 设置instanceName,为一个字符串化的数字,比如10072
- if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
- this.defaultMQPushConsumer.changeInstanceNameToPID();
- }
-
- // 获取MQClient对象,clientId为ip@instanceName,比如192.168.0.1@10072
- this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
-
- // 设置负载均衡器
- this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
- //默认这是消费模式为集群模式,每条消息被同一组的消费者中的一个消费
- //还可以设置为广播模式,每条消息被同一个组的所有消费者都消费一次
- this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
- //默认是AllocateMessageQueueAveragely,均分策略
- this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
- this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
-
- // 拉取API封装
- this.pullAPIWrapper = new PullAPIWrapper(mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
- this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
-
- //生成消费进度处理器,集群模式下消费进度保存在Broker上,因为同一组内的消费者要共享进度;广播模式下进度保存在消费者端
- if (this.defaultMQPushConsumer.getOffsetStore() != null) {
- this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
- } else {
- switch (this.defaultMQPushConsumer.getMessageModel()) {
- case BROADCASTING:
- this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
- break;
- case CLUSTERING:
- this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
- break;
- default:
- break;
- }
- }
- this.offsetStore.load(); //若是广播模式,加载本地的消费进度文件
-
- // 根据监听是顺序模式还是并发模式来生成相应的ConsumerService
- if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
- this.consumeOrderly = true;
- this.consumeMessageService = new ConsumeMessageOrderlyService(this, (MessageListenerOrderly)this.getMessageListenerInner());
- } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
- this.consumeOrderly = false;
- this.consumeMessageService = new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently)this.getMessageListenerInner());
- }
- this.consumeMessageService.start();
-
- // 设置MQClient对象
- boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
- if (!registerOK) {
- this.serviceState = ServiceState.CREATE_JUST;
- this.consumeMessageService.shutdown();
- throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
- + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
- null);
- }
- mQClientFactory.start();
- log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
-
- // 设置服务状态
- this.serviceState = ServiceState.RUNNING;
- break;
- case RUNNING:
- case START_FAILED:
- case SHUTDOWN_ALREADY:
- throw new MQClientException("The PushConsumer service state not OK, maybe started once, "//
- + this.serviceState//
- + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
- null);
- default:
- break;
- }
-
- // 从Namesrv获取TopicRouteData,更新TopicPublishInfo和MessageQueue (在Consumer start时马上调用,之后每隔一段时间调用一次)
- this.updateTopicSubscribeInfoWhenSubscriptionChanged();
-
- // 向TopicRouteData里的所有Broker发送心跳,注册Consumer/Producer信息到Broker上 (在Consumer start时马上调用,之后每隔一段时间调用一次)
- this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
-
- // 唤醒MessageQueue均衡服务,负载均衡后马上开启第一次拉取消息
- this.mQClientFactory.rebalanceImmediately();
- }
1、函数copySubscription:将订阅信息跟RebalanceImpl同步,这个类是consumer的核心逻辑实现类。如果consumer是cluster模式,并且订阅了TopicA的消息,那客户端会自动订阅%RETRY%TopicA。那这个%RETRY%开头的topic是做什么的呢?我们知道consumer消费消息处理失败的话,broker是会延时一定的时间重新推送的,重新推送不是跟其它新消息一起过来,而是通过单独的%RETRY%的topic过来。
2、getAndCreateMQClientInstance:初始化一个MQClientInstance,这个跟producer共用一个实现。
3、rebalanceImpl:设置负载均衡器
4、pullApiWrapper:封装实现类,封装了broker的通信接口
首先看下初始化的部分
- public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) {
- //前面的逻辑跟Producer相同
- ...
- ...
- //1、Pull请求服务,异步发送请求到broker并负责将返回结果放到缓存队列
- this.pullMessageService = new PullMessageService(this);
- //2、定时或者被触发做subscribe queue的re-balance
- this.rebalanceService = new RebalanceService(this);
- //3、初始化一个自用的producer,`CLIENT_INNER_PRODUCER`
- this.defaultMQProducer = new DefaultMQProducer(MixAll.CLIENT_INNER_PRODUCER_GROUP);
- this.defaultMQProducer.resetClientConfig(clientConfig);
- ...
- }
其中第三步中的自用producer,主要用于在消费失败或者超时后发送重试的消息给broker。
下面看下启动的过程中Consumer相关的部分:
- public void start() throws MQClientException {
-
- synchronized (this) {
- switch (this.serviceState) {
- case CREATE_JUST:
- ...
- // 1、Start various schedule tasks
- this.startScheduledTask();
- // 2、Start pull service,开始处理PullRequest
- this.pullMessageService.start();
- // 3、Start rebalance service
- this.rebalanceService.start();
- // 4、Start push service,consumer预留的producer,发送要求重新的消息
- this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
- ...
- }
- }
- }
函数startScheduledTask启动的定时任务中,consumer相关的任务有两个
- //保存消费进度,广播消息存在本地,集群消息上传到所有的broker
- this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
-
- @Override
- public void run() {
- try {
- MQClientInstance.this.persistAllConsumerOffset();
- } catch (Exception e) {
- log.error("ScheduledTask persistAllConsumerOffset exception", e);
- }
- }
- }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
- //对于`PushConsumer`,根据负载调整本地处理消息的线程池corePool大小
- this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
-
- @Override
- public void run() {
- try {
- MQClientInstance.this.adjustThreadPool();
- } catch (Exception e) {
- log.error("ScheduledTask adjustThreadPool exception", e);
- }
- }
- }, 1, 1, TimeUnit.MINUTES);
第3步,RebalanceService的任务主要是调用RebalanceImpl,来给consumer重新调整和分配queue。
从以上的PushConsumer启动逻辑可以看出,主要的消息读取逻辑都是由RebalanceImpl完成的,通过调用doRebalance()来触发,下面看下具体实现。
- public void doRebalance(final boolean isOrder) {
- //获取该consumer的订阅信息
- Map
subTable = this.getSubscriptionInner(); - if (subTable != null) {
- for (final Map.Entry
entry : subTable.entrySet()) { - final String topic = entry.getKey();
- try {
- //循环针对所有订阅的topic,做rebalance
- this.rebalanceByTopic(topic, isOrder);
- } catch (Throwable e) {
- if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
- log.warn("rebalanceByTopic Exception", e);
- }
- }
- }
- }
- //做完rebalance后,检查是否有的queue已经不归自己负责消费,是的话就释放缓存message的queue
- this.truncateMessageQueueNotMyTopic();
- }
主要的逻辑都是在rebalanceByTopic()中实现的:
- private void rebalanceByTopic(final String topic, final boolean isOrder) {
- switch (messageModel) {
- case BROADCASTING: {
- ...
- ...
- break;
- }
- case CLUSTERING: {
- //1、从路由信息中获取topic对应所有的Queue
- Set
mqSet = this.topicSubscribeInfoTable.get(topic); - //2、从broker获取所有同一个group的所有Consumer ID
- List
cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup); - if (null == mqSet) {
- if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
- log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
- }
- }
-
- if (null == cidAll) {
- log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
- }
-
- if (mqSet != null && cidAll != null) {
- List
mqAll = new ArrayList(); - mqAll.addAll(mqSet);
- //3、将MQ和cid都排好序
- Collections.sort(mqAll);
- Collections.sort(cidAll);
-
- AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
- //4、按照初始化是指定的分配策略,获取分配的MQ列表
- List
allocateResult = null; - try {
- allocateResult = strategy.allocate(
- this.consumerGroup,
- this.mQClientFactory.getClientId(),
- mqAll,
- cidAll);
- } catch (Throwable e) {
- log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
- e);
- return;
- }
-
- Set
allocateResultSet = new HashSet(); - if (allocateResult != null) {
- allocateResultSet.addAll(allocateResult);
- }
- //5、更新rebalanceImpl中的processQueue用来缓存收到的消息,对于新加入的Queue,提交一次PullRequest
- boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
- if (changed) {
- ...
- ...
- //6、同步数据到broker,通过发送一次心跳实现
- this.messageQueueChanged(topic, mqSet, allocateResultSet);
- }
- }
- break;
- }
- default:
- break;
- }
- }
第4步,同一个topic的消息会分布于集群内的多个broker的不同queue上。同一个group下面会有多个consumer,分配策略AllocateMessageQueueStrategy的作用就是计算当前consumer应该消费哪几个queue的消息。
第5步,根据分配策略分配到queue之后,会查看是否是新增的queue,如果是则提交一次PullRequest去broker拉取消息。
下面来看下分配策略和Pull请求的提交过程。
AllocateMessageQueueStrategy系统默认使用AVG策略(AllocateMessageQueueAveragely),就是将该topic所有Queue按照broker和queueId从小到大做排列,按照consumer的数量平均分成几份。然后每个consumer分到一份,按照consumer排序后的顺序来领取。代码实现如下:
- @Override
- public List
allocate(String consumerGroup, String currentCID, List mqAll, - List
cidAll) { - int index = cidAll.indexOf(currentCID);
- int mod = mqAll.size() % cidAll.size();
- //AVG size计算方法,mq数量<=consumer数量,size=1,这种情况是很少的
- //否则size=mq数量/consumer数量,余数是几则前几个consumer的size+1,这样所有的queue都会有consumer消费
- int averageSize =
- mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
- + 1 : mqAll.size() / cidAll.size());
- int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
- int range = Math.min(averageSize, mqAll.size() - startIndex);
- //从第一个consumer开始分配,每个分avgSize个连续的Queue,
- for (int i = 0; i < range; i++) {
- result.add(mqAll.get((startIndex + i) % mqAll.size()));
- }
- return result;
- }
RocketMQ提供其它的queue分配策略:
通过上面的策略分配到queue之后,RebalanceImpl通过updateProcessQueueTableInRebalance()方法来检查新加入queue并提交pull请求。
- private boolean updateProcessQueueTableInRebalance(final String topic, final Set
mqSet, - final boolean isOrder) {
- boolean changed = false;
-
- Iterator
> it = this.processQueueTable.entrySet().iterator(); - while (it.hasNext()) {
- Entry
next = it.next(); - MessageQueue mq = next.getKey();
- ProcessQueue pq = next.getValue();
-
- if (mq.getTopic().equals(topic)) {
- if (!mqSet.contains(mq)) {//不再消费这个Queue的消息
- pq.setDropped(true);
- if (this.removeUnnecessaryMessageQueue(mq, pq)) {//保存offset并
- it.remove();
- changed = true;
- log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
- }
- } else if (pq.isPullExpired()) {//超过max idle时间
- switch (this.consumeType()) {
- case CONSUME_ACTIVELY:
- break;
- case CONSUME_PASSIVELY:
- pq.setDropped(true);
- if (this.removeUnnecessaryMessageQueue(mq, pq)) {
- it.remove();
- changed = true;
- log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
- consumerGroup, mq);
- }
- break;
- default:
- break;
- }
- }
- }
- }
-
- List
pullRequestList = new ArrayList(); - for (MessageQueue mq : mqSet) {
- if (!this.processQueueTable.containsKey(mq)) {//如果是新加入的Queue
- if (isOrder && !this.lock(mq)) {
- log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
- continue;
- }
- //从offset store中移除过时的数据
- this.removeDirtyOffset(mq);
- ProcessQueue pq = new ProcessQueue();
- //获取起始消费offset
- long nextOffset = this.computePullFromWhere(mq);
- if (nextOffset >= 0) {
- //为新的Queue初始化一个ProcessQueue,用来缓存收到的消息
- ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
- if (pre != null) {
- log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
- } else {
- log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
- //对新加的queue初始化一个PullRequest
- PullRequest pullRequest = new PullRequest();
- pullRequest.setConsumerGroup(consumerGroup);
- pullRequest.setNextOffset(nextOffset);
- pullRequest.setMessageQueue(mq);
- pullRequest.setProcessQueue(pq);
- pullRequestList.add(pullRequest);
- changed = true;
- }
- } else {
- log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
- }
- }
- }
- //分发pull request到PullMessageService,拉取消息
- this.dispatchPullRequest(pullRequestList);
- return changed;
- }
从以上的代码可以看出,RebalanceImpl每次都会检查分配到的queue列表,如果发现有新的queue加入,就会给这个queue初始化一个缓存队列,然后新发起一个PullRequest给PullMessageService执行。由此可见,新增的queue只有第一次Pull请求时RebalanceImpl发起的,后续请求是在broker返回数据后,处理线程发起的。
PullMessageService这个服务就是一个单独运行的线程,在收到Pull请求后异步执行。
- private void pullMessage(final PullRequest pullRequest) {
- final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
- if (consumer != null) {
- DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
- impl.pullMessage(pullRequest);
- } else {
- log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
- }
- }
上面的真正的实现是在DefaultMQPushConsumerImpl.pullMessage()里面。
- public void pullMessage(final PullRequest pullRequest) {
- final ProcessQueue processQueue = pullRequest.getProcessQueue();
- ...
- ...
- pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());
- ...
- ...
- long cachedMessageCount = processQueue.getMsgCount().get();
- long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
- //1、如果堆积未处理的消息过多,则扔回PullMessageService,延时执行(默认50ms)
- if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
- this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
- if ((queueFlowControlTimes++ % 1000) == 0) {
- log.warn(...);
- }
- return;
- }
- //2、如果堆积消息的size过大,同上面的逻辑
- if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
- this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
- if ((queueFlowControlTimes++ % 1000) == 0) {
- log.warn(...);
- }
- return;
- }
- //3、无序消息,消息offset跨度过大,同上面的流控逻辑
- if (!this.consumeOrderly) {
- if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
- this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
- if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
- log.warn(...);
- }
- return;
- }
- } else {
- if (processQueue.isLocked()) {
- if (!pullRequest.isLockedFirst()) {
- final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
- boolean brokerBusy = offset < pullRequest.getNextOffset();
- log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
- pullRequest, offset, brokerBusy);
- if (brokerBusy) {
- log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",
- pullRequest, offset);
- }
-
- pullRequest.setLockedFirst(true);
- pullRequest.setNextOffset(offset);
- }
- } else {
- this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
- log.info("pull message later because not locked in broker, {}", pullRequest);
- return;
- }
- }
- //4、检查订阅关系有没有变化,有可能在延时期间,topic或者consumer的配置都发生了变化
- final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
- if (null == subscriptionData) {
- this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
- log.warn("find the consumer's subscription failed, {}", pullRequest);
- return;
- }
-
- final long beginTimestamp = System.currentTimeMillis();
- //5、Pull Command发送后,返回结果处理
- PullCallback pullCallback = new PullCallback() {
- @Override
- public void onSuccess(PullResult pullResult) {
- if (pullResult != null) {
- //6、消息预处理,客户端再次过滤,set minOffset和maxOffset
- pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
- subscriptionData);
-
- switch (pullResult.getPullStatus()) {
- case FOUND:
- long prevRequestOffset = pullRequest.getNextOffset();
- pullRequest.setNextOffset(pullResult.getNextBeginOffset());
- long pullRT = System.currentTimeMillis() - beginTimestamp;
- DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
- pullRequest.getMessageQueue().getTopic(), pullRT);
-
- long firstMsgOffset = Long.MAX_VALUE;
- //7、如果获取到的消息数为0,则立即发起下一次pull
- if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
- DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
- } else {
- firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
-
- DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
- pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
- //8、消息放入ProcessQueue
- boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
- //9、消费消息,调用messageListener处理,处理完成会通知ProcessQueue
- DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
- pullResult.getMsgFoundList(),
- processQueue,
- pullRequest.getMessageQueue(),
- dispatchToConsume);
- //10、再次提交pull request
- if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
- DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
- DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
- } else {
- DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
- }
- }
-
- if (pullResult.getNextBeginOffset() < prevRequestOffset
- || firstMsgOffset < prevRequestOffset) {
- log.warn(
- "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
- pullResult.getNextBeginOffset(),
- firstMsgOffset,
- prevRequestOffset);
- }
-
- break;
- case NO_NEW_MSG:
- pullRequest.setNextOffset(pullResult.getNextBeginOffset());
-
- DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
-
- DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
- break;
- case NO_MATCHED_MSG:
- pullRequest.setNextOffset(pullResult.getNextBeginOffset());
-
- DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
-
- DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
- break;
- case OFFSET_ILLEGAL://Queue已经不存在了
- log.warn("the pull request offset illegal, {} {}",
- pullRequest.toString(), pullResult.toString());
- pullRequest.setNextOffset(pullResult.getNextBeginOffset());
-
- pullRequest.getProcessQueue().setDropped(true);
- DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {
- //存储消费offset,从rebalance中移除ProcessQueue
- @Override
- public void run() {
- try {
- DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
- pullRequest.getNextOffset(), false);
-
- DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());
-
- DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());
-
- log.warn("fix the pull request offset, {}", pullRequest);
- } catch (Throwable e) {
- log.error("executeTaskLater Exception", e);
- }
- }
- }, 10000);
- break;
- default:
- break;
- }
- }
- }
-
- @Override
- public void onException(Throwable e) {
- if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
- log.warn("execute the pull request exception", e);
- }
-
- DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
- }
- };
-
- boolean commitOffsetEnable = false;
- long commitOffsetValue = 0L;
- if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
- commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
- if (commitOffsetValue > 0) {
- commitOffsetEnable = true;
- }
- }
-
- String subExpression = null;
- boolean classFilter = false;
- SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
- if (sd != null) {
- if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {
- subExpression = sd.getSubString();
- }
-
- classFilter = sd.isClassFilterMode();
- }
-
- int sysFlag = PullSysFlag.buildSysFlag(
- commitOffsetEnable, // commitOffset
- true, // suspend
- subExpression != null, // subscription
- classFilter // class filter
- );
- try {
- this.pullAPIWrapper.pullKernelImpl(
- pullRequest.getMessageQueue(),
- subExpression,
- subscriptionData.getExpressionType(),
- subscriptionData.getSubVersion(),
- pullRequest.getNextOffset(),
- this.defaultMQPushConsumer.getPullBatchSize(),
- sysFlag,
- commitOffsetValue,
- BROKER_SUSPEND_MAX_TIME_MILLIS,
- CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
- CommunicationMode.ASYNC,
- pullCallback
- );
- } catch (Exception e) {//错误处理,延时重试(默认3sec)
- log.error("pullKernelImpl exception", e);
- this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
- }
- }
以上逻辑中主要关注第5步,在消息返回后,会将消息放入ProcessQueue,然后通知ConsumeMessageService来异步处理消息,然后再次提交Pull请求。这样对于用户端来说,只有ConsumeMessageService回调listener这一步是可见的,其它都是透明的。
ConsumeMessageService消息处理的逻辑比较简单,就是回调Consumer启动时注册的Listener。无论Listener是否处理成功,消息都会从ProcessQueue中移除掉。我们看下对于Listener返回结果的处理方法。ConsumeMessageConcurrentlyService.processConsumeResult()
- /**
- * 处理消费结果
- *
- * @param status 消费结果
- * @param context 消费Context
- * @param consumeRequest 提交请求
- */
- public void processConsumeResult(
- final ConsumeConcurrentlyStatus status,
- final ConsumeConcurrentlyContext context,
- final ConsumeRequest consumeRequest
- ) {
- int ackIndex = context.getAckIndex();
-
- if (consumeRequest.getMsgs().isEmpty())
- return;
-
- ...
- ...
-
- switch (this.defaultMQPushConsumer.getMessageModel()) {
- //broadcast模式,处理失败,不做处理
- case BROADCASTING:
- for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
- MessageExt msg = consumeRequest.getMsgs().get(i);
- log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
- }
- break;
- case CLUSTERING:
- List
msgBackFailed = new ArrayList(consumeRequest.getMsgs().size()); - for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
- MessageExt msg = consumeRequest.getMsgs().get(i);
- //Cluster模式,将消息发回broker重新发送
- boolean result = this.sendMessageBack(msg, context);
- if (!result) {
- msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
- msgBackFailed.add(msg);
- }
- }
-
- if (!msgBackFailed.isEmpty()) {
- consumeRequest.getMsgs().removeAll(msgBackFailed);
- //发回broker失败,则再次尝试本地消费
- this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
- }
- break;
- default:
- break;
- }
- //将消费前缓存的消息清除
- long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
- //更新offset
- if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
- this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
- }
- }
消息处理失败后,consumer会将消息发给broker,broker会根据重试次数来重新投递消息。sendback方法的实现如下
- public boolean sendMessageBack(final MessageExt msg, final ConsumeConcurrentlyContext context) {
- int delayLevel = context.getDelayLevelWhenNextConsume();
-
- try {
- this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, context.getMessageQueue().getBrokerName());
- return true;
- } catch (Exception e) {
- log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + msg.toString(), e);
- }
-
- return false;
- }
首先会根据这是第几次消费失败,设置延时多长时间重新投递,然后调用DefaultMQPushConsumerImpl.sendMessageBack()的方法。默认设置下,最多会重新投递16次。
- //consumer把没有消费的消息提交给broker,broker会延时一段时间后重新发送
- public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName)
- throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
- try {
- String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName)
- : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
- //首先尝试直接发送CONSUMER_SEND_MSG_BACK命令给broker
- this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg,
- this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());
- } catch (Exception e) {
- log.error("sendMessageBack Exception, " + this.defaultMQPushConsumer.getConsumerGroup(), e);
- //如果发送失败,则把消息发送到%RETRY%topic,重新发送
- Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody());
-
- String originMsgId = MessageAccessor.getOriginMessageId(msg);
- MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);
-
- newMsg.setFlag(msg.getFlag());
- MessageAccessor.setProperties(newMsg, msg.getProperties());
- MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());
- MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes() + 1));
- MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes()));
- newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());
-
- this.mQClientFactory.getDefaultMQProducer().send(newMsg);
- }
- }
一共有两种方式让broker重发,先尝试给broker发送send_msg_back的命令,如果失败了,则通过consumer预留的producer给%RETRY%topic发送消息,前面consumer启动的时候已经讲过,所有consumer都订阅%RETRY%topic,所以等于是自己给自己发一条消息。