This article analyzes RocketMQ based on version rocketmq-4.9.0.
Let's start with a diagram.
The consume offset records a consumer's consumption progress. It is also at the core of how RocketMQ avoids re-consuming messages (although in extreme cases duplicate consumption can still occur).
One message index unit in the consumequeue corresponds to one offset value; the sketch below shows how an offset addresses such a unit.
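As a brief aside, each consumequeue index unit has a fixed 20-byte layout, so an offset maps straight to a byte position. The sketch below is our own illustration of that layout (the class and method names are made up):

```java
import java.nio.ByteBuffer;

// Sketch: how a consume offset addresses an index unit inside a consumequeue file.
public class ConsumeQueueIndexSketch {
    static final int CQ_STORE_UNIT_SIZE = 20; // 8B commitlog offset + 4B message size + 8B tag hashcode

    // index unit N starts at byte position N * 20 in the consumequeue file
    static long bytePosition(long consumeOffset) {
        return consumeOffset * CQ_STORE_UNIT_SIZE;
    }

    // decode one 20-byte index unit
    static void parseUnit(ByteBuffer unit) {
        long commitLogOffset = unit.getLong(); // where the full message sits in the commitlog
        int size = unit.getInt();              // message size in bytes
        long tagsCode = unit.getLong();        // tag hashcode, used for server-side tag filtering
        System.out.printf("commitlog=%d, size=%d, tagsCode=%d%n", commitLogOffset, size, tagsCode);
    }
}
```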
Before analyzing how a RocketMQ consumer uses this offset to consume messages, let's first look at how the broker manages these offset values.
Here, the server side means the broker. When the broker starts (`BrokerController#initialize()`), the consumer offset manager `ConsumerOffsetManager` loads the offset values from the config file and sets them into its `offsetTable` field.

```java
public class ConsumerOffsetManager extends ConfigManager {

    //TODO: key = topic@group
    //TODO: value = Map[key = queueId, value = offset]
    private ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> offsetTable =
        new ConcurrentHashMap<String, ConcurrentMap<Integer, Long>>(512);

    //TODO: ...other fields and methods omitted
}
```
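To make the table concrete, here is a simplified sketch (our own condensation, close to but not verbatim the source) of how the broker answers an offset query from this table: build the `topic@group` key, then look up the queueId; a missing entry yields -1.

```java
// Sketch: answering QUERY_CONSUMER_OFFSET from offsetTable, simplified from ConsumerOffsetManager.
private long queryOffset(final String group, final String topic, final int queueId) {
    String key = topic + "@" + group;                             // TOPIC_GROUP_SEPARATOR
    ConcurrentMap<Integer, Long> map = this.offsetTable.get(key);
    if (map != null) {
        Long offset = map.get(queueId);
        if (offset != null) {
            return offset;
        }
    }
    return -1; // no record yet: the consumer falls back to its ConsumeFromWhere strategy
}
```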
The offset file's default path is `$user.home/store/config/consumerOffset.json`. Example file content:
```json
{
    "offsetTable":{
        // key is topic@consumerGroupName
        "batchTopic@test_cg_batch":{0:0,1:0,2:0,3:1},
        // inner key is the queueId, value is the offset: the protagonist of this article
        "ordered_topic@CG":{0:0,1:15,2:0,3:35},
        "qiuguan_topic@qiuguan_group_1":{0:2533,1:2534,2:2531,3:2531},
        "hacker_topic@fuyuanhui_group_2":{0:64035,1:64034,2:64034,3:64034},
        "qiuguan_topic_2@qiuguan_group":{0:2,1:1,2:7,3:6},
        "qiuguan_topic@qiuguan_group":{0:2533,1:2534,2:2531,3:2531}
    }
}
```
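If you want to poke at this file yourself, parsing it with fastjson (which RocketMQ itself uses for this file, and which tolerates the unquoted numeric keys above) works; this is a hypothetical helper, and the entry name comes from the sample:

```java
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import java.nio.file.Files;
import java.nio.file.Paths;

// Hypothetical inspection helper: print the per-queue offsets of one topic@group entry.
public class OffsetFileInspector {
    public static void main(String[] args) throws Exception {
        String path = System.getProperty("user.home") + "/store/config/consumerOffset.json";
        String json = new String(Files.readAllBytes(Paths.get(path)));
        JSONObject offsetTable = JSON.parseObject(json).getJSONObject("offsetTable");
        System.out.println(offsetTable.getJSONObject("qiuguan_topic@qiuguan_group"));
    }
}
```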
Consumers periodically report the offsets they have consumed to the `offsetTable` of `ConsumerOffsetManager`, and the broker then flushes that data to the file through a scheduled task; we will see this again when we analyze the consumer. When the broker starts (`BrokerController#initialize()`), it registers a scheduled task that runs every 5 seconds and persists the values in `ConsumerOffsetManager`'s `offsetTable` to the file.

```java
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            BrokerController.this.consumerOffsetManager.persist();
        } catch (Throwable e) {
            log.error("schedule persist consumerOffset error.", e);
        }
    }
}, 1000 * 10, 1000 * 5, TimeUnit.MILLISECONDS);
```
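`persist()` is inherited from `ConfigManager`: it serializes `offsetTable` to JSON and rewrites the config file. A rough sketch of the net effect (the real `MixAll.string2File` also writes through a `.tmp` file; this simplification only keeps the `.bak` idea):

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;

// Sketch: what persisting the offset table to consumerOffset.json boils down to.
public class OffsetPersistSketch {
    static void persist(String offsetTableJson, String configFilePath) throws IOException {
        Path target = Paths.get(configFilePath);
        if (Files.exists(target)) {
            // keep the previous version around, mirroring the .bak backup
            Files.copy(target, Paths.get(configFilePath + ".bak"), StandardCopyOption.REPLACE_EXISTING);
        }
        Files.write(target, offsetTableJson.getBytes(StandardCharsets.UTF_8));
    }
}
```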
That is roughly all there is to broker-side offset management. In short, consumers periodically upload the offsets they have consumed to the broker's in-memory `offsetTable`, and a scheduled task then flushes that table to the file. So next, let's look at how the consumer uses this offset value to consume messages.
From the earlier analysis of consumption we know that the consumer starts a message pull service, `PullMessageService`, and a rebalance service, `RebalanceService`, which must complete before messages are pulled. Offset initialization is closely tied to rebalancing, so let's look at how the rebalance initializes the offset.
We will again only discuss the clustering consumption mode. The difference from broadcasting mode is that in broadcasting mode every consumer consumes all queues of a topic, whereas clustering mode uses an allocation algorithm (averaging by default, sketched right below) to distribute the topic's queues among the consumers. Since our focus here is the offset, analyzing the clustering mode is enough.
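For reference, here is a simplified sketch of that default averaging strategy, conceptually what `AllocateMessageQueueAveragely` does: with a sorted queue list and a sorted consumer-id list, each consumer takes one contiguous slice (sanity checks omitted; names are ours):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch: the default "average" allocation, simplified.
public class AverageAllocationSketch {
    static List<Integer> allocate(List<Integer> queueIds, List<String> cidAll, String currentCid) {
        int index = cidAll.indexOf(currentCid);      // this consumer's rank among all consumers
        int mod = queueIds.size() % cidAll.size();   // leftover queues after even division
        int averageSize = queueIds.size() <= cidAll.size() ? 1
            : (mod > 0 && index < mod ? queueIds.size() / cidAll.size() + 1
                                      : queueIds.size() / cidAll.size());
        int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
        int range = Math.min(averageSize, queueIds.size() - startIndex);
        List<Integer> result = new ArrayList<Integer>();
        for (int i = 0; i < range; i++) {
            result.add(queueIds.get((startIndex + i) % queueIds.size()));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> queues = Arrays.asList(0, 1, 2, 3);
        List<String> cids = Arrays.asList("consumer-1", "consumer-2", "consumer-3");
        for (String cid : cids) {
            // 4 queues over 3 consumers: consumer-1 gets [0, 1], the others one queue each
            System.out.println(cid + " -> " + allocate(queues, cids, cid));
        }
    }
}
```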
We already analyzed rebalancing, so here we only look at the part related to offset initialization. Starting from `RebalanceService#run()`, we walk step by step to where the offset is initialized:
```java
private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
    final boolean isOrder) {
    boolean changed = false;

    //TODO: part of the code omitted........

    List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
    for (MessageQueue mq : mqSet) {
        if (!this.processQueueTable.containsKey(mq)) {
            if (isOrder && !this.lock(mq)) {
                log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
                continue;
            }

            this.removeDirtyOffset(mq);
            ProcessQueue pq = new ProcessQueue();

            long nextOffset = -1L;
            try {
                //TODO: initialize the offset
                nextOffset = this.computePullFromWhereWithException(mq);
            } catch (MQClientException e) {
                log.info("doRebalance, {}, compute offset failed, {}", consumerGroup, mq);
                continue;
            }

            if (nextOffset >= 0) {
                ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
                if (pre != null) {
                    log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
                } else {
                    log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
                    //TODO: build the pull request object PullRequest
                    PullRequest pullRequest = new PullRequest();
                    pullRequest.setConsumerGroup(consumerGroup);
                    //TODO: key point: set the initial offset
                    pullRequest.setNextOffset(nextOffset);
                    pullRequest.setMessageQueue(mq);
                    pullRequest.setProcessQueue(pq);
                    pullRequestList.add(pullRequest);
                    changed = true;
                }
            } else {
                log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
            }
        }
    }

    this.dispatchPullRequest(pullRequestList);

    return changed;
}
```
The place where the offset is initialized is the `computePullFromWhereWithException(mq)` method, so let's step into its internal logic (`RebalancePushImpl`):
```java
@Override
public long computePullFromWhereWithException(MessageQueue mq) throws MQClientException {
    long result = -1;
    //TODO: defaults to CONSUME_FROM_LAST_OFFSET
    final ConsumeFromWhere consumeFromWhere = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeFromWhere();
    //TODO: get the OffsetStore; in clustering mode it is RemoteBrokerOffsetStore, more on this later
    final OffsetStore offsetStore = this.defaultMQPushConsumerImpl.getOffsetStore();
    switch (consumeFromWhere) {
        //TODO: the first three are deprecated
        case CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST:
        case CONSUME_FROM_MIN_OFFSET:
        case CONSUME_FROM_MAX_OFFSET:
        case CONSUME_FROM_LAST_OFFSET: {
            //TODO: read the offset from the broker
            long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
            if (lastOffset >= 0) {
                result = lastOffset;
            }
            // First start, no offset
            else if (-1 == lastOffset) {
                if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    result = 0L;
                } else {
                    try {
                        result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
                    } catch (MQClientException e) {
                        log.warn("Compute consume offset from last offset exception, mq={}, exception={}", mq, e);
                        throw e;
                    }
                }
            } else {
                result = -1;
            }
            break;
        }
        //TODO: read the offset from the broker
        case CONSUME_FROM_FIRST_OFFSET: {
            long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
            if (lastOffset >= 0) {
                result = lastOffset;
            } else if (-1 == lastOffset) {
                result = 0L;
            } else {
                result = -1;
            }
            break;
        }
        //TODO: read the offset from the broker
        case CONSUME_FROM_TIMESTAMP: {
            long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
            if (lastOffset >= 0) {
                result = lastOffset;
            } else if (-1 == lastOffset) {
                if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    try {
                        result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
                    } catch (MQClientException e) {
                        log.warn("Compute consume offset from last offset exception, mq={}, exception={}", mq, e);
                        throw e;
                    }
                } else {
                    try {
                        long timestamp = UtilAll.parseDate(this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeTimestamp(),
                            UtilAll.YYYYMMDDHHMMSS).getTime();
                        result = this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp);
                    } catch (MQClientException e) {
                        log.warn("Compute consume offset from last offset exception, mq={}, exception={}", mq, e);
                        throw e;
                    }
                }
            } else {
                result = -1;
            }
            break;
        }

        default:
            break;
    }

    return result;
}
```
Here we meet a rather interesting RocketMQ enum, `ConsumeFromWhere`. I won't expand on it here; if you're interested, study it yourself. The gist: whichever strategy is configured, the consumer first reads the offset from the broker, and only if the broker has no record (`readOffset` returns -1) does the client fall back to its own initialization rule.
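As a quick usage sketch (the group, topic, and name-server address are made up; the setters are standard `DefaultMQPushConsumer`/`ClientConfig` API), this is how you pick a starting point for a brand-new consumer group; remember it only applies while the broker has no offset record yet:

```java
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;

public class ConsumeFromWhereDemo {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("qiuguan_group"); // made-up group
        consumer.setNamesrvAddr("127.0.0.1:9876");                                   // made-up address
        consumer.subscribe("qiuguan_topic", "*");
        // Only honored when readOffset returns -1, i.e. the group's very first start:
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // For CONSUME_FROM_TIMESTAMP, the format is yyyyMMddHHmmss:
        // consumer.setConsumeTimestamp("20210801000000");
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, ctx) ->
            ConsumeConcurrentlyStatus.CONSUME_SUCCESS);
        consumer.start();
    }
}
```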
So this is where the initial offset gets determined. With the offset settled, the `PullRequest` can be built; let's go back to the earlier code:
```java
long nextOffset = -1L;
try {
    //TODO: initialize the offset
    nextOffset = this.computePullFromWhereWithException(mq);
} catch (MQClientException e) {
    log.info("doRebalance, {}, compute offset failed, {}", consumerGroup, mq);
    continue;
}

if (nextOffset >= 0) {
    ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
    if (pre != null) {
        log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
    } else {
        log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
        PullRequest pullRequest = new PullRequest();
        pullRequest.setConsumerGroup(consumerGroup);

        //TODO: set the initial offset; message pulling starts from this offset
        pullRequest.setNextOffset(nextOffset);
        pullRequest.setMessageQueue(mq);
        pullRequest.setProcessQueue(pq);
        pullRequestList.add(pullRequest);
        changed = true;
    }
} else {
    log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
}
```
This should look familiar from the earlier analysis of the consume flow. The `PullRequest`, populated with the `MessageQueue` (essentially the queueId), the `ProcessQueue` (the consumer's local message cache), and `nextOffset` (the consume offset), is put into the `pullRequestQueue` of the pulling class `PullMessageService`, which can then go pull messages from the broker.
During the pull process we will again focus only on how the consume offset (`nextOffset`) changes:
```java
@Override
public void run() {
    log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {
            //TODO: take a PullRequest from the queue (blocks while the queue is empty)
            PullRequest pullRequest = this.pullRequestQueue.take();
            //TODO: pull messages
            this.pullMessage(pullRequest);
        } catch (InterruptedException ignored) {
        } catch (Exception e) {
            log.error("Pull Message Service Run Method exception", e);
        }
    }

    log.info(this.getServiceName() + " service end");
}
```
So how does the offset work while messages are being pulled? We illustrate it with a diagram. Then let's take a quick look at the pull process in the source:
`DefaultMQPushConsumerImpl#pullMessage(PullRequest pullRequest)`
```java
public void pullMessage(final PullRequest pullRequest) {

    //TODO: ....a lot of flow-control code omitted.......

    //TODO: local callback invoked after messages are pulled from the broker
    //TODO: PullResult contains the messages read from the broker, plus the new offset
    PullCallback pullCallback = new PullCallback() {
        @Override
        public void onSuccess(PullResult pullResult) {
            if (pullResult != null) {
                pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                    subscriptionData);

                switch (pullResult.getPullStatus()) {
                    case FOUND:
                        long prevRequestOffset = pullRequest.getNextOffset();

                        //TODO: key point
                        //TODO: write the new offset returned by the broker back into the PullRequest's nextOffset
                        //TODO: the next pull will read from the broker using this new offset
                        pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                        long pullRT = System.currentTimeMillis() - beginTimestamp;
                        DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
                            pullRequest.getMessageQueue().getTopic(), pullRT);

                        long firstMsgOffset = Long.MAX_VALUE;
                        if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                        } else {
                            firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();

                            DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
                                pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());

                            boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
                            DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                                pullResult.getMsgFoundList(),
                                processQueue,
                                pullRequest.getMessageQueue(),
                                dispatchToConsume);

                            if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
                                DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
                                    DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                            } else {

                                //TODO: key point
                                //TODO: put the PullRequest back into the queue
                                //TODO: it is still the same PullRequest object, just with an updated offset
                                DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            }
                        }

                        //TODO: ...part of the code omitted......

                        break;
                    case NO_NEW_MSG:
                    case NO_MATCHED_MSG:
                        pullRequest.setNextOffset(pullResult.getNextBeginOffset());

                        DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);

                        DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                        break;

                    default:
                        break;
                }
            }
        }

        @Override
        public void onException(Throwable e) {
            if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                log.warn("execute the pull request exception", e);
            }

            DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
        }
    };

    //TODO: ...part of the code omitted.....

    //TODO: go pull messages from the broker
    try {
        this.pullAPIWrapper.pullKernelImpl(
            pullRequest.getMessageQueue(),
            subExpression,
            subscriptionData.getExpressionType(),
            subscriptionData.getSubVersion(),
            //TODO: take the offset from the PullRequest and send it to the broker
            pullRequest.getNextOffset(),
            this.defaultMQPushConsumer.getPullBatchSize(),
            sysFlag,
            commitOffsetValue,
            BROKER_SUSPEND_MAX_TIME_MILLIS,
            CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
            CommunicationMode.ASYNC,
            //TODO: the callback built above; invoked once messages are pulled from the broker
            pullCallback
        );
    } catch (Exception e) {
        log.error("pullKernelImpl exception", e);
        this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
    }
}
```
Having walked the source, we now have a rough picture of how the offset changes during message pulling; a toy simulation of that hand-off follows.
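Here is a toy, self-contained simulation of the hand-off (all numbers and the mini `PullRequest` class are made up; it only mimics the take / pull / setNextOffset / re-queue cycle shown above):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Toy simulation of the pull loop's offset hand-off: the same request object circulates,
// carrying the broker's nextBeginOffset into the next round.
public class PullLoopSketch {
    static class PullRequest { long nextOffset; }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<>();
        PullRequest request = new PullRequest();
        request.nextOffset = 100;                      // initial offset decided by the rebalance
        pullRequestQueue.put(request);

        for (int round = 0; round < 3; round++) {
            PullRequest pr = pullRequestQueue.take();  // PullMessageService#run: take a request
            long pulled = 32;                          // pretend the broker returned 32 messages
            long nextBeginOffset = pr.nextOffset + pulled;
            System.out.printf("round %d: pulled [%d, %d)%n", round, pr.nextOffset, nextBeginOffset);
            pr.nextOffset = nextBeginOffset;           // pullRequest.setNextOffset(nextBeginOffset)
            pullRequestQueue.put(pr);                  // executePullRequestImmediately(pullRequest)
        }
    }
}
```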
Next, let's look at how the offset gets persisted. Once the consumer has finished consuming messages, the offset must be updated and then persisted, so let's look at the code that processes the consume result:
`ConsumeMessageConcurrentlyService#processConsumeResult(...)`
```java
public void processConsumeResult(
    final ConsumeConcurrentlyStatus status,
    final ConsumeConcurrentlyContext context,
    final ConsumeRequest consumeRequest
) {
    int ackIndex = context.getAckIndex();

    if (consumeRequest.getMsgs().isEmpty())
        return;

    switch (status) {
        case CONSUME_SUCCESS:
            if (ackIndex >= consumeRequest.getMsgs().size()) {
                ackIndex = consumeRequest.getMsgs().size() - 1;
            }
            int ok = ackIndex + 1;
            int failed = consumeRequest.getMsgs().size() - ok;
            this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
            this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
            break;
        case RECONSUME_LATER:
            ackIndex = -1;
            this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
                consumeRequest.getMsgs().size());
            break;
        default:
            break;
    }

    switch (this.defaultMQPushConsumer.getMessageModel()) {
        case BROADCASTING:
            for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                MessageExt msg = consumeRequest.getMsgs().get(i);
                log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
            }
            break;
        case CLUSTERING:
            List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
            for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                MessageExt msg = consumeRequest.getMsgs().get(i);
                boolean result = this.sendMessageBack(msg, context);
                if (!result) {
                    msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                    msgBackFailed.add(msg);
                }
            }

            if (!msgBackFailed.isEmpty()) {
                consumeRequest.getMsgs().removeAll(msgBackFailed);

                this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
            }
            break;
        default:
            break;
    }

    //TODO: this is the part to focus on
    //TODO: remove the consumed messages from the local cache queue
    long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
    if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
        //TODO: update the offset
        this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
    }
}
```
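One detail deserves a sketch: `removeMessage` does not simply return "last consumed + 1". Conceptually (our simplification, with the real locking and edge cases omitted), the committable offset is the smallest queueOffset still in flight, so one slow message holds the offset back and nothing can be skipped:

```java
import java.util.List;
import java.util.TreeMap;

// Sketch of ProcessQueue.removeMessage's core rule (simplified, not thread-safe).
public class ProcessQueueSketch {
    private final TreeMap<Long, String> msgTreeMap = new TreeMap<>(); // queueOffset -> message body
    private long queueOffsetMax = 0L;

    public void putMessage(long queueOffset, String body) {
        msgTreeMap.put(queueOffset, body);
        queueOffsetMax = Math.max(queueOffsetMax, queueOffset);
    }

    public long removeMessage(List<Long> consumedOffsets) {
        for (Long offset : consumedOffsets) {
            msgTreeMap.remove(offset);
        }
        // same rule as the real ProcessQueue: commit up to the first message still in flight,
        // or queueOffsetMax + 1 once everything received so far has been consumed
        return msgTreeMap.isEmpty() ? queueOffsetMax + 1 : msgTreeMap.firstKey();
    }
}
```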
Let's continue with the `updateOffset` logic. `OffsetStore` has two implementations:

- `LocalFileOffsetStore`: used in broadcasting mode
- `RemoteBrokerOffsetStore`: used in clustering mode, which is the one we are analyzing

So let's look at `RemoteBrokerOffsetStore`'s `updateOffset` logic:
```java
public class RemoteBrokerOffsetStore implements OffsetStore {

    //TODO: ...part of the code omitted.......

    @Override
    public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) {
        if (mq != null) {
            AtomicLong offsetOld = this.offsetTable.get(mq);
            if (null == offsetOld) {
                offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset));
            }

            if (null != offsetOld) {
                if (increaseOnly) {
                    MixAll.compareAndIncreaseOnly(offsetOld, offset);
                } else {
                    offsetOld.set(offset);
                }
            }
        }
    }
}
```
It's really quite simple: the offset of the consumed messages is written, increase-only (see the sketch below), into the consumer's local offset table, and a scheduled task then persists it to the broker.
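`MixAll.compareAndIncreaseOnly` is what makes the update monotonic. A minimal standalone sketch of the same CAS loop:

```java
import java.util.concurrent.atomic.AtomicLong;

// Sketch: an increase-only CAS update, mirroring MixAll.compareAndIncreaseOnly.
public final class IncreaseOnly {
    public static boolean compareAndIncreaseOnly(final AtomicLong target, final long value) {
        long prev = target.get();
        while (value > prev) {                        // only try if the new offset moves forward
            if (target.compareAndSet(prev, value)) {  // CAS won: the offset advanced
                return true;
            }
            prev = target.get();                      // lost the race: re-read and retry
        }
        return false;                                 // stale offset: ignore it
    }
}
```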
Next, a quick look at how the consumer client persists offsets to the broker. When the consumer starts, it registers a scheduled task:
```java
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

    @Override
    public void run() {
        try {
            MQClientInstance.this.persistAllConsumerOffset();
        } catch (Exception e) {
            log.error("ScheduledTask persistAllConsumerOffset exception", e);
        }
    }
}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
```
By default this persists the offsets to the broker every 5 seconds (`persistConsumerOffsetInterval` defaults to 5000 ms and is configurable, as sketched below).
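The interval is a plain `ClientConfig` property, and `DefaultMQPushConsumer` extends `ClientConfig`, so it can be tuned on the consumer directly (the group name is made up):

```java
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("qiuguan_group");
// report consume offsets to the broker every 10 s instead of the default 5000 ms
consumer.setPersistConsumerOffsetInterval(10 * 1000);
```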
Now let's take a quick look at how the broker side handles the offset:
```java
public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {

    //TODO: .....other code omitted.....

    @Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
        throws RemotingCommandException {
        switch (request.getCode()) {
            case RequestCode.GET_CONSUMER_LIST_BY_GROUP:
                return this.getConsumerListByGroup(ctx, request);
            //TODO: update the offset
            case RequestCode.UPDATE_CONSUMER_OFFSET:
                return this.updateConsumerOffset(ctx, request);
            case RequestCode.QUERY_CONSUMER_OFFSET:
                return this.queryConsumerOffset(ctx, request);
            default:
                break;
        }
        return null;
    }

    //TODO: ...other code omitted
}
```
The call ultimately lands here:
```java
public class ConsumerOffsetManager extends ConfigManager {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
    private static final String TOPIC_GROUP_SEPARATOR = "@";

    //TODO: key = topic@group
    //TODO: value = Map[key = queueId, value = offset]
    private ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> offsetTable =
        new ConcurrentHashMap<String, ConcurrentMap<Integer, Long>>(512);

    //TODO: ...much code omitted.....

    private void commitOffset(final String clientHost, final String key, final int queueId, final long offset) {
        ConcurrentMap<Integer, Long> map = this.offsetTable.get(key);
        if (null == map) {
            map = new ConcurrentHashMap<Integer, Long>(32);
            map.put(queueId, offset);
            this.offsetTable.put(key, map);
        } else {
            Long storeOffset = map.put(queueId, offset);
            if (storeOffset != null && offset < storeOffset) {
                log.warn("[NOTIFYME]update consumer offset less than store. clientHost={}, key={}, queueId={}, requestOffset={}, storeOffset={}", clientHost, key, queueId, offset, storeOffset);
            }
        }
    }

    //TODO: ......
}
```
This, again, is quite simple: the offset sent by the client is written into the broker's offset table, and the broker's scheduled task then persists it to the file. So let's take a quick look at the broker-side persistence.
When the broker starts, it also registers the persistence task:
```java
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            BrokerController.this.consumerOffsetManager.persist();
        } catch (Throwable e) {
            log.error("schedule persist consumerOffset error.", e);
        }
    }
}, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
```
The scheduled task writes the `offsetTable` of the `ConsumerOffsetManager` class shown above to the file (`flushConsumerOffsetInterval` defaults to 5000 ms). With that, offset updating and persistence are complete.
This article analyzed, from the source code, how RocketMQ's consume offset is used. To briefly summarize the persistence flow: after consuming, the consumer updates the offset in its local `RemoteBrokerOffsetStore`; a client-side scheduled task reports it to the broker every 5 seconds; the broker records it in `ConsumerOffsetManager`'s `offsetTable`; and a broker-side scheduled task flushes that table to `consumerOffset.json`, also every 5 seconds by default.
For how the offset is used overall, refer to the diagram above.