• 分布式事务(五)———可靠消息队列解决方案


    该系列参考:
    https://www.jianshu.com/p/962271bbf4ea
    https://blog.csdn.net/a745233700/article/details/122402303
    https://cloud.tencent.com/developer/article/2048776
    https://icyfenix.cn/

    本地消息事务、RocketMq、最大努力通知都是基于消息队列来实现的解决方案,所以我就放一起来介绍了

    一、本地消息事务表

    1.核心思想

    将分布式事务拆分成本地事务进行处理,通过在分布式事务主动发起方额外新建事务消息表,事务发起方把业务处理和记录事务消息在本地事务中完成,轮询事务消息表发送事务消息,事务被动方基于消息中间件消费事务消息表中的事务消息

    在这里插入图片描述

    2.流程步骤

    1. 事务主动方在同一个本地事务中处理业务和写消息表操作 (消息表状态为待处理)
    2. 事务主动方通过消息中间件通知事务被动方处理事务(此过程需要加定时任务,扫描消息表内状态为待处理的消息,重新发送消息)
    3. 事务被动方收到消息,处理业务后,发送事务已处理的消息给事务主动方
    4. 事务主动方收到消息后,将该条事务消息状态更新为已处理

    过程分析

    1. 假设步骤1 异常或宕机:消息跟业务处理的数据一起回滚了,啥事没发生
    2. 假设步骤2 消息发送失败或者消息丢失:有定时任务,定时扫描消息表重复发送消息(保证消息一定发送成功)
    3. 假设步骤3业务处理失败或者消息发送失败:业务数据会回滚(必要时需要人工介入),反馈消息丢失,会有步骤2的定时任务重复唤起
    4. 假设步骤4更新失败:数据会回滚,依旧依赖定时任务的重复执行 直至成功

    注意事项

    1. 消息一定要带有唯一性事务ID
    2. 两边的消息处理一定要具备幂等性
    3. 一个事务涉及多个服务,就要有多个状态或者多条不同业务类型的消息
    4. 整个过程过于依赖主动方的定时任务,可视情况优化
    5. 上述过程保证了事务一定成功,如果某些场景允许回滚,步骤3处理失败也可以发起回滚消息,主动方收到消息回滚处理,并更新事务状态为已回滚即可,即无论是正向成功还是反向回滚,保持最终一致性即可 (该方案不太建议回滚处理,毕竟核心思想就是保证第一步业务成功,后续一定成功! 但是万事看业务场景嘛)

    3.优缺点

    优点: 简单实现,没有过度依赖中间件

    缺点:

    • 与具体的业务场景绑定,耦合性强,不可公用
    • 消息数据与业务数据同库,占用业务系统资源
    • 业务系统在使用关系型数据库的情况下,消息服务性能会受到关系型数据库并发性能的局限
    • 每个事务入口的业务系统都需要有消息表

    特点: 柔性事务,数据最终一致性,最大努力交付思想

    二、RokcetMQ

    基于MQ的分布式事务方案本质上是对本地消息表的封装,整体流程与本地消息表一致,唯一不同的就是将本地消息表存在了MQ内部,而不是业务数据库中

    在这里插入图片描述

    一.流程步骤

    1. 服务主动方发送半消息给Broker,Broker返回ACK
    2. 服务主动方收到Broker返回的ACK后提交本地事务
    3. 根据本地事务的结果向Broker发送commit消息或者回滚消息
    4. Broker收到commit消息则会向消费端投递消息,如果是回滚消息则会清掉该事务消息,无事发生

    过程分析:

    1. 步骤1失败:无事发生
    2. 步骤2失败:事务回滚,并向broker发送rollback消息,broker清掉消息,无事发生
    3. 步骤3消息发送失败或消息丢失(broker没收到消息):borker会回调checkLocalTransaction检查事务的结果(也有检测次数阈值),如果事务已经成功则Commit消息,如果失败则rollback消息,无事发生 (上图的5.6.7步骤)
    4. 步骤4投递消息失败或者消费消息失败:会重试,RokcetMq的重试机制,达到默认次数则进死信队列,人工处理

    二、优缺点及注意事项

    优点: 业务系统和消息系统解耦,无需在新建消息表以及额外的定时处理,性能比消息表高

    缺点: 额外的网络请求开销,要有额外的事务检查接口

    注意事项: 要保证幂等性

    三、示例

    1.RokcetMq事务消息状态

    1. TransactionStatus.CommitTransaction:提交事务,表示允许消费者消费该消息。
    2. TransactionStatus.RollbackTransaction:回滚事务,表示该消息将被删除,不允许消费。
    3. TransactionStatus.Unknown:中间状态,表示需要MQ回查才能确定状态。

    2.发送事务消息

    使用TransactionMQProducer类创建生产者客户端,并指定唯一的producerGroup,可以设置自定义线程池来处理检查请求。本地事务执行后,需要根据执行结果回复MQ

    public class TransactionProducer {
        public static void main(String[] args) throws MQClientException, InterruptedException {
            TransactionListener transactionListener = new TransactionListenerImpl();
            TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
            ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue(2000), new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r);
                    thread.setName("client-transaction-msg-check-thread");
                    return thread;
                }
            });
    
            producer.setExecutorService(executorService);
            producer.setTransactionListener(transactionListener);
            producer.start();
    
            String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
            for (int i = 0; i < 10; i++) {
                try {
                    Message msg =
                        new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
                            ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                    SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                    System.out.printf("%s%n", sendResult);
    
                    Thread.sleep(10);
                } catch (MQClientException | UnsupportedEncodingException e) {
                    e.printStackTrace();
                }
            }
    
            for (int i = 0; i < 100000; i++) {
                Thread.sleep(1000);
            }
            producer.shutdown();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38

    3.实现TransactionListener接口

    executeLocalTransaction”方法用于在发送半消息成功时执行本地事务。它返回上一节中提到的三个事务状态之一。
    checkLocalTransaction”方法用于检查本地事务状态并响应MQ检查请求。它还返回上一节中提到的三个事务状态之一

    public class TransactionListenerImpl implements TransactionListener {
        private AtomicInteger transactionIndex = new AtomicInteger(0);
    
        private ConcurrentHashMap localTrans = new ConcurrentHashMap<>();
    
        @Override
        public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            //这里做事务处理,根据结果返回消息状态 见RokcetMq事务消息状态
            int value = transactionIndex.getAndIncrement();
            int status = value % 3;
            localTrans.put(msg.getTransactionId(), status);
            return LocalTransactionState.UNKNOW;
        }
    
        @Override
        public LocalTransactionState checkLocalTransaction(MessageExt msg) {
            //这里是broker回调查询事务状态,根据事务状态返回对应的消息状态 见RokcetMq事务消息状态 
            Integer status = localTrans.get(msg.getTransactionId());
            if (null != status) {
                switch (status) {
                    case 0:
                        return LocalTransactionState.UNKNOW;
                    case 1:
                        return LocalTransactionState.COMMIT_MESSAGE;
                    case 2:
                        return LocalTransactionState.ROLLBACK_MESSAGE;
                }
            }
            return LocalTransactionState.COMMIT_MESSAGE;
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    三、最大努力通知

    最大努力通知也称为定期校对。它在事务主动方增加了消息校对的接口,如果事务被动方没有接收到主动方发送的消息,此时可以调用事务主动方提供的消息校对的接口主动获取

    在这里插入图片描述

    在可靠消息事务中,事务主动方需要将消息发送出去,并且让接收方成功接收消息,这种可靠性发送是由事务主动方保证的;但是最大努力通知,事务主动方仅仅是尽最大努力(重试,轮询…)将事务发送给事务接收方,所以存在事务被动方接收不到消息的情况,此时需要事务被动方主动调用事务主动方的消息校对接口查询业务消息并消费,这种通知的可靠性是由事务被动方保证的

    特点由被动方主动调用接口来校对业务情况是最核心的思想

    系列文章

  • 相关阅读:
    项目一:使用 Spring + SpringMVC + Mybatis + lombok 实现网络五子棋
    低代码是开发的未来吗?浅谈低代码平台
    深入了解Jedis:Java操作Redis的常见类型数据存储
    Unity扩展UGUI组件多态按钮MultimodeButton
    Vue电商项目--登录与注册
    虚析构+纯虚函数+抽象类+友元
    掌握Flink键控状态(Keyed State):深入指南与实践
    基于智能分析的恶意软件检测研究进展和挑战
    贪吃蛇QT c++设计
    Java 基础常见知识点&面试题总结(上),2022 最新版!| JavaGuide
  • 原文地址:https://blog.csdn.net/weixin_44102992/article/details/126687205