• 一次分表的技术方案分享


    我的博客阅读本文

    1. 业务场景

    主要考虑两个因素:

    • 纯粹的数据量角度:

      • 单表数据量过大已经影响查询性能(查看当前数据量)
      • 数据量增长迅速,防患于未然(查看近一月数据量增长情况,对未来做出预期)

      我们目前存在一个数据表1600w+数据,月增长200w左右。

    • 客户实际感知的业务响应速度带来的直观用户体验影响

      我们线上这个千万级的表查询速度已经5s+

    2. 方案

    2.1. 分表方式

    预算这里已经单独使用一个MySQL实例,拥有不用与其他服务共享的,完全的CPU和磁盘IO性能,理论上性能够用,无需再做分库处理。

    大多数场景依然还是单表数据量大,扫描行过多,增加了不必要的CPU负担。

    水平分表,减少单表扫描行数,应该是较为简单快速的提升性能的一个方式。同时由于单表数据量变小,后续执行DDL时,也能更迅速。

    因此使用水平分表。

    2.2. 分表算法与分片键的选择。

    分表时必须将“同一个客户”的数据落到一个表中,避免触发全表路由,不符预期。

    **作为SaaS业务,我们采用企业的订阅号(企业编码)字段作为分片键。**对“同一个客户”的业务理解是字符串类型的企业编码相同的数据,这个字段也是一个表中的字段。

    这里采用简单的哈希取模的方式,即:

    Math.abs(企业编码.hashCode() % table_number)
    
    • 1

    缺点:

    • 每家客户的数据量不一样,可能出现几个分表的“数据倾斜”的问题。

    优点:

    • 简单易实现,能够保证同一客户数据落到一个表中的业务预期。
    • 通过一致性哈希算法来实现,后期扩容影响范围小,减少业务影响

    后续优化方向上可以考虑单独pick出一些数据量大的企业,路由到单独的表中。

    根据一致性hash算法,table_number我们需要约定一个固定的值,也就是一致性哈希环中的最大允许数量,使用2的幂数,因此可以用位运算取代模运算提高性能(参考HashMap的容量计算逻辑):

    Math.abs(企业编码.hashCode() & (table_number - 1))
    
    • 1

    2.3. 分表数量

    分表的数量上,以3年作为一个业务展望时间:

    • 4500w数据 → 300w一张表 → 15张表 → 取2的4次方 → 16张表 → 表的序号从0到15

    一致性hash环的数量,扩大2个量级,取2的6次方64,应该已经绰绰有余了,因此我们有了一个具体的分表算法来计算这个表序号:

    int temp = Math.abs(企业编码.hashCode() & (64 - 1));
    return temp / (64 / 16);
    
    // ==> 进一步优化 
    return (Math.abs(企业编码.hashCode() & 63)) / 4;
    
    • 1
    • 2
    • 3
    • 4
    • 5

    后期假设第0张表数据量过多,我们需要二次分表第0张表扩容,我们假设增加一个序号为16的分表,只需要对表0做一次数据迁移即可,其他表按照一致性hash算法,不会发生分表变化。分表算法修改为:

    int hashMod = Math.abs(企业编码.hashCode() & 63;
    int temp = hashMod / 4;
    if (0 == temp) {
    	return hashMod / 2 == 0 ? 0 : 16;
    } else {
    	return temp;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    如果序号为2的幂数,这里的算法会更简单,也是更标准的一致性hash算法,不过相应的影响的表数量也会更多,不做展开。

    2.4. 迁移数据

    2.4.1. 数据迁移技术选型

    水平分表本质上是将原来一个表的数据按照分表算法将数据分配到n个分表中,因此存在数据迁移问题,即原表→若干分表

    • 阿里云DTS工具(开源版Canal

      由于需要支持分表算法,一个思路是在MySQL中实现一个Java的String#hashCode方法,然后在迁移数据过程中对数据进行清洗,过滤符合当前分表算法的数据进入指定分表。

      但是很遗憾,DTS工具商业版暂不支持该功能,后续Canal可能可以通过代码的方式支持,这一块待完善。

      Untitled

    • 阿里云Dataworks工具(开源版DataX

      这个工具本质是一个大数据治理平台,支持自定义函数(可以用Java语言写UDF),能够满足需求。

    选型上,只有Dataworks能够完成需求,下文说明如何通过阿里云Dataworks工具完成数据迁移。

    1. 首先,需要将上文的分表算法用Java语言描述,需要开发一个【UDF】发布到阿里云的Dataworks的函数上:

      1. 我这里使用的是Function Studio的方式发布上去的,代码参考:

        package com.alibaba.dataworks.udf;
        import com.aliyun.odps.udf.UDF;
        
        public class DataTransfer extends UDF {
            public Integer evaluate(String s) {
                int hash = s.hashCode();
                return (Math.abs(hash & 63)) / 4;
            }
        }
        
        • 1
        • 2
        • 3
        • 4
        • 5
        • 6
        • 7
        • 8
        • 9

        需要注意的是,Function Studio后续不再维护,阿里云官方鼓励自己上传jar的方式:

        Untitled

    2. 用【ODPS SQL】创建两个ODPS表,一个用来从源库同步全量数据,一个用来存储分表数据,这里的数据是写文件的,速度很快:

      Untitled

      DROP TABLE IF EXISTS log_odps;
      CREATE TABLE IF NOT EXISTS log_odps(
      	...
      )
      COMMENT ''
      PARTITIONED BY (pt STRING) 
      lifecycle 36500;
      
      DROP TABLE IF EXISTS log_odps_sharding;
      CREATE TABLE IF NOT EXISTS log_odps_sharding(
      	...
      )
      COMMENT ''
      PARTITIONED BY (pt STRING) 
      lifecycle 36500;
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
    3. 用【数据集成】从MySQL数据源同步数据到ODPS中,需要注意这里的清理规则建议选择“Insert Overwrite”,这样后续可以重复执行不需要清理ODPS数据,分区可以随便填写一个字符串:

      Untitled

      Untitled

    4. 同步到ODPS数据后,用【ODPS SQL】,可以用“create … as …”语法,将ODPS中的数据同步到一个新表中,每个分表为一个分区:

      set odps.sql.allow.fullscan=true;
      insert overwrite table log_odps_sharding partition(pt)
      select `(pt)?+.+`, DataTransfer(企业编码) as pt from log_odps;
      
      • 1
      • 2
      • 3
      1. set odps.sql.allow.fullscan=true 开启全表扫描。也可以指定下面的select语句的分区,否则需要开启全表扫描(只有一个分区,无意义)
      2. insert overwrite 写数据前清理数据。
      3. (pt)?+.+除了pt字段以外的字段(Hive SQL这里分区字段会作为SELECT *的一列返回,因此需要排除)
      4. DataTransfer为我们定义的UDF函数,这里我们直接调用函数的结果作为log_odps_sharding的分区字段
      5. 至此,我们的log_odps数据会分布在log_odps_sharding对各个分区中,分区字段则为分表的表序号。
    5. 用【数据集成】将ODPS数据写回原数据源。

      Untitled

      1. 这里的pt值,按照上面的分表算法,我这里直接写1,2,3之类的分表序号即可
      2. 主键冲突策略选择“on duplicate key update”策略,这样可以多次执行实行不停机迁移(下文说明)

    2.4.2. 不停机迁移

    不停机迁移的方案基本上是两个步骤:

    • 全量数据迁移(数据量大,比较慢,需要提前执行,可以在发版前执行)
    • 增量数据同步(数据量小,比较快,可以在发版后执行)

    需要注意,这里增量数据同步追平全量数据时:

    • 会有一段时间的数据不一致的。这个如果有很高的要求可以尝试“实时计算”同步的方案,尽可能缩小增量数据同步时间也就缩小了数据不一致的时间区间(dataworks中有【实时同步】选项)
    • 最终一致性。当增量数据同步完成,追平全量数据的时刻,数据是一致的。

    不停机迁移这里由于Dataworks是即时读取数据源数据,而不是像DTS那样可以感知到binlog,因此需要保证数据的修改有“标记”:

    • 数据行上要有逻辑删字段
    • 数据行上要有代表修改时间的字段

    在Dataworks上的具体实践:

    1. 全量数据迁移:按照上文流程走即可

    2. 增量数据迁移:

      1. 在新发布的分表代码中,需要对数据进行**“双写”,即:新写入/修改的数据,在对对应分表发生操作的同时,也对原始表进行一次同样的操作,二者需要保证原子性。(如果是同数据库,可以使用本地事务),在此基础上,我们就可以确定:原数据表一定是最新的数据。**
      2. 在上文迁移方案中的第三步“用【数据集成】从MySQL数据源同步数据到ODPS中”过程中,我们可以指定只同步修改时间大于“全量数据同步开始的时间点”的数据
      3. 其余流程不变,再次执行即可。

      由于我们遇到相同数据时选择了“on duplicate key update”策略,Dataworks会帮助我们update数据。由于我们加入了逻辑删字段,即时数据发生了删除,Dataworks也会理解为一次update,因此不会发生数据没有正常删除的问题。

    2.5. 异常回滚

    由于上述不停机迁移过程中采用了“双写”,因此这里一旦发生异常,只需要切回异常前代码与配置即可。

    2.6. 框架选择

    市面上分表的框架还是比较多的,比如kingshard**,**ShardingSphere,Mycat等。

    这次选型不严谨选用ShardingSphere下的ShardingSphere,主要出于以下考虑:

    • 社区活跃度上,ShardingSphere的社区活跃度很高。
    • ShardingSphere-JDBC无需额外部署服务,JAR包方式接入。
    • 隔壁小伙伴有使用的ShardingSphere经验,技术栈一致,减少维护和学习成本。

    ShardingSphere下又有几个产品,ShardingSphere-JDBC,Sharding-Proxy,Sharding-Sidecar(规划中):

    Untitled

    这里选择了ShardingSphere-JDBC,主要考虑ShardingSphere-JDBC运维成本比较低,无需部署额外的中心化服务,去中心化,分表的配置主动权在开发这里。

    当然ShardingSphere-JDBC也会存在一些缺点:

    • 不支持弹性伸缩,每次调整分片算法后,需要将数据手动迁移到各分片上。

      4.1.0版本之后,提供了Sharding-Scaling方案解决弹性伸缩问题,只支持Sharding-Proxy方案。

      Untitled

    2.7. ShardingSphere-JDBC使用流程

    Maven坐标(SpringBoot starter方式接入):

    <dependency>
        <groupId>org.apache.shardingspheregroupId>
        <artifactId>sharding-jdbc-spring-boot-starterartifactId>
    		<version>latest.versionversion>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    配置参考:

    # sharding-jdbc
    # datasource config
    spring.shardingsphere.datasource.names = ds0
    spring.shardingsphere.datasource.ds0.type = com.alibaba.druid.pool.DruidDataSource
    spring.shardingsphere.datasource.ds0.url = jdbc:mysql:xxx
    spring.shardingsphere.datasource.ds0.username = xxx
    spring.shardingsphere.datasource.ds0.password = xxx
    
    # sharding table config
    
    spring.shardingsphere.sharding.tables.[table_name].actual-data-nodes = ${['ds0']}.[table_name]_$->{0..15}
    spring.shardingsphere.sharding.tables.[table_name].table-strategy.standard.sharding-column = [shard_key]
    spring.shardingsphere.sharding.tables.[table_name].table-strategy.standard.precise-algorithm-class-name = com.maycur.budget.config.sharding.MaycurPreciseShardingAlgorithm
    
    # show sql
    spring.shardingsphere.props.sql.show = true
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • spring.shardingsphere.datasource.xx 指定数据源配置
    • spring.shardingsphere.sharding.tables.xx 指定分表配置
    • spring.shardingsphere.props.sql.show 展示实际路由的分表的SQL

    这里分片的策略是自己写了一个类:

    import java.util.Collection;
    import org.apache.shardingsphere.api.sharding.standard.PreciseShardingAlgorithm;
    import org.apache.shardingsphere.api.sharding.standard.PreciseShardingValue;
    
    /**
     * Sharding algorithm.
     *
     * @author masaiqi
     * @date 2022/7/18 17:39
     */
    public class MaycurPreciseShardingAlgorithm implements PreciseShardingAlgorithm<String> {
        @Override
        public String doSharding(Collection<String> collection,
            PreciseShardingValue<String> preciseShardingValue) {
            String value = preciseShardingValue.getValue();
            int shardingKey = getShardingKey(value);
    
            return collection.toArray()[shardingKey].toString();
        }
    
        private int getShardingKey(String originKey) {
            return Math.abs(originKey.hashCode() & 63) / 4;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    需要注意的是,这个类无需交给Spring IOC管理,Sharding-JDBC会负责实例化对象:

    public static <T extends ShardingAlgorithm> T newInstance(final String shardingAlgorithmClassName, final Class<T> superShardingAlgorithmClass) {
        Class<?> result = Class.forName(shardingAlgorithmClassName);
        if (!superShardingAlgorithmClass.isAssignableFrom(result)) {
            throw new ShardingSphereException("Class %s should be implement %s", shardingAlgorithmClassName, superShardingAlgorithmClass.getName());
        }
        return (T) result.newInstance();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    备选方案:分片的策略也可以使用ShardingSphere的行表达式,不过分片key为字符串时,groovy不允许直接取模,需要转换为字符串,再加上没有Java中的String#hashcode方法,比较复杂。

    spring.shardingsphere.sharding.tables.[tablename].table-strategy.inline.algorithm-expression=[groovy expression]
    
    • 1

    Done!启动运行。

    3. 参考

  • 相关阅读:
    MySql查询慢-如何提高查询速度呢?--解决方法
    引用 随笔
    痛心:实验室服务器被黑挖矿怎么办?
    天翼云Web应用防火墙(边缘云版)支持检测和拦截Apache Spark shell命令注入漏洞
    #边学边记 必修5 高项:对人管理 第1章 项目人力资源管理 之 项目团队组建
    读书笔记:JavaScript DOM 编程艺术(第二版)
    【Qt】Qt下配置OpenCV
    抓到Dubbo异步调用的小BUG,再送你一个贡献开源代码的机会
    OpenAI重大更新!为ChatGPT推出语音和图像交互功能
    揭秘小程序上线不到一周,每天2万销售额,究竟怎么做到的?
  • 原文地址:https://blog.csdn.net/qq_20021569/article/details/126234225