• Datax 数据同步-使用总结(二)


    一、前言

    这部分主要记录 datax 实现增量同步的方案。

    二、核心思路

    结合datax 提供的preSql、 postSql以及占位符,外加另外一张表同步日志表来记录相关同步信息。

    三、版本迭代

    3.1 初版本

    where tbq.opera_date > cast(date_format(DATE_SUB(NOW(), interval 5 minute), '%Y%m%d%H%i%s000') as unsigned)"
    
    • 1

    这个版本,是直接以执行时时间为时间戳。
    缺点,显而易见。当同步时间比较久的时候,5 分钟就远远不够。

    3.2 版本

    阅读 datax 的使用说明里,对于 mysql 的写,支持 presql 和 postsql 的方式。

    因此考虑新建一个表,

    1. 在同步之前,利用 preSql,往该表中插入一条数据记录,记录同步开始时间。
    2. 同步完成后,利用 postSql 更新当前同步的这条记录,记录同步结束时间
    3. 读取时,从该表中获取上次同步开始时间的数据,作为同步时间戳。
      最终 json 脚本变成如下
    {
        "job": {
            "content": [
                {
                    "reader": {
                        "name": "mysqlreader",
                        "parameter": {
                            "column": [
                                xxxx
    			],
                            "connection": [
                                {
                                    xxxx
                                }
                            ],
    			"where":" update_date > (select l.sync_start_date from sys_sync_log l where l.sync_business_type = 'gongdan' and l.sync_result = 1 order by l.sync_start_date desc limit 1)",
                        }
                    },
                    "writer": {
                        "name": "mysqlwriter",
                        "parameter": {
                            "column": [
                                xxx
    			],
                            "connection": [
                              {
                                    xxxx
                              }
                            ],
    			"preSql":[
    				"insert into sys_sync_log(sync_start_date,sync_result) values(now(),2)"
    			],
    			"postSql":[
    				"update sys_sync_log l set l.sync_end_date = now(),l.sync_result = 1 where l.id = ( select t.id from (select l1.id from sys_sync_log l1 where l1.sync_result = 2 order by l1.sync_start_date desc limit 1) t )"
    			],
    			"writerMode":"replace"
                        }
                    }
                }
            ],
            "setting": {
                "speed": {
                    "channel": "5"
                }
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48

    此版本相对于上个版本,时间戳的获取上,比较固定,能避免因为同步代码问题,导致时间戳获取不准。

    3.3 版本

    上述版本写的相对复杂,需要先查询当前同步记录之后,再更新同步结束时间。无法保证一致性,即preSql 的插入的记录和 postSql 更新记录,可能不是同一个记录。
    再结合 datax 的占位符特性,可以将记录的主键由外部传入。
    因此 json 脚本变成

    {
    	"preSql":[
    		"insert into sys_sync_log(id,sync_start_date,sync_result) values('${logId}',now(),2)"
    	],
    	"postSql":[
    		"update sys_sync_log l set l.sync_end_date = now(),l.sync_result = 1 where l.id = '${logId}'"
    	],
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    其中 ${logId}为占位符
    liunx 中通过 uuidgen 命令可以获取 uuid。
    因此执行同步脚本时,参考如下命令执行即可

    python ../bin/datax.py -p "-DlogId=`uuidgen`" ./ssss.json
    
    • 1

    其中-p “-DlogId=uuidgen” 为获取 uuid,并传给 sss.json中
    这个版本,可以保证 preSql 和 postSql 处理的记录,是同一条。

    四、扩展

    应该还有更优方案,还需继续研究。

  • 相关阅读:
    Hbase简介
    怎么恢复永久删除的文件?这3个方法很实用!
    【图像融合】差异的高斯:一种简单有效的通用图像融合方法[用于融合红外和可见光图像、多焦点图像、多模态医学图像和多曝光图像](Matlab代码实现)
    达梦(DM)数据库常用SQL。
    javascript深度理解数组的sort()排序
    java.util.IllegalFormatConversionException: f != java.lang.String 问题解决!
    Android平台GB28181设备接入端对接编码前后音视频源类型浅析
    Windows 通过注册表添加:鼠标右键文件夹内空白处,提示打开 Cmd、Powershell
    程序分享--排序算法--冒泡排序
    【王道】计算机网络数据链路层(二)
  • 原文地址:https://blog.csdn.net/a80C51/article/details/132838067