是一种技术,可以帮助我们实时的捕获数据库中数据的变化,并将这些变化的数据以流的形式传输到其他的系统中进行处理和存储。
- # 1、修改mysql配置文件
- vim /etc/my.cnf
-
- # 2、增加以下配置
- # 在配置文件中增加二配置
- # 需要将配置放在[mysqld]后面
- # 打开binlog
- log-bin=mysql-bin
- # 选择ROW(行)模式
- binlog-format=ROW
- # 配置MySQL replaction需要定义,不要和canal的slaveId重复
- server_id=1
-
-
- # 3、重启mysql服务
- systemctl restart mysqld
-
- # 查看mysql binlog文件
- cd /var/lib/mysql
- mysql-bin.000001
-
- # 改了配置文件之后,重启MySQL,使用命令查看是否打开binlog模式:
- mysql -u 用户名 -p 密码
- show variables like 'log_bin';
- # 1、上传jar到flink lib目录下
- flink-sql-connector-mysql-cdc-2.2.1.jar
-
- # 2、重启flink集群
- yarn application -list
- yarn application -kill application_1699579932721_0004
- yarn-session.sh -d
- -- 创建flink cdc表,
- -- cdc表实时从mysql读取数据的表 -- 无界流
- CREATE TABLE students_cdc (
- id BIGINT,
- name STRING,
- age BIGINT,
- gender STRING,
- clazz STRING,
- PRIMARY KEY (id) NOT ENFORCED -- 主键
- ) WITH (
- 'connector' = 'mysql-cdc',
- 'hostname' = 'master',
- 'port' = '3306',
- 'username' = 'root',
- 'password' = '123456',
- 'database-name' = 'student',
- 'table-name' = 'students'
- );
-
- select * from students_cdc;