• 分布式事务Seata详细使用教程


    分布式事务Seata使用

    seata服务的本地化及docker部署见此教程:分布式事务Seata安装:本地部署、docker部署seata

    springboot项目配置

    微服务环境:添加依赖

    
        io.seata
        seata-spring-boot-starter
        最新版
    
    
    
        com.alibaba.nacos
        nacos-client
        1.2.0及以上版本
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    项目yml中添加seata配置

    seata:
        enabled: true
        enable-auto-data-source-proxy: false
        application-id: ${spring.application.name}
        tx-service-group: my_tx_group  
        service:
            vgroup-mapping:
                my_tx_group: default
     # 不使用注册中心和配置中心不加以下配置
        config:
            type: nacos
            nacos:
                server-addr: 127.0.0.1:8848
                group: "SEATA_GROUP"
                namespace: "seata"
                dataId: "seataServer.properties"
                username: ""
                password: ""
        registry:
            type: nacos
            nacos:
                application: seata-server
                server-addr: 127.0.0.1:8848
                group: "SEATA_GROUP"
                namespace: "seata"
                username: ""
                password: ""
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    数据库中添加回滚日志表。

    • 数据源连接的每个数据库中添加 undo_log 表

      CREATE TABLE sys_seata_undo_log (
      id bigint(20) NOT NULL AUTO_INCREMENT,
      branch_id bigint(20) NOT NULL,
      xid varchar(100) NOT NULL,
      context varchar(128) NOT NULL,
      rollback_info longblob NOT NULL,
      log_status int(11) NOT NULL,
      log_created datetime NOT NULL,
      log_modified datetime NOT NULL,
      PRIMARY KEY (id),
      UNIQUE KEY ux_undo_log (xid,branch_id)
      ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

    注意:

    • 1、nacos注册中心配置group项,Server和Client端的值需一致。

    • 2、使用多数据源需要:

    关闭seata的数据源代理 seata.enable-auto-data-source-proxy=false,使用多数据源代理

    开启多数据源对seata的支持spring.datasource.dynamic.seata=true

    • 3、默认的AT模式需要在数据库中添加回滚日志表"sys_seata_undo_log"

    • 4、已支持从一个Nacos dataId中获取所有配置信息,你只需要额外添加一个dataId配置项。此处dataId为seataServer.properties。

    注:当client配置了seata.config,client端会优先去nacos读取"seataServer.properties"中的配置参数,在yml配置的service、client等参数不再生效。

    故若在yml中修改seata server相关参数,请注释掉seata.config ,再添加配置。

    nacos详细参数配置:https://seata.io/zh-cn/docs/user/configurations.html

    • 5、seataServer.properties 中service.vgroupMapping.default_tx_group=default

    这个配置为事务组名称。其中“default_tx_group”与“default”都可以自定义,但是客户端与服务端的这个配置项必须一致,否则会报错。

    客户端配置如下面所示,其中 seata.tx-service-group的值,要与default_tx_group一致;seata.service.vgroup-mapping.default_tx_group的值,要与default一致。

    seata:
      tx-service-group: default_tx_group
      service:
        vgroup-mapping:
          default_tx_group: default
    
    • 1
    • 2
    • 3
    • 4
    • 5

    AT模式使用

    在需要发起全局事务的service方法上添加注解 @GlobalTransactional,

    使用Fiegn、restTemplate等方式发送请求,提供方只添加@Transactional保证本地事务

    Seata全局事务中涉及三个身份

    TC 事务协调者,对应seata服务

    TM事务管理者,对应base服务中发起全局事务的方法

    RM资源管理者,对应工作流服务

    注意问题:

    1. seata通过线程变量 XID 判断TC与RM是否在同一事务下,现支持使用restTemplate与Feign方式发送请求自动携带xid到被调用方,

    使用其他方式可将xid放入请求头中,key为"TX_XID",RM会在请求头中自动获取。若采用其他方式需自行保证xid的传递。

    1. 被调用方产生异常却没有回滚:

    当被调用方RM产生异常时,为了调用方TM可以正确接收到异常状态码,使Feign能抛出异常发起全局事务回滚,RM最好不要添加异常处理去拦截异常。

    若RM异常被拦截,需要TM通过Feign返回的自定义的状态码判断被调用方是否产生异常,并手动抛出异常。如下:

    ApiResponse apiResponse = feignClientService.clientB();
    if(apiResponse.getCode()!= 10000){
        throw new RuntimeException(apiResponse.getMsg());
    }
    
    • 1
    • 2
    • 3
    • 4
    1. AT模式基于数据库本地事务实现,分布式事务的参与各方,都需要在各自添加回滚日志表(undo_log)。

    可修改nacos中seataServer.properties文件client.undo.logTable=表名 自定义回滚日志表名。

    使用Demo

    服务A调用服务B,作为调用方A添加@GlobalTransaction注解

    @Service
    public class AService {
        @GlobalTransactional
        public ApiResponse insertSomething() {
            log.info("===================================");
            log.info("AService XID: "+ RootContext.getXID());
            aMapper.insertSomething();  // A对数据库进行插入操作
            ApiResponse apiResponse = feignClientService.clientB(); // 通过Feign 调用服务B
            // 若被调用方异常状态码被拦截, 判断自定义状态码手动抛出异常
            if(apiResponse.getCode()!= 10000){
                throw new RuntimeException(apiResponse.getMsg());
            }
    //        error(); // 放开此注解,抛出异常
            return ApiResponse.success();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    B服务作为被调用方,添加@Transactional保证本地事务

    @Service
    public class BService {
    
        @Transactional
        public ApiResponse insertSomething() {
            log.info("==========================================");
            log.info("XID: "+ RootContext.getXID());
            bMapper.insertSomething();
    //        error(); // 放开此注解,抛出异常。
            return ApiResponse.success();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    TCC模式使用

    实际上,Seata的AT模式基本上能满足我们使用分布式事务80%的需求,但涉及不支持事务的数据库与中间件(如redis)等的操作,或AT模式暂未支持的数据库(目前AT支持Mysql、Oracle与PostgreSQL)、跨公司服务的调用、跨语言的应用调用或有手动控制整个二阶段提交过程的需求,则需要结合TCC模式。不仅如此,TCC模式还支持与AT模式混合使用。

    一、TCC模式的概念

    一个分布式的全局事务,整体是两阶段提交Try-[Comfirm/Cancel] 的模型。在Seata中,AT模式与TCC模式事实上都是两阶段提交的具体实现。他们的区别在于:

    AT 模式基于支持本地 ACID 事务 的 关系型数据库(目前支持Mysql、Oracle与PostgreSQL):

    一阶段 prepare 行为:在本地事务中,一并提交业务数据更新和相应回滚日志记录。 二阶段 commit 行为:马上成功结束,自动异步批量清理回滚日志。 二阶段 rollback 行为:通过回滚日志,自动生成补偿操作,完成数据回滚。

    相应的,TCC 模式,不依赖于底层数据资源的事务支持:

    一阶段 prepare 行为:调用 自定义 的 prepare 逻辑。 二阶段 commit 行为:调用 自定义的 commit 逻辑。 二阶段 rollback 行为:调用 自定义的 rollback 逻辑。

    所谓 TCC 模式,是指支持把 自定义 的分支事务纳入到全局事务的管理中。

    简单点概括,SEATA的TCC模式就是手工的AT模式,它允许你自定义两阶段的处理逻辑而不依赖AT模式的undo_log。

    二、前提准备

    • 注册中心 nacos
    • seata服务端(TC)

    本章着重讲TCC模式的实现,项目的搭建看前面文档

    三、TM与TCC-RM搭建

    3.1 定义TCC接口

    由于我们使用的是 SpringCloud + Feign,Feign的调用基于http,因此此处我们使@LocalTCC便可。值得注意的是,@LocalTCC一定需要注解在接口上,此接口可以是寻常的业务接口,只要实现了TCC的两阶段提交对应方法便可,TCC相关注解如下:

    • @LocalTCC适用于SpringCloud+Feign模式下的TCC

    • @TwoPhaseBusinessAction注解prepare方法,其中name为当前tcc方法的bean名称,写方法名便可(全局唯一),commitMethod指向提交方法,rollbackMethod指向事务回滚方法。指定好三个方法之后,seata会根据全局事务的成功或失败,去帮我们自动调用提交方法或者回滚方法。

    • @BusinessActionContextParameter注解可以将参数传递到二阶段(commitMethod/rollbackMethod)的方法。

    • BusinessActionContext便是指TCC事务上下文

      @LocalTCC
      public interface TccService{
      /**
      * 定义两阶段提交
      * name = 该tcc的bean名称,全局唯一
      * commitMethod = commit 为二阶段确认方法
      * rollbackMethod = rollback 为二阶段取消方法
      * BusinessActionContextParameter注解 传递参数到二阶段中
      * @param params -入参
      * @return String
      */
      @TwoPhaseBusinessAction(name = “prepare”, commitMethod = “commit”, rollbackMethod = “rollback”)
      String prepare(
      @BusinessActionContextParameter(paramName = “params”) Map params
      );

      /**
       * 确认方法、可以另命名,但要保证与commitMethod一致
       * context可以传递try方法的参数
       * @param context 上下文
       * @return boolean
       */
      @Override
      boolean commit(BusinessActionContext context);
      
      /**
       * 二阶段取消方法
       * @param context 上下文
       * @return boolean
       */
      @Override
      boolean rollback(BusinessActionContext context);
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16

      }

    3.2 TCC接口的业务实现

    在方法prepare中使用@Transational可以直接通过本地事务回滚关系型数据库中的操作,而非关系型数据库等中间件的回滚操作可以交给rollbackMethod方法处理。

    使用context.getActionContext(“params”)便可以得到一阶段try中定义的参数,在二阶段对此参数进行业务回滚操作。

    注意1:此处亦不可以捕获异常(同理切面处理异常),否则TCC将识别该操作为成功,二阶段直接执行commitMethod。

    注意2:TCC模式要开发者自行保证幂等和事务防悬挂

    注意3: commit()与rollback()方法返回值必须为true,当返回值不为true或在方法中抛出异常,seata会不断回调commit或rollback方法,直到返回值为true。

    public class TccServiceImpl implements TccService {
        private static Logger log = LoggerFactory.getLogger(TccServiceImpl.class);
        @Autowired
        TccDAO tccDAO;
    
        @Override
        @Transational
        public String prepare(Map params) {
            log.info("xid = " + RootContext.getXID());
            //todo 实际的操作,或操作MQ、redis等
            tccDAO.insert(params);
            //放开以下注解抛出异常
            //throw new RuntimeException("服务tcc测试回滚");
            return "success";
        }
    
        @Override
        public boolean commit(BusinessActionContext context) {
            log.info("xid = " + context.getXid() + "提交成功");
            //todo 若一阶段资源预留,这里则要提交资源
            return true;
        }
    
        @Override
        public boolean rollback(BusinessActionContext context) {
            //todo 这里写中间件、非关系型数据库的回滚操作
            System.out.println("please manually rollback this data:" + context.getActionContext("params"));
            return true;
        }
    }
    
    
    @RestController
    @RequestMapping("/at")
    public class AtController {
        @Autowired
        AtService atService;
        @PostMapping("/at-insert")
        public String insert(@RequestBody Map params) {
            return atService.insert(params);
        }
    }
    
        @Override
        @DSTransactional
        public String insert(Map params) {
            log.info("------------------> xid = " + RootContext.getXID());
            // 调用master分支
            atDAO.insert(params);
            atDAO.insert(params);
            atDAO.insert(params);
            atDAO.insert(params);
            // 调用slave分支
            params.put("name", "B服务测试");
            studentMapper.insert(params);
            studentMapper.insert(params);
            studentMapper.insert(params);
            studentMapper.insert(params);
            return "success";
        }
    
    @Mapper
    @DS("master")
    public interface AtDAO {
        @Insert("insert into service_at (name) value (#{name})")
        int insert(Map map);
    }
    
    @Mapper
    @DS("slave")
    public interface StudentMapper {
        @Insert("insert into student (name) value (#{name})")
        int insert(Map map);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74

    先自我介绍一下,小编13年上师交大毕业,曾经在小公司待过,去过华为OPPO等大厂,18年进入阿里,直到现在。深知大多数初中级java工程师,想要升技能,往往是需要自己摸索成长或是报班学习,但对于培训机构动则近万元的学费,着实压力不小。自己不成体系的自学效率很低又漫长,而且容易碰到天花板技术停止不前。因此我收集了一份《java开发全套学习资料》送给大家,初衷也很简单,就是希望帮助到想自学又不知道该从何学起的朋友,同时减轻大家的负担。添加下方名片,即可获取全套学习资料哦

  • 相关阅读:
    爬虫过程和反爬
    代码随想录算法训练营第五九天 | 下一个更大元素II、接雨水
    开关电源模块 遥控开/关电路
    SAM论文翻译
    T1175计算两个日期之间的天数
    莹莹API管理系统源码附带两套模板
    计算机是如何工作的(上篇)
    同源策略和跨域问题
    Java面试题:mysql执行速度慢的原因和优化
    分组后取最大值对应记录
  • 原文地址:https://blog.csdn.net/m0_54853503/article/details/126080974