在Broker中,事务消息的初始化是通过BrokerController.initialTransaction()
方法执行的。
- private void initialTransaction() {
- this.transactionalMessageService = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_SERVICE_ID, TransactionalMessageService.class);
- if (null == this.transactionalMessageService) {
- this.transactionalMessageService = new TransactionalMessageServiceImpl(new TransactionalMessageBridge(this, this.getMessageStore()));
- LOG.warn("Load default transaction message hook service: {}", TransactionalMessageServiceImpl.class.getSimpleName());
- }
- this.transactionalMessageCheckListener = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_LISTENER_ID, AbstractTransactionalMessageCheckListener.class);
- if (null == this.transactionalMessageCheckListener) {
- this.transactionalMessageCheckListener = new DefaultTransactionalMessageCheckListener();
- LOG.warn("Load default discard message hook service: {}", DefaultTransactionalMessageCheckListener.class.getSimpleName());
- }
- this.transactionalMessageCheckListener.setBrokerController(this);
- this.transactionalMessageCheckService = new TransactionalMessageCheckService(this);
- }
- 复制代码
这里有三个核心的初始化变量
事务消息主要处理服务。默认实现类是TransactionalMessageServiceImpl
也可以自己定义事务消息处理实现类,通过ServiceProvider.loadClass()
方法进行加载。TransactionalMessageService
类定义如下。内部属性已加注释标明。
- public interface TransactionalMessageService {
- //用于保存Half事务消息
- PutMessageResult prepareMessage(MessageExtBrokerInner messageInner);
- CompletableFuture
asyncPrepareMessage(MessageExtBrokerInner messageInner); - //删除事务消息
- boolean deletePrepareMessage(MessageExt messageExt);
- //提交事务消息
- OperationResult commitMessage(EndTransactionRequestHeader requestHeader);
- //回滚事务消息
- OperationResult rollbackMessage(EndTransactionRequestHeader requestHeader);
- void check(long transactionTimeout, int transactionCheckMax, AbstractTransactionalMessageCheckListener listener);
- //打开事务消息
- boolean open();
- //关闭事务消息
- void close();
- }
- 复制代码
事务消息回查监听器
事务消息回查服务,启动一个线程定时检查超时的Half消息是否需要回查。
当初始化完成之后,Broker就可以处理事务消息了。
Broker存储事务消息的是org.apache.rocketmq.broker.processor.SendMessageProcessor
,这和普通消息其实是一样的。
但是有两点针对事务消息的特殊处理:
在org.apache.rocketmq.broker.processor.SendMessageProcessor#sendMessage
中:
- //获取扩展字段的值,若是该值为true则为事务消息
- String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
- boolean sendTransactionPrepareMessage = false;
- if (Boolean.parseBoolean(traFlag)
- && !(msgInner.getReconsumeTimes() > 0 && msgInner.getDelayTimeLevel() > 0)) {
- //判断当前Broker配置是否支持事务消息
- if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
- response.setCode(ResponseCode.NO_PERMISSION);
- response.setRemark(
- "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
- + "] sending transaction message is forbidden");
- return response;
- }
- sendTransactionPrepareMessage = true;
- }
- 复制代码
- if (sendTransactionPrepareMessage) {
- //保存Half信息
- putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
- } else {
- putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
- }
- 复制代码
存储事务消息前的预处理,对应方法是org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge#parseHalfMessageInner
- private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
- //将原消息的topic保存在扩展字段中
- MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
- //将原消息的QueueId保存在扩展字段中
- MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
- String.valueOf(msgInner.getQueueId()));
- //将原消息的SysFlag保存在扩展字段中
- msgInner.setSysFlag(
- MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
- //修改topic的值为RMQ_SYS_TRANS_HALF_TOPIC
- msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
- //修改Queueid为0
- msgInner.setQueueId(0);
- msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
- return msgInner;
- }
- 复制代码
完成上述步骤之后,调用DefaultMessageStole.putMessage()
方法将其保存到CommitLog
中。
CommitLog存储成功之后,通过org.apache.rocketmq.store.CommitLog.DefaultAppendMessageCallback#doAppend()
方法对其进行处理。
- final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
- switch (tranType) {
- // Prepared and Rollback message is not consumed, will not enter the consume queue
- case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
- case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
- queueOffset = 0L;
- break;
- case MessageSysFlag.TRANSACTION_NOT_TYPE:
- case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
- default:
- break;
- }
- 复制代码
这里的逻辑是这样的,当读到的消息类型为事务消息时,设置当前消息的位点值为0,而不是设置真实的位点。这样该位点就不会建立ConsumeQueue索引,也不会被消费。