• SpringBoot集成RocketMQ实现分布式事务


    基本概念

    RocketMQ 事务消息(Transactional Message)是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败。RocketMQ 的事务消息提供类似 X/Open XA 的分布事务功能,通过事务消息能达到分布式事务的最终一致 

    执行流程

     

    (1) 发送方向 MQ 服务端发送消息。
    (2) MQ Server 将消息持久化成功之后,向发送方 ACK 确认消息已经发送成功,此时消息为半消息。
    (3) 发送方开始执行本地事务逻辑。
    (4) 发送方根据本地事务执行结果向 MQ Server 提交二次确认(Commit 或是 Rollback),MQ Server 收到Commit 状态则将半消息标记为可投递,订阅方最终将收到该消息;MQ Server 收到 Rollback 状态则删除半消息,订阅方将不会接受该消息。
    (5) 在断网或者是应用重启的特殊情况下,上述步骤4提交的二次确认最终未到达 MQ Server,经过固定时间后MQ Server 将对该消息发起消息回查。
    (6) 发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
    (7) 发送方根据检查得到的本地事务的最终状态再次提交二次确认,MQ Server 仍按照步骤4对半消息进行操作

    项目实例

    pom文件内容

    
    
        4.0.0
        
            org.springframework.boot
            spring-boot-starter-parent
            2.7.3
             
        
        com.example
        rocketmq
        0.0.1-SNAPSHOT
        rocketmq
        Demo project for Spring Boot
        
            1.8
        
        
            
                org.springframework.boot
                spring-boot-starter-web
            
    
            
                org.springframework.boot
                spring-boot-starter-test
                test
            
    
            
                org.apache.rocketmq
                rocketmq-spring-boot-starter
                2.2.0
            
    
            
                cn.hutool
                hutool-all
                5.5.8
            
        
    
        
            
                
                    org.springframework.boot
                    spring-boot-maven-plugin
                
            
        
    
    
    

    application.yml

    server:
      port: 8088
    
    rocketmq:
      name-server: 127.0.0.1:9876
      access-channel: LOCAL
      producer:
        group: deer_message_push
        send-message-timeout: 3000
        compress-message-body-threshold: 4096
        max-message-size: 2048000
        retry-times-when-send-failed: 2
        retry-next-server: true

    消息生产者工具类

    package com.example;
    import cn.hutool.core.util.IdUtil;
    import org.apache.rocketmq.client.producer.SendCallback;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.client.producer.SendStatus;
    import org.apache.rocketmq.client.producer.TransactionSendResult;
    import org.apache.rocketmq.spring.core.RocketMQTemplate;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.support.MessageBuilder;
    import org.springframework.stereotype.Component;
    
    import java.io.Serializable;
    
    /**
     * @Author 何志鹏
     * @Date 2022/8/30 11:44
     * @Version 1.0
     */
    @Component
    public class RocketmqProducer {
    
        private final Logger logger = LoggerFactory.getLogger(RocketmqProducer.class);
    
    
        @Autowired
        private RocketMQTemplate rocketMQTemplate;
    
    
        /**
         * MQ半消息
         *
         * @param topic target topic
         * @param tag   topic's tag
         * @param msg   message
         * @return send status
         */
        public  SendStatus txSend(T msg, String topic, String tag) {
            String destination = String.format("%s:%s", topic, tag);
    
            Message message = MessageBuilder.withPayload(msg)
                    .setHeader("KEYS", IdUtil.simpleUUID())
                    .setHeader("DESTINATION", destination)
                    .build();
    
            TransactionSendResult result =
                    rocketMQTemplate.sendMessageInTransaction(destination, message, msg);
    
            //发送状态
            String sendStatus = result.getSendStatus().name();
            // 本地事务执行状态
            String localTxState = result.getLocalTransactionState().name();
            logger.info("send tx message sendStatus:{},localTXState:{}",sendStatus,localTxState);
            return result.getSendStatus();
        }
    
        /**
         * 同步消息
         *
         * @param msg
         * @param topic
         * @param tag
         * @param 
         * @return
         */
        public  SendStatus syncSend(T msg, String topic, String tag) {
            String destination = String.format("%s:%s", topic, tag);
    
            Message message = MessageBuilder.withPayload(msg)
                    .setHeader("KEYS", IdUtil.simpleUUID())
                    .setHeader("DESTINATION", destination)
                    .build();
            return rocketMQTemplate.syncSend(destination, message).getSendStatus();
        }
    
        /**
         * 异步消息
         *
         * @param msg
         * @param topic
         * @param tag
         * @param 
         * @return
         */
        public  void asyncSend(T msg, String topic, String tag) {
            String destination = String.format("%s:%s", topic, tag);
    
            Message message = MessageBuilder.withPayload(msg)
                    .setHeader("KEYS", IdUtil.simpleUUID())
                    .setHeader("DESTINATION", destination)
                    .build();
            rocketMQTemplate.asyncSend(destination, message, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.println("============================发送成功=================================");
                }
    
                @Override
                public void onException(Throwable e) {
                    System.out.println("============================发送失败================================="+e.getMessage());
                }
            });
        }
    }

    创建user实体类

    package com.example;
    
    import java.io.Serializable;
    
    /**
     * @Author 何志鹏
     * @Date 2022/8/30 17:05
     * @Version 1.0
     */
    public class User implements Serializable{
    
        private static final long serialVersionUID = 4247558661107952933L;
    
        private Integer id;
        private String name;
        private int age;
    
    
        public Integer getId() {
            return id;
        }
    
        public void setId(Integer id) {
            this.id = id;
        }
    
        public String getName() {
            return name;
        }
    
        public void setName(String name) {
            this.name = name;
        }
    
        public int getAge() {
            return age;
        }
    
        public void setAge(int age) {
            this.age = age;
        }
    }

    生产者测试类

    package com.example;
    
    import org.junit.jupiter.api.Test;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    
    @SpringBootTest
    class RocketmqApplicationTests {
    
    
        @Autowired
        private RocketmqProducer rocketmqProducer;
    
    
        @Test
        void contextLoads() {
    
            User user = new User();
            user.setId(1);
            user.setName("何志鹏555555555555555555555");
            user.setAge(18);
            rocketmqProducer.txSend(user, "topic1", "11111");
            System.out.println("==========================开始发送消息=========================================");
        }
    
    }
    

    消费者

    package com.example;
    
    import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
    import org.apache.rocketmq.spring.core.RocketMQListener;
    import org.springframework.stereotype.Component;
    
    /**
     * @Author 何志鹏
     * @Date 2022/8/30 14:16
     * @Version 1.0
     */
    @Component
    @RocketMQMessageListener(consumerGroup = "deer_message_push", topic = "topic1")
    public class RocketMQConsumer  implements RocketMQListener {
    
    
        @Override
        public void onMessage(Object message) {
            System.out.println("收到的消息为 ================================================: " + message);
        }
    }
    

    自定义一个RocketmqTransactionListener注解  这边为了方便定义哪些消息需要实现事务  如下:

    package com.example;
    
    import java.lang.annotation.ElementType;
    import java.lang.annotation.Retention;
    import java.lang.annotation.RetentionPolicy;
    import java.lang.annotation.Target;
    
    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    public @interface RocketmqTransactionListener {
        String topic();
    
        String selectorExpression();
    }

    事务监听实现

    package com.example;
    
    import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
    import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
    import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.InitializingBean;
    import org.springframework.context.ApplicationContext;
    import org.springframework.messaging.Message;
    import org.springframework.stereotype.Component;
    
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.stream.Collectors;
    import java.util.stream.Stream;
    
    @Component
    @RocketMQTransactionListener(corePoolSize = 2, maximumPoolSize = 10)
    public class RocketmqTransactionListenerAdapter implements RocketMQLocalTransactionListener, InitializingBean {
        private final Logger logger = LoggerFactory.getLogger(RocketmqTransactionListenerAdapter.class);
        private final ApplicationContext applicationContext;
        private final ConcurrentHashMap listeners;
    
        public RocketmqTransactionListenerAdapter(ApplicationContext applicationContext) {
            this.applicationContext = applicationContext;
            this.listeners = new ConcurrentHashMap<>();
        }
    
        @Override
        public void afterPropertiesSet() {
            applicationContext.getBeansWithAnnotation(RocketmqTransactionListener.class)
                    .values()
                    .stream()
                    .filter(m -> RocketMQLocalTransactionListener.class.isAssignableFrom(m.getClass()))
                    .collect(Collectors.toList())
                    .forEach(listener -> {
                        RocketmqTransactionListener annotation =
                                listener.getClass().getAnnotation(RocketmqTransactionListener.class);
    
                        Stream.of(annotation.selectorExpression()
                                .split("\\|\\|"))
                                .forEach(tag -> {
                                    String destination = String.format("%s:%s", annotation.topic(), tag);
                                    listeners.put(destination, (RocketMQLocalTransactionListener) listener);
                                });
                    });
        }
    
        @Override
        public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            String destination = getDestination(msg);
            if (!listeners.containsKey(destination)) {
                logger.info("----------------------------------------------------");
                logger.warn("{} transaction message is not supported", destination);
                return RocketMQLocalTransactionState.ROLLBACK;
            } else {
                RocketMQLocalTransactionListener listener = listeners.get(destination);
                return listener.executeLocalTransaction(msg, arg);
            }
        }
    
        @Override
        public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
            String destination = getDestination(msg);
            if (!listeners.containsKey(destination)) {
                logger.info("----------------------------------------------------");
                logger.warn("{} transaction message is not supported", destination);
                return RocketMQLocalTransactionState.ROLLBACK;
            } else {
                RocketMQLocalTransactionListener listener = listeners.get(destination);
                return listener.checkLocalTransaction(msg);
            }
        }
    
        private String getDestination(Message msg) {
            String topic = msg.getHeaders().get("rocketmq_TOPIC", String.class);
            String tags = msg.getHeaders().get("rocketmq_TAGS", String.class);
            return String.format("%s:%s", topic, tags);
        }
    
    
    }

    说明:
    定义本地事务处理类,实现RocketMQLocalTransactionListener接口,以及加上@RocketMQTransactionListener注解,这个类似方法的调用是异步的;
    executeLocalTransaction方法:当我们处理完业务后,可以根据业务处理情况,返回事务执行状态,有rollback, commit or unknown三种,分别是回滚事务,提交事务和未知;根据事务消息执行流程,如果返回rollback,则直接丢弃消息;如果是返回commit,则消费消息;如果是unknow,则继续等待,然后调用checkLocalTransaction方法,最多重试15次,超过了默认丢弃此消息;
    checkLocalTransaction方法:是当MQ Server未得到MQ发送方应答,或者超时的情况,或者应答是unknown的情况,调用此方法进行检查确认,返回值和上面的方法一样;
     

    自定义生产者事务监听类

    package com.example;
    
    import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
    import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
    import org.springframework.messaging.Message;
    import org.springframework.stereotype.Service;
    
    /**
     * @Author 何志鹏
     * @Date 2022/8/30 16:56
     * @Version 1.0
     */
    @Service
    @RocketmqTransactionListener(topic = "topic1", selectorExpression = "11111")
    public class ProducerListener implements RocketMQLocalTransactionListener {
    
    
    
    
    
        @Override
        public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            try {
                //todo 数据库相关逻辑  报错事务回滚 否则事务提交
                int count = 1;
                System.err.println("==================================开始执行事务===============================");
                return count > 0 ? RocketMQLocalTransactionState.COMMIT : RocketMQLocalTransactionState.ROLLBACK;
            } catch (Exception e) {
                System.err.println("=====================================事务执行失败============================================");
                return RocketMQLocalTransactionState.ROLLBACK;
            }
        }
    
        @Override
        public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
            try {
                //todo 数据库相关逻辑  报错事务回滚 否则事务提交
                boolean flag = true;
                return flag ? RocketMQLocalTransactionState.COMMIT : RocketMQLocalTransactionState.ROLLBACK;
            } catch (Exception e) {
                System.err.println("=====================================事务执行失败222222============================================");
                return RocketMQLocalTransactionState.ROLLBACK;
            }
        }
    }
    

    启动测试类RocketmqApplicationTests测试

    当执行程序没有异常的情况下  可以看出,执行成功后,消息执行成功返回的结果为SEND_OK,本地事务执行的状态为COMMIT_MESSAGE

    回滚测试

    仿造异常情况下的程序  如改动以下程序即可

    重启项目进行测试  打印日志如下:

     从执行的结果可以看出,消息执行成功返回的结果为SEND_OK,本地事务执行的状态为:ROLLBACK_MESSAGE,直接丢弃消息,所以消费端无法消费此消息  本地事务的状态分别为提交状态、回滚状态、未知状态  当生产者回调操作执行的结果为本地事务状态。其会发送给TC,而TC会在发送给TM,TM会根据TC发送过来的本地事务状态来决定全局事务确认指令 状态如下:

    TransactionStatus.CommitTransaction: 提交事务,它允许消费者消费此消息。
    TransactionStatus.RollbackTransaction: 回滚事务,它代表该消息将被删除,不允许被消费。
    TransactionStatus.Unknown: 未知状态,它代表需要检查消息队列来确定状态

    最后附上项目源码

    rocketmq: 测试rocketmq事务实现

     

     

     

  • 相关阅读:
    点成案例丨温度梯度培养箱在探究温度对植物发芽影响中的应用
    第四次数据库作业
    【PyTorch深度学习项目实战100例】—— 基于MFCC对GTZAN音乐流派分类 | 第66例
    G1回收器的适用场景
    LFS学习系列2 — 总览
    fastapi项目结构以及多进程部署
    接口自动化测试是个啥?如何开始?什么是框架?带你揭开神秘面纱
    logback日志级别动态切换的终极方案(Java ASM使用)
    前端笔记(11) Vue3 Router 编程式导航 router.push router.replace
    基于ARM+FPGA的ISA总线/MMи总线接口转换设计在轨道交通的应用
  • 原文地址:https://blog.csdn.net/weixin_39643007/article/details/126620127