    Flink CDC data synchronization: PostgreSQL to StarRocks

    Environment setup

    StarRocks

    • Cluster overview

      Role   | IP                                                 | Count      | Notes
      FE     | 192.168.110.170; 192.168.110.171; 192.168.110.171 | 3 follower | a single FE is enough for testing
      BE     | 192.168.110.170; 192.168.110.171; 192.168.110.171 | 3 BE       | a single BE is enough for testing
      broker | 192.168.110.170; 192.168.110.171; 192.168.110.171 | 3 broker   | brokers can be omitted for testing
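
      If the StarRocks cluster is already running, you can sanity-check these roles from any FE with a MySQL client (a quick look; the exact query port depends on your fe.conf):

       -- connect to an FE with a MySQL client, then:
       SHOW FRONTENDS;   -- lists FE nodes and which one is the leader
       SHOW BACKENDS;    -- lists BE nodes and whether they are alive
       SHOW BROKER;      -- lists broker processes, if any are deployed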

    PostgreSQL

    • Installation

      Docker deployment

       # Pull the image
       docker pull postgres
       # Start PostgreSQL
       docker run --name mypostgres -d -p 5432:5432 -e POSTGRES_PASSWORD=123456 postgres
      
    • Modify the configuration

      # Enter the container
      docker exec -it mypostgres /bin/bash
      
      # Install vim
      apt-get update
      apt-get install vim
      
      # Locate the configuration file
      find / -name postgresql.conf
      
      # Edit the configuration file
      vi /var/lib/postgresql/data/postgresql.conf
      
      
      • Change the following settings

        # Change the WAL level to logical
        wal_level = logical # minimal, replica, or logical
        
        # Raise the maximum number of replication slots (default 10); Flink CDC uses one slot per table by default
        max_replication_slots = 20 # max number of replication slots
        
        # Raise the maximum number of WAL sender processes (default 10); keep this in line with the slots setting above
        max_wal_senders = 20 # max number of walsender processes
        # Terminate replication connections that have been inactive longer than this timeout; a larger value is fine (default 60s)
        wal_sender_timeout = 180s # in milliseconds; 0 disables the timeout
        

        Changing wal_level is mandatory; the other parameters can be adjusted as needed.

        Once postgresql.conf has been updated, restart the PostgreSQL service for the changes to take effect (a verification sketch follows the setup steps below).

      • Restart the service

        # After exiting the container
        docker restart mypostgres
        
    • Create a PostgreSQL user and grant it replication privileges

      -- Create a new user
      CREATE USER pgcdc WITH PASSWORD '123456';
      -- Grant the user the replication privilege
      ALTER ROLE pgcdc replication;
      
      -- Create the database
      create database test;
      
      -- Allow the user to connect to the database
      grant CONNECT ON DATABASE test to pgcdc;
      
      -- Grant the user SELECT on all existing tables in the public schema
      GRANT SELECT ON ALL TABLES IN SCHEMA public TO pgcdc;
      
      
      
    • Publish the tables

      -- Create the table
      CREATE TABLE testcdc(
          ID INT PRIMARY KEY NOT NULL,
          DEPT CHAR(50) NOT NULL,
          EMP_ID INT NOT NULL
      );
      
      -- Mark existing publications as covering all tables
      update pg_publication set puballtables=true where pubname is not null;
      -- Publish all tables
      CREATE PUBLICATION dbz_publication FOR ALL TABLES;
      -- Check which tables have been published
      select * from pg_publication_tables;
      
      
    • Change the table's replica identity so that updates and deletes carry the old row values

      -- Include the previous values of updated and deleted rows in the replica identity
      ALTER TABLE testcdc REPLICA IDENTITY FULL;
      -- Check the replica identity ('f' means the setting took effect)
      select relreplident from pg_class where relname='testcdc';
      

    At this point the PostgreSQL side is fully prepared; every step above is required.
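
    Before starting the Flink job it is worth double-checking that the configuration and the publication really took effect. A minimal check, run from psql inside the container (e.g. docker exec -it mypostgres psql -U postgres -d test):

     SHOW wal_level;               -- expected: logical
     SHOW max_replication_slots;   -- expected: 20
     SHOW max_wal_senders;         -- expected: 20
     
     -- The publication created above should include the testcdc table
     select pubname, schemaname, tablename from pg_publication_tables;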

    Writing the code

    Maven dependencies

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <scala.binary.version>2.11</scala.binary.version>
        <debezium.version>1.5.4.Final</debezium.version>
        <flink.version>1.13.6</flink.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.11</artifactId>
            <version>1.13.6</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.13.6</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <artifactId>flink-connector-postgres-cdc</artifactId>
            <version>1.4.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>com.starrocks</groupId>
            <artifactId>flink-connector-starrocks</artifactId>
            <version>1.2.2_flink-1.13_2.11</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cep_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-rabbitmq_2.11</artifactId>
            <version>1.13.6</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.75</version>
        </dependency>
    </dependencies>
    

    Java code

    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.environment.CheckpointConfig;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableResult;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    
    public class PgsqlToStarRocksTest {
        public static void main(String[] args) throws Exception {
            EnvironmentSettings fsSettings =
                    EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
    
            // Create the streaming execution environment and enable exactly-once checkpointing
            StreamExecutionEnvironment exeEnv = StreamExecutionEnvironment.getExecutionEnvironment();
            exeEnv.enableCheckpointing(10000, CheckpointingMode.EXACTLY_ONCE);
            exeEnv.getCheckpointConfig().setCheckpointTimeout(60000);
            // make sure 500 ms of progress happen between checkpoints
            exeEnv.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
            // allow only one checkpoint to be in progress at the same time
            exeEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    
            // enable externalized checkpoints which are retained after job cancellation
            exeEnv.getCheckpointConfig()
                    .enableExternalizedCheckpoints(
                            CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
            // Keep checkpoint state on the local file system
            FsStateBackend stateBackend = new FsStateBackend("file:///D:/fsdata");
            exeEnv.setStateBackend(stateBackend);
            exeEnv.setParallelism(2);
    
            // Create the table environment on top of the streaming environment
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(exeEnv, fsSettings);
    
            // Source table: reads change events from PostgreSQL via the postgres-cdc connector
            String sourceDDL =
                    "CREATE TABLE pgsql_source (\n" +
                            " id int,\n" +
                            " dept STRING,\n" +
                            " emp_id int\n" +
                            ") WITH (\n" +
                            " 'connector' = 'postgres-cdc',\n" +
                            " 'hostname' = '192.168.110.13',\n" +
                            " 'port' = '5432',\n" +
                            " 'username' = 'pgcdc',\n" +
                            " 'password' = '123456',\n" +
                            " 'database-name' = 'test',\n" +
                            " 'schema-name' = 'public',\n" +
                            " 'debezium.snapshot.mode' = 'never',\n" +
                            " 'decoding.plugin.name' = 'pgoutput',\n" +
                            " 'debezium.slot.name' = 'testcdc',\n" +
                            " 'table-name' = 'testcdc'\n" +
                            ")";
    
            // Sink table: writes to StarRocks; the primary key allows updates and deletes to be applied
            String sinkDDL =
                    "CREATE TABLE sr_sink (\n" +
                            "id int ," +
                            "dept string," +
                            "emp_id int," +
                            "PRIMARY KEY (id) " +
                            "NOT ENFORCED" +
                            ") WITH ( " +
                            "'connector' = 'starrocks'," +
                            "'jdbc-url'='jdbc:mysql://192.168.110.170:9036,192.168.110.171:9036,192.168.110.172:9036'," +
                            "'load-url'='192.168.110.170:8036;192.168.110.171:8036;192.168.110.172:8036'," +
                            "'database-name' = 'test'," +
                            "'table-name' = 'testcdc'," +
                            "'username' = 'root'," +
                            "'password' = ''," +
                            "'sink.properties.column_separator' = '\\x01'," +
                            "'sink.properties.row_delimiter' = '\\x02'" +
                            ")";
    
            // Continuously copy every change from the source table into the sink table
            String transformSQL =
                    "INSERT INTO sr_sink select * from pgsql_source";
    
            // Register source and sink, then submit the INSERT job
            tableEnv.executeSql(sourceDDL);
            tableEnv.executeSql(sinkDDL);
            TableResult tableResult = tableEnv.executeSql(transformSQL);
            tableResult.print();
        }
    }
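
    While the job is running you can confirm on the PostgreSQL side that the replication slot named by 'debezium.slot.name' above has been created and is active (a quick check from psql):

     -- the slot name matches 'debezium.slot.name' in the source DDL
     select slot_name, plugin, slot_type, active from pg_replication_slots where slot_name = 'testcdc';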
    
    

    Table definitions

    • StarRocks

       CREATE TABLE `testcdc` (
        `id` int(11) NOT NULL DEFAULT "-1" COMMENT "",
        `dept` varchar(65533) NULL COMMENT "",
        `emp_id` int(11) NULL COMMENT ""
      ) ENGINE=OLAP 
      PRIMARY KEY(`id`)
      COMMENT "OLAP"
      DISTRIBUTED BY HASH(`id`) BUCKETS 8 
      PROPERTIES (
      "replication_num" = "3",
      "in_memory" = "false",
      "storage_format" = "DEFAULT",
      "enable_persistent_index" = "false"
      );
      
    • PostgreSQL

      CREATE TABLE testcdc(
          ID INT PRIMARY KEY NOT NULL,
          DEPT CHAR(50) NOT NULL,
          EMP_ID INT NOT NULL
      );
      
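
    With both tables in place and the job running, a small end-to-end smoke test confirms that inserts and updates flow through (a sketch; the StarRocks query assumes you connect to an FE with a MySQL client using the query port behind the jdbc-url above):

     -- PostgreSQL side (database test)
     INSERT INTO testcdc (id, dept, emp_id) VALUES (1, 'dev', 1001);
     UPDATE testcdc SET dept = 'ops' WHERE id = 1;
     
     -- StarRocks side (database test); the row should reflect each change shortly afterwards
     SELECT * FROM testcdc WHERE id = 1;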
  • Original article: https://blog.csdn.net/flyinthesky111/article/details/126509270