• SpringCloudAlibaba-3.分布式事务(Seata)


    目录

    一、概念

    1.1 问题引入

    1.2 Seata介绍 

    1.3 AT模式

     二、Seata的环境配置

    三、演示

    3.1 建数据库表

    3.2 订单模块

    3.3 库存模块

    3.4 账户模块

    3.5 测试


    一、概念

    1.1 问题引入

            在分布式环境中,单体应用拆分微服务应用,原来的三个模块被拆分成三个独立的应用,分别使用三个独立的数据源,业务操作需要调用三个服务来完成。此时每个服务内部的数据一致性由本地事务来保证,但是全局数据一致性问题没法保证。        

             一句话来说:一次业务操作需要跨多个数据源或需要跨多个系统进行远程调用,就会产生分布式事务问题

    1.2 Seata介绍 

            Seata是一款开源的分布式事务解决方案,致力于在微服务架构下提供高性能和简单易用的分布式事务服务。

            Seata 中有三大模块,分别是 TM、RM 和 TC。 其中 TM RM 是作为 Seata 的客户端业务系统集成在一起,TC 作为 Seata 的服务端独立部署。

            分布式事务处理过程的一ID+三组件模型

                 Transaction ID XID:全局唯一的事务ID

            Transaction Coordinator (TC)事务协调器,维护全局事务运行状态,负责协调驱动全局事务提交回滚

            Transaction Manager (TM)控制全局事务的边界,负责开启一个全局事务,并最终发起全局提交全局回滚的决议;

            Resource Manager (RM)控制分支事务,负责分支注册状态汇报,并接收事务协调器指令,驱动分支(本地)事务提交回滚

            分布式事务执行的过程

    1.TM 向 TC 注册全局事务,全局事务创建成功并生成一个全局唯一的 XID
    2.XID 在微服务调用链路的上下文中传播
    3.RM 向 TC 注册分支事务,将其纳入 XID 对应全局事务管辖(根据业务要求编排数据库、服务等事务内资源)
    4.TM 向 TC 通知针对 XID 的全局提交/回滚决议;(一阶段结束

    5.TC汇总事务信息,决定分布式事务提交还是回滚
    6.TC 通知 XID 下管辖的全部分支事务(RM)完成提交/回滚请求。(二阶段结束

     通俗解释TM、TC、RM的关系(班主任打算开线上班会):

                    班长(TM)、班主任(TC)、学生(RM)

            1.班长:”王主任,马上到上网课的时间了,你把房间号发一下吧!”

            2.班主任:”好的,小王。房间号是8848,我发在班级群里了!”

            3.同学们看到了群消息,各自打开XX会议,进入了8848房间... ...

            4.班长:”王主任,同学们基本都到齐了,我看您可以开始讲了!”(一阶段结束

            5.班主任突然接到了一通神秘的电话... ....

            6.稍等片刻后,班主任:”同学们,我现在通知一个事情(同学们没白等O(∩_∩)O)/不好意思浪费各位的时间,通知临时取消,散了吧(同学们白等了这么长时间 ┭┮﹏┭┮)......“(二阶段结束) 

    1.3 AT模式

            AT模式是seata提供的默认模式,是一种对业务无任何侵入的分布式事务解决方案

            在 AT 模式下,用户只需关注自己的“业务 SQL”,用户的 “业务 SQL” 作为一阶段,Seata 框架会自动生成事务的二阶段提交回滚操作。 

            AT模式如何做到无侵入?

            在一阶段,Seata 会拦截“业务 SQL

    1  解析 SQL 语义,找到“业务 SQL”要更新的业务数据,在业务数据被更新前,将其保存成“before image”,
    2  执行“业务 SQL”更新业务数据

    3.在业务数据更新之后,其保存成“after image”,最后生成行锁
            以上操作全部在一个数据库事务内完成,这样保证了一阶段操作的原子性

            (before imageafter image都在保存在 undolog 中) 

            二阶段如是顺利提交的话(业务顺利执行),因为“业务 SQL”在一阶段已经提交至数据库,所以Seata框架只需将一阶段保存的快照数据行锁删掉,完成数据清理即可。 

            二阶段如果是回滚的话(业务某环节出错抛异常),Seata 就需要回滚一阶段已经执行的“业务 SQL”还原业务数据。回滚方式便是用“before image还原业务数据;但在还原前要首先要校验脏写,对比“数据库当前业务数据”和 “after image”,如果两份数据完全一致就说明没有脏写可以还原业务数据,如果不一致就说明有脏写,出现脏写就需要转人工处理

             当全局回滚发生时,server会发送一个回滚消息client端,client接到回滚通知后,通过xidbranchid找到对应的undolog,获取到事务执行前后的数据镜像,解析成反向sql进行补偿。

     二、Seata的环境配置

            1.首先准备好压缩包,例如:seata-server-0.9.0

            2.解压压缩包,打开 conf/file.conf 

                    2.1 修改server模块的组名,名字随意

                     2.2 修改数据库的信息

            :我用的是mysql8.xurldriver-class-namemysql5.x都有所不同,并且需要去lib文件夹内删除原来的mysql-connecter-java.jar , 把8.x版本的jar包拷贝进去

    在这里插入图片描述

             3.修改 conf/registry.conf ,配置nacos的地址

             4.在数据库中创建seata数据库,执行 conf/db_store.sql 脚本(若conf内没有就去官网找)

             5.在启动了nacos的前提下去执行 bin/seata-server.bat

    三、演示

            这里我们会创建三个服务,一个订单服务,一个库存服务,一个账户服务
            当用户下单时,会在订单服务中创建一个订单,然后通过远程调用库存服务来扣减下单商品的库存,再通过远程调用账户服务来扣减用户账户里面的余额,最后在订单服务中修改订单状态为已完成。
             该操作跨越三个数据库,有两次远程调用,很明显会有分布式事务问题。

    3.1 建数据库表

    1. 1.CREATE DATABASE seata_order; #订单
    2. CREATE TABLE t_order (
    3. `id` BIGINT(11) NOT NULL AUTO_INCREMENT PRIMARY KEY,
    4. `user_id` BIGINT(11) DEFAULT NULL COMMENT '用户id',
    5. `product_id` BIGINT(11) DEFAULT NULL COMMENT '产品id',
    6. `count` INT(11) DEFAULT NULL COMMENT '数量',
    7. `money` DECIMAL(11,0) DEFAULT NULL COMMENT '金额',
    8. `status` INT(1) DEFAULT NULL COMMENT '订单状态:0:创建中;1:已完结'
    9. ) ENGINE=INNODB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8;
    10. 2.CREATE DATABASE seata_storage; #库存
    11. CREATE TABLE t_storage (
    12. `id` BIGINT(11) NOT NULL AUTO_INCREMENT PRIMARY KEY,
    13. `product_id` BIGINT(11) DEFAULT NULL COMMENT '产品id',
    14. `total` INT(11) DEFAULT NULL COMMENT '总库存',
    15. `used` INT(11) DEFAULT NULL COMMENT '已用库存',
    16. `residue` INT(11) DEFAULT NULL COMMENT '剩余库存'
    17. ) ENGINE=INNODB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;
    18. INSERT INTO seata_storage.t_storage(`id`, `product_id`, `total`, `used`, `residue`)
    19. VALUES ('1', '1', '100', '0', '100');
    20. 3.CREATE DATABASE seata_account; #账户
    21. CREATE TABLE t_account (
    22. `id` BIGINT(11) NOT NULL AUTO_INCREMENT PRIMARY KEY COMMENT 'id',
    23. `user_id` BIGINT(11) DEFAULT NULL COMMENT '用户id',
    24. `total` DECIMAL(10,0) DEFAULT NULL COMMENT '总额度',
    25. `used` DECIMAL(10,0) DEFAULT NULL COMMENT '已用余额',
    26. `residue` DECIMAL(10,0) DEFAULT '0' COMMENT '剩余可用额度'
    27. ) ENGINE=INNODB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;
    28. INSERT INTO seata_account.t_account(`id`, `user_id`, `total`, `used`, `residue`) VALUES ('1', '1', '1000', '0', '1000');

             按照上述3库分别建对应的回滚日志表,脚本为conf/db_undo_log.sql,若没有去官网下

    1. drop table `undo_log`;
    2. CREATE TABLE `undo_log` (
    3. `id` bigint(20) NOT NULL AUTO_INCREMENT,
    4. `branch_id` bigint(20) NOT NULL,
    5. `xid` varchar(100) NOT NULL,
    6. `context` varchar(128) NOT NULL,
    7. `rollback_info` longblob NOT NULL,
    8. `log_status` int(11) NOT NULL,
    9. `log_created` datetime NOT NULL,
    10. `log_modified` datetime NOT NULL,
    11. `ext` varchar(100) DEFAULT NULL,
    12. PRIMARY KEY (`id`),
    13. UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
    14. ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

    3.2 订单模块

            1.新建Order-Module

            2.POM

    1. <dependencies>
    2. <dependency>
    3. <groupId>com.alibaba.cloudgroupId>
    4. <artifactId>spring-cloud-starter-alibaba-nacos-discoveryartifactId>
    5. dependency>
    6. <dependency>
    7. <groupId>com.alibaba.cloudgroupId>
    8. <artifactId>spring-cloud-starter-alibaba-seataartifactId>
    9. <exclusions>
    10. <exclusion>
    11. <artifactId>seata-allartifactId>
    12. <groupId>io.seatagroupId>
    13. exclusion>
    14. exclusions>
    15. dependency>
    16. <dependency>
    17. <groupId>io.seatagroupId>
    18. <artifactId>seata-allartifactId>
    19. <version>0.9.0version>
    20. dependency>
    21. <dependency>
    22. <groupId>org.springframework.cloudgroupId>
    23. <artifactId>spring-cloud-starter-openfeignartifactId>
    24. dependency>
    25. <dependency>
    26. <groupId>org.springframework.bootgroupId>
    27. <artifactId>spring-boot-starter-webartifactId>
    28. dependency>
    29. <dependency>
    30. <groupId>org.springframework.bootgroupId>
    31. <artifactId>spring-boot-starter-actuatorartifactId>
    32. dependency>
    33. <dependency>
    34. <groupId>mysqlgroupId>
    35. <artifactId>mysql-connector-javaartifactId>
    36. <version>8.0.26version>
    37. dependency>
    38. <dependency>
    39. <groupId>com.alibabagroupId>
    40. <artifactId>druid-spring-boot-starterartifactId>
    41. <version>1.1.10version>
    42. dependency>
    43. <dependency>
    44. <groupId>org.mybatis.spring.bootgroupId>
    45. <artifactId>mybatis-spring-boot-starterartifactId>
    46. <version>2.0.0version>
    47. dependency>
    48. <dependency>
    49. <groupId>org.springframework.bootgroupId>
    50. <artifactId>spring-boot-starter-testartifactId>
    51. <scope>testscope>
    52. dependency>
    53. <dependency>
    54. <groupId>org.projectlombokgroupId>
    55. <artifactId>lombokartifactId>
    56. <optional>trueoptional>
    57. dependency>
    58. dependencies>

            3.YML

    1. server:
    2. port: 2001
    3. spring:
    4. application:
    5. name: seata-order-service
    6. cloud:
    7. alibaba:
    8. seata:
    9. #自定义事务组名称需要与seata-server中的对应
    10. tx-service-group: wz_tx_group
    11. nacos:
    12. discovery:
    13. server-addr: localhost:8848
    14. datasource:
    15. driver-class-name: com.mysql.cj.jdbc.Driver
    16. url: jdbc:mysql://localhost:3306/seata_order?serverTimeZone=UTC
    17. username: root
    18. password: xxx
    19. feign:
    20. hystrix:
    21. enabled: false
    22. logging:
    23. level:
    24. io:
    25. seata: info
    26. mybatis:
    27. mapperLocations: classpath*:mapper/*.xml

            4.domain

    1. @Data
    2. @AllArgsConstructor
    3. @NoArgsConstructor
    4. public class CommonResult
    5. {
    6. private Integer code;
    7. private String message;
    8. private T data;
    9. public CommonResult(Integer code, String message)
    10. {
    11. this(code,message,null);
    12. }
    13. }
    1. @Data
    2. @AllArgsConstructor
    3. @NoArgsConstructor
    4. public class Order
    5. {
    6. private Long id;
    7. private Long userId;
    8. private Long productId;
    9. private Integer count;
    10. private BigDecimal money;
    11. /**
    12. * 订单状态:0:创建中;1:已完结
    13. */
    14. private Integer status;
    15. }

            5.Dao与mapper.xml

    1. @Mapper
    2. public interface OrderDao {
    3. /**
    4. * 创建订单
    5. */
    6. void create(Order order);
    7. /**
    8. * 修改订单金额
    9. */
    10. void update(@Param("userId") Long userId, @Param("status") Integer status);
    11. }
    1. "1.0" encoding="UTF-8" ?>
    2. mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
    3. <mapper namespace="com.atguigu.springcloud.alibaba.dao.OrderDao">
    4. <resultMap id="BaseResultMap" type="com.atguigu.springcloud.alibaba.domain.Order">
    5. <id column="id" property="id" jdbcType="BIGINT"/>
    6. <result column="user_id" property="userId" jdbcType="BIGINT"/>
    7. <result column="product_id" property="productId" jdbcType="BIGINT"/>
    8. <result column="count" property="count" jdbcType="INTEGER"/>
    9. <result column="money" property="money" jdbcType="DECIMAL"/>
    10. <result column="status" property="status" jdbcType="INTEGER"/>
    11. resultMap>
    12. <insert id="create">
    13. INSERT INTO `t_order` (`id`, `user_id`, `product_id`, `count`, `money`, `status`)
    14. VALUES (NULL, #{userId}, #{productId}, #{count}, #{money}, 0);
    15. insert>
    16. <update id="update">
    17. UPDATE `t_order`
    18. SET status = 1
    19. WHERE user_id = #{userId} AND status = #{status};
    20. update>
    21. mapper>

            6.Service

    1. public interface OrderService {
    2. /**
    3. * 创建订单
    4. */
    5. void create(Order order);
    6. }
    1. @Service
    2. @Slf4j
    3. public class OrderServiceImpl implements OrderService
    4. {
    5. @Resource
    6. private OrderDao orderDao;
    7. @Resource
    8. private StorageService storageService;
    9. @Resource
    10. private AccountService accountService;
    11. /**
    12. * 创建订单->调用库存服务扣减库存->调用账户服务扣减账户余额->修改订单状态
    13. * 简单说:
    14. * 下订单->减库存->减余额->改状态
    15. */
    16. @Override
    17. @GlobalTransactional(name = "fsp-create-order",rollbackFor = Exception.class)
    18. public void create(Order order) {
    19. log.info("------->下单开始");
    20. //本应用创建订单
    21. orderDao.create(order);
    22. //远程调用库存服务扣减库存
    23. log.info("------->order-service中扣减库存开始");
    24. storageService.decrease(order.getProductId(),order.getCount());
    25. log.info("------->order-service中扣减库存结束");
    26. //远程调用账户服务扣减余额
    27. log.info("------->order-service中扣减余额开始");
    28. accountService.decrease(order.getUserId(),order.getMoney());
    29. log.info("------->order-service中扣减余额结束");
    30. //修改订单状态为已完成
    31. log.info("------->order-service中修改订单状态开始");
    32. orderDao.update(order.getUserId(),0);
    33. log.info("------->order-service中修改订单状态结束");
    34. log.info("------->下单结束");
    35. }
    36. }
    1. @FeignClient(value = "seata-storage-service")
    2. public interface StorageService {
    3. /**
    4. * 扣减库存
    5. */
    6. @PostMapping(value = "/storage/decrease")
    7. CommonResult decrease(@RequestParam("productId") Long productId, @RequestParam("count") Integer count);
    8. }
    1. @FeignClient(value = "seata-account-service")
    2. public interface AccountService {
    3. /**
    4. * 扣减账户余额
    5. */
    6. //@RequestMapping(value = "/account/decrease", method = RequestMethod.POST, produces = "application/json; charset=UTF-8")
    7. @PostMapping("/account/decrease")
    8. CommonResult decrease(@RequestParam("userId") Long userId, @RequestParam("money") BigDecimal money);
    9. }

            7.Controller

    1. @RestController
    2. public class OrderController {
    3. @Autowired
    4. private OrderService orderService;
    5. /**
    6. * 创建订单
    7. */
    8. @GetMapping("/order/create")
    9. public CommonResult create( Order order) {
    10. orderService.create(order);
    11. return new CommonResult(200, "订单创建成功!");
    12. }
    13. }

            8.Config

    1. @Configuration
    2. @MapperScan({"com.atguigu.springcloud.alibaba.dao"})
    3. public class MyBatisConfig {
    4. }
    1. @Configuration
    2. public class DataSourceProxyConfig {
    3. @Value("${mybatis.mapperLocations}")
    4. private String mapperLocations;
    5. @Bean
    6. @ConfigurationProperties(prefix = "spring.datasource")
    7. public DataSource druidDataSource(){
    8. return new DruidDataSource();
    9. }
    10. @Bean
    11. public DataSourceProxy dataSourceProxy(DataSource dataSource) {
    12. return new DataSourceProxy(dataSource);
    13. }
    14. @Bean
    15. public SqlSessionFactory sqlSessionFactoryBean(DataSourceProxy dataSourceProxy) throws Exception {
    16. SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
    17. sqlSessionFactoryBean.setDataSource(dataSourceProxy);
    18. sqlSessionFactoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources(mapperLocations));
    19. sqlSessionFactoryBean.setTransactionFactory(new SpringManagedTransactionFactory());
    20. return sqlSessionFactoryBean.getObject();
    21. }
    22. }

            9.主启动

    1. @EnableDiscoveryClient
    2. @EnableFeignClients
    3. @SpringBootApplication(exclude = DataSourceAutoConfiguration.class)//取消数据源的自动创建
    4. public class SeataOrderMainApp2001
    5. {
    6. public static void main(String[] args)
    7. {
    8. SpringApplication.run(SeataOrderMainApp2001.class, args);
    9. }
    10. }

            10.file.conf

    1. transport {
    2. # tcp udt unix-domain-socket
    3. type = "TCP"
    4. #NIO NATIVE
    5. server = "NIO"
    6. #enable heartbeat
    7. heartbeat = true
    8. #thread factory for netty
    9. thread-factory {
    10. boss-thread-prefix = "NettyBoss"
    11. worker-thread-prefix = "NettyServerNIOWorker"
    12. server-executor-thread-prefix = "NettyServerBizHandler"
    13. share-boss-worker = false
    14. client-selector-thread-prefix = "NettyClientSelector"
    15. client-selector-thread-size = 1
    16. client-worker-thread-prefix = "NettyClientWorkerThread"
    17. # netty boss thread size,will not be used for UDT
    18. boss-thread-size = 1
    19. #auto default pin or 8
    20. worker-thread-size = 8
    21. }
    22. shutdown {
    23. # when destroy server, wait seconds
    24. wait = 3
    25. }
    26. serialization = "seata"
    27. compressor = "none"
    28. }
    29. service {
    30. vgroup_mapping.wz_tx_group = "default" #修改自定义事务组名称
    31. default.grouplist = "127.0.0.1:8091"
    32. enableDegrade = false
    33. disable = false
    34. max.commit.retry.timeout = "-1"
    35. max.rollback.retry.timeout = "-1"
    36. disableGlobalTransaction = false
    37. }
    38. client {
    39. async.commit.buffer.limit = 10000
    40. lock {
    41. retry.internal = 10
    42. retry.times = 30
    43. }
    44. report.retry.count = 5
    45. tm.commit.retry.count = 1
    46. tm.rollback.retry.count = 1
    47. }
    48. ## transaction log store
    49. store {
    50. ## store mode: file、db
    51. mode = "db"
    52. ## file store
    53. file {
    54. dir = "sessionStore"
    55. # branch session size , if exceeded first try compress lockkey, still exceeded throws exceptions
    56. max-branch-session-size = 16384
    57. # globe session size , if exceeded throws exceptions
    58. max-global-session-size = 512
    59. # file buffer size , if exceeded allocate new buffer
    60. file-write-buffer-cache-size = 16384
    61. # when recover batch read size
    62. session.reload.read_size = 100
    63. # async, sync
    64. flush-disk-mode = async
    65. }
    66. ## database store
    67. db {
    68. ## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp) etc.
    69. datasource = "dbcp"
    70. ## mysql/oracle/h2/oceanbase etc.
    71. db-type = "mysql"
    72. driver-class-name = "com.mysql.cj.jdbc.Driver"
    73. url = "jdbc:mysql://127.0.0.1:3306/seata?serverTimeZone=UTC"
    74. user = "root"
    75. password = "xxx"
    76. min-conn = 1
    77. max-conn = 3
    78. global.table = "global_table"
    79. branch.table = "branch_table"
    80. lock-table = "lock_table"
    81. query-limit = 100
    82. }
    83. }
    84. lock {
    85. ## the lock store mode: local、remote
    86. mode = "remote"
    87. local {
    88. ## store locks in user's database
    89. }
    90. remote {
    91. ## store locks in the seata's server
    92. }
    93. }
    94. recovery {
    95. #schedule committing retry period in milliseconds
    96. committing-retry-period = 1000
    97. #schedule asyn committing retry period in milliseconds
    98. asyn-committing-retry-period = 1000
    99. #schedule rollbacking retry period in milliseconds
    100. rollbacking-retry-period = 1000
    101. #schedule timeout retry period in milliseconds
    102. timeout-retry-period = 1000
    103. }
    104. transaction {
    105. undo.data.validation = true
    106. undo.log.serialization = "jackson"
    107. undo.log.save.days = 7
    108. #schedule delete expired undo_log in milliseconds
    109. undo.log.delete.period = 86400000
    110. undo.log.table = "undo_log"
    111. }
    112. ## metrics settings
    113. metrics {
    114. enabled = false
    115. registry-type = "compact"
    116. # multi exporters use comma divided
    117. exporter-list = "prometheus"
    118. exporter-prometheus-port = 9898
    119. }
    120. support {
    121. ## spring
    122. spring {
    123. # auto proxy the DataSource bean
    124. datasource.autoproxy = false
    125. }
    126. }

            11.registry.conf 

    1. registry {
    2. # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
    3. type = "nacos"
    4. nacos {
    5. serverAddr = "localhost:8848"
    6. namespace = ""
    7. cluster = "default"
    8. }
    9. eureka {
    10. serviceUrl = "http://localhost:8761/eureka"
    11. application = "default"
    12. weight = "1"
    13. }
    14. redis {
    15. serverAddr = "localhost:6379"
    16. db = "0"
    17. }
    18. zk {
    19. cluster = "default"
    20. serverAddr = "127.0.0.1:2181"
    21. session.timeout = 6000
    22. connect.timeout = 2000
    23. }
    24. consul {
    25. cluster = "default"
    26. serverAddr = "127.0.0.1:8500"
    27. }
    28. etcd3 {
    29. cluster = "default"
    30. serverAddr = "http://localhost:2379"
    31. }
    32. sofa {
    33. serverAddr = "127.0.0.1:9603"
    34. application = "default"
    35. region = "DEFAULT_ZONE"
    36. datacenter = "DefaultDataCenter"
    37. cluster = "default"
    38. group = "SEATA_GROUP"
    39. addressWaitTime = "3000"
    40. }
    41. file {
    42. name = "file.conf"
    43. }
    44. }
    45. config {
    46. # file、nacos 、apollo、zk、consul、etcd3
    47. type = "file"
    48. nacos {
    49. serverAddr = "localhost"
    50. namespace = ""
    51. }
    52. consul {
    53. serverAddr = "127.0.0.1:8500"
    54. }
    55. apollo {
    56. app.id = "seata-server"
    57. apollo.meta = "http://192.168.1.204:8801"
    58. }
    59. zk {
    60. serverAddr = "127.0.0.1:2181"
    61. session.timeout = 6000
    62. connect.timeout = 2000
    63. }
    64. etcd3 {
    65. serverAddr = "http://localhost:2379"
    66. }
    67. file {
    68. name = "file.conf"
    69. }
    70. }

    3.3 库存模块

            1.新建Storage-Module

            2.POM

    1. <dependencies>
    2. <dependency>
    3. <groupId>com.alibaba.cloudgroupId>
    4. <artifactId>spring-cloud-starter-alibaba-nacos-discoveryartifactId>
    5. dependency>
    6. <dependency>
    7. <groupId>com.alibaba.cloudgroupId>
    8. <artifactId>spring-cloud-starter-alibaba-seataartifactId>
    9. <exclusions>
    10. <exclusion>
    11. <artifactId>seata-allartifactId>
    12. <groupId>io.seatagroupId>
    13. exclusion>
    14. exclusions>
    15. dependency>
    16. <dependency>
    17. <groupId>io.seatagroupId>
    18. <artifactId>seata-allartifactId>
    19. <version>0.9.0version>
    20. dependency>
    21. <dependency>
    22. <groupId>org.springframework.cloudgroupId>
    23. <artifactId>spring-cloud-starter-openfeignartifactId>
    24. dependency>
    25. <dependency>
    26. <groupId>org.springframework.bootgroupId>
    27. <artifactId>spring-boot-starter-webartifactId>
    28. dependency>
    29. <dependency>
    30. <groupId>org.springframework.bootgroupId>
    31. <artifactId>spring-boot-starter-testartifactId>
    32. <scope>testscope>
    33. dependency>
    34. <dependency>
    35. <groupId>org.mybatis.spring.bootgroupId>
    36. <artifactId>mybatis-spring-boot-starterartifactId>
    37. <version>2.0.0version>
    38. dependency>
    39. <dependency>
    40. <groupId>mysqlgroupId>
    41. <artifactId>mysql-connector-javaartifactId>
    42. <version>8.0.26version>
    43. dependency>
    44. <dependency>
    45. <groupId>com.alibabagroupId>
    46. <artifactId>druid-spring-boot-starterartifactId>
    47. <version>1.1.10version>
    48. dependency>
    49. <dependency>
    50. <groupId>org.projectlombokgroupId>
    51. <artifactId>lombokartifactId>
    52. <optional>trueoptional>
    53. dependency>
    54. dependencies>

            3.YML

    1. server:
    2. port: 2002
    3. spring:
    4. application:
    5. name: seata-storage-service
    6. cloud:
    7. alibaba:
    8. seata:
    9. tx-service-group: wz_tx_group
    10. nacos:
    11. discovery:
    12. server-addr: localhost:8848
    13. datasource:
    14. driver-class-name: com.mysql.cj.jdbc.Driver
    15. url: jdbc:mysql://localhost:3306/seata_storage?serverTimeZone=UTC
    16. username: root
    17. password: xxx
    18. logging:
    19. level:
    20. io:
    21. seata: info
    22. mybatis:
    23. mapperLocations: classpath*:mapper/*.xml

            4.domain

    1. @Data
    2. @AllArgsConstructor
    3. @NoArgsConstructor
    4. public class CommonResult
    5. {
    6. private Integer code;
    7. private String message;
    8. private T data;
    9. public CommonResult(Integer code, String message)
    10. {
    11. this(code,message,null);
    12. }
    13. }
    1. @Data
    2. public class Storage {
    3. private Long id;
    4. /**
    5. * 产品id
    6. */
    7. private Long productId;
    8. /**
    9. * 总库存
    10. */
    11. private Integer total;
    12. /**
    13. * 已用库存
    14. */
    15. private Integer used;
    16. /**
    17. * 剩余库存
    18. */
    19. private Integer residue;
    20. }

            5.Dao

    1. @Mapper
    2. public interface StorageDao {
    3. //扣减库存
    4. void decrease(@Param("productId") Long productId, @Param("count") Integer count);
    5. }

            6.Service

    1. public interface StorageService {
    2. /**
    3. * 扣减库存
    4. */
    5. void decrease(Long productId, Integer count);
    6. }
    1. @Service
    2. public class StorageServiceImpl implements StorageService {
    3. private static final Logger LOGGER = LoggerFactory.getLogger(StorageServiceImpl.class);
    4. @Resource
    5. private StorageDao storageDao;
    6. /**
    7. * 扣减库存
    8. */
    9. @Override
    10. public void decrease(Long productId, Integer count) {
    11. LOGGER.info("------->storage-service中扣减库存开始");
    12. storageDao.decrease(productId,count);
    13. LOGGER.info("------->storage-service中扣减库存结束");
    14. }
    15. }

            7.Controller

    1. @RestController
    2. public class StorageController {
    3. @Autowired
    4. private StorageService storageService;
    5. /**
    6. * 扣减库存
    7. */
    8. @RequestMapping("/storage/decrease")
    9. public CommonResult decrease(Long productId, Integer count) {
    10. storageService.decrease(productId, count);
    11. return new CommonResult(200,"扣减库存成功!");
    12. }
    13. }

            8.Config

    1. @Configuration
    2. public class DataSourceProxyConfig {
    3. @Value("${mybatis.mapperLocations}")
    4. private String mapperLocations;
    5. @Bean
    6. @ConfigurationProperties(prefix = "spring.datasource")
    7. public DataSource druidDataSource(){
    8. return new DruidDataSource();
    9. }
    10. @Bean
    11. public DataSourceProxy dataSourceProxy(DataSource dataSource) {
    12. return new DataSourceProxy(dataSource);
    13. }
    14. @Bean
    15. public SqlSessionFactory sqlSessionFactoryBean(DataSourceProxy dataSourceProxy) throws Exception {
    16. SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
    17. sqlSessionFactoryBean.setDataSource(dataSourceProxy);
    18. sqlSessionFactoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources(mapperLocations));
    19. sqlSessionFactoryBean.setTransactionFactory(new SpringManagedTransactionFactory());
    20. return sqlSessionFactoryBean.getObject();
    21. }
    22. }
    1. @Configuration
    2. @MapperScan({"com.atguigu.springcloud.alibaba.dao"})
    3. public class MyBatisConfig {
    4. }

            9.主启动

    1. @SpringBootApplication(exclude = DataSourceAutoConfiguration.class)
    2. @EnableDiscoveryClient
    3. @EnableFeignClients
    4. public class SeataStorageServiceApplication2002
    5. {
    6. public static void main(String[] args)
    7. {
    8. SpringApplication.run(SeataStorageServiceApplication2002.class, args);
    9. }
    10. }

            10.file.conf        

    1. transport {
    2. # tcp udt unix-domain-socket
    3. type = "TCP"
    4. #NIO NATIVE
    5. server = "NIO"
    6. #enable heartbeat
    7. heartbeat = true
    8. #thread factory for netty
    9. thread-factory {
    10. boss-thread-prefix = "NettyBoss"
    11. worker-thread-prefix = "NettyServerNIOWorker"
    12. server-executor-thread-prefix = "NettyServerBizHandler"
    13. share-boss-worker = false
    14. client-selector-thread-prefix = "NettyClientSelector"
    15. client-selector-thread-size = 1
    16. client-worker-thread-prefix = "NettyClientWorkerThread"
    17. # netty boss thread size,will not be used for UDT
    18. boss-thread-size = 1
    19. #auto default pin or 8
    20. worker-thread-size = 8
    21. }
    22. shutdown {
    23. # when destroy server, wait seconds
    24. wait = 3
    25. }
    26. serialization = "seata"
    27. compressor = "none"
    28. }
    29. service {
    30. #vgroup->rgroup
    31. vgroup_mapping.wz_tx_group = "default"
    32. #only support single node
    33. default.grouplist = "127.0.0.1:8091"
    34. #degrade current not support
    35. enableDegrade = false
    36. #disable
    37. disable = false
    38. #unit ms,s,m,h,d represents milliseconds, seconds, minutes, hours, days, default permanent
    39. max.commit.retry.timeout = "-1"
    40. max.rollback.retry.timeout = "-1"
    41. disableGlobalTransaction = false
    42. }
    43. client {
    44. async.commit.buffer.limit = 10000
    45. lock {
    46. retry.internal = 10
    47. retry.times = 30
    48. }
    49. report.retry.count = 5
    50. tm.commit.retry.count = 1
    51. tm.rollback.retry.count = 1
    52. }
    53. transaction {
    54. undo.data.validation = true
    55. undo.log.serialization = "jackson"
    56. undo.log.save.days = 7
    57. #schedule delete expired undo_log in milliseconds
    58. undo.log.delete.period = 86400000
    59. undo.log.table = "undo_log"
    60. }
    61. support {
    62. ## spring
    63. spring {
    64. # auto proxy the DataSource bean
    65. datasource.autoproxy = false
    66. }
    67. }

            11.registry.conf 

    1. registry {
    2. # file 、nacos 、eureka、redis、zk
    3. type = "nacos"
    4. nacos {
    5. serverAddr = "localhost:8848"
    6. namespace = ""
    7. cluster = "default"
    8. }
    9. eureka {
    10. serviceUrl = "http://localhost:8761/eureka"
    11. application = "default"
    12. weight = "1"
    13. }
    14. redis {
    15. serverAddr = "localhost:6381"
    16. db = "0"
    17. }
    18. zk {
    19. cluster = "default"
    20. serverAddr = "127.0.0.1:2181"
    21. session.timeout = 6000
    22. connect.timeout = 2000
    23. }
    24. file {
    25. name = "file.conf"
    26. }
    27. }
    28. config {
    29. # file、nacos 、apollo、zk
    30. type = "file"
    31. nacos {
    32. serverAddr = "localhost"
    33. namespace = ""
    34. cluster = "default"
    35. }
    36. apollo {
    37. app.id = "fescar-server"
    38. apollo.meta = "http://192.168.1.204:8801"
    39. }
    40. zk {
    41. serverAddr = "127.0.0.1:2181"
    42. session.timeout = 6000
    43. connect.timeout = 2000
    44. }
    45. file {
    46. name = "file.conf"
    47. }
    48. }

    3.4 账户模块

            1.新建Account-Module

            2.POM

    1. <dependencies>
    2. <dependency>
    3. <groupId>com.alibaba.cloudgroupId>
    4. <artifactId>spring-cloud-starter-alibaba-nacos-discoveryartifactId>
    5. dependency>
    6. <dependency>
    7. <groupId>com.alibaba.cloudgroupId>
    8. <artifactId>spring-cloud-starter-alibaba-seataartifactId>
    9. <exclusions>
    10. <exclusion>
    11. <artifactId>seata-allartifactId>
    12. <groupId>io.seatagroupId>
    13. exclusion>
    14. exclusions>
    15. dependency>
    16. <dependency>
    17. <groupId>io.seatagroupId>
    18. <artifactId>seata-allartifactId>
    19. <version>0.9.0version>
    20. dependency>
    21. <dependency>
    22. <groupId>org.springframework.cloudgroupId>
    23. <artifactId>spring-cloud-starter-openfeignartifactId>
    24. dependency>
    25. <dependency>
    26. <groupId>org.springframework.bootgroupId>
    27. <artifactId>spring-boot-starter-webartifactId>
    28. dependency>
    29. <dependency>
    30. <groupId>org.springframework.bootgroupId>
    31. <artifactId>spring-boot-starter-testartifactId>
    32. <scope>testscope>
    33. dependency>
    34. <dependency>
    35. <groupId>org.mybatis.spring.bootgroupId>
    36. <artifactId>mybatis-spring-boot-starterartifactId>
    37. <version>2.0.0version>
    38. dependency>
    39. <dependency>
    40. <groupId>mysqlgroupId>
    41. <artifactId>mysql-connector-javaartifactId>
    42. <version>8.0.26version>
    43. dependency>
    44. <dependency>
    45. <groupId>com.alibabagroupId>
    46. <artifactId>druid-spring-boot-starterartifactId>
    47. <version>1.1.10version>
    48. dependency>
    49. <dependency>
    50. <groupId>org.projectlombokgroupId>
    51. <artifactId>lombokartifactId>
    52. <optional>trueoptional>
    53. dependency>
    54. dependencies>

            3.YML

    1. server:
    2. port: 2003
    3. spring:
    4. application:
    5. name: seata-account-service
    6. cloud:
    7. alibaba:
    8. seata:
    9. tx-service-group: wz_tx_group
    10. nacos:
    11. discovery:
    12. server-addr: localhost:8848
    13. datasource:
    14. driver-class-name: com.mysql.cj.jdbc.Driver
    15. url: jdbc:mysql://localhost:3306/seata_account?serverTimeZone=UTC
    16. username: root
    17. password: xxx
    18. feign:
    19. hystrix:
    20. enabled: false
    21. logging:
    22. level:
    23. io:
    24. seata: info
    25. mybatis:
    26. mapperLocations: classpath*:mapper/*.xml

            4.domain

    1. @Data
    2. @AllArgsConstructor
    3. @NoArgsConstructor
    4. public class Account {
    5. private Long id;
    6. /**
    7. * 用户id
    8. */
    9. private Long userId;
    10. /**
    11. * 总额度
    12. */
    13. private BigDecimal total;
    14. /**
    15. * 已用额度
    16. */
    17. private BigDecimal used;
    18. /**
    19. * 剩余额度
    20. */
    21. private BigDecimal residue;
    22. }
    1. @Data
    2. @AllArgsConstructor
    3. @NoArgsConstructor
    4. public class CommonResult
    5. {
    6. private Integer code;
    7. private String message;
    8. private T data;
    9. public CommonResult(Integer code, String message)
    10. {
    11. this(code,message,null);
    12. }
    13. }

            5.Dao

    1. @Mapper
    2. public interface AccountDao {
    3. /**
    4. * 扣减账户余额
    5. */
    6. void decrease(@Param("userId") Long userId, @Param("money") BigDecimal money);
    7. }

            6.Service

    1. public interface AccountService {
    2. /**
    3. * 扣减账户余额
    4. * @param userId 用户id
    5. * @param money 金额
    6. */
    7. void decrease(@RequestParam("userId") Long userId, @RequestParam("money") BigDecimal money);
    8. }
    1. @Service
    2. public class AccountServiceImpl implements AccountService {
    3. private static final Logger LOGGER = LoggerFactory.getLogger(AccountServiceImpl.class);
    4. @Resource
    5. AccountDao accountDao;
    6. /**
    7. * 扣减账户余额
    8. */
    9. @Override
    10. public void decrease(Long userId, BigDecimal money) {
    11. LOGGER.info("------->account-service中扣减账户余额开始");
    12. //模拟超时异常,全局事务回滚
    13. //暂停几秒钟线程
    14. try { TimeUnit.SECONDS.sleep(20); } catch (InterruptedException e) { e.printStackTrace(); }
    15. accountDao.decrease(userId,money);
    16. LOGGER.info("------->account-service中扣减账户余额结束");
    17. }
    18. }

            7.Controller

    1. @RestController
    2. public class AccountController {
    3. @Resource
    4. AccountService accountService;
    5. /**
    6. * 扣减账户余额
    7. */
    8. @RequestMapping("/account/decrease")
    9. public CommonResult decrease(@RequestParam("userId") Long userId, @RequestParam("money") BigDecimal money){
    10. accountService.decrease(userId,money);
    11. return new CommonResult(200,"扣减账户余额成功!");
    12. }
    13. }

            8.Config

    1. @Configuration
    2. @MapperScan({"com.atguigu.springcloud.alibaba.dao"})
    3. public class MyBatisConfig {
    4. }
    1. @Configuration
    2. public class DataSourceProxyConfig {
    3. @Value("${mybatis.mapperLocations}")
    4. private String mapperLocations;
    5. @Bean
    6. @ConfigurationProperties(prefix = "spring.datasource")
    7. public DataSource druidDataSource(){
    8. return new DruidDataSource();
    9. }
    10. @Bean
    11. public DataSourceProxy dataSourceProxy(DataSource dataSource) {
    12. return new DataSourceProxy(dataSource);
    13. }
    14. @Bean
    15. public SqlSessionFactory sqlSessionFactoryBean(DataSourceProxy dataSourceProxy) throws Exception {
    16. SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
    17. sqlSessionFactoryBean.setDataSource(dataSourceProxy);
    18. sqlSessionFactoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources(mapperLocations));
    19. sqlSessionFactoryBean.setTransactionFactory(new SpringManagedTransactionFactory());
    20. return sqlSessionFactoryBean.getObject();
    21. }
    22. }

            9.主启动

    1. @SpringBootApplication(exclude = DataSourceAutoConfiguration.class)
    2. @EnableDiscoveryClient
    3. @EnableFeignClients
    4. public class SeataAccountMainApp2003
    5. {
    6. public static void main(String[] args)
    7. {
    8. SpringApplication.run(SeataAccountMainApp2003.class, args);
    9. }
    10. }

            10.file.conf        

    1. transport {
    2. # tcp udt unix-domain-socket
    3. type = "TCP"
    4. #NIO NATIVE
    5. server = "NIO"
    6. #enable heartbeat
    7. heartbeat = true
    8. #thread factory for netty
    9. thread-factory {
    10. boss-thread-prefix = "NettyBoss"
    11. worker-thread-prefix = "NettyServerNIOWorker"
    12. server-executor-thread-prefix = "NettyServerBizHandler"
    13. share-boss-worker = false
    14. client-selector-thread-prefix = "NettyClientSelector"
    15. client-selector-thread-size = 1
    16. client-worker-thread-prefix = "NettyClientWorkerThread"
    17. # netty boss thread size,will not be used for UDT
    18. boss-thread-size = 1
    19. #auto default pin or 8
    20. worker-thread-size = 8
    21. }
    22. shutdown {
    23. # when destroy server, wait seconds
    24. wait = 3
    25. }
    26. serialization = "seata"
    27. compressor = "none"
    28. }
    29. service {
    30. vgroup_mapping.wz_tx_group = "default" #修改自定义事务组名称
    31. default.grouplist = "127.0.0.1:8091"
    32. enableDegrade = false
    33. disable = false
    34. max.commit.retry.timeout = "-1"
    35. max.rollback.retry.timeout = "-1"
    36. disableGlobalTransaction = false
    37. }
    38. client {
    39. async.commit.buffer.limit = 10000
    40. lock {
    41. retry.internal = 10
    42. retry.times = 30
    43. }
    44. report.retry.count = 5
    45. tm.commit.retry.count = 1
    46. tm.rollback.retry.count = 1
    47. }
    48. ## transaction log store
    49. store {
    50. ## store mode: file、db
    51. mode = "db"
    52. ## file store
    53. file {
    54. dir = "sessionStore"
    55. # branch session size , if exceeded first try compress lockkey, still exceeded throws exceptions
    56. max-branch-session-size = 16384
    57. # globe session size , if exceeded throws exceptions
    58. max-global-session-size = 512
    59. # file buffer size , if exceeded allocate new buffer
    60. file-write-buffer-cache-size = 16384
    61. # when recover batch read size
    62. session.reload.read_size = 100
    63. # async, sync
    64. flush-disk-mode = async
    65. }
    66. ## database store
    67. db {
    68. ## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp) etc.
    69. datasource = "dbcp"
    70. ## mysql/oracle/h2/oceanbase etc.
    71. db-type = "mysql"
    72. driver-class-name = "com.mysql.cj.jdbc.Driver"
    73. url = "jdbc:mysql://172.0.0.1:3306/seata_account?serverTimeZone=UTC"
    74. user = "root"
    75. password = "xxx"
    76. min-conn = 1
    77. max-conn = 3
    78. global.table = "global_table"
    79. branch.table = "branch_table"
    80. lock-table = "lock_table"
    81. query-limit = 100
    82. }
    83. }
    84. lock {
    85. ## the lock store mode: local、remote
    86. mode = "remote"
    87. local {
    88. ## store locks in user's database
    89. }
    90. remote {
    91. ## store locks in the seata's server
    92. }
    93. }
    94. recovery {
    95. #schedule committing retry period in milliseconds
    96. committing-retry-period = 1000
    97. #schedule asyn committing retry period in milliseconds
    98. asyn-committing-retry-period = 1000
    99. #schedule rollbacking retry period in milliseconds
    100. rollbacking-retry-period = 1000
    101. #schedule timeout retry period in milliseconds
    102. timeout-retry-period = 1000
    103. }
    104. transaction {
    105. undo.data.validation = true
    106. undo.log.serialization = "jackson"
    107. undo.log.save.days = 7
    108. #schedule delete expired undo_log in milliseconds
    109. undo.log.delete.period = 86400000
    110. undo.log.table = "undo_log"
    111. }
    112. ## metrics settings
    113. metrics {
    114. enabled = false
    115. registry-type = "compact"
    116. # multi exporters use comma divided
    117. exporter-list = "prometheus"
    118. exporter-prometheus-port = 9898
    119. }
    120. support {
    121. ## spring
    122. spring {
    123. # auto proxy the DataSource bean
    124. datasource.autoproxy = false
    125. }
    126. }

            11.registry.conf 

    1. registry {
    2. # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
    3. type = "nacos"
    4. nacos {
    5. serverAddr = "localhost:8848"
    6. namespace = ""
    7. cluster = "default"
    8. }
    9. eureka {
    10. serviceUrl = "http://localhost:8761/eureka"
    11. application = "default"
    12. weight = "1"
    13. }
    14. redis {
    15. serverAddr = "localhost:6379"
    16. db = "0"
    17. }
    18. zk {
    19. cluster = "default"
    20. serverAddr = "127.0.0.1:2181"
    21. session.timeout = 6000
    22. connect.timeout = 2000
    23. }
    24. consul {
    25. cluster = "default"
    26. serverAddr = "127.0.0.1:8500"
    27. }
    28. etcd3 {
    29. cluster = "default"
    30. serverAddr = "http://localhost:2379"
    31. }
    32. sofa {
    33. serverAddr = "127.0.0.1:9603"
    34. application = "default"
    35. region = "DEFAULT_ZONE"
    36. datacenter = "DefaultDataCenter"
    37. cluster = "default"
    38. group = "SEATA_GROUP"
    39. addressWaitTime = "3000"
    40. }
    41. file {
    42. name = "file.conf"
    43. }
    44. }
    45. config {
    46. # file、nacos 、apollo、zk、consul、etcd3
    47. type = "file"
    48. nacos {
    49. serverAddr = "localhost"
    50. namespace = ""
    51. }
    52. consul {
    53. serverAddr = "127.0.0.1:8500"
    54. }
    55. apollo {
    56. app.id = "seata-server"
    57. apollo.meta = "http://192.168.1.204:8801"
    58. }
    59. zk {
    60. serverAddr = "127.0.0.1:2181"
    61. session.timeout = 6000
    62. connect.timeout = 2000
    63. }
    64. etcd3 {
    65. serverAddr = "http://localhost:2379"
    66. }
    67. file {
    68. name = "file.conf"
    69. }
    70. }

    3.5 测试

            注意我们在AccountServiceImpl添加超时:

    1. //模拟超时异常,全局事务回滚
    2. //暂停几秒钟线程
    3. try { TimeUnit.SECONDS.sleep(20); } ... ...

            输入:

    http://localhost:2001/order/create?userId=1&productId=1&count=10&money=100

            直接报错了,查看数据库的数据,没有任何改变。

            如果我们把OrderServiceImpl中的@GlobalTransactional注解去掉,再尝试,也会报错,但会发现这样的BUG:钱已经扣了,但订单状态仍显示未完成!

     

  • 相关阅读:
    2022宁夏杯B题论文分析+完整代码(大学生就业问题分析)
    算法设计与分析 SCAU10346 带价值的作业安排问题
    deque的插入和删除
    HNU计网实验:实验二 网络基础编程实验(JAVA\Python3)
    代码随想录Day22 | Leetcode39 组合总和、Leetcode40 数组总和II | Leetcode131 分割回文串
    一文了解:离散型制造业轻量化MES解决方案
    JS 运算符使用
    VisionMaster 学习笔记(仪表盘检测)
    第1章丨IRIS Globals 简介
    深入理解面向对象(第二篇)
  • 原文地址:https://blog.csdn.net/weixin_62427168/article/details/126558693