我这里下载的是windows0.9 GA版本,此版本适用于中小型企业,业务并发量不大服务可以自己搭建并且seata不支持集群模式,如果大型企业使用需要使用1.0.0的集群版本,也可以使用阿里云的现成服务GTS,本次搭建的模式为AT模式,下载地址参考seata github下载地址,seata官网请访问这里
下载后解压文件,删除conf目录下的file.conf文件,将file.conf.example文件复制一份为file.conf,修改文件的service和存储模块,自定义事务分组名称、事务日志存储模式为db+数据库连接信息
修改参数名称vgroup_mapping.my_test_tx_group="default"的值为自定义事务分组名称

注意自定义分组名称,否则可能会导致启动项目的时候无法连接

修改事务日志存储模式为db,另外配置MySQL数据库的账号和密码,需要初始化seata数据库名称
修改mode为db以及mysql的连接账号密码

初始化数据库,新建seata数据库,执行conf目录下的db_store.sql文件创建数据库,分别是分支、全局、锁的三张表,对应seata的TC (Transaction Coordinator) - 事务协调者、TM (Transaction Manager) - 事务管理器、RM (Resource Manager) - 资源管理器

初始化回滚日志表,前面的表是在seat对应存储数据库中,这边是在每个微服务下创建对应的表,执行conf目录下的db_undo_log.sql文件,创建对应的表,用于发生事务回滚时日志记录

修改seata所需要注册的注册中心以及地址,修改conf目录下的register.conf文件
修改为注册到nacos,type修改为nacos,另外配置连接地址

最后等待nacos启动完成,启动seata

本次分布式事务的演示架构图,以订单作为入口,驱动库存和余额的变更

仓储服务:对给定的商品扣除仓储数量。
订单服务:根据采购需求创建订单。
帐户服务:从用户帐户中扣除余额。
对应的数据库也是三个数据库

1.pom文件添加依赖
<!--nacos-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<!--seata-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
<exclusions>
<exclusion>
<artifactId>seata-all</artifactId>
<groupId>io.seata</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-all</artifactId>
<version>0.9.0</version>
</dependency>
需要注意的是seata依赖自带seata-all,由于版本不一致所以需要排除seata-all单独应用,本次使用的seata版本为0.9.0,所以seata-all也是对应版本,根据自身情况选择版本
2.application.yml中添加seata的相关配置,安装seata时自定义的事务分组名称
spring:
cloud:
alibaba:
seata:
#自定义事务组名称需要与seata-server中的对应
tx-service-group: fsp_tx_group
3.resource下新增file.conf文件,文件内容与seata的file.conf内容一致
transport {
# tcp udt unix-domain-socket
type = "TCP"
#NIO NATIVE
server = "NIO"
#enable heartbeat
heartbeat = true
#thread factory for netty
thread-factory {
boss-thread-prefix = "NettyBoss"
worker-thread-prefix = "NettyServerNIOWorker"
server-executor-thread-prefix = "NettyServerBizHandler"
share-boss-worker = false
client-selector-thread-prefix = "NettyClientSelector"
client-selector-thread-size = 1
client-worker-thread-prefix = "NettyClientWorkerThread"
# netty boss thread size,will not be used for UDT
boss-thread-size = 1
#auto default pin or 8
worker-thread-size = 8
}
shutdown {
# when destroy server, wait seconds
wait = 3
}
serialization = "seata"
compressor = "none"
}
service {
#transaction service group mapping
vgroup_mapping.my_test_tx_group = "fsp_tx_group"
#only support when registry.type=file, please don't set multiple addresses
default.grouplist = "127.0.0.1:8091"
#degrade, current not support
enableDegrade = false
#disable seata
disableGlobalTransaction = false
}
client {
rm {
async.commit.buffer.limit = 10000
lock {
retry.internal = 10
retry.times = 30
retry.policy.branch-rollback-on-conflict = true
}
report.retry.count = 5
table.meta.check.enable = false
report.success.enable = true
}
tm {
commit.retry.count = 5
rollback.retry.count = 5
}
undo {
data.validation = true
log.serialization = "jackson"
log.table = "undo_log"
}
log {
exceptionRate = 100
}
support {
# auto proxy the DataSource bean
spring.datasource.autoproxy = false
}
}
## transaction log store
store {
## store mode: file、db
mode = "db"
## file store property
file {
## store location dir
dir = "sessionStore"
# branch session size , if exceeded first try compress lockkey, still exceeded throws exceptions
max-branch-session-size = 16384
# globe session size , if exceeded throws exceptions
max-global-session-size = 512
# file buffer size , if exceeded allocate new buffer
file-write-buffer-cache-size = 16384
# when recover batch read size
session.reload.read_size = 100
# async, sync
flush-disk-mode = async
}
## database store property
db {
## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp) etc.
datasource = "dbcp"
## mysql/oracle/h2/oceanbase etc.
db-type = "mysql"
driver-class-name = "com.mysql.jdbc.Driver"
url = "jdbc:mysql://192.168.17.22:3307/seata"
user = "root"
password = "yourPwd"
min-conn = 1
max-conn = 10
global.table = "global_table"
branch.table = "branch_table"
lock-table = "lock_table"
query-limit = 100
}
}
server {
recovery {
#schedule committing retry period in milliseconds
committing-retry-period = 1000
#schedule asyn committing retry period in milliseconds
asyn-committing-retry-period = 1000
#schedule rollbacking retry period in milliseconds
rollbacking-retry-period = 1000
#schedule timeout retry period in milliseconds
timeout-retry-period = 1000
}
undo {
log.save.days = 7
#schedule delete expired undo_log in milliseconds
log.delete.period = 86400000
}
#unit ms,s,m,h,d represents milliseconds, seconds, minutes, hours, days, default permanent
max.commit.retry.timeout = "-1"
max.rollback.retry.timeout = "-1"
}
## metrics settings
metrics {
enabled = false
registry-type = "compact"
# multi exporters use comma divided
exporter-list = "prometheus"
exporter-prometheus-port = 9898
}
4.resource下新增register.conf文件,同样也与seata的配置相同
registry {
# file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
type = "nacos"
nacos {
serverAddr = "localhost:8848"
namespace = ""
cluster = "default"
}
eureka {
serviceUrl = "http://localhost:8761/eureka"
application = "default"
weight = "1"
}
redis {
serverAddr = "localhost:6379"
db = "0"
}
zk {
cluster = "default"
serverAddr = "127.0.0.1:2181"
session.timeout = 6000
connect.timeout = 2000
}
consul {
cluster = "default"
serverAddr = "127.0.0.1:8500"
}
etcd3 {
cluster = "default"
serverAddr = "http://localhost:2379"
}
sofa {
serverAddr = "127.0.0.1:9603"
application = "default"
region = "DEFAULT_ZONE"
datacenter = "DefaultDataCenter"
cluster = "default"
group = "SEATA_GROUP"
addressWaitTime = "3000"
}
file {
name = "file.conf"
}
}
config {
# file、nacos 、apollo、zk、consul、etcd3
type = "file"
nacos {
serverAddr = "localhost"
namespace = ""
}
consul {
serverAddr = "127.0.0.1:8500"
}
apollo {
app.id = "seata-server"
apollo.meta = "http://192.168.1.204:8801"
}
zk {
serverAddr = "127.0.0.1:2181"
session.timeout = 6000
connect.timeout = 2000
}
etcd3 {
serverAddr = "http://localhost:2379"
}
file {
name = "file.conf"
}
}
项目中的目录结构

5.修改MySQL的数据库源,自定义项目conf目录下新增DataSourceProxyConfig类创建相关的Bean对象
package com.atguigu.springcloud.alibaba.config;
import com.alibaba.druid.pool.DruidDataSource;
import io.seata.rm.datasource.DataSourceProxy;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.transaction.SpringManagedTransactionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import javax.sql.DataSource;
/**
* @auther Czw
* @create 2021-11-14 13:58
* 使用Seata对数据源进行代理
*/
@Configuration
public class DataSourceProxyConfig {
@Value("${mybatis.mapperLocations}")
private String mapperLocations;
@Bean
@ConfigurationProperties(prefix = "spring.datasource")
public DataSource druidDataSource() {
return new DruidDataSource();
}
@Bean
public DataSourceProxy dataSourceProxy(DataSource dataSource) {
return new DataSourceProxy(dataSource);
}
@Bean
public SqlSessionFactory sqlSessionFactoryBean(DataSourceProxy dataSourceProxy) throws Exception {
SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
sqlSessionFactoryBean.setDataSource(dataSourceProxy);
sqlSessionFactoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources(mapperLocations));
sqlSessionFactoryBean.setTransactionFactory(new SpringManagedTransactionFactory());
return sqlSessionFactoryBean.getObject();
}
}
6.项目启动类上排除自带的数据源
@SpringBootApplication(exclude = DataSourceAutoConfiguration.class)//取消数据源的自动创建
7.在业务逻辑serviceImpl中添加全局事务注解 @GlobalTransactional(name = “fsp-create-order”,rollbackFor = Exception.class)
前面1-6的操作与Order Module步骤一致,注意修改数据库名称参数,第七步不需要在这两个模块中操作
为了模拟事务回滚,在Account的serviceImpl模块加入线程睡眠代码用于测试异常事务回滚
//模拟超时异常,全局事务回滚
//暂停几秒钟线程
try {
TimeUnit.SECONDS.sleep(20);
} catch (InterruptedException e) {
e.printStackTrace();
}
//扣减金额业务.....