• RocketMQ源码分析:Consumer消费偏移量


    基于rocketmq-4.9.0 版本分析rocketmq

    1.什么是消费偏移量offset?

    我们先看一幅图

    消费偏移量offset就是记录消费者的消费进度的。也是rocketmq保证消息不会重复消费的核心(当然,极端情况下还是可能会导致重复消费)。

    consumequeue中一个消息的索引单元就是一个offset值。

    在分析rocketmq的消费者是如何利用这个offset完成消息消费的之前,我们先看下broker端是如何管理这些offset值的。

    服务端管理offset

    这里的服务端就是broker

    1. broker在初始化(initialize())时会通过消费者offset管理类ConsumerOffsetManager来加载配置文件中的offset值,然后设置到offsetTable属性中。
    1. public class ConsumerOffsetManager extends ConfigManager {
    2. //TODO: key = topic@group
    3. //TODO: value = Map[key = queueId, value = offset]
    4. private ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> offsetTable =
    5. new ConcurrentHashMap<String, ConcurrentMap<Integer, Long>>(512);
    6. //TOOD:...other
    7. }

    offset文件默认路径:$user.home/store/config/consumerOffset.json 文件内容例子:

    1. {
    2. "offsetTable":{
    3. //topic@consumerGroupName
    4. "batchTopic@test_cg_batch":{0:0,1:0,2:0,3:1
    5. },
    6. //key是queueid,value是offset值,就是我们今天讨论的主角
    7. "ordered_topic@CG":{0:0,1:15,2:0,3:35
    8. },
    9. "qiuguan_topic@qiuguan_group_1":{0:2533,1:2534,2:2531,3:2531
    10. },
    11. "hacker_topic@fuyuanhui_group_2":{0:64035,1:64034,2:64034,3:64034
    12. },
    13. "qiuguan_topic_2@qiuguan_group":{0:2,1:1,2:7,3:6
    14. },
    15. "qiuguan_topic@qiuguan_group":{0:2533,1:2534,2:2531,3:2531
    16. }
    17. }
    18. }
    1. 消费者消费后,会将offset发送到broker,这里会先写入到上面的消费者offset管理类ConsumerOffsetManageroffsetTable中,然后通过定时任务将其刷盘到文件中。属性中。然后通过3将数据持久化到文件中

    稍后我们分析消费者时还会看到

    1. broker在初始化(initialize())时,会启动定时任务,每隔5秒执行一次初始化,将ConsumerOffsetManageroffsetTable属性中的值持久化到文件中。
    1. this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    2. @Override
    3. public void run() {
    4. try {
    5. BrokerController.this.consumerOffsetManager.persist();
    6. } catch (Throwable e) {
    7. log.error("schedule persist consumerOffset error.", e);
    8. }
    9. }
    10. }, 1000 * 10, 1000 * 5, TimeUnit.MILLISECONDS);

    那么服务端对于offset值的管理大致就这些,那么我们来看下消费者是如何利用offset来进行消息消费的。

    总的来说就是,消费者定时的将消费过的offset值上传到broker的内存offsetTable中,然后通过定时任务将其刷盘到文件中。

    那么接下来就看看消费者是如何使用这个offset值的。

    3.消费者使用offset

    3.1 消费者初始化offset

    前面在分析consumer消费时我们知道,它会启动一个消息拉取服务PullMessageService对象,还有一个是在拉取消息之前要完成的重平衡RebalanceService对象。offset初始化就和重平衡息息相关,那么我们就看下重平衡是如何完成offset初始化的。

    我们这里还是只讨论集群消费模式。它和广播模式的区别就是,广播模式每个消费者都要消费topic下的所有队列,集群模式通过分配算法(默认是平均)来将topic下的所有队列分配给消费者。既然这里我们主要讨论的是offset,那么就以集群模式进行分析即可。

    前面我们分析了重平衡,那么这里我们就只看和offset初始化相关的部分

    RebalanceService#run()一步一步来到初始化offset的地方

    1. private boolean updateProcessQueueTableInRebalance(final String topic, final Set mqSet,
    2. final boolean isOrder) {
    3. boolean changed = false;
    4. //TODO: 省略部分代码........
    5. List pullRequestList = new ArrayList();
    6. for (MessageQueue mq : mqSet) {
    7. if (!this.processQueueTable.containsKey(mq)) {
    8. if (isOrder && !this.lock(mq)) {
    9. log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
    10. continue;
    11. }
    12. this.removeDirtyOffset(mq);
    13. ProcessQueue pq = new ProcessQueue();
    14. long nextOffset = -1L;
    15. try {
    16. //TODO:初始化偏移量
    17. nextOffset = this.computePullFromWhereWithException(mq);
    18. } catch (MQClientException e) {
    19. log.info("doRebalance, {}, compute offset failed, {}", consumerGroup, mq);
    20. continue;
    21. }
    22. if (nextOffset >= 0) {
    23. ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
    24. if (pre != null) {
    25. log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
    26. } else {
    27. log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
    28. //TODO:构建拉取请求对象PullRequest
    29. PullRequest pullRequest = new PullRequest();
    30. pullRequest.setConsumerGroup(consumerGroup);
    31. //TODO:重点:设置初始offset值
    32. pullRequest.setNextOffset(nextOffset);
    33. pullRequest.setMessageQueue(mq);
    34. pullRequest.setProcessQueue(pq);
    35. pullRequestList.add(pullRequest);
    36. changed = true;
    37. }
    38. } else {
    39. log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
    40. }
    41. }
    42. }
    43. this.dispatchPullRequest(pullRequestList);
    44. return changed;
    45. }

    初始化 offset值的地方就是 computePullFromWhereWithException(mq)方法,那么点进去看下它的内部逻辑:

    RebalancePushImpl

    1. @Override
    2. public long computePullFromWhereWithException(MessageQueue mq) throws MQClientException {
    3. long result = -1;
    4. //TODO:默认是 CONSUME_FROM_LAST_OFFSET
    5. final ConsumeFromWhere consumeFromWhere = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeFromWhere();
    6. //TODO:获取offsetStore类型,集群模式是RemoteBrokerOffsetStore,后面还会在单独说
    7. final OffsetStore offsetStore = this.defaultMQPushConsumerImpl.getOffsetStore();
    8. switch (consumeFromWhere) {
    9. //TOOD:前3个已经废弃
    10. case CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST:
    11. case CONSUME_FROM_MIN_OFFSET:
    12. case CONSUME_FROM_MAX_OFFSET:
    13. case CONSUME_FROM_LAST_OFFSET: {
    14. //TODO:从broker中读取offset值
    15. long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
    16. if (lastOffset >= 0) {
    17. result = lastOffset;
    18. }
    19. // First start,no offset
    20. else if (-1 == lastOffset) {
    21. if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
    22. result = 0L;
    23. } else {
    24. try {
    25. result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
    26. } catch (MQClientException e) {
    27. log.warn("Compute consume offset from last offset exception, mq={}, exception={}", mq, e);
    28. throw e;
    29. }
    30. }
    31. } else {
    32. result = -1;
    33. }
    34. break;
    35. }
    36. //TODO:从broker中读取offset值
    37. case CONSUME_FROM_FIRST_OFFSET: {
    38. long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
    39. if (lastOffset >= 0) {
    40. result = lastOffset;
    41. } else if (-1 == lastOffset) {
    42. result = 0L;
    43. } else {
    44. result = -1;
    45. }
    46. break;
    47. }
    48. //TODO:从broker中读取offset值
    49. case CONSUME_FROM_TIMESTAMP: {
    50. long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
    51. if (lastOffset >= 0) {
    52. result = lastOffset;
    53. } else if (-1 == lastOffset) {
    54. if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
    55. try {
    56. result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
    57. } catch (MQClientException e) {
    58. log.warn("Compute consume offset from last offset exception, mq={}, exception={}", mq, e);
    59. throw e;
    60. }
    61. } else {
    62. try {
    63. long timestamp = UtilAll.parseDate(this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeTimestamp(),
    64. UtilAll.YYYYMMDDHHMMSS).getTime();
    65. result = this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp);
    66. } catch (MQClientException e) {
    67. log.warn("Compute consume offset from last offset exception, mq={}, exception={}", mq, e);
    68. throw e;
    69. }
    70. }
    71. } else {
    72. result = -1;
    73. }
    74. break;
    75. }
    76. default:
    77. break;
    78. }
    79. return result;
    80. }

    这里看到RocketMQ一个比较有意思的枚举 ConsumeFromWhere, 我这里就不对他展开描述了,如果大家感兴趣可以自己研究下,总之就是无论哪种配置策略都要先从broker读取offset,如果“没有”,则按照客户端的规则进行offset的初始化。

    所以这里就是确定好初始offset. 确定好offset后,就可以构建PullRequest了,我们继续看前面的代码:

    1. long nextOffset = -1L;
    2. try {
    3. //TODO:初始化offset
    4. nextOffset = this.computePullFromWhereWithException(mq);
    5. } catch (MQClientException e) {
    6. log.info("doRebalance, {}, compute offset failed, {}", consumerGroup, mq);
    7. continue;
    8. }
    9. if (nextOffset >= 0) {
    10. ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
    11. if (pre != null) {
    12. log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
    13. } else {
    14. log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
    15. PullRequest pullRequest = new PullRequest();
    16. pullRequest.setConsumerGroup(consumerGroup);
    17. //TODO:设置初始化offset,拉取消息时,就从这个offset开始拉取
    18. pullRequest.setNextOffset(nextOffset);
    19. pullRequest.setMessageQueue(mq);
    20. pullRequest.setProcessQueue(pq);
    21. pullRequestList.add(pullRequest);
    22. changed = true;
    23. }
    24. } else {
    25. log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
    26. }

    这里就比较熟悉了,前面分析消费流程的时候我们也看到过,接下来就是将填充好MessageQueue(主要就是queueid), ProcessQueue(消费者本地缓存消息的队列),nextOffset(消费偏移量)的 PullRequest 对象放入拉取消息的类PullMessageService 中的pullRequestQueue队列属性中,这样PullMessageService 就可以去broker拉取消息了。

    拉取消息的过程我们还只是关注消费偏移量(nextOffset)的变化

    3.3 利用offset拉取消息

    1. @Override
    2. public void run() {
    3. log.info(this.getServiceName() + " service started");
    4. while (!this.isStopped()) {
    5. try {
    6. //TODO:从队列中取出 PullRequest
    7. PullRequest pullRequest = this.pullRequestQueue.take();
    8. //TODO:拉取消息
    9. this.pullMessage(pullRequest);
    10. } catch (InterruptedException ignored) {
    11. } catch (Exception e) {
    12. log.error("Pull Message Service Run Method exception", e);
    13. }
    14. }
    15. log.info(this.getServiceName() + " service end");
    16. }

    那么在拉取消息的过程中,offset是如何工作的呢?我们用一幅图来展示:

    那么我们在从源码中简单看下拉取过程:

    DefaultMQPushConsumerImpl#pullMessage(PullRequest pullRequest)

    1. public void pullMessage(final PullRequest pullRequest) {
    2. //TODO:....省略诸多流控代码.......
    3. //TODO:消息从broker拉取成功后的本地回调处理
    4. //TODO:PullResult 包含了从broker读取的消息,以及新的offset
    5. PullCallback pullCallback = new PullCallback() {
    6. @Override
    7. public void onSuccess(PullResult pullResult) {
    8. if (pullResult != null) {
    9. pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
    10. subscriptionData);
    11. switch (pullResult.getPullStatus()) {
    12. case FOUND:
    13. long prevRequestOffset = pullRequest.getNextOffset();
    14. //TODO:重点关注
    15. //TODO:将broker返回的新的offset值重新设置给PullRequest的 nextOffset
    16. //TODO:下次就用新的offset去broker读取消息
    17. pullRequest.setNextOffset(pullResult.getNextBeginOffset());
    18. long pullRT = System.currentTimeMillis() - beginTimestamp;
    19. DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
    20. pullRequest.getMessageQueue().getTopic(), pullRT);
    21. long firstMsgOffset = Long.MAX_VALUE;
    22. if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
    23. DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
    24. } else {
    25. firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
    26. DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
    27. pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
    28. boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
    29. DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
    30. pullResult.getMsgFoundList(),
    31. processQueue,
    32. pullRequest.getMessageQueue(),
    33. dispatchToConsume);
    34. if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
    35. DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
    36. DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
    37. } else {
    38. //TODO: 重点关注
    39. //TODO:再次将PullRequest放入队列中
    40. //TODO:还是原来的PullRequest对象,只不过offset更新了
    41. DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
    42. }
    43. }
    44. //TODO:...省略部分代码......
    45. break;
    46. case NO_NEW_MSG:
    47. case NO_MATCHED_MSG:
    48. pullRequest.setNextOffset(pullResult.getNextBeginOffset());
    49. DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
    50. DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
    51. break;
    52. default:
    53. break;
    54. }
    55. }
    56. }
    57. @Override
    58. public void onException(Throwable e) {
    59. if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
    60. log.warn("execute the pull request exception", e);
    61. }
    62. DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
    63. }
    64. };
    65. //TODO:...省略部分代码。。。。。
    66. //TODO:去broker拉取消息
    67. try {
    68. this.pullAPIWrapper.pullKernelImpl(
    69. pullRequest.getMessageQueue(),
    70. subExpression,
    71. subscriptionData.getExpressionType(),
    72. subscriptionData.getSubVersion(),
    73. //TODO:从PullReqeust中获取offset值给broker
    74. pullRequest.getNextOffset(),
    75. this.defaultMQPushConsumer.getPullBatchSize(),
    76. sysFlag,
    77. commitOffsetValue,
    78. BROKER_SUSPEND_MAX_TIME_MILLIS,
    79. CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
    80. CommunicationMode.ASYNC,
    81. //TODO:上面构建的回调对象,当从broker拉取到消息后交给回调处理
    82. pullCallback
    83. );
    84. } catch (Exception e) {
    85. log.error("pullKernelImpl exception", e);
    86. this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
    87. }
    88. }

    通过分析源码,我们也大概知道了在拉取消息过程中offset是如何变化的。那么接下来我们在看下offset是如何完成持久化的。

    3.4 offset的更新和持久化

    在消费者消费完消息后,就需要更新offset并完成持久化,我们我们就看下消费完成后处理消费结果的代码:

    ConsumeMessageConcurrentlyService#processConsumeResult(...)

    1. public void processConsumeResult(
    2. final ConsumeConcurrentlyStatus status,
    3. final ConsumeConcurrentlyContext context,
    4. final ConsumeRequest consumeRequest
    5. ) {
    6. int ackIndex = context.getAckIndex();
    7. if (consumeRequest.getMsgs().isEmpty())
    8. return;
    9. switch (status) {
    10. case CONSUME_SUCCESS:
    11. if (ackIndex >= consumeRequest.getMsgs().size()) {
    12. ackIndex = consumeRequest.getMsgs().size() - 1;
    13. }
    14. int ok = ackIndex + 1;
    15. int failed = consumeRequest.getMsgs().size() - ok;
    16. this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
    17. this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
    18. break;
    19. case RECONSUME_LATER:
    20. ackIndex = -1;
    21. this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
    22. consumeRequest.getMsgs().size());
    23. break;
    24. default:
    25. break;
    26. }
    27. switch (this.defaultMQPushConsumer.getMessageModel()) {
    28. case BROADCASTING:
    29. for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
    30. MessageExt msg = consumeRequest.getMsgs().get(i);
    31. log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
    32. }
    33. break;
    34. case CLUSTERING:
    35. List msgBackFailed = new ArrayList(consumeRequest.getMsgs().size());
    36. for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
    37. MessageExt msg = consumeRequest.getMsgs().get(i);
    38. boolean result = this.sendMessageBack(msg, context);
    39. if (!result) {
    40. msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
    41. msgBackFailed.add(msg);
    42. }
    43. }
    44. if (!msgBackFailed.isEmpty()) {
    45. consumeRequest.getMsgs().removeAll(msgBackFailed);
    46. this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
    47. }
    48. break;
    49. default:
    50. break;
    51. }
    52. //TODO:关注这里即可
    53. //TODO:从本地缓存队列中将消费的消息移除
    54. long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
    55. if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
    56. //TODO:更新offset
    57. this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
    58. }
    59. }

    我们继续看下updateOffset的逻辑:

    这里有两个实现类

    • LocalFileOffsetStore : 广播模式使用它
    • RemoteBrokerOffsetStore : 集群模式使用它,我们分析的就是集群模式

    那么我们就看下RemoteBrokerOffsetStore的updateOffset的逻辑:

    1. public class RemoteBrokerOffsetStore implements OffsetStore {
    2. //TOOD:....省略部分代码.......
    3. @Override
    4. public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) {
    5. if (mq != null) {
    6. AtomicLong offsetOld = this.offsetTable.get(mq);
    7. if (null == offsetOld) {
    8. offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset));
    9. }
    10. if (null != offsetOld) {
    11. if (increaseOnly) {
    12. MixAll.compareAndIncreaseOnly(offsetOld, offset);
    13. } else {
    14. offsetOld.set(offset);
    15. }
    16. }
    17. }
    18. }
    19. }

    其实很简单,就是将消费过的消息的offset(递增)更新到消费者本地offset变量表中。然后通过定时任务持久化到broker中。

    接下来在简单看下消费者客户端持久化offset到broker. 在消费者启动时,会启动一个定时任务:

    1. this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    2. @Override
    3. public void run() {
    4. try {
    5. MQClientInstance.this.persistAllConsumerOffset();
    6. } catch (Exception e) {
    7. log.error("ScheduledTask persistAllConsumerOffset exception", e);
    8. }
    9. }
    10. }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

    每隔5s将offset持久化到broker

    我们在简单看下broker端如何处理offset

    1. 首先要接收客户端发送的offset
    1. public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
    2. //TODO:.....省略其他code.....
    3. @Override
    4. public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
    5. throws RemotingCommandException {
    6. switch (request.getCode()) {
    7. case RequestCode.GET_CONSUMER_LIST_BY_GROUP:
    8. return this.getConsumerListByGroup(ctx, request);
    9. //TODO:更新offset
    10. case RequestCode.UPDATE_CONSUMER_OFFSET:
    11. return this.updateConsumerOffset(ctx, request);
    12. case RequestCode.QUERY_CONSUMER_OFFSET:
    13. return this.queryConsumerOffset(ctx, request);
    14. default:
    15. break;
    16. }
    17. return null;
    18. }
    19. //TODO:....省略其他代码
    20. }

    最终代码:

    1. public class ConsumerOffsetManager extends ConfigManager {
    2. private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
    3. private static final String TOPIC_GROUP_SEPARATOR = "@";
    4. //TODO: key = topic@group
    5. //TODO: value = Map[key = queueId, value = offset]
    6. private ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> offsetTable =
    7. new ConcurrentHashMap<String, ConcurrentMap<Integer, Long>>(512);
    8. //TODO:....省略诸多code.....
    9. private void commitOffset(final String clientHost, final String key, final int queueId, final long offset) {
    10. ConcurrentMap<Integer, Long> map = this.offsetTable.get(key);
    11. if (null == map) {
    12. map = new ConcurrentHashMap<Integer, Long>(32);
    13. map.put(queueId, offset);
    14. this.offsetTable.put(key, map);
    15. } else {
    16. Long storeOffset = map.put(queueId, offset);
    17. if (storeOffset != null && offset < storeOffset) {
    18. log.warn("[NOTIFYME]update consumer offset less than store. clientHost={}, key={}, queueId={}, requestOffset={}, storeOffset={}", clientHost, key, queueId, offset, storeOffset);
    19. }
    20. }
    21. }
    22. //TODO:......
    23. }

    其实也很简单,就是将客户端发送的offset更新到broker的offset表中,然后再通过broker的定时任务持久化到文件中。那么我们在简单看下broker的持久化:

    1. broker的持久化

    broker启动时,也会启动持久化的定时任务

    1. this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    2. @Override
    3. public void run() {
    4. try {
    5. BrokerController.this.consumerOffsetManager.persist();
    6. } catch (Throwable e) {
    7. log.error("schedule persist consumerOffset error.", e);
    8. }
    9. }
    10. }, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
    11. 复制代码

    通过定时任务将上面ConsumerOffsetManager类中的offset表offsetTable写到文件中。至此,就完成了offset的更新和持久化。

    4.总结

    本文从源码的角度分析了RocketMQ的消费偏移量offset是如何被使用的,简单总结下持久化过程:

    1. 消费者客户端本地维护一个offset表,消费者消费完成后先更新到本地offset表中
    2. 消费者客户端启动时会开启一个定时任务,将本地的offset表发送到broker
    3. broker会接收消费者客户端发送的offset数据,并保存到broker的本地offset表中
    4. broker启动时也会开启定时任务,用于将broker的本地offset表中的数据持久化到文件中

    关于offset的使用,请参考上面的图。

  • 相关阅读:
    用边缘计算网关解决离散行业数采问题-天拓四方
    【Python性能优化】list、array与set
    Python ————浅拷贝与深拷贝
    程序化易程序分析之后,如何进入交易程序?
    漂亮的pyqt6皮肤 PyOneDark_Qt_Widgets_Modern_GUIPublic
    【状语从句练习题】例句
    JS防抖与节流
    Vue3项目关于轮播图的封装应该怎么封装才是最简单的呢
    开源一个教学型分库分表示例项目 shardingsphere-jdbc-demo
    Python从入门到入土-基础语法
  • 原文地址:https://blog.csdn.net/LBWNB_Java/article/details/126222047