• 基于DataX的海量时序数据迁移实战:从MySQL到TDengine3.x


    背景

    MySQL 数据库中,设备历史数据单表过亿,如何快速、低成本地迁移到 TDengine3.x 中?

    从标题可以看出,我们使用的数据迁移/同步工具是 DataX ,数据源( Source )为传统的关系型数据库 MySQL ,目标库( Sink )为新型的具有场景特色的时序数据库 TDengine

    DataX:是阿里云DataWorks数据集成的开源版本,在阿里巴巴集团内被广泛使用的离线数据同步工具/平台。 DataX 实现了包括 MySQL、Oracle、OceanBase、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、Hologres、DRDS, databend 等各种异构数据源之间高效的数据同步功能。

    MySQL:略。。

    TDengine:是一款开源、高性能、云原生的时序数据库 (Time-Series Database, TSDB)。 TDengine 能被广泛运用于物联网、工业互联网、车联网、 IT 运维、金融等领域。除核心的时序数据库功能外, TDengine 还提供缓存、数据订阅、流式计算等功能,是一极简的时序数据处理平台,最大程度的减小系统设计的复杂度,降低研发和运营成本。

    MySQLTDengine3.x 进行数据迁移,即面临异构数据的迁移。首先要了解下 MySQLTDengine 的数据模型方面的区别,具体可参考涛思数据官方提供的一个关于电表数据的模型对比:写给MySQL开发者的 TDengine入门指南

    数据模型

    以水库水位监测的案例说明,在 MySQL 中我们会有1张设备信息表(设备编号、厂家、型号等信息)和1张设备数据表(传感器采集的时序数据)。

    2023-05-28-Device.jpg

    2023-05-28-WaterTable.jpg

    针对 MySQL 中的2张表,以 TDengine 的设计思想来建模:在迁移到 TDengine 后会变成1张超级表+N(设备的数量)张子表,且每张子表的名称对应 MySQL 设备信息表中的每个设备编码。具体地来说, TDengine 中的数据模型如下:

    create database if not exists sensor;
    create stable if not exists sensor.water(ts timestamp, level float, status int) tags(district_code nchar(6), unit_id nchar(36), sensor_code int);
    
    • 1
    • 2

    这里仅创建了1张超级表,具体的子表会在进行数据迁移时,根据 MySQL 设备信息表中的设备编码自动创建。

    2023-05-28-Desc.jpg

    准备迁移工具

    一开始我直接从https://github.com/taosdata/DataX的README中的:Download DataX下载地址下载的,但是后来才发现没有 TDengine3.x 版本的writer;然后直接下载https://github.com/taosdata/DataX的源码,本地编译生成了 jar 包,放到了 DataXplugin 目录中。

    2023-05-28-mvn.jpg
    Note:本地源码 mvn clean package -Dmaven.test.skip=true 构建生成 tdengine30writer-0.0.1-SNAPSHOT.jar 后,在 \datax\plugin\writer 下复制 tdenginewriter 目录,重命名为 tdengine30writer ,对应修改了其中的 plugin.jsonplugin_job_template.json ,以及 libs 目录下的 taos-jdbcdriver-3.0.2.jar

    2023-05-28-Plugin.jpg
    至此,工具就准备好了,剩下的就是编写数据迁移的配置脚本了。

    迁移设备信息表

    job-water.json :迁移配置脚本分两部分:一个是数据源,一个是目标库。迁移设备信息表这一步的结果就是创建了所有的子表:一个设备一张表。

    • 数据源
      “name”: “mysqlreader”, 迁移设备信息表时,对设备编码起别名为 tbnameTDengine 自动会将其作为子表的名称进行创建。

    Note:这里在设备编码前加了一个字母d,因为在 TDengine 中表名不可以为数字。

    • 目标库

    “name”: “tdengine30writer”, 在 column 部分罗列出数据源中查询出的列名,与 MySQL 数据源中的顺序和名称一一对应,表名 table 处直接写超级表的名称。

    {
        "job": {
            "content": [
                {
                    "reader": {
                        "name": "mysqlreader",
                        "parameter": {
                            "username": "root",
                            "password": "your-password",
                            "connection": [
                                {
                                    "jdbcUrl": [
                                        "jdbc:mysql://your-ip:3306/iotdata?useSSL=false&serverTimezone=Asia/Shanghai"
                                    ],
                                    "querySql": [
                                        "select concat('d', code) as tbname, create_time as ts, sensor_code, district_code, unit_id from b_device WHERE sensor_code=2;"
                                    ]
                                }
                            ]
                        }
                    },
                    "writer": {
                        "name": "tdengine30writer",
                        "parameter": {
                            "username": "root",
                            "password": "taosdata",
                            "column": [
                                "tbname",
                                "ts",
                                "sensor_code",
                                "district_code",
                                "unit_id"
                            ],
                            "connection": [
                                {
                                    "table": [
                                        "water"
                                    ],
                                    "jdbcUrl": "jdbc:TAOS-RS://192.168.44.158:6041/sensor"
                                }
                            ],
                            "batchSize": 1000,
                            "ignoreTagsUnmatched": true
                        }
                    }
                }
            ],
            "setting": {
                "speed": {
                    "channel": 1
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 执行迁移/同步脚本
    D:\datax\bin>datax.py ../job/job-water.json
    
    • 1

    迁移设备数据表

    job-water-data.json :迁移配置脚本分两部分:一个是数据源,一个是目标库。迁移设备数据表这一步的结果便会将传感器数据根据设备编号写入对应的子表中。

    • 数据源

    迁移设备数据表时,查询传感器采集的字段,同样对设备编码起别名为 tbnameTDengine 自动会将数据写入对应的子表。

    • 目标库

    column 部分罗列出数据源中查询出的列名,与 MySQL 数据源中的顺序和名称一一对应,配置设备数据表时,需要注意,表名 table 处要写所有子表的名称。

    {
        "job": {
            "content": [
                {
                    "reader": {
                        "name": "mysqlreader",
                        "parameter": {
                            "username": "root",
                            "password": "your-password",
                            "connection": [
                                {
                                    "jdbcUrl": [
                                        "jdbc:mysql://your-ip:3306/iotdata?useSSL=false&serverTimezone=Asia/Shanghai&net_write_timeout=600"
                                    ],
                                    "querySql": [
                                        "select concat('d', code) as tbname, create_time as ts, value as level, status from sensor_water;"
                                    ]
                                }
                            ]
                        }
                    },
                    "writer": {
                        "name": "tdengine30writer",
                        "parameter": {
                            "username": "root",
                            "password": "taosdata",
                            "column": [
                                "tbname",
                                "ts",
                                "level",
                                "status"
                            ],
                            "connection": [
                                {
                                    "table": [
                                        "d66057408201830",
                                        "d66057408063030",
                                        "d66057408027630",
                                        "d66057408208130",
                                        "d66057408009630",
                                        "d66057408000530",
                                        "d66057408067330",
                                        "d66057408025430"
                                    ],
                                    "jdbcUrl": "jdbc:TAOS-RS://192.168.44.158:6041/sensor"
                                }
                            ],
                            "encoding": "UTF-8",
                            "batchSize": 1000,
                            "ignoreTagsUnmatched": true
                        }
                    }
                }
            ],
            "setting": {
                "speed": {
                    "channel": 1
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 执行迁移/同步脚本
    D:\datax\bin>datax.py ../job/job-water-data.json
    
    • 1

    使用DataX可能遇到的问题

    DataX中文乱码

    执行 D:\datax\bin>datax.py ../job/job.json 后,控制台上的中文输出乱码。

    • Solution:直接输入CHCP 65001回车,直到打开新窗口出现Active code page: 65001,再次执行job命令,中文正常显示。

    2023-05-28-SubTable.jpg

    插件加载失败, 未完成指定插件加载:[mysqlreader, tdengine20writer]

    • Solution:使用的插件名称要写正确

    com.alibaba.datax.common.exception. DataXException: Code:[TDengineWriter-00], Description:[parameter value is missing]. - The parameter [username] is not set.

    • Solution:TDengine2.0和3.0的配置项不一样,因为我一开始是采用的TDengine2.0的配置来迁移的,根据3.0的文档修改参数即可。

    java.lang. ClassCastException: java.lang. String cannot be cast to java.util. List

    • Solution:mysql的reader读取部分的jdbcUrl和querySql的值需要使用“[]”括起来,是jdbc固定的模板。

    com.alibaba.datax.common.exception. DataXException: Code:[TDengineWriter-02], Description:[runtime exception]. - No suitable driver found for [“jdbc: TAOS-RS://192.168.44.158:6041/sensor”]

    • Solution:writer这边的"jdbcUrl": “jdbc: TAOS-RS://192.168.44.158:6041/sensor”,使用字符串而不是数组。

    空指针错误:ERROR WriterRunner - Writer Runner Received Exceptions:

    java.lang.NullPointerException: null
            at com.taosdata.jdbc.rs.RestfulDriver.connect(RestfulDriver.java:111) ~[taos-jdbcdriver-2.0.37.jar:na]
            at java.sql.DriverManager.getConnection(Unknown Source) ~[na:1.8.0_311]
            at java.sql.DriverManager.getConnection(Unknown Source) ~[na:1.8.0_311]
            at com.alibaba.datax.plugin.writer.tdenginewriter.DefaultDataHandler.handle(DefaultDataHandler.java:75) ~[tdenginewriter-0.0.1-SNAPSHOT.jar:na]
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • Solution:看到taos-jdbcdriver用的是2.0的jar包,下载DataX源码,编译生成tdengine30writer-0.0.1-SNAPSHOT.jar,并拷贝tdenginewriter文件夹为tdengine30writer,将tdengine30writer-0.0.1-SNAPSHOT.jar放到tdengine30writer中,删除tdenginewriter\libs其中taos-jdbcdriver-2.0.37.jar,添加taos-jdbcdriver-3.0.2.jar。

    com.alibaba.datax.common.exception. DataXException: Code:[TDengineWriter-02], Description:[运行时异常]. - TDengine ERROR (2600): sql: describe 66057408201830, desc: syntax error near “66057408201830”

    • Solution:表名不可以为数字,我在编号前加了个字母d。

    com.mysql.jdbc.exceptions.jdbc4. CommunicationsException: Application was streaming results when the connection failed. Consider raising value of ‘net_write_timeout’ on the server.

    • Solution:在数据源URL的连接上增加该参数,net_write_timeout/net_read_timeout设置稍微大一些,默认60s。
      例如: jdbc:mysql://your-ip:3306/iotdata?useSSL=false&serverTimezone=Asia/Shanghai&net_write_timeout=600

    MySQL 中查看变量值: SHOW VARIABLES LIKE "net%"

    2023-05-28-NetParam.jpg

    小总结

    以上便是基于 DataX 完成从 MySQLTDengine3.x 的时序数据迁移实战记录,借助 DataX 工具,通过配置文件驱动的方式完成了海量时序数据的快速迁移。

    实际的迁移测试结果是,3000+个水库水位传感设备,历史数据单表1亿+,半天时间迁移了5000万+。

    Reference


    If you have any questions or any bugs are found, please feel free to contact me.

    Your comments and suggestions are welcome!

  • 相关阅读:
    Premiere2024中Beat Edit插件点击没反应怎么办
    STM32开发(三十一)STM32F103 片内资源 —— 模拟/数字转换 DAC 正弦波 编程详解
    图解Spark Graphx实现顶点关联邻接顶点的collectNeighbors函数原理
    Java基础-----正则表达式
    数据湖是什么?数据湖架构及应用(完)
    二叉树路径问题+递归+有关题目
    VBE6EXT.OLB不能被加载问题解决方法
    C++: 模板初阶
    链表经典题带刷(内含精华:链表深拷贝)
    Python 基础合集2:字符串格式化
  • 原文地址:https://blog.csdn.net/u013810234/article/details/130910778