• Flink connector Oracle CDC 实时同步数据到MySQL(Oracle19c)


    准备工作

    在这一步需要配置Oracle。主要包含。

    1. 开启Archive log
    2. 开启数据库和数据表的supplemental log
    3. 创建CDC用户并赋予权限

    注意:不要使用Oracle的SYS和SYSTEM用户做为CDC用户。因为这两个用户能够捕获到大量Oracle数据库内部的变更信息,对于业务数据来说是不必要的。Debezium会过滤掉这两个用户捕获到的变更内容。

    下面开始配置步骤。在安装Oracle的机器上执行:

    1. su - oracle
    2. sqlplus / as sysdba

    进入Sqlplus。然后开启Archive log。

    1. alter system set db_recovery_file_dest_size = 10G;
    2. alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
    3. shutdown immediate;
    4. startup mount;
    5. alter database archivelog;
    6. alter database open;
    7. # 检查Archive log是否成功开启
    8. archive log list;

    注意:'/opt/oracle/oradata/recovery_area' 这个路径如果不存在的话需要自己手动去创建(Oracle用户下创建)

    1. 本步骤需要重启数据库,请选择在合适的时间操作。
    2. 例子中的/opt/oracle/oradata/recovery_area目录oracle用户需要有读写权限。
    3. 如果执行alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;的时候报ORA-32001: write to SPFILE requested but no SPFILE is in use。需要检查spfile文件。
    1. show parameter spfile;
    2. # 如果输出value为空,说明没有创建spfile,执行下面SQL创建
    3. create spfile from pfile;
    4. # 关闭并重启
    5. shutdown immediate;
    6. startup;
    7. # 检查spfile是否成功创建
    8. show parameter spfile;

    接下来进行相关重要的配置:

    1. -- 检查日志归档是否开启
    2. archive log list;
    3. -- 为捕获的数据库启用补充日志记录,以便数据更改捕获更改的数据库行之前的状态,下面说明了如何在数据库级别进行配置。
    4. ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
    5. - 创建表空间
    6. CREATE TABLESPACE logminer_tbs DATAFILE '/home/oracle/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
    7. -- 创建用户family绑定表空间LOGMINER_TBS
    8. CREATE USER C##family IDENTIFIED BY zyhcdc DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS;
    9. -- 授予family用户dba的权限
    10. grant connect,resource,dba to C##family;
    11. - 并授予权限
    12. GRANT CREATE SESSION TO C##family;
    13. GRANT SELECT ON V_$DATABASE to C##family;
    14. GRANT FLASHBACK ANY TABLE TO C##family;
    15. GRANT SELECT ANY TABLE TO C##family;
    16. GRANT SELECT_CATALOG_ROLE TO C##family;
    17. GRANT EXECUTE_CATALOG_ROLE TO C##family;
    18. GRANT SELECT ANY TRANSACTION TO C##family;
    19. GRANT EXECUTE ON SYS.DBMS_LOGMNR TO C##family;
    20. GRANT SELECT ON V_$LOGMNR_CONTENTS TO C##family;
    21. GRANT CREATE TABLE TO C##family;
    22. GRANT LOCK ANY TABLE TO C##family;
    23. GRANT ALTER ANY TABLE TO C##family;
    24. GRANT CREATE SEQUENCE TO C##family;
    25. GRANT EXECUTE ON DBMS_LOGMNR TO C##family;
    26. GRANT EXECUTE ON DBMS_LOGMNR_D TO C##family;
    27. GRANT SELECT ON V_$LOG TO C##family;
    28. GRANT SELECT ON V_$LOG_HISTORY TO C##family;
    29. GRANT SELECT ON V_$LOGMNR_LOGS TO C##family;
    30. GRANT SELECT ON V_$LOGMNR_CONTENTS TO C##family;
    31. GRANT SELECT ON V_$LOGMNR_PARAMETERS TO C##family;
    32. GRANT SELECT ON V_$LOGFILE TO C##family;
    33. GRANT SELECT ON V_$ARCHIVED_LOG TO C##family;
    34. GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO C##family;

    本地用Navicate连接Oracle:

    1. -- 创建 STUDENT_INFO 表
    2. create table student_info (
    3. sid number(10) constraint pk_sid primary key,
    4. sname varchar2(10),
    5. sex varchar2(2)
    6. );
    7. -- 修改STUDENT_INFO表让其支持增量日志,先在Oracle里创建STUDENT_INFO表再执行下面的语句
    8. ALTER TABLE C##FAMILY.STUDENT_INFO ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

    至此,Oracle配置完成;

    Flink CDC 配置:

    这里用Flink SQL CLI 做演示:

    使用以下命令切换到 Flink 目录

    cd flink-1.14.0

     使用以下命令启动 Flink SQL CLI:

    ./bin/sql-client.sh

    我们应该看到 CLI 客户端的欢迎屏幕。

    在 Flink SQL CLI 中使用 Flink DDL 创建表

    创建从相应数据库表中捕获更改数据的表。

    Oracle CDC 表可以定义如下:

    1. -- register an Oracle table 'student_info' in Flink SQL
    2. Flink SQL> CREATE TABLE student_info (
    3. SID INT NOT NULL,
    4. SNAME STRING,
    5. SEX STRING,
    6. PRIMARY KEY(SID) NOT ENFORCED
    7. ) WITH (
    8. 'connector' = 'oracle-cdc',
    9. 'hostname' = 'localhost',
    10. 'port' = '1521',
    11. 'username' = 'C##family',
    12. 'password' = 'zyhcdc',
    13. 'database-name' = 'ORCLCDB',
    14. 'schema-name' = 'C##FAMILY',
    15. 'table-name' = 'STUDENT_INFO');
    16. -- read snapshot and binlogs from products table
    17. Flink SQL> SELECT * FROM student_info;

    在定义一个数据流出到MySQL的表:

    1. -- register an Oracle table 'mysql_user' in Flink SQL
    2. Flink SQL> CREATE TABLE mysql_user (
    3. SID INT ,
    4. SNAME STRING,
    5. SEX STRING,
    6. PRIMARY KEY(SID) NOT ENFORCED
    7. ) WITH (
    8. 'connector' = 'jdbc',
    9. 'url' = 'jdbc:mysql://localhost:3306/test_cdc',
    10. 'username' = 'root',
    11. 'password' = 'root123',
    12. 'table-name' = 'user'
    13. );
    14. Flink SQL> insert into mysql_user select SID,SNAME,SEX from student_info;
    15. [INFO] Submitting SQL update statement to the cluster...
    16. [INFO] SQL update statement has been successfully submitted to the cluster:
    17. Job ID: 1966fdd63bb36c14908fe8e31408db58

    完成以上配置;Flink connector Oracle CDC实时数据同步到MySQL的操作就完成了;后续可以自行测试一下。

    如果有主键的话会自动进行merge。 

  • 相关阅读:
    Linux 小程序-进度条
    【数据结构】—交换排序之快速排序究极详解,手把手带你从简单的冒泡排序升级到排序的难点{快速排序}(含C语言实现)
    【无标题】
    【原创】java+swing+mysql鲜花购物商城设计与实现
    Vite - 配置 - 不同的环境执行不同的配置文件
    Docker 安装Kafka
    Llama模型家族之RLAIF 基于 AI 反馈的强化学习(三) RLAIF 的工作原理
    MySql对于时间段交集的处理和通用实现方式(MyBatis-Plus)
    Windows系统中配置 Redis 监听特定的 IP 地址
    java基础---01
  • 原文地址:https://blog.csdn.net/qq_23502409/article/details/126016265