• RocketMQ事务消息机制


    事务

    事务介绍

    说到事务大家都不陌生,必须满足4个条件(ACID):原子性(Atomicity,或称不可分割性)、一致性(Consistency)、隔离性(Isolation,又称独立性)、持久性(Durability)。RocketMQ的事务消息和数据库中的数据没有太多关系,但是本质上是基于2PC理论实现的最终一致性。

    RocketMQ事务消息使用

    RocketMQ提供了一个事务监听器接口TransactionListener,里面有两个方法:executeLocalTransaction中执行本地事务方法,执行完返回COMMIT_MESSAGE、ROLLBACK_MESSAGE、UNKNOW,分别对应事务消息执行成功、回滚、未知,如果是未知状态则会触发Broker调用checkLocalTransaction进行状态回查。
    在这里插入图片描述

    事务消息原理介绍

    1、 Producer在发送消息的时候会指定该消息为半消息,Broker接受到半消息是不能被Consumer消费的。
    2、 Broker向Producer发送半消息发送成功标志。
    3、 Producer开始执行本地事务方法。
    4、 Producer执行完本地事务方法开始向Broker发送提交或回滚事务消息命令。
    5、 如果Producer发送的事务消息状态是未知,则会定时向Producer进行回查。
    6、 通过回查确定事务消息执行状态再觉得是否将消息暴露给消费者。

    在这里插入图片描述

    源码解读

    Producer发送事务消息源码

    org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendMessageInTransaction 代码有删减

    public TransactionSendResult sendMessageInTransaction(final Message msg,
                                                              final LocalTransactionExecuter localTransactionExecuter, final Object arg)
            throws MQClientException {
            // ①
            TransactionListener transactionListener = getCheckListener();
    		......
            SendResult sendResult = null;
            // ②
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
     
            	// ③
                sendResult = this.send(msg);
      
            LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
            Throwable localException = null;
            switch (sendResult.getSendStatus()) {
                case SEND_OK: {
                    try {
                       
                        String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
                        if (null != transactionId && !"".equals(transactionId)) {
                            msg.setTransactionId(transactionId);
                        }
                    else if (transactionListener != null) {
                            log.debug("Used new transaction API");
                            // ④
                            localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
                        }
            	// ⑤
                this.endTransaction(sendResult, localTransactionState, localException);
         
            return transactionSendResult;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    ①、从Producer拿到TransactionListener 监听器。
    ②、给事务消息打上标识,在发送到Broker时会做特殊处理。
    ③、发送消息。
    ④、执行本地事务方法。
    ⑤、结束事务方法,告诉Broker是进行提交或者回滚。

    Broker接受事务消息处理请求

    前面说过在Broker接受到事务消息的时候,会调用prepareMessage进行存储消息,中间会对消息做特殊处理。

    org.apache.rocketmq.broker.processor.SendMessageProcessor#sendMessage
    在这里插入图片描述
    TransactionalMessageBridge就是处理普通消息和事务消息的桥梁。首先会备份原始消息的topic和queueId,以备消息还原,然后再将消息的topic设置为半消息内置topic中,queueId固定为0,此时的topic对消费者是不可见的。
    在这里插入图片描述
    事务消息发送完成,就等待Producer提交事务消息执行状态,在Broker会分发给EndTransactionProcessor处理。如果是提交状态,则取出半消息,还原原始消息的属性,并重新投递到队列中去;如果消息是回滚,则从半消息队列中移除,投入到RMQ_SYS_TRANS_OP_HALF_TOPIC主题下。
    在这里插入图片描述

    那么如果Producer反馈的消息状态是未知的话,Broker也是提供了回查机制的,并且默认回查的次数数15次,处理线程是TransactionalMessageCheckService。
    在这里插入图片描述

    到这里事务消息的介绍就结束了,你有没有一点感触呢,是否能推断出RocketMQ是如何处理延迟消息的么?

  • 相关阅读:
    Maven项目,进行编译,使用idea的 编译功能,就是正常的,但是在终端中执行 mvn clean compile 报错
    linux内核分析:进程与调度
    国际结算习题集及答案
    用JpaTransactionManager操作数据库事务
    Spring-Java
    Java高手的30k之路|面试宝典|精通项目介绍方法&优化简历项目介绍
    [carla] GNSS传感器与Carla坐标系 转换方法
    如何借助边缘智能网关打造智慧城市便民驿站
    高德面试:为什么Map不能插入null?
    【CSS】transition、transform以及animation
  • 原文地址:https://blog.csdn.net/qq_39408435/article/details/125449708