• RocketMQ源码阅读(十五)事务消息


    目录

    简介事务消息

    Producer端

    sendMessageInTransaction

    Broker端

    SendMessageProcessor

    EndTransactionProcessor

    TransactionalMessageCheckService


    简介事务消息

    先盗个流程图来简介一下事务消息的大致流程:

    1、Producer先发送半消息,半消息暂时对消费者们不可见

    2、半消息发送成功

    3、发送半消息成功后,Producer开始执行本地事务,本地事务执行成功后返回事务状态(Commit\Rollback\Unknow)

    4、将本地事务状态返回给Broker,如果是Commit,则将半消息暴露给消费者消费,如果是Rollback,则删除该消息,如果是Unknow,则表示Broker需要调用事务回查接口来判断具体事务状态

    5、因为返回Unknow或者网络原因等Commit/Rollback指令未发送到Broker,Broker定时调用回查接口判断本地事务状态

    6、执行Producer端定义的回查接口逻辑

    7、回查接口返回事务状态,Broker根据Commit/Rollback状态进行不同处理

    下边从Producer端和Broker端来分析源码是如何实现的

    Producer端

    来看Producer端的事务消息实现,与普通消息的发送有以下几点区别

    1、TransactionMQProducer 事务消息的Producer实现类,继承DefaultMQProducer并扩展了以下两个属性

    2、TransactionListener  事务监听器,实现两个接口来执行本地事务和回查

    3、LocalTransactionState   本地事务执行状态

    example包中示例的生产者端代码

    sendMessageInTransaction

    1、发送前校验(清除延时标记、消息大小校验)

    2、设置事务消息标记并发送

    3、发送成功后调用监听器执行本地事务

    4、向Broker发送END_TRANSACTION消息返回本地事务状态

     

    Broker端

    Broker端有三部分,一是接收半消息的处理,二是Producer返回Broker本地事务状态,三是Broker进行事务状态回查,下边挨个看代码实现

    SendMessageProcessor

    SendMessageProcessor是Broker中接收RequestCode.SEND_MESSAGE消息的处理类

    asyncSendMessage

    事务消息使用TransactionalMessageService进行存储消息

     org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl#asyncPrepareMessage

    org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge#asyncPutHalfMessage

    消息和延迟消息一样被替换了Topic,写入到了RMQ_SYS_TRANS_HALF_TOPIC 主题下,再由定时任务进行事务状态判断,然后写入原Topic由消费者消费

     

    EndTransactionProcessor

    生产者返回本地事务状态,EndTransactionProcessor是Broker端对RequestCode.END_TRANSACTION的处理类,根据返回的事务状态是Commit还是Rollback做不同处理

    Commit: 将消息查询出来恢复原Topic等,重新写入CommitLog进行消息分发,再删除半消息,这里删除不是真的删除,是将消息写到了RMQ_SYS_TRANS_OP_HALF_TOPIC这个主题下,表示已经处理过了

    Rollback: 除了不用恢复原Topic重新写入,其他一致

    org.apache.rocketmq.broker.processor.EndTransactionProcessor#processRequest

    TransactionalMessageCheckService

    Broker通过TransactionalMessageCheckService定时对RMQ_SYS_TRANS_HALF_TOPIC主题的半消息进行监测和回查状态,默认一分钟一执行

    org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl#check

    查询已经处理的OP队列消息,判断该事务消息如果还未处理,向Producer组中的一个发送RequestCode.CHECK_TRANSACTION_STATE命令来检测本地事务状态,如果该事务消息已经处理,则更新消费进度然后忽略。(大致是这样,但是有些代码部分没理解深刻。。

    ClientRemotingProcessor

    Producer端接收检测事务状态请求(CHECK_TRANSACTION_STATE)的处理器

     org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#checkTransactionState

     

  • 相关阅读:
    (十七)STM32——定时器
    【pandas小技巧】--花哨的DataFrame
    彻底搞懂blob对象,实现文件下载,文件分片技术
    【ES】es查询term、match、match_phrase、mast_not、mast...
    NXP公司K60N512+PWM控制BLDC电机
    电源线虚接,导致信号线发烫
    微信关于权重条件,连续下降积分的原因有以下这些
    《web课程设计》用HTML CSS做一个简洁、漂亮的个人博客网站
    Java面试被问了几个简单的问题,却回答的不是很好
    怒冲GitHub榜首,京东T8幕后打造高并发面试手册,狂虐阿里面试官
  • 原文地址:https://blog.csdn.net/xyjy11/article/details/126333877