目录
TransactionalMessageCheckService
先盗个流程图来简介一下事务消息的大致流程:
1、Producer先发送半消息,半消息暂时对消费者们不可见
2、半消息发送成功
3、发送半消息成功后,Producer开始执行本地事务,本地事务执行成功后返回事务状态(Commit\Rollback\Unknow)
4、将本地事务状态返回给Broker,如果是Commit,则将半消息暴露给消费者消费,如果是Rollback,则删除该消息,如果是Unknow,则表示Broker需要调用事务回查接口来判断具体事务状态
5、因为返回Unknow或者网络原因等Commit/Rollback指令未发送到Broker,Broker定时调用回查接口判断本地事务状态
6、执行Producer端定义的回查接口逻辑
7、回查接口返回事务状态,Broker根据Commit/Rollback状态进行不同处理
下边从Producer端和Broker端来分析源码是如何实现的
来看Producer端的事务消息实现,与普通消息的发送有以下几点区别
1、TransactionMQProducer 事务消息的Producer实现类,继承DefaultMQProducer并扩展了以下两个属性
2、TransactionListener 事务监听器,实现两个接口来执行本地事务和回查
3、LocalTransactionState 本地事务执行状态
example包中示例的生产者端代码
1、发送前校验(清除延时标记、消息大小校验)
2、设置事务消息标记并发送
3、发送成功后调用监听器执行本地事务
4、向Broker发送END_TRANSACTION消息返回本地事务状态
Broker端有三部分,一是接收半消息的处理,二是Producer返回Broker本地事务状态,三是Broker进行事务状态回查,下边挨个看代码实现
SendMessageProcessor是Broker中接收RequestCode.SEND_MESSAGE消息的处理类
asyncSendMessage
事务消息使用TransactionalMessageService进行存储消息
org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl#asyncPrepareMessage
org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge#asyncPutHalfMessage
消息和延迟消息一样被替换了Topic,写入到了RMQ_SYS_TRANS_HALF_TOPIC 主题下,再由定时任务进行事务状态判断,然后写入原Topic由消费者消费
生产者返回本地事务状态,EndTransactionProcessor是Broker端对RequestCode.END_TRANSACTION的处理类,根据返回的事务状态是Commit还是Rollback做不同处理
Commit: 将消息查询出来恢复原Topic等,重新写入CommitLog进行消息分发,再删除半消息,这里删除不是真的删除,是将消息写到了RMQ_SYS_TRANS_OP_HALF_TOPIC这个主题下,表示已经处理过了
Rollback: 除了不用恢复原Topic重新写入,其他一致
org.apache.rocketmq.broker.processor.EndTransactionProcessor#processRequest
Broker通过TransactionalMessageCheckService定时对RMQ_SYS_TRANS_HALF_TOPIC主题的半消息进行监测和回查状态,默认一分钟一执行
org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl#check
查询已经处理的OP队列消息,判断该事务消息如果还未处理,向Producer组中的一个发送RequestCode.CHECK_TRANSACTION_STATE命令来检测本地事务状态,如果该事务消息已经处理,则更新消费进度然后忽略。(大致是这样,但是有些代码部分没理解深刻。。)
Producer端接收检测事务状态请求(CHECK_TRANSACTION_STATE)的处理器
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#checkTransactionState