• 分布式事务解决方案之【Hmily实现TCC事务】


    什么是Hmily

    Hmily是一个高性能分布式事务TCC开源框架。基于Java语言来开发(JDK1.8),支持Dubbo,Spring Cloud等RPC框架进行分布式事务。它目前支持以下特性:

    • 支持嵌套事务(Nested transaction support).
    • 采用disruptor框架进行事务日志的异步读写,与RPC框架的性能毫无差别。
    • 支持SpringBoot-starter 项目启动,使用简单。
    • RPC框架支持 : dubbo,motan,springcloud。
    • 本地事务存储支持 : redis,mongodb,zookeeper,file,mysql。
    • 事务日志序列化支持 :java,hessian,kryo,protostuff。
    • 采用Aspect AOP 切面思想与Spring无缝集成,天然支持集群。
    • RPC事务恢复,超时异常恢复等。

    Hmily利用AOP对参与分布式事务的本地方法与远程方法进行拦截处理,通过多方拦截,事务参与者能透明的调用到另一方的Try、Confirm、Cancel方法;传递事务上下文;并记录事务日志,酌情进行补偿,重试等。

    Hmily不需要事务协调服务,但需要提供一个数据库(mysql/mongodb/zookeeper/redis/file)来进行日志存储。

    Hmily实现的TCC服务与普通的服务一样,只需要暴露一个接口,也就是它的Try业务。Confirm/Cancel业务逻辑,只是因为全局事务提交/回滚的需要才提供的,因此Confirm/Cancel业务只需要被Hmily TCC事务框架发现即可,不需要被调用它的其他业务服务所感知。

    官网介绍:https://dromara.org/website/zh-cn/docs/hmily/index.html

    业务说明

    本实例通过Hmily实现TCC分布式事务,模拟两个账户的转账交易过程。

    两个账户分别在不同的银行(张三在bank1、李四在bank2),bank1、bank2是两个微服务。交易过程是,张三给李四转账指定金额。

    上述交易步骤,要么一起成功,要么一起失败,必须是一个整体性的事务。
    在这里插入图片描述

    程序组成部分

    数据库:MySQL-5.7.25
    JDK:64位 jdk1.8.0_201
    微服务:spring-boot-2.1.3、spring-cloud-Greenwich.RELEASE
    Hmily:hmily-springcloud.2.0.4-RELEASE

    微服务及数据库的关系 :
    dtx/dtx-tcc-demo/dtx-tcc-demo-bank1 银行1,操作张三账户, 连接数据库bank1
    dtx/dtx-tcc-demo/dtx-tcc-demo-bank2 银行2,操作李四账户,连接数据库bank2
    服务注册中心:dtx/discover-server

    创建数据库

    导入数据库脚本:资料\sql\bank1.sql、资料\sql\bank2.sql、已经导过不用重复导入。
    创建hmily数据库,用于存储hmily框架记录的数据。

    CREATE DATABASE `hmily` CHARACTER SET 'utf8' COLLATE 'utf8_general_ci';
    
    • 1

    创建bank1库,并导入以下表结构和数据(包含张三账户)

    CREATE DATABASE `bank1` CHARACTER SET 'utf8' COLLATE 'utf8_general_ci';
    
    • 1
    DROP TABLE IF EXISTS `account_info`;
    CREATE TABLE `account_info` (
    `id` bigint(20) NOT NULL AUTO_INCREMENT,
    `account_name` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '户
    主姓名',
    `account_no` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '银行
    卡号',
    `account_password` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT
    '帐户密码',
    `account_balance` double NULL DEFAULT NULL COMMENT '帐户余额',
    PRIMARY KEY (`id`) USING BTREE
    ) ENGINE = InnoDB AUTO_INCREMENT = 5 CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT =
    Dynamic;
    INSERT INTO `account_info` VALUES (2, '张三的账户', '1', '', 10000);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    创建bank2库,并导入以下表结构和数据(包含李四账户)

    CREATE DATABASE `bank2` CHARACTER SET 'utf8' COLLATE 'utf8_general_ci';
    
    • 1
    CREATE TABLE `account_info` (
    `id` bigint(20) NOT NULL AUTO_INCREMENT,
    `account_name` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '户
    主姓名',
    `account_no` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '银行
    卡号',
    `account_password` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT
    '帐户密码',
    `account_balance` double NULL DEFAULT NULL COMMENT '帐户余额',
    PRIMARY KEY (`id`) USING BTREE
    ) ENGINE = InnoDB AUTO_INCREMENT = 5 CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT =
    Dynamic;
    INSERT INTO `account_info` VALUES (3, '李四的账户', '2', NULL, 0);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    每个数据库都创建try、confirm、cancel三张日志表:

    CREATE TABLE `local_try_log` (
    `tx_no` varchar(64) NOT NULL COMMENT '事务id',
    `create_time` datetime DEFAULT NULL,
    PRIMARY KEY (`tx_no`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8
    CREATE TABLE `local_confirm_log` (
    `tx_no` varchar(64) NOT NULL COMMENT '事务id',
    `create_time` datetime DEFAULT NULL
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8
    CREATE TABLE `local_cancel_log` (
    `tx_no` varchar(64) NOT NULL COMMENT '事务id',
    `create_time` datetime DEFAULT NULL,
    PRIMARY KEY (`tx_no`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    discover-server

    discover-server是服务注册中心,测试工程将自己注册至discover-server。
    导入:资料\基础代码\dtx 父工程,此工程自带了discover-server,discover-server基于Eureka实现。
    已经导过不用重复导入。

    导入案例工程dtx-tcc-demo

    dtx-tcc-demo是tcc的测试工程,根据业务需求需要创建两个dtx-tcc-demo工程。

    (1)导入dtx-tcc-demo
    导入:资料\基础代码\dtx-tcc-demo到父工程dtx下。
    两个测试工程如下:
    dtx/dtx-tcc-demo/dtx-tcc-demo-bank1 银行1,操作张三账户,连接数据库bank1
    dtx/dtx-tcc-demo/dtx-tcc-demo-bank2 银行2,操作李四账户,连接数据库bank2

    (2)引入maven依赖

    <dependency>
    	<groupId>org.dromaragroupId>
    	<artifactId>hmily‐springcloudartifactId>
    	<version>2.0.4‐RELEASEversion>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    (3)配置hmily
    application.yml:

    org:
    	dromara:
    		hmily :
    			serializer : kryo
    			recoverDelayTime : 128
    			retryMax : 30
    			scheduledDelay : 128
    			scheduledThreadMax : 10
    			repositorySupport : db
    			started: true
    			hmilyDbConfig :
    				driverClassName : com.mysql.jdbc.Driver
    				url : jdbc:mysql://localhost:3306/bank?useUnicode=true
    				username : root
    				password : root
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    新增配置类接收application.yml中的Hmily配置信息,并创建HmilyTransactionBootstrap Bean:

    @Bean
    public HmilyTransactionBootstrap hmilyTransactionBootstrap(HmilyInitService hmilyInitService){
    	HmilyTransactionBootstrap hmilyTransactionBootstrap = new HmilyTransactionBootstrap(hmilyInitService);
    	hmilyTransactionBootstrap.setSerializer(env.getProperty("org.dromara.hmily.serializer"));
    			hmilyTransactionBootstrap.setRecoverDelayTime(Integer.parseInt(env.getProperty("org.dromara.hmily.recoverDelayTime")));
    	hmilyTransactionBootstrap.setRetryMax(Integer.parseInt(env.getProperty("org.dromara.hmily.retryMax")));
    	hmilyTransactionBootstrap.setScheduledDelay(Integer.parseInt(env.getProperty("org.dromara.hmily.scheduledDelay")));
    	hmilyTransactionBootstrap.setScheduledThreadMax(Integer.parseInt(env.getProperty("org.dromara.hmily.scheduledThreadMax")));
    	hmilyTransactionBootstrap.setRepositorySupport(env.getProperty("org.dromara.hmily.repositorySupport"));
    	hmilyTransactionBootstrap.setStarted(Boolean.parseBoolean(env.getProperty("org.dromara.hmily.started")));
    	HmilyDbConfig hmilyDbConfig = new HmilyDbConfig();
    	hmilyDbConfig.setDriverClassName(env.getProperty("org.dromara.hmily.hmilyDbConfig.driverClassName"));
    	hmilyDbConfig.setUrl(env.getProperty("org.dromara.hmily.hmilyDbConfig.url"));
    	hmilyDbConfig.setUsername(env.getProperty("org.dromara.hmily.hmilyDbConfig.username"));
    	hmilyDbConfig.setPassword(env.getProperty("org.dromara.hmily.hmilyDbConfig.password"));
    	hmilyTransactionBootstrap.setHmilyDbConfig(hmilyDbConfig);
    	return hmilyTransactionBootstrap;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    启动类增加@EnableAspectJAutoProxy并增加org.dromara.hmily的扫描项:

    @SpringBootApplication
    @EnableDiscoveryClient
    @EnableHystrix
    @EnableFeignClients(basePackages = {"cn.itcast.dtx.tccdemo.bank1.spring"})
    @ComponentScan({"cn.itcast.dtx.tccdemo.bank1","org.dromara.hmily"})
    public class Bank1HmilyServer {
    	public static void main(String[] args) {
    		SpringApplication.run(Bank1HmilyServer.class, args);
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    dtx-tcc-demo-bank1

    dtx-tcc-demo-bank1实现try和cancel方法,如下:

    trytry幂等校验
    	try悬挂处理
    	检查余额是够扣减金额
    	扣减金额
    confirm:
    	空
    cancel:
    	cancel幂等校验
    	cancel空回滚处理
    	增加可用余额
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    1)Dao

    @Mapper
    @Component
    public interface AccountInfoDao {
    
    	@Update("update account_info set account_balance=account_balance ‐ #{amount} where account_balance>#{amount} and account_no=#{accountNo} ")
    	int subtractAccountBalance(@Param("accountNo") String accountNo, @Param("amount") Double amount);
    
    	@Update("update account_info set account_balance=account_balance + #{amount} where account_no=#{accountNo} ")
    	int addAccountBalance(@Param("accountNo") String accountNo, @Param("amount") Double amount);
    
    	/**
    	* 增加某分支事务try执行记录
    	* @param localTradeNo 本地事务编号
    	* @return
    	*/
    	@Insert("insert into local_try_log values(#{txNo},now());")
    	int addTry(String localTradeNo);
    	@Insert("insert into local_confirm_log values(#{txNo},now());")
    	int addConfirm(String localTradeNo);
    	@Insert("insert into local_cancel_log values(#{txNo},now());")
    	int addCancel(String localTradeNo);
    	/**
    	* 查询分支事务try是否已执行
    	* @param localTradeNo 本地事务编号
    	* @return
    	*/
    	@Select("select count(1) from local_try_log where tx_no = #{txNo} ")
    	int isExistTry(String localTradeNo);
    	/**
    	* 查询分支事务confirm是否已执行
    	* @param localTradeNo 本地事务编号
    	* @return
    	*/
    	@Select("select count(1) from local_confirm_log where tx_no = #{txNo} ")
    	int isExistConfirm(String localTradeNo);
    	/**
    	* 查询分支事务cancel是否已执行
    	* @param localTradeNo 本地事务编号
    	* @return
    	*/
    	@Select("select count(1) from local_cancel_log where tx_no = #{txNo} ")
    	int isExistCancel(String localTradeNo);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43

    2)try和cancel方法

    @Service
    @Slf4j
    public class AccountInfoServiceImpl implements AccountInfoService {
    	private Logger logger = LoggerFactory.getLogger(AccountInfoServiceImpl.class);
    	@Autowired
    	private AccountInfoDao accountInfoDao;
    	@Autowired
    	private Bank2Client bank2Client;
    	
    	@Override
    	@Transactional
    	@Hmily(confirmMethod = "commit", cancelMethod = "rollback")
    	public void updateAccountBalance(String accountNo, Double amount) {
    		//事务id
    		String transId = HmilyTransactionContextLocal.getInstance().get().getTransId();
    		log.info("******** Bank1 Service begin try... "+transId );
    		int existTry = accountInfoDao.isExistTry(transId);
    		//try幂等校验
    		if(existTry>0){
    			log.info("******** Bank1 Service 已经执行try,无需重复执行,事务id:{} "+transId );
    			return ;
    		}
    		//try悬挂处理
    		if(accountInfoDao.isExistCancel(transId)>0 || accountInfoDao.isExistConfirm(transId)>0){
    			log.info("******** Bank1 Service 已经执行confirm或cancel,悬挂处理,事务id:{} "+transId);
    			return ;
    		}
    		//从账户扣减
    		if(accountInfoDao.subtractAccountBalance(accountNo ,amount )<=0){
    			//扣减失败
    			throw new HmilyRuntimeException("bank1 exception,扣减失败,事务id:{}"+transId);
    		}
    		//增加本地事务try成功记录,用于幂等性控制标识
    		accountInfoDao.addTry(transId);
    		//远程调用bank2
    		if(!bank2Client.test2(amount,transId)){
    			throw new HmilyRuntimeException("bank2Client exception,事务id:{}"+transId);
    		}
    		if(amount==10){//异常一定要抛在Hmily里面
    			throw new RuntimeException("bank1 make exception 10");
    		}
    		log.info("******** Bank1 Service end try... "+transId );
    	}
    	
    	@Transactional
    	public void commit( String accountNo, double amount) {
    		String localTradeNo = HmilyTransactionContextLocal.getInstance().get().getTransId();
    		logger.info("******** Bank1 Service begin commit..."+localTradeNo );
    	}
    	
    	@Transactional
    	public void rollback( String accountNo, double amount) {
    		String localTradeNo = HmilyTransactionContextLocal.getInstance().get().getTransId();
    		log.info("******** Bank1 Service begin rollback... " +localTradeNo);
    		if(accountInfoDao.isExistTry(localTradeNo) == 0){ //空回滚处理,try阶段没有执行什么也不用做
    			log.info("******** Bank1 try阶段失败... 无需rollback "+localTradeNo );
    			return;
    		}
    		if(accountInfoDao.isExistCancel(localTradeNo) > 0){ //幂等性校验,已经执行过了,什么也不用做
    			log.info("******** Bank1 已经执行过rollback... 无需再次rollback " +localTradeNo);
    			return;
    		}
    		//再将金额加回账户
    		accountInfoDao.addAccountBalance(accountNo,amount);
    		//添加cancel日志,用于幂等性控制标识
    		accountInfoDao.addCancel(localTradeNo);
    		log.info("******** Bank1 Service end rollback... " +localTradeNo);
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69

    3)feignClient

    @FeignClient(value = "seata‐demo‐bank2", fallback = Bank2Fallback.class)
    public interface Bank2Client {
    	@GetMapping("/bank2/transfer")
    	@Hmily
    	Boolean transfer(@RequestParam("amount") Double amount);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    1. Controller
    @RestController
    public class Bank1Controller {
    	@Autowired
    	AccountInfoService accountInfoService;
    	
    	@RequestMapping("/transfer")
    	public String test(@RequestParam("amount") Double amount) {
    		this.accountInfoService.updateAccountBalance("1", amount);
    		return "cn/itcast/dtx/tccdemo/bank1" + amount;
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    dtx-tcc-demo-bank2

    dtx-tcc-demo-bank2实现如下功能:

    try:
    	空
    confirm:
    	confirm幂等校验
    	正式增加金额
    cancel:
    	空
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    1)Dao

    @Component
    @Mapper
    public interface AccountInfoDao {
    	@Update("update account_info set account_balance=account_balance + #{amount} where account_no=#{accountNo} ")
    	int addAccountBalance(@Param("accountNo") String accountNo, @Param("amount") Double amount);
    	/**
    	* 增加某分支事务try执行记录
    	* @param localTradeNo 本地事务编号
    	* @return
    	*/
    	@Insert("insert into local_try_log values(#{txNo},now());")
    	int addTry(String localTradeNo);
    	@Insert("insert into local_confirm_log values(#{txNo},now());")
    	int addConfirm(String localTradeNo);
    	@Insert("insert into local_cancel_log values(#{txNo},now());")
    	int addCancel(String localTradeNo);
    	/**
    	* 查询分支事务try是否已执行
    	* @param localTradeNo 本地事务编号
    	* @return
    	*/
    	@Select("select count(1) from local_try_log where tx_no = #{txNo} ")
    	int isExistTry(String localTradeNo);
    	/**
    	* 查询分支事务confirm是否已执行
    	* @param localTradeNo 本地事务编号
    	* @return
    	*/
    	@Select("select count(1) from local_confirm_log where tx_no = #{txNo} ")
    	int isExistConfirm(String localTradeNo);
    	/**
    	* 查询分支事务cancel是否已执行
    	* @param localTradeNo 本地事务编号
    	* @return
    	*/
    	@Select("select count(1) from local_cancel_log where tx_no = #{txNo} ")
    	int isExistCancel(String localTradeNo);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38

    2)实现confirm方法

    @Service
    @Slf4j
    public class AccountInfoServiceImpl implements AccountInfoService {
    	@Autowired
    	private AccountInfoDao accountInfoDao;
    	@Override
    	@Transactional
    	@Hmily(confirmMethod = "confirmMethod", cancelMethod = "cancelMethod")
    	public void updateAccountBalance(String accountNo, Double amount) {
    		String localTradeNo = HmilyTransactionContextLocal.getInstance().get().getTransId();
    		log.info("******** Bank2 Service Begin try ..."+localTradeNo);
    	}
    	@Transactional
    	public void confirmMethod(String accountNo, Double amount) {
    		String localTradeNo = HmilyTransactionContextLocal.getInstance().get().getTransId();
    		log.info("******** Bank2 Service commit... " +localTradeNo);
    		if(accountInfoDao.isExistConfirm(localTradeNo) > 0){ //幂等性校验,已经执行过了,什么也不用做
    			log.info("******** Bank2 已经执行过confirm... 无需再次confirm "+localTradeNo );
    			return ;
    		}
    		//正式增加金额
    		accountInfoDao.addAccountBalance(accountNo,amount);
    		//添加confirm日志
    		accountInfoDao.addConfirm(localTradeNo);
    	}
    	@Transactional
    	public void cancelMethod(String accountNo, Double amount) {
    		String localTradeNo = HmilyTransactionContextLocal.getInstance().get().getTransId();
    		log.info("******** Bank2 Service begin cancel... "+localTradeNo );
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    3)Controller

    @RestController
    public class Bank2Controller {
    	@Autowired
    	AccountInfoService accountInfoService;
    	@RequestMapping("/transfer")
    	public Boolean test2(@RequestParam("amount") Double amount) {
    		this.accountInfoService.updateAccountBalance("2", amount);
    		return true;
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    测试场景

    张三向李四转账成功。
    李四事务失败,张三事务回滚成功。
    张三事务失败,李四分支事务回滚成功。
    分支事务超时测试。

    小结

    如果拿TCC事务的处理流程与2PC两阶段提交做比较,2PC通常都是在跨库的DB层面,而TCC则在应用层面的处理,需要通过业务逻辑来实现。这种分布式事务的实现方式的优势在于,可以让应用自己定义数据操作的粒度,使得降低锁冲突、提高吞吐量成为可能。

    而不足之处则在于对应用的侵入性非常强,业务逻辑的每个分支都需要实现try、confirm、cancel三个操作。此外,其实现难度也比较大,需要按照网络状态、系统故障等不同的失败原因实现不同的回滚策略。

  • 相关阅读:
    可视化并理解CNN
    Feign源码分析(without spring)
    诸葛智能荣登《2022中国企业数智化转型升级创新服务企业》榜单!
    spring源码解析
    【C++笔试强训】第十七天
    完成Zookeeper集群部署
    【机械】二维钢桁架分析与设计附matlab代码
    【HTML-CSS】总结-6种实现元素 上下左右居中 方法--附演示效果
    Java面试题-UDP\TCP\HTTP
    半监督短语挖掘:autophrase是什么?
  • 原文地址:https://blog.csdn.net/A_art_xiang/article/details/127743927