http://download.oracle.com/otn-pub/jcp/jta-1.1-spec-oth-JSpec/jta-1_1-spec.pdf
Java事务API(JTA:Java Transaction API)和Java事务服务(JTS:Java Transaction Service),为J2EE平台提供了分布式事务服务(distributed transaction)的能力。可以认为JTA规范是XA规范的Java版,其把XA规范中规定的DTP模型交互接口抽象成Java接口中的方法,并规定每个方法要实现什么样的功能。
JTA比XA多了一个Application Server(应用服务器),即应用程序运行的容器。JTA规范规定,事务管理器的功能应该由Application Server提供。但并不是所有的Web容器都实现了JTA规范,例如Tomcat就没有实现JTA规范,因此它不能提供事务管理器的功能;这种情况下需要引入第三方的事务管理器实现(例如下文使用的Atomikos)。
Java JTA的接口包
implementation 'javax.transaction:jta:1.1'
JTA规范中定义的接口作用(由各个厂商去实现):
com.atomikos.icatch.jta.UserTransactionImp
,用户只需要直接操作这个类com.atomikos.icatch.jta.UserTransactionManager
com.atomikos.icatch.jta.TransactionImp
AtomikosDataSourceBean
com.mysql.jdbc.jdbc2.optional.MysqlXADataSource
com.alibaba.druid.pool.xa.DruidXADataSource
org.apache.tomcat.jdbc.pool.XADataSource
项目源码:https://github.com/jannal/transaction/tree/master/atomikos-jta
新建数据库表(Mysql 5.6)
-- 1. Create database atomikos_account0 with the t_account table and one seed row.
--    MySQL accepts "-- ", "#" and "/* */" comments; "//" is a syntax error.
CREATE DATABASE IF NOT EXISTS atomikos_account0 DEFAULT CHARSET utf8mb4 COLLATE utf8mb4_bin;
USE atomikos_account0;

-- Recreate the table from scratch so the script can be re-run.
DROP TABLE IF EXISTS `t_account`;
CREATE TABLE `t_account`
(
    `id`             int(10) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
    `account_id`     varchar(50) COLLATE utf8mb4_bin NOT NULL COMMENT '账户标识',
    `amount`         decimal(20, 2) NOT NULL DEFAULT '0.00' COMMENT '金额',
    `freezed_amount` decimal(20, 2) NOT NULL DEFAULT '0.00' COMMENT '冻结金额',
    `create_time`    datetime NOT NULL COMMENT '创建时间',
    `update_time`    datetime NOT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
    PRIMARY KEY (`id`),
    UNIQUE KEY `uniq_accout` (`account_id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT='账户表';

-- Seed the paying account used by the demo tests.
INSERT INTO `t_account`(`id`, `account_id`, `amount`, `freezed_amount`, `create_time`, `update_time`)
VALUES (1, 'jannal', 10000.00, 0.00, '2022-05-03 17:23:37', '2022-05-03 17:23:39');
-- 2. Create database atomikos_account1 with the t_account table and one seed row.
--    MySQL accepts "-- ", "#" and "/* */" comments; "//" is a syntax error.
CREATE DATABASE IF NOT EXISTS atomikos_account1 DEFAULT CHARSET utf8mb4 COLLATE utf8mb4_bin;
USE atomikos_account1;

-- Recreate the table from scratch so the script can be re-run.
DROP TABLE IF EXISTS `t_account`;
CREATE TABLE `t_account`
(
    `id`             int(10) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
    `account_id`     varchar(50) COLLATE utf8mb4_bin NOT NULL COMMENT '账户标识',
    `amount`         decimal(20, 2) NOT NULL DEFAULT '0.00' COMMENT '金额',
    `freezed_amount` decimal(20, 2) NOT NULL DEFAULT '0.00' COMMENT '冻结金额',
    `create_time`    datetime NOT NULL COMMENT '创建时间',
    `update_time`    datetime NOT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
    PRIMARY KEY (`id`),
    UNIQUE KEY `uniq_accout` (`account_id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT='账户表';

-- Seed the receiving account used by the demo tests.
INSERT INTO `t_account`(`id`, `account_id`, `amount`, `freezed_amount`, `create_time`, `update_time`)
VALUES (1, 'tom', 1000.00, 0.00, '2022-05-03 17:23:37', '2022-05-03 17:23:39');
添加项目依赖
dependencies {
    testAnnotationProcessor('org.springframework.boot:spring-boot-configuration-processor:2.2.6.RELEASE')

    // MyBatis (Spring Boot starter + JSR-310 type handlers), Spring AOP and the MySQL driver.
    // Declared with `implementation`, matching the JTA API dependency shown earlier;
    // the `compile` configuration is deprecated and no longer exists in Gradle 7+.
    implementation 'org.mybatis.spring.boot:mybatis-spring-boot-starter:2.1.1'
    implementation 'org.mybatis:mybatis-typehandlers-jsr310:1.0.2'
    implementation 'org.springframework.boot:spring-boot-starter-aop'
    implementation 'mysql:mysql-connector-java:5.1.46'
}
创建数据库配置类
// Primary data source settings (atomikos_account0 database)
@Data
@ConfigurationProperties(prefix = "spring.datasource.primary")
public class PrimaryDataSourceProperties {

    /** JDBC connection URL. */
    private String url;

    /** Database user name. */
    private String username;

    /** Database password. */
    private String password;

    /** Minimum pool size; defaults to 2. */
    private int minPoolSize = 2;

    /** Maximum pool size; defaults to 10. */
    private int maxPoolSize = 10;

    /** max-lifetime: maximum lifetime of a connection, in seconds. */
    private int maxLifetime = 60;

    /**
     * borrow-connection-timeout: maximum time, in seconds, to wait for a connection when
     * none could be obtained; a connection that becomes available within this window is returned.
     */
    private int borrowConnectionTimeout = 20;

    /** login-timeout: maximum time, in seconds, the pool may wait to obtain the data source. */
    private int loginTimeout = 30;

    /** maintenance-interval: connection reclamation interval, in seconds. */
    private int maintenanceInterval = 600;

    /**
     * max-idle-time: maximum idle time, in seconds; connections in excess of the minimum
     * pool size that stay idle longer than this are closed.
     */
    private int maxIdleTime = 600;

    /** test-query: SQL statement used to test a connection. */
    private String testQuery = "SELECT 1";
}
// Secondary data source settings (atomikos_account1 database), bound from
// the "spring.datasource.secondary" property prefix.
@ConfigurationProperties(prefix = "spring.datasource.secondary")
@Data
public class SecondaryDataSourceProperties {
// Same properties as PrimaryDataSourceProperties (omitted here for brevity)
...省略....
}
// Primary data source wiring: XA data source, MyBatis session factory and session template
// for the mappers under org.jannal.jta.core.ds0.mapper.
@Configuration
@EnableConfigurationProperties(PrimaryDataSourceProperties.class)
@MapperScan(basePackages = {"org.jannal.jta.core.ds0.mapper"}, sqlSessionFactoryRef = "sqlSessionFactory")
public class PrimaryDataSourceConfiguration {

    /**
     * Builds the primary data source: a MySQL XA data source wrapped in an Atomikos pool
     * registered under the unique resource name {@code "dataSource"}.
     *
     * @param primaryDataSourceProperties connection and pool settings for the primary database
     * @return the Atomikos-managed data source
     * @throws SQLException declared because the pool setters used here may throw it
     */
    @Bean(name = "dataSource")
    @Primary
    public DataSource dataSource(PrimaryDataSourceProperties primaryDataSourceProperties) throws SQLException {
        // Underlying MySQL XA data source.
        MysqlXADataSource xaSource = new MysqlXADataSource();
        xaSource.setUrl(primaryDataSourceProperties.getUrl());
        // Turns on Connector/J's pinGlobalTxToPhysicalConnection option.
        xaSource.setPinGlobalTxToPhysicalConnection(true);
        xaSource.setPassword(primaryDataSourceProperties.getPassword());
        xaSource.setUser(primaryDataSourceProperties.getUsername());

        // Atomikos connection pool around the XA data source.
        AtomikosDataSourceBean pool = new AtomikosDataSourceBean();
        pool.setUniqueResourceName("dataSource");
        pool.setXaDataSource(xaSource);
        pool.setMinPoolSize(primaryDataSourceProperties.getMinPoolSize());
        pool.setMaxPoolSize(primaryDataSourceProperties.getMaxPoolSize());
        pool.setMaxLifetime(primaryDataSourceProperties.getMaxLifetime());
        pool.setBorrowConnectionTimeout(primaryDataSourceProperties.getBorrowConnectionTimeout());
        pool.setLoginTimeout(primaryDataSourceProperties.getLoginTimeout());
        pool.setMaintenanceInterval(primaryDataSourceProperties.getMaintenanceInterval());
        pool.setMaxIdleTime(primaryDataSourceProperties.getMaxIdleTime());
        pool.setTestQuery(primaryDataSourceProperties.getTestQuery());
        return pool;
    }

    /**
     * Creates the MyBatis session factory for the primary data source, loading mapper XML
     * files from {@code classpath:mapper/*.xml}.
     *
     * @param dataSource the bean named {@code "dataSource"}
     * @return the configured session factory
     * @throws Exception if the mapper resources cannot be resolved or the factory cannot be built
     */
    @Bean(name = "sqlSessionFactory")
    @Primary
    public SqlSessionFactory sqlSessionFactory(@Qualifier("dataSource") DataSource dataSource) throws Exception {
        SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean();
        factoryBean.setDataSource(dataSource);
        factoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources("classpath:mapper/*.xml"));
        factoryBean.setConfiguration(myBatisSettings());
        return factoryBean.getObject();
    }

    /**
     * Wraps the primary session factory in a {@link SqlSessionTemplate}.
     *
     * @param sqlSessionFactory the bean named {@code "sqlSessionFactory"}
     * @return a session template backed by that factory
     */
    @Bean(name = "sqlSessionTemplate")
    @Primary
    public SqlSessionTemplate sqlSessionTemplate(@Qualifier("sqlSessionFactory") SqlSessionFactory sqlSessionFactory) {
        SqlSessionTemplate template = new SqlSessionTemplate(sqlSessionFactory);
        return template;
    }

    /** Builds the MyBatis settings applied to the primary session factory. */
    private static org.apache.ibatis.session.Configuration myBatisSettings() {
        org.apache.ibatis.session.Configuration settings = new org.apache.ibatis.session.Configuration();
        settings.setLazyLoadingEnabled(true);
        // To print SQL to stdout while debugging: settings.setLogImpl(StdOutImpl.class);
        settings.setMapUnderscoreToCamelCase(true);
        settings.setDefaultExecutorType(ExecutorType.REUSE);
        settings.setCacheEnabled(false);
        // NOTE(review): MyBatis documents defaultStatementTimeout in seconds — confirm 5000 is intended.
        settings.setDefaultStatementTimeout(5000);
        return settings;
    }
}
// Secondary data source wiring: XA data source, MyBatis session factory and session template
// for the mappers under org.jannal.jta.core.ds1.mapper.
@Configuration
@EnableConfigurationProperties(SecondaryDataSourceProperties.class)
@MapperScan(basePackages = {"org.jannal.jta.core.ds1.mapper"}, sqlSessionFactoryRef = "sqlSessionFactory2")
public class SecondaryDataSourceConfiguration {

    /**
     * Builds the secondary data source: a MySQL XA data source wrapped in an Atomikos pool
     * registered under the unique resource name {@code "dataSource2"}.
     *
     * @param secondaryDataSourceProperties connection and pool settings for the secondary database
     * @return the Atomikos-managed data source
     * @throws SQLException declared because the pool setters used here may throw it
     */
    @Bean(name = "dataSource2")
    public DataSource dataSource(SecondaryDataSourceProperties secondaryDataSourceProperties) throws SQLException {
        // Underlying MySQL XA data source.
        MysqlXADataSource xaDelegate = new MysqlXADataSource();
        xaDelegate.setUrl(secondaryDataSourceProperties.getUrl());
        // Turns on Connector/J's pinGlobalTxToPhysicalConnection option.
        xaDelegate.setPinGlobalTxToPhysicalConnection(true);
        xaDelegate.setPassword(secondaryDataSourceProperties.getPassword());
        xaDelegate.setUser(secondaryDataSourceProperties.getUsername());

        // Atomikos connection pool around the XA data source.
        AtomikosDataSourceBean pooled = new AtomikosDataSourceBean();
        pooled.setUniqueResourceName("dataSource2");
        pooled.setXaDataSource(xaDelegate);
        pooled.setMinPoolSize(secondaryDataSourceProperties.getMinPoolSize());
        pooled.setMaxPoolSize(secondaryDataSourceProperties.getMaxPoolSize());
        pooled.setMaxLifetime(secondaryDataSourceProperties.getMaxLifetime());
        pooled.setBorrowConnectionTimeout(secondaryDataSourceProperties.getBorrowConnectionTimeout());
        pooled.setLoginTimeout(secondaryDataSourceProperties.getLoginTimeout());
        pooled.setMaintenanceInterval(secondaryDataSourceProperties.getMaintenanceInterval());
        pooled.setMaxIdleTime(secondaryDataSourceProperties.getMaxIdleTime());
        pooled.setTestQuery(secondaryDataSourceProperties.getTestQuery());
        return pooled;
    }

    /**
     * Creates the MyBatis session factory for the secondary data source, loading mapper XML
     * files from {@code classpath:mapper/*.xml}.
     *
     * @param dataSource the bean named {@code "dataSource2"}
     * @return the configured session factory
     * @throws Exception if the mapper resources cannot be resolved or the factory cannot be built
     */
    @Bean(name = "sqlSessionFactory2")
    public SqlSessionFactory sqlSessionFactory(@Qualifier("dataSource2") DataSource dataSource) throws Exception {
        SqlSessionFactoryBean builder = new SqlSessionFactoryBean();
        builder.setDataSource(dataSource);
        builder.setMapperLocations(new PathMatchingResourcePatternResolver().getResources("classpath:mapper/*.xml"));
        builder.setConfiguration(sessionSettings());
        return builder.getObject();
    }

    /**
     * Wraps the secondary session factory in a {@link SqlSessionTemplate}.
     *
     * @param sqlSessionFactory the bean named {@code "sqlSessionFactory2"}
     * @return a session template backed by that factory
     */
    @Bean(name = "sqlSessionTemplate2")
    public SqlSessionTemplate sqlSessionTemplate(@Qualifier("sqlSessionFactory2") SqlSessionFactory sqlSessionFactory) {
        SqlSessionTemplate sessionTemplate = new SqlSessionTemplate(sqlSessionFactory);
        return sessionTemplate;
    }

    /** Builds the MyBatis settings applied to the secondary session factory. */
    private static org.apache.ibatis.session.Configuration sessionSettings() {
        org.apache.ibatis.session.Configuration cfg = new org.apache.ibatis.session.Configuration();
        cfg.setLazyLoadingEnabled(true);
        // To print SQL to stdout while debugging: cfg.setLogImpl(StdOutImpl.class);
        cfg.setMapUnderscoreToCamelCase(true);
        cfg.setDefaultExecutorType(ExecutorType.REUSE);
        cfg.setCacheEnabled(false);
        // NOTE(review): MyBatis documents defaultStatementTimeout in seconds — confirm 5000 is intended.
        cfg.setDefaultStatementTimeout(5000);
        return cfg;
    }
}
事务管理器配置
/**
 * Registers the Atomikos JTA beans and exposes them to Spring as the
 * {@code "transactionManager"} used for annotation-driven transactions.
 */
@Configuration
@EnableTransactionManagement
public class TransactionConfiguration {

    /**
     * Atomikos {@link UserTransaction} implementation, used to begin, commit and roll back transactions.
     *
     * @return a new {@code UserTransactionImp}
     */
    @Bean(name = "userTransaction")
    public UserTransaction userTransaction() {
        UserTransaction atomikosUserTransaction = new UserTransactionImp();
        return atomikosUserTransaction;
    }

    /**
     * Atomikos JTA {@link TransactionManager} with force-shutdown disabled.
     *
     * @return the configured {@code UserTransactionManager}
     */
    @Bean(name = "atomikosTransactionManager")
    public TransactionManager atomikosTransactionManager() {
        UserTransactionManager manager = new UserTransactionManager();
        // NOTE(review): presumably lets in-flight transactions finish on shutdown — confirm against the Atomikos docs.
        manager.setForceShutdown(false);
        return manager;
    }

    /**
     * Spring transaction manager that delegates to the two Atomikos beans above.
     *
     * @param userTransaction            the {@code "userTransaction"} bean
     * @param atomikosTransactionManager the {@code "atomikosTransactionManager"} bean
     * @return a {@code JtaTransactionManager} built from both
     */
    @Bean(name = "transactionManager")
    @DependsOn({"userTransaction", "atomikosTransactionManager"})
    public PlatformTransactionManager transactionManager(UserTransaction userTransaction, TransactionManager atomikosTransactionManager) {
        JtaTransactionManager jtaTransactionManager = new JtaTransactionManager(userTransaction, atomikosTransactionManager);
        return jtaTransactionManager;
    }
}
Mapper代码
// MyBatis mapper for the primary data source.
// NOTE(review): the SQL behind these methods lives in mapper XML that is not shown here;
// the descriptions below are inferred from the method names and their call sites.
public interface Account0Mapper {

    /**
     * Finds an account by the {@code accountId} statement parameter.
     *
     * @param accountId account identifier to look up
     * @return the matching account; callers treat {@code null} as "not found"
     */
    Account findByAccountId(@Param("accountId") String accountId);

    /**
     * Same lookup as {@link #findByAccountId(String)}, presumably as a locking read
     * ({@code SELECT ... FOR UPDATE}) — confirm against the mapper XML.
     *
     * @param accountId account identifier to look up
     * @return the matching account
     */
    Account findByAmountIdForUpdate(@Param("accountId") String accountId);

    /**
     * Persists changes to an account.
     *
     * @param account account carrying the id and the values to write
     * @return presumably the number of affected rows — confirm against the mapper XML
     */
    int update(Account account);
}
//Secondary数据源Mapper
public interface Account0Mapper {
//与Primary数据源Mapper方法一样
...省略...
}
Service代码
/**
 * Moves money from an account reached through {@code Account0Mapper} (primary data source)
 * to an account reached through {@code Account1Mapper} (secondary data source) inside one
 * {@code @Transactional} method, so that both updates commit or roll back together.
 */
@Service
public class AccountServiceImpl implements AccountService {
    @Autowired
    private Account0Mapper account0Mapper;
    @Autowired
    private Account1Mapper account1Mapper;

    /**
     * Debits the paying account and credits the receiving account.
     *
     * @param accountTransfer paying/receiving account ids, the amount, and a flag that forces
     *                        a failure at the end of the method (used to demonstrate rollback)
     * @throws IllegalArgumentException if the amount is missing or not strictly positive
     * @throws RuntimeException         if either account does not exist, the paying account has
     *                                  insufficient balance, or the mock-exception flag is set
     */
    @Override
    @Transactional
    public void transfer(AccountTransfer accountTransfer) {
        String accountFromId = accountTransfer.getAccountFromId();
        String accountToId = accountTransfer.getAccountToId();
        BigDecimal amount = accountTransfer.getAmount();

        // Reject a missing or non-positive amount up front: a negative amount would pass the
        // balance check below and move money in the opposite direction.
        if (amount == null || amount.signum() <= 0) {
            throw new IllegalArgumentException("转账金额必须大于0: " + amount);
        }

        // Paying account
        Account accountFromExist = account0Mapper.findByAccountId(accountFromId);
        if (accountFromExist == null) {
            throw new RuntimeException(accountFromId + "不存在");
        }
        // Re-read through the "...ForUpdate" query (presumably a locking read) so the balance
        // check and the new balance are based on the row as held by this transaction.
        accountFromExist = account0Mapper.findByAmountIdForUpdate(accountFromId);
        if (accountFromExist == null) {
            // The row disappeared between the two reads.
            throw new RuntimeException(accountFromId + "不存在");
        }
        if (accountFromExist.getAmount().subtract(amount).compareTo(BigDecimal.ZERO) < 0) {
            throw new RuntimeException(accountFromId + "账户余额不足");
        }
        Account accountNew = new Account();
        accountNew.setId(accountFromExist.getId());
        accountNew.setAmount(accountFromExist.getAmount().subtract(amount));
        account0Mapper.update(accountNew);

        // Receiving account. This check intentionally runs after the debit so that a missing
        // receiver exercises the rollback of the first data source.
        Account accountToExist = account1Mapper.findByAccountId(accountToId);
        if (accountToExist == null) {
            throw new RuntimeException(accountToId + "不存在");
        }
        // Same locking re-read as on the paying side; without it the credited balance is
        // computed from an unlocked read and a concurrent transfer can be overwritten.
        accountToExist = account1Mapper.findByAmountIdForUpdate(accountToId);
        if (accountToExist == null) {
            throw new RuntimeException(accountToId + "不存在");
        }
        Account accountNew1 = new Account();
        accountNew1.setId(accountToExist.getId());
        accountNew1.setAmount(accountToExist.getAmount().add(amount));
        account1Mapper.update(accountNew1);

        // Simulated failure after both updates succeeded, to demonstrate a two-resource rollback.
        if (accountTransfer.isMockException()) {
            throw new RuntimeException("模拟出现异常");
        }
    }
}
/**
 * Request object describing one transfer between two accounts.
 */
@Getter
@Setter
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class AccountTransfer {

    /** Identifier of the paying account. */
    private String accountFromId;

    /** Identifier of the receiving account. */
    private String accountToId;

    /** Amount to move from the paying account to the receiving account. */
    private BigDecimal amount;

    /** When {@code true}, the transfer service throws at the end of the transfer (rollback demo). */
    private boolean mockException;
}
/**
 * Account entity; its fields mirror the columns of the {@code t_account} table.
 */
@Getter
@Setter
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class Account implements Serializable {

    private static final long serialVersionUID = 5454155825314635342L;

    /** Primary key ({@code id}). */
    private Integer id;

    /** Account identifier ({@code account_id}). */
    private String accountId;

    /** Balance ({@code amount}). */
    private BigDecimal amount;

    /** Frozen amount ({@code freezed_amount}). */
    private BigDecimal freezedAmount;

    /** Creation time ({@code create_time}). */
    private Date createTime;

    /** Last update time ({@code update_time}). */
    private Date updateTime;
}
启动代码
/**
 * Spring Boot entry point for the JTA demo; starts the application context
 * with {@code WebApplicationType.NONE}.
 */
@SpringBootApplication
public class AccountJTAApplication {

    /**
     * Boots the application.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplicationBuilder launcher = new SpringApplicationBuilder(AccountJTAApplication.class);
        launcher.web(WebApplicationType.NONE);
        launcher.run(args);
    }
}
单元测试代码
/**
 * Integration tests for {@code AccountService.transfer} against the two demo databases.
 * The failure scenarios declare the exception they expect, so a correct rollback path is
 * reported as a passing test rather than as an error.
 */
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest
public class AccountJTATest {
    @Autowired
    private AccountService accountService;

    /**
     * Happy path: both accounts exist and the transfer completes.
     */
    @Test
    public void testTransfer() {
        AccountTransfer accountTransfer = new AccountTransfer();
        accountTransfer.setAccountFromId("jannal");
        accountTransfer.setAccountToId("tom");
        accountTransfer.setAmount(new BigDecimal(1000));
        accountService.transfer(accountTransfer);
    }

    /**
     * Business failure on the second data source: the receiving account does not exist,
     * so the service is expected to throw.
     */
    @Test(expected = RuntimeException.class)
    public void testTransferException() {
        AccountTransfer accountTransfer = new AccountTransfer();
        accountTransfer.setAccountFromId("jannal");
        // Account that does not exist
        accountTransfer.setAccountToId("jack");
        accountTransfer.setAmount(new BigDecimal(1000));
        accountService.transfer(accountTransfer);
    }

    /**
     * Both data sources succeed, then a simulated exception is raised just before the
     * method returns; the service is expected to throw.
     */
    @Test(expected = RuntimeException.class)
    public void testTransferMockException() {
        AccountTransfer accountTransfer = new AccountTransfer();
        accountTransfer.setAccountFromId("jannal");
        accountTransfer.setAccountToId("tom");
        accountTransfer.setAmount(new BigDecimal(1000));
        accountTransfer.setMockException(true);
        accountService.transfer(accountTransfer);
    }
}
运行日志
# 1. 正常操作日志,可以看到commit
...省略...
c.a.d.xa.XATransactionalResource : dataSource: refreshed XAResource
c.a.datasource.xa.XAResourceTransaction : XAResource.start ( 3139322E3136382E3130312E382E746D313635313732373939303436353030303031:3139322E3136382E3130312E382E746D31 , XAResource.TMNOFLAGS ) on resource dataSource represented by XAResource instance com.mysql.jdbc.jdbc2.optional.JDBC4SuspendableXAConnection@20de05e5
c.a.d.xa.XATransactionalResource : dataSource2: refreshed XAResource
c.a.datasource.xa.XAResourceTransaction : XAResource.start ( 3139322E3136382E3130312E382E746D313635313732373939303436353030303031:3139322E3136382E3130312E382E746D32 , XAResource.TMNOFLAGS ) on resource dataSource2 represented by XAResource instance com.mysql.jdbc.jdbc2.optional.JDBC4SuspendableXAConnection@459b187a
c.a.datasource.xa.XAResourceTransaction : XAResource.end ( 3139322E3136382E3130312E382E746D313635313732373939303436353030303031:3139322E3136382E3130312E382E746D31 , XAResource.TMSUCCESS ) on resource dataSource represented by XAResource instance com.mysql.jdbc.jdbc2.optional.JDBC4SuspendableXAConnection@20de05e5
c.a.datasource.xa.XAResourceTransaction : XAResource.end ( 3139322E3136382E3130312E382E746D313635313732373939303436353030303031:3139322E3136382E3130312E382E746D32 , XAResource.TMSUCCESS ) on resource dataSource2 represented by XAResource instance com.mysql.jdbc.jdbc2.optional.JDBC4SuspendableXAConnection@459b187a
c.a.datasource.xa.XAResourceTransaction : XAResource.prepare ( 3139322E3136382E3130312E382E746D313635313732373939303436353030303031:3139322E3136382E3130312E382E746D31 ) returning OK on resource dataSource represented by XAResource instance com.mysql.jdbc.jdbc2.optional.JDBC4SuspendableXAConnection@20de05e5
c.a.datasource.xa.XAResourceTransaction : XAResource.prepare ( 3139322E3136382E3130312E382E746D313635313732373939303436353030303031:3139322E3136382E3130312E382E746D32 ) returning OK on resource dataSource2 represented by XAResource instance com.mysql.jdbc.jdbc2.optional.JDBC4SuspendableXAConnection@459b187a
c.a.datasource.xa.XAResourceTransaction : XAResource.commit ( 3139322E3136382E3130312E382E746D313635313732373939303436353030303031:3139322E3136382E3130312E382E746D31 , false ) on resource dataSource represented by XAResource instance com.mysql.jdbc.jdbc2.optional.JDBC4SuspendableXAConnection@20de05e5
c.a.datasource.xa.XAResourceTransaction : XAResource.commit ( 3139322E3136382E3130312E382E746D313635313732373939303436353030303031:3139322E3136382E3130312E382E746D32 , false ) on resource dataSource2 represented by XAResource instance com.mysql.jdbc.jdbc2.optional.JDBC4SuspendableXAConnection@459b187a
# 1.1 正常的事务日志
{"id":"192.168.1.6.tm165175116472200001","wasCommitted":true,"participants":[{"uri":"192.168.1.6.tm1","state":"COMMITTING","expires":1651751175821,"resourceName":"dataSource"},{"uri":"192.168.1.6.tm2","state":"COMMITTING","expires":1651751175821,"resourceName":"dataSource2"}]}
{"id":"192.168.1.6.tm165175116472200001","wasCommitted":true,"participants":[{"uri":"192.168.1.6.tm1","state":"TERMINATED","expires":1651751175832,"resourceName":"dataSource"},{"uri":"192.168.1.6.tm2","state":"TERMINATED","expires":1651751175832,"resourceName":"dataSource2"}]}
# 2. 异常操作日志,可以看到rollback
...省略...
c.a.d.xa.XATransactionalResource : dataSource: refreshed XAResource
c.a.datasource.xa.XAResourceTransaction : XAResource.start ( 3139322E3136382E3130312E382E746D313635313732383132393437383030303031:3139322E3136382E3130312E382E746D31 , XAResource.TMNOFLAGS ) on resource dataSource represented by XAResource instance com.mysql.jdbc.jdbc2.optional.JDBC4SuspendableXAConnection@27bcb4ad
c.a.datasource.xa.XAResourceTransaction : XAResource.end ( 3139322E3136382E3130312E382E746D313635313732383132393437383030303031:3139322E3136382E3130312E382E746D31 , XAResource.TMSUCCESS ) on resource dataSource represented by XAResource instance com.mysql.jdbc.jdbc2.optional.JDBC4SuspendableXAConnection@27bcb4ad
c.a.datasource.xa.XAResourceTransaction : XAResource.rollback ( 3139322E3136382E3130312E382E746D313635313732383132393437383030303031:3139322E3136382E3130312E382E746D31 ) on resource dataSource represented by XAResource instance com.mysql.jdbc.jdbc2.optional.JDBC4SuspendableXAConnection@27bcb4ad
# 2.1 异常的事务日志
{"id":"192.168.1.6.tm165175128383300001","wasCommitted":false,"participants":[{"uri":"192.168.1.6.tm1","state":"TERMINATED","expires":1651751294869,"resourceName":"dataSource"},{"uri":"192.168.1.6.tm2","state":"TERMINATED","expires":1651751294869,"resourceName":"dataSource2"}]}