FlinkCDC是一个实现CDC(Change Data Capture)思想的数据同步工具。 借助于Flink CDC Connector ,它监听数据库的二进制日志文件,来获取数据库变更的数据,以此实现数据同步。
FlinkCDC 支持的 Connectors and Version
下载 mysql-cdc connector 放入 FLINK_HOME/lib
启动Flink Cluster : Flink_HOME/bin/start_cluster.sh
① 除了使用Flink CDC连接数据库外,也可以使用jdbc方式连接数据库。Flink CDC持续监听数据库二进制日志文件,以此捕获变更数据。而jdbc连接器允许读和写关系型数据库。
② 监听数据库变更的数据,既可以使用DataStream方式,也可以使用SQL
vim /etc/my.cnf
[mysqld]
server_id=1
log_bin = mysql-bin
binlog_format = ROW
expire_logs_days = 30 #这个配置在公司中不要配,这里是虚拟机环境,可以配置。
重启MySQL数据库 :systemctl restart mysqld
建表规则
1.字段名必须相同
2.数据类型必须匹配上
3.FlinkSQL中的表名无所谓,不需要和MySQL中的源表一样
create table f_student (
s_id string primary key not enforced,
s_name string,
s_birth string,
s_sex string
) with (
'connector' = 'mysql-cdc',
'hostname' = 'node1',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'database-name' = 'test',
'table-name' = 'Student',
'scan.startup.mode' = 'initial',
'server-time-zone' = 'Asia/Shanghai'
);
scan.startup.mode:
MySQL CDC 消费者可选的启动模式, 合法的模式为 “initial”,“earliest-offset”,“latest-offset”,“specific-offset” 和 “timestamp”
更多连接选项在这里
mysql-cdc实时捕获mysql数据库表Student中变更的数据,包括insert 、update、delete操作,通过select语句查看
select * from f_student ;
下载Flink jdbc Connector 放入 FLUME_HOME/lib
※ flink-docs-release-1.14 检查该路径是否和已安装的flink版本一样
下载MySQL数据库驱动 放入 FLUME_HOME/lib
CREATE TABLE f_score(
s_id STRING,
c_id STRING,
s_score INT
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://node1:3306/test',
'table-name' = 'Score',
'username' = 'root',
'password' = '123456'
);
flink jdbc connector 读取mysql中的数据,通过select语句查看读取的数据
select * from f_score;