• RocketMQ特性--Broker是如何存储事务消息的?


    Broker中,事务消息的初始化是通过BrokerController.initialTransaction()方法执行的。

    1. private void initialTransaction() {
    2. this.transactionalMessageService = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_SERVICE_ID, TransactionalMessageService.class);
    3. if (null == this.transactionalMessageService) {
    4. this.transactionalMessageService = new TransactionalMessageServiceImpl(new TransactionalMessageBridge(this, this.getMessageStore()));
    5. LOG.warn("Load default transaction message hook service: {}", TransactionalMessageServiceImpl.class.getSimpleName());
    6. }
    7. this.transactionalMessageCheckListener = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_LISTENER_ID, AbstractTransactionalMessageCheckListener.class);
    8. if (null == this.transactionalMessageCheckListener) {
    9. this.transactionalMessageCheckListener = new DefaultTransactionalMessageCheckListener();
    10. LOG.warn("Load default discard message hook service: {}", DefaultTransactionalMessageCheckListener.class.getSimpleName());
    11. }
    12. this.transactionalMessageCheckListener.setBrokerController(this);
    13. this.transactionalMessageCheckService = new TransactionalMessageCheckService(this);
    14. }
    15. 复制代码

    这里有三个核心的初始化变量

    TransactionalMessageService

    事务消息主要处理服务。默认实现类是TransactionalMessageServiceImpl也可以自己定义事务消息处理实现类,通过ServiceProvider.loadClass()方法进行加载。TransactionalMessageService类定义如下。内部属性已加注释标明。

    1. public interface TransactionalMessageService {
    2. //用于保存Half事务消息
    3. PutMessageResult prepareMessage(MessageExtBrokerInner messageInner);
    4. CompletableFuture asyncPrepareMessage(MessageExtBrokerInner messageInner);
    5. //删除事务消息
    6. boolean deletePrepareMessage(MessageExt messageExt);
    7. //提交事务消息
    8. OperationResult commitMessage(EndTransactionRequestHeader requestHeader);
    9. //回滚事务消息
    10. OperationResult rollbackMessage(EndTransactionRequestHeader requestHeader);
    11. void check(long transactionTimeout, int transactionCheckMax, AbstractTransactionalMessageCheckListener listener);
    12. //打开事务消息
    13. boolean open();
    14. //关闭事务消息
    15. void close();
    16. }
    17. 复制代码

    transactionalMessageCheckListener

    事务消息回查监听器

    transactionalMessageCheckService

    事务消息回查服务,启动一个线程定时检查超时的Half消息是否需要回查。

    处理事务消息

    当初始化完成之后,Broker就可以处理事务消息了。

    Broker存储事务消息的是org.apache.rocketmq.broker.processor.SendMessageProcessor,这和普通消息其实是一样的。
    但是有两点针对事务消息的特殊处理

    第一处:

    org.apache.rocketmq.broker.processor.SendMessageProcessor#sendMessage中:

    1. //获取扩展字段的值,若是该值为true则为事务消息
    2. String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
    3. boolean sendTransactionPrepareMessage = false;
    4. if (Boolean.parseBoolean(traFlag)
    5. && !(msgInner.getReconsumeTimes() > 0 && msgInner.getDelayTimeLevel() > 0)) {
    6. //判断当前Broker配置是否支持事务消息
    7. if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
    8. response.setCode(ResponseCode.NO_PERMISSION);
    9. response.setRemark(
    10. "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
    11. + "] sending transaction message is forbidden");
    12. return response;
    13. }
    14. sendTransactionPrepareMessage = true;
    15. }
    16. 复制代码
    1. if (sendTransactionPrepareMessage) {
    2. //保存Half信息
    3. putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
    4. } else {
    5. putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
    6. }
    7. 复制代码

    第二处:

    存储事务消息前的预处理,对应方法是org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge#parseHalfMessageInner

    1. private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
    2. //将原消息的topic保存在扩展字段中
    3. MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
    4. //将原消息的QueueId保存在扩展字段中
    5. MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
    6. String.valueOf(msgInner.getQueueId()));
    7. //将原消息的SysFlag保存在扩展字段中
    8. msgInner.setSysFlag(
    9. MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
    10. //修改topic的值为RMQ_SYS_TRANS_HALF_TOPIC
    11. msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
    12. //修改Queueid为0
    13. msgInner.setQueueId(0);
    14. msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
    15. return msgInner;
    16. }
    17. 复制代码

    完成上述步骤之后,调用DefaultMessageStole.putMessage()方法将其保存到CommitLog中。

    CommitLog存储成功之后,通过org.apache.rocketmq.store.CommitLog.DefaultAppendMessageCallback#doAppend()方法对其进行处理。

    1. final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
    2. switch (tranType) {
    3. // Prepared and Rollback message is not consumed, will not enter the consume queue
    4. case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
    5. case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
    6. queueOffset = 0L;
    7. break;
    8. case MessageSysFlag.TRANSACTION_NOT_TYPE:
    9. case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
    10. default:
    11. break;
    12. }
    13. 复制代码

    这里的逻辑是这样的,当读到的消息类型为事务消息时,设置当前消息的位点值为0,而不是设置真实的位点。这样该位点就不会建立ConsumeQueue索引,也不会被消费

  • 相关阅读:
    【CC3200AI 实验教程11】疯壳·AI语音人脸识别(会议记录仪/人脸打卡机)-AI语音系统架构
    为什么要少用全局变量
    Black Friday案例分析
    LeetCode-组合总和 III(C++)
    docker部署Elasticsearch集群并设置安全
    “Ubuntu终端闪退”的解决方法
    C#基础--委托、lambda表达式和事件
    【Android】WebView控件最全使用解析
    第一节 vue3 router内置类型有哪些
    C++工程师面试模拟
  • 原文地址:https://blog.csdn.net/m0_71777195/article/details/126384204