• Flink CDC-SQL Server CDC配置及DataStream API实现代码...可实现监控采集一个数据库的多个表


    SQL Server CDC配置

    第一步:启用指定数据库的CDC功能

    • 查看SQL Server是否已启用CDC功能
      -- 返回1表示已启用CDC功能
      select is_cdc_enabled from db_name.sys.databases where name = 'db_name';
      
      • 1
      • 2
    • 若未启用CDC功能,依次执行如下语句开启数据库的CDC
      ALTER AUTHORIZATION ON DATABASE::db_name TO sa;
      USE db_name;
      EXEC sys.sp_cdc_enable_db;
      
      • 1
      • 2
      • 3

    第二步:创建数据库角色

    • 查看数据库角色
      select name from db_name.sys.database_principals 
      where type_desc = 'DATABASE_ROLE' and name not like '##%'
      
      • 1
      • 2
    • 新建数据库角色
      USE db_name;
      create role cdc_role;
      grant select on schema::schema_name to cdc_role;
      
      • 1
      • 2
      • 3
    • 检查用户是否具有db_owner角色
      SELECT 
      	r.name AS RoleName, m.name AS MemberName
      FROM sys.database_role_members drm
      JOIN sys.database_principals r ON drm.role_principal_id = r.principal_id
      JOIN sys.database_principals m ON drm.member_principal_id = m.principal_id
      --WHERE m.name = 'sa' AND r.name = 'db_owner';
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6

    第三步:创建文件组&文件

    alter database db_name add filegroup cdc_filegroup;
    alter database db_name add file (
    	name = 'cdc_data',
    	filename = 'D:\RDSDBDATA\DATA\cdc_data.mdf', -- 需查看SQLServer实际数据路径
    	size = 16MB
    )
    to filegroup cdc_filegroup;
    
    -- select data_space_id,name,type_desc,t1.is_read_only,t1.type,t1.log_filegroup_id from db_name.sys.filegroups t1;
    -- select file_id,name,physical_name,type_desc,data_space_id from db_name.sys.database_files;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    第四步:启用指定表的CDC功能

    1. 对要采集的表启用CDC

      1. 检查表是否开启CDC
        select is_tracked_by_cdc from db_name.sys.tables where name = 'table_name';
        
        • 1
      2. 表开启CDC
        USE db_name;
        EXEC sys.sp_cdc_enable_table
        @source_schema = N'schema_name',
        @source_name = N'table_name',
        @role_name     = N'cdc_role_name',
        @filegroup_name = N'cdc_filegroup',
        @supports_net_changes = 0
        
        • 1
        • 2
        • 3
        • 4
        • 5
        • 6
        • 7
    2. 开启表CDC可能出现如下报错:

      SQL 错误 [22832] [S0001]: 无法更新元数据来指示已对表 [schema_name].[table_name] 启用了变更数据捕获。
      执行命令 '[sys].[sp_cdc_add_job] @job_type = N'capture'' 时失败。
      返回的错误为 22836: '无法更新数据库 db_name 的元数据来指示已添加某变更数据捕获作业。
      执行命令 'sp_add_jobstep_internal' 时失败。返回的错误为 14234: '指定的 '@server' 无效(有效值由 sp_helpserver 返回)'。请使用此操作和错误来确定失败的原因并重新提交请求。'。请使用此操作和错误来确定失败的原因并重新提交请求。
      
      • 1
      • 2
      • 3
      • 4
      • 5

      原因:

      -- 检查当前服务器名称。
      select @@servername;
      
      -- 检查实例名称
      SELECT SERVERPROPERTY('ServerName');
      
      -- SQLServer安装程序在安装时将服务器名设置为计算机名。
      -- SERVERPROPERTY函数的ServerName属性和@@SERVERNAME返回相似的信息。 
      -- 1. ServerName属性提供Windows服务器和实例名称,两者共同构成唯一的服务器实例。(如果服务器的网络名称更改,此值会返回新的名称)
      -- 2. @@SERVERNAME 提供当前配置的本地服务器名称。(代表在安装或设置SQLServer实例时指定的服务器和实例名称。如果后续对服务器的网络名称进行了更改,此值不会自动更新)
      -- 如果安装时未更改默认服务器名称,则ServerName属性和@@SERVERNAME返回相同的信息。 可以通过执行以下过程配置本地服务器的名称:
      EXEC sp_dropserver 'current_server_name';
      EXEC sp_addserver 'new_server_name', 'local';
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13

      查看SQLServer CDC自动清理周期

      EXEC sys.sp_cdc_help_jobs;
      
      • 1
    3. 验证用户是否有权限访问CDC表

      USE db_name;
      EXEC sys.sp_cdc_help_change_data_capture; 
      
      • 1
      • 2

    SQLServer CDC DataStream API实现

    所使用软件的版本

    • java 1.8
    • Scala 2.11
    • Flink 1.14.2
    • Flink CDC 2.3.0
    • Source SQLServer2016
    • Sink MySQL 5.7
    • jackson 2.10.2

    SQLServer CDC DataStream API可实现一个job监控采集一个数据库的多个表.

    1. 定义SqlServerSource

    //源数据库连接配置文件
    Properties dbProps = DbConfigUtil.loadConfig("sqlserver.properties");
    
    //Debezium配置
    Properties debeziumProps = new Properties();
    //decimal.handling.mode指定connector如何处理DECIMAL和NUMERIC列的值,有3种模式:precise、double和string
    //precise(默认值):以二进制形式在变更事件中精确表示它们,使用java.math.BigDecimal值来表示(此种模式采集会将DECIMAL和NUMERIC列转成二进制格式,不易读,不便于数据处理)
    //以double值来表示它们,这可能会到值精度丢失
    //string:将值编码为格式化的字符串,易于下游消费,但会丢失有关实际类型的语义信息。(建议使用此种模式,便于下游进行数据处理)
    debeziumProps.setProperty("decimal.handling.mode", "string");
    //Time、date和timestamps可以以不同的精度表示,包括:
    //adaptive_time_microseconds(默认值):精确地捕获date、datetime和timestamp的值,使用毫秒、微秒或纳秒精度值,具体取决于数据库列的类型,但 TIME 类型字段除外,它们始终以微秒表示。
    //adaptive(不建议使用):以数据库列类型为基础,精确地捕获时间和时间戳值,使用毫秒、微秒或纳秒精度值。
    //connect:总是使用 Kafka Connect 内置的 Time、Date 和 Timestamp 表示法表示时间和时间戳值,无论数据库列的精度如何,都使用毫秒精度。
    debeziumProps.setProperty("time.precision.mode", "connect");
    
    //SQLServer CDC数据源
    SourceFunction<String> sourceFunction = SqlServerSource.<String>builder()
            .hostname(dbProps.getProperty("host"))
            .port(Integer.parseInt(dbProps.getProperty("port")))
            .database(dbProps.getProperty("database"))
            .tableList(dbProps.getProperty("table_list").split(","))
            .username(dbProps.getProperty("username"))
            .password(dbProps.getProperty("password"))
            .debeziumProperties(debeziumProps)
            .deserializer(new JsonDebeziumDeserializationSchema())
            .startupOptions(StartupOptions.initial())
            .build();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    2. 数据处理

    参考: MySQL CDC配置及DataStream API实现代码

    3. Sink到MySQL

    参考: MySQL CDC配置及DataStream API实现代码

    参考

    1. https://debezium.io/documentation/reference/1.6/connectors/sqlserver.html
    2. https://ververica.github.io/flink-cdc-connectors/release-2.3/content/connectors/sqlserver-cdc.html
    3. https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/jdbc/
    4. https://learn.microsoft.com/zh-cn/sql/relational-databases/track-changes/enable-and-disable-change-data-capture-sql-server?view=sql-server-2016
  • 相关阅读:
    【测开求职】面试题:HR面相关的开放性问题
    306周赛t4 6151. 统计特殊整数 记忆化搜索
    React81_React.memo
    云原生Kubernetes:简化K8S应用部署工具Helm
    leetcode - 1658. Minimum Operations to Reduce X to Zero
    鸿蒙开发-UI-动画-组件内转场动画
    机器学习(三):多项式回归
    Shell笔记(超级完整)
    墨香戏韵,重塑经典
    分享大数据分析培训就业班课程内容
  • 原文地址:https://blog.csdn.net/lovetechlovelife/article/details/132865614