• DataX做数据迁移的几个技巧


    1. 背景

    最近在做系统重构,由原来的单体系统重构成分布式微服务系统,原始是单个数据库,也要按业务进行数据库的垂直拆分,便于以后的业务发展扩展和数据量大了之后的分片处理。因为规范落实不够,导致之前的表和字段有部分不够规范。主要有一下的两个问题:

    • 字段类型:时间上用了int存放时间戳,表示时间,但是有2038年的上限。
    • 字段名:不够见名知义,不统一,比如表示用户ID,有的地方用了uid,有的地方用了user_id。
      这导致系统和代码不够简洁,并且存在隐患。

    2. 解决之道

    2.1 技术选型

    采用DataX官方链接)作为可平行复制的数据迁移,原因有一些几方面:

    • 阿里巴巴开源的Java工具:方便调试和查看底层原理。
    • 可并行处理:针对于数据量较大的表,可以分段后并行迁移。
    • 迁移脚本Json化:每一个表都对应一个Json数据文件,清晰明了。
    • 扩展灵活:可以自己实现转换器。

    示例脚本:

    {
      "job": {
        "setting": {
          "speed": {
            "channel": 5
          },
          "errorLimit": {
            "record": 0,
            "percentage": 0.02
          }
        },
        "content": [
          {
            "reader": {
              "name": "mysqlreader",
              "parameter": {
                "username": "root",
                "password": "123456",
                "splitPk": "id",
                "column": [],
                "connection": [
                  {
                    "querySql": [
                      "SELECT `id`, `mobile` FROM user WHERE 1"
                    ],
                    "jdbcUrl": [
                      "jdbc:mysql://127.0.0.1:3306/test_db"
                    ]
                  }
                ]
              }
            },
            "writer": {
              "name": "mysqlwriter",
              "parameter": {
                "writeMode": "insert",
                "username": "root",
                "password": "123456",
                "column": [
                  "id",
                  "mobile"
                ],
                "preSql": [
                  ""
                ],
                "connection": [
                  {
                    "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/test_db?characterEncoding=utf-8&com.mysql.jdbc.faultInjection.serverCharsetIndex=45",
                    "table": [
                      "user"
                    ]
                  }
                ]
              }
            }
          }
        ]
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59

    2.2 字段类型问题

    时间戳int格式,转换成dataetime。
    DataX指定reader的列的方式有两种(只有一种有效):

    • column:可以是,*或者某一些字段
    • querySql:类似于,select a,b,c from tbl_1;;

    column方式适合于两个表的字段数和名字一致的场景。querySql就比较灵活。当字段名一致,但是类型不一致的时候可以使用SQL做转换即可。
    select id,from_unixtime(createtime) from tbl_1;
    这样reader读取到的就是可以写入writer的数据格式。

    2.3 字段名字问题

    当字段名更改时,也可以采用querySql方式,将字段做一个别名即可:
    如原字段为createtime,新字段为create_time
    select id,from_unixtime(createtime) as create_time from tbl_1;

    2.4 字段值内容转换

    对于数据的裁剪和转换,DataX有Transformer工具可以实现:

    • dx_substr:从字符串的指定位置(包含)截取指定长度的字符串。如果开始位置非法抛出异常。如果字段为空值,直接返回(即不参与本transformer)
    • dx_pad:如果源字符串长度小于目标字段长度,按照位置添加pad字符后返回。如果长于,直接截断(都截右边)。如果字段为空值,转换为空字符串进行pad,即最后的字符串全是需要pad的字符
    • dx_replace:从字符串的指定位置(包含)替换指定长度的字符串。如果开始位置非法抛出异常。如果字段为空值,直接返回(即不参与本transformer)
    • dx_filter :如果匹配正则表达式,返回Null,表示过滤该行。不匹配表达式时,表示保留该行。
    • dx_groovy:groovy表达式处理。

    当然还可以自定义 Transformer 实现所需功能。

    3. 总结

    在做系统重构,无法避免数据迁移,对于可以直接复制的数据,可以选型DataX作为迁移工具。灵活结合SQL语法和DataX的插件可以满足我们大部分需求。

  • 相关阅读:
    企业微信机器人对接GPT
    攻防世界Encode
    12--Django-批量插入数据、分页原理、分页器的使用
    部署并应用ByteTrack实现目标跟踪
    .sql数据库导入错误:/*!40101 SET @OLD_COLLATION_CONNECTION=@@COLLATION_CONNECTION */
    网页基本标签
    NLP入门之——Word2Vec词向量Skip-Gram模型代码实现(Pytorch版)
    序列查询 CSP202112-1
    【自动驾驶】ROS小车系统
    afl-cov计算代码覆盖率
  • 原文地址:https://blog.csdn.net/oschina_41731918/article/details/126384809