• springcloudalibaba架构(29): Seata分布式事务(AT模式)


    前言

    分布式解决方案seata。

    环境说明和准备

    1. 需要启动nacos
      版本信息:
      springboot版本2.3.2.RELEASE
      springcloud版本Hoxton.SR8
      springcloud-alibaba版本2.2.5.RELEASE
      seata-server版本: 0.9.0

    本章代码已分享至Gitee: https://gitee.com/lengcz/springcloudalibaba01.git

    第一节 什么是seata

    Seata是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata为用户提供了AT、TCC、SAGA和XA事务模式,为用户打造了一站式的分布式事务解决方案。

    1. 全局事务

    seata的设计目标是对业务的无侵入,因此对业务无侵入的2PC方案,在传统2PC 的基础上演进。它把所有的事务理解为一个包含了若干分支事务的全局事务。全局事务的职责是协调其管辖的分支事务达成一致,要么同时成功,要么一起回滚。此外分支事务本身就是一个关系型数据库的本地事务。
    在这里插入图片描述

    2. Seata 主要由三个重要组件组成

    • TC(Transaction Coordinator):事务协调器,管理器全局的分支事务的状态,用于全局性事务的提交和回滚。
    • TM(Transaction Manager):事务管理器,用于开启全局、提交或者回滚全局事务。
    • RM(Resource Manager):资源管理器,用于分支事务上的资源管理,向TC注册分支事务,上报分支事务的状态,接受TC的命令来提交或者回滚分支事务。

    在这里插入图片描述

    Seata的执行流程

    1. A服务的TM向TC申请开启一个全局事务,TC就会创建一个全局事务并返回一个唯一的XID
    2. A服务的RM向TC注册分支事务,并将其纳入XID对一个全局事务的管辖
    3. A服务执行分支事务,向数据库做操作
    4. A服务开始远程调用B服务,此时XID会将微服务的调用链上传播
    5. B服务的RM向TC注册分支事务,并将其纳入到XID对应的全局事务的管辖
    6. B服务执行分支事务,向数据库做操作
    7. 全局事务调用链处理完成,TM根据有无异常向TC发起全局事务的提交或者回滚
    8. TC协调其管辖的所有分支事务,决定是否回滚

    3. Seata的四种模式

    3.1 AT模式

    Seata的一大特色是AT对业务代码完全无入侵,使用非常简单,改造成本低。用户只需要关注自己的业务SQL ,Seata会通过分析用户的业务SQL反向生成回滚数据。AT模式分为两阶段,如下图

    • 一阶段,所有参与事务的分支,本地事务commit业务数据并写入回滚日志(Undo Log)
    • 二节点,事务协调者根据所有分支的情况,决定本次全局事务是commit还是Rollback。

    在这里插入图片描述

    Seata实现2PC和传统2PC的差别。

    1. 架构层面,传统2PC方案的RM实际上是在数据库层,RM本质上就是数据库自身,通过XA协议实现,而Seatac的RM是以jar包的形式作为中间件部署在应用程序这一侧的。
    2. l两阶段提交方面,传统2PC无论是第二阶段的决议是commit还是rollback,事务性资源的锁都要保持到 二阶段完成才释放。而Seata的做法是在一阶段就将本地事务提交,这样就可以省去二阶段持锁的时间,整体提高效率。

    3.2 XA模式

    XA模式是Seata另一种无侵入的分布式事务解决方案,它在seata定义的分布式事务框架内,利用事务资源(数据库、消息服务等)对XA协议的支持,以XA协议的机制来管理分支事务,XA要求数据库本身对规范和协议的支持。
    从编程模式上,XA模式和AT模式保持完全一致,只需要修改数据源代理即,即可实现XA模式与AT模式之间的切换,代码如下。

        @Bean("dataSource")
        public DataSource dataSource(DruidDataSource druidDataSource){
    //        return new DataSourceProxy(druidDataSource);//AT模式
            return new DataSourceProxyXA(druidDataSource);//XA模式
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    3.3 TCC模式

    Seata中的TCC模式同样包含了三个阶段

    • Try阶段: 所有参与分布式事务的分支,对业务资源进行检查和预留
    • Confirm: 所有分支的Try全部成功后,执行业务的提交。
    • Cancel节点: 取消Try阶段预留的业务资源。
      在这里插入图片描述
      与AT和XA模式相比,TCC模式需要用户自己抽象并实现Try、Confirm和Cancel三个接口,编码量会比较大一些。但是由于事务的每一个阶段都由开发人员自己实现,而且相比于AT模式来说,减少了SQL解析的过程,也就没有全局锁的限制,所以TCC模式的性能优于AT、XA模式,但是会带来工作量的增加。

    3.4 SAGA模式

    Saga是长事务解决方案,每个参与者都需要实现事务的正向操作和补偿操作。当参与者的正向操作执行失败时,在回滚本地事务的同时会调用上一阶段的补偿操作,在业务失败时最终会使事务回到初始状态。
    Saga与TCC类似,同样没有全局锁。由于缺少锁定资源这一步,在某些适合的场景,Saga要比TCC实现起来更简单。
    Saga模式是长事务解决方案,适用于业务流程长且需要保证事务最终一致性的业务系统,Saga模式一阶段就会提交本地事务且无锁,在长流程情况下可以保证性能,多用于渠道层、集成层业务系统、事务参与者可能是其它公司的服务或者是遗留系统的服务,无法进行改造或者提供TCC要求的接口,也可以使用Saga模式。

    在这里插入图片描述

    第二节 seata案例(下单&扣减库存)

    用户下单,创建订单,并调用远程服务扣减库存

    1. 在product模块添加扣减库存的代码
    public interface ProductService {
    
        Product findById(Integer pid);
    
        /**
         * 扣除库存
         * @param pid
         * @param num
         */
        void reduceInventory(Integer pid, Integer num);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    @Service
    public class ProductServiceImpl implements ProductService {
        @Autowired
        private ProductDao productDao;
    
        @Override
        public Product findById(Integer pid) {
            return productDao.findById(pid).get();
        }
    
        @Transactional  //事务注解
        public void reduceInventory(Integer pid, Integer num) {
            Product product = productDao.findById(pid).get();
            product.setStock(product.getStock()-num);
            productDao.save(product);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
        @RequestMapping("/product/reduceInventory")
        public void reduceInventory(Integer pid,Integer num) {
            log.info("扣库存:pid" + pid+",num:"+num);
            productService.reduceInventory( pid, num);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    1. 在order模块里创建下单的代码
    /**
     * 订单service
     */
    public interface OrderService2 {
    
        public Order createOrder(Integer pid,Integer num);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    package com.lcz.service.impl;
    
    import com.alibaba.fastjson.JSON;
    import com.lcz.dao.OrderDao;
    import com.lcz.pojo.Order;
    import com.lcz.pojo.Product;
    import com.lcz.service.OrderService2;
    import com.lcz.service.ProductService;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.rocketmq.spring.core.RocketMQTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    
    @Service
    @Slf4j
    public class OrderService2Impl implements OrderService2 {
    
        @Autowired
        private OrderDao orderDao;
    
        @Autowired
        private ProductService productService;
    
        @Autowired
        private RocketMQTemplate rocketMQTemplate;
    
        @Override
        public Order createOrder(Integer pid, Integer num) {
            log.info("----------------调用商品服务,调用商品微服务查询此商品------------------------");
    
            /**
             * Ribbon负载均衡
             */
            Product product = productService.findById(pid);
            log.info("查询到的商品内容:" + JSON.toJSONString(product));
    
            Order order = new Order();
            order.setUid(1);
            order.setUsername("测试");
            order.setPid(pid);
            order.setPrice(product.getPrice());
            order.setPname(product.getPname());
            order.setNumber(num);
    
            orderDao.save(order);//本地事务
            log.info("用户下单成功,订单信息为:" + JSON.toJSON(order));
    
            log.info("扣减库存");
            //扣减库存
            productService.reduceInventory(pid,num);//远程事务
    
            //发送订单到消息队列
            rocketMQTemplate.convertAndSend("order-topic",order);
            return order;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    package com.lcz.service.listener;
    
    import com.lcz.pojo.Order;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
    import org.apache.rocketmq.spring.core.RocketMQListener;
    import org.springframework.stereotype.Component;
    
    /**
     * 监听器
     */
    @Component
    @RocketMQMessageListener(
            topic = "order-topic",//消费主题
            consumerGroup = "group_rocketmq" //消费者分组
    )
    @Slf4j
    public class OrderRocketMessageListener implements RocketMQListener<Order> {
    
        @Override
        public void onMessage(Order order) {
            System.out.println("-----监听到订单-----");
           log.info("监听到用户下单成功,向用户发送待支付通知"+ order);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    需要注释掉服务容错的代码

    /**
     * value用于指定调用的nacos下哪个微服务
    * fallback用于指定当前feign接口容错类 */
    //@FeignClient(value="server-product",fallback = ProductServiceFallback.class) @FeignClient(value="server-product" // ,fallbackFactory = ProductServiceFallbackFactory.class ) public interface ProductService { @RequestMapping("/product/{pid}") Product findById(@PathVariable Integer pid); /** * 扣库存的远程调用 * @param pid 商品id * @param num 扣除数量 */ @RequestMapping("/product/reduceInventory") void reduceInventory(@RequestParam("pid") Integer pid, @RequestParam("num") Integer num); }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    1. 在order新建一个下单的controller
    /**
     * seata处理事务
     */
    @RestController
    @Slf4j
    public class OrderController3 {
    
    
        @Autowired
        private OrderService2 orderService2;
    
        @GetMapping("/order3/prod/{pid}")
        public Order order(@PathVariable Integer pid) {
            //这里直接写死了下单两个
            return orderService2.createOrder(pid,2);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    1. 启动product和order微服务,进行下单。http://127.0.0.1:8091/order3/prod/3
      在这里插入图片描述
      在这里插入图片描述
      在这里插入图片描述

    到这里,我们完成了下单和扣减库存的业务代码。

    第三节 异常模拟

    很显然order和product是两个微服务内,商品id为3时下单成功,但是扣减库存可能会失败(抛异常)。

      @Transactional  //事务注解
        @Override
        public void reduceInventory(Integer pid, Integer num) {
            Product product = productDao.findById(pid).get();
            product.setStock(product.getStock()-num);
            if(pid==3){
                int i=1/0;//模拟异常
            }
            productDao.save(product);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    我们调用下单,虽然下单成功,但是扣减库存失败了,这就是电信的分布式事务问题。我们需要保证事务的一致性,不可分割,要么同时成功,要么同时失败。

    清空订单表,并将shop_product库存设置为500,此时发送下单请求,显示请求失败了,但是订单表里已经下单,但是库存没有减少。
    在这里插入图片描述

    发送下单请求显示失败,数据库有此订单,但是库存未减少。
    在这里插入图片描述
    在这里插入图片描述

    通过上面的例子,我们复现了发生故障时,分布式事务的不一致问题。

    第四节 seata的使用

    前面我们构造了一个分布式事务的典型问题,如何解决分布式事务问题呢?下面我们引入seata,使用seata解决分布式事务问题。

    1. 下载seata服务端(TC事务协调器)

    Seata Server是Seata中的事务协调器,从官网下载Seata-Server解压安装。
    下载地址: https://github.com/seata/seata/releases

    2. 配置和启动seata-server

    1. 下载seata-server

    2. 修改配置conf下的registry.conf,注册和配置均使用nacos

    registry {
      # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
      type = "nacos"
    
      nacos {
        serverAddr = "localhost"
        namespace = "public"
        cluster = "default"
      }
    }
    
    config {
      # file、nacos 、apollo、zk、consul、etcd3
      type = "nacos"
    
      nacos {
        serverAddr = "localhost"
        namespace = "public"
    	cluster = "default"
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    1. 初始化nacos配置(将配置文件导入到nacos注册中心),导入成功,可以在nacos下查看
    #nacos-config.sh在conf目录下,直接执行命令即可
    nacos-config.sh 127.0.0.1
    
    • 1
    • 2

    如果你需要指定其它参数,请参考eg

    eg: sh ${SEATAPATH}/script/config-center/nacos/nacos-config.sh -h localhost -p 8848 -g SEATA_GROUP -t 5a3c7d6c-f497-4d68-a71a-2e5e3340b3ca -u username -w password
    
    • 1

    在这里插入图片描述

    1. 启动进入bin目录下,启动seata-server
    seata-server.bat -p 8091 -m file
    
    • 1

    在这里插入图片描述
    5. 到nacos后台查看,如果看到serverAddr即可启动成功。(seata-server各版本的服务名称有些不同,也有叫seata-server)
    在这里插入图片描述

    3. 创建undo_log表

    注意:需要在使用到seata全局事务的所有微服务对应的数据库创建undo_log表。
    以下是mysql的建表语句,其它类型数据库请见https://github.com/seata/seata/tree/develop/script/client/at/db

    CREATE TABLE `undo_log`
    (
    `id` BIGINT(20) NOT NULL AUTO_INCREMENT,
    `branch_id` BIGINT(20) NOT NULL,
    `xid` VARCHAR(100) NOT NULL,
    `context` VARCHAR(128) NOT NULL,
    `rollback_info` LONGBLOB NOT NULL,
    `log_status` INT(11) NOT NULL,
    `log_created` DATETIME NOT NULL,
    `log_modified` DATETIME NOT NULL,
    `ext` VARCHAR(100) DEFAULT NULL,
    PRIMARY KEY (`id`),
    UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
    ) ENGINE = INNODB
    AUTO_INCREMENT = 1
    DEFAULT CHARSET = utf8;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    4. 在微服务中配置seata

    此处的下单模型中,order微服务调用product微服务,order创建订单,product扣减库存,这两个行为属于子事务流程,所以都需要配置seata。我们要如何操作呢?
    注意: 下面的操作需要在product和order两个模块都需要配置,因为它们两个模块都是全局事务的一部分。

    在这里插入图片描述

    1. 导入seata依赖
     
            <dependency>
                <groupId>com.alibaba.cloudgroupId>
                <artifactId>spring-cloud-starter-alibaba-nacos-configartifactId>
            dependency>
            
            <dependency>
                <groupId>com.alibaba.cloudgroupId>
                <artifactId>spring-cloud-starter-alibaba-seataartifactId>
            dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    1. 添加代理数据源

    Seata 是通过代理数据源实现事务分支的,所以需要配置 io.seata.rm.datasource.DataSourceProxy 的
    Bean,且是 @Primary默认的数据源,否则事务不会回滚,无法实现分布式事务

    package com.lcz.config;
    
    import com.alibaba.druid.pool.DruidDataSource;
    import io.seata.rm.datasource.DataSourceProxy;
    import org.springframework.boot.context.properties.ConfigurationProperties;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.context.annotation.Primary;
    
    
    /**
     * seata配置代理数据源
     */
    @Configuration
    public class DataSourceProxyConfig {
    
        @Bean
        @ConfigurationProperties(prefix = "spring.datasource")
        public DruidDataSource druidDataSource(){
            DruidDataSource druidDataSource=new DruidDataSource();
            return druidDataSource;
        }
    
        @Bean
        @Primary
        public DataSourceProxy dataSource(DruidDataSource druidDataSource){
            return new DataSourceProxy(druidDataSource);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    1. 在resources目录下添加registry.conf文件(其实就是把seata-server的conf目录下的registry.conf粘贴过来即可)
    registry {
      # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
      type = "nacos"
    
      nacos {
        serverAddr = "localhost"
        namespace = "public"
        cluster = "default"
      }
    }
    
    config {
      # file、nacos 、apollo、zk、consul、etcd3
      type = "nacos"
    
      nacos {
        serverAddr = "localhost"
        namespace = "public"
    	cluster = "default"
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    1. 配置bootstrap.yaml,不同微服务,name名称不一样,这里name为server-product是product微服务的,order工程的改一下名字即可
    spring:
      application:
        name: server-product
      cloud:
        nacos:
          config:
            server-addr: 127.0.0.1:8848
            namespace: public #命名空间
            group: SEATA_GROUP
        alibaba:
          seata:
            tx-service-group: my_test_tx_group
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    在这里插入图片描述

    在这里插入图片描述

    5. 开启全局事务

    只需要在入口方法添加@GlobalTransactional

     @GlobalTransactional //开启全局事务控制
     public Order createOrder(Integer pid, Integer num) {}
    
    • 1
    • 2

    在这里插入图片描述

    6. 测试

    我们前面的模拟异常,商品id为3时,会抛出异常。我们先用对id为2商品下单,可以下单成功,扣除两个,并且有一笔id为106的订单。
    在这里插入图片描述
    此时我们对id为3的商品下单http://localhost:8092/order3/prod/3,发现报错,数据库中的库存没有减少,也没有生成这笔订单。
    在这里插入图片描述
    在这里插入图片描述

    此时,我们再次对id为2的商品下单,发起一笔请求http://localhost:8092/order3/prod/2,可以发现库存减少,生成了id为108的订单。
    在这里插入图片描述

    订单表的id为什么不连续
    这个问题很明显说明了前面id为3的商品虽然最终下单失败(扣减库存异常),但是实际已经生成了id为107的订单,在事务的最终被seata反向删除了。

    流程重点解析
    1、每个RM使用DataSourceProxy连接数据库,其目的是使用ConnectionProxy,使用数据源和数据连
    接代理的目的就是在第一阶段将undo_log和业务数据放在一个本地事务提交,这样就保存了只要有业务
    操作就一定有undo_log。
    2、在第一阶段undo_log中存放了数据修改前和修改后的值,为事务回滚作好准备,所以第一阶段完成
    就已经将分支事务提交,也就释放了锁资源。
    3、 TM开启全局事务开始,将XID全局事务id放在事务上下文中,通过feign调用也将XID传入下游分支
    事务,每个分支事务将自己的Branch ID分支事务ID与XID关联。
    4、第二阶段全局事务提交, TC会通知各各分支参与者提交分支事务,在第一阶段就已经提交了分支事
    务,这里各各参与者只需要删除undo_log即可,并且可以异步执行,第二阶段很快可以完成。
    5、第二阶段全局事务回滚, TC会通知各各分支参与者回滚分支事务,通过 XID 和 Branch ID 找到相应
    的回滚日志,通过回滚日志生成反向的 SQL 并执行,以完成分支事务回滚到之前的状态,如果回滚失
    败则会重试回滚操作。

    第五节 扩展内容

    1. 自定义配置事务分组

    这里我直接使用的seata默认的事务分组,也可以自己创建事务分组。
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述

  • 相关阅读:
    js-聊聊低代码平台
    RabbitMQ(任务模型,交换机(广播,订阅,通配符订阅))
    java--ArrayList快速入门
    Spark SQL 的总体工作流程
    vue 在什么情况下在数据发生改变的时候不会触发视图更新
    Mpeg-NTA((Nitrilotriacetic acid)) 次氮基三乙酸
    Java 简单实现令牌桶
    李沐63_束搜索——自学笔记
    数据库笔试面试题
    多线程指南:探究多线程在Node.js中的广泛应用
  • 原文地址:https://blog.csdn.net/u011628753/article/details/126375512