• StarRocks数据导入


    1、相关环境

    Flink作为当前流行的流式计算框架,在对接StarRocks时,若直接使用JDBC的方式"流式"写入数据,对StarRocks是不友好的,StarRocks作为一款MVCC的数据库,其导入的核心思想还是"攒微批+降频率"。为此,StarRocks单独开发了flink-connector-starrocks,其内部实现仍是通过对数据缓存攒批后执行Stream Load导入。

    1.1、StarRocks相关下载

    https://www.mirrorship.cn/zh-CN/download/community

    在这里插入图片描述

    1.2、Flink CDC连接器

    参考地址:
    https://ververica.github.io/flink-cdc-connectors/release-2.0/content/about.html#supported-flink-versions

    https://github.com/StarRocks/starrocks-connector-for-apache-flink

    https://docs.starrocks.io/zh-cn/main/loading/Flink-connector-starrocks

    1.3、搭建环境

    • StarRocks
    • Flink
    • Kafka
    • Zookeeper
    • MySQL

    2、Flink读取Kafka数据写入StarRocks

    Routine Load是StarRocks自带的可以消费Kafka数据的导入方式,其特点是简单易用,不依赖外部组件,但若需要对Kafka中的数据进行复杂的ETL,Routine Load可能就不能胜任了,这时就可以考虑使用Flink去消费Kafka中的数据,进行清洗转换后,再sink至StarRocks。

    常见的实时报表的例子,使用Flink对Kafka中追加写入的数据进行实时处理,然后将数据源源不断的同步入库StarRocks。

    2.1、数据准备

    2.1.1、在Kafka中创建主题behavior和province
    kafka-topics.sh --zookeeper 192.168.110.101:2181 --create --replication-factor 1 --partitions 1 --topic behavior
    
    kafka-topics.sh --zookeeper 192.168.110.101:2181 --create --replication-factor 1 --partitions 1 --topic province
    
    • 1
    • 2
    • 3
    2.1.2、向主题behavior生产数据
    kafka-console-producer.sh  --broker-list  192.168.110.101:9092  --topic behavior
    
    • 1
    2.1.3、生产数据
    10001,zs,18,11,shopping
    10002,ls,19, 11,add
    10003,ww,19,61,star
    
    • 1
    • 2
    • 3
    2.1.4、向主题province生产数据
    kafka-console-producer.sh  --broker-list  192.168.110.101:9092  --topic province
    
    
    • 1
    • 2
    2.1.5、生产数据
    11,北京
    61,陕西
    
    • 1
    • 2

    2.2、StarRocks准备

    2.2.1、创建主键模型表s_province
    create database starrocks;
    use starrocks;
    CREATE TABLE IF NOT EXISTS starrocks.`s_province` (
      `uid` int(10) NOT NULL COMMENT "",
      `p_id` int(2) NOT NULL COMMENT "",
      `p_name` varchar(30) NULL COMMENT ""
    )
    PRIMARY KEY(`uid`)
    DISTRIBUTED BY HASH(`uid`) BUCKETS 1
    PROPERTIES (
    "replication_num" = "1",
    -- 限主键模型
    "enable_persistent_index" = "true"
    );
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    2.3、Flink准备

    2.3.1、启动Flink
     ./start-cluster.sh
    
    • 1
    2.3.2、启动sql-client
    /sql-client.sh embedded
    
    • 1
    2.3.3、执行Flink SQL,创建上下游的映射表

    1、Source部分,创建Flink向Kafka的映射表kafka_source_behavior

    CREATE TABLE kafka_source_behavior (
        uuid int,
        name string,
        age int,
        province_id int,
        behavior string
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'behavior',
        'properties.bootstrap.servers' = '192.168.110.101:9092',
        'properties.group.id' = 'source_behavior',
        'scan.startup.mode' = 'earliest-offset',
        'format' = 'csv'
    );
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    2、创建映射表kafka_source_province

    CREATE TABLE kafka_source_province (
        pid int,
        p_name string
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'province',
        'properties.bootstrap.servers' = '192.168.110.101:9092',
        'properties.group.id' = 'source_province',
        'scan.startup.mode' = 'earliest-offset',
        'format' = 'csv'
    );
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    3、Sink部分,创建Flink向StarRocks的映射表sink_province

    CREATE TABLE sink_province (
       uid INT,
       p_id INT,
       p_name STRING,
       PRIMARY KEY (uid) NOT ENFORCED
    )WITH (
       'connector' = 'starrocks',
       'jdbc-url'='jdbc:mysql://192.168.110.101:9030',
       'load-url'='192.168.110.101:8030',
       'database-name' = 'starrocks',
       'table-name' = 's_province',
       'username' = 'root',
       'password' = 'root',
       'sink.buffer-flush.interval-ms' = '5000',
       'sink.properties.column_separator' = '\x01',
       'sink.properties.row_delimiter' = '\x02'
    );
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    2.3.4、执行同步任务

    执行Flink SQL,开始同步任务

    insert into sink_province select b.uuid as uid, b.province_id as p_id, p.p_name from kafka_source_behavior b join kafka_source_province p on b.province_id = p.pid;
    
    • 1

    2.4、StarRocks查看数据

    mysql -h192.168.110.101 -P9030 -uroot –proot
    
    use starrocks;
    select * from s_province;
    
    • 1
    • 2
    • 3
    • 4

    3、Flink JDBC读取MySQL数据写入StarRocks

    使用Flink JDBC方式读取MySQL数据的实时场景不多,因为JDBC下Flink只能获取执行命令时MySQL表的数据,所以更适合离线场景。假设有复杂的MySQL数据,就可以在Flink中跑定时任务,来获取清洗后的数据,完成后写入StarRocks。

    3.1、MySQL准备

    3.1.1、MySQL中创建表s_user
    use ODS;
    CREATE TABLE `s_user` (
       `id` INT(11) NOT NULL,
       `name` VARCHAR(32) DEFAULT NULL,
       `p_id` INT(2) DEFAULT NULL,
       PRIMARY KEY (`id`)
    );
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    3.1.2、插入数据
    insert into s_user values(10086,'lm',61),(10010, 'ls',11), (10000,'ll',61);
    
    • 1

    3.2、StarRocks准备

    3.2.1、StarRocks创建表s_user
    use starrocks;
    CREATE TABLE IF NOT EXISTS starrocks.`s_user` (
       `id` int(10) NOT NULL COMMENT "",
       `name` varchar(20) NOT NULL COMMENT "",
       `p_id` INT(2) NULL COMMENT ""
    )
    PRIMARY KEY(`id`)
    DISTRIBUTED BY HASH(`id`) BUCKETS 1
    PROPERTIES (
    "replication_num" = "1",
    -- 限主键模型
    "enable_persistent_index" = "true"
    );
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    3.3、Flink创建映射表

    3.3.1、启动Flink(服务未停止,可以跳过)
     ./start-cluster.sh
    
    • 1
    3.3.2、启动sql-client
    ./sql-client.sh embedded
    
    • 1
    3.3.3、Source部分,创建映射至MySQL的映射表source_mysql_suser
    CREATE TABLE source_mysql_suser (
       id INT,
       name STRING,
       p_id INT,
       PRIMARY KEY (id) NOT ENFORCED
    )WITH (
       'connector' = 'jdbc',
       'url' = 'jdbc:mysql://192.168.110.102:3306/ODS',
       'table-name' = 's_user',
       'username' = 'root',
       'password' = 'root'
    );
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    3.3.4、Sink部分,创建至StarRocks的映射表sink_starrocks_suser
    CREATE TABLE sink_starrocks_suser (
       id INT,
       name STRING,
       p_id INT,
       PRIMARY KEY (id) NOT ENFORCED
    )WITH (
       'connector' = 'starrocks',
       'jdbc-url'='jdbc:mysql://192.168.110.101:9030',
       'load-url'='192.168.110.101:8030',
       'database-name' = 'starrocks',
       'table-name' = 's_user',
       'username' = 'root',
       'password' = 'root',
       'sink.buffer-flush.interval-ms' = '5000',
       'sink.properties.column_separator' = '\x01',
       'sink.properties.row_delimiter' = '\x02'
    );
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    3.3.5、Flink清洗数据并写入StarRocks

    只是简单做一个where筛选,实际业务可能是多表join的复杂场景

    insert into sink_starrocks_suser select id,name,p_id from source_mysql_suser where p_id = 61;
    
    • 1

    数据写入StarRocks后,Flink任务完成并结束。此时若再对MySQL中s_user表的数据进行增删或修改操作,Flink亦不会感知。

    4、Flink读取StarRocks数据写入MySQL

    还使用MySQL 中的s_user表和StarRocks的s_user表,将业务流程反转一下,读取StarRocks中的数据写入其他业务库,例如MySQL。

    4.1、Flink创建映射表

    4.1.1、启动Flink(服务未停止,可以跳过)
    ./start-cluster.sh
    
    • 1
    4.1.2、启动sql-client
    ./sql-client.sh embedded
    
    • 1
    4.1.3、Source部分,创建StarRocks映射表source_starrocks_suser
    CREATE TABLE source_starrocks_suser (
       id INT,
       name STRING,
       p_id INT
    )WITH (
       'connector' = 'starrocks',
       'scan-url'='192.168.110.101:8030',
       'jdbc-url'='jdbc:mysql://192.168.110.101:9030',
       'database-name' = 'starrocks',
       'table-name' = 's_user',
       'username' = 'root',
       'password' = 'root'
    );
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    4.1.4、Sink部分,创建向MySQL的映射表sink_mysql_suser
    CREATE TABLE sink_mysql_suser (
       id INT,
       name STRING,
       p_id INT,
       PRIMARY KEY (id) NOT ENFORCED
    )WITH (
       'connector' = 'jdbc',
       'url' = 'jdbc:mysql://192.168.110.102:3306/ODS',
       'table-name' = 's_user',
       'username' = 'root',
       'password' = 'root'
    );
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    4.2、MySQL准备

    4.2.1、清空MySQL s_user表数据,为一会儿导入新数据做准备
     use ODS;
    truncate table s_user;
    
    • 1
    • 2

    4.3、Flink执行导入任务

    简单梳理操作,实际业务可能会对StarRocks中多个表的数据进行分组或者join等处理然后再导入。

     insert into sink_mysql_suser select id,name,p_id from source_starrocks_suser;
    
    • 1

    4.4、查看MySQL数据

     select * from s_user;
    
    • 1

    5、Flink CDC同步MySQL数据至StarRocks

    • 使用FlinkJDBC来读取MySQL数据时,JDBC的方式是“一次性”的导入,若希望让Flink感知MySQL数据源的数据变化,并近实时的实现据 同步,就需要使用Flink CDC。
    • CDC是变更数据捕获(Change Data Capture)技术的缩写,它可以将源数据库(Source)的数据变动记录,同步到一个或多个数据目的地中(Sink)。直观的说就是当数据源的数据变化时,通过CDC可以让目标库中的数据同步发生变化(仅限于DML操作)。
    • 还使用前面MySQL的s_user表以及StarRocks的s_user表来演示。

    5.1、MySQL准备

    5.1.1、MySQL开启binlog(格式为ROW模式)

    vi /etc/my.cnf

    log-bin=mysql-bin  # 开启binlog
    binlog-format=ROW # 选择ROW模式
    server_id=1       # 配置MySQL replaction
    
    • 1
    • 2
    • 3
    5.1.2、重启MySQL服务:
    systemctl restart mysqld
    
    • 1

    5.2、StarRocks准备

    5.2.1、StarRocks中清空s_user表中的数据
    mysql -h192.168.110.101 -P9030 -uroot –proot
    
    use starrocks;
    truncate table s_user;
    
    • 1
    • 2
    • 3
    • 4

    5.3、Flink准备

    5.3.1、启动Flink(服务未停止,可以跳过)
    ./start-cluster.sh
    
    • 1
    5.3.2、启动sql-client
    ./sql-client.sh embedded
    
    • 1
    5.3.3、Source部分,创建MySQL映射表cdc_mysql_suser
    CREATE TABLE cdc_mysql_suser (
       id INT,
       name STRING,
       p_id INT
    ) WITH (
       'connector' = 'mysql-cdc',
       'hostname' = '192.168.110.102',
       'port' = '3306',
       'username' = 'root',
       'password' = 'root',
       'database-name' = 'ODS',
       'scan.incremental.snapshot.enabled'='false',
       'table-name' = 's_user'
    );
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    5.3.4、Sink部分,创建向StarRocks的cdc_starrocks_suser
    CREATE TABLE cdc_starrocks_suser (
       id INT,
       name STRING,
       p_id INT,
       PRIMARY KEY (id) NOT ENFORCED
    )WITH (
       'connector' = 'starrocks',
       'jdbc-url'='jdbc:mysql://192.168.110.101:9030',
       'load-url'='192.168.110.101:8030',
       'database-name' = 'starrocks',
       'table-name' = 's_user',
       'username' = 'root',
       'password' = 'root',
       'sink.buffer-flush.interval-ms' = '5000',
       'sink.properties.column_separator' = '\x01',
       'sink.properties.row_delimiter' = '\x02'
    );
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    5.4、执行同步任务

    insert into cdc_starrocks_suser select id,name,p_id from cdc_mysql_suser;
    
    • 1

    在CDC场景下,Flink SQL执行后同步任务将会持续进行,当MySQL中数据出现变化,Flink会快速感知,并将变化同步至StarRocks中。

    5.5、数据观察

    5.5.1、MySQL库中观察数据
    mysql -uroot –proot
    
    use ODS;
    select * from s_user;
    
    • 1
    • 2
    • 3
    • 4
    5.5.2、StarRocks库中观察数据
    mysql -h192.168.110.101 -P9030 -uroot –proot
    
    use starrocks;
    select * from s_user;
    
    • 1
    • 2
    • 3
    • 4
    5.5.3、MySQL中,对数据进行增删改操作
     INSERT INTO s_user VALUES(12345,'SR',61);
    
    DELETE FROM s_user WHERE id = 10010;
    
    UPDATE s_user SET `name`='No.1' WHERE id = 10086;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    5.5.4、查看StarRocks中表的数据
     select * from s_user;
    
    • 1

    可以确认对MySQL源表数据的增加、修改和删除操作引起的数据变化,都能同步至StarRocks目标表中。

    6、通过CDC+SMT实现MySQL多表数据的秒级同步

    StarRocks Migration Tool:为了友好的解决多表同步时的问题,StarRocks发布了StarRocks-migrate-tools(简称smt)工具,来快捷生成StarRocks表结构和Flink-SQL映射表及同步语句。Smt目前可用于MySQL、PostgreSQL、Oracle和hive,后面三个数据库的同步还在公测中,先以MySQL来进行演示。

    6.1 MySQL准备

    已开启binlog的MySQL中创建数据库CDC,并在其中创建表departments和jobs,创建完成后再导入少量数据。

    6.1.1、创建表departments
    CREATE DATABASE CDC;
    USE CDC;
     CREATE TABLE `departments` (
       `department_id` int(4) NOT NULL AUTO_INCREMENT,
       `department_name` varchar(3) DEFAULT NULL,
       `manager_id` int(6) DEFAULT NULL,
       `location_id` int(4) DEFAULT NULL,
       PRIMARY KEY (`department_id`)
    );
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    6.1.2、为表departments插入数据
    insert  into `departments`(`department_id`,`department_name`,`manager_id`,`location_id`) 
    values (10,'Adm',200,1700),(20,'Mar',201,1800),(30,'Pur',114,1700),(40,'Hum',203,2400),(50,'Shi',121,1500),(60,'IT',103,1400),(70,'Pub',204,2700),(80,'Sal',145,2500),(90,'Exe',100,1700),(100,'Fin',108,1700),(110,'Acc',205,1700),(120,'Tre',NULL,1700),(130,'Cor',NULL,1700),(140,'Con',NULL,1700),(150,'Sha',NULL,1700),(160,'Ben',NULL,1700),(170,'Man',NULL,1700),(180,'Con',NULL,1700),(190,'Con',NULL,1700),(200,'Ope',NULL,1700),(210,'IT ',NULL,1700),(220,'NOC',NULL,1700),(230,'IT ',NULL,1700),(240,'Gov',NULL,1700),(250,'Ret',NULL,1700),(260,'Rec',NULL,1700),(270,'Pay',NULL,1700);
    
    • 1
    • 2
    6.1.3、创建表jobs
    CREATE TABLE `jobs` (
       `job_id` varchar(10) NOT NULL,
       `job_title` varchar(35) DEFAULT NULL,
       `min_salary` int(6) DEFAULT NULL,
       `max_salary` int(6) DEFAULT NULL,
       PRIMARY KEY (`job_id`)
    );
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    6.1.4、为表jobs插入数据
    insert  into `jobs`(`job_id`,`job_title`,`min_salary`,`max_salary`) 
    values ('AC_ACCOUNT','Public Accountant',4200,9000),('AC_MGR','Accounting Manager',8200,16000),('AD_ASST','Administration Assistant',3000,6000),('AD_PRES','President',20000,40000),('AD_VP','Administration Vice President',15000,30000),('FI_ACCOUNT','Accountant',4200,9000),('FI_MGR','Finance Manager',8200,16000),('HR_REP','Human Resources Representative',4000,9000),('IT_PROG','Programmer',4000,10000),('MK_MAN','Marketing Manager',9000,15000),('MK_REP','Marketing Representative',4000,9000),('PR_REP','Public Relations Representative',4500,10500),('PU_CLERK','Purchasing Clerk',2500,5500),('PU_MAN','Purchasing Manager',8000,15000),('SA_MAN','Sales Manager',10000,20000),('SA_REP','Sales Representative',6000,12000),('SH_CLERK','Shipping Clerk',2500,5500),('ST_CLERK','Stock Clerk',2000,5000),('ST_MAN','Stock Manager',5500,8500);
    
    • 1
    • 2

    6.2 配置SMT工具

    6.2.1 下载smt工具,解压后修改配置文件

    vi conf/config_prod.conf

    1、配置MySQL部分
    [db]
    
    host = 192.168.110.102  #MySQL所在服务器IP
    port = 3306  #MySQL服务端口
    user = root  #用户名
    password = root  #密码
    # currently available types: `mysql`, `pgsql`, `oracle`, `hive`
    type = mysql  #类型选择MySQL,目前PostgreSQL、Oracle和Hive正在公测中
    # # only takes effect on `type == hive`.
    # # Available values: kerberos, none, nosasl, kerberos_http, none_http, zk, ldap
    # authentication = kerberos
    [other]
    # number of backends in StarRocks
    be_num = 1  #配置StarRocks BE的节点数,以便生成更合理bucket数量的建表语句
    # `decimal_v3` is supported since StarRocks-1.18.1
    use_decimal_v3 = true  #使用更高精度的Decimal类型,1.18后的版本都支持
    # file to save the converted DDL SQL
    output_dir = ./result  #后续生成sql文件的保存目录
    # !!!`database` `table` `schema` are case sensitive in `oracle`!!!
    [table-rule.1]
    # pattern to match databases for setting properties
    # !!! database should be a `whole instance(or pdb) name` but not a regex when it comes with an `oracle db` !!!
    database = CDC  #配置需要同步的数据库,需使用正则表达式的写法
    # pattern to match tables for setting properties
    table = departments|jobs  #配置需要同步的表,需使用正则表达式的写法
    # `schema` only takes effect on `postgresql` and `oracle`
    schema = ^public$  #同步MySQL时不需要管这个
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    2、配置StarRocks集群信息
    ############################################
    ### flink sink configurations  #这部分与Flink Sink部分写法相似
    ### DO NOT set `connector`, `table-name`, `database-name`, they are auto-generated
    ############################################
    flink.starrocks.jdbc-url=jdbc:mysql://192.168.110.101:9030
    flink.starrocks.load-url=192.168.110.101:8030
    flink.starrocks.username=root
    flink.starrocks.password=root
    flink.starrocks.sink.properties.format=json  #以json格式攒批
    flink.starrocks.sink.properties.strip_outer_array=true  #展开为数组
    flink.starrocks.sink.buffer-flush.interval-ms=10000  #攒批10秒导入一次
    # # used to set the server-id for mysql-cdc jobs instead of using a random server-id
    # flink.cdc.server-id = 5000
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    6.3 SMT工具使用

    参考地址:
    https://docs.starrocks.io/zh-cn/latest/loading/Flink_cdc_load#%E4%BB%8E-mysql-%E5%AE%9E%E6%97%B6%E5%90%8C%E6%AD%A5

    6.3.1 执行smt工具
    ./starrocks-migrate-tool
    
    • 1
    6.3.2 在配置的./result路径下生成sql语句文件
    flink-create.1.sql
    flink-create.all.sql
    starrocks-create.1.sql
    starrocks-create.all.sql
    starrocks-external-create.1.sql
    starrocks-external-create.all.sql
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    6.4 生成Flink 任务

    6.4.1 同步库表结构

    如果数据需要经过 Flink 处理后写入目标表,目标表与源表的结构不一样,则您需要修改 SQL 文件 starrocks-create.all.sql 中的建表语句。

    mysql -h192.168.110.101 -P9030 -uroot -proot < /opt/module/smt/result/starrocks-create.all.sql
    
    
    • 1
    • 2
    6.4.2、同步数据

    进入 Flink 目录,执行如下命令

    ./bin/sql-client.sh -f /opt/module/smt/result/flink-create.all.sql
    
    • 1
    6.4.3、处理同步数据

    在同步过程中,如果您需要对数据进行一定的处理,例如 GROUP BY、JOIN 等,则可以修改 SQL 文件 flink-create.all.sql。可以通过执行 count(*) 和 GROUP BY 计算。

    INSERT INTO `default_catalog`.`demo`.`orders_sink` SELECT product_id,product_name, COUNT(*) AS cnt FROM `default_catalog`.`demo`.`orders_src` WHERE order_date >'2021-01-01 00:00:01' GROUP BY product_id,product_name;
    
    • 1

    执行同步数据命令(5.4.2),如果返回如下结果,则表示 Flink job 已经提交,开始同步全量和增量数据。

    [INFO] Submitting SQL update statement to the cluster...
    [INFO] SQL update statement has been successfully submitted to the cluster:
    Job ID: 5ae005c4b3425d8bb13fe660260a35da
    
    • 1
    • 2
    • 3

    6.5 观察任务状况

     ./flink list
    
    Waiting for response...
    
    ------------------ Running/Restarting Jobs -------------------
    
    19.01.2022 21:55:30 : 80c4e81de2d0d7e34c8f1aac1c22a8c4 : insert-into_default_catalog.CDC.departments_sink (RUNNING)
    
    19.01.2022 21:55:34 : b2b76afe7d33196a09a274142d9128cf : insert-into_default_catalog.CDC.jobs_sink (RUNNING)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    6.6 数据观察

    就不再演示改变数据了,与场景四中的情况相同,当数据源中的数据变化时,StarRocks中的数据也会同步变化,实现数据的近实时同步。

    这个场景特别适合维度表的数据同步,因为当前StarRocks还不支持update语法,就可以将数据需要频繁更新的维度表放在MySQL中,使用Flink CDC+SMT实时的在StarRocks中同步数据,实现灵活的多表关联查询。

  • 相关阅读:
    .NET6发布项目到腾讯云Windows2012R全网最详细教程
    STL——查找算法及实例
    PTA 甲级 1012 The Best Rank
    使用Qt QML创建自定义表格组件
    PNA/RNA/DNA修饰方法研究进展
    CF1535F String Distance
    PyTorch中的pyi檔案生成機制
    JS库-jQuery入门到掌握
    Fisco Bcos从入门到国一,宝宝版教学
    抗疫众志成城网页设计成品 抗击疫情感动人物网页制作模板 大学生抗疫静态HTML网页源码 dreamweaver网页作业致敬逆行者网页设计作品
  • 原文地址:https://blog.csdn.net/docsz/article/details/133340784