业务需求
业务上需要将公司其他系统下的一些数据同步到当前系统中,以下是需求和面临的问题:
- 在当前系统和其他系统存在键冲突,对于name这个字段,其他系统允许重复,当前系统不允许重复;
- 其他系统的数据同步不是一次性过程,是对其他系统的数据一个增量的持续操作;
- 其他系统的数据在转换到本系统的数据时需要添加本系统的主键;
- 对于重复键数据做合并;
- 考虑数据是否会重复落库;
- 初次数据同步数据量在 100万左右,后续增量数据数据量较小可能连续几天或者几十天就只有几条数据变动;对于增量数据实时性要求不是特别高;
思考方向
- 数据的一致性问题;
- 失败重试,和日常调用处理增量数据;
- 数据一致性校对;
- 如何捞取增量数据和初始化的全量数据;
- 重复键问题;
- 内存上限;
解决方案
- 增对数据一致性使用单机事务,确保当次失败数据事务一起回滚;
- 分布式任务调用,这里使用的单机调用方式,配置corn 表达式的方式,在指定时间调用单机运行同步任务(上面说了只有在第一次的时候会很久,数据量很大)
- 单次加载所有重复键数据(名称),执行一次同步;捞取数据方式同下;
- 单次加载所有符合需求的数据ID(这里给需要同步的表加上了 execute_time 为空或者小于更新时间的数据);
- 按照条数分配线程执行;(单线程执行条数逻辑 : (总数/批量新增或更新阈值 )/ 空闲线程数 = 单线程需要执行总数;如果新增或更新阈值 > 总条数 单线程执行;
- 通过countdownLatch 等待线程全部执行完毕,获取批量失败数据ID,做一次记录;
- 由于使用了分布式调度任务,这里如果批量失败数据不为空,可以给分布式任务调度返回执行失败,重复调用该任务,同时设置最大重试次数,达到最大次数以后,分布式任务平台同样会报警,这时候多次失败的数据可以从日志中查询到,手动处理这些异常数据(多次尝试失败,多半是数据有问题);
注意点
- 我这里基于mysql 8.0和mybatis-plus 3.x ;mysql 单条sql是有长度限制的,使用拼接的批量插入或者更新,需要临时开启mysql 的这个限制;
- 实测 mybatis-plus 3.x 提供的批量插入方法没有手动拼接的sql 快;
- 注意计算线程数和单线程执行总数的关系,单个线程内是有事务回滚的,所以一批插入尽量不要太多,否则回滚的也会很多;