• ETL之apache hop系列4-hop开发数据增量同步功能


    ETL增量数据抽取CDC

    概念:Change Data Capture,变化的数据捕获,也称:【增量数据抽取】(名词解释)

    CDC是一种实现数据的增量抽取解决方案,是实现【ETL整体解决方案】中的一项子方案/子问题。(对CDC的定位)

    如何捕获变化的数据是增量抽取的关键,对捕获方法一般有2点要求:

    • 准确性:能够将业务系统中的变化数据准确地捕获到;
    • 性能:尽量减少对业务系统造成太大的压力,影响现有业务。

    2 CDC 常见解决方案
    按CDC方案的任一操作是否对数据源系统产生影响(性能、功能等),分为:【侵入式CDC】、【非侵入式CDC】
    按CDC方案所抽取的数据与数据源系统的变化数据是否在规定时间内同步,分为:【同步CDC】、【异步CDC】

    一、侵入式

    1、基于触发器
    创建中间表,编写触发器或者在后端服务插入增删改的操作记录

    2、基于时间戳
    区分插入操作和更新操作:只有当源系统包含了插入时间戳和更新时间戳两个字段,才能区别插入和更新,否则无法区分。
    删除记录的操作:不能捕获到删除操作,除非是逻辑删除,即记录没有真的删除,只是做了逻辑上的标志。
    多次更新检测:如果在一次同步周期内,数据被更新了多次,只能同步最后一次更新操作,中间的更新操作都丢失了。
    实时能力:时间戳和基于序列的数据抽取一般适用于批量操作,不适合于实时场景下的数据加载。

    二、非侵入式

    3、基于快照
    1基于快照的CDC可检测到插入、更新和删除的数据 (相比基于时间戳的CDC的优点)
    2需要大量存储空间来保存快照
    4、基于日志
    源数据库会把每个插入、更新、删除操作记录到日志里。
    通过分析已经发生的事件提交(commit)的日志记录来得到增量数据信息,有一定的时间延迟。
    【特点】复杂、异步、非侵入式

    参考文档:

    https://zhuanlan.zhihu.com/p/362471672

    https://www.cnblogs.com/johnnyzen/p/12781942.html

    基于以上的几种增量同步方式优缺点,采用第一种基于触发器方式
    本文中的示例是源数据库Sql Server 数据库的数据同步到目标数据库MySql数据库中,被同步的源数据库为Test,Schema 为innovator,表名为MyTable

    ​ 流程示意图

    一、在数据库层面上的系列操作

    表的创建SQL如下

    CREATE TABLE [innovator].[MyTable] (
      [Id] char(32) COLLATE Chinese_PRC_CI_AS  NOT NULL  PRIMARY KEY ,
      [Name] nvarchar(255) COLLATE Chinese_PRC_CI_AS  NOT NULL,
      [CreatedTime] datetime  NOT NULL,
    )  
    
    1、建立SQL Server中间数据库temp_db,需要同步的所有表放在dbo下
    2、创建需要同步的中间表,比如 ,中间表的表名addOrEdit_MyTable,字段和源表一样
    -- 只复制基础表结构不复制索引触发器
    SELECT * INTO temp_db.dbo.addOrEdit_MyTable FROM Test.innovator.MyTable WHERE 1 = 0;
    
    3、创建需要同步删除的中间表Table_Delete
    CREATE TABLE [dbo].[Table_Delete] (
      [Id] char(32) COLLATE Chinese_PRC_CI_AS  NOT NULL  PRIMARY KEY,
      [TableName] nvarchar(255) COLLATE Chinese_PRC_CI_AS  NOT NULL,
      [DeletedTime] datetime  NOT NULL,
    )  
    
    4、在源数据库Test里的MyTable表里创建两个触发器

    (1) 插入修改触发器

    CREATE TRIGGER [innovator].[insertUpdateTrigger_MyTable]
    ON [innovator].[MyTable]
    WITH EXECUTE AS CALLER
    FOR INSERT, UPDATE
    AS
    BEGIN
      -- Type the SQL Here.
      --检查插入或更新的数据在A表中是否存在,有则更新,无则添加
    	if EXISTS(select 1 from temp_db.dbo.addOrEdit_MyTable as A,inserted as B where A.id=B.id)
    		 UPDATE  A set 
    			 A.Name=B.Name,A.CreatedTime=B.CreatedTime
    		 from temp_db.dbo.addOrEdit_MyTable A join inserted B on A.id= B.id
    	else
    		 insert into temp_db.dbo.addOrEdit_MyTable select * from inserted
    END
    

    (2) 删除触发器

    CREATE TRIGGER [innovator].[deleteTrigger_MyTable]
    ON [innovator].[MyTable]
    WITH EXECUTE AS CALLER
    FOR DELETE
    AS
    BEGIN
      -- Type the SQL Here.
    	insert into temp_db.dbo.Table_Delete select Id ,'MyTable' as TableName, GETDATE() as DeletedTime from deleted;
    END
    

    (3) 触发器图片参考

    5、创建Mysql目标数据库 Test_Mysql,字符集选择为utf8mb4
     -- CREATE DATABASE Test_Mysql DEFAULT CHARACTER SET utf8 DEFAULT COLLATE utf8_chinese_ci;
     CREATE DATABASE IF NOT EXISTS Test_Mysql DEFAULT CHARSET utf8mb4 COLLATE utf8mb4_general_ci;
    
    6、在Test_Mysql数据库里创建表MyTable
    DROP TABLE IF EXISTS `MyTable`;
    CREATE TABLE `MyTable`  (
      `Id` char(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
      `Name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
      `CreatedTime` datetime(0) NOT NULL,
      PRIMARY KEY (`ID`) USING BTREE
    ) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic;
    
    7、可能存在触发器中不能访问temp_db数据库,解决如下:
    -- 1、查询所有数据库信息
    SELECT name, database_id, is_trustworthy_on FROM sys.databases
    -- 2、修改 SQL Server 的实例是否信任该数据库以及其中的内容(注意:必须是 sysadmin 固定服务器角色的成员才能设置此选项。)
    ALTER  DATABASE  temp_db set TRUSTWORTHY ON 
    -- 3、查询系统所有用户
    SELECT name FROM Sysusers
    -- 4、给用户innovator_regular授权访问temp_db数据库
    ALTER AUTHORIZATION ON DATABASE::temp_db TO innovator_regular
    

    二、在apache hop上编写工作流和管道

    这里偷点懒,不想再写一遍这些管道和工作流,拿之前写的截个图

    创建项目示意图 ,该图是hop-gui 客户端创建的,hop-web的是差不多的,只是Home folder为:

    /usr/local/tomcat/webapps/ROOT/config/projects/ArasProject

    hop-web创建项目保存位置示意图

    创建的工作流和管道保存位置示意图

    1、编写源表数据第一次全量同步到目标表的管道,命名为Init_MyTable(比如下面示意图中的Init_HPART_)

    (1) Table input就是源数据库Sql Server里的表,Insert/update 里就是被插入修改同步操作的目标MySql表

    (2) Table input 示例图

    把上图的示例sql改为下面的

    SELECT
     Id,
     Name,
     CONVERT(VARCHAR(100),CreatedTime,21) AS CreatedTime
     FROM innovator.MyTable
    

    (3) Insert/update 示例图

    2、编写源表数据第一次全量同步到目标表的初始化工作流,命名为Init_Wrokflow(比如下面示意图中的InitDataTable_Wrokflow) (只执行一次)

    3、编写增量同步添加修改操作的管道,命名为AddOrEdit_MyTable(比如下面示意图中的PR_AddOrEdit)

    (1) Table input 示例图

    把上图的示例sql改为下面的

    SELECT
     Id,
     Name,
     CONVERT(VARCHAR(100),CreatedTime,21) AS CreatedTime
     FROM dbo.addOrEdit_MyTable
    

    (2) Insert/update 示例图

    (3) Delete 示例图

    4、编写增量同步删除操作的管道,命名为Delete_MyTable(比如下面示意图中的PR_Delete)

    (1) Table input 示例图

    这里Sql改为

    SELECT Id,TableName,DeletedTime FROM dbo.Table_Delete where TableName='MyTable'
    

    (2) Delete 示例图

    (3) Delete2 示例图

    5、编写增量同步的工作流,命名为MyTable_Workflow(比如下面示意图中的PR_workflow)(定时执行)

    其中 PR_AddOrEdit.hpl就是步骤3中的同步添加修改操作的管道,PR_Delete.hpl就是步骤四中的增量同步删除操作的管道

    (1) Start定时执行示意图

    (2) 工作流并行执行各个管道任务示意图

    以上是基于触发器模式增量数据同步的hop web设计、建模,下一步需要在生产环境中执行设计的工作流/管道文件。

    发布执行已经设计建模的工作流/管道文件,参考系列3,发布成功后在 Hop Server的web页面执行工作流或者管道

    也可以使用第三方系统主动调用api执行工作流和管道。

    三、基于时间戳模式的数据增量同步

    如果主键id是自增类型的和时间戳模式的数据同步是一样的

    1、管道设计示意图

    2、Table Input 示意图

    第一个表输入,读取的是要同步的目标数据库表的最新创建时间/最新更新时间

    3、Table Input2 示意图

    第二个表输入,读取的是源数据库表的数据,创建时间或者更新时间大于第一步获取的最新时间

    4、Insert/update示意图

    第二部获取的源数据库上的表数据同步到目标数据库表上

  • 相关阅读:
    企业如何购买腾讯云服务器?(详细指南)
    一文搞定MySQL的分区技术、NoSQL、NewSQL、基于MySQL的分表分库
    node篇 CommonJS规范
    干货 | Web自动化测试中显式等待与隐式等待该怎么用?
    内部类和权限修饰符
    有了红黑树,为啥还要跳表?
    植物大战僵尸各种僵尸攻略(四)
    《Effective C++中文版,第三版》读书笔记7
    第十八章Swing程序设计总结
    瀚高数据库开机自启动失败
  • 原文地址:https://www.cnblogs.com/hudean/p/17654823.html