分布式解决方案seata。
本章代码已分享至Gitee: https://gitee.com/lengcz/springcloudalibaba01.git
Seata是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata为用户提供了AT、TCC、SAGA和XA事务模式,为用户打造了一站式的分布式事务解决方案。
seata的设计目标是对业务的无侵入,因此对业务无侵入的2PC方案,在传统2PC 的基础上演进。它把所有的事务理解为一个包含了若干分支事务的全局事务。全局事务的职责是协调其管辖的分支事务达成一致,要么同时成功,要么一起回滚。此外分支事务本身就是一个关系型数据库的本地事务。
Seata的执行流程
Seata的一大特色是AT对业务代码完全无入侵,使用非常简单,改造成本低。用户只需要关注自己的业务SQL ,Seata会通过分析用户的业务SQL反向生成回滚数据。AT模式分为两阶段,如下图
Seata实现2PC和传统2PC的差别。
XA模式是Seata另一种无侵入的分布式事务解决方案,它在seata定义的分布式事务框架内,利用事务资源(数据库、消息服务等)对XA协议的支持,以XA协议的机制来管理分支事务,XA要求数据库本身对规范和协议的支持。
从编程模式上,XA模式和AT模式保持完全一致,只需要修改数据源代理即,即可实现XA模式与AT模式之间的切换,代码如下。
@Bean("dataSource")
public DataSource dataSource(DruidDataSource druidDataSource){
// return new DataSourceProxy(druidDataSource);//AT模式
return new DataSourceProxyXA(druidDataSource);//XA模式
}
Seata中的TCC模式同样包含了三个阶段
Saga是长事务解决方案,每个参与者都需要实现事务的正向操作和补偿操作。当参与者的正向操作执行失败时,在回滚本地事务的同时会调用上一阶段的补偿操作,在业务失败时最终会使事务回到初始状态。
Saga与TCC类似,同样没有全局锁。由于缺少锁定资源这一步,在某些适合的场景,Saga要比TCC实现起来更简单。
Saga模式是长事务解决方案,适用于业务流程长且需要保证事务最终一致性的业务系统,Saga模式一阶段就会提交本地事务且无锁,在长流程情况下可以保证性能,多用于渠道层、集成层业务系统、事务参与者可能是其它公司的服务或者是遗留系统的服务,无法进行改造或者提供TCC要求的接口,也可以使用Saga模式。
用户下单,创建订单,并调用远程服务扣减库存
public interface ProductService {
Product findById(Integer pid);
/**
* 扣除库存
* @param pid
* @param num
*/
void reduceInventory(Integer pid, Integer num);
}
@Service
public class ProductServiceImpl implements ProductService {
@Autowired
private ProductDao productDao;
@Override
public Product findById(Integer pid) {
return productDao.findById(pid).get();
}
@Transactional //事务注解
public void reduceInventory(Integer pid, Integer num) {
Product product = productDao.findById(pid).get();
product.setStock(product.getStock()-num);
productDao.save(product);
}
}
@RequestMapping("/product/reduceInventory")
public void reduceInventory(Integer pid,Integer num) {
log.info("扣库存:pid" + pid+",num:"+num);
productService.reduceInventory( pid, num);
}
/**
* 订单service
*/
public interface OrderService2 {
public Order createOrder(Integer pid,Integer num);
}
package com.lcz.service.impl;
import com.alibaba.fastjson.JSON;
import com.lcz.dao.OrderDao;
import com.lcz.pojo.Order;
import com.lcz.pojo.Product;
import com.lcz.service.OrderService2;
import com.lcz.service.ProductService;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
@Slf4j
public class OrderService2Impl implements OrderService2 {
@Autowired
private OrderDao orderDao;
@Autowired
private ProductService productService;
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Override
public Order createOrder(Integer pid, Integer num) {
log.info("----------------调用商品服务,调用商品微服务查询此商品------------------------");
/**
* Ribbon负载均衡
*/
Product product = productService.findById(pid);
log.info("查询到的商品内容:" + JSON.toJSONString(product));
Order order = new Order();
order.setUid(1);
order.setUsername("测试");
order.setPid(pid);
order.setPrice(product.getPrice());
order.setPname(product.getPname());
order.setNumber(num);
orderDao.save(order);//本地事务
log.info("用户下单成功,订单信息为:" + JSON.toJSON(order));
log.info("扣减库存");
//扣减库存
productService.reduceInventory(pid,num);//远程事务
//发送订单到消息队列
rocketMQTemplate.convertAndSend("order-topic",order);
return order;
}
}
package com.lcz.service.listener;
import com.lcz.pojo.Order;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
/**
* 监听器
*/
@Component
@RocketMQMessageListener(
topic = "order-topic",//消费主题
consumerGroup = "group_rocketmq" //消费者分组
)
@Slf4j
public class OrderRocketMessageListener implements RocketMQListener<Order> {
@Override
public void onMessage(Order order) {
System.out.println("-----监听到订单-----");
log.info("监听到用户下单成功,向用户发送待支付通知"+ order);
}
}
需要注释掉服务容错的代码
/**
* value用于指定调用的nacos下哪个微服务
* fallback用于指定当前feign接口容错类
*/
//@FeignClient(value="server-product",fallback = ProductServiceFallback.class)
@FeignClient(value="server-product"
// ,fallbackFactory = ProductServiceFallbackFactory.class
)
public interface ProductService {
@RequestMapping("/product/{pid}")
Product findById(@PathVariable Integer pid);
/**
* 扣库存的远程调用
* @param pid 商品id
* @param num 扣除数量
*/
@RequestMapping("/product/reduceInventory")
void reduceInventory(@RequestParam("pid") Integer pid,
@RequestParam("num") Integer num);
}
/**
* seata处理事务
*/
@RestController
@Slf4j
public class OrderController3 {
@Autowired
private OrderService2 orderService2;
@GetMapping("/order3/prod/{pid}")
public Order order(@PathVariable Integer pid) {
//这里直接写死了下单两个
return orderService2.createOrder(pid,2);
}
}
到这里,我们完成了下单和扣减库存的业务代码。
很显然order和product是两个微服务内,商品id为3时下单成功,但是扣减库存可能会失败(抛异常)。
@Transactional //事务注解
@Override
public void reduceInventory(Integer pid, Integer num) {
Product product = productDao.findById(pid).get();
product.setStock(product.getStock()-num);
if(pid==3){
int i=1/0;//模拟异常
}
productDao.save(product);
}
我们调用下单,虽然下单成功,但是扣减库存失败了,这就是电信的分布式事务问题。我们需要保证事务的一致性,不可分割,要么同时成功,要么同时失败。
清空订单表,并将shop_product库存设置为500,此时发送下单请求,显示请求失败了,但是订单表里已经下单,但是库存没有减少。
发送下单请求显示失败,数据库有此订单,但是库存未减少。
通过上面的例子,我们复现了发生故障时,分布式事务的不一致问题。
前面我们构造了一个分布式事务的典型问题,如何解决分布式事务问题呢?下面我们引入seata,使用seata解决分布式事务问题。
Seata Server是Seata中的事务协调器,从官网下载Seata-Server解压安装。
下载地址: https://github.com/seata/seata/releases
下载seata-server
修改配置conf下的registry.conf,注册和配置均使用nacos
registry {
# file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
type = "nacos"
nacos {
serverAddr = "localhost"
namespace = "public"
cluster = "default"
}
}
config {
# file、nacos 、apollo、zk、consul、etcd3
type = "nacos"
nacos {
serverAddr = "localhost"
namespace = "public"
cluster = "default"
}
}
#nacos-config.sh在conf目录下,直接执行命令即可
nacos-config.sh 127.0.0.1
如果你需要指定其它参数,请参考eg
eg: sh ${SEATAPATH}/script/config-center/nacos/nacos-config.sh -h localhost -p 8848 -g SEATA_GROUP -t 5a3c7d6c-f497-4d68-a71a-2e5e3340b3ca -u username -w password
seata-server.bat -p 8091 -m file
5. 到nacos后台查看,如果看到serverAddr即可启动成功。(seata-server各版本的服务名称有些不同,也有叫seata-server)
注意:需要在使用到seata全局事务的所有微服务对应的数据库创建undo_log表。
以下是mysql的建表语句,其它类型数据库请见https://github.com/seata/seata/tree/develop/script/client/at/db
CREATE TABLE `undo_log`
(
`id` BIGINT(20) NOT NULL AUTO_INCREMENT,
`branch_id` BIGINT(20) NOT NULL,
`xid` VARCHAR(100) NOT NULL,
`context` VARCHAR(128) NOT NULL,
`rollback_info` LONGBLOB NOT NULL,
`log_status` INT(11) NOT NULL,
`log_created` DATETIME NOT NULL,
`log_modified` DATETIME NOT NULL,
`ext` VARCHAR(100) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE = INNODB
AUTO_INCREMENT = 1
DEFAULT CHARSET = utf8;
此处的下单模型中,order微服务调用product微服务,order创建订单,product扣减库存,这两个行为属于子事务流程,所以都需要配置seata。我们要如何操作呢?
注意: 下面的操作需要在product和order两个模块都需要配置,因为它们两个模块都是全局事务的一部分。
<dependency>
<groupId>com.alibaba.cloudgroupId>
<artifactId>spring-cloud-starter-alibaba-nacos-configartifactId>
dependency>
<dependency>
<groupId>com.alibaba.cloudgroupId>
<artifactId>spring-cloud-starter-alibaba-seataartifactId>
dependency>
Seata 是通过代理数据源实现事务分支的,所以需要配置 io.seata.rm.datasource.DataSourceProxy 的
Bean,且是 @Primary默认的数据源,否则事务不会回滚,无法实现分布式事务
package com.lcz.config;
import com.alibaba.druid.pool.DruidDataSource;
import io.seata.rm.datasource.DataSourceProxy;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
/**
* seata配置代理数据源
*/
@Configuration
public class DataSourceProxyConfig {
@Bean
@ConfigurationProperties(prefix = "spring.datasource")
public DruidDataSource druidDataSource(){
DruidDataSource druidDataSource=new DruidDataSource();
return druidDataSource;
}
@Bean
@Primary
public DataSourceProxy dataSource(DruidDataSource druidDataSource){
return new DataSourceProxy(druidDataSource);
}
}
registry {
# file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
type = "nacos"
nacos {
serverAddr = "localhost"
namespace = "public"
cluster = "default"
}
}
config {
# file、nacos 、apollo、zk、consul、etcd3
type = "nacos"
nacos {
serverAddr = "localhost"
namespace = "public"
cluster = "default"
}
}
spring:
application:
name: server-product
cloud:
nacos:
config:
server-addr: 127.0.0.1:8848
namespace: public #命名空间
group: SEATA_GROUP
alibaba:
seata:
tx-service-group: my_test_tx_group
只需要在入口方法添加@GlobalTransactional
@GlobalTransactional //开启全局事务控制
public Order createOrder(Integer pid, Integer num) {}
我们前面的模拟异常,商品id为3时,会抛出异常。我们先用对id为2商品下单,可以下单成功,扣除两个,并且有一笔id为106的订单。
此时我们对id为3的商品下单http://localhost:8092/order3/prod/3,发现报错,数据库中的库存没有减少,也没有生成这笔订单。
此时,我们再次对id为2的商品下单,发起一笔请求http://localhost:8092/order3/prod/2,可以发现库存减少,生成了id为108的订单。
订单表的id为什么不连续?
这个问题很明显说明了前面id为3的商品虽然最终下单失败(扣减库存异常),但是实际已经生成了id为107的订单,在事务的最终被seata反向删除了。
流程重点解析:
1、每个RM使用DataSourceProxy连接数据库,其目的是使用ConnectionProxy,使用数据源和数据连
接代理的目的就是在第一阶段将undo_log和业务数据放在一个本地事务提交,这样就保存了只要有业务
操作就一定有undo_log。
2、在第一阶段undo_log中存放了数据修改前和修改后的值,为事务回滚作好准备,所以第一阶段完成
就已经将分支事务提交,也就释放了锁资源。
3、 TM开启全局事务开始,将XID全局事务id放在事务上下文中,通过feign调用也将XID传入下游分支
事务,每个分支事务将自己的Branch ID分支事务ID与XID关联。
4、第二阶段全局事务提交, TC会通知各各分支参与者提交分支事务,在第一阶段就已经提交了分支事
务,这里各各参与者只需要删除undo_log即可,并且可以异步执行,第二阶段很快可以完成。
5、第二阶段全局事务回滚, TC会通知各各分支参与者回滚分支事务,通过 XID 和 Branch ID 找到相应
的回滚日志,通过回滚日志生成反向的 SQL 并执行,以完成分支事务回滚到之前的状态,如果回滚失
败则会重试回滚操作。
这里我直接使用的seata默认的事务分组,也可以自己创建事务分组。