(1)原子性:事务中的所有操作,要么全部成功,要么全部失败
(2)隔离性:读写发生在不同的线程中时,对同一资源操作的事务不能同时发生
(3)持久性:对数据库做的一切修改将永久保存,不管是否出现故障
(4)一致性:要保证数据库内部完整性约束,声明式约束
分布式系统有三个指标
(1)Consistency(一致性)
(2)Availability(可用性)
(3)Partition tolerance (分区容错性)
分布式系统无法同时满足这三个指标。 这个结论就叫做 CAP 定理。
(1)Consistency(一致性):用户访问分布式系统中的任意节点,得到的数据必须一致
(2)Availability(可用性):用户访问集群中的任意健康节点,必须能得到响应,而不是超时或拒绝
(3)Partition tolerance (分区容错性)
Partition(分区):因为网络故障或其它原因导致分布式系统中的部分节点与其它节点失去连接,形成独立分区。
Tolerance(容错):在集群出现分区时,整个系统也要持续对外提供服务
BASE理论是对CAP的一种解决思路,包含三个思想:
(1)Basically Available (基本可用):分布式系统在出现故障时,允许损失部分可用性,即保证核心可用。
(2)Soft State(软状态):在一定时间内,允许出现中间状态,比如临时的不一致状态。
(3)Eventually Consistent(最终一致性):虽然无法保证强一致性,但是在软状态结束后,最终达到数据一致。
而分布式事务最大的问题是各个子事务的一致性问题,因此可以借鉴CAP定理和BASE理论:
(1)AP模式:各子事务分别执行和提交,允许出现结果不一致,然后采用弥补措施恢复数据即可,实现最终一致。
(2)CP模式:各个子事务执行后互相等待,同时提交,同时回滚,达成强一致。但事务等待过程中,处于弱可用状态
指一次大的操作由不同的小操作组成的,这些小的操作分布在不同的服务器上,分布式事务需要保证这些小操作要么全部成功,要么全部失败。从本质上来说,分布式事务就是为了保证不同数据库的数据一致性。
在微服务独立数据源的思想,每一个微服务都有一个或者多个数据源,虽然单机单库事务已经非常成熟,但是由于网路延迟和不可靠的客观因素,分布式事务到现在也还没有成熟的方案,对于中大型网站,特别是涉及到交易的网站,一旦将服务拆分微服务,分布式事务一定是绕不开的一个组件,通常解决分布式事务问题。
Seata是阿里开源的一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。
Seata目标打造一站式的分布事务的解决方案,最终会提供四种事务模式:
(1)AT 模式:
(2)TCC 模式:
(3)Saga 模式:
(4)XA 模式:正在开发中… 目前使用的流行度情况是:AT > TCC > Saga。因此,我们在学习Seata的时候,可以花更多精力在AT模式上,最好搞懂背后的实现原理,毕竟分布式事务涉及到数据的正确性,出问题需要快速排查定位并解决。
Seata事务管理中有三个重要的角色:
(1)TC (Transaction Coordinator) - 事务协调者:维护全局和分支事务的状态,协调全局事务提交或回滚。
(2)TM (Transaction Manager) - 事务管理器:定义全局事务的范围、开始全局事务、提交或回滚全局事务。
(3)RM (Resource Manager) - 资源管理器:管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。
其中,TM是一个分布式事务的发起者和终结者,TC负责维护分布式事务的运行状态,而RM则负责本地事务的运行。
(1)首先TM 向 TC 申请开启一个全局事务,全局事务创建成功并生成一个全局唯一的 XID。
(2)XID在微服务调用链路的上下文中传播。
(3)RM 开始执行这个分支事务,RM首先解析这条SQL语句,生成对应的UNDO_LOG记录。下面是一条UNDO_LOG中的记录:
{
"branchId": 641789253,
"undoItems": [{
"afterImage": {
"rows": [{
"fields": [{
"name": "id",
"type": 4,
"value": 1
}, {
"name": "name",
"type": 12,
"value": "GTS"
}, {
"name": "since",
"type": 12,
"value": "2014"
}]
}],
"tableName": "product"
},
"beforeImage": {
"rows": [{
"fields": [{
"name": "id",
"type": 4,
"value": 1
}, {
"name": "name",
"type": 12,
"value": "TXC"
}, {
"name": "since",
"type": 12,
"value": "2014"
}]
}],
"tableName": "product"
},
"sqlType": "UPDATE"
}],
"xid": "xid:xxx"
}
可以看到,UNDO_LOG表中记录了分支ID,全局事务ID,以及事务执行的redo和undo数据以供二阶段恢复。
(4)RM在同一个本地事务中执行业务SQL和UNDO_LOG数据的插入。在提交这个本地事务前,RM会向TC申请关于这条记录的全局锁。如果申请不到,则说明有其他事务也在对这条记录进行操作,因此它会在一段时间内重试,重试失败则回滚本地事务,并向TC汇报本地事务执行失败。如下图所示:
(5)RM在事务提交前,申请到了相关记录的全局锁,因此直接提交本地事务,并向TC汇报本地事务执行成功。此时全局锁并没有释放,全局锁的释放取决于二阶段是提交命令还是回滚命令。
(6)TC根据所有的分支事务执行结果,向RM下发提交或回滚命令。
(7)RM如果收到TC的提交命令,首先立即释放相关记录的全局锁,然后把提交请求放入一个异步任务的队列中,马上返回提交成功的结果给 TC。异步队列中的提交请求真正执行时,只是删除相应 UNDO LOG 记录而已。
(8)RM如果收到TC的回滚命令,则会开启一个本地事务,通过 XID 和 Branch ID 查找到相应的 UNDO LOG 记录。将 UNDO LOG 中的后镜与当前数据进行比较,如果有不同,说明数据被当前全局事务之外的动作做了修改。这种情况,需要根据配置策略来做处理。否则,根据 UNDO LOG 中的前镜像和业务 SQL 的相关信息生成并执行回滚的语句并执行,然后提交本地事务达到回滚的目的,最后释放相关记录的全局锁。
Seata能够在第一阶段直接提交事务,是因为Seata框架为每一个RM维护了一张UNDO_LOG表(这张表需要客户端自行创建),其中保存了每一次本地事务的回滚数据。因此,二阶段的回滚并不依赖于本地数据库事务的回滚,而是RM直接读取这张UNDO_LOG表,并将数据库中的数据更新为UNDO_LOG中存储的历史数据。这也是在使用seata作为分布式事务解决方案的时候,需要在参与分布式事务的每一个服务中加入UNDO_LOG表。
如果第二阶段是提交命令,那么RM事实上并不会对数据进行提交(因为一阶段已经提交了),而实发起一个异步请求删除UNDO_LOG中关于本事务的记录。
由于Seata一阶段直接提交了本地事务,因此会造成隔离性问题,因此Seata的默认隔离级别为Read Uncommitted。然而Seata也支持Read Committed的隔离级别,我们会在下文中介绍如何实现。
(1)XA模式(正在开发中):强一致性分阶段事务模式,牺牲了一定的可用性,无业务侵入
(2)TCC模式:最终一致的分阶段事务模式,有业务侵入
(3)AT模式(默认重点):最终一致的分阶段事务模式,无业务侵入,也是Seata的默认模式
(4)SAGA模式:长事务模式,有业务侵入
目前使用的流行度情况是:AT > TCC > Saga。因此,我们在学习Seata的时候,可以花更多精力在AT模式上,最好搞懂背后的实现原理,毕竟分布式事务涉及到数据的正确性,出问题需要快速排查定位并解决。
(1)首先,引入seata相关依赖
<dependency>
<groupId>com.alibaba.cloudgroupId>
<artifactId>spring-cloud-starter-alibaba-seataartifactId>
<exclusions>
<exclusion>
<artifactId>seata-spring-boot-starterartifactId>
<groupId>io.seatagroupId>
exclusion>
exclusions>
dependency>
<dependency>
<groupId>io.seatagroupId>
<artifactId>seata-spring-boot-starterartifactId>
<version>${seata.version}version>
dependency>
(2)然后,配置application.yml,让微服务通过注册中心找到seata-tc-server
可以从https://github.com/seata/seata/releases下载seata-server-$version.zip包。
Windows下载解压后(.zip),直接点击bin/seata-server.bat就可以了。(我使用的是1.4.0版本)
如果觉得官网下载慢,可以使用我分享的网盘地址: https://pan.baidu.com/s/1E9J52g6uW_VFWY34fHL6zA 提取码: vneh
# 订单数据库信息 seata_order
DROP DATABASE IF EXISTS seata_order;
CREATE DATABASE seata_order;
DROP TABLE IF EXISTS seata_order.p_order;
CREATE TABLE seata_order.p_order
(
id INT(11) NOT NULL AUTO_INCREMENT,
user_id INT(11) DEFAULT NULL,
product_id INT(11) DEFAULT NULL,
amount INT(11) DEFAULT NULL,
total_price DOUBLE DEFAULT NULL,
status VARCHAR(100) DEFAULT NULL,
add_time DATETIME DEFAULT CURRENT_TIMESTAMP,
last_update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (id)
) ENGINE = InnoDB
AUTO_INCREMENT = 1
DEFAULT CHARSET = utf8mb4;
DROP TABLE IF EXISTS seata_order.undo_log;
CREATE TABLE seata_order.undo_log
(
id BIGINT(20) NOT NULL AUTO_INCREMENT,
branch_id BIGINT(20) NOT NULL,
xid VARCHAR(100) NOT NULL,
context VARCHAR(128) NOT NULL,
rollback_info LONGBLOB NOT NULL,
log_status INT(11) NOT NULL,
log_created DATETIME NOT NULL,
log_modified DATETIME NOT NULL,
PRIMARY KEY (id),
UNIQUE KEY ux_undo_log (xid, branch_id)
) ENGINE = InnoDB
AUTO_INCREMENT = 1
DEFAULT CHARSET = utf8mb4;
# 产品数据库信息 seata_product
DROP DATABASE IF EXISTS seata_product;
CREATE DATABASE seata_product;
DROP TABLE IF EXISTS seata_product.product;
CREATE TABLE seata_product.product
(
id INT(11) NOT NULL AUTO_INCREMENT,
price DOUBLE DEFAULT NULL,
stock INT(11) DEFAULT NULL,
last_update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (id)
) ENGINE = InnoDB
AUTO_INCREMENT = 1
DEFAULT CHARSET = utf8mb4;
DROP TABLE IF EXISTS seata_product.undo_log;
CREATE TABLE seata_product.undo_log
(
id BIGINT(20) NOT NULL AUTO_INCREMENT,
branch_id BIGINT(20) NOT NULL,
xid VARCHAR(100) NOT NULL,
context VARCHAR(128) NOT NULL,
rollback_info LONGBLOB NOT NULL,
log_status INT(11) NOT NULL,
log_created DATETIME NOT NULL,
log_modified DATETIME NOT NULL,
PRIMARY KEY (id),
UNIQUE KEY ux_undo_log (xid, branch_id)
) ENGINE = InnoDB
AUTO_INCREMENT = 1
DEFAULT CHARSET = utf8mb4;
INSERT INTO seata_product.product (id, price, stock)
VALUES (1, 10, 20);
# 账户数据库信息 seata_account
DROP DATABASE IF EXISTS seata_account;
CREATE DATABASE seata_account;
DROP TABLE IF EXISTS seata_account.account;
CREATE TABLE seata_account.account
(
id INT(11) NOT NULL AUTO_INCREMENT,
balance DOUBLE DEFAULT NULL,
last_update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (id)
) ENGINE = InnoDB
AUTO_INCREMENT = 1
DEFAULT CHARSET = utf8mb4;
DROP TABLE IF EXISTS seata_account.undo_log;
CREATE TABLE seata_account.undo_log
(
id BIGINT(20) NOT NULL AUTO_INCREMENT,
branch_id BIGINT(20) NOT NULL,
xid VARCHAR(100) NOT NULL,
context VARCHAR(128) NOT NULL,
rollback_info LONGBLOB NOT NULL,
log_status INT(11) NOT NULL,
log_created DATETIME NOT NULL,
log_modified DATETIME NOT NULL,
PRIMARY KEY (id),
UNIQUE KEY ux_undo_log (xid, branch_id)
) ENGINE = InnoDB
AUTO_INCREMENT = 1
DEFAULT CHARSET = utf8mb4;
INSERT INTO seata_account.account (id, balance)
VALUES (1, 50);
其中,每个库中的undo_log表,是Seata AT模式必须创建的表,主要用于分支事务的回滚。
另外,考虑到测试方便,我们插入了一条id = 1的account记录,和一条id = 1的product记录。
<dependency>
<groupId>com.ruoyigroupId>
<artifactId>ruoyi-common-datasourceartifactId>
dependency>
# spring配置
spring:
redis:
host: localhost
port: 6379
password:
datasource:
druid:
stat-view-servlet:
enabled: true
loginUsername: admin
loginPassword: 123456
dynamic:
druid:
initial-size: 5
min-idle: 5
maxActive: 20
maxWait: 60000
timeBetweenEvictionRunsMillis: 60000
minEvictableIdleTimeMillis: 300000
validationQuery: SELECT 1 FROM DUAL
testWhileIdle: true
testOnBorrow: false
testOnReturn: false
poolPreparedStatements: true
maxPoolPreparedStatementPerConnectionSize: 20
filters: stat,wall,slf4j
connectionProperties: druid.stat.mergeSql\=true;druid.stat.slowSqlMillis\=5000
datasource:
# 主库数据源
master:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://localhost:3306/ry-cloud?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
username: root
password: password
# seata_order数据源
order:
username: root
password: password
url: jdbc:mysql://localhost:3306/seata_order?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
driver-class-name: com.mysql.cj.jdbc.Driver
# seata_account数据源
account:
username: root
password: password
url: jdbc:mysql://localhost:3306/seata_account?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
driver-class-name: com.mysql.cj.jdbc.Driver
# seata_product数据源
product:
username: root
password: password
url: jdbc:mysql://localhost:3306/seata_product?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
driver-class-name: com.mysql.cj.jdbc.Driver
seata: true #开启seata代理,开启后默认每个数据源都代理,如果某个不需要代理可单独关闭
# seata配置
seata:
enabled: true
# Seata 应用编号,默认为 ${spring.application.name}
application-id: ${spring.application.name}
# Seata 事务组编号,用于 TC 集群名
tx-service-group: ${spring.application.name}-group
# 关闭自动代理
enable-auto-data-source-proxy: false
# 服务配置项
service:
# 虚拟组和分组的映射
vgroup-mapping:
ruoyi-system-group: default
# 分组和 Seata 服务的映射
grouplist:
default: 127.0.0.1:8091
config:
type: file
registry:
type: file
# mybatis配置
mybatis:
# 搜索指定包别名
typeAliasesPackage: com.ruoyi.system
# 配置mapper的扫描,找到所有的mapper.xml映射文件
mapperLocations: classpath:mapper/**/*.xml
# swagger配置
swagger:
title: 系统模块接口文档
license: Powered By ruoyi
licenseUrl: https://ruoyi.vip
注意,一定要设置spring.datasource.dynamic.seata配置项为true,开启对Seata的集成,否则会导致Seata全局事务回滚失败。
(1)Account.java
package com.ruoyi.system.domain;
import java.util.Date;
public class Account
{
private Long id;
/**
* 余额
*/
private Double balance;
private Date lastUpdateTime;
public Long getId()
{
return id;
}
public void setId(Long id)
{
this.id = id;
}
public Double getBalance()
{
return balance;
}
public void setBalance(Double balance)
{
this.balance = balance;
}
public Date getLastUpdateTime()
{
return lastUpdateTime;
}
public void setLastUpdateTime(Date lastUpdateTime)
{
this.lastUpdateTime = lastUpdateTime;
}
}
(2)Order.java
package com.ruoyi.system.domain;
public class Order
{
private Integer id;
/**
* 用户ID
*/
private Long userId;
/**
* 商品ID
*/
private Long productId;
/**
* 订单状态
*/
private int status;
/**
* 数量
*/
private Integer amount;
/**
* 总金额
*/
private Double totalPrice;
public Order()
{
}
public Order(Long userId, Long productId, int status, Integer amount)
{
this.userId = userId;
this.productId = productId;
this.status = status;
this.amount = amount;
}
public Integer getId()
{
return id;
}
public void setId(Integer id)
{
this.id = id;
}
public Long getUserId()
{
return userId;
}
public void setUserId(Long userId)
{
this.userId = userId;
}
public Long getProductId()
{
return productId;
}
public void setProductId(Long productId)
{
this.productId = productId;
}
public int getStatus()
{
return status;
}
public void setStatus(int status)
{
this.status = status;
}
public Integer getAmount()
{
return amount;
}
public void setAmount(Integer amount)
{
this.amount = amount;
}
public Double getTotalPrice()
{
return totalPrice;
}
public void setTotalPrice(Double totalPrice)
{
this.totalPrice = totalPrice;
}
}
(3)Product.java
package com.ruoyi.system.domain;
import java.util.Date;
public class Product
{
private Integer id;
/**
* 价格
*/
private Double price;
/**
* 库存
*/
private Integer stock;
private Date lastUpdateTime;
public Integer getId()
{
return id;
}
public void setId(Integer id)
{
this.id = id;
}
public Double getPrice()
{
return price;
}
public void setPrice(Double price)
{
this.price = price;
}
public Integer getStock()
{
return stock;
}
public void setStock(Integer stock)
{
this.stock = stock;
}
public Date getLastUpdateTime()
{
return lastUpdateTime;
}
public void setLastUpdateTime(Date lastUpdateTime)
{
this.lastUpdateTime = lastUpdateTime;
}
}
(1)PlaceOrderRequest.java
package com.ruoyi.system.domain.dto;
public class PlaceOrderRequest
{
private Long userId;
private Long productId;
private Integer amount;
public PlaceOrderRequest()
{
}
public PlaceOrderRequest(Long userId, Long productId, Integer amount)
{
this.userId = userId;
this.productId = productId;
this.amount = amount;
}
public Long getUserId()
{
return userId;
}
public void setUserId(Long userId)
{
this.userId = userId;
}
public Long getProductId()
{
return productId;
}
public void setProductId(Long productId)
{
this.productId = productId;
}
public Integer getAmount()
{
return amount;
}
public void setAmount(Integer amount)
{
this.amount = amount;
}
}
(2)ReduceBalanceRequest.java
package com.ruoyi.system.domain.dto;
public class ReduceBalanceRequest
{
private Long userId;
private Integer price;
public Long getUserId()
{
return userId;
}
public void setUserId(Long userId)
{
this.userId = userId;
}
public Integer getPrice()
{
return price;
}
public void setPrice(Integer price)
{
this.price = price;
}
}
(3)ReduceStockRequest.java
package com.ruoyi.system.domain.dto;
public class ReduceStockRequest
{
private Long productId;
private Integer amount;
public Long getProductId()
{
return productId;
}
public void setProductId(Long productId)
{
this.productId = productId;
}
public Integer getAmount()
{
return amount;
}
public void setAmount(Integer amount)
{
this.amount = amount;
}
}
(1)AccountMapper.java
package com.ruoyi.system.mapper;
import com.ruoyi.system.domain.Account;
public interface AccountMapper
{
public Account selectById(Long userId);
public void updateById(Account account);
}
(2)OrderMapper.java
package com.ruoyi.system.mapper;
import com.ruoyi.system.domain.Order;
public interface OrderMapper
{
public void insert(Order order);
public void updateById(Order order);
}
(3)ProductMapper.java
package com.ruoyi.system.mapper;
import com.ruoyi.system.domain.Product;
public interface ProductMapper
{
public Product selectById(Long productId);
public void updateById(Product product);
}
(1)AccountService.java
package com.ruoyi.system.service;
public interface AccountService
{
/**
* 账户扣减
* @param userId 用户 ID
* @param price 扣减金额
*/
void reduceBalance(Long userId, Double price);
}
(2)OrderService.java
package com.ruoyi.system.service;
import com.ruoyi.system.domain.dto.PlaceOrderRequest;
public interface OrderService
{
/**
* 下单
*
* @param placeOrderRequest 订单请求参数
*/
void placeOrder(PlaceOrderRequest placeOrderRequest);
}
(3)ProductService.java
package com.ruoyi.system.service;
public interface ProductService
{
/**
* 扣减库存
*
* @param productId 商品 ID
* @param amount 扣减数量
* @return 商品总价
*/
Double reduceStock(Long productId, Integer amount);
}
(1)AccountService.java
package com.ruoyi.system.service.impl;
import javax.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import com.baomidou.dynamic.datasource.annotation.DS;
import com.ruoyi.system.domain.Account;
import com.ruoyi.system.mapper.AccountMapper;
import com.ruoyi.system.service.AccountService;
import io.seata.core.context.RootContext;
@Service
public class AccountServiceImpl implements AccountService
{
private static final Logger log = LoggerFactory.getLogger(AccountServiceImpl.class);
@Resource
private AccountMapper accountMapper;
/**
* 事务传播特性设置为 REQUIRES_NEW 开启新的事务 重要!!!!一定要使用REQUIRES_NEW
*/
@DS("account")
@Override
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void reduceBalance(Long userId, Double price)
{
log.info("=============ACCOUNT START=================");
log.info("当前 XID: {}", RootContext.getXID());
Account account = accountMapper.selectById(userId);
Double balance = account.getBalance();
log.info("下单用户{}余额为 {},商品总价为{}", userId, balance, price);
if (balance < price)
{
log.warn("用户 {} 余额不足,当前余额:{}", userId, balance);
throw new RuntimeException("余额不足");
}
log.info("开始扣减用户 {} 余额", userId);
double currentBalance = account.getBalance() - price;
account.setBalance(currentBalance);
accountMapper.updateById(account);
log.info("扣减用户 {} 余额成功,扣减后用户账户余额为{}", userId, currentBalance);
log.info("=============ACCOUNT END=================");
}
}
(2)OrderService.java
package com.ruoyi.system.service.impl;
import javax.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import com.baomidou.dynamic.datasource.annotation.DS;
import com.ruoyi.system.domain.Order;
import com.ruoyi.system.domain.dto.PlaceOrderRequest;
import com.ruoyi.system.mapper.OrderMapper;
import com.ruoyi.system.service.AccountService;
import com.ruoyi.system.service.OrderService;
import com.ruoyi.system.service.ProductService;
import io.seata.core.context.RootContext;
import io.seata.spring.annotation.GlobalTransactional;
@Service
public class OrderServiceImpl implements OrderService
{
private static final Logger log = LoggerFactory.getLogger(OrderServiceImpl.class);
@Resource
private OrderMapper orderMapper;
@Autowired
private AccountService accountService;
@Autowired
private ProductService productService;
@DS("order") // 每一层都需要使用多数据源注解切换所选择的数据库
@Override
@Transactional
@GlobalTransactional // 重点 第一个开启事务的需要添加seata全局事务注解
public void placeOrder(PlaceOrderRequest request)
{
log.info("=============ORDER START=================");
Long userId = request.getUserId();
Long productId = request.getProductId();
Integer amount = request.getAmount();
log.info("收到下单请求,用户:{}, 商品:{},数量:{}", userId, productId, amount);
log.info("当前 XID: {}", RootContext.getXID());
Order order = new Order(userId, productId, 0, amount);
orderMapper.insert(order);
log.info("订单一阶段生成,等待扣库存付款中");
// 扣减库存并计算总价
Double totalPrice = productService.reduceStock(productId, amount);
// 扣减余额
accountService.reduceBalance(userId, totalPrice);
order.setStatus(1);
order.setTotalPrice(totalPrice);
orderMapper.updateById(order);
log.info("订单已成功下单");
log.info("=============ORDER END=================");
}
}
(3)ProductService.java
package com.ruoyi.system.service.impl;
import javax.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import com.baomidou.dynamic.datasource.annotation.DS;
import com.ruoyi.system.domain.Product;
import com.ruoyi.system.mapper.ProductMapper;
import com.ruoyi.system.service.ProductService;
import io.seata.core.context.RootContext;
@Service
public class ProductServiceImpl implements ProductService
{
private static final Logger log = LoggerFactory.getLogger(ProductServiceImpl.class);
@Resource
private ProductMapper productMapper;
/**
* 事务传播特性设置为 REQUIRES_NEW 开启新的事务 重要!!!!一定要使用REQUIRES_NEW
*/
@DS("product")
@Transactional(propagation = Propagation.REQUIRES_NEW)
@Override
public Double reduceStock(Long productId, Integer amount)
{
log.info("=============PRODUCT START=================");
log.info("当前 XID: {}", RootContext.getXID());
// 检查库存
Product product = productMapper.selectById(productId);
Integer stock = product.getStock();
log.info("商品编号为 {} 的库存为{},订单商品数量为{}", productId, stock, amount);
if (stock < amount)
{
log.warn("商品编号为{} 库存不足,当前库存:{}", productId, stock);
throw new RuntimeException("库存不足");
}
log.info("开始扣减商品编号为 {} 库存,单价商品价格为{}", productId, product.getPrice());
// 扣减库存
int currentStock = stock - amount;
product.setStock(currentStock);
productMapper.updateById(product);
double totalPrice = product.getPrice() * amount;
log.info("扣减商品编号为 {} 库存成功,扣减后库存为{}, {} 件商品总价为 {} ", productId, currentStock, amount, totalPrice);
log.info("=============PRODUCT END=================");
return totalPrice;
}
}
(1)OrderController.java
package com.ruoyi.system.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.ruoyi.system.domain.dto.PlaceOrderRequest;
import com.ruoyi.system.service.OrderService;
import io.swagger.annotations.ApiOperation;
@RestController
@RequestMapping("/order")
public class OrderController
{
@Autowired
private OrderService orderService;
@PostMapping("/placeOrder")
public String placeOrder(@Validated @RequestBody PlaceOrderRequest request)
{
orderService.placeOrder(request);
return "下单成功";
}
@PostMapping("/test1")
@ApiOperation("测试商品库存不足-异常回滚")
public String test1()
{
// 商品单价10元,库存20个,用户余额50元,模拟一次性购买22个。 期望异常回滚
orderService.placeOrder(new PlaceOrderRequest(1L, 1L, 22));
return "下单成功";
}
@PostMapping("/test2")
@ApiOperation("测试用户账户余额不足-异常回滚")
public String test2()
{
// 商品单价10元,库存20个,用户余额50元,模拟一次性购买6个。 期望异常回滚
orderService.placeOrder(new PlaceOrderRequest(1L, 1L, 6));
return "下单成功";
}
}
(1)AccountMapper.xml
DOCTYPE mapper
PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.ruoyi.system.mapper.AccountMapper">
<resultMap type="Account" id="AccountResult">
<id property="id" column="id" />
<result property="balance" column="balance" />
<result property="lastUpdateTime" column="last_update_time" />
resultMap>
<select id="selectById" parameterType="Account" resultMap="AccountResult">
select id, balance, last_update_time
from account where id = #{userId}
select>
<update id="updateById" parameterType="Account">
update account set balance = #{balance}, last_update_time = sysdate() where id = #{id}
update>
mapper>
(2)OrderMapper.xml
DOCTYPE mapper
PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.ruoyi.system.mapper.OrderMapper">
<resultMap type="Order" id="OrderResult">
<id property="id" column="id" />
<result property="userId" column="user_id" />
<result property="productId" column="product_id" />
<result property="amount" column="amount" />
<result property="totalPrice" column="total_price" />
<result property="status" column="status" />
<result property="addTime" column="add_time" />
<result property="lastUpdateTime" column="last_update_time" />
resultMap>
<insert id="insert" parameterType="Order" useGeneratedKeys="true" keyProperty="id">
insert into p_order (
<if test="userId != null and userId != '' ">user_id,if>
<if test="productId != null and productId != '' ">product_id,if>
<if test="amount != null and amount != '' ">amount,if>
<if test="totalPrice != null and totalPrice != '' ">total_price,if>
<if test="status != null and status != ''">status,if>
add_time
)values(
<if test="userId != null and userId != ''">#{userId},if>
<if test="productId != null and productId != ''">#{productId},if>
<if test="amount != null and amount != ''">#{amount},if>
<if test="totalPrice != null and totalPrice != ''">#{totalPrice},if>
<if test="status != null and status != ''">#{status},if>
sysdate()
)
insert>
<update id="updateById" parameterType="Order">
update p_order
<set>
<if test="userId != null and userId != ''">user_id = #{userId},if>
<if test="productId != null and productId != ''">product_id = #{productId},if>
<if test="amount != null and amount != ''">amount = #{amount},if>
<if test="totalPrice != null and totalPrice != ''">total_price = #{totalPrice},if>
<if test="status != null and status != ''">status = #{status},if>
last_update_time = sysdate()
set>
where id = #{id}
update>
mapper>
(3)ProductMapper.xml
DOCTYPE mapper
PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.ruoyi.system.mapper.ProductMapper">
<resultMap type="Product" id="ProductResult">
<id property="id" column="id" />
<result property="price" column="price" />
<result property="stock" column="stock" />
<result property="lastUpdateTime" column="last_update_time" />
resultMap>
<select id="selectById" parameterType="Product" resultMap="ProductResult">
select id, price, stock, last_update_time
from product where id = #{productId}
select>
<update id="updateById" parameterType="Product">
update product set price = #{price}, stock = #{stock}, last_update_time = sysdate() where id = #{id}
update>
mapper>
使用Postman工具测试接口,注意观察运行日志,至此分布式事务集成案例全流程完毕。
模拟正常下单,买一个商品 http://localhost:9201/order/placeOrder
Content-Type/application/json
{
"userId": 1,
"productId": 1,
"amount": 1
}
模拟库存不足,事务回滚 http://localhost:9201/order/placeOrder
Content-Type/application/json
{
"userId": 1,
"productId": 1,
"amount": 22
}
模拟用户余额不足,事务回滚 http://localhost:9201/order/placeOrder
Content-Type/application/json
{
"userId": 1,
"productId": 1,
"amount": 6
}
(1)解压seata-server-$version.zip后修改conf/registry.conf文件:
registry {
type = "nacos"
nacos {
application = "seata-server"
serverAddr = "127.0.0.1:8848"
group = "SEATA_GROUP"
namespace = ""
cluster = "default"
username = "nacos"
password = "nacos"
}
}
config {
type = "nacos"
nacos {
serverAddr = "127.0.0.1:8848"
namespace = ""
group = "SEATA_GROUP"
username = "nacos"
password = "nacos"
}
}
由于使用nacos作为注册中心,所以conf目录下的file.conf无需理会。然后就可以直接启动bin/seata-server.bat,可以在nacos里看到一个名为seata-server的服务了。
(2)由于seata使用mysql作为db高可用数据库,故需要在mysql创建一个ry-seata库,并导入数据库脚本。
-- -------------------------------- The script used when storeMode is 'db' --------------------------------
-- the table to store GlobalSession data
CREATE TABLE IF NOT EXISTS `global_table`
(
`xid` VARCHAR(128) NOT NULL,
`transaction_id` BIGINT,
`status` TINYINT NOT NULL,
`application_id` VARCHAR(32),
`transaction_service_group` VARCHAR(32),
`transaction_name` VARCHAR(128),
`timeout` INT,
`begin_time` BIGINT,
`application_data` VARCHAR(2000),
`gmt_create` DATETIME,
`gmt_modified` DATETIME,
PRIMARY KEY (`xid`),
KEY `idx_gmt_modified_status` (`gmt_modified`, `status`),
KEY `idx_transaction_id` (`transaction_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4;
-- the table to store BranchSession data
CREATE TABLE IF NOT EXISTS `branch_table`
(
`branch_id` BIGINT NOT NULL,
`xid` VARCHAR(128) NOT NULL,
`transaction_id` BIGINT,
`resource_group_id` VARCHAR(32),
`resource_id` VARCHAR(256),
`branch_type` VARCHAR(8),
`status` TINYINT,
`client_id` VARCHAR(64),
`application_data` VARCHAR(2000),
`gmt_create` DATETIME(6),
`gmt_modified` DATETIME(6),
PRIMARY KEY (`branch_id`),
KEY `idx_xid` (`xid`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4;
-- the table to store lock data
CREATE TABLE IF NOT EXISTS `lock_table`
(
`row_key` VARCHAR(128) NOT NULL,
`xid` VARCHAR(96),
`transaction_id` BIGINT,
`branch_id` BIGINT NOT NULL,
`resource_id` VARCHAR(256),
`table_name` VARCHAR(32),
`pk` VARCHAR(36),
`gmt_create` DATETIME,
`gmt_modified` DATETIME,
PRIMARY KEY (`row_key`),
KEY `idx_branch_id` (`branch_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4;
-- for AT mode you must to init this sql for you business database. the seata server not need it.
CREATE TABLE IF NOT EXISTS `undo_log`
(
`branch_id` BIGINT(20) NOT NULL COMMENT 'branch transaction id',
`xid` VARCHAR(100) NOT NULL COMMENT 'global transaction id',
`context` VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as serialization',
`rollback_info` LONGBLOB NOT NULL COMMENT 'rollback info',
`log_status` INT(11) NOT NULL COMMENT '0:normal status,1:defense status',
`log_created` DATETIME(6) NOT NULL COMMENT 'create datetime',
`log_modified` DATETIME(6) NOT NULL COMMENT 'modify datetime',
UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE = InnoDB
AUTO_INCREMENT = 1
DEFAULT CHARSET = utf8mb4 COMMENT ='AT transaction mode undo table';
(3)config.txt文件复制到seata目录
config.txt
service.vgroupMapping.ruoyi-system-group=default
store.mode=db
store.db.datasource=druid
store.db.dbType=mysql
store.db.driverClassName=com.mysql.jdbc.Driver
store.db.url=jdbc:mysql://127.0.0.1:3306/ry-seata?useUnicode=true
store.db.user=root
store.db.password=password
store.db.minConn=5
store.db.maxConn=30
store.db.globalTable=global_table
store.db.branchTable=branch_table
store.db.queryLimit=100
store.db.lockTable=lock_table
store.db.maxWait=5000
(4)nacos-config.sh复制到seata的conf目录
nacos-config.sh
#!/usr/bin/env bash
# Copyright 1999-2019 Seata.io Group.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at、
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
while getopts ":h:p:g:t:u:w:" opt
do
case $opt in
h)
host=$OPTARG
;;
p)
port=$OPTARG
;;
g)
group=$OPTARG
;;
t)
tenant=$OPTARG
;;
u)
username=$OPTARG
;;
w)
password=$OPTARG
;;
?)
echo " USAGE OPTION: $0 [-h host] [-p port] [-g group] [-t tenant] [-u username] [-w password] "
exit 1
;;
esac
done
urlencode() {
for ((i=0; i < ${#1}; i++))
do
char="${1:$i:1}"
case $char in
[a-zA-Z0-9.~_-]) printf $char ;;
*) printf '%%%02X' "'$char" ;;
esac
done
}
if [[ -z ${host} ]]; then
host=localhost
fi
if [[ -z ${port} ]]; then
port=8848
fi
if [[ -z ${group} ]]; then
group="SEATA_GROUP"
fi
if [[ -z ${tenant} ]]; then
tenant=""
fi
if [[ -z ${username} ]]; then
username=""
fi
if [[ -z ${password} ]]; then
password=""
fi
nacosAddr=$host:$port
contentType="content-type:application/json;charset=UTF-8"
echo "set nacosAddr=$nacosAddr"
echo "set group=$group"
failCount=0
tempLog=$(mktemp -u)
function addConfig() {
curl -X POST -H "${contentType}" "http://$nacosAddr/nacos/v1/cs/configs?dataId=$(urlencode $1)&group=$group&content=$(urlencode $2)&tenant=$tenant&username=$username&password=$password" >"${tempLog}" 2>/dev/null
if [[ -z $(cat "${tempLog}") ]]; then
echo " Please check the cluster status. "
exit 1
fi
if [[ $(cat "${tempLog}") =~ "true" ]]; then
echo "Set $1=$2 successfully "
else
echo "Set $1=$2 failure "
(( failCount++ ))
fi
}
count=0
for line in $(cat $(dirname "$PWD")/config.txt | sed s/[[:space:]]//g); do
(( count++ ))
key=${line%%=*}
value=${line#*=}
addConfig "${key}" "${value}"
done
echo "========================================================================="
echo " Complete initialization parameters, total-count:$count , failure-count:$failCount "
echo "========================================================================="
if [[ ${failCount} -eq 0 ]]; then
echo " Init nacos config finished, please start seata-server. "
else
echo " init nacos config fail. "
fi
(5)执行命令,后面填写nacos的IP地址,我的是本机所以是127.0.0.1
sh nacos-config.sh 127.0.0.1
成功后nacos配置列表也能查询到相关配置
(6)修改服务配置文件
# spring配置
spring:
datasource:
dynamic:
# 开启seata代理
seata: true
# seata配置
seata:
enabled: true
# Seata 应用编号,默认为 ${spring.application.name}
application-id: ${spring.application.name}
# Seata 事务组编号,用于 TC 集群名
tx-service-group: ${spring.application.name}-group
# 关闭自动代理
enable-auto-data-source-proxy: false
# 服务配置项
service:
# 虚拟组和分组的映射
vgroup-mapping:
ruoyi-system-group: default
config:
type: nacos
nacos:
serverAddr: 127.0.0.1:8848
group: SEATA_GROUP
namespace:
registry:
type: nacos
nacos:
application: seata-server
server-addr: 127.0.0.1:8848
namespace:
(7)测试验证
测试使用ruoyi-file添加Feign调用测试文件入库,验证分布式数据库调用执行结果,也适用于新的应用。
# 文件数据库信息 seata_file
DROP DATABASE IF EXISTS seata_file;
CREATE DATABASE seata_file;
DROP TABLE IF EXISTS seata_file.sys_file_info;
CREATE TABLE seata_file.sys_file_info
(
file_id BIGINT(11) NOT NULL AUTO_INCREMENT COMMENT '文件编号',
file_name VARCHAR(50) DEFAULT '' COMMENT '文件名称',
file_path VARCHAR(255) DEFAULT '' COMMENT '文件路径',
PRIMARY KEY (file_id)
) ENGINE = INNODB
AUTO_INCREMENT = 1
DEFAULT CHARSET = utf8mb4;
DROP TABLE IF EXISTS seata_file.undo_log;
CREATE TABLE seata_file.undo_log
(
id BIGINT(20) NOT NULL AUTO_INCREMENT,
branch_id BIGINT(20) NOT NULL,
xid VARCHAR(100) NOT NULL,
context VARCHAR(128) NOT NULL,
rollback_info LONGBLOB NOT NULL,
log_status INT(11) NOT NULL,
log_created DATETIME NOT NULL,
log_modified DATETIME NOT NULL,
PRIMARY KEY (id),
UNIQUE KEY ux_undo_log (xid, branch_id)
) ENGINE = INNODB
AUTO_INCREMENT = 1
DEFAULT CHARSET = utf8mb4;
`seata_file`
ruoyi-modules-file应用添加示例代码
SysFileController.java
package com.ruoyi.file.controller;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.multipart.MultipartFile;
import com.ruoyi.common.core.domain.R;
import com.ruoyi.common.core.utils.file.FileUtils;
import com.ruoyi.common.core.web.domain.AjaxResult;
import com.ruoyi.file.service.ISysFileInfoService;
import com.ruoyi.file.service.ISysFileService;
import com.ruoyi.system.api.domain.SysFile;
import com.ruoyi.system.api.domain.SysFileInfo;
/**
* 文件请求处理
*
* @author ruoyi
*/
@RestController
public class SysFileController
{
private static final Logger log = LoggerFactory.getLogger(SysFileController.class);
@Autowired
private ISysFileService sysFileService;
@Autowired
private ISysFileInfoService sysFileInfoService;
/**
* 文件上传请求
*/
@PostMapping("upload")
public R<SysFile> upload(MultipartFile file)
{
try
{
// 上传并返回访问地址
String url = sysFileService.uploadFile(file);
SysFile sysFile = new SysFile();
sysFile.setName(FileUtils.getName(url));
sysFile.setUrl(url);
return R.ok(sysFile);
}
catch (Exception e)
{
log.error("上传文件失败", e);
return R.fail(e.getMessage());
}
}
@PostMapping("/insertFile")
public AjaxResult insertFile(@RequestBody SysFileInfo sysFileInfo)
{
sysFileInfoService.insertFile(sysFileInfo);
return AjaxResult.success();
}
}
ISysFileInfoService.java
package com.ruoyi.file.service;
import com.ruoyi.system.api.domain.SysFileInfo;
public interface ISysFileInfoService
{
void insertFile(SysFileInfo fileInfo);
}
SysFileInfoServiceImpl.java
package com.ruoyi.file.service;
import javax.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import com.baomidou.dynamic.datasource.annotation.DS;
import com.ruoyi.file.mapper.SysFileInfoMapper;
import com.ruoyi.system.api.domain.SysFileInfo;
import io.seata.core.context.RootContext;
@Service
public class SysFileInfoServiceImpl implements ISysFileInfoService
{
private static final Logger log = LoggerFactory.getLogger(SysFileInfoServiceImpl.class);
@Resource
private SysFileInfoMapper sysFileInfoMapper;
/**
* 事务传播特性设置为 REQUIRES_NEW 开启新的事务 重要!!!!一定要使用REQUIRES_NEW
*/
@DS("file")
@Override
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void insertFile(SysFileInfo fileInfo)
{
log.info("=============FILE START=================");
log.info("当前 XID: {}", RootContext.getXID());
sysFileInfoMapper.insert(fileInfo);
log.info("=============FILE END=================");
}
}
SysFileInfoMapper.java
package com.ruoyi.file.mapper;
import com.ruoyi.system.api.domain.SysFileInfo;
public interface SysFileInfoMapper
{
public void insert(SysFileInfo fileInfo);
}
SysFileInfoMapper.xml
DOCTYPE mapper
PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.ruoyi.file.mapper.SysFileInfoMapper">
<resultMap type="SysFileInfo" id="SysFileInfoResult">
<id property="fileId" column="file_id" />
<result property="fileName" column="file_name" />
<result property="filePath" column="file_path" />
resultMap>
<insert id="insert" parameterType="SysFileInfo">
insert into sys_file_info (file_name, file_path) values (#{fileName}, #{filePath})
insert>
mapper>
pom.xml
<dependency>
<groupId>com.mysqlgroupId>
<artifactId>mysql-connector-jartifactId>
dependency>
<dependency>
<groupId>com.ruoyigroupId>
<artifactId>ruoyi-common-datasourceartifactId>
dependency>
RuoYFileApplication.java
// 添加扫描mapper包路径
@MapperScan("com.ruoyi.**.mapper")
# 本地文件上传
file:
domain: http://127.0.0.1:9300
path: D:/ruoyi/uploadPath
prefix: /statics
# FastDFS配置
fdfs:
domain: http://8.129.231.12
soTimeout: 3000
connectTimeout: 2000
trackerList: 8.129.231.12:22122
# Minio配置
minio:
url: http://8.129.231.12:9000
accessKey: minioadmin
secretKey: minioadmin
bucketName: test
# spring配置
spring:
datasource:
druid:
stat-view-servlet:
enabled: true
loginUsername: admin
loginPassword: 123456
dynamic:
druid:
initial-size: 5
min-idle: 5
maxActive: 20
maxWait: 60000
timeBetweenEvictionRunsMillis: 60000
minEvictableIdleTimeMillis: 300000
validationQuery: SELECT 1 FROM DUAL
testWhileIdle: true
testOnBorrow: false
testOnReturn: false
poolPreparedStatements: true
maxPoolPreparedStatementPerConnectionSize: 20
filters: stat,wall,slf4j
connectionProperties: druid.stat.mergeSql\=true;druid.stat.slowSqlMillis\=5000
datasource:
# 主库数据源
master:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://localhost:3306/ry-cloud?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
username: root
password: password
# seata_file数据源
file:
username: root
password: password
url: jdbc:mysql://localhost:3306/seata_file?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
driver-class-name: com.mysql.cj.jdbc.Driver
seata: true
# seata配置
seata:
# 默认关闭,如需启用spring.datasource.dynami.seata需要同时开启
enabled: true
# Seata 应用编号,默认为 ${spring.application.name}
application-id: ${spring.application.name}
# Seata 事务组编号,用于 TC 集群名
tx-service-group: ${spring.application.name}-group
# 关闭自动代理
enable-auto-data-source-proxy: false
# 服务配置项
service:
# 虚拟组和分组的映射
vgroup-mapping:
ruoyi-file-group: default
config:
type: nacos
nacos:
serverAddr: 127.0.0.1:8848
group: SEATA_GROUP
namespace:
registry:
type: nacos
nacos:
application: seata-server
server-addr: 127.0.0.1:8848
namespace:
# mybatis配置
mybatis:
# 搜索指定包别名
typeAliasesPackage: com.ruoyi
# 配置mapper的扫描,找到所有的mapper.xml映射文件
mapperLocations: classpath:mapper/**/*.xml
# swagger配置
swagger:
title: 文件模块接口文档
license: Powered By ruoyi
licenseUrl: https://ruoyi.vip
RemoteFileService.java
package com.ruoyi.system.api;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestPart;
import org.springframework.web.multipart.MultipartFile;
import com.ruoyi.common.core.constant.ServiceNameConstants;
import com.ruoyi.common.core.domain.R;
import com.ruoyi.system.api.domain.SysFile;
import com.ruoyi.system.api.domain.SysFileInfo;
import com.ruoyi.system.api.factory.RemoteFileFallbackFactory;
/**
* 文件服务
*
* @author ruoyi
*/
@FeignClient(contextId = "remoteFileService", value = ServiceNameConstants.FILE_SERVICE, fallbackFactory = RemoteFileFallbackFactory.class)
public interface RemoteFileService
{
/**
* 上传文件
*
* @param file 文件信息
* @return 结果
*/
@PostMapping(value = "/upload", consumes = MediaType.MULTIPART_FORM_DATA_VALUE)
public R<SysFile> upload(@RequestPart(value = "file") MultipartFile file);
/**
* 保存系统文件
*
* @param sysFileInfo 系统文件
* @return 结果
*/
@PostMapping("/insertFile")
R<Boolean> saveFile(@RequestBody SysFileInfo sysFileInfo);
}
RemoteFileFallbackFactory.java
package com.ruoyi.system.api.factory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.web.multipart.MultipartFile;
import com.ruoyi.common.core.domain.R;
import com.ruoyi.system.api.RemoteFileService;
import com.ruoyi.system.api.domain.SysFile;
import com.ruoyi.system.api.domain.SysFileInfo;
import feign.hystrix.FallbackFactory;
/**
* 文件服务降级处理
*
* @author ruoyi
*/
@Component
public class RemoteFileFallbackFactory implements FallbackFactory<RemoteFileService>
{
private static final Logger log = LoggerFactory.getLogger(RemoteFileFallbackFactory.class);
@Override
public RemoteFileService create(Throwable throwable)
{
log.error("文件服务调用失败:{}", throwable.getMessage());
return new RemoteFileService()
{
@Override
public R<SysFile> upload(MultipartFile file)
{
return R.fail("上传文件失败:" + throwable.getMessage());
}
@Override
public R<Boolean> saveFile(SysFileInfo sysFileInfo)
{
return R.fail("文件入库失败:" + throwable.getMessage());
}
};
}
}
SysFileInfo.java
package com.ruoyi.system.api.domain;
public class SysFileInfo
{
/**
* 文件编号
*/
private Long fileId;
/**
* 文件名称
*/
private String fileName;
/**
* 文件路径
*/
private String filePath;
public Long getFileId()
{
return fileId;
}
public void setFileId(Long fileId)
{
this.fileId = fileId;
}
public String getFileName()
{
return fileName;
}
public void setFileName(String fileName)
{
this.fileName = fileName;
}
public String getFilePath()
{
return filePath;
}
public void setFilePath(String filePath)
{
this.filePath = filePath;
}
}
OrderServiceImpl.java
package com.ruoyi.system.service.impl;
import javax.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import com.baomidou.dynamic.datasource.annotation.DS;
import com.ruoyi.system.api.RemoteFileService;
import com.ruoyi.system.api.domain.SysFileInfo;
import com.ruoyi.system.domain.Order;
import com.ruoyi.system.domain.dto.PlaceOrderRequest;
import com.ruoyi.system.mapper.OrderMapper;
import com.ruoyi.system.service.AccountService;
import com.ruoyi.system.service.OrderService;
import com.ruoyi.system.service.ProductService;
import io.seata.core.context.RootContext;
import io.seata.spring.annotation.GlobalTransactional;
@Service
public class OrderServiceImpl implements OrderService
{
private static final Logger log = LoggerFactory.getLogger(OrderServiceImpl.class);
@Resource
private OrderMapper orderMapper;
@Autowired
private AccountService accountService;
@Autowired
private ProductService productService;
@Autowired
private RemoteFileService remoteFileService;
@DS("order") // 每一层都需要使用多数据源注解切换所选择的数据库
@Override
@Transactional
@GlobalTransactional // 重点 第一个开启事务的需要添加seata全局事务注解
public void placeOrder(PlaceOrderRequest request)
{
log.info("=============ORDER START=================");
Long userId = request.getUserId();
Long productId = request.getProductId();
Integer amount = request.getAmount();
log.info("收到下单请求,用户:{}, 商品:{},数量:{}", userId, productId, amount);
log.info("当前 XID: {}", RootContext.getXID());
Order order = new Order(userId, productId, 0, amount);
orderMapper.insert(order);
log.info("订单一阶段生成,等待扣库存付款中");
// 测试fegin调用
SysFileInfo sysFileInfo = new SysFileInfo();
sysFileInfo.setFileName("name" + order.getId());
sysFileInfo.setFilePath("/home/ruoyi/name" + order.getId() + ".png");
remoteFileService.saveFile(sysFileInfo);
// 扣减库存并计算总价
Double totalPrice = productService.reduceStock(productId, amount);
// 扣减余额
accountService.reduceBalance(userId, totalPrice);
order.setStatus(1);
order.setTotalPrice(totalPrice);
orderMapper.updateById(order);
log.info("订单已成功下单");
log.info("=============ORDER END=================");
}
}
config.txt
service.vgroupMapping.ruoyi-file-group=default
执行nacos-config.sh添加到nacos配置中心。
XA 规范 是 X/Open 组织定义的分布式事务处理(DTP,Distributed Transaction Processing)标准,XA 规范 描述了全局的TM与局部的RM之间的接口,几乎所有主流的数据库都对 XA 规范 提供了支持。
seata的XA模式做了一些调整,但大体相似:
(1)RM一阶段的工作:
1-注册分支事务到TC
2-执行分支业务sql但不提交
3-报告执行状态到TC
(2)TC二阶段的工作:
TC检测各分支事务执行状态
1-如果都成功,通知所有RM提交事务
2-如果有失败,通知所有RM回滚事务
(3)RM二阶段的工作:
接收TC指令,提交或回滚事务
Seata的starter已经完成了XA模式的自动装配,实现非常简单,步骤如下:
(1)修改application.yml文件(每个参与事务的微服务),开启XA模式:
seata:
data-source-proxy-mode: XA # 开启数据源代理的XA模式
(2)给发起全局事务的入口方法添加@GlobalTransactional注解
本例中是OrderServiceImpl中的create方法:
@Override@GlobalTransactional
public Long create(Order order) {
// 创建订单
orderMapper.insert(order);
// 扣余额 ...略
// 扣减库存 ...略
return order.getId();
}
(3)重启服务并测试
AT模式同样是分阶段提交的事务模型,不过缺弥补了XA模型中资源锁定周期过长的缺陷。
(一)阶段一RM的工作:
(1)注册分支事务
(2)记录undo-log(数据快照)
(3)执行业务sql并提交
1.解析sql语义,找到“业务sql”要更新的业务数据,在业务数据被更新前,将其保存成“before image”,
2.执行“业务sql”更新业务数据,在业务数据更新之后,
3.其保存成“after image”,最后生成行锁。
以上操作全部在一个数据库事务内完成,这样保证了一阶段操作的原子性。
(4)报告事务状态
(二)阶段二提交时RM的工作:
二阶段如是顺利提交的话,
删除undo-log即可,因为“业务sql”在一阶段已经提交至数据库,所以seata框架只需将一阶段保存的快照数据和行锁删掉,完成数据清理即可。
(三)阶段二回滚时RM的工作:
二阶段如果是回滚的话,seata就需要回滚一阶段已经执行的“业务sql”,还原业务数据,根据undo-log恢复数据到更新前。
回滚方式便是使用“before image”还原业务数据;但在还原前要首先校验脏写,对比“数据库当前业务数据”和“after image”,如果两份数据完全一致就说明没有脏写,可以还原业务数据,如果不一致就说明有脏写,出现脏写就需要转人工处理。
流程如下:
(1)原子性:事务中的所有操作,要么全部成功,要么全部失败
(2)隔离性:读写发生在不同的线程中时,对同一资源操作的事务不能同时发生
(3)持久性:对数据库做的一切修改将永久保存,不管是否出现故障
(4)一致性:要保证数据库内部完整性约束,声明式约束
(1)每个库中的undo_log表
其中,每个库中的undo_log表,是Seata AT模式必须创建的表,主要用于分支事务的回滚。
TCC模式与AT模式非常相似,每阶段都是独立事务,不同的是TCC通过人工编码来实现数据恢复。需要实现三个方法:
(1)Try:资源的检测和预留;
(2)Confirm:完成资源操作业务;要求 Try 成功 Confirm 一定要能成功。
(3)Cancel:预留资源释放,可以理解为try的反向操作。
TCC的工作模型图:
当某分支事务的try阶段阻塞时,可能导致全局事务超时而触发二阶段的cancel操作。在未执行try操作时先执行了cancel操作,这时cancel不能做回滚,就是空回滚。
对于已经空回滚的业务,如果以后继续执行try,就永远不可能confirm或cancel,这就是业务悬挂。应当阻止执行空回滚后的try操作,避免悬挂
(1)Saga模式是SEATA提供的长事务解决方案。也分为两个阶段:
1-一阶段:直接提交本地事务
2-二阶段:成功则什么都不做;失败则通过编写补偿业务来回滚
(2)Saga模式优点:
1-事务参与者可以基于事件驱动实现异步调用,吞吐高
2-一阶段直接提交事务,无锁,性能好
3-不用编写TCC中的三个阶段,实现简单
(3)缺点
1-软状态持续时间不确定,时效性差
2-没有锁,没有事务隔离,会有脏写