• 使用debezium、kafka-connect将postgres数据实时同步到kafka中


    在很多场景下,需要将数据同步其他数据源进行计算,本文介绍通过debezium和kafka-connect将postgres数据同步到kafka中。

    首先下载debezium,官网地址: https://debezium.io/
    目前稳定版本是1.9.5,这是postgres对应的kafka-connect插件下载地址:

    debezium-connector-postgres-1.9.5.Final-plugin.tar.gz

    然后再kafka的目录下,新建一个plugins目录,将下载的包解压到这个目录:
    在这里插入图片描述
    然后我们配置kafka-coinect,我们这里以集群模式为例:

     vim config/connect-distributed.properties
     
    
    • 1
    • 2

    主要调整如下几个地方:

    # kafka集群地址
    bootstrap.servers=node1:9092,node2:9092,node3:9092
    # kafka-connect插件位置
    plugin.path=/data1/service/kafka-2.7.2/plugins
    #可以适当调整下面这个值,默认是 10000
    offset.flush.timeout.ms=100000
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    然后我们就可以启动kafka-connect:

    bin/connect-distributed.sh -daemon config/connect-distributed.properties
    
    • 1

    到这里,kafka-connect,就启动了起来。
    kafka-connect提供了http restfule的接口供我们取操作,默认的端口地址是8083,常见如下:

    GET /connector-plugins获取当前所有插件名称
    GET /connectors获取当前所有connector
    POST /connector添加一个connector
    GET /connectors/{name}获取指定的connector的信息
    GET /connectors/{name}/config获取指定的connector的配置信息
    PUT /connectors/{name}/config更新connector的配置
    GET /connectors/{name}/status获取指定connector的装填
    GET /connectors/{name}/tasks/获取指定connector正在运行的task
    GET /connectors/{name}/tasks/tasks/{taskid}/status获取connector的task状态信息
    PUT /connectors/{name}/pause暂停connector和他运行task
    POST /connectors/{name}/restart重启connector
    POST /connectors/{name}/tasks/{taskid}/restart重启一个connector的task
    DELETE /connectors/.{name}删除一个connector,停止关联的task并删除配置

    在psotgres数据库侧,我们需要调整一下参数:

    wal_level = logical
    max_wal_senders = 2000
    max_replication_slots = 2000
    #下面两个参数可以根据需要调整
    wal_sender_timeout = 60s
    wal_receiver_timeout = 60s
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    配置完之后需要重启。
    另外一点,如果PG版本比较老的话,需要装

    • decoderbufs(由Debezium社区维护,基于ProtoBuf)
    • wal2json(由wal2json社区维护,基于JSON)

    而在PG10+默认自带pgoutput可以不用安装,我这里的是基于PG12,所以不用安装。

    到这里所有准备工作就做好了,接下来就是想kafka-connect中添加connector了:

    {
        "name": "prod-material-642",
        "config": {
            "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
            "database.dbname": "ad_dissectorprofile_000642",
            "database.user": "postgres",
            "slot.name": "prodmaterial642",
            "tasks.max": "1",
            "database.hostname": "localhost",
            "database.password": "postgres",
            "name": "prod-material-642",
            "database.server.name": "prod-material-642",
            "database.port": "5432",
            "plugin.name": "pgoutput",
            "table.whitelist": "public.ad_entity,public.campaign,public.media_entity,public.url_scheme,public.schedule_entity"
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    这里添加完之后,我们可以查看天剑的connector和其对应的任务,然后再kafka中,会生成如下几个topic:

    prod-material-642.public.ad_entity
    prod-material-642.public.campaign
    prod-material-642.public.media_entity
    prod-material-642.public.schedule_entity
    prod-material-642.public.url_scheme
    
    • 1
    • 2
    • 3
    • 4
    • 5

    而写入到topic中的数据内容大致如下:

    {
        "before":null,
        "after": {
            "pk":"1",
            "value":"New data"
        },
        "source": {
            ...
            "snapshot":"incremental" 
        },
        "op":"r", 
        "ts_ms":"1620393591654",
        "transaction":null
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    如果需要详细内容,可以看下debezium官网上对于postgres同步的详细介绍:

    Debezium connector for PostgreSQL

    后续我们就可以消费对应的topic来进行相关的数据同步处理即可

  • 相关阅读:
    Open3D读取文件
    通过Pyecharts绘制可视化地球竟 然如此简单
    MyBatis 的执行流程,值得一看
    【AI绘画 | draft意间】国产draft推荐及AI绘画背后的原理解读
    [AutoSAR系列] 1.1 AutoSar 发展历史
    java根据当前日期获取本周和上周的日期区间
    7.过拟合和正则化
    MySQL 外键约束 多表联查 联合查询
    【代码随想录】二刷-栈和队列
    面试题:MySQL 中 InnoDB 的索引结构以及使用 B+ 树实现索引的原因
  • 原文地址:https://blog.csdn.net/LeoHan163/article/details/126267506