• Syncing MongoDB data to ClickHouse with Flink CDC


    The scenario is this: a large volume of data lives in MongoDB and is updated frequently, but almost never deleted, and the business needs a dashboard that shows statistics over it. The previous design simply ran a scheduled job against MongoDB every 10 minutes to do the computation, which was slow and could also affect the operational workload. To make the dashboard more real-time and reduce the impact on the business, the computation should be moved onto a big-data stack. I propose the following solution:

    Sync the MongoDB data to Kafka via CDC, then land the Kafka data into both Hudi and ClickHouse. If the real-time requirement is loose, Kafka can be dropped and Hudi used directly, since Flink can read Hudi as a stream. Later, if the data in ClickHouse ever goes wrong, it can be re-initialized straight from Hudi instead of pulling a full snapshot from MongoDB again.

    To verify that the approach is feasible, this demo uses flink-cdc to sync MongoDB directly into ClickHouse.

    The MongoDB and ClickHouse images are pulled from Docker Hub; the commands used along the way are recorded here.

    docker pull mongo:5.0.11
    docker pull clickhouse/clickhouse-server:21.7.11.3

    Write the docker-compose file:

    version: '3'
    services:
      mongo5:
        image: mongo:5.0.11
        container_name: mongo5
        hostname: mongo5
        volumes:
          - mongo5-db:/data/db
          - mongo5-configdb:/data/configdb
          - ./mongod.conf:/etc/mongod.conf
        restart: always
        ports:
          - "27017:27017"
        networks:
          test:
            ipv4_address: 10.10.0.8
        command: ['mongod', '--config', '/etc/mongod.conf']
      clickhouse21:
        image: clickhouse/clickhouse-server:21.7.11.3
        container_name: clickhouse-server21
        hostname: clickhouse-server21
        volumes:
          - clickhouse-server21:/var/lib/clickhouse
        restart: always
        ports:
          - "8123:8123"
        networks:
          test:
            ipv4_address: 10.10.0.9
    volumes:
      mongo5-db:
      mongo5-configdb:
      clickhouse-server21:
    networks:
      test:
        external: true

    network: test is an external network I created beforehand, and the ipv4_address values above are IPs in my test subnet; change them to fit your own network.
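
    If you do not already have such a network, it can be created up front. A minimal sketch (the subnet below only mirrors the addresses used in this demo; adjust it to your environment):

    docker network create --driver bridge --subnet 10.10.0.0/24 test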

    mongod.conf contains the following. Since this is just a demo there is no need to run several mongod processes; a single mongod is enough:

    replication:
      replSetName: "rs0"

    After bringing the containers up with docker-compose up -d, enter the mongo and clickhouse containers respectively:

    # enter the mongo container
    docker exec -it mongo5 bash
    # open the mongo shell
    mongosh
    # initialize the replica set
    rs.initiate()
    # create the user for CDC
    use admin;
    db.createUser({
      user: "flinkuser",
      pwd: "flinkpw",
      roles: [
        { role: "read", db: "admin" },            // read role includes changeStream privilege
        { role: "readAnyDatabase", db: "admin" }  // for snapshot reading
      ]
    });
    # create the collection to be synced
    use test;
    db.createCollection('users');

    # enter the clickhouse container
    docker exec -it clickhouse-server21 bash
    # log in with the default user; after installation it has no password
    clickhouse-client --user default
    # create the test database
    create database test;
    use test;
    # create the users table
    create table users(
        id UInt32,
        name String,
        age Int8,
        create_time String,
        modify_time String,
        _modify_time DateTime MATERIALIZED toDateTime(modify_time, 'Asia/Shanghai')
    ) engine = ReplacingMergeTree(_modify_time)
    partition by (toDate(create_time, 'Asia/Shanghai'))
    order by id;
    # _modify_time is the version column for ReplacingMergeTree: during merges the row with the latest modify_time is kept.

    The preparation is done; what remains is writing the data into ClickHouse over JDBC. Flink's official jdbc-connector does not support ClickHouse, so a simple implementation is put together here: one Flink class is modified and a ClickHouse dialect is added. The concrete steps are as follows.
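
    For reference, the demo assumes the Flink JDBC connector, the Ververica MongoDB CDC connector and the official ClickHouse JDBC driver are on the classpath. A rough sketch of the Maven dependencies (the versions here are only examples; align them with the Flink version you actually use):

    <!-- example versions only; pick the ones matching your Flink distribution -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc_2.11</artifactId>
        <version>1.13.6</version>
    </dependency>
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-mongodb-cdc</artifactId>
        <version>2.2.1</version>
    </dependency>
    <dependency>
        <groupId>com.clickhouse</groupId>
        <artifactId>clickhouse-jdbc</artifactId>
        <version>0.3.2</version>
    </dependency>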

    Add a ClickhouseDialect class. It is only meant for this demo: getRowConverter and unsupportedTypes have not been properly adapted. The core change is rewriting UPDATE into INSERT statements: in ClickHouse, UPDATE and DELETE are mutation operations and modifying data in place is discouraged, but the business data changes frequently, so updates are implemented by inserting new rows and the results are filtered through a view at the end.

    package org.apache.flink.connector.jdbc.dialect;

    import org.apache.flink.connector.jdbc.internal.converter.AbstractJdbcRowConverter;
    import org.apache.flink.connector.jdbc.internal.converter.JdbcRowConverter;
    import org.apache.flink.table.types.logical.LogicalTypeRoot;
    import org.apache.flink.table.types.logical.RowType;

    import java.util.Collections;
    import java.util.List;
    import java.util.Optional;

    public class ClickhouseDialect extends AbstractDialect {

        @Override
        public int maxDecimalPrecision() {
            return 76;
        }

        @Override
        public int minDecimalPrecision() {
            return 1;
        }

        @Override
        public int maxTimestampPrecision() {
            return 9;
        }

        @Override
        public int minTimestampPrecision() {
            return 0;
        }

        @Override
        public List<LogicalTypeRoot> unsupportedTypes() {
            // not adapted for the demo: no type is rejected
            return Collections.emptyList();
        }

        @Override
        public String dialectName() {
            return "Clickhouse";
        }

        @Override
        public boolean canHandle(String url) {
            return url.startsWith("jdbc:ch:") || url.startsWith("jdbc:clickhouse:");
        }

        @Override
        public JdbcRowConverter getRowConverter(RowType rowType) {
            // not adapted for the demo: reuse the generic row converter
            return new AbstractJdbcRowConverter(rowType) {
                @Override
                public String converterName() {
                    return "Clickhouse";
                }
            };
        }

        @Override
        public String getLimitClause(long limit) {
            return "LIMIT " + limit;
        }

        @Override
        public Optional<String> defaultDriverName() {
            return Optional.of("com.clickhouse.jdbc.ClickHouseDriver");
        }

        @Override
        public String getUpdateStatement(String tableName, String[] fieldNames, String[] conditionFields) {
            // ClickHouse updates are mutations, so rewrite UPDATE as a plain INSERT
            return super.getInsertIntoStatement(tableName, fieldNames);
        }

        @Override
        public Optional<String> getUpsertStatement(String tableName, String[] fieldNames, String[] uniqueKeyFields) {
            // upserts also degrade to plain inserts; deduplication happens in ClickHouse via ReplacingMergeTree / a view
            return Optional.of(getInsertIntoStatement(tableName, fieldNames));
        }
    }

    Modify the JdbcDialects class and add ClickhouseDialect to DIALECTS:

    package org.apache.flink.connector.jdbc.dialect;

    import java.util.Arrays;
    import java.util.List;
    import java.util.Optional;

    public class JdbcDialects {

        private static final List<JdbcDialect> DIALECTS =
                Arrays.asList(
                        new DerbyDialect(), new MySQLDialect(), new PostgresDialect(), new ClickhouseDialect());

        /** Fetch the JdbcDialect class corresponding to a given database url. */
        public static Optional<JdbcDialect> get(String url) {
            for (JdbcDialect dialect : DIALECTS) {
                if (dialect.canHandle(url)) {
                    return Optional.of(dialect);
                }
            }
            return Optional.empty();
        }
    }

    Write the Flink program and start it:

    package com.catcher92.demo.flink.cdc;

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

    public class FlinkMongoCdcToClickhouse {

        public static void main(String[] args) {
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
            env.getCheckpointConfig().setCheckpointInterval(10 * 1000L);
            env.getCheckpointConfig().setCheckpointStorage("file:///tmp/flink/checkpoints/FlinkMongoCdcToClickhouse");
            env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
            env.getCheckpointConfig().setCheckpointTimeout(30 * 1000L);
            // RocksDB state backend with incremental checkpoints, see the note after the demo
            env.setStateBackend(new EmbeddedRocksDBStateBackend(true));

            final EnvironmentSettings.Builder builder = EnvironmentSettings.newInstance();
            builder.useBlinkPlanner().inStreamingMode();
            final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, builder.build());

            // source: MongoDB change stream via mongodb-cdc
            tEnv.executeSql("CREATE TABLE mongo_test_users (\n" +
                    "  _id INT, \n" +
                    "  name STRING,\n" +
                    "  age INT,\n" +
                    "  create_time STRING,\n" +
                    "  modify_time STRING,\n" +
                    "  PRIMARY KEY(_id) NOT ENFORCED\n" +
                    ") WITH (\n" +
                    "  'connector' = 'mongodb-cdc',\n" +
                    "  'hosts' = 'localhost:27017',\n" +
                    "  'username' = 'flinkuser',\n" +
                    "  'password' = 'flinkpw',\n" +
                    "  'database' = 'test',\n" +
                    "  'collection' = 'users'\n" +
                    ")");

            // sink: ClickHouse via the JDBC connector with the custom Clickhouse dialect
            tEnv.executeSql("create table clickhouse_test_users(\n" +
                    "  id INT, \n" +
                    "  name STRING,\n" +
                    "  age INT,\n" +
                    "  create_time STRING,\n" +
                    "  modify_time STRING,\n" +
                    "  PRIMARY KEY(id) NOT ENFORCED\n" +
                    ") WITH (\n" +
                    "  'connector' = 'jdbc',\n" +
                    "  'url' = 'jdbc:ch://localhost:8123/test',\n" +
                    "  'username' = 'default',\n" +
                    "  'password' = '',\n" +
                    "  'table-name' = 'users'\n" +
                    ")");

            try {
                tEnv.executeSql("insert into clickhouse_test_users\n" +
                        "select * from mongo_test_users").await();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    Once the job is running, insert data through mongosh:

    db.getCollection('users').insertOne({'_id':1, 'name':'zs1', 'age':1, 'create_time':'2022-09-04 15:39:00', 'modify_time':'2022-09-04 15:39:00'});
    db.getCollection('users').insertOne({'_id':2, 'name':'zs2', 'age':2, 'create_time':'2022-09-04 15:39:00', 'modify_time':'2022-09-04 15:39:00'});
    db.getCollection('users').insertOne({'_id':3, 'name':'zs3', 'age':3, 'create_time':'2022-09-04 15:39:00', 'modify_time':'2022-09-04 15:39:00'});
    db.getCollection('users').insertOne({'_id':4, 'name':'zs4', 'age':4, 'create_time':'2022-09-04 15:39:00', 'modify_time':'2022-09-04 15:39:00'});
    db.getCollection('users').insertOne({'_id':5, 'name':'zs5', 'age':5, 'create_time':'2022-09-04 15:39:00', 'modify_time':'2022-09-04 15:39:00'});

    Because mongo-cdc relies on the ChangelogNormalize operator to fill in the -U (update-before) rows, checkpoints become very large. Use RocksDB as the state backend, and incremental checkpoints are recommended, otherwise the job may be killed for exceeding its memory. The program above enables this with new EmbeddedRocksDBStateBackend(true); the equivalent configuration is sketched below.
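
    If you prefer to configure the state backend outside the code, a minimal flink-conf.yaml sketch with the same settings (assuming current Flink configuration keys):

    state.backend: rocksdb
    state.backend.incremental: true
    state.checkpoints.dir: file:///tmp/flink/checkpoints/FlinkMongoCdcToClickhouse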

    Then query the data in ClickHouse:

    select * from users;

    ┌─id─┬─name─┬─age─┬─create_time─────────┬─modify_time─────────┐
    │  1 │ zs1  │   1 │ 2022-09-04 15:39:00 │ 2022-09-04 15:39:00 │
    │  2 │ zs2  │   2 │ 2022-09-04 15:39:00 │ 2022-09-04 15:39:00 │
    │  3 │ zs3  │   3 │ 2022-09-04 15:39:00 │ 2022-09-04 15:39:00 │
    │  4 │ zs4  │   4 │ 2022-09-04 15:39:00 │ 2022-09-04 15:39:00 │
    │  5 │ zs5  │   5 │ 2022-09-04 15:39:00 │ 2022-09-04 15:39:00 │
    └────┴──────┴─────┴─────────────────────┴─────────────────────┘

    5 rows in set. Elapsed: 0.004 sec.

    Now update the document in MongoDB:

    db.getCollection('users').updateOne({'_id':1}, {$set:{'name':'zs1_1', 'modify_time':'2022-09-04 16:47:00'}})
    

    Query the data in ClickHouse again:

    select * from users;

    Query id: b406561e-1e6d-4bfe-a3ae-e852516b11e0

    ┌─id─┬─name─┬─age─┬─create_time─────────┬─modify_time─────────┐
    │  1 │ zs1  │   1 │ 2022-09-04 15:39:00 │ 2022-09-04 15:39:00 │
    │  2 │ zs2  │   2 │ 2022-09-04 15:39:00 │ 2022-09-04 15:39:00 │
    │  3 │ zs3  │   3 │ 2022-09-04 15:39:00 │ 2022-09-04 15:39:00 │
    │  4 │ zs4  │   4 │ 2022-09-04 15:39:00 │ 2022-09-04 15:39:00 │
    │  5 │ zs5  │   5 │ 2022-09-04 15:39:00 │ 2022-09-04 15:39:00 │
    └────┴──────┴─────┴─────────────────────┴─────────────────────┘
    ┌─id─┬─name──┬─age─┬─create_time─────────┬─modify_time─────────┐
    │  1 │ zs1_1 │   1 │ 2022-09-04 15:39:00 │ 2022-09-04 16:47:00 │
    └────┴───────┴─────┴─────────────────────┴─────────────────────┘

    6 rows in set. Elapsed: 0.005 sec.

    As expected, there are now 6 rows. When ReplacingMergeTree will actually merge them back down to 5 is completely unpredictable, so just filter the data yourself with a view, for example as sketched below.
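
    A minimal sketch of such a view (the name users_latest is just an example): it returns the latest row per id by taking, for each column, the value with the greatest _modify_time. For a one-off test you can also force a merge with OPTIMIZE TABLE users FINAL, but that is expensive on large tables.

    create view users_latest as
    select
        id,
        argMax(name, _modify_time)        as name,
        argMax(age, _modify_time)         as age,
        argMax(create_time, _modify_time) as create_time,
        argMax(modify_time, _modify_time) as modify_time
    from users
    group by id;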

  • Original article: https://blog.csdn.net/catcher92/article/details/126692064