• RocketMQ Source Code (20): Transaction Messages


    Version

    1. Based on rocketmq-all-4.3.1

    Overview

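    1. Transaction message flow

      • The transaction initiator in the application sends a transactional Prepare (half) message to the broker. Once the send succeeds, the producer calls the local-transaction method of its transaction listener to execute the local transaction
      • When the broker receives the Prepare message, it first backs up the message's original topic and queue ID into the message properties. It then stores the message in the topic RMQ_SYS_TRANS_HALF_TOPIC, which has only one queue
      • At startup the broker launches a scheduled task. The task takes messages out of RMQ_SYS_TRANS_HALF_TOPIC and sends a check-back request to the message's sender, which can be any producer in the producer group. The producer replies with commit, rollback, or unknown, depending on the actual state of its local transaction
      • If commit or rollback is returned, the broker commits or rolls back the transaction message
        • On commit, the broker restores the transaction message and writes it to its original topic. It then writes a message to the RMQ_SYS_TRANS_OP_HALF_TOPIC queue whose body is the queue offset of this transaction message
        • On rollback, the broker only writes a message to the RMQ_SYS_TRANS_OP_HALF_TOPIC queue whose body is the queue offset of this transaction message
      • If unknown is returned, the broker checks again in the next round. If the state is still unknown after the maximum number of checks, the broker rolls the message back: it gives up on it, and the message is never delivered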
    2. Overall interaction
      [Figure: overall interaction diagram of the transaction message flow]

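    3. Why use two extra topics to implement transaction messages?

      • Consumers must not consume Prepare messages, so the broker first writes them to the RMQ_SYS_TRANS_HALF_TOPIC queue. The benefit is that the broker handles a transactional Prepare message almost exactly like a normal message. That logic can be reused directly, with only one extra check

      • RocketMQ writes every message to its files in append-only fashion. Neither commit nor rollback can modify or delete the original Prepare message in place, because that would create many dirty pages and seriously hurt performance. Instead, "deleting" or "modifying" a Prepare message means appending a message to a separate queue, RMQ_SYS_TRANS_OP_HALF_TOPIC. During a transaction check, the broker first looks in this queue. If no record is found, the state is unknown and another check is needed

      • The downside is that every Prepare message is written to the single RMQ_SYS_TRANS_HALF_TOPIC queue, which may become a bottleneck when there are many transaction messages. Each message is also stored more than once

    4. When the maximum number of checks is reached, or the file holding the message has expired, the current version only logs the message. If the broker finds a message still in the unknown state, it re-appends that message to the half queue when it processes it again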
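The two-topic design above can be modeled in memory to show why nothing ever has to be overwritten. This is a simplified sketch, not RocketMQ code, and every name in it (HalfOpModel, prepare, commit, rollback, pendingOffsets) is hypothetical. Plain lists stand in for the append-only RMQ_SYS_TRANS_HALF_TOPIC queue, the RMQ_SYS_TRANS_OP_HALF_TOPIC queue and the real topic. As described above, an op entry stores only the half message's queue offset.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical in-memory model of the half/op topics; not RocketMQ code
class HalfOpModel {
    // Append-only stand-in for RMQ_SYS_TRANS_HALF_TOPIC queue 0 (index = queue offset)
    final List<String> halfQueue = new ArrayList<>();
    // Append-only stand-in for RMQ_SYS_TRANS_OP_HALF_TOPIC (each entry = a half-queue offset)
    final List<Long> opQueue = new ArrayList<>();
    // Stand-in for the original topic that consumers read
    final List<String> realTopic = new ArrayList<>();

    // Store a Prepare message and return its offset in the half queue
    long prepare(String body) {
        halfQueue.add(body);
        return halfQueue.size() - 1;
    }

    // Commit: copy the message to its real topic, then record the offset in the op queue
    void commit(long halfOffset) {
        realTopic.add(halfQueue.get((int) halfOffset));
        opQueue.add(halfOffset);
    }

    // Rollback: only record the offset in the op queue; nothing reaches the real topic
    void rollback(long halfOffset) {
        opQueue.add(halfOffset);
    }

    // Half messages without an op record are still unknown and need a check
    List<Long> pendingOffsets() {
        Set<Long> done = new HashSet<>(opQueue);
        List<Long> pending = new ArrayList<>();
        for (long i = 0; i < halfQueue.size(); i++) {
            if (!done.contains(i)) {
                pending.add(i);
            }
        }
        return pending;
    }

    public static void main(String[] args) {
        HalfOpModel m = new HalfOpModel();
        long a = m.prepare("order-1");
        long b = m.prepare("order-2");
        m.prepare("order-3");
        m.commit(a);
        m.rollback(b);
        System.out.println(m.realTopic);        // [order-1]
        System.out.println(m.pendingOffsets()); // [2]
    }
}
```

In this model the state of a half message is derived by joining two append-only logs. That is why the broker has to read the op queue before it decides whether a message needs a check.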

    Sending transaction messages

    1. Transaction messages are sent with TransactionMQProducer, which extends DefaultMQProducer and delegates the sending logic to DefaultMQProducerImpl

      @Override
      public TransactionSendResult sendMessageInTransaction(final Message msg,
          final Object arg) throws MQClientException {
          if (null == this.transactionListener) {
              throw new MQClientException("TransactionListener is null", null);
          }
      
          return this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg);
      }
      
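    2. DefaultMQProducerImpl#sendMessageInTransaction is the core sending method. Its main steps:

      • Validate the transaction listener and the message-related settings (message, topic, message size, etc.)
      • Set the transaction property on the message to mark it as a transactional Prepare message, and set the producer group. The group is used for transaction checks: the broker picks a random producer from the group, so a single crashed producer does not make every check fail
      • Send the Prepare message. The local transaction is executed through the transactionListener callback only after a successful result (SEND_OK). If the send throws an exception, the local transaction listener is not executed
      • Send the local transaction result to the broker, which then commits or rolls back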
      public TransactionSendResult sendMessageInTransaction(final Message msg,
                                                            final LocalTransactionExecuter localTransactionExecuter, final Object arg)
          throws MQClientException {
          // Check that a transaction listener is configured
          TransactionListener transactionListener = getCheckListener();
          if (null == localTransactionExecuter && null == transactionListener) {
              throw new MQClientException("tranExecutor is null", null);
          }
          Validators.checkMessage(msg, this.defaultMQProducer);
      
          SendResult sendResult = null;
          // Mark the message as a Prepare message
          MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
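          // Set the producer group, used for transaction checks: a random producer in the group is picked, so one crashed producer does not make checks fail forever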
          MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
          try {
              sendResult = this.send(msg);
          } catch (Exception e) {
              throw new MQClientException("send message Exception", e);
          }
      
          LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
          Throwable localException = null;
          switch (sendResult.getSendStatus()) {
              case SEND_OK: {
                  try {
                      // Transaction ID
                      if (sendResult.getTransactionId() != null) {
                          msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
                      }
                      // UNIQ_KEY: the unique ID generated by the client when sending
                      String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
                      if (null != transactionId && !"".equals(transactionId)) {
                          msg.setTransactionId(transactionId);
                      }
                      // Run the local transaction through transactionListener (localTransactionExecuter is deprecated)
                      if (null != localTransactionExecuter) {
                          localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
                      } else if (transactionListener != null) {
                          log.debug("Used new transaction API");
                          // If this throws, localTransactionState keeps its default UNKNOW; if it returns null, it is set to UNKNOW below
                          localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
                      }
                      if (null == localTransactionState) {
                          localTransactionState = LocalTransactionState.UNKNOW;
                      }
      
                      if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                          log.info("executeLocalTransactionBranch return {}", localTransactionState);
                          log.info(msg.toString());
                      }
                  } catch (Throwable e) {
                      log.info("executeLocalTransactionBranch exception", e);
                      log.info(msg.toString());
                      localException = e;
                  }
              }
              break;
              // Prepare message not stored reliably (flush or slave problems): mark the transaction for rollback
              case FLUSH_DISK_TIMEOUT:
              case FLUSH_SLAVE_TIMEOUT:
              case SLAVE_NOT_AVAILABLE:
                  localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
                  break;
              default:
                  break;
          }
      
          try {
              // Report the local transaction result to the broker
              this.endTransaction(sendResult, localTransactionState, localException);
          } catch (Exception e) {
              log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
          }
      
          TransactionSendResult transactionSendResult = new TransactionSendResult();
          transactionSendResult.setSendStatus(sendResult.getSendStatus());
          transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
          transactionSendResult.setMsgId(sendResult.getMsgId());
          transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
          transactionSendResult.setTransactionId(sendResult.getTransactionId());
          transactionSendResult.setLocalTransactionState(localTransactionState);
          return transactionSendResult;
      }
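The state decision in sendMessageInTransaction can be isolated as a pure function and tested without a broker. In this hypothetical sketch (TxDecision), SendStatus and LocalTransactionState are local enums that mirror the RocketMQ constant names used above, and a Supplier stands in for transactionListener.executeLocalTransaction. It catches RuntimeException because a Supplier cannot throw checked exceptions; the original catches Throwable.

```java
import java.util.function.Supplier;

// Hypothetical, library-free sketch of the state decision in sendMessageInTransaction
class TxDecision {
    // Local mirrors of the RocketMQ enum constants used above
    enum SendStatus { SEND_OK, FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT, SLAVE_NOT_AVAILABLE }

    enum LocalTransactionState { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

    // Returns the state that would be reported to the broker through endTransaction
    static LocalTransactionState decide(SendStatus status, Supplier<LocalTransactionState> localTx) {
        LocalTransactionState state = LocalTransactionState.UNKNOW;
        switch (status) {
            case SEND_OK:
                try {
                    // The local transaction runs only after the Prepare message was stored
                    state = localTx.get();
                } catch (RuntimeException e) {
                    // An exception leaves the state UNKNOW, so the broker checks back later
                    state = LocalTransactionState.UNKNOW;
                }
                if (state == null) {
                    // A null result is treated as UNKNOW
                    state = LocalTransactionState.UNKNOW;
                }
                break;
            case FLUSH_DISK_TIMEOUT:
            case FLUSH_SLAVE_TIMEOUT:
            case SLAVE_NOT_AVAILABLE:
                // Roll back without running the local transaction at all
                state = LocalTransactionState.ROLLBACK_MESSAGE;
                break;
            default:
                break;
        }
        return state;
    }

    public static void main(String[] args) {
        System.out.println(decide(SendStatus.SEND_OK, () -> LocalTransactionState.COMMIT_MESSAGE)); // COMMIT_MESSAGE
        System.out.println(decide(SendStatus.SEND_OK, () -> null)); // UNKNOW
        System.out.println(decide(SendStatus.SEND_OK, () -> { throw new IllegalStateException("db down"); })); // UNKNOW
        System.out.println(decide(SendStatus.SLAVE_NOT_AVAILABLE, () -> LocalTransactionState.COMMIT_MESSAGE)); // ROLLBACK_MESSAGE
    }
}
```

Under these assumptions, a failed or null local transaction never turns into a commit. Only an explicit COMMIT_MESSAGE after SEND_OK makes the message visible.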
      
      
    3. DefaultMQProducerImpl#endTransaction sends the local transaction result to the broker, converting it as follows:

      • localTransactionState == COMMIT_MESSAGE is sent as MessageSysFlag.TRANSACTION_COMMIT_TYPE (0x2 << 2, binary 1000)
      • localTransactionState == ROLLBACK_MESSAGE is sent as MessageSysFlag.TRANSACTION_ROLLBACK_TYPE (0x3 << 2, binary 1100)
      • localTransactionState == UNKNOW is sent as MessageSysFlag.TRANSACTION_NOT_TYPE (0, binary 0000)
      public void endTransaction(
          final SendResult sendResult,
          final LocalTransactionState localTransactionState,
          final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
          final MessageId id;
          // Broker-side message ID (encodes the store address and the CommitLog offset)
          if (sendResult.getOffsetMsgId() != null) {
              id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
          } else {
              id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
          }
          String transactionId = sendResult.getTransactionId();
          // Commit or roll back on the same broker that received the Prepare message
          final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
          EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
          requestHeader.setTransactionId(transactionId);
          // CommitLog offset of the Prepare message
          requestHeader.setCommitLogOffset(id.getOffset());
          switch (localTransactionState) {
              case COMMIT_MESSAGE:
                  requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
                  break;
              case ROLLBACK_MESSAGE:
                  requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
                  break;
              case UNKNOW:
                  requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
                  break;
              default:
                  break;
          }
      
          requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
          requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
          requestHeader.setMsgId(sendResult.getMsgId());
          // Carry the exception thrown by the local transaction callback, if any
          String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
          // Send the local transaction result to the broker (one-way)
          this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
              this.defaultMQProducer.getSendMsgTimeout());
      }
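The flag values listed above live in bits 2 and 3 of sysFlag. This hypothetical TxFlags class re-declares them instead of importing org.apache.rocketmq.common.sysflag.MessageSysFlag. As far as I can tell, its getTransactionValue/resetTransactionValue follow the masking that MessageSysFlag uses in 4.3.1, where the rollback value 0x3 << 2 doubles as the mask. Treat that as an assumption. fromLocalState takes the local state as a string only to keep the sketch short.

```java
// Hypothetical re-declaration of the transaction bits of sysFlag
class TxFlags {
    static final int TRANSACTION_NOT_TYPE = 0;              // 0000
    static final int TRANSACTION_PREPARED_TYPE = 0x1 << 2;  // 0100
    static final int TRANSACTION_COMMIT_TYPE = 0x2 << 2;    // 1000
    static final int TRANSACTION_ROLLBACK_TYPE = 0x3 << 2;  // 1100, also used as the mask

    // Extract only the transaction bits
    static int getTransactionValue(int flag) {
        return flag & TRANSACTION_ROLLBACK_TYPE;
    }

    // Clear the transaction bits, then set the new type; all other bits are preserved
    static int resetTransactionValue(int flag, int type) {
        return (flag & ~TRANSACTION_ROLLBACK_TYPE) | type;
    }

    // The conversion performed by endTransaction
    static int fromLocalState(String state) {
        switch (state) {
            case "COMMIT_MESSAGE": return TRANSACTION_COMMIT_TYPE;
            case "ROLLBACK_MESSAGE": return TRANSACTION_ROLLBACK_TYPE;
            case "UNKNOW": return TRANSACTION_NOT_TYPE;
            default: throw new IllegalArgumentException("unknown state: " + state);
        }
    }

    public static void main(String[] args) {
        int flag = 0x1 | TRANSACTION_PREPARED_TYPE; // an unrelated bit (bit 0) plus PREPARED
        System.out.println(getTransactionValue(flag) == TRANSACTION_PREPARED_TYPE); // true
        int committed = resetTransactionValue(flag, TRANSACTION_COMMIT_TYPE);
        System.out.println(Integer.toBinaryString(committed)); // 1001
    }
}
```

Because only the two transaction bits are rewritten, other flags on the message survive a transition such as Prepare to Commit.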
      

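    Broker handling of Prepare messages

    1. Like a normal message, a transactional Prepare message is handled by SendMessageProcessor#processRequest. The difference is in how it is stored: the Prepare message is delegated to TransactionalMessageService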

       // Read the transaction property set by the producer when sending (marks a Prepare message)
       String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
       if (traFlag != null && Boolean.parseBoolean(traFlag)) {
           // Reject transaction messages if the broker is configured to (they are allowed by default)
           if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
               response.setCode(ResponseCode.NO_PERMISSION);
               response.setRemark(
                   "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                       + "] sending transaction message is forbidden");
               return response;
           }
           putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
       } else {
           putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
       }
      
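    2. TransactionalMessageServiceImpl#prepareMessage calls TransactionalMessageBridge#putHalfMessage to pre-store the transaction message. It backs up the message's original topic and queue ID into the properties, which are needed later at commit time. It then changes the topic to RMQ_SYS_TRANS_HALF_TOPIC, which has only queue 0, and by default calls DefaultMessageStore#putMessage. From this point on, storage is identical to a normal message.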

      //TransactionalMessageServiceImpl#prepareMessage
      @Override
      public PutMessageResult prepareMessage(MessageExtBrokerInner messageInner) {
          return transactionalMessageBridge.putHalfMessage(messageInner);
      }
      private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
          // Save the original topic and queue in properties, because the message is written to the Prepare topic RMQ_SYS_TRANS_HALF_TOPIC
          MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
          MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
              String.valueOf(msgInner.getQueueId()));
          // Clear the transaction flag (set it to TRANSACTION_NOT_TYPE, i.e. 0)
          msgInner.setSysFlag(
              MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
          // Topic for Prepare messages: RMQ_SYS_TRANS_HALF_TOPIC
          msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
          // RMQ_SYS_TRANS_HALF_TOPIC has only one queue
          msgInner.setQueueId(0);
          msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
          return msgInner;
      }
      
      //TransactionalMessageBridge#putHalfMessage
      public PutMessageResult putHalfMessage(MessageExtBrokerInner messageInner) {
          // Store the message in the CommitLog (or another store implementation)
          return store.putMessage(parseHalfMessageInner(messageInner));
      }
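The topic swap in parseHalfMessageInner is reversible because the original destination travels in the message properties. This sketch uses a plain Map for the properties and a hypothetical SimpleMsg class. The property keys REAL_TOPIC and REAL_QID are written as I recall MessageConst.PROPERTY_REAL_TOPIC and PROPERTY_REAL_QUEUE_ID, so the exact strings are an assumption. restore() only approximates what happens on commit in EndTransactionProcessor#endMessageTransaction.

```java
import java.util.HashMap;
import java.util.Map;

class HalfTopicSwap {
    static final String HALF_TOPIC = "RMQ_SYS_TRANS_HALF_TOPIC";
    // Assumed to match MessageConst.PROPERTY_REAL_TOPIC / PROPERTY_REAL_QUEUE_ID
    static final String REAL_TOPIC = "REAL_TOPIC";
    static final String REAL_QID = "REAL_QID";

    // Hypothetical minimal message: topic, queue ID and string properties
    static class SimpleMsg {
        String topic;
        int queueId;
        final Map<String, String> props = new HashMap<>();

        SimpleMsg(String topic, int queueId) {
            this.topic = topic;
            this.queueId = queueId;
        }
    }

    // Like parseHalfMessageInner: back up the destination, then redirect to half topic queue 0
    static SimpleMsg toHalf(SimpleMsg msg) {
        msg.props.put(REAL_TOPIC, msg.topic);
        msg.props.put(REAL_QID, String.valueOf(msg.queueId));
        msg.topic = HALF_TOPIC;
        msg.queueId = 0;
        return msg;
    }

    // Roughly what happens on commit: restore the destination and drop the backup properties
    static SimpleMsg restore(SimpleMsg half) {
        half.topic = half.props.remove(REAL_TOPIC);
        half.queueId = Integer.parseInt(half.props.remove(REAL_QID));
        return half;
    }

    public static void main(String[] args) {
        SimpleMsg m = toHalf(new SimpleMsg("OrderTopic", 3));
        System.out.println(m.topic + " " + m.queueId); // RMQ_SYS_TRANS_HALF_TOPIC 0
        restore(m);
        System.out.println(m.topic + " " + m.queueId); // OrderTopic 3
    }
}
```

Consumers subscribe only to OrderTopic, so while the message sits in the half topic it is invisible to them.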
      
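    3. As shown above, a transactional Prepare message is stored in much the same way as a normal message. How, then, does the broker keep consumers from consuming Prepare messages? In the following ways

      • Old approach (deprecated)

        • When writing a message to the CommitLog, the broker checks its type. For Prepare or Rollback transaction messages, the ConsumeQueue queueOffset is not incremented (normally it increments with every appended message).

        • When building the ConsumeQueue, the broker checks for Prepare and Rollback messages and does not write them into the ConsumeQueue. Because the message is not in the ConsumeQueue, consumers never consume it

      • New approach: change the message's topic, and send it to its original topic only after it is committed. This guarantees that consumers cannot consume it before the commit

    4. CommitLog.DefaultAppendMessageCallback#doAppend contains transaction-related checks. The Prepare message's topic has already been replaced by the transaction topic and its transaction flag cleared, so a Prepare message always arrives here as TRANSACTION_NOT_TYPE. A committed message written back to its original topic carries TRANSACTION_COMMIT_TYPE. My guess is that this logic is left over from the earlier implementation, the one that did not change the topic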

      public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,final MessageExtBrokerInner msgInner) {
          ... omitted ...
          // Record ConsumeQueue information
          keyBuilder.setLength(0);
          keyBuilder.append(msgInner.getTopic());
          keyBuilder.append('-');
          keyBuilder.append(msgInner.getQueueId());
          String key = keyBuilder.toString();
          Long queueOffset = CommitLog.this.topicQueueTable.get(key);
          if (null == queueOffset) {
              queueOffset = 0L;
              CommitLog.this.topicQueueTable.put(key, queueOffset);
          }
      
          // Prepare and Rollback transaction messages are assigned queue offset 0
          final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
          switch (tranType) {
              // Prepared and Rollback message is not consumed, will not enter the
              // consumer queue
              case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
              case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                  queueOffset = 0L;
                  break;
              case MessageSysFlag.TRANSACTION_NOT_TYPE:
              case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
              default:
                  break;
          }
      
          ... omitted ...
          // Only TRANSACTION_COMMIT_TYPE and TRANSACTION_NOT_TYPE messages advance the queue offset
          switch (tranType) {
              case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
              case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                  break;
              case MessageSysFlag.TRANSACTION_NOT_TYPE:
              case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                  // The next update ConsumeQueue information
                  CommitLog.this.topicQueueTable.put(key, ++queueOffset);
                  break;
              default:
                  break;
          }
          return result;
      }
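The offset bookkeeping in doAppend comes down to a map from "topic-queueId" to the next queue offset. This hypothetical QueueOffsetTable re-declares the MessageSysFlag values locally and mimics only the two switch statements above. Prepare and Rollback messages get offset 0 and leave the counter where it is.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the topicQueueTable logic in doAppend
class QueueOffsetTable {
    static final int NOT_TYPE = 0, PREPARED_TYPE = 0x1 << 2, COMMIT_TYPE = 0x2 << 2, ROLLBACK_TYPE = 0x3 << 2;

    // Stand-in for CommitLog.topicQueueTable: "topic-queueId" -> next queue offset
    private final Map<String, Long> topicQueueTable = new HashMap<>();

    // Returns the queue offset assigned to the appended message
    long append(String topic, int queueId, int sysFlag) {
        String key = topic + '-' + queueId;
        long queueOffset = topicQueueTable.getOrDefault(key, 0L);
        int tranType = sysFlag & ROLLBACK_TYPE;
        if (tranType == PREPARED_TYPE || tranType == ROLLBACK_TYPE) {
            // Not consumable: offset 0 and the counter is not advanced
            return 0L;
        }
        // NOT_TYPE and COMMIT_TYPE: use the current offset, then advance the counter
        topicQueueTable.put(key, queueOffset + 1);
        return queueOffset;
    }

    public static void main(String[] args) {
        QueueOffsetTable t = new QueueOffsetTable();
        System.out.println(t.append("T", 0, NOT_TYPE));      // 0
        System.out.println(t.append("T", 0, PREPARED_TYPE)); // 0
        System.out.println(t.append("T", 0, COMMIT_TYPE));   // 1
        System.out.println(t.append("T", 0, ROLLBACK_TYPE)); // 0
        System.out.println(t.append("T", 0, NOT_TYPE));      // 2
    }
}
```

Consumable messages therefore keep a gap-free sequence of queue offsets, even when Prepare or Rollback records are appended between them.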
      
    5. DefaultMessageStore.CommitLogDispatcherBuildConsumeQueue#dispatch is where the ConsumeQueue is built asynchronously. Prepare and Rollback messages are not added to the ConsumeQueue here, so consumers cannot consume them

      class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {
      
          @Override
          public void dispatch(DispatchRequest request) {
              final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());
              switch (tranType) {
                  case MessageSysFlag.TRANSACTION_NOT_TYPE:
                  case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                      DefaultMessageStore.this.putMessagePositionInfo(request);
                      break;
                  // Prepare and Rollback messages are not added to the ConsumeQueue
                  case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
                  case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                      break;
              }
          }
      }
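The dispatch filter is a single decision on the transaction bits. In this hypothetical sketch, DispatchRequest is replaced by a CommitLog offset and an int sysFlag, and a list of CommitLog offsets stands in for the ConsumeQueue. Only records that pass the switch become visible to consumers.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of CommitLogDispatcherBuildConsumeQueue#dispatch
class ConsumeQueueDispatch {
    static final int NOT_TYPE = 0, PREPARED_TYPE = 0x1 << 2, COMMIT_TYPE = 0x2 << 2, ROLLBACK_TYPE = 0x3 << 2;

    // Stand-in for a ConsumeQueue: CommitLog offsets of consumable messages
    final List<Long> consumeQueue = new ArrayList<>();

    void dispatch(long commitLogOffset, int sysFlag) {
        switch (sysFlag & ROLLBACK_TYPE) {
            case NOT_TYPE:
            case COMMIT_TYPE:
                // Consumable: record its position (putMessagePositionInfo in RocketMQ)
                consumeQueue.add(commitLogOffset);
                break;
            case PREPARED_TYPE:
            case ROLLBACK_TYPE:
                // Never indexed, so consumers cannot see it
                break;
            default:
                break;
        }
    }

    public static void main(String[] args) {
        ConsumeQueueDispatch d = new ConsumeQueueDispatch();
        d.dispatch(100, NOT_TYPE);
        d.dispatch(200, PREPARED_TYPE);
        d.dispatch(300, COMMIT_TYPE);
        d.dispatch(400, ROLLBACK_TYPE);
        System.out.println(d.consumeQueue); // [100, 300]
    }
}
```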
      

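    Broker handling of commit/rollback messages

    1. The producer reports its local result to the broker with the RequestCode.END_TRANSACTION command, which the broker handles in EndTransactionProcessor. EndTransactionProcessor#processRequest works as follows

      • Only the master can process the request, and related logs are printed. Only commit and rollback requests proceed further
      • Rollback: look up the Prepare message in RMQ_SYS_TRANS_HALF_TOPIC by its offset and validate it. Then delete the Prepare message, which in practice means writing a message with tag d to the RMQ_SYS_TRANS_OP_HALF_TOPIC topic, to mark it as deleted
      • Commit: look up the Prepare message in RMQ_SYS_TRANS_HALF_TOPIC by its offset and validate it. Restore the original message (original topic, queue, etc.), clear the transaction properties, and store it in the CommitLog. If storage succeeds, delete the Prepare message (again by writing a message with tag d to RMQ_SYS_TRANS_OP_HALF_TOPIC) to mark it as deleted
    2. EndTransactionProcessor#processRequest source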

      @Override
      public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws
          RemotingCommandException {
          final RemotingCommand response = RemotingCommand.createResponseCommand(null);
          final EndTransactionRequestHeader requestHeader =
              (EndTransactionRequestHeader)request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);
          LOGGER.info("Transaction request:{}", requestHeader);
          // Slaves are not allowed to process transaction messages
          if (BrokerRole.SLAVE == brokerController.getMessageStoreConfig().getBrokerRole()) {
              response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
              LOGGER.warn("Message store is slave mode, so end transaction is forbidden. ");
              return response;
          }
          // fromTransactionCheck marks a reply to a broker check (only used for logging); only commit or rollback proceed below
          if (requestHeader.getFromTransactionCheck()) {
              switch (requestHeader.getCommitOrRollback()) {
                  case MessageSysFlag.TRANSACTION_NOT_TYPE: {
                      LOGGER.warn("Check producer[{}] transaction state, but it's pending status."
                              + "RequestHeader: {} Remark: {}",
                          RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                          requestHeader.toString(),
                          request.getRemark());
                      return null;
                  }
      
                  case MessageSysFlag.TRANSACTION_COMMIT_TYPE: {
                      LOGGER.warn("Check producer[{}] transaction state, the producer commit the message."
                              + "RequestHeader: {} Remark: {}",
                          RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                          requestHeader.toString(),
                          request.getRemark());
      
                      break;
                  }
      
                  case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: {
                      LOGGER.warn("Check producer[{}] transaction state, the producer rollback the message."
                              + "RequestHeader: {} Remark: {}",
                          RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                          requestHeader.toString(),
                          request.getRemark());
                      break;
                  }
                  default:
                      return null;
              }
          } else {
              switch (requestHeader.getCommitOrRollback()) {
                  case MessageSysFlag.TRANSACTION_NOT_TYPE: {
                      LOGGER.warn("The producer[{}] end transaction in sending message,  and it's pending status."
                              + "RequestHeader: {} Remark: {}",
                          RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                          requestHeader.toString(),
                          request.getRemark());
                      return null;
                  }
      
                  case MessageSysFlag.TRANSACTION_COMMIT_TYPE: {
                      break;
                  }
      
                  case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: {
                      LOGGER.warn("The producer[{}] end transaction in sending message, rollback the message."
                              + "RequestHeader: {} Remark: {}",
                          RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                          requestHeader.toString(),
                          request.getRemark());
                      break;
                  }
                  default:
                      return null;
              }
          }
          OperationResult result = new OperationResult();
          if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
              // Look up the Prepare message by its offset in the internal transaction topic
              result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
              if (result.getResponseCode() == ResponseCode.SUCCESS) {
                  // Validate the message that was looked up
                  RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
                  if (res.getCode() == ResponseCode.SUCCESS) {
                      // Restore the original message
                      MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
                      msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
                      msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
                      msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
                      msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
                      // Store it in the CommitLog; on success, delete the half message
                      RemotingCommand sendResult = sendFinalMessage(msgInner);
                      if (sendResult.getCode() == ResponseCode.SUCCESS) {
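                          // Delete the Prepare message: in practice, write a message with tag d to RMQ_SYS_TRANS_OP_HALF_TOPIC
                          // RocketMQ only appends messages and cannot modify or delete them, so "deleting" means adding a message to a dedicated topic
                          // Whether committed or rolled back, a record can then be found there; if there is none, the state is unknown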
                          this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
                      }
                      return sendResult;
                  }
                  return res;
              }
          } else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
              result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
              if (result.getResponseCode() == ResponseCode.SUCCESS) {
                  RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
                  if (res.getCode() == ResponseCode.SUCCESS) {
                      this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
                  }
                  return res;
              }
          }
          response.setCode(result.getResponseCode());
          response.setRemark(result.getResponseRemark());
          return response;
      }
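Stripped of logging and storage details, processRequest is a small decision table. The sketch below is hypothetical (EndTxDecision, Action). It leaves out the Prepare-message lookup and checkPrepareMessage validation, and treats "the final message was stored successfully" as an input.

```java
// Hypothetical decision table for EndTransactionProcessor#processRequest
class EndTxDecision {
    static final int NOT_TYPE = 0, COMMIT_TYPE = 0x2 << 2, ROLLBACK_TYPE = 0x3 << 2;

    enum Action { REJECT_SLAVE, IGNORE, COMMIT_AND_DELETE_HALF, COMMIT_KEEP_HALF, DELETE_HALF_ONLY }

    static Action decide(boolean isSlave, int commitOrRollback, boolean finalStoreOk) {
        if (isSlave) {
            // Slaves answer SLAVE_NOT_AVAILABLE
            return Action.REJECT_SLAVE;
        }
        switch (commitOrRollback) {
            case COMMIT_TYPE:
                // Restore and store the original message; write the op (tag d) record only on success
                return finalStoreOk ? Action.COMMIT_AND_DELETE_HALF : Action.COMMIT_KEEP_HALF;
            case ROLLBACK_TYPE:
                // Nothing is delivered; just write the op (tag d) record
                return Action.DELETE_HALF_ONLY;
            case NOT_TYPE:
            default:
                // Unknown: log and return no response; a later check resolves it
                return Action.IGNORE;
        }
    }

    public static void main(String[] args) {
        System.out.println(decide(false, COMMIT_TYPE, true));    // COMMIT_AND_DELETE_HALF
        System.out.println(decide(false, ROLLBACK_TYPE, true));  // DELETE_HALF_ONLY
        System.out.println(decide(false, NOT_TYPE, true));       // IGNORE
    }
}
```

The COMMIT_KEEP_HALF case shows why the op record is written only after a successful store. If the store fails, the half message stays undeleted, so a later check can still resolve it.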
      
    3. TransactionalMessageServiceImpl#deletePrepareMessage deletes the message. This is not a physical delete but an append: deleting means appending a message with a specific tag to the RMQ_SYS_TRANS_OP_HALF_TOPIC queue

      // TransactionalMessageServiceImpl#deletePrepareMessage
      @Override
      public boolean deletePrepareMessage(MessageExt msgExt) {
          if (this.transactionalMessageBridge.putOpMessage(msgExt, TransactionalMessageUtil.REMOVETAG)) {
              log.info("Transaction op message write successfully. messageId={}, queueId={} msgExt:{}", msgExt.getMsgId(), msgExt.getQueueId(), msgExt);
              return true;
          } else {
              log.error("Transaction op message write failed. messageId is {}, queueId is {}", msgExt.getMsgId(), msgExt.getQueueId());
              return false;
          }
      }
      
      // TransactionalMessageBridge#putOpMessage: append a message to RMQ_SYS_TRANS_OP_HALF_TOPIC
      public boolean putOpMessage(MessageExt messageExt, String opType) {
          // messageExt is the Prepare message
          // Build the message queue it belongs to
          MessageQueue messageQueue = new MessageQueue(messageExt.getTopic(),
              this.brokerController.getBrokerConfig().getBrokerName(), messageExt.getQueueId());
          if (TransactionalMessageUtil.REMOVETAG.equals(opType)) {
              return addRemoveTagInTransactionOp(messageExt, messageQueue);
          }
          return true;
      }
      
      //TransactionalMessageBridge#addRemoveTagInTransactionOp
      private boolean addRemoveTagInTransactionOp(MessageExt messageExt, MessageQueue messageQueue) {
          Message message = new Message(TransactionalMessageUtil.buildOpTopic(), TransactionalMessageUtil.REMOVETAG,
              String.valueOf(messageExt.getQueueOffset()).getBytes(TransactionalMessageUtil.charset));
          writeOp(message, messageQueue);
          return true;
      }
      
      // TransactionalMessageBridge#writeOp: calls the store's putMessage
      private void writeOp(Message message, MessageQueue mq) {
          // mq is the Prepare message queue (of topic RMQ_SYS_TRANS_HALF_TOPIC)
          // Cache: key = RMQ_SYS_TRANS_HALF_TOPIC queue, value = RMQ_SYS_TRANS_OP_HALF_TOPIC queue
          MessageQueue opQueue;
          if (opQueueMap.containsKey(mq)) {
              opQueue = opQueueMap.get(mq);
          } else {
              // Create a message queue for the RMQ_SYS_TRANS_OP_HALF_TOPIC topic
              opQueue = getOpQueueByHalf(mq);
              // putIfAbsent does not overwrite an existing value; it returns the existing one
              MessageQueue oldQueue = opQueueMap.putIfAbsent(mq, opQueue);
              if (oldQueue != null) {
                  opQueue = oldQueue;
              }
          }
          // TODO by jannal: why could this ever be null??
          if (opQueue == null) {
              opQueue = new MessageQueue(TransactionalMessageUtil.buildOpTopic(), mq.getBrokerName(), mq.getQueueId());
          }
          putMessage(makeOpMessageInner(message, opQueue));
      }
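The op record and the half-to-op queue cache can be sketched with the JDK alone. SimpleQueue is a hypothetical value class standing in for MessageQueue; it needs equals/hashCode to work as a map key, which MessageQueue also provides. The sketch uses computeIfAbsent instead of the containsKey/putIfAbsent sequence above, which gives the same "create once, then reuse" result in a single call. It also assumes the body is encoded as UTF-8, which I believe is what TransactionalMessageUtil.charset is.

```java
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class OpQueueSketch {
    static final String OP_TOPIC = "RMQ_SYS_TRANS_OP_HALF_TOPIC";
    static final String REMOVE_TAG = "d";

    // Hypothetical stand-in for MessageQueue, with value equality
    static final class SimpleQueue {
        final String topic;
        final String brokerName;
        final int queueId;

        SimpleQueue(String topic, String brokerName, int queueId) {
            this.topic = topic;
            this.brokerName = brokerName;
            this.queueId = queueId;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof SimpleQueue)) return false;
            SimpleQueue q = (SimpleQueue) o;
            return queueId == q.queueId && topic.equals(q.topic) && brokerName.equals(q.brokerName);
        }

        @Override
        public int hashCode() {
            return Objects.hash(topic, brokerName, queueId);
        }
    }

    // Cache: half queue -> op queue with the same broker name and queue ID
    private final ConcurrentMap<SimpleQueue, SimpleQueue> opQueueMap = new ConcurrentHashMap<>();

    SimpleQueue opQueueFor(SimpleQueue halfQueue) {
        return opQueueMap.computeIfAbsent(halfQueue,
            h -> new SimpleQueue(OP_TOPIC, h.brokerName, h.queueId));
    }

    // Body of a "delete" (op) record: the half message's queue offset as text
    static byte[] opBody(long halfQueueOffset) {
        return String.valueOf(halfQueueOffset).getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        OpQueueSketch s = new OpQueueSketch();
        SimpleQueue op1 = s.opQueueFor(new SimpleQueue("RMQ_SYS_TRANS_HALF_TOPIC", "broker-a", 0));
        SimpleQueue op2 = s.opQueueFor(new SimpleQueue("RMQ_SYS_TRANS_HALF_TOPIC", "broker-a", 0));
        System.out.println(op1 == op2); // true: created once, then reused
        System.out.println(op1.topic + " tag=" + REMOVE_TAG + " body="
            + new String(opBody(42), StandardCharsets.UTF_8)); // RMQ_SYS_TRANS_OP_HALF_TOPIC tag=d body=42
    }
}
```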
      
      

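    Broker transaction check

    1. Normally, after the producer runs its local transaction callback, it reports the resulting state to the broker. But the producer may crash at that moment, or the call to the broker may fail because of network problems. In those cases the broker has to check the transaction state periodically.

    2. TransactionalMessageCheckService is a service thread that starts when the broker starts. By default it runs a check every 60s and uses a transaction timeout of 6s, meaning a Prepare message is only checked once it is older than that. A message is checked at most 15 times. The check itself is done by TransactionalMessageService#check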

      // BrokerController#start

      // ... omitted ...
      if (BrokerRole.SLAVE != messageStoreConfig.getBrokerRole()) {
          if (this.transactionalMessageCheckService != null) {
              log.info("Start transaction service!");
              this.transactionalMessageCheckService.start();
          }
      } 
      
      // TransactionalMessageCheckService#run
      @Override
      public void run() {
          log.info("Start transaction check service thread!");
          // Default: 60s
          long checkInterval = brokerController.getBrokerConfig().getTransactionCheckInterval();
          while (!this.isStopped()) {
              this.waitForRunning(checkInterval);
          }
          log.info("End transaction check service thread!");
      }
      
      @Override
      protected void onWaitEnd() {
          // Default: 6000 ms
          long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
          // Default: at most 15 checks
          int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
          long begin = System.currentTimeMillis();
          log.info("Begin to check prepare message, begin time:{}", begin);
          this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());
          log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);
      }
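run() above only loops on waitForRunning because, in RocketMQ's ServiceThread, waitForRunning(interval) waits and then calls the onWaitEnd() hook itself. The sketch below imitates that shape with Thread.sleep and collects the three defaults from the text in one place. CheckLoopSketch and its Consumer-based check task are hypothetical. Unlike the real ServiceThread, this sleep cannot be woken early.

```java
import java.util.function.Consumer;

// Hypothetical imitation of the ServiceThread pattern used by TransactionalMessageCheckService
class CheckLoopSketch {
    static final long DEFAULT_CHECK_INTERVAL_MS = 60_000;     // transactionCheckInterval
    static final long DEFAULT_TRANSACTION_TIMEOUT_MS = 6_000; // transactionTimeOut
    static final int DEFAULT_CHECK_MAX = 15;                  // transactionCheckMax

    private volatile boolean stopped = false;
    private final long checkIntervalMs;
    // Stands in for TransactionalMessageService#check
    private final Consumer<CheckLoopSketch> checkTask;
    int rounds = 0;

    CheckLoopSketch(long checkIntervalMs, Consumer<CheckLoopSketch> checkTask) {
        this.checkIntervalMs = checkIntervalMs;
        this.checkTask = checkTask;
    }

    void stop() {
        stopped = true;
    }

    // Like TransactionalMessageCheckService#run: the loop body only waits
    void run() {
        while (!stopped) {
            waitForRunning(checkIntervalMs);
        }
    }

    // Like ServiceThread#waitForRunning: wait for the interval, then call onWaitEnd()
    private void waitForRunning(long intervalMs) {
        try {
            Thread.sleep(intervalMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            stopped = true;
        } finally {
            onWaitEnd();
        }
    }

    // Like TransactionalMessageCheckService#onWaitEnd: one check round
    private void onWaitEnd() {
        rounds++;
        checkTask.accept(this);
    }

    public static void main(String[] args) {
        // 1 ms interval so the demo finishes quickly; stop after three rounds
        CheckLoopSketch loop = new CheckLoopSketch(1, l -> { if (l.rounds >= 3) l.stop(); });
        loop.run();
        System.out.println(loop.rounds); // 3
    }
}
```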
      
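    3. Main flow of TransactionalMessageService#check

      • Look up the queues of RMQ_SYS_TRANS_HALF_TOPIC; currently there is only queue 0
      • Iterate over the MessageQueues of RMQ_SYS_TRANS_HALF_TOPIC; processing of each MessageQueue is limited to 60s
      • Use the RMQ_SYS_TRANS_HALF_TOPIC MessageQueue as the key to get the RMQ_SYS_TRANS_OP_HALF_TOPIC MessageQueue from the cache, creating it if absent
      • Pull messages from the op queue with the CID_RMQ_SYS_TRANS consumer group, 32 at a time
      • Compare each Prepare message with the op messages. If the op queue already contains it, it is not checked. If it has been checked more than 15 times or has expired, it is skipped
      • If the message has existed longer than the transaction timeout, check it; otherwise keep pulling messages
      • Re-append the message to the Prepare queue and update the offset
      • Send the check request to the producer
      • Update the consume progress of the Prepare queue and the op queue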
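The per-message decision in the flow above can be written as one function. This is a hypothetical simplification (HalfMsgCheck, Next) that takes the message's queue offset, check count, age and an "expired" flag as plain values and returns what the loop would do next. The real check has extra cases, such as pulling more op messages, that this sketch leaves out. WAIT means the message is still too young to check in this round.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical simplification of the per-message decision in TransactionalMessageService#check
class HalfMsgCheck {
    enum Next { ALREADY_RESOLVED, DISCARD, SKIP_EXPIRED, WAIT, CHECK }

    static Next decide(long halfOffset, Set<Long> removedOffsets, int checkTimes, int checkMax,
                       boolean fileExpired, long ageMs, long transactionTimeoutMs) {
        if (removedOffsets.contains(halfOffset)) {
            // An op record exists: already committed or rolled back
            return Next.ALREADY_RESOLVED;
        }
        if (checkTimes >= checkMax) {
            // Too many checks: give up (4.3.1 only logs the message)
            return Next.DISCARD;
        }
        if (fileExpired) {
            // Past the file retention time: skip
            return Next.SKIP_EXPIRED;
        }
        if (ageMs < transactionTimeoutMs) {
            // Too young: the producer may still report its result
            return Next.WAIT;
        }
        // Re-append to the half queue, then send a check request to a producer in the group
        return Next.CHECK;
    }

    public static void main(String[] args) {
        Set<Long> removed = new HashSet<>();
        removed.add(5L);
        System.out.println(decide(5, removed, 0, 15, false, 10_000, 6_000));  // ALREADY_RESOLVED
        System.out.println(decide(6, removed, 3, 15, false, 1_000, 6_000));   // WAIT
        System.out.println(decide(6, removed, 3, 15, false, 10_000, 6_000));  // CHECK
    }
}
```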
    4. Source of TransactionalMessageService#check

      @Override
      public void check(long transactionTimeout, int transactionCheckMax,
          AbstractTransactionalMessageCheckListener listener) {
          try {
              String topic = MixAll.RMQ_SYS_TRANS_HALF_TOPIC;
              // RMQ_SYS_TRANS_HALF_TOPIC has only one queue
              Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);
              if (msgQueues == null || msgQueues.size() == 0) {
                  log.warn("The queue of topic is empty :" + topic);
                  return;
              }
              log.info("Check topic={}, queues={}", topic, msgQueues);
              for (MessageQueue messageQueue : msgQueues) {
                  // Start time for processing this queue
                  long startTime = System.currentTimeMillis();
                  // Each prepare (half) queue maps to one op queue that records commits/rollbacks; in practice there is only one queue
                  MessageQueue opQueue = getOpQueue(messageQueue);
                  // Latest consume offset of the prepare queue
                  long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue);
                  // Latest consume offset of the op queue
                  long opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue);
                  log.info("Before check, the queue={} msgOffset={} opOffset={}", messageQueue, halfOffset, opOffset);
                  if (halfOffset < 0 || opOffset < 0) {
                      log.error("MessageQueue: {} illegal offset read: {}, op offset: {},skip this queue", messageQueue,
                          halfOffset, opOffset);
                      continue;
                  }
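                  // Offsets of op messages that have already been processed
                  List<Long> doneOpOffset = new ArrayList<>();
                  // Prepare messages already committed/rolled back: half offset -> op offset
                  HashMap<Long, Long> removeMap = new HashMap<>();
                  // Find out which prepare messages have already been resolved, mainly to avoid duplicate check-back requests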
                  PullResult pullResult = fillOpRemoveMap(removeMap, opQueue, opOffset, halfOffset, doneOpOffset);
                  if (null == pullResult) {
                      log.error("The queue={} check msgOffset={} with opOffset={} failed, pullResult is null",
                          messageQueue, halfOffset, opOffset);
                      continue;
                  }
                  // single thread
                  // Number of times a null message was fetched
                  int getMessageNullCount = 1;
                  long newOffset = halfOffset;
                  // Logical offset in the prepare queue
                  long i = halfOffset;
                  while (true) {
                      // Processing of each MessageQueue is limited to 60s
                      if (System.currentTimeMillis() - startTime > MAX_PROCESS_TIME_LIMIT) {
                          log.info("Queue={} process time reach max={}", messageQueue, MAX_PROCESS_TIME_LIMIT);
                          break;
                      }
                      // The prepare message was already committed/rolled back: just remove it from the map
                      if (removeMap.containsKey(i)) {
                          log.info("Half offset {} has been committed/rolled back", i);
                          removeMap.remove(i);
                      } else {
                          // Fetch the prepare message to process
                          GetResult getResult = getHalfMsg(messageQueue, i);
                          MessageExt msgExt = getResult.getMsg();
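                          // Message not found: stop after too many misses or when there is no new message; otherwise jump to the next valid offset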
                          if (msgExt == null) {
                              if (getMessageNullCount++ > MAX_RETRY_COUNT_WHEN_HALF_NULL) {
                                  break;
                              }
                              if (getResult.getPullResult().getPullStatus() == PullStatus.NO_NEW_MSG) {
                                  log.info("No new msg, the miss offset={} in={}, continue check={}, pull result={}", i,
                                      messageQueue, getMessageNullCount, getResult.getPullResult());
                                  break;
                              } else {
                                  log.info("Illegal offset, the miss offset={} in={}, continue check={}, pull result={}",
                                      i, messageQueue, getMessageNullCount, getResult.getPullResult());
                                  i = getResult.getPullResult().getNextBeginOffset();
                                  newOffset = i;
                                  continue;
                              }
                          }
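                          // Discard once the check limit (15 by default) is reached, or skip if the message has expired (older than the file retention time, 72 hours by default)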
                          if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) {
                              // By default this only writes a log entry
                              listener.resolveDiscardMsg(msgExt);
                              newOffset = i + 1;
                              i++;
                              continue;
                          }
                          // Messages stored after this check round started are left for later (a defensive check)
                          if (msgExt.getStoreTimestamp() >= startTime) {
                              log.info("Fresh stored. the miss offset={}, check it later, store={}", i,
                                  new Date(msgExt.getStoreTimestamp()));
                              break;
                          }
                          // Time elapsed since the message was born (created by the producer)
                          long valueOfCurrentMinusBorn = System.currentTimeMillis() - msgExt.getBornTimestamp();
                          // transactionTimeout defaults to 6s
                          long checkImmunityTime = transactionTimeout;
                          // In this version the property appears to be used only by test cases
                          String checkImmunityTimeStr = msgExt.getUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS);
                          if (null != checkImmunityTimeStr) {
                              // If checkImmunityTimeStr is -1, use transactionTimeout; otherwise the value is in seconds
                              checkImmunityTime = getImmunityTime(checkImmunityTimeStr, transactionTimeout);
                              // Leave time for the producer to commit the transaction; no check-back yet
                              if (valueOfCurrentMinusBorn < checkImmunityTime) {
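                                  // Still inside the immunity window: checkPrepareQueueOffset either finds the message already resolved or writes it back to the prepare queue; in both cases move on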
                                  if (checkPrepareQueueOffset(removeMap, doneOpOffset, msgExt, checkImmunityTime)) {
                                      newOffset = i + 1;
                                      i++;
                                      continue;
                                  }
                              }
                          } else {
                              // A newly arrived prepare message: leave it for now, its commit/rollback may still be in flight
                              if ((0 <= valueOfCurrentMinusBorn) && (valueOfCurrentMinusBorn < checkImmunityTime)) {
                                  log.info("New arrived, the miss offset={}, check it later checkImmunity={}, born={}", i,
                                      checkImmunityTime, new Date(msgExt.getBornTimestamp()));
                                  break;
                              }
                          }
                          List<MessageExt> opMsg = pullResult.getMsgFoundList();
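                          // checkImmunityTime (6s by default) is the earliest age at which a message may first be checked
                          // Check back if: there are no op messages and the message is older than checkImmunityTime; or the born time of the last op message minus the start time exceeds transactionTimeout; or the born time lies in the future
                          // TODO: when can valueOfCurrentMinusBorn be <= -1? (Presumably when the producer's clock is ahead of the broker's, since the born timestamp is set by the producer)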
                          boolean isNeedCheck = (opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime)
                              || (opMsg != null && (opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout))
                              || (valueOfCurrentMinusBorn <= -1);
      
                          if (isNeedCheck) {
                              // Write the message back to the prepare queue and update its offset
                              if (!putBackHalfMsgQueue(msgExt, i)) {
                                  continue;
                              }
                              // Transaction check-back: send the check request to the Producer (asynchronously)
                              listener.resolveHalfMsg(msgExt);
                          } else {
                              // Not timed out yet: pull more op messages
                              pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset);
                              log.info("The miss offset:{} in messageQueue:{} need to get more opMsg, result is:{}", i,
                                  messageQueue, pullResult);
                              continue;
                          }
                      }
                      // Advance the consume offset by 1
                      newOffset = i + 1;
                      i++;
                  }
                  // Messages were consumed, so persist the new offset
                  if (newOffset != halfOffset) {
                      transactionalMessageBridge.updateConsumeOffset(messageQueue, newOffset);
                  }
      
                  long newOpOffset = calculateOpOffset(doneOpOffset, opOffset);
                  if (newOpOffset != opOffset) {
                      transactionalMessageBridge.updateConsumeOffset(opQueue, newOpOffset);
                  }
              }
          } catch (Exception e) {
              e.printStackTrace();
              log.error("Check error", e);
          }
      
      }
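
The immunity-time conversion and the `isNeedCheck` condition can be pulled out as pure functions. This is a hedged sketch that assumes the helper behaves like the code above. A value of `-1` falls back to `transactionTimeout`, and other values are treated as seconds. An unparseable value is also assumed to act like `-1`. The class `NeedCheck` and its parameter names are hypothetical.

```java
// Hypothetical helpers mirroring the immunity-time and isNeedCheck logic in check().
final class NeedCheck {
    // Assumed: "-1" (or an unparseable value) means "use transactionTimeout";
    // any other value is a number of seconds.
    static long immunityTime(String immunitySecondsStr, long transactionTimeoutMs) {
        long seconds;
        try {
            seconds = Long.parseLong(immunitySecondsStr);
        } catch (NumberFormatException e) {
            seconds = -1;
        }
        return seconds == -1 ? transactionTimeoutMs : seconds * 1000;
    }

    // Same three-way condition as the isNeedCheck expression above.
    static boolean isNeedCheck(boolean hasOpMsgs, long lastOpBornTimestamp, long startTime,
                               long ageMs, long immunityMs, long transactionTimeoutMs) {
        return (!hasOpMsgs && ageMs > immunityMs)
            || (hasOpMsgs && lastOpBornTimestamp - startTime > transactionTimeoutMs)
            || ageMs <= -1;
    }
}
```

The sketch makes one asymmetry visible. Once op messages have been pulled, the message's own age no longer enters the first two terms. The decision then depends on how recent the last op message is, and the `else` branch keeps pulling op messages until that term becomes true.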
      
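
`fillOpRemoveMap` and `calculateOpOffset` are not shown above. The sketch below reconstructs their core bookkeeping under three assumptions. First, each op message's body carries the half-queue offset it resolves. Second, op messages that point behind the current half offset are simply marked done. Third, the op-queue offset advances only across a contiguous run of done offsets. `OpMsg` and `OpBookkeeping` are hypothetical names. The real methods also pull from the store and check message tags.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical op message: its own offset in the op queue plus the
// half-queue offset carried in its body.
final class OpMsg {
    final long opQueueOffset;
    final long halfOffset;

    OpMsg(long opQueueOffset, long halfOffset) {
        this.opQueueOffset = opQueueOffset;
        this.halfOffset = halfOffset;
    }
}

final class OpBookkeeping {
    // Sketch of the classification step of fillOpRemoveMap.
    static Map<Long, Long> fillRemoveMap(List<OpMsg> opMsgs, long miniHalfOffset,
                                         List<Long> doneOpOffset) {
        Map<Long, Long> removeMap = new HashMap<>();
        for (OpMsg op : opMsgs) {
            if (op.halfOffset < miniHalfOffset) {
                // Refers to a half message already behind the half-queue offset
                doneOpOffset.add(op.opQueueOffset);
            } else {
                // Half message still to be scanned: remember it was committed/rolled back
                removeMap.put(op.halfOffset, op.opQueueOffset);
            }
        }
        return removeMap;
    }

    // Sketch of calculateOpOffset: advance only over a contiguous run of done offsets.
    static long calculateOpOffset(List<Long> doneOpOffset, long oldOffset) {
        List<Long> sorted = new ArrayList<>(doneOpOffset);
        Collections.sort(sorted);
        long newOffset = oldOffset;
        for (long offset : sorted) {
            if (offset == newOffset) {
                newOffset++;
            } else {
                break;   // gap: an earlier op message is not processed yet
            }
        }
        return newOffset;
    }
}
```

Advancing only over a contiguous prefix is a conservative choice. If op offset 11 has not been processed yet, the op offset stays at 11 even though 12 is done. The next round then re-reads 12 rather than risk skipping 11.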
  • Original article: https://blog.csdn.net/usagoole/article/details/126504592