    [Kafka] JDBC source & sink connectors: real-time sync of inserts and updates from Oracle to PostgreSQL (PG)


    0. Required resources

    1. Download link for the JDBC Connect plugin (Confluent)
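    The original post does not paste the download URL here. As a minimal sketch, assuming the Confluent JDBC connector ZIP has already been downloaded, installation is just unpacking it into a directory listed in the Connect worker's plugin.path (the /opt/kafka/plugins path and the ZIP name below are placeholders, not values from the post):

    # Unpack the connector into a plugin directory and register that directory
    # with the Connect worker, then restart the worker.
    mkdir -p /opt/kafka/plugins
    unzip confluentinc-kafka-connect-jdbc-*.zip -d /opt/kafka/plugins
    echo "plugin.path=/opt/kafka/plugins" >> config/connect-distributed.properties
    # The Oracle and PostgreSQL JDBC driver jars typically also need to be placed
    # in the unpacked connector's lib/ directory.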

    I. Creating the tables in Oracle

    1. Table plan

    Table names: Test_TimeFormat_Order, Test_Stress_Order

    Columns: INCREMENT_UID / Order_ID / quantity / cost / CREATE_DATE / UPDDATTIM_0

    2. Table DDL

    -- Table DDL
    CREATE TABLE "TEST"."Test_TimeFormat_Order" (
        "INCREMENT_UID" INTEGER NOT NULL,
        "Order_ID" VARCHAR2(255) NOT NULL,
        "quantity" INTEGER DEFAULT 0 NOT NULL,
        "cost" NUMBER(11,2) DEFAULT 0.99 NOT NULL,
        "CREATE_DATE" TIMESTAMP (2) DEFAULT SYSDATE NOT NULL,
        "UPDDATTIM_0" TIMESTAMP (2) DEFAULT SYSDATE NOT NULL
    );
    -- Create the auto-increment sequence (name matches the trigger below)
    create sequence test.test_seq_Test_TimeFormat_Order increment by 1 start with 1
    minvalue 1 maxvalue 9999999999999 nocache;
    -- Create a trigger that fills INCREMENT_UID from the sequence on insert
    create or replace trigger test.Auto_Trig_Test_TimeFormat_Order
    before insert on "TEST"."Test_TimeFormat_Order"
    for each row
    begin
        select test.test_seq_Test_TimeFormat_Order.Nextval into :new.INCREMENT_UID from dual;
    end;
    -- Question: how do you reset an Oracle sequence to its start value? (see the sketch below)
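    On the question in the last comment above, a hedged sketch: the simplest portable way to reset the sequence is to drop and recreate it (newer Oracle releases also offer ALTER SEQUENCE ... RESTART). The sqlplus connection string below reuses the credentials that appear in the source connector config later in this post; adjust as needed.

    # Reset the sequence by dropping and recreating it via SQL*Plus.
    printf '%s\n' \
      "drop sequence test.test_seq_Test_TimeFormat_Order;" \
      "create sequence test.test_seq_Test_TimeFormat_Order increment by 1 start with 1 minvalue 1 maxvalue 9999999999999 nocache;" \
      "exit;" | sqlplus -s system/system@//192.168.0.1:1521/helowin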

    3. Inserting data

    -- All columns
    INSERT INTO "TEST"."Test_TimeFormat_Order"("INCREMENT_UID", "Order_ID", "quantity", "cost", "CREATE_DATE", "UPDDATTIM_0") VALUES ('1', 'SO001', '2', '1.99', TO_TIMESTAMP('2022-09-08 20:19:41.00', 'SYYYY-MM-DD HH24:MI:SS:FF2'), TO_TIMESTAMP('2022-09-08 20:19:44.00', 'SYYYY-MM-DD HH24:MI:SS:FF2'));
    -- Minimal (the remaining columns are filled by their defaults and the trigger)
    INSERT INTO "TEST"."Test_TimeFormat_Order"("Order_ID") VALUES ('SO001');

    4. Updating data

    -- Update all columns
    UPDATE "TEST"."Test_TimeFormat_Order" SET "INCREMENT_UID" = '1', "Order_ID" = 'SO001', "quantity" = '2', "cost" = '1.99', "CREATE_DATE" = TO_TIMESTAMP('2022-09-08 20:19:41.00', 'SYYYY-MM-DD HH24:MI:SS:FF2'), "UPDDATTIM_0" = TO_TIMESTAMP('2022-09-08 20:19:44.00', 'SYYYY-MM-DD HH24:MI:SS:FF2') WHERE "INCREMENT_UID" = '1' AND "Order_ID" = 'SO001' AND "quantity" = '2' AND "cost" = '1.99' AND "CREATE_DATE" = '2022-09-08 20:19:41.00' AND "UPDDATTIM_0" = '2022-09-08 20:19:44.00';
    -- Single-column update (only the timestamp column)
    UPDATE "TEST"."Test_TimeFormat_Order" SET "UPDDATTIM_0" = TO_TIMESTAMP('2022-09-08 20:19:44.00', 'SYYYY-MM-DD HH24:MI:SS:FF2') WHERE "Order_ID" = 'SO001';

    II. Creating the source connector

    PUT 192.168.0.1:8083/connectors/source_connector_Test_TimeFormat_Order/config

    {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "mode": "timestamp",
        "timestamp.column.name": "UPDDATTIM_0",
        "topic.prefix": "connector_topic_",
        "connection.password": "system",
        "connection.user": "system",
        "db.timezone": "Asia/Shanghai",
        "name": "source_connector_Test_TimeFormat_Order",
        "connection.url": "jdbc:oracle:thin:@//192.168.0.1:1521/helowin",
        "table.whitelist": "TEST.Test_TimeFormat_Order"
    }
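    The same PUT expressed as a curl call, as a sketch; it assumes the JSON above has been saved to a local file named source_connector_Test_TimeFormat_Order.json:

    # Create or update the source connector via the Connect REST API.
    curl -X PUT -H "Content-Type: application/json" \
         --data @source_connector_Test_TimeFormat_Order.json \
         http://192.168.0.1:8083/connectors/source_connector_Test_TimeFormat_Order/config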

    III. Creating the sink connector

    PUT 192.168.0.2:8083/connectors/sink_connector_Test_TimeFormat_Order/config

    {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "table.name.format": "ljh.Test_TimeFormat_Order",
        "connection.password": "QAZ123",
        "tasks.max": "1",
        "topics": "connector_topic_Test_TimeFormat_Order",
        "delete.enabled": "false",
        "auto.evolve": "true",
        "connection.user": "postgres",
        "name": "sink_connector_Test_TimeFormat_Order",
        "auto.create": "true",
        "connection.url": "jdbc:postgresql://192.168.0.2:5432/bigdata",
        "insert.mode": "upsert",
        "pk.mode": "record_value",
        "pk.fields": "Order_ID"
    }
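    A matching sketch for the sink side, again assuming the JSON above is saved to sink_connector_Test_TimeFormat_Order.json, followed by a status check:

    # Create or update the sink connector, then confirm it is RUNNING.
    curl -X PUT -H "Content-Type: application/json" \
         --data @sink_connector_Test_TimeFormat_Order.json \
         http://192.168.0.2:8083/connectors/sink_connector_Test_TimeFormat_Order/config
    curl http://192.168.0.2:8083/connectors/sink_connector_Test_TimeFormat_Order/status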

    IV. Commands used

    1. Starting and stopping ZooKeeper, Kafka, and Connect

    Kafka:
    nohup bin/kafka-server-start.sh config/server.properties > nohup_kafka_log.txt 2>&1 &
    bin/kafka-server-stop.sh

    ZooKeeper:
    nohup bin/zookeeper-server-start.sh config/zookeeper.properties > nohup_zookeeper_log.txt 2>&1 &
    bin/zookeeper-server-stop.sh

    Connect:
    nohup bin/connect-distributed.sh config/connect-distributed.properties > nohup_connect_log.txt 2>&1 &
    kill <pid>
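    For convenience, a small sketch that starts the three services in dependency order from the Kafka home directory; the sleep values are arbitrary placeholders, not tuned numbers:

    #!/bin/bash
    # Start ZooKeeper, then Kafka, then Kafka Connect, logging each to its own file.
    nohup bin/zookeeper-server-start.sh config/zookeeper.properties > nohup_zookeeper_log.txt 2>&1 &
    sleep 10
    nohup bin/kafka-server-start.sh config/server.properties > nohup_kafka_log.txt 2>&1 &
    sleep 10
    nohup bin/connect-distributed.sh config/connect-distributed.properties > nohup_connect_log.txt 2>&1 &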

    2. Checking status

    sudo /usr/local/jdk1.8.0_291/bin/jps

    tail -f nohup_zookeeper_log.txt
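    The Connect worker itself can also be checked over its REST port (host and port taken from the connector sections above):

    # The root endpoint returns the worker's version and Kafka cluster id if it is up.
    curl http://192.168.0.2:8083/
    # Tail the Connect worker log started in the previous step.
    tail -f nohup_connect_log.txt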

    3. Topic commands

    List topics: bin/kafka-topics.sh --bootstrap-server big04:9092,big05:9092,big07:9092 --list
    Create a topic: bin/kafka-topics.sh --bootstrap-server big04:9092,big05:9092,big07:9092 --create --replication-factor 1 --partitions 1 --topic first
    Describe a topic: bin/kafka-topics.sh --describe --bootstrap-server big04:9092,big05:9092,big07:9092 --topic first
    Delete a topic: bin/kafka-topics.sh --bootstrap-server big04:9092,big05:9092,big07:9092 --delete --topic first

    Produce: bin/kafka-console-producer.sh --broker-list big04:9092,big05:9092,big07:9092 --topic first
    Consume: bin/kafka-console-consumer.sh --bootstrap-server big04:9092,big05:9092,big07:9092 --topic connector_topic_0908_MFGITM_0908
    Consume from the beginning: bin/kafka-console-consumer.sh --topic connector_topic_bak_MFGITM --from-beginning --bootstrap-server big04:9092,big05:9092,big07:9092
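    Applied to this walkthrough, the source connector publishes to connector_topic_Test_TimeFormat_Order (topic.prefix plus the table name, and the topic the sink subscribes to), so its records can be inspected like this:

    # Read every record the source connector has published for the test table.
    bin/kafka-console-consumer.sh \
        --bootstrap-server big04:9092,big05:9092,big07:9092 \
        --topic connector_topic_Test_TimeFormat_Order \
        --from-beginning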

    4. Connect REST API commands

    List all connectors: GET http://192.168.0.2:8083/connectors

    List available plugins: GET 192.168.0.2:8083/connector-plugins

    Get a specific connector's config: GET 192.168.0.2:8083/connectors/source_connector_docker_oracle_MFGITM_bak/config (use /status for its status)

    Create or update a connector: PUT 192.168.0.2:8083/connectors/sink_connector_Test_TimeFormat_Order/config

    Create a connector: POST 192.168.0.2:8083/connectors
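    The same calls as curl commands, plus a DELETE that is not in the notes above but is a standard Connect REST endpoint; sink_connector_create.json is a placeholder file wrapping the config as {"name": ..., "config": {...}}:

    # List connectors and installed plugins.
    curl http://192.168.0.2:8083/connectors
    curl http://192.168.0.2:8083/connector-plugins
    # Inspect one connector's config and status.
    curl http://192.168.0.2:8083/connectors/sink_connector_Test_TimeFormat_Order/config
    curl http://192.168.0.2:8083/connectors/sink_connector_Test_TimeFormat_Order/status
    # Create a connector with POST; the body must wrap the config under "config".
    curl -X POST -H "Content-Type: application/json" \
         --data @sink_connector_create.json \
         http://192.168.0.2:8083/connectors
    # Remove a connector (standard endpoint, not listed in the original notes).
    curl -X DELETE http://192.168.0.2:8083/connectors/sink_connector_Test_TimeFormat_Order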

    V. Verification

    1. Insert and update data in Oracle

    (1) Insert a new record

    (2) Update a record

    2. Watch the topic with a console consumer

    (1) Check the created topic

    (2) Consume the topic from the beginning

    (3) After inserting data

    (4) After updating data

    3. Check the synced data on the PG side

    (1) After inserting data

    (2) After updating data
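    A hedged command-line check of the PG side: the connection details mirror the sink connector config, and the quoted identifiers assume the table was auto-created by the sink with case-sensitive names.

    # Show the most recently updated rows in the sink table.
    PGPASSWORD=QAZ123 psql -h 192.168.0.2 -p 5432 -U postgres -d bigdata \
        -c 'SELECT * FROM ljh."Test_TimeFormat_Order" ORDER BY "UPDDATTIM_0" DESC LIMIT 5;'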

    Original article: https://blog.csdn.net/USTSD/article/details/126774083