今天分享一款非常实用的异构数据库迁移工具,可以进行多种常见数据库之间的数据或结构迁移,相比于其他的数据迁移工具,它还支持字段索引,默认值,字段约束的信息同步,mysql 的主键递增等,支持基于正则表达式转换的表名与字段名映射转换。
同步方式分为全量和增量同步两种类型,同步算法借鉴了 kettle 的数据同步算法,支持手动和定时执行。目前支持的数据库类型有 MySQL/MariaDB/Oracle/SQL Server/PostgreSQL/Greenplum/DB2/DMDB/Kingbase8/HighGo/Hive/SQLite,后续会不断开放更多的数据库类型。(源码文档获取方式见文末)
提供源端数据库向目的端数据库的批量迁移同步功能,支持数据的全量和增量方式同步。包括:
支持字段类型、主键信息、建表语句等的转换,并生成建表SQL语句。
支持基于正则表达式转换的表名与字段名映射转换。
支持字段索引,默认值,字段约束的信息同步。
基于JDBC的分批次读取源端数据库数据,分批次写入目的数据库。
支持有主键表的增量变更同步,算法借鉴了 kettle 的基于主键的同步算法。
middle-service-etl-web – 前端工程[8080]
middle-ground-api – 后端工程
├── middle-common – 系统公共模块
├ ├── middle-core – 公共核心包,控制环境变量
├ ├── middle-db-core – 数据库核心包
├ ├── middle-dbswtich-- 数据库迁移核心组件包
├ ├── middle-http – 网络相关组件包
├── middle-eureka – 注册中心[8610]
├── middle-gateway – Spring Cloud Gateway网关[8082]
├── middle-service-etl-- 数据库迁移业务代码
该系统的核心代码是它的数据迁移的相关组件包,位于 middle-common 下的 middle-dbswtich 中,其中 middle.ground.dbswitch.data.service.MigrationService 和middle.ground.dbswitch.data.handler.MigrationHandler 两个类是数据迁移的主要逻辑和算法相关的代码,通过调用 MigrationService 的 run 方法可以启动数据迁移,然后异步调用 MigrationHandler 的 get 方法运行相关逻辑,完成数据迁移。感兴趣的朋友可以自行查阅相关代码,静下心来看就能捋顺其中的逻辑。
jdk 1.8(特别注意: 在Java9及以上版本默认情况下不允许应用程序查看来自JDK的所有类,但在数据迁移组件中利用反射计算对象的字节大小,所以需要在JVM启动时需要增加如下参数:)
--add-opens java.base/jdk.internal.loader=ALL-UNNAMED --add-opens jdk.zipfs/jdk.nio.zipfs=ALL-UNNAMED
数据库版本为 mysql5.7 及以上版本
导入 db 文件夹下的 sql 脚本:middle_service_etl.sql
把系统导入 idea 中,等待 maven 依赖下载完毕
修改 middle-service-etl 模块的 yml 配置文件(配置文件生效配置在 middle-common/middle-core 的 yml 文件中指定),把 mysql 的用户名密码改成自己数据库的用户名和密码
依次执行 middle-eureka 下的 EurekaApplication,middle-gateway 下的 GatewayApplication,middle-service-etl 下的 MiddleServiceEtlApplication
本地安装稳定版 node.js,把 middle-service-etl-web 导入 hbuidx 或 vscode,执行 npm install 命令,然后执行 npm run dev 即可,若运行报错,安装 jsx 依赖,再次执行 npm run dev 即可:
npm install babel-plugin-transform-vue-jsx
npm install babel-helper-vue-jsx-merge-props
npm install babel-plugin-syntax-jsx
访问 http://localhost:8080
配置参数 | 配置说明 | 示例 | 备注 |
---|---|---|---|
dbswitch.source[i].url | 来源端JDBC连接的URL | jdbc:oracle:thin:@127.0.0.1:1521:orcl | 可为:oracle/mysql/mariadb/sqlserver/postgresql/db2/dm/kingbase8/highgo 等 |
dbswitch.source[i].driver-class-name | 来源端数据库的驱动类名称 | oracle.jdbc.driver.OracleDriver | 对应数据库的驱动类 |
dbswitch.source[i].username | 来源端连接帐号名 | root | 无 |
dbswitch.source[i].password | 来源端连接帐号密码 | root | 无 |
dbswitch.source[i].fetch-size | 来源端数据库查询时的fetch_size设置 | 10000 | 需要大于100 有效 |
dbswitch.source[i].source-schema | 来源端的schema名称 | test | 多个之间用英文逗号分隔 |
dbswitch.source[i].source-includes | 来源端schema下的表中需要包含的表名称 | users1 | 支持多个表(多个之间用英文逗号分隔);支持正则表达式(不能含有逗号) |
dbswitch.source[i].source-excludes | 来源端schema下的表中需要过滤的表名称 | users,orgs | 不包含的表名称,多个之间用英文逗号分隔 |
dbswitch.source[i].regex-table-mapper | 基于正则表达式的表名称映射关系 | [{“from-pattern”: “^”,“to-value”: “T_”}] | 为list类型,元素存在顺序关系 |
dbswitch.source[i].regex-column-mapper | 基于正则表达式的字段名映射关系 | [{“from-pattern”: “$”,“to-value”: “_x”}] | 为list类型,元素存在顺序关系 |
dbswitch.target.url | 目的端JDBC连接的URL | jdbc:postgresql://127.0.0.1:5432/test | 可为:oracle/sqlserver/postgresql/greenplum/mysql/mariadb/db2/dm/kingbase8/highgo 等 |
dbswitch.target.driver-class-name | 目的端 数据库的驱动类名称 | org.postgresql.Driver | 对应数据库的驱动类 |
dbswitch.target.username | 目的端连接帐号名 | root | 无 |
dbswitch.target.password | 目的端连接帐号密码 | root | 无 |
dbswitch.target.target-schema | 目的端的schema名称 | public | 目的端的schema名称只能有且只有一个 |
dbswitch.target.target-drop | 是否执行先drop表然后create表命令,当target.datasource-target.drop=true时有效 | true | 可选值为:true、false |
dbswitch.target.create-table-auto-increment | 是否执启用支持create表时主键自增(只支持mysql) | true | 可选值为:true、false |
dbswitch.target.writer-engine-insert | 是否使用insert写入数据 | false | 可选值为:true为insert写入、false为copy写入,只针对目的端数据库为PostgreSQL/Greenplum的有效 |
dbswitch.target.change-data-sync | 是否启用增量变更同步,dbswitch.target.target-drop为 false 时且表有主键情况下有效 | false | 可选值为:true、false |
注意:
dbswitch.source[i]
为数组类型,i为编号,从0开始的整数;dbswitch.source[i].source-includes
不为空,则按照包含表的方式来执行;dbswitch.source[i].source-includes
为空,则按照dbswitch.source[i].source-excludes
排除表的方式来执行。dbswitch.target.target-drop=false
,dbswitch.target.change-data-synch=true
;时会对有主键表启用增量变更方式同步regex-table-mapper
和regex-column-mappe
,为基于正则表达式替换的表名映射和字段名映射,均可以为空(代表原名映射,即源的表t_a映射到目的端也为t_a)提示:如果要将源端所有表名(或者字段名)添加前缀,可以配置
"from-pattern": "^","to-value": "T_"
;
MySQL/MariaDB数据库
jdbc连接地址:jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&useLegacyDatetimeCode=false&serverTimezone=Asia/Shanghai&autoReconnect=true&failOverReadOnly=false&nullNamePatternMatchesAll=true&useSSL=false&rewriteBatchedStatements=true
jdbc驱动名称: com.mysql.jdbc.Driver
jdbc连接地址:jdbc:mariadb://127.0.0.1:3306/test?useUnicode=true&characterEncoding=utf-8&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai&nullCatalogMeansCurrent=true&tinyInt1isBit=false&rewriteBatchedStatements=true&useCompression=true
jdbc驱动名称: org.mariadb.jdbc.Driver
Oracle数据库
jdbc连接地址:jdbc:oracle:thin:@127.0.0.1:1521:ORCL 或 jdbc:oracle:thin:@//127.0.0.1:1521/ORCL
jdbc驱动名称:oracle.jdbc.driver.OracleDriver
SQL Server(>=2005)数据库
jdbc连接地址:jdbc:sqlserver://127.0.0.1:1433;DatabaseName=test
jdbc驱动名称:com.microsoft.sqlserver.jdbc.SQLServerDriver
PostgreSQL/Greenplum数据库
jdbc连接地址:jdbc:postgresql://127.0.0.1:5432/test
jdbc驱动名称:org.postgresql.Driver
DB2数据库
jdbc连接地址:jdbc:db2://127.0.0.1:50000/testdb:driverType=4;fullyMaterializeLobData=true;fullyMaterializeInputStreams=true;progressiveStreaming=2;progresssiveLocators=2;
jdbc驱动名称:com.ibm.db2.jcc.DB2Driver
达梦DMDB数据库
jdbc连接地址:jdbc:dm://127.0.0.1:5236
jdbc驱动名称:dm.jdbc.driver.DmDriver
人大金仓Kingbase8数据库
jdbc连接地址:jdbc:kingbase8://127.0.0.1:54321/MYTEST
jdbc驱动名称:com.kingbase8.Driver
翰高HighGo数据库(可按PostgreSQL使用)
jdbc连接地址:jdbc:postgresql://127.0.0.1:5866/highgo
jdbc驱动名称:org.postgresql.Driver
Apache Hive数据库
jdbc连接地址:jdbc:hive2://172.17.2.12:10000/default
jdbc驱动名称:org.apache.hive.jdbc.HiveDriver
注意:当前只支持hive version 3.x的账号密码认证方式。
SQLite数据库
jdbc连接地址:jdbc:sqlite:/tmp/test.db 或者 jdbc:sqlite::resource:http://172.17.2.12:8080/test.db
jdbc驱动名称:org.sqlite.JDBC
数据迁移相关组件可以引入到其他项目中单独使用,把 middle-common 下的 middle-dbswtich 模块安装到 maven 中(在文件夹下执行 mvn clean install)
引入 maven 依赖:
<dependency>
<artifactId>middle-dbswtichartifactId>
<groupId>middle-groundgroupId>
<version>1.0version>
dependency>
代码开发示例:
DbswichProperties dbswichProperties = new DbswichProperties();
//构建源表参数
SourceDataSourceProperties dataSourceProperties = new SourceDataSourceProperties();
List<SourceDataSourceProperties> sourceProperties = new ArrayList<>(10);
sourceProperties.add(dataSourceProperties);
dbswichProperties.setSource(sourceProperties);
dataSourceProperties.setUrl("jdbc:oracle:thin:@127.0.0.1:1521:orcl");
dataSourceProperties.setDriverClassName(DbType.ORACLE.getDriveClassName());
dataSourceProperties.setUsername("TEST");
dataSourceProperties.setPassword("root");
dataSourceProperties.setFetchSize(10000);
dataSourceProperties.setSourceSchema("TEST");
//表名映射关系
List<PatternMapper> tableNameMapper = new ArrayList<>(1);
PatternMapper patternMapper = new PatternMapper();
patternMapper.setFromPattern("test");
patternMapper.setToValue("sync_test");
tableNameMapper.add(patternMapper);
dataSourceProperties.setRegexTableMapper(tableNameMapper);
//字段映射关系
List<PatternMapper> columnMapper = new ArrayList<>(10);
dataSourceProperties.setRegexColumnMapper(columnMapper);
PatternMapper columnPattern = new PatternMapper();
columnPattern.setFromPattern("test");
columnPattern.setToValue("test");
columnMapper.add(columnPattern);
PatternMapper columnPattern1 = new PatternMapper();
columnPattern.setFromPattern("test1");
//设置为null会不同步这个字段
columnPattern.setToValue(null);
columnMapper.add(columnPattern1);
//目标表参数
TargetDataSourceProperties targetDataSourceProperties = new TargetDataSourceProperties();
dbswichProperties.setTarget(targetDataSourceProperties);
targetDataSourceProperties.setUrl("jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&useLegacyDatetimeCode=false&serverTimezone=Asia/Shanghai&autoReconnect=true&failOverReadOnly=false&nullNamePatternMatchesAll=true&useSSL=false&rewriteBatchedStatements=true&allowPublicKeyRetrieval=true");
targetDataSourceProperties.setDriverClassName(DbType.MYSQL.getDriveClassName());
targetDataSourceProperties.setUsername("root");
targetDataSourceProperties.setPassword("root");
targetDataSourceProperties.setTargetSchema("test");
targetDataSourceProperties.setTargetDrop(true);
targetDataSourceProperties.setIndexCreate(true);
targetDataSourceProperties.setLowercase(true);
//主键自动递增
targetDataSourceProperties.setCreateTableAutoIncrement(true);
targetDataSourceProperties.setChangeDataSync(false);
MigrationService service = new MigrationService(dbswichProperties);
//运行数据同步
service.run();
经测试,该 ETL 系统的同步效率跟市面上主流的 ETL 工具无异,但如果有大字段类型比如:text,clob 等,同步速度会明显降低,这一点需要注意。通过集成系统的数据同步模块,也可以将其应用到自己的系统之中,对于大数据应用,数据处理系统有着不错的使用和参考价值,代码模块划分明确,方便进行二次开发。
好了,本文到这里就结束了,希望对你有所帮助,我们下次更新,再见!
关注公众号 螺旋编程极客 可进群讨论,反馈相关问题,进群备注 “etl交流”。