在一些应用场景中,分片条件并不存在于SQL,而存在于外部业务逻辑。因此需要提供一种通过外部业务代码配置指定路由的一种方式,在ShardingSphere中叫做Hint。如果使用hint强制路由,那么sql将无视原有的分片逻辑,直接路由到指定的数据节点上操作。
Hint使用场景:
数据分片操作,例如分片键没有在SQL或数据表中,而是在业务逻辑代码中;
读写分离操作,例如强制在主库进行某些数据操作。
在resources目录下创建配置文件application-hint-database.properties:
# datasource
spring.shardingsphere.datasource.names=ds0,ds1
# master
spring.shardingsphere.datasource.ds0.type=com.zaxxer.hikari.HikariDataSource
spring.shardingsphere.datasource.ds0.driver-class-name=com.mysql.cj.jdbc.Driver
spring.shardingsphere.datasource.ds0.jdbc-url=jdbc:mysql://192.168.126.21:3306/zzx1?useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai
spring.shardingsphere.datasource.ds0.username=root
spring.shardingsphere.datasource.ds0.password=123456
# slave0
spring.shardingsphere.datasource.ds1.type=com.zaxxer.hikari.HikariDataSource
spring.shardingsphere.datasource.ds1.driver-class-name=com.mysql.cj.jdbc.Driver
spring.shardingsphere.datasource.ds1.jdbc-url=jdbc:mysql://192.168.126.22:3306/zzx1?useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai
spring.shardingsphere.datasource.ds1.username=root
spring.shardingsphere.datasource.ds1.password=123456
# hint
spring.shardingsphere.sharding.tables.city.database-strategy.hint.algorithm-class-name=com.zzx.hint.MyHintShardingAlgorithm
即配置数据源以及强制路由算法
创建一个强制路由算法实现类MyHintShardingAlgorithm:
public class MyHintShardingAlgorithm implements HintShardingAlgorithm<Integer> {
@Override
public Collection<String> doSharding(Collection<String> collection, HintShardingValue<Integer> hintShardingValue) {
ArrayList<String> result = new ArrayList();
//数据源datasource集合
for (String each : collection)
{
//路由键集合
for (Integer value : hintShardingValue.getValues())
{
if(each.endsWith(String.valueOf(value%2)))
{
result.add(each);
}
}
}
return result;
}
}
在主配置文件application.properties中指定配置文件的名称:
#指定Sharding-JDBC配置文件的名称
spring.profiles.active=hint-database
创建一个测试类TestHint:
@SpringBootTest(classes = RunBoot.class)
public class TestHint {
@Resource
private CityRepository cityRepository;
@Test
public void test1()
{
HintManager instance = HintManager.getInstance();
instance.setDatabaseShardingValue(1); //强制路由到ds${xx%2}
List<City> cityList = cityRepository.findAll();
cityList.forEach(city -> {
System.out.println(city.getId()+" "+city.getName()+" "+city.getProvince());
});
}
}
创建c_user表,在可视化工具或者Mysql命令行执行如下:
CREATE TABLE `c_user` (
`id` bigint(11) NOT NULL AUTO_INCREMENT,
`name` varchar(256) DEFAULT NULL,
`pwd_plain` varchar(256) DEFAULT NULL,
`pwd_cipher` varchar(256) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
在com.zzx.entity包下,创建一个CUser类,添加如下:
@Entity
@Table(name="c_user")
@Data
public class CUser implements Serializable {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Column(name="name")
private String name;
@Column(name="pwd")
private String pwd;
}
在com.zzx.repository包下,创建一个接口CUserRepository,添加如下:
public interface CUserRepository extends JpaRepository<CUser,Long> {
}
在resources目录下,创建配置文件application-encryptor.properties,添加如下:
# datasource
spring.shardingsphere.datasource.names=ds0
# ds0
spring.shardingsphere.datasource.ds0.type=com.zaxxer.hikari.HikariDataSource
spring.shardingsphere.datasource.ds0.driver-class-name=com.mysql.cj.jdbc.Driver
spring.shardingsphere.datasource.ds0.jdbc-url=jdbc:mysql://192.168.126.21:3306/zzx1?useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai
spring.shardingsphere.datasource.ds0.username=root
spring.shardingsphere.datasource.ds0.password=123456
#encryptor 密码列加密
spring.shardingsphere.encrypt.tables.c_user.columns.pwd.plain-column=pwd_plain
spring.shardingsphere.encrypt.tables.c_user.columns.pwd.cipher-column=pwd_cipher
spring.shardingsphere.encrypt.encryptors.my_pwd.type=aes
spring.shardingsphere.encrypt.encryptors.my_pwd.props.aes.key.value=123456
spring.shardingsphere.encrypt.tables.c_user.columns.pwd.encryptor=my_pwd
# 主键生成器
# c_user
spring.shardingsphere.sharding.tables.c_user.key-generator.column=id
spring.shardingsphere.sharding.tables.c_user.key-generator.type=SNOWFLAKE
# false即直接匹配pwd_plain,反之找pwd_cipher进行解密。默认为true
#spring.shardingsphere.props.query.with.cipher.column=false
修改主配置文件application.properties,指定配置文件:
#指定Sharding-JDBC配置文件的名称
spring.profiles.active=encryptor
在test目录创建一个测试类TestEncryptor,添加一条数据:
@SpringBootTest(classes = RunBoot.class)
public class TestEncryptor {
@Resource
private CUserRepository cUserRepository;
@Test
public void testAdd()
{
CUser cUser = new CUser();
cUser.setName("zzx");
cUser.setPwd("856");
cUserRepository.save(cUser);
}
}
即会将pwd列进行一个加密,加过密的放在密文列pwd_cipher,未加密的放在明文列pwd_plain。
在CUserRepository接口添加如下:
List<CUser> findByPwd(String pwd);
在TestEncryptor类,查询数据:
@Test
public void testFind()
{
List<CUser> cUsers = cUserRepository.findByPwd("856");
cUsers.forEach(cUser -> {
System.out.println(cUser.getId()+" "+cUser.getName()+" "+cUser.getPwd());
});
}
2PC模式(强一致性)
2PC是Two-Phase Commit缩写,即两阶段提交,就是将事务的提交过程分为两个阶段来处理。事务的发起者称协调者,事务的执行者称参与者。协调者统一协调参与者执行。
2PC方案实现起来简单,实际项目中使用比较少,主要因为以下问题:
3PC模式(强一致性)
3PC(三阶段提交),是两阶段提交的改进版本,与两阶段提交不同的是,引入超时机制。同时在协调者和参与者中都引入超时机制。三阶段提交将两阶段的准备阶段拆分为两个阶段,插入了一个preCommit阶段,解决了原先在两阶段提交中,参与者在准备之后,由于协调者或参与者发生崩溃或错误,而导致参与者无法知晓并处于长时间等待的问题。如果在指定的时间内,协调者没有收到参与者的消息则默认失败。
相比2PC模式,3PC模式降低了阻塞范围,在等待超时后协调者或参与者会中断事务。避免了协调者单点故障问题,阶段三中,协调者出现问题时(如网络中断等),参与者会继续提交事务。
ShardingSphere整合了XA,为分布式事务控制提供了极大的便利,可以在应用程序编程时,采用以下统一模式进行使用。
引入Maven依赖
<!-- https://mvnrepository.com/artifact/org.apache.shardingsphere/sharding-transaction-xa-core -->
<dependency>
<groupId>org.apache.shardingsphere</groupId>
<artifactId>sharding-transaction-xa-core</artifactId>
<version>4.1.1</version>
</dependency>
ShardingSphere默认的XA事务管理器为Atomikos,可以通过在项目的classpath中添加jta.properties(即Resources目录)来定制Atomikos配置项。具体配置如下:
#指定是否启动磁盘日志,默认为true。在生产环境下一定要保证为true,否则数据的完整性无法保证
com.atomikos.icatch.enable_logging=true
#JTA/XA资源是否应该自动注册
com.atomikos.icatch.automatic_resource_registration=true
#JTA事务的默认超时时间,默认为10000ms
com.atomikos.icatch.default_jta_timeout=10000
#事务的最大超时时间,默认为300000ms。这表示事务超时时间由
#UserTransaction.setTransactionTimeout()较大者决定。4.x版本之后,指定为0的话则表示不设置超时时间
com.atomikos.icatch.max_timeout=300000
#指定在两阶段提交时,是否使用不同的线程(意味着并行)。3.7版本之后默认为false,更早的版本默认为true。如果为false,则提交将按照事务中访问资源的顺序进行。
com.atomikos.icatch.threaded_2pc=false
#指定最多可以同时运行的事务数量,默认值为50,负数表示没有数量限制。在调用
#UserTransaction.begin()方法时,可能会抛出一个”Max number of active transactionsreached”异常信息,表示超出最大事务数限制
com.atomikos.icatch.max_actives=50
#是否支持subtransaction,默认为true
com.atomikos.icatch.allow_subtransactions=true
#指定在可能的情况下,是否应该join子事务(subtransactions),默认值为true。如果设置为false,对于有关联的不同subtransactions,不会调用XAResource.start(TM_JOIN)
com.atomikos.icatch.serial_jta_transactions=true
#指定JVM关闭时是否强制(force)关闭事务管理器,默认为false
com.atomikos.icatch.force_shutdown_on_vm_exit=false
#在正常关闭(no-force)的情况下,应该等待事务执行完成的时间,默认为Long.MAX_VALUE
com.atomikos.icatch.default_max_wait_time_on_shutdown=9223372036854775807
#========= 日志记录配置=======
#事务日志目录,默认为./。
com.atomikos.icatch.log_base_dir=./
#事务日志文件前缀,默认为tmlog。事务日志存储在文件中,文件名包含一个数字后缀,日志文件以.log为扩展名,如tmlog1.log。遇到checkpoint时,新的事务日志文件会被创建,数字增加。
com.atomikos.icatch.log_base_name=tmlog
#指定两次checkpoint的时间间隔,默认为500
com.atomikos.icatch.checkpoint_interval=500
#=========日志恢复配置=============
#指定在多长时间后可以清空无法恢复的事务日志(orphaned),默认86400000ms
com.atomikos.icatch.forget_orphaned_log_entries_delay=86400000
#指定两次恢复扫描之间的延迟时间。默认值为与#com.atomikos.icatch.default_jta_timeout相同
com.atomikos.icatch.recovery_delay=${com.atomikos.icatch.default_jta_timeout}
#提交失败时,再抛出一个异常之前,最多可以重试几次,默认值为5
com.atomikos.icatch.oltp_max_retries=5
#提交失败时,每次重试的时间间隔,默认10000ms
com.atomikos.icatch.oltp_retry_interval=10000
在test目录下创建一个类TestShardingTransaction,添加如下:
@SpringBootTest(classes = RunBoot.class)
public class TestShardingTransaction {
@Resource
private PositionRepository positionRepository;
@Resource
private PositionDetailRepository positionDetailRepository;
@Test
@Transactional
//@ShardingTransactionType(TransactionType.XA)
public void testAdd()
{
TransactionTypeHolder.set(TransactionType.XA);
for (int i = 1; i <= 20; i++) {
Position position = new Position();
//position.setId((long)i);
position.setName("zzx"+i);
position.setSalary(2000d);
position.setCity("shenzhen");
positionRepository.save(position);
if(i==3)
throw new RuntimeException("人造异常");
PositionDetail positionDetail = new PositionDetail();
positionDetail.setPid(position.getId());
positionDetail.setDescription("description"+i);
positionDetailRepository.save(positionDetail);
}
}
}
即使用XA事务来回滚或提交,并制造一个异常进行测试。使用注解或者使用代码的形式来指定事务。
修改application.properties主配置文件,指定数据源等配置文件:
#指定Sharding-JDBC配置文件的名称
spring.profiles.active=sharding-database
此时运行结果有异常,事务会进行回滚,所以表及其子表都没有数据。