• Syncing data to StarRocks with the Flink SQL Server CDC connector


    Preface: Flink CDC keeps getting more capable and supports a growing list of data sources. This post walks through the full pipeline with Flink CDC:

    sqlserver -> (via flink cdc) -> flink -> (via flink starrocks connector) -> starrocks

    1. Prepare the SQL Server environment (you must use a SQL Server version below 16, i.e. SQL Server 2019 or earlier; Flink CDC currently does not support SQL Server 2022)

    I am using a Docker environment here:

    xiuchenggong@xiuchengdeMacBook-Pro ~ % docker images
    REPOSITORY                                          TAG                            IMAGE ID       CREATED         SIZE
    starrocks.docker.scarf.sh/starrocks/allin1-ubuntu   latest                         4d3c0066a012   3 days ago      4.71GB
    mcr.microsoft.com/mssql/server                      2019-latest                    e7fc0b49be3c   4 weeks ago     1.47GB
    mcr.microsoft.com/mssql/server                      2022-latest                    683d523cd395   5 weeks ago     2.9GB
    federatedai/standalone_fate                         latest                         6019ec787699   9 months ago    5.29GB
    milvusdb/milvus                                     v2.1.4                         d9a5c977c414   11 months ago   711MB
    starrocks/dev-env                                   main                           8f4edba3b115   16 months ago   7.65GB
    minio/minio                                         RELEASE.2022-03-17T06-34-49Z   239acc52a73a   17 months ago   228MB
    kicbase/stable                                      v0.0.29                        64d09634c60d   20 months ago   1.14GB
    quay.io/coreos/etcd                                 v3.5.0                         a7908fd5fb88   2 years ago     110MB

    docker run -e 'ACCEPT_EULA=Y' -e 'SA_PASSWORD=abc@123456' -p 30027:1433 --name sql_server_2019 -d mcr.microsoft.com/mssql/server:2019-latest
    docker exec -it --user root sql_server_2019 bash

    Enable the SQL Server Agent, restart the SQL Server container, and reconnect:

    xiuchenggong@xiuchengdeMacBook-Pro ~ % docker exec -it --user root sql_server_2019 bash
    root@99e196828047:/# /opt/mssql/bin/mssql-conf set sqlagent.enabled true
    SQL Server needs to be restarted in order to apply this setting. Please run
    'systemctl restart mssql-server.service'.
    root@99e196828047:/# exit
    exit
    xiuchenggong@xiuchengdeMacBook-Pro ~ % docker restart sql_server_2019
    sql_server_2019
    xiuchenggong@xiuchengdeMacBook-Pro ~ % docker exec -it --user root sql_server_2019 bash
    root@99e196828047:/# /opt/mssql-tools/bin/sqlcmd -S localhost -U SA -P "abc@123456"

    Enable CDC in SQL Server:

    root@99e196828047:/# /opt/mssql-tools/bin/sqlcmd -S localhost -U SA -P "abc@123456"
    1> use cdc_test;
    2> go
    Changed database context to 'cdc_test'.
    1> EXEC sys.sp_cdc_enable_db;
    2> go
    1> SELECT is_cdc_enabled FROM sys.databases WHERE name = 'cdc_test';
    2> go
    is_cdc_enabled
    1> CREATE TABLE orders (id int, order_date date, purchaser int, quantity int, product_id int, PRIMARY KEY ([id]))
    2> go
    1> EXEC sys.sp_cdc_enable_table
    2>   @source_schema = 'dbo',
    3>   @source_name = 'orders',
    4>   @role_name = 'cdc_role';
    5> go
    Job 'cdc.cdc_test_capture' started successfully.
    Job 'cdc.cdc_test_cleanup' started successfully.
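    The transcript above assumes the database cdc_test already exists. As an extra sanity check (my addition, not in the original post), the SQL Server catalog views can confirm that CDC is actually tracking the table:

```sql
-- Sketch, assumed to be run inside the cdc_test database via sqlcmd.
-- Confirm database-level CDC is on:
SELECT name, is_cdc_enabled FROM sys.databases WHERE name = 'cdc_test';
-- Confirm table-level CDC is on:
SELECT name, is_tracked_by_cdc FROM sys.tables WHERE name = 'orders';
-- List the capture instances created by sp_cdc_enable_table:
EXEC sys.sp_cdc_help_change_data_capture
    @source_schema = 'dbo', @source_name = 'orders';
```

    If is_tracked_by_cdc is 0, the Flink source will start but never see any changes, so this check can save debugging time later.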

    Insert some test data:

    1> select * from orders;
    2> go
    id          order_date       purchaser   quantity    product_id
    ----------- ---------------- ----------- ----------- -----------
              1 2023-07-07                1           1           1
              2 2023-07-07                2           2           2
              3 2023-07-07                3           3           3
              4 2023-07-07                4           4           4
             45 2023-07-07                5           5           5
    (5 rows affected)
    1> update orders set quantity = 100 where id = 1;
    2> go
    (1 rows affected)
    1> select * from orders;
    2> go
    id          order_date       purchaser   quantity    product_id
    ----------- ---------------- ----------- ----------- -----------
              1 2023-07-07                1         100           1
              2 2023-07-07                2           2           2
              3 2023-07-07                3           3           3
              4 2023-07-07                4           4           4
             45 2023-07-07                5           5           5
    (5 rows affected)
    1> update orders set quantity = 200 where id = 2;
    2> go
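    The transcript only shows the result of the inserts, not the inserts themselves. For completeness, the five rows above could have been created with statements along these lines (my reconstruction, not from the original post):

```sql
INSERT INTO orders VALUES (1,  '2023-07-07', 1, 1, 1);
INSERT INTO orders VALUES (2,  '2023-07-07', 2, 2, 2);
INSERT INTO orders VALUES (3,  '2023-07-07', 3, 3, 3);
INSERT INTO orders VALUES (4,  '2023-07-07', 4, 4, 4);
INSERT INTO orders VALUES (45, '2023-07-07', 5, 5, 5);
GO
```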

    2. Prepare the Flink environment (download Flink, then place the SQL Server CDC connector jar, flink-sql-connector-sqlserver-cdc, and the StarRocks connector jar, flink-connector-starrocks, into {FLINK_HOME}/lib):

    3. Prepare the StarRocks Docker environment:

    See: Deploy StarRocks with Docker (deploy_with_docker @ StarRocks Docs)

    4. Start the Flink cluster (cd {FLINK_HOME}):

    xiuchenggong@xiuchengdeMacBook-Pro bin % ./start-cluster.sh
    Starting cluster.
    Starting standalonesession daemon on host xiuchengdeMacBook-Pro.local.
    Starting taskexecutor daemon on host xiuchengdeMacBook-Pro.local.
    xiuchenggong@xiuchengdeMacBook-Pro bin % ./sql-client.sh embedded
    SLF4J: Class path contains multiple SLF4J bindings.
    SLF4J: Found binding in [jar:file:/Users/xiuchenggong/flink/flink-1.16.2/lib/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: Found binding in [jar:file:/Users/xiuchenggong/flink/hadoop-3.3.1/share/hadoop/common/lib/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
    SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
    (Flink SQL client ASCII-art banner omitted)
    Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.
    Command history file path: /Users/xiuchenggong/.flink-sql-history
    Flink SQL>

    Create the source table that maps SQL Server into Flink:

    Flink SQL> CREATE TABLE t_source_sqlserver (
        id INT,
        order_date DATE,
        purchaser INT,
        quantity INT,
        product_id INT,
        PRIMARY KEY (id) NOT ENFORCED          -- primary key definition (optional)
    ) WITH (
        'connector' = 'sqlserver-cdc',         -- use the SQL Server CDC connector
        'hostname' = 'localhost',              -- SQL Server host
        'port' = '30027',                      -- SQL Server port
        'username' = 'sa',                     -- SQL Server user
        'password' = 'abc@123456',             -- SQL Server password
        'database-name' = 'cdc_test',          -- database name
        'schema-name' = 'dbo',                 -- schema name
        'table-name' = 'orders'                -- table to capture changes from
    );
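    One detail worth adding here (my note, not in the original post): the Debezium-based CDC sources commit their offsets on Flink checkpoints, so it is worth enabling checkpointing in the SQL client session before submitting the job, for example:

```sql
-- Flink SQL client session configuration; the interval value is an example.
SET 'execution.checkpointing.interval' = '10s';
```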

    Then create the sink table from Flink to StarRocks:

    Flink SQL> CREATE TABLE IF NOT EXISTS `orders_sink` (
        id INT,
        order_date DATE,
        purchaser INT,
        quantity INT,
        product_id INT,
        PRIMARY KEY (`id`) NOT ENFORCED
    ) WITH (
        'connector' = 'starrocks',
        'jdbc-url' = 'jdbc:mysql://localhost:9030',
        'load-url' = 'localhost:8030',
        'database-name' = 'test',
        'table-name' = 'orders',
        'username' = 'root',
        'password' = '',
        'sink.buffer-flush.interval-ms' = '15000',
        'sink.properties.column_separator' = '\x01',
        'sink.properties.row_delimiter' = '\x02'
    );
    [INFO] Execute statement succeed.
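    The \x01/\x02 sink properties above make the connector use non-printing control bytes as column separator and row delimiter, so field values containing commas or newlines cannot break the load payload. A small illustrative shell sketch (my addition; the payload layout is what the CSV-style Stream Load format implies, with the control bytes mapped back to ',' and newline to make them visible):

```shell
# Build a two-row payload for the orders table using \x01 as the
# column separator and \x02 as the row delimiter, as configured above.
SEP=$(printf '\001')   # column separator \x01
ROW=$(printf '\002')   # row delimiter   \x02
payload="1${SEP}2023-07-07${SEP}1${SEP}100${SEP}1${ROW}2${SEP}2023-07-07${SEP}2${SEP}200${SEP}2${ROW}"
# Translate the control bytes to printable characters so we can see the rows:
printf '%s' "$payload" | tr '\001\002' ',\n'
```

    which prints the two rows as ordinary comma-separated lines.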
    Flink SQL> show tables;
    +--------------------+
    |         table name |
    +--------------------+
    |        orders_sink |
    | t_source_sqlserver |
    +--------------------+
    2 rows in set
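    Note that the sink definition assumes a Primary Key table named test.orders already exists in StarRocks; the original post does not show its DDL. It could be created along these lines (a sketch; bucket count and replication are left to StarRocks defaults):

```sql
CREATE DATABASE IF NOT EXISTS test;
CREATE TABLE IF NOT EXISTS test.orders (
    id INT,
    order_date DATE,
    purchaser INT,
    quantity INT,
    product_id INT
) PRIMARY KEY (id)
DISTRIBUTED BY HASH(id);
```

    A Primary Key table is what lets the connector apply the UPDATE and DELETE events from CDC rather than only appending rows.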

    Submit the job:

    Flink SQL> insert into orders_sink select * from t_source_sqlserver;
    [INFO] Submitting SQL update statement to the cluster...
    WARNING: An illegal reflective access operation has occurred
    WARNING: Illegal reflective access by org.apache.flink.api.java.ClosureCleaner (file:/Users/xiuchenggong/flink/flink-1.16.2/lib/flink-dist-1.16.2.jar) to field java.lang.Class.ANNOTATION
    WARNING: Please consider reporting this to the maintainers of org.apache.flink.api.java.ClosureCleaner
    WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
    WARNING: All illegal access operations will be denied in a future release
    [INFO] SQL update statement has been successfully submitted to the cluster:
    Job ID: 746cc173cd71133e96d080f25327e9bc

    The Flink Web UI now shows the long-running job.

    5. Verify that inserts, updates, and deletes in SQL Server are synchronized to StarRocks:

    StarRocks > select * from orders;
    +------+------------+-----------+----------+------------+
    | id   | order_date | purchaser | quantity | product_id |
    +------+------------+-----------+----------+------------+
    |    1 | 2023-07-07 |         1 |      100 |          1 |
    |    3 | 2023-07-07 |         3 |        3 |          3 |
    |    4 | 2023-07-07 |         4 |        4 |          4 |
    |   45 | 2023-07-07 |         5 |        5 |          5 |
    |    2 | 2023-07-07 |         2 |      200 |          2 |
    +------+------------+-----------+----------+------------+
    5 rows in set (0.016 sec)
    StarRocks >

    Inserts, updates, and deletes have all been synchronized.

• Original article: https://blog.csdn.net/gongxiucheng/article/details/132618245