• 一个注解解决ShardingJdbc不支持复杂SQL


    背景介绍

    公司最近做分库分表业务,接入了 Sharding JDBC,接入完成后,回归测试时发现好几个 SQL 执行报错,关键这几个表都还不是分片表。报错如下:

    这下糟了嘛。熟悉 Sharding JDBC 的同学应该知道,有很多 SQL 它是不支持的。官方截图如下:

    如果要去修改这些复杂 SQL 的话,可能要花费很多时间。那怎么办呢?只能从 Sharding JDBC 这里找突破口了,两天的研究,出来了下面这个只需要加一个注解轻松解决 Sharding Jdbc 不支持复杂 SQL 的方案。

    问题复现

    我本地写了一个复杂 SQL 进行测试:

    1. public List<Map<String, Object>> queryOrder(){
    2. List<Map<String, Object>> orders = borderRepository.findOrders();
    3. return orders;
    4. }
    1. public interface BOrderRepository extends JpaRepository<BOrder,Long> {
    2. @Query(value = "SELECT * FROM (SELECT id,CASE WHEN company_id =1 THEN '小' WHEN company_id=4 THEN '中' ELSE '大' END AS com,user_id as userId FROM b_order0) t WHERE t.com ='中'",nativeQuery =true)
    3. List<Map<String, Object>> findOrders();
    4. }

    写了个测试 controller 来调用,调用后果然报错了。

    解决思路

    因为查询的复杂 SQL 的表不是分片表,那能不能指定这几个复杂查询的时候不用 Sharding JDBC 的数据源呢?

    1. 在注入 Sharding JDBC 数据源的地方做处理,注入一个我们自定义的数据源
    2. 这样我们获取连接的时候就能返回原生数据源了
    3. 另外我们声明一个注解,对标识了注解的就返回原生数据源,否则还是返回 Sharding 数据源

    具体实现

    1. 编写一个 autoConfig 类,来替换 ShardingSphereAutoConfiguration 类
    1. /**
    2. * 动态数据源核心自动配置类
    3. *
    4. *
    5. */
    6. @Configuration
    7. @ComponentScan("org.apache.shardingsphere.spring.boot.converter")
    8. @EnableConfigurationProperties(SpringBootPropertiesConfiguration.class)
    9. @ConditionalOnProperty(prefix = "spring.shardingsphere", name = "enabled", havingValue = "true", matchIfMissing = true)
    10. @AutoConfigureBefore(DataSourceAutoConfiguration.class)
    11. public class DynamicDataSourceAutoConfiguration implements EnvironmentAware {
    12. private String databaseName;
    13. private final SpringBootPropertiesConfiguration props;
    14. private final Map<String, DataSource> dataSourceMap = new LinkedHashMap<>();
    15. public DynamicDataSourceAutoConfiguration(SpringBootPropertiesConfiguration props) {
    16. this.props = props;
    17. }
    18. /**
    19. * Get mode configuration.
    20. *
    21. * @return mode configuration
    22. */
    23. @Bean
    24. public ModeConfiguration modeConfiguration() {
    25. return null == props.getMode() ? null : new ModeConfigurationYamlSwapper().swapToObject(props.getMode());
    26. }
    27. /**
    28. * Get ShardingSphere data source bean.
    29. *
    30. * @param rules rules configuration
    31. * @param modeConfig mode configuration
    32. * @return data source bean
    33. * @throws SQLException SQL exception
    34. */
    35. @Bean
    36. @Conditional(LocalRulesCondition.class)
    37. @Autowired(required = false)
    38. public DataSource shardingSphereDataSource(final ObjectProvider<List<RuleConfiguration>> rules, final ObjectProvider modeConfig) throws SQLException {
    39. Collection<RuleConfiguration> ruleConfigs = Optional.ofNullable(rules.getIfAvailable()).orElseGet(Collections::emptyList);
    40. DataSource dataSource = ShardingSphereDataSourceFactory.createDataSource(databaseName, modeConfig.getIfAvailable(), dataSourceMap, ruleConfigs, props.getProps());
    41. return new WrapShardingDataSource((ShardingSphereDataSource) dataSource,dataSourceMap);
    42. }
    43. /**
    44. * Get data source bean from registry center.
    45. *
    46. * @param modeConfig mode configuration
    47. * @return data source bean
    48. * @throws SQLException SQL exception
    49. */
    50. @Bean
    51. @ConditionalOnMissingBean(DataSource.class)
    52. public DataSource dataSource(final ModeConfiguration modeConfig) throws SQLException {
    53. DataSource dataSource = !dataSourceMap.isEmpty() ? ShardingSphereDataSourceFactory.createDataSource(databaseName, modeConfig, dataSourceMap, Collections.emptyList(), props.getProps())
    54. : ShardingSphereDataSourceFactory.createDataSource(databaseName, modeConfig);
    55. return new WrapShardingDataSource((ShardingSphereDataSource) dataSource,dataSourceMap);
    56. }
    57. /**
    58. * Create transaction type scanner.
    59. *
    60. * @return transaction type scanner
    61. */
    62. @Bean
    63. public TransactionTypeScanner transactionTypeScanner() {
    64. return new TransactionTypeScanner();
    65. }
    66. @Override
    67. public final void setEnvironment(final Environment environment) {
    68. dataSourceMap.putAll(DataSourceMapSetter.getDataSourceMap(environment));
    69. databaseName = DatabaseNameSetter.getDatabaseName(environment);
    70. }
    71. @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    72. @Bean
    73. @ConditionalOnProperty(prefix = "spring.datasource.dynamic.aop", name = "enabled", havingValue = "true", matchIfMissing = true)
    74. public Advisor dynamicDatasourceAnnotationAdvisor() {
    75. DynamicDataSourceAnnotationInterceptor interceptor = new DynamicDataSourceAnnotationInterceptor(true);
    76. DynamicDataSourceAnnotationAdvisor advisor = new DynamicDataSourceAnnotationAdvisor(interceptor, DS.class);
    77. return advisor;
    78. }
    79. }
    1. 自定义数据源
    1. public class WrapShardingDataSource extends AbstractDataSourceAdapter implements AutoCloseable{
    2. private ShardingSphereDataSource dataSource;
    3. private Map dataSourceMap;
    4. public WrapShardingDataSource(ShardingSphereDataSource dataSource, Map dataSourceMap) {
    5. this.dataSource = dataSource;
    6. this.dataSourceMap = dataSourceMap;
    7. }
    8. public DataSource getTargetDataSource(){
    9. String peek = DynamicDataSourceContextHolder.peek();
    10. if(StringUtils.isEmpty(peek)){
    11. return dataSource;
    12. }
    13. return dataSourceMap.get(peek);
    14. }
    15. @Override
    16. public Connection getConnection() throws SQLException {
    17. return getTargetDataSource().getConnection();
    18. }
    19. @Override
    20. public Connection getConnection(final String username, final String password) throws SQLException {
    21. return getConnection();
    22. }
    23. @Override
    24. public void close() throws Exception {
    25. DataSource targetDataSource = getTargetDataSource();
    26. if (targetDataSource instanceof AutoCloseable) {
    27. ((AutoCloseable) targetDataSource).close();
    28. }
    29. }
    30. @Override
    31. public int getLoginTimeout() throws SQLException {
    32. DataSource targetDataSource = getTargetDataSource();
    33. return targetDataSource ==null ? 0 : targetDataSource.getLoginTimeout();
    34. }
    35. @Override
    36. public void setLoginTimeout(final int seconds) throws SQLException {
    37. DataSource targetDataSource = getTargetDataSource();
    38. targetDataSource.setLoginTimeout(seconds);
    39. }
    40. }
    1. 声明指定数据源注解
    1. @Target({ElementType.TYPE, ElementType.METHOD})
    2. @Retention(RetentionPolicy.RUNTIME)
    3. @Documented
    4. public @interface DS {
    5. /**
    6. * 数据源名
    7. */
    8. String value();
    9. }
    1. 另外使用 AOP 的方式拦截使用了注解的类或方法,并且要将这些用了注解的方法存起来,在获取数据源连接的时候取出来进行判断。这就还要用到 ThreadLocal。

    aop 拦截器:

    1. public class DynamicDataSourceAnnotationInterceptor implements MethodInterceptor {
    2. private final DataSourceClassResolver dataSourceClassResolver;
    3. public DynamicDataSourceAnnotationInterceptor(Boolean allowedPublicOnly) {
    4. dataSourceClassResolver = new DataSourceClassResolver(allowedPublicOnly);
    5. }
    6. @Override
    7. public Object invoke(MethodInvocation invocation) throws Throwable {
    8. String dsKey = determineDatasourceKey(invocation);
    9. DynamicDataSourceContextHolder.push(dsKey);
    10. try {
    11. return invocation.proceed();
    12. } finally {
    13. DynamicDataSourceContextHolder.poll();
    14. }
    15. }
    16. private String determineDatasourceKey(MethodInvocation invocation) {
    17. String key = dataSourceClassResolver.findKey(invocation.getMethod(), invocation.getThis());
    18. return key;
    19. }
    20. }

    aop 切面定义:

    1. /**
    2. * aop Advisor
    3. */
    4. public class DynamicDataSourceAnnotationAdvisor extends AbstractPointcutAdvisor implements BeanFactoryAware {
    5. private final Advice advice;
    6. private final Pointcut pointcut;
    7. private final Class annotation;
    8. public DynamicDataSourceAnnotationAdvisor(MethodInterceptor advice,
    9. Class annotation) {
    10. this.advice = advice;
    11. this.annotation = annotation;
    12. this.pointcut = buildPointcut();
    13. }
    14. @Override
    15. public Pointcut getPointcut() {
    16. return this.pointcut;
    17. }
    18. @Override
    19. public Advice getAdvice() {
    20. return this.advice;
    21. }
    22. @Override
    23. public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
    24. if (this.advice instanceof BeanFactoryAware) {
    25. ((BeanFactoryAware) this.advice).setBeanFactory(beanFactory);
    26. }
    27. }
    28. private Pointcut buildPointcut() {
    29. Pointcut cpc = new AnnotationMatchingPointcut(annotation, true);
    30. Pointcut mpc = new AnnotationMethodPoint(annotation);
    31. return new ComposablePointcut(cpc).union(mpc);
    32. }
    33. /**
    34. * In order to be compatible with the spring lower than 5.0
    35. */
    36. private static class AnnotationMethodPoint implements Pointcut {
    37. private final Class annotationType;
    38. public AnnotationMethodPoint(Class annotationType) {
    39. Assert.notNull(annotationType, "Annotation type must not be null");
    40. this.annotationType = annotationType;
    41. }
    42. @Override
    43. public ClassFilter getClassFilter() {
    44. return ClassFilter.TRUE;
    45. }
    46. @Override
    47. public MethodMatcher getMethodMatcher() {
    48. return new AnnotationMethodMatcher(annotationType);
    49. }
    50. private static class AnnotationMethodMatcher extends StaticMethodMatcher {
    51. private final Class annotationType;
    52. public AnnotationMethodMatcher(Class annotationType) {
    53. this.annotationType = annotationType;
    54. }
    55. @Override
    56. public boolean matches(Method method, Class targetClass) {
    57. if (matchesMethod(method)) {
    58. return true;
    59. }
    60. // Proxy classes never have annotations on their redeclared methods.
    61. if (Proxy.isProxyClass(targetClass)) {
    62. return false;
    63. }
    64. // The method may be on an interface, so let's check on the target class as well.
    65. Method specificMethod = AopUtils.getMostSpecificMethod(method, targetClass);
    66. return (specificMethod != method && matchesMethod(specificMethod));
    67. }
    68. private boolean matchesMethod(Method method) {
    69. return AnnotatedElementUtils.hasAnnotation(method, this.annotationType);
    70. }
    71. }
    72. }
    73. }
    1. /**
    2. * 数据源解析器
    3. *
    4. */
    5. public class DataSourceClassResolver {
    6. private static boolean mpEnabled = false;
    7. private static Field mapperInterfaceField;
    8. static {
    9. Class<?> proxyClass = null;
    10. try {
    11. proxyClass = Class.forName("com.baomidou.mybatisplus.core.override.MybatisMapperProxy");
    12. } catch (ClassNotFoundException e1) {
    13. try {
    14. proxyClass = Class.forName("com.baomidou.mybatisplus.core.override.PageMapperProxy");
    15. } catch (ClassNotFoundException e2) {
    16. try {
    17. proxyClass = Class.forName("org.apache.ibatis.binding.MapperProxy");
    18. } catch (ClassNotFoundException ignored) {
    19. }
    20. }
    21. }
    22. if (proxyClass != null) {
    23. try {
    24. mapperInterfaceField = proxyClass.getDeclaredField("mapperInterface");
    25. mapperInterfaceField.setAccessible(true);
    26. mpEnabled = true;
    27. } catch (NoSuchFieldException e) {
    28. e.printStackTrace();
    29. }
    30. }
    31. }
    32. /**
    33. * 缓存方法对应的数据源
    34. */
    35. private final Map<Object, String> dsCache = new ConcurrentHashMap<>();
    36. private final boolean allowedPublicOnly;
    37. /**
    38. * 加入扩展, 给外部一个修改aop条件的机会
    39. *
    40. * @param allowedPublicOnly 只允许公共的方法, 默认为true
    41. */
    42. public DataSourceClassResolver(boolean allowedPublicOnly) {
    43. this.allowedPublicOnly = allowedPublicOnly;
    44. }
    45. /**
    46. * 从缓存获取数据
    47. *
    48. * @param method 方法
    49. * @param targetObject 目标对象
    50. * @return ds
    51. */
    52. public String findKey(Method method, Object targetObject) {
    53. if (method.getDeclaringClass() == Object.class) {
    54. return "";
    55. }
    56. Object cacheKey = new MethodClassKey(method, targetObject.getClass());
    57. String ds = this.dsCache.get(cacheKey);
    58. if (ds == null) {
    59. ds = computeDatasource(method, targetObject);
    60. if (ds == null) {
    61. ds = "";
    62. }
    63. this.dsCache.put(cacheKey, ds);
    64. }
    65. return ds;
    66. }
    67. /**
    68. * 查找注解的顺序
    69. * 1. 当前方法
    70. * 2. 桥接方法
    71. * 3. 当前类开始一直找到Object
    72. * 4. 支持mybatis-plus, mybatis-spring
    73. *
    74. * @param method 方法
    75. * @param targetObject 目标对象
    76. * @return ds
    77. */
    78. private String computeDatasource(Method method, Object targetObject) {
    79. if (allowedPublicOnly && !Modifier.isPublic(method.getModifiers())) {
    80. return null;
    81. }
    82. //1. 从当前方法接口中获取
    83. String dsAttr = findDataSourceAttribute(method);
    84. if (dsAttr != null) {
    85. return dsAttr;
    86. }
    87. Class<?> targetClass = targetObject.getClass();
    88. Class<?> userClass = ClassUtils.getUserClass(targetClass);
    89. // JDK代理时, 获取实现类的方法声明. method: 接口的方法, specificMethod: 实现类方法
    90. Method specificMethod = ClassUtils.getMostSpecificMethod(method, userClass);
    91. specificMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);
    92. //2. 从桥接方法查找
    93. dsAttr = findDataSourceAttribute(specificMethod);
    94. if (dsAttr != null) {
    95. return dsAttr;
    96. }
    97. // 从当前方法声明的类查找
    98. dsAttr = findDataSourceAttribute(userClass);
    99. if (dsAttr != null && ClassUtils.isUserLevelMethod(method)) {
    100. return dsAttr;
    101. }
    102. //since 3.4.1 从接口查找,只取第一个找到的
    103. for (Class<?> interfaceClazz : ClassUtils.getAllInterfacesForClassAsSet(userClass)) {
    104. dsAttr = findDataSourceAttribute(interfaceClazz);
    105. if (dsAttr != null) {
    106. return dsAttr;
    107. }
    108. }
    109. // 如果存在桥接方法
    110. if (specificMethod != method) {
    111. // 从桥接方法查找
    112. dsAttr = findDataSourceAttribute(method);
    113. if (dsAttr != null) {
    114. return dsAttr;
    115. }
    116. // 从桥接方法声明的类查找
    117. dsAttr = findDataSourceAttribute(method.getDeclaringClass());
    118. if (dsAttr != null && ClassUtils.isUserLevelMethod(method)) {
    119. return dsAttr;
    120. }
    121. }
    122. return getDefaultDataSourceAttr(targetObject);
    123. }
    124. /**
    125. * 默认的获取数据源名称方式
    126. *
    127. * @param targetObject 目标对象
    128. * @return ds
    129. */
    130. private String getDefaultDataSourceAttr(Object targetObject) {
    131. Class<?> targetClass = targetObject.getClass();
    132. // 如果不是代理类, 从当前类开始, 不断的找父类的声明
    133. if (!Proxy.isProxyClass(targetClass)) {
    134. Class<?> currentClass = targetClass;
    135. while (currentClass != Object.class) {
    136. String datasourceAttr = findDataSourceAttribute(currentClass);
    137. if (datasourceAttr != null) {
    138. return datasourceAttr;
    139. }
    140. currentClass = currentClass.getSuperclass();
    141. }
    142. }
    143. // mybatis-plus, mybatis-spring 的获取方式
    144. if (mpEnabled) {
    145. final Class<?> clazz = getMapperInterfaceClass(targetObject);
    146. if (clazz != null) {
    147. String datasourceAttr = findDataSourceAttribute(clazz);
    148. if (datasourceAttr != null) {
    149. return datasourceAttr;
    150. }
    151. // 尝试从其父接口获取
    152. return findDataSourceAttribute(clazz.getSuperclass());
    153. }
    154. }
    155. return null;
    156. }
    157. /**
    158. * 用于处理嵌套代理
    159. *
    160. * @param target JDK 代理类对象
    161. * @return InvocationHandler 的 Class
    162. */
    163. private Class<?> getMapperInterfaceClass(Object target) {
    164. Object current = target;
    165. while (Proxy.isProxyClass(current.getClass())) {
    166. Object currentRefObject = AopProxyUtils.getSingletonTarget(current);
    167. if (currentRefObject == null) {
    168. break;
    169. }
    170. current = currentRefObject;
    171. }
    172. try {
    173. if (Proxy.isProxyClass(current.getClass())) {
    174. return (Class<?>) mapperInterfaceField.get(Proxy.getInvocationHandler(current));
    175. }
    176. } catch (IllegalAccessException ignore) {
    177. }
    178. return null;
    179. }
    180. /**
    181. * 通过 AnnotatedElement 查找标记的注解, 映射为 DatasourceHolder
    182. *
    183. * @param ae AnnotatedElement
    184. * @return 数据源映射持有者
    185. */
    186. private String findDataSourceAttribute(AnnotatedElement ae) {
    187. AnnotationAttributes attributes = AnnotatedElementUtils.getMergedAnnotationAttributes(ae, DS.class);
    188. if (attributes != null) {
    189. return attributes.getString("value");
    190. }
    191. return null;
    192. }
    193. }

    ThreadLocal:

    1. public final class DynamicDataSourceContextHolder {
    2. /**
    3. * 为什么要用链表存储(准确的是栈)
    4. *
    5. * 为了支持嵌套切换,如ABC三个service都是不同的数据源
    6. * 其中A的某个业务要调B的方法,B的方法需要调用C的方法。一级一级调用切换,形成了链。
    7. * 传统的只设置当前线程的方式不能满足此业务需求,必须使用栈,后进先出。
    8. *
  • */
  • private static final ThreadLocal<Deque<String>> LOOKUP_KEY_HOLDER = new NamedThreadLocal>("dynamic-datasource") {
  • @Override
  • protected Deque<String> initialValue() {
  • return new ArrayDeque<>();
  • }
  • };
  • private DynamicDataSourceContextHolder() {
  • }
  • /**
  • * 获得当前线程数据源
  • *
  • * @return 数据源名称
  • */
  • public static String peek() {
  • return LOOKUP_KEY_HOLDER.get().peek();
  • }
  • /**
  • * 设置当前线程数据源
  • *

  • * 如非必要不要手动调用,调用后确保最终清除
  • *

  • *
  • * @param ds 数据源名称
  • */
  • public static String push(String ds) {
  • String dataSourceStr = StringUtils.isEmpty(ds) ? "" : ds;
  • LOOKUP_KEY_HOLDER.get().push(dataSourceStr);
  • return dataSourceStr;
  • }
  • /**
  • * 清空当前线程数据源
  • *

  • * 如果当前线程是连续切换数据源 只会移除掉当前线程的数据源名称
  • *

  • */
  • public static void poll() {
  • Deque<String> deque = LOOKUP_KEY_HOLDER.get();
  • deque.poll();
  • if (deque.isEmpty()) {
  • LOOKUP_KEY_HOLDER.remove();
  • }
  • }
  • /**
  • * 强制清空本地线程
  • *

  • * 防止内存泄漏,如手动调用了push可调用此方法确保清除
  • *

  • */
  • public static void clear() {
  • LOOKUP_KEY_HOLDER.remove();
  • }
  • }
    1. 启动类上做如下配置:

    引入我们写的自动配置类,排除 ShardingJdbc 的自动配置类。

    1. @SpringBootApplication(exclude = ShardingSphereAutoConfiguration.class)
    2. @Import({DynamicDataSourceAutoConfiguration.class})
    3. public class ShardingRunApplication {
    4. public static void main(String[] args) {
    5. SpringApplication.run(ShardingRunApplication.class);
    6. }
    7. }

    最后,我们给之前写的 Repository 加上注解:

    1. public interface BOrderRepository extends JpaRepository<BOrder,Long> {
    2. @DS("slave0")
    3. @Query(value = "SELECT * FROM (SELECT id,CASE WHEN company_id =1 THEN '小' WHEN company_id=4 THEN '中' ELSE '大' END AS com,user_id as userId FROM b_order0) t WHERE t.com ='中'",nativeQuery =true)
    4. List<Map<String, Object>> findOrders();
    5. }

    再次调用,查询成功!!!

  • 相关阅读:
    MySQL5.7读写分离
    每天五分钟机器学习:神经网络和支持向量机的基础——感知机模型
    刷题笔记20——各种顺序的二叉树构造
    RabbitMQ:hello结构
    图解MySQL中的JOIN类型
    cadence SPB17.4 - 中文UI设置
    计数排序算法
    SpringCloud 常见问题
    Spring Security 6.1.x 系列 (1)—— 初识Spring Security
    一文看懂推荐系统:特征交叉01:Factorized Machine (FM) 因式分解机
  • 原文地址:https://blog.csdn.net/ch98000/article/details/126781038