不光是管理多个数据源,是对sql的优化、改写、归并等一系列操作的解决方案。关注的是sql语句。以shardingSphere为例,虽然也支持跟sql无关的hint策略提供路由功能,但是在sql改写以及归并过程中,依旧对sql有限制。
如果只是简单的切换多个数据源,而对sql的逻辑没有任何限制,就不要选择分库分表了。直接选用多数据源切换多方案更简单。spring-jdbc模块提供了AbstractRoutingDataSource抽象类,其内部可以包含多个DataSource,只需要实现其抽象方法,在运行时就可以动态访问指定的数据库。但是需要自己实现一些aop的切换能力,这个mybaitis-plus都帮我们做好了。
业界主要有两种实现方案:
我们来查看AbstractRoutingDataSource源码,来更好的理解多数据源配置。
首先查看该类的属性,根据名称我们能看出他们的作用。
private Map<Object, Object> targetDataSources;
private Object defaultTargetDataSource;
private boolean lenientFallback = true;
private DataSourceLookup dataSourceLookup = new JndiDataSourceLookup();
private Map<Object, DataSource> resolvedDataSources;
private DataSource resolvedDefaultDataSource;
对数据源赋值的代码如下:
public void setTargetDataSources(Map<Object, Object> targetDataSources) {
this.targetDataSources = targetDataSources;
}
public void setDefaultTargetDataSource(Object defaultTargetDataSource) {
this.defaultTargetDataSource = defaultTargetDataSource;
}
因为方法是set开头,我们便能把这两个方法配置在spring中,继续向下看。
public void afterPropertiesSet() {
if (this.targetDataSources == null) {
throw new IllegalArgumentException("Property 'targetDataSources' is required");
} else {
this.resolvedDataSources = new HashMap(this.targetDataSources.size());
Iterator var1 = this.targetDataSources.entrySet().iterator();
while(var1.hasNext()) {
Entry<Object, Object> entry = (Entry)var1.next();
Object lookupKey = this.resolveSpecifiedLookupKey(entry.getKey());
DataSource dataSource = this.resolveSpecifiedDataSource(entry.getValue());
this.resolvedDataSources.put(lookupKey, dataSource);
}
if (this.defaultTargetDataSource != null) {
this.resolvedDefaultDataSource = this.resolveSpecifiedDataSource(this.defaultTargetDataSource);
}
}
}
这个afterPropertiesSet方法是遍历我们的targetDataSources数据源集合,并添加resolvedDataSources的map数据,map的key和value是根据resolveSpecifiedLookupKey方法和resolveSpecifiedDataSource方法得到。接着找到resolveSpecifiedLookupKey和resolveSpecifiedDataSource。
protected Object resolveSpecifiedLookupKey(Object lookupKey) {
return lookupKey;
}
protected DataSource resolveSpecifiedDataSource(Object dataSource) throws IllegalArgumentException {
if (dataSource instanceof DataSource) {
return (DataSource)dataSource;
} else if (dataSource instanceof String) {
return this.dataSourceLookup.getDataSource((String)dataSource);
} else {
throw new IllegalArgumentException("Illegal data source value - only [javax.sql.DataSource] and String supported: " + dataSource);
}
}
resolveSpecifiedLookupKey方法返回的实际就是targetDataSources的key,而resolveSpecifiedDataSource返回的是targetDataSources的value转成的DataSource。afterPropertiesSet方法的作用实际就是将原targetDataSources转成resolvedDataSources。
继续向下看,我们能看到数据库的连接方法。
public Connection getConnection() throws SQLException {
return this.determineTargetDataSource().getConnection();
}
public Connection getConnection(String username, String password) throws SQLException {
return this.determineTargetDataSource().getConnection(username, password);
}
我们接着去看determineTargeDataSource方法,估计这个方法是返回指定数据源的。
protected DataSource determineTargetDataSource() {
Assert.notNull(this.resolvedDataSources, "DataSource router not initialized");
Object lookupKey = this.determineCurrentLookupKey();
DataSource dataSource = (DataSource)this.resolvedDataSources.get(lookupKey);
if (dataSource == null && (this.lenientFallback || lookupKey == null)) {
dataSource = this.resolvedDefaultDataSource;
}
if (dataSource == null) {
throw new IllegalStateException("Cannot determine target DataSource for lookup key [" + lookupKey + "]");
} else {
return dataSource;
}
}
果然,这个方法是返回数据源的,我们来仔细读这个方法,从第3行开始”Object lookupKey = this.determineCurrentLookupKey();”,这个determineCurrentLookupKey返回了一个key,第四句是根据这个key去resolvedDataSources中拿到对应DataSource,接下来的代码是DataSource不存在便返回默认的数据源。determineCurrentLookupKey方法就是返回key的逻辑处理部分,联系spring中的配置,它返回的就是”cms”、”epg”中的一个。
新建一个springboot项目,pom.xml文件中引入如下依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>2.2.2</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.2.9</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
server:
port: 8090
spring:
application:
name: springboot-dynamic-aop
datasource:
type: com.alibaba.druid.pool.DruidDataSource
master:
jdbc-url: jdbc:mysql://localhost:3306/dynamic-master?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
username: root
password: 123456
driver-class-name: com.mysql.cj.jdbc.Driver
slave:
jdbc-url: jdbc:mysql://localhost:3306/dynamic-slave?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
username: root
password: 123456
driver-class-name: com.mysql.cj.jdbc.Driver
mybatis:
mapper-locations: classpath:mapper/*.xml
configuration:
use-actual-param-name: true
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
@Configuration
public class DataSourceConfig {
@Bean
@ConfigurationProperties(prefix = "spring.datasource.master")
public DataSource masterDataSource() {
return DataSourceBuilder.create().build();
}
@Bean
@ConfigurationProperties(prefix = "spring.datasource.slave")
public DataSource slaveDataSource() {
return DataSourceBuilder.create().build();
}
@Bean
public DataSourceTransactionManager masterDataSourceTransactionManager(DynamicDataSource dynamicDataSource) {
DataSourceTransactionManager dataSourceTransactionManager = new DataSourceTransactionManager();
dataSourceTransactionManager.setDataSource(dynamicDataSource);
return dataSourceTransactionManager;
}
@Bean
public DataSourceTransactionManager slaveDataSourceTransactionManager(DynamicDataSource dynamicDataSource) {
DataSourceTransactionManager dataSourceTransactionManager = new DataSourceTransactionManager();
dataSourceTransactionManager.setDataSource(dynamicDataSource);
return dataSourceTransactionManager;
}
}
新建一个类继承AbstractRoutingDataSource,实现其抽象类
@Primary
@Component
public class DynamicDataSource extends AbstractRoutingDataSource {
public static final ThreadLocal<String> name = new ThreadLocal<>();
@Autowired
DataSource masterDataSource;
@Autowired
DataSource slaveDataSource;
@Override
protected Object determineCurrentLookupKey() {
return name.get();
}
@Override
public void afterPropertiesSet() {
Map<Object, Object> targetDataSources = new HashMap<>();
targetDataSources.put("master", masterDataSource);
targetDataSources.put("slave", slaveDataSource);
//设置目标数据源
super.setTargetDataSources(targetDataSources);
//设置默认数据源
super.setDefaultTargetDataSource(masterDataSource);
super.afterPropertiesSet();
}
}
一般情况下,读写分离的数据源使用MyBatis插件实现动态切换数据源,不同业务来源的数据源使用AOP结合自定义注解实现动态切换数据源,或者定义多个mybatis sqlsessionFactory来实现
新建一个插件类,实现Interceptor接口
@Intercepts({
@Signature(type = Executor.class, method = "update", args = {MappedStatement.class, Object.class}),
@Signature(type = Executor.class, method = "query", args = {MappedStatement.class, Object.class, RowBounds.class, ResultHandler.class})
})
public class DynamicDataSourcePlugin implements Interceptor {
@Override
public Object intercept(Invocation invocation) throws Throwable {
Object[] objects = invocation.getArgs();
MappedStatement mappedStatement = (MappedStatement) objects[0];
if (mappedStatement.getSqlCommandType().equals(SqlCommandType.SELECT)) {
DynamicDataSource.name.set("slave");
} else {
DynamicDataSource.name.set("master");
}
return invocation.proceed();
}
@Override
public Object plugin(Object target) {
if (target instanceof Executor) {
return Plugin.wrap(target, this);
} else {
return target;
}
}
@Override
public void setProperties(Properties properties) {
}
}
再将DynamicDataSourcePlugin类加入DataSourceConfig配置类
@Bean
public Interceptor interceptor() {
return new DynamicDataSourcePlugin();
}
新建一个自定义注解DS
@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface DS {
String value() default "";
}
新建切面类
@Aspect
@Component
public class DynamicDataSourceAspect implements Ordered {
@Before("@within(ds)")
public void before(JoinPoint joinPoint, DS ds) {
DynamicDataSource.name.set(ds.value());
}
@Override
public int getOrder() {
return 0;
}
}
SpringBoot配置文件 配置多个数据库 分别为 his/pt/lis
spring:
datasource:
his:
driver-class-name: com.microsoft.sqlserver.jdbc.SQLServerDriver
jdbc-url: jdbc:sqlserver://192.168.200.200\HIS;DatabaseName=his_fy
username: sa
password: 123456
#初始化连接池的连接数量 大小 最小 最大
initial-size: 5
min-idle: 5
max-active: 20
#配置获取连接等待超时的时间
max-wait: 60000
#配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒
time-between-eviction-runs-millis: 60000
# 配置一个连接在池中最小生存的时间,单位是毫秒
min-evictable-idle-time-millis: 30000
# 配置一个连接在池中最大生存的时间,单位是毫秒
max-evictable-idle-time-millis: 300000
pt:
driver-class-name: com.microsoft.sqlserver.jdbc.SQLServerDriver
jdbc-url: jdbc:sqlserver://192.168.200.200\HIS;DatabaseName=his_pt_data
username: sa
password: 123456
#初始化连接池的连接数量 大小 最小 最大
initial-size: 5
min-idle: 5
max-active: 20
#配置获取连接等待超时的时间
max-wait: 60000
#配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒
time-between-eviction-runs-millis: 60000
# 配置一个连接在池中最小生存的时间,单位是毫秒
min-evictable-idle-time-millis: 30000
# 配置一个连接在池中最大生存的时间,单位是毫秒
max-evictable-idle-time-millis: 300000
lis:
driver-class-name: com.microsoft.sqlserver.jdbc.SQLServerDriver
jdbc-url: jdbc:sqlserver://192.168.200.200\HIS;DatabaseName=LIS
username: sa
password: 123456
#初始化连接池的连接数量 大小 最小 最大
initial-size: 5
min-idle: 5
max-active: 20
#配置获取连接等待超时的时间
max-wait: 60000
#配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒
time-between-eviction-runs-millis: 60000
# 配置一个连接在池中最小生存的时间,单位是毫秒
min-evictable-idle-time-millis: 30000
# 配置一个连接在池中最大生存的时间,单位是毫秒
max-evictable-idle-time-millis: 300000
创建his对应的配置文件
/**
* 多数据源配置类 此类配置读取his_fy数据库
* @author zhaogx
* @date 2022/5/18 14:28
*/
@Configuration
@MapperScan(
basePackages = {"com.thwy.mapper.his"},
sqlSessionFactoryRef = "hisSqlSessionFactory"
)
public class HisDataSourceConfig {
/**
* @ConfigurationProperties(prefix = "spring.datasource.his") 读取配置文件中的数据源信息
* @return 返回一个数据源 名字为 hisDataSource
*/
@Bean(name = "hisDataSource")
@ConfigurationProperties(prefix = "spring.datasource.his")
public DataSource hisDataSource(){
return DataSourceBuilder.create().build();
}
/**
* 配置SqlSessionFactory
* @Qualifier("hisDataSource") 类型相同时指定注入哪一个名称的bean
* @param hisDataSource hisDataSource方法中创建的指定数据源
* @return
* @throws Exception
*/
@Bean(name = "hisSqlSessionFactory")
public SqlSessionFactory hisSqlSessionFactory(@Qualifier("hisDataSource") DataSource hisDataSource) throws Exception{
SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
//设置数据源
sqlSessionFactoryBean.setDataSource(hisDataSource);
//设置mybtais配置 驼峰命名配置
org.apache.ibatis.session.Configuration config = new org.apache.ibatis.session.Configuration();
config.setMapUnderscoreToCamelCase(true);
sqlSessionFactoryBean.setConfiguration(config);
//设置mapper.xml所在目录
sqlSessionFactoryBean.setMapperLocations(
new PathMatchingResourcePatternResolver().getResources("classpath:mapper/his/*.xml"));
return sqlSessionFactoryBean.getObject();
}
/**
* 配置SqlSessionTemplate 可省略此步骤
* @param hisSqlSessionFactory
* @return
*/
public SqlSessionTemplate hisSqlSessionTemplate(@Qualifier("hisSqlSessionFactory") SqlSessionFactory hisSqlSessionFactory){
return new SqlSessionTemplate(hisSqlSessionFactory);
}
}
创建pt对应的配置文件
@Configuration
@MapperScan(
basePackages = {"com.thwy.mapper.pt"},
sqlSessionFactoryRef = "ptSqlSessionFactory"
)
public class PTDataSourceConfig {
/**
* @ConfigurationProperties(prefix = "spring.datasource.pt") 读取配置文件中的数据源信息
* @return 返回一个数据源 名字为 ptDataSource
*/
@Bean(name = "ptDataSource")
@ConfigurationProperties(prefix = "spring.datasource.pt")
public DataSource ptDataSource(){
return DataSourceBuilder.create().build();
}
/**
* 配置SqlSessionFactory
* @Qualifier("ptDataSource") 类型相同时指定注入哪一个名称的bean
* @param ptDataSource ptDataSource方法中创建的指定数据源
* @return
* @throws Exception
*/
@Bean(name = "ptSqlSessionFactory")
public SqlSessionFactory ptSqlSessionFactory(@Qualifier("ptDataSource") DataSource ptDataSource) throws Exception{
SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
//设置数据源
sqlSessionFactoryBean.setDataSource(ptDataSource);
//设置mybtais配置 驼峰命名配置
org.apache.ibatis.session.Configuration config = new org.apache.ibatis.session.Configuration();
config.setMapUnderscoreToCamelCase(true);
sqlSessionFactoryBean.setConfiguration(config);
//设置mapper.xml所在目录
sqlSessionFactoryBean.setMapperLocations(
new PathMatchingResourcePatternResolver().getResources("classpath:mapper/pt/*.xml"));
return sqlSessionFactoryBean.getObject();
}
/**
* 配置SqlSessionTemplate 可省略此步骤
* @param ptSqlSessionFactory
* @return
*/
public SqlSessionTemplate ptSqlSessionTemplate(@Qualifier("ptSqlSessionFactory") SqlSessionFactory ptSqlSessionFactory){
return new SqlSessionTemplate(ptSqlSessionFactory);
}
}
创建lis对应的配置文件
/**
* 多数据源配置类 此类配置读取LIS数据库
* @author zhaogx
* @date 2022/5/18 14:28
*/
@Configuration
@MapperScan(
basePackages = {"com.thwy.mapper.lis"},
sqlSessionFactoryRef = "lisSqlSessionFactory"
)
public class LisDataSourceConfig {
/**
* @ConfigurationProperties(prefix = "spring.datasource.lis") 读取配置文件中的数据源信息
* @return 返回一个数据源 名字为 lisDataSource
*/
@Bean(name = "lisDataSource")
@ConfigurationProperties(prefix = "spring.datasource.lis")
public DataSource lisDataSource(){
return DataSourceBuilder.create().build();
}
/**
* 配置SqlSessionFactory
* @Qualifier("lisDataSource") 类型相同时指定注入哪一个名称的bean
* @param lisDataSource lisDataSource方法中创建的指定数据源
* @return
* @throws Exception
*/
@Bean(name = "lisSqlSessionFactory")
public SqlSessionFactory lisSqlSessionFactory(@Qualifier("lisDataSource") DataSource lisDataSource) throws Exception{
SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
//设置数据源
sqlSessionFactoryBean.setDataSource(lisDataSource);
//设置mybtais配置 驼峰命名配置
org.apache.ibatis.session.Configuration config = new org.apache.ibatis.session.Configuration();
config.setMapUnderscoreToCamelCase(true);
sqlSessionFactoryBean.setConfiguration(config);
//设置mapper.xml所在目录
sqlSessionFactoryBean.setMapperLocations(
new PathMatchingResourcePatternResolver().getResources("classpath:mapper/lis/*.xml"));
return sqlSessionFactoryBean.getObject();
}
/**
* 配置SqlSessionTemplate 可省略此步骤
* @param lisSqlSessionFactory
* @return
*/
public SqlSessionTemplate lisSqlSessionTemplate(@Qualifier("lisSqlSessionFactory") SqlSessionFactory lisSqlSessionFactory){
return new SqlSessionTemplate(lisSqlSessionFactory);
}
}
在配置文件中我们配置了 mapper的包扫描与xml文件的存放路径
此时当我们执行指定包下的mapper中的方法时,就会走与之对应的数据库
自己整合实现多数据源多有麻烦,baomidou提供的dynamic-datasource-spring-boot-starter已实现了上述功能,只需要引入该依赖即可,可以参阅SpringBoot整合dynamic-datasource实现动态切换多数据源
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.5.1</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>dynamic-datasource-spring-boot-starter</artifactId>
<version>3.5.1</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.2.9</version>
</dependency>
server:
port: 8226
spring:
application:
name: springboot-dynamic-mybatis-plus
datasource:
type: com.alibaba.druid.pool.DruidDataSource
dynamic:
primary: master
strict: true #严格匹配数据源
datasource:
master:
url: jdbc:mysql://localhost:3306/dynamic-master?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
username: root
password: LIU81&yj
driver-class-name: com.mysql.cj.jdbc.Driver
slave:
url: jdbc:mysql://localhost:3306/dynamic-slave?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
username: root
password: LIU81&yj
driver-class-name: com.mysql.cj.jdbc.Driver
druid:
initial-size: 5 #初始连接数
min-idle: 10 #最小连接池
max-active: 20 #最大连接池
max-wait: 60000 #连接等待超时时间
time-between-eviction-runs-millis: 60000 #检测间隔时间,毫秒
min-evictable-idle-time-millis: 300000 #连接池最小生存时间,毫秒
max-evictable-idle-time-millis: 900000 #连接池最大生存时间,毫秒
validation-query: SELECT 1 FROM DUAL #连接检测
mybatis-plus:
mapper-locations: classpath*:/mapper/**/*.xml
configuration:
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
@Service
@DS(value = "master")
public class CustomerServiceImpl extends ServiceImpl<CustomerMapper, Customer> implements CustomerService {
}
若需要使用到事务,只需要在最外层加注解@DSTransactional即可
当然了mybatisPlus在很多位置给我们留了拓展,比如如何加载数据源、对接其它连接池、自定义负责均衡策略、自定义路由查找:
其主要类图如下:感兴趣的可以去读读源码
引入多数据源后,解决了多数据源访问的问题,同时也带来另外2个问题:
//数据源1
@Bean
public DataSource dataSource1() {
org.apache.tomcat.jdbc.pool.DataSource dataSource = new org.apache.tomcat.jdbc.pool.DataSource();
dataSource.setDriverClassName("com.mysql.jdbc.Driver");
dataSource.setUrl("jdbc:mysql://localhost:3306/ds1?characterEncoding=UTF-8");
dataSource.setUsername("root");
dataSource.setPassword("root123");
dataSource.setInitialSize(5);
return dataSource;
}
//事务管理器1,对应数据源1
@Bean
public PlatformTransactionManager transactionManager1(@Qualifier("dataSource1")DataSource dataSource) {
return new DataSourceTransactionManager(dataSource);
}
//数据源2
@Bean
public DataSource dataSource2() {
org.apache.tomcat.jdbc.pool.DataSource dataSource = new org.apache.tomcat.jdbc.pool.DataSource();
dataSource.setDriverClassName("com.mysql.jdbc.Driver");
dataSource.setUrl("jdbc:mysql://localhost:3306/ds2?characterEncoding=UTF-8");
dataSource.setUsername("root");
dataSource.setPassword("root123");
dataSource.setInitialSize(5);
return dataSource;
}
//事务管理器2,对应数据源2
@Bean
public PlatformTransactionManager transactionManager2(@Qualifier("dataSource2")DataSource dataSource) {
return new DataSourceTransactionManager(dataSource);
}
使用@Transaction 中时,需通过@Transaction 注解的 value 或 transactionManager 属性指定事务管理器 bean 名称,如:
@Transactional(transactionManager = "transactionManager1", propagation = Propagation.REQUIRED)
public void required(String name) {
this.jdbcTemplate1.update("insert into user1(name) VALUES (?)", name);
}
多数据源事务的使用就这么简单,下面我们来看案例,案例才是精华。
这里先给大家解释一下 REQUIRED 传播行为下,事务管理器的大致的运行过程,方便理解后面的案例代码。
Service1中:
@Transactional(transactionManager = "transactionManager1", propagation = Propagation.REQUIRED)
public void m1(){
this.jdbcTemplate1.update("insert into user1(name) VALUES ('张三')");
service2.m2();
}
Service2中:
@Transactional(transactionManager = "transactionManager1", propagation = Propagation.REQUIRED)
public void m2(){
this.jdbcTemplate1.update("insert into user1(name) VALUES ('李四')");
}
spring 事务中有个 resources 的 ThreadLocal,static 修饰的,用来存放共享的资源,稍后过程中会用到。
private static final ThreadLocal
1、TransactionInterceptor拦截m1方法
2、获取m1方法的事务配置信息:事务管理器bean名称:transactionManager1,事务传播行为:REQUIRED
3、从spring容器中找到事务管理器transactionManager1,然后问一下transactionManager1,当前上下文中有没有事务,显然现在是没有的
4、创建一个新的事务
//获取事务管理器对应的数据源,即dataSource1
DataSource dataSource1 = transactionManager1.getDataSource();
//即从dataSource1中获取一个连接
Connection conn = transactionManager1.dataSource1.getConnection();
//开启事务手动提交
conn.setAutoCommit(false);
//将dataSource1->conn放入map中
map.put(dataSource1,conn);
//将map丢到上面的resources ThreadLocal中
resources.set(map);
5、下面来带m1放的第一行代码:this.jdbcTemplate1.update("insert into user1(name) VALUES ('张三')");
6、jdbctemplate内部需要获取数据连接,获取连接的过程
//从resources这个ThreadLocal中获取到map
Map map = resources.get();
//通过jdbcTemplate1.datasource从map看一下没有可用的连接
Connection conn = map.get(jdbcTemplate1.datasource);
//如果从map没有找到连接,那么重新从jdbcTemplate1.datasource中获取一个
//大家应该可以看出来,jdbcTemplate1和transactionManager1指定的是同一个dataSource,索引这个地方conn是不为null的
if(conn==null){
conn = jdbcTemplate1.datasource.getConnection();
}
7、通过上面第6步获取的conn执行db操作,插入张三
8、下面来到m1方法的第2行代码:service2.m2();
9、m2方法上面也有@Transactional,TransactionInterceptor拦截m2方法
10、获取m2方法的事务配置信息:事务管理器bean名称:transactionManager1,事务传播行为:REQUIRED
11、从spring容器中找到事务管理器transactionManager1,然后问一下transactionManager1,当前上下文中有没有事务,显然是是有的,m1开启的事务正在执行中,所以m2方法就直接加入这个事务了
12、下面来带m2放的第一行代码:this.jdbcTemplate1.update("insert into user1(name) VALUES ('李四')");
13、jdbctemplate内部需要获取数据连接,获取连接的过程
//从resources这个ThreadLocal中获取到map
Map map = resources.get();
//通过jdbcTemplate1.datasource从map看一下没有可用的连接
Connection conn = map.get(jdbcTemplate1.datasource);
//如果从map没有找到连接,那么重新从jdbcTemplate1.datasource中获取一个
//大家应该可以看出来,jdbcTemplate1和transactionManager1指定的是同一个dataSource,索引这个地方conn是不为null的
if(conn==null){
conn = jdbcTemplate1.datasource.getConnection();
}
14、通过第13步获取的conn执行db操作,插入李四
15、最终TransactionInterceptor发现2个方法都执行完毕了,没有异常,执行事务提交操作,如下
//获取事务管理器对应的数据源,即dataSource1
DataSource dataSource1 = transactionManager1.getDataSource();
//从resources这个ThreadLocal中获取到map
Map map = resources.get();
//通过map拿到事务管理器开启的连接
Connection conn = map.get(dataSource1);
//通过conn提交事务
conn.commit();
//管理连接
conn.close();
16、清理ThreadLocal中的连接:通过map.remove(dataSource1)将连接从resource ThreadLocal中移除
17、清理事务
从上面代码中可以看出:整个过程中有 2 个地方需要用到数据库连接 Connection 对象,第 1 个地方是:spring 事务拦截器启动事务的时候会从 datasource 中获取一个连接,通过这个连接开启事务手动提交,第 2 个地方是:最终执行 sql 操作的时候,也需要用到一个连接。那么必须确保这两个连接必须是同一个连接的时候,执行 sql 的操作才会受 spring 事务控制,那么如何确保这 2 个是同一个连接呢?从代码中可以看出必须让事务管理器中的 datasource 和 JdbcTemplate 中的 datasource 必须是同一个,那么最终 2 个连接就是同一个对象。
什么是事务挂起操作?
这里以事务传播行为 REQUIRED_NEW 为例说明一下,REQUIRED_NEW 表示不管当前事务管理器中是否有事务,都会重新开启一个事务,如果当前事务管理器中有事务,会把当前事务挂起。
所谓挂起,你可以这么理解:对当前存在事务的现场生成一个快照,然后将事务现场清理干净,然后重新开启一个新事务,新事务执行完毕之后,将事务现场清理干净,然后再根据前面的快照恢复旧事务。
下面我们再回到本文的内容,多数据源事务管理。
事务管理器如何判断当前是否有事务?
简化版的过程如下:
Map map=resource的ThreadLocal.get();
DataSource datasource = transactionManager.getDataSource();
Connection conn = map.get(datasource);
//如果conn不为空,就表示当前有事务
if(conn!=null){
}
从这段代码可以看出:判断是否存在事务,主要和 datasource 有关,和事务管理器无关,即使是不同的事务管理器,只要事务管理器的 datasource 是一样的,那么就可以发现当前存在的事务。
DataSource: Alibaba Druid
Database: MySQL 5.7
SpringBoot: 2.2.2.RELEASE
ORM: MyBatis
JTA: Atomikos
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>2.1.1</version>
</dependency>
<!--atomikos transaction management-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jta-atomikos</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.1.21</version>
</dependency>
简单逻辑,两张表,分别在两个不同的库中,然后一个service方法操作两个库的数据。
第一张表:是账户表
第二章表:是订单表
spring:
application:
name: two-data-source
datasource:
account:
url: jdbc:mysql://127.0.0.1:3306/transaction_account?useSSL=false&characterEncoding=UTF-8
username: root
password: xxxxx
order:
url: jdbc:mysql://127.0.0.1:3306/transaction_order?useSSL=false&characterEncoding=UTF-8
username: root
password: xxxxx
#logging:
# level:
# root: DEBUG
主要包括以下步骤
1、分别注册对应DataSource、SqlSessionFactory、SqlSessionTemplate的Bean
2、然后指定表的Mapper的位置,并且把Mybatis中原有的sqlSessionTemplate设置成你注册的。
需要注意的点:
DataSource不能直接使用Druid提供的DruidDataSource, 需要使用atomikos来包装一下Druid提供的DruidXADataSource,来支持XA规范
如果你不想用Druid,可以考虑使用MysqlXADataSource(我没试过)
注册的Bean的对应关系要正确
```c
@Configuration
@MapperScan(basePackages = {"io.ilss.transaction.twodatasource.dao.account"}, sqlSessionTemplateRef = "accountSqlSessionTemplate")
public class AccountConfiguration {
@Value("${spring.datasource.account.url}")
private String url;
@Value("${spring.datasource.account.username}")
private String username;
@Value("${spring.datasource.account.password}")
private String password;
@Bean(name = "accountDataSource")
public DataSource accountDataSource() {
AtomikosDataSourceBean atomikosDataSourceBean = new AtomikosDataSourceBean();
DruidXADataSource druidXADataSource = new DruidXADataSource();
druidXADataSource.setUrl(url);
druidXADataSource.setUsername(username);
druidXADataSource.setPassword(password);
druidXADataSource.setName("druidDataSource-account");
atomikosDataSourceBean.setXaDataSource(druidXADataSource);
atomikosDataSourceBean.setUniqueResourceName("accountResource");
return atomikosDataSourceBean;
}
@Bean(name = "accountSqlSessionFactory")
public SqlSessionFactory accountSqlSessionFactory(DataSource accountDataSource) throws Exception {
SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean();
factoryBean.setDataSource(accountDataSource);
factoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources("classpath*:mappers/account/*.xml"));
return factoryBean.getObject();
}
@Bean(name = "accountSqlSessionTemplate")
@Primary
public SqlSessionTemplate accountSqlSessionTemplate(@Qualifier("accountSqlSessionFactory") SqlSessionFactory sqlSessionFactory) throws Exception {
return new SqlSessionTemplate(sqlSessionFactory);
}
}
配置正确后会有如下日志信息
c.atomikos.jdbc.AbstractDataSourceBean : AtomikosDataSoureBean 'orderResource': poolSize equals default - this may cause performance problems!
com.alibaba.druid.pool.DruidDataSource : {dataSource-1,druidDataSource-order} inited
c.atomikos.jdbc.AbstractDataSourceBean : AtomikosDataSoureBean 'accountResource': poolSize equals default - this may cause performance problems!
com.alibaba.druid.pool.DruidDataSource : {dataSource-2,druidDataSource-account} inited
c.a.icatch.provider.imp.AssemblerImp : Loaded jar:file:/Users/feng/.m2/repository/com/atomikos/transactions/4.0.6/transactions-4.0.6.jar!/transactions-defaults.properties
c.a.icatch.provider.imp.AssemblerImp : Thanks for using Atomikos! Evaluate http://www.atomikos.com/Main/ExtremeTransactions for advanced features and professional support...略
c.a.icatch.provider.imp.AssemblerImp : USING: com.atomikos.icatch.default_max_wait_time_on_shutdown = 9223372036854775807
c.a.icatch.provider.imp.AssemblerImp : USING: com.atomikos.icatch.allow_subtransactions = true
c.a.icatch.provider.imp.AssemblerImp : USING: com.atomikos.icatch.recovery_delay = 10000
c.a.icatch.provider.imp.AssemblerImp : USING: com.atomikos.icatch.automatic_resource_registration = true
c.a.icatch.provider.imp.AssemblerImp : USING: com.atomikos.icatch.oltp_max_retries = 5
c.a.icatch.provider.imp.AssemblerImp : USING: com.atomikos.icatch.client_demarcation = false
c.a.icatch.provider.imp.AssemblerImp : USING: com.atomikos.icatch.threaded_2pc = false
c.a.icatch.provider.imp.AssemblerImp : USING: com.atomikos.icatch.serial_jta_transactions = true
c.a.icatch.provider.imp.AssemblerImp : USING: com.atomikos.icatch.log_base_dir = /Users/feng/Projects/java/transaction-example/transaction-logs
c.a.icatch.provider.imp.AssemblerImp : USING: com.atomikos.icatch.rmi_export_class = none
c.a.icatch.provider.imp.AssemblerImp : USING: com.atomikos.icatch.max_actives = 50
c.a.icatch.provider.imp.AssemblerImp : USING: com.atomikos.icatch.checkpoint_interval = 500
c.a.icatch.provider.imp.AssemblerImp : USING: com.atomikos.icatch.enable_logging = true
c.a.icatch.provider.imp.AssemblerImp : USING: com.atomikos.icatch.log_base_name = tmlog
c.a.icatch.provider.imp.AssemblerImp : USING: com.atomikos.icatch.max_timeout = 300000
c.a.icatch.provider.imp.AssemblerImp : USING: com.atomikos.icatch.trust_client_tm = false
c.a.icatch.provider.imp.AssemblerImp : USING: java.naming.factory.initial = com.sun.jndi.rmi.registry.RegistryContextFactory
c.a.icatch.provider.imp.AssemblerImp : USING: com.atomikos.icatch.tm_unique_name = 10.11.11.11.tm
c.a.icatch.provider.imp.AssemblerImp : USING: com.atomikos.icatch.forget_orphaned_log_entries_delay = 86400000
c.a.icatch.provider.imp.AssemblerImp : USING: com.atomikos.icatch.oltp_retry_interval = 10000
c.a.icatch.provider.imp.AssemblerImp : USING: java.naming.provider.url = rmi://localhost:1099
c.a.icatch.provider.imp.AssemblerImp : USING: com.atomikos.icatch.force_shutdown_on_vm_exit = false
c.a.icatch.provider.imp.AssemblerImp : USING: com.atomikos.icatch.default_jta_timeout = 10000
c.a.icatch.provider.imp.AssemblerImp : Using default (local) logging and recovery...
c.a.d.xa.XATransactionalResource : orderResource: refreshed XAResource
c.a.d.xa.XATransactionalResource : accountResource: refreshed XAResource
首先初始化两个Atomikos包裹的Druid的数据源,
然后设置atomikos的参数,都是默认的
最后XAResource刷新
至此,配置完毕,可能有人好奇,JTA的代码一个都没有,因为SpringBoot使用JTA的时候引入的starter做了
简单模拟订单生成支付过程,从账户中扣除一比钱,然后新增一比订单。
编程的方式和Spring事务的方式一毛一样,没什么不同。
@Slf4j
@Service
public class OrderServiceImpl implements OrderService {
@Autowired
private OrderInfoDAO orderInfoDAO;
@Autowired
private AccountDAO accountDAO;
@Autowired
PlatformTransactionManager transactionManager;
@Override
@Transactional
public String createOrder(OrderInfoDO orderInfoDO) {
AccountDO accountDO = accountDAO.selectByPrimaryKey(orderInfoDO.getAccountId());
if (null == accountDO) {
log.error("createOrder user is not present, accountId: {}", orderInfoDO.getAccountId());
return "用户不存在!";
}
// 用户费用扣除
accountDO.setBalance(accountDO.getBalance().subtract(orderInfoDO.getAmount()));
accountDAO.updateByPrimaryKey(accountDO);
orderInfoDAO.insertSelective(orderInfoDO);
return "成功";
}
@Override
public String createOrderCode(OrderInfoDO orderInfoDO) {
TransactionDefinition transactionDefinition = new DefaultTransactionDefinition();
// 获取事务 开始业务执行
TransactionStatus transaction = transactionManager.getTransaction(transactionDefinition);
try {
AccountDO accountDO = accountDAO.selectByPrimaryKey(orderInfoDO.getAccountId());
if (null == accountDO) {
log.error("createOrder user is not present, accountId: {}", orderInfoDO.getAccountId());
return "用户不存在!";
}
// 用户费用扣除
accountDO.setBalance(accountDO.getBalance().subtract(orderInfoDO.getAmount()));
accountDAO.updateByPrimaryKey(accountDO);
orderInfoDAO.insertSelective(orderInfoDO);
error("createOrderCode error");
transactionManager.commit(transaction);
return "成功";
} catch (Exception e) {
log.error("create order failed, accountId: {}, errMsg: {}", orderInfoDO.getAccountId(), e.getMessage());
transactionManager.rollback(transaction);
}
return "失败";
}
public static void error(String msg) {
throw new RuntimeException(msg);
}
}
mybatis plus从3.3.0开始支持本地多数据源事务,无需第三方。
尝试手动构建数据源结合JTA方案 如https://www.cnblogs.com/cicada-smile/p/13289306.html。
多数据源事务方案一直是一个难题,通常的解决方案有以下二种。
利用atomiks手动构建多数据源事务,适合数据源较少,配置的参数也不太多的项目。难点就是手动配置量大,需要耗费一定时间。
用seata类似的分布式事务解决方案,难点就是需要搭建维护如seata-server的统一管理中心。
不支持spring原生事务,不支持spring事务,不支持spring事务,可分别使用,千万不能混用。
再次强调不支持spring事务注解,可理解成独立写了一套事务方案。
只适合简单本地多数据源场景, 如果涉及异步和微服务等场景,请使用seata方案
在需要切换数据源且需要事务支持的方法上加@DSTransactional.
PS:一般需要分布式事务的场景大多数都是微服务化,个人并不建议在单体项目引入多数据源+分布式事务,有能力尽早拆开,可为过度方案。
seata Github地址https://github.com/seata/seata
seata 文档https://seata.io/zh-cn/docs/overview/what-is-seata.html
seata 示例https://github.com/seata/seata-samples
seata 最新版本
使用mybatisplus来实现,基于上面的集中场景,完成配置式的开发。
支持 数据源分组 ,适用于多种场景 纯粹多库 读写分离 一主多从 混合模式。
支持数据库敏感配置信息 加密 ENC()。
支持每个数据库独立初始化表结构schema和数据库database。
支持无数据源启动,支持懒加载数据源(需要的时候再创建连接)。
支持 自定义注解 ,需继承DS(3.2.0+)。
提供并简化对Druid,HikariCp,BeeCp,Dbcp2的快速集成。
提供对Mybatis-Plus,Quartz,ShardingJdbc,P6sy,Jndi等组件的集成方案。
提供 自定义数据源来源 方案(如全从数据库加载)。
提供项目启动后 动态增加移除数据源 方案。
提供Mybatis环境下的 纯读写分离 方案。
提供使用 spel动态参数 解析数据源方案。内置spel,session,header,支持自定义。
支持 多层数据源嵌套切换 。(ServiceA >>> ServiceB >>> ServiceC)。
提供 **基于seata的分布式事务方案。
提供 本地多数据源事务方案。