-- 返回1表示已启用CDC功能
select is_cdc_enabled from db_name.sys.databases where name = 'db_name';
ALTER AUTHORIZATION ON DATABASE::db_name TO sa;
USE db_name;
EXEC sys.sp_cdc_enable_db;
select name from db_name.sys.database_principals
where type_desc = 'DATABASE_ROLE' and name not like '##%'
USE db_name;
create role cdc_role;
grant select on schema::schema_name to cdc_role;
SELECT
r.name AS RoleName, m.name AS MemberName
FROM sys.database_role_members drm
JOIN sys.database_principals r ON drm.role_principal_id = r.principal_id
JOIN sys.database_principals m ON drm.member_principal_id = m.principal_id
--WHERE m.name = 'sa' AND r.name = 'db_owner';
alter database db_name add filegroup cdc_filegroup;
alter database db_name add file (
name = 'cdc_data',
filename = 'D:\RDSDBDATA\DATA\cdc_data.mdf', -- 需查看SQLServer实际数据路径
size = 16MB
)
to filegroup cdc_filegroup;
-- select data_space_id,name,type_desc,t1.is_read_only,t1.type,t1.log_filegroup_id from db_name.sys.filegroups t1;
-- select file_id,name,physical_name,type_desc,data_space_id from db_name.sys.database_files;
对要采集的表启用CDC
select is_tracked_by_cdc from db_name.sys.tables where name = 'table_name';
USE db_name;
EXEC sys.sp_cdc_enable_table
@source_schema = N'schema_name',
@source_name = N'table_name',
@role_name = N'cdc_role_name',
@filegroup_name = N'cdc_filegroup',
@supports_net_changes = 0
开启表CDC可能出现如下报错:
SQL 错误 [22832] [S0001]: 无法更新元数据来指示已对表 [schema_name].[table_name] 启用了变更数据捕获。
执行命令 '[sys].[sp_cdc_add_job] @job_type = N'capture'' 时失败。
返回的错误为 22836: '无法更新数据库 db_name 的元数据来指示已添加某变更数据捕获作业。
执行命令 'sp_add_jobstep_internal' 时失败。返回的错误为 14234: '指定的 '@server' 无效(有效值由 sp_helpserver 返回)。
'。请使用此操作和错误来确定失败的原因并重新提交请求。'。请使用此操作和错误来确定失败的原因并重新提交请求。
原因:
-- 检查当前服务器名称。
select @@servername;
-- 检查实例名称
SELECT SERVERPROPERTY('ServerName');
-- SQLServer安装程序在安装时将服务器名设置为计算机名。
-- SERVERPROPERTY函数的ServerName属性和@@SERVERNAME返回相似的信息。
-- 1. ServerName属性提供Windows服务器和实例名称,两者共同构成唯一的服务器实例。(如果服务器的网络名称更改,此值会返回新的名称)
-- 2. @@SERVERNAME 提供当前配置的本地服务器名称。(代表在安装或设置SQLServer实例时指定的服务器和实例名称。如果后续对服务器的网络名称进行了更改,此值不会自动更新)
-- 如果安装时未更改默认服务器名称,则ServerName属性和@@SERVERNAME返回相同的信息。 可以通过执行以下过程配置本地服务器的名称:
EXEC sp_dropserver 'current_server_name';
EXEC sp_addserver 'new_server_name', 'local';
查看SQLServer CDC自动清理周期
EXEC sys.sp_cdc_help_jobs;
验证用户是否有权限访问CDC表
USE db_name;
EXEC sys.sp_cdc_help_change_data_capture;
所使用软件的版本
SQLServer CDC DataStream API可实现一个job监控采集一个数据库的多个表.
//源数据库连接配置文件
Properties dbProps = DbConfigUtil.loadConfig("sqlserver.properties");
//Debezium配置
Properties debeziumProps = new Properties();
//decimal.handling.mode指定connector如何处理DECIMAL和NUMERIC列的值,有3种模式:precise、double和string
//precise(默认值):以二进制形式在变更事件中精确表示它们,使用java.math.BigDecimal值来表示(此种模式采集会将DECIMAL和NUMERIC列转成二进制格式,不易读,不便于数据处理)
//以double值来表示它们,这可能会到值精度丢失
//string:将值编码为格式化的字符串,易于下游消费,但会丢失有关实际类型的语义信息。(建议使用此种模式,便于下游进行数据处理)
debeziumProps.setProperty("decimal.handling.mode", "string");
//Time、date和timestamps可以以不同的精度表示,包括:
//adaptive_time_microseconds(默认值):精确地捕获date、datetime和timestamp的值,使用毫秒、微秒或纳秒精度值,具体取决于数据库列的类型,但 TIME 类型字段除外,它们始终以微秒表示。
//adaptive(不建议使用):以数据库列类型为基础,精确地捕获时间和时间戳值,使用毫秒、微秒或纳秒精度值。
//connect:总是使用 Kafka Connect 内置的 Time、Date 和 Timestamp 表示法表示时间和时间戳值,无论数据库列的精度如何,都使用毫秒精度。
debeziumProps.setProperty("time.precision.mode", "connect");
//SQLServer CDC数据源
SourceFunction<String> sourceFunction = SqlServerSource.<String>builder()
.hostname(dbProps.getProperty("host"))
.port(Integer.parseInt(dbProps.getProperty("port")))
.database(dbProps.getProperty("database"))
.tableList(dbProps.getProperty("table_list").split(","))
.username(dbProps.getProperty("username"))
.password(dbProps.getProperty("password"))
.debeziumProperties(debeziumProps)
.deserializer(new JsonDebeziumDeserializationSchema())
.startupOptions(StartupOptions.initial())
.build();
参考: MySQL CDC配置及DataStream API实现代码
参考: MySQL CDC配置及DataStream API实现代码