AT 模式是一种无侵入的分布式事务解决方案。在 AT 模式下,用户只需关注自己的“业务 SQL”,用户的 “业务 SQL” 作为一阶段,Seata 框架会自动生成事务的二阶段提交和回滚操作。
在介绍AT 模式的时候它是无侵入的分布式事务解决方案, 那么如何做到对业务的无侵入的呢?
一阶段:
在一阶段,Seata 会拦截“业务 SQL”,首先解析 SQL 语义,找到“业务 SQL”要更新的业务数据,在业务数据被更新前,将其保存成“before image”,然后执行“业务 SQL”更新业务数据,在业务数据更新之后,再将其保存成“after image”,最后生成行锁。以上操作全部在一个数据库事务内完成,这样保证了一阶段操作的原子性。
二阶段:
提交:二阶段如果是提交的话,因为“业务 SQL”在一阶段已经提交支数据库, 所以 Seata 框架只需将一阶段保存的快照数据和行锁删掉,完成数据清理即可。
回滚:二阶段如果是回滚的话,Seata 就需要回滚一阶段已经执行的“业务 SQL”,还原业务数据。回滚方式便是用“before image”还原业务数据;但在还原前要首先要校验脏写,对比“数据库当前业务数据”和 “after image”,如果两份数据完全⼀致就说明没有脏写,可以还原业务数据,如果不一致就说明有脏写,出现脏写就需要转人工处理。
AT 模式的一阶段、二阶段提交和回滚均由 Seata 框架自动生成,用户只需编写“业务SQL”,便能轻松接入分布式事务,AT 模式是一种对业务无任何侵入的分布式事务解决方案。
介绍了 seata 事务的三个模块:TC(事务协调器)、TM(事务管理器)和RM(资源管理器),其中 TM 和 RM 是嵌入在业务应用中的,而 TC 则是一个独立服务。
Seata Server 就是 TC,直接从官方仓库下载启动即可,
下载地址:https://github.com/seata/seata/releases
registry.conf:
Seata Server 要向注册中心进行注册,这样,其他服务就可以通过注册中心去发现 Seata Server,与 Seata Server 进行通信。
Seata 支持多款注册中心服务:nacos 、eureka、redis、zk、consul、etcd3、sofa。我们项目中要使用 nacos注册中心,nacos服务的连接地址、注册的服务名,这需要在seata/conf/registry.conf文件中进行配置:
registry {
# file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
type = "nacos"nacos {
application = "seata-tc-server"
serverAddr = "127.0.0.1:8848"
group = "SEATA_GROUP"
namespace = ""
cluster = "default"
username = "nacos"
password = "nacos"
}
eureka {
serviceUrl = "http://localhost:8761/eureka"
application = "default"
weight = "1"
}
redis {
serverAddr = "localhost:6379"
db = 0
password = ""
cluster = "default"
timeout = 0
}
zk {
cluster = "default"
serverAddr = "127.0.0.1:2181"
sessionTimeout = 6000
connectTimeout = 2000
username = ""
password = ""
}
consul {
cluster = "default"
serverAddr = "127.0.0.1:8500"
aclToken = ""
}
etcd3 {
cluster = "default"
serverAddr = "http://localhost:2379"
}
sofa {
serverAddr = "127.0.0.1:9603"
application = "default"
region = "DEFAULT_ZONE"
datacenter = "DefaultDataCenter"
cluster = "default"
group = "SEATA_GROUP"
addressWaitTime = "3000"
}
file {
name = "file.conf"
}
}config {
# file、nacos 、apollo、zk、consul、etcd3
type = "nacos"nacos {
serverAddr = "127.0.0.1:8848"
namespace = ""
group = "SEATA_GROUP"
username = "nacos"
password = "nacos"
}
consul {
serverAddr = "127.0.0.1:8500"
aclToken = ""
}
apollo {
appId = "seata-server"
## apolloConfigService will cover apolloMeta
apolloMeta = "http://192.168.1.204:8801"
apolloConfigService = "http://192.168.1.204:8080"
namespace = "application"
apolloAccesskeySecret = ""
cluster = "seata"
}
zk {
serverAddr = "127.0.0.1:2181"
sessionTimeout = 6000
connectTimeout = 2000
username = ""
password = ""
nodePath = "/seata/seata.properties"
}
etcd3 {
serverAddr = "http://localhost:2379"
}
file {
name = "file.conf"
}
}
向nacos中添加配置信息:
访问 https://seata.io/zh-cn/docs/user/configurations.html,其里面有针对每个一项配置介绍
Server端存储的模式(store.mode)现有file,db,redis三种。主要存储全局事务会话信息,分支事务信息, 锁记录表信息,seata-server默认是file模式。file只能支持单机模式, 如果想要高可用模式的话可以切换db或者redis. 为了方便查看全局事务会话信息本次课程采用db数据库模式
存储模式:
store.mode=db
mysql数据库连接信息
store.db.datasource=druid
store.db.dbType=mysql
store.db.driverClassName=com.mysql.jdbc.Driver
store.db.url=jdbc:mysql://127.0.0.1:3306/seata?useUnicode=true
store.db.user=root
store.db.password=root
store.db.minConn=5
store.db.maxConn=30
store.db.globalTable=global_table
store.db.branchTable=branch_table
store.db.queryLimit=100
store.db.lockTable=lock_table
store.db.maxWait=5000
注意1: 需要创建seata数据库
注意2: 需要创建global_table/branch_table/lock_table三张表,seata1.0以上就不自带数据库文件了,要自己去github下载,https://github.com/seata/seata/tree/develop/script/server/db
- -- -------------------------------- The script used when storeMode is 'db' --------------------------------
- -- the table to store GlobalSession data
- CREATE TABLE
- IF
- NOT EXISTS `global_table` (
- `xid` VARCHAR ( 128 ) NOT NULL,
- `transaction_id` BIGINT,
- `status` TINYINT NOT NULL,
- `application_id` VARCHAR ( 32 ),
- `transaction_service_group` VARCHAR ( 32 ),
- `transaction_name` VARCHAR ( 128 ),
- `timeout` INT,
- `begin_time` BIGINT,
- `application_data` VARCHAR ( 2000 ),
- `gmt_create` DATETIME,
- `gmt_modified` DATETIME,
- PRIMARY KEY ( `xid` ),
- KEY `idx_status_gmt_modified` ( `status`, `gmt_modified` ),
- KEY `idx_transaction_id` ( `transaction_id` )
- ) ENGINE = INNODB DEFAULT CHARSET = utf8mb4;-- the table to store BranchSession data
- CREATE TABLE
- IF
- NOT EXISTS `branch_table` (
- `branch_id` BIGINT NOT NULL,
- `xid` VARCHAR ( 128 ) NOT NULL,
- `transaction_id` BIGINT,
- `resource_group_id` VARCHAR ( 32 ),
- `resource_id` VARCHAR ( 256 ),
- `branch_type` VARCHAR ( 8 ),
- `status` TINYINT,
- `client_id` VARCHAR ( 64 ),
- `application_data` VARCHAR ( 2000 ),
- `gmt_create` DATETIME ( 6 ),
- `gmt_modified` DATETIME ( 6 ),
- PRIMARY KEY ( `branch_id` ),
- KEY `idx_xid` ( `xid` )
- ) ENGINE = INNODB DEFAULT CHARSET = utf8mb4;-- the table to store lock data
- CREATE TABLE
- IF
- NOT EXISTS `lock_table` (
- `row_key` VARCHAR ( 128 ) NOT NULL,
- `xid` VARCHAR ( 128 ),
- `transaction_id` BIGINT,
- `branch_id` BIGINT NOT NULL,
- `resource_id` VARCHAR ( 256 ),
- `table_name` VARCHAR ( 32 ),
- `pk` VARCHAR ( 36 ),
- `status` TINYINT NOT NULL DEFAULT '0' COMMENT '0:locked ,1:rollbacking',
- `gmt_create` DATETIME,
- `gmt_modified` DATETIME,
- PRIMARY KEY ( `row_key` ),
- KEY `idx_status` ( `status` ),
- KEY `idx_branch_id` ( `branch_id` ),
- KEY `idx_xid` ( `xid` )
- ) ENGINE = INNODB DEFAULT CHARSET = utf8mb4;
-
- CREATE TABLE IF NOT EXISTS `distributed_lock`
- (
- `lock_key` CHAR(20) NOT NULL,
- `lock_value` VARCHAR(20) NOT NULL,
- `expire` BIGINT,
- primary key (`lock_key`)
- ) ENGINE = InnoDB
- DEFAULT CHARSET = utf8mb4;
-
- INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('AsyncCommitting', ' ', 0);
- INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('RetryCommitting', ' ', 0);
- INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('RetryRollbacking', ' ', 0);
- INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('TxTimeoutCheck', ' ', 0);
下载地址:https://github.com/seata/seata/tree/develop/script/config-center/nacos
将nacos-config.sh放在seata/conf文件夹中
打开git bash here 执行nacos-config.sh,需要提前将nacos启动
输入命令 :
sh nacos-config.sh -h 127.0.0.1
登录nacos查看配置信息
启动seata-server
观察nacos服务列表
AT 模式在RM端需要 UNDO_LOG 表,来记录每个RM的事务信息,主要包含数据修改前,后的相关信息,用于回滚处理,所以在所有数据库中分别执行
- -- 注意此处0.3.0+ 增加唯⼀索引 ux_undo_log
- CREATE TABLE `undo_log` (
- `id` BIGINT ( 20 ) NOT NULL AUTO_INCREMENT,
- `branch_id` BIGINT ( 20 ) NOT NULL,
- `xid` VARCHAR ( 100 ) NOT NULL,
- `context` VARCHAR ( 128 ) NOT NULL,
- `rollback_info` LONGBLOB NOT NULL,
- `log_status` INT ( 11 ) NOT NULL,
- `log_created` datetime NOT NULL,
- `log_modified` datetime NOT NULL,
- `ext` VARCHAR ( 100 ) DEFAULT NULL,
- PRIMARY KEY ( `id` ),
- UNIQUE KEY `ux_undo_log` ( `xid`, `branch_id` )
- ) ENGINE = INNODB AUTO_INCREMENT = 1 DEFAULT CHARSET = utf8;
TM/RM端整合seata一共有五个步骤:
事务分组:
RM(事务管理器)端整合Seata与TM(事务管理器)端步骤类似,只不过不需要在方法添加@GlobalTransactional注解,针对我们工程lagou_bussiness是事务的发起者,所以是TM端,其它工程为RM端. 所以我们只需要在lagou_common_db完成前4步骤即可
工程中添加Seata依赖
- <dependencyManagement>
- <dependencies>
-
- <dependency>
- <groupId>org.springframework.cloudgroupId>
- <artifactId>spring-cloud-dependenciesartifactId>
- <version>Greenwich.RELEASEversion>
- <type>pomtype>
- <scope>importscope>
- dependency>
-
-
-
- <dependency>
- <groupId>com.alibaba.cloudgroupId>
- <artifactId>spring-cloud-alibaba-dependenciesartifactId>
- <version>2.1.0.RELEASEversion>
- <type>pomtype>
- <scope>importscope>
- dependency>
-
- <dependency>
- <groupId>mysqlgroupId>
- <artifactId>mysql-connector-javaartifactId>
- <version>5.1.47version>
- dependency>
-
-
-
- <dependency>
- <groupId>io.seatagroupId>
- <artifactId>seata-allartifactId>
- <version>1.3.0version>
- dependency>
- dependencies>
- dependencyManagement>
- <dependency>
- <groupId>com.alibaba.cloudgroupId>
- <artifactId>spring-cloud-alibaba-seataartifactId>
-
- <exclusions>
- <exclusion>
- <groupId>io.seatagroupId>
- <artifactId>seata-allartifactId>
- exclusion>
- exclusions>
- dependency>
-
- <dependency>
- <groupId>io.seatagroupId>
- <artifactId>seata-allartifactId>
- dependency>
在common工程添加registry.conf依赖
添加公共配置
spring.cloud.alibaba.seata.tx-service-group=default_tx_group
logging.level.io.seata=debug
在每个模块下引入公共配置文件
profiles
active: seata
编译数据源代理(公共模块)
- package com.lagou.common_db;
-
- import com.alibaba.druid.pool.DruidDataSource;
- import io.seata.rm.datasource.DataSourceProxy;
- import org.springframework.boot.context.properties.ConfigurationProperties;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.context.annotation.Primary;
-
- import javax.sql.DataSource;
-
- /**
- * 数据源代理
- */
- @Configuration
- public class DataSourceConfiguration {
- /**
- * 使用druid连接池
- *
- * @return
- */
- @Bean
- @ConfigurationProperties(prefix = "spring.datasource")
- public DataSource druidDataSource() {
- return new DruidDataSource();
- }
-
- /**
- * 设置数据源代理,完成分支事务注册/事务提交与回滚等操作
- *
- * @param druidDataSource
- * @return
- */
- @Primary // 设置首选数据源对象
- @Bean("dataSource")
- public DataSourceProxy dataSource(DataSource druidDataSource) {
- return new DataSourceProxy(druidDataSource);
- }
- }
启动扫描配置类,分别加载每个工程的启动类中
- @SpringBootApplication(exclude = DataSourceAutoConfiguration.class,
- scanBasePackages = "com.lagou")
添加注解@GlobalTransactional
- /**
- * 商品销售
- *
- * @param goodsId 商品id
- * @param num 销售数量
- * @param username 用户名
- * @param money 金额
- */
- // @Transactional
- @GlobalTransactional(name = "sale", timeoutMills = 100000, rollbackFor = Exception.class)
- public void sale(Integer goodsId, Integer num, Double money, String username) {
- //创建订单
- orderServiceFeign.addOrder(idWorker.nextId(), goodsId, num, money, username);
- //增加积分
- pointsServiceFeign.increase(username, (int) (money / 10));
- //扣减库存
- storageServiceFeign.decrease(goodsId, num);
- }