Consider the following scenario: a large volume of data lives in MongoDB, and that data changes frequently but is almost never deleted. The data feeds a dashboard that shows users aggregated statistics. The previous design ran a scheduled job every 10 minutes that computed the aggregates directly against MongoDB; besides being slow, those queries could interfere with the production workload. The goal is therefore to move the computation onto a big-data stack, making the dashboard closer to real time while reducing the impact on the business database. I propose the following solution:
Sync the MongoDB data to Kafka via CDC, then land the Kafka data into both Hudi and ClickHouse. If the latency requirements are loose, Kafka can be dropped and Hudi used directly, since Flink can read Hudi as a stream (a sketch follows). Later, if the data in ClickHouse ever has problems, it can be rebuilt straight from Hudi instead of pulling a full snapshot from MongoDB again.
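As a rough illustration of that Hudi leg (it is not part of the demo below), a Flink SQL source table for streaming reads of a Hudi table might look like the following sketch. The storage path is a placeholder and the option values are assumptions to be checked against the Hudi Flink connector version you actually deploy.

```sql
-- Hypothetical sketch: stream changes out of a Hudi table with Flink SQL.
-- 'path' is a placeholder; the option values are assumptions, not tested in this demo.
CREATE TABLE hudi_test_users (
  _id INT,
  name STRING,
  age INT,
  create_time STRING,
  modify_time STRING,
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs:///warehouse/test/users',
  'table.type' = 'MERGE_ON_READ',          -- MOR tables are the usual choice for streaming reads
  'read.streaming.enabled' = 'true',       -- consume the table incrementally instead of a one-off batch scan
  'read.streaming.check-interval' = '10'   -- poll for new commits every 10 seconds
);
```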
To test the feasibility of the approach, this post builds a demo that syncs MongoDB directly to ClickHouse through flink-cdc.

The MongoDB and ClickHouse images are pulled from Docker Hub; the commands used along the way are recorded here.
```bash
docker pull mongo:5.0.11
docker pull clickhouse/clickhouse-server:21.7.11.3
```
Write the docker-compose file:
```yaml
version: '3'
services:
  mongo5:
    image: mongo:5.0.11
    container_name: mongo5
    hostname: mongo5
    volumes:
      - mongo5-db:/data/db
      - mongo5-configdb:/data/configdb
      - ./mongod.conf:/etc/mongod.conf
    restart: always
    ports:
      - "27017:27017"
    networks:
      test:
        ipv4_address: 10.10.0.8
    command: ['mongod', '--config', '/etc/mongod.conf']

  clickhouse21:
    image: clickhouse/clickhouse-server:21.7.11.3
    container_name: clickhouse-server21
    hostname: clickhouse-server21
    volumes:
      - clickhouse-server21:/var/lib/clickhouse
    restart: always
    ports:
      - "8123:8123"
    networks:
      test:
        ipv4_address: 10.10.0.9

volumes:
  mongo5-db:
  mongo5-configdb:
  clickhouse-server21:

networks:
  test:
    external: true
```
The `test` network is an external network I created beforehand, and the `ipv4_address` values above are addresses inside my own `test` subnet; change them to match your own network. If the network does not exist yet, it can be created as sketched below.
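A minimal sketch of creating such an external network; the subnet is an assumption chosen to contain the two addresses above, so adjust it to your environment.

```bash
# Assumed subnet that covers 10.10.0.8/10.10.0.9; pick whatever fits your setup.
docker network create --driver bridge --subnet 10.10.0.0/24 test
```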
The contents of mongod.conf are below. Since this is a demo, there is no need to run multiple mongod processes; a single mongod acting as a one-node replica set is enough:
```yaml
replication:
  replSetName: "rs0"
```
After starting the containers with `docker-compose up -d`, enter the MongoDB and ClickHouse containers in turn.
```
# enter the mongo container
docker exec -it mongo5 bash
# open the mongo shell
mongosh

# initialize the replica set
rs.initiate()

# create the user for CDC
use admin;
db.createUser({
    user: "flinkuser",
    pwd: "flinkpw",
    roles: [
        { role: "read", db: "admin" },           // the read role includes the changeStream privilege
        { role: "readAnyDatabase", db: "admin" } // for snapshot reading
    ]
});

# create the collection to be synced
use test;
db.createCollection('users');
```
```
# enter the clickhouse container
docker exec -it clickhouse-server21 bash

# log in as the default user; a fresh install ships with a passwordless default user
clickhouse-client --user default
```

```sql
-- create the test database
create database test;

-- create the users table
create table users(
    id UInt32,
    name String,
    age Int8,
    create_time String,
    modify_time String,
    _mofify_time DateTime MATERIALIZED toDateTime(modify_time, 'Asia/Shanghai')
) engine = ReplacingMergeTree(_mofify_time)
partition by (toDate(create_time, 'Asia/Shanghai'))
order by id;
```

Here `_mofify_time` is the version column used by the ReplacingMergeTree when it merges parts: for each key, the row with the latest modify_time is the one that is kept.
The preparation is done; what remains is writing to ClickHouse over JDBC. Flink's official jdbc-connector does not support ClickHouse, so a simple implementation is added here. This requires modifying one Flink class and adding a ClickHouse dialect, as follows:
Add a ClickhouseDialect class. It is only meant for this demo: getRowConverter and unsupportedTypes are not properly adapted. The core idea is to turn UPDATE statements into INSERTs. In ClickHouse, UPDATE and DELETE are mutation operations and modifying data in place is discouraged, but the business data changes frequently, so updates are implemented here by inserting new rows; the stale versions are filtered out later through a view.
```java
package org.apache.flink.connector.jdbc.dialect;

import org.apache.flink.connector.jdbc.internal.converter.AbstractJdbcRowConverter;
import org.apache.flink.connector.jdbc.internal.converter.JdbcRowConverter;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.RowType;

import java.util.Collections;
import java.util.List;
import java.util.Optional;

public class ClickhouseDialect extends AbstractDialect {

    @Override
    public int maxDecimalPrecision() {
        return 76;
    }

    @Override
    public int minDecimalPrecision() {
        return 1;
    }

    @Override
    public int maxTimestampPrecision() {
        return 9;
    }

    @Override
    public int minTimestampPrecision() {
        return 0;
    }

    @Override
    public List<LogicalTypeRoot> unsupportedTypes() {
        return Collections.emptyList();
    }

    @Override
    public String dialectName() {
        return "Clickhouse";
    }

    @Override
    public boolean canHandle(String url) {
        return url.startsWith("jdbc:ch:") || url.startsWith("jdbc:clickhouse:");
    }

    @Override
    public JdbcRowConverter getRowConverter(RowType rowType) {
        return new AbstractJdbcRowConverter(rowType) {
            @Override
            public String converterName() {
                return "Clickhouse";
            }
        };
    }

    @Override
    public String getLimitClause(long limit) {
        return "LIMIT " + limit;
    }

    @Override
    public Optional<String> defaultDriverName() {
        return Optional.of("com.clickhouse.jdbc.ClickHouseDriver");
    }

    @Override
    public String getUpdateStatement(String tableName, String[] fieldNames, String[] conditionFields) {
        // Rewrite UPDATE as a plain INSERT; the latest version wins via ReplacingMergeTree / the view.
        return super.getInsertIntoStatement(tableName, fieldNames);
    }

    @Override
    public Optional<String> getUpsertStatement(String tableName, String[] fieldNames, String[] uniqueKeyFields) {
        // Upserts are likewise turned into plain INSERTs.
        return Optional.of(getInsertIntoStatement(tableName, fieldNames));
    }
}
```
Modify the JdbcDialects class and add ClickhouseDialect to DIALECTS:
```java
package org.apache.flink.connector.jdbc.dialect;

import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public class JdbcDialects {

    private static final List<JdbcDialect> DIALECTS =
            Arrays.asList(new DerbyDialect(), new MySQLDialect(), new PostgresDialect(), new ClickhouseDialect());

    /** Fetch the JdbcDialect class corresponding to a given database url. */
    public static Optional<JdbcDialect> get(String url) {
        for (JdbcDialect dialect : DIALECTS) {
            if (dialect.canHandle(url)) {
                return Optional.of(dialect);
            }
        }
        return Optional.empty();
    }
}
```
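For reference, the job's classpath also needs the MongoDB CDC connector, Flink's JDBC connector, the RocksDB state backend, and a ClickHouse JDBC driver. A rough sketch of the Maven coordinates is below; the version properties are placeholders and must be matched to your own Flink and Scala versions.

```xml
<!-- Sketch only: versions are placeholders, match them to your Flink distribution. -->
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-mongodb-cdc</artifactId>
    <version>${mongodb-cdc.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>com.clickhouse</groupId>
    <artifactId>clickhouse-jdbc</artifactId>
    <version>${clickhouse-jdbc.version}</version>
</dependency>
```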
Write the Flink program and start it:
```java
package com.catcher92.demo.flink.cdc;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FlinkMongoCdcToClickhouse {

    public static void main(String[] args) {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        env.getCheckpointConfig().setCheckpointInterval(10 * 1000L);
        env.getCheckpointConfig().setCheckpointStorage("file:///tmp/flink/checkpoints/FlinkMongoCdcToClickhouse");
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(30 * 1000L);

        // RocksDB state backend with incremental checkpoints enabled (the "true" argument).
        env.setStateBackend(new EmbeddedRocksDBStateBackend(true));

        final EnvironmentSettings.Builder builder = EnvironmentSettings.newInstance();
        builder.useBlinkPlanner().inStreamingMode();
        final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, builder.build());

        // MongoDB CDC source table
        tEnv.executeSql("CREATE TABLE mongo_test_users (\n" +
                " _id INT, \n" +
                " name STRING,\n" +
                " age INT,\n" +
                " create_time STRING,\n" +
                " modify_time STRING,\n" +
                " PRIMARY KEY(_id) NOT ENFORCED\n" +
                ") WITH (\n" +
                " 'connector' = 'mongodb-cdc',\n" +
                " 'hosts' = 'localhost:27017',\n" +
                " 'username' = 'flinkuser',\n" +
                " 'password' = 'flinkpw',\n" +
                " 'database' = 'test',\n" +
                " 'collection' = 'users'\n" +
                ")");
        // ClickHouse sink table over JDBC (handled by the ClickhouseDialect added above)
        tEnv.executeSql("create table clickhouse_test_users(\n" +
                " id INT, \n" +
                " name STRING,\n" +
                " age INT,\n" +
                " create_time STRING,\n" +
                " modify_time STRING,\n" +
                " PRIMARY KEY(id) NOT ENFORCED\n" +
                ") WITH (\n" +
                " 'connector' = 'jdbc',\n" +
                " 'url' = 'jdbc:ch://localhost:8123/test',\n" +
                " 'username' = 'default',\n" +
                " 'password' = '',\n" +
                " 'table-name' = 'users'\n" +
                ")");
        try {
            tEnv.executeSql("insert into clickhouse_test_users\n" +
                    "select * from mongo_test_users").await();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```
After the job is running, insert some data through mongosh:
```
db.getCollection('users').insertOne({'_id':1, 'name':'zs1', 'age':1, 'create_time':'2022-09-04 15:39:00', 'modify_time':'2022-09-04 15:39:00'});
db.getCollection('users').insertOne({'_id':2, 'name':'zs2', 'age':2, 'create_time':'2022-09-04 15:39:00', 'modify_time':'2022-09-04 15:39:00'});
db.getCollection('users').insertOne({'_id':3, 'name':'zs3', 'age':3, 'create_time':'2022-09-04 15:39:00', 'modify_time':'2022-09-04 15:39:00'});
db.getCollection('users').insertOne({'_id':4, 'name':'zs4', 'age':4, 'create_time':'2022-09-04 15:39:00', 'modify_time':'2022-09-04 15:39:00'});
db.getCollection('users').insertOne({'_id':5, 'name':'zs5', 'age':5, 'create_time':'2022-09-04 15:39:00', 'modify_time':'2022-09-04 15:39:00'});
```
Because the mongodb-cdc source relies on the ChangelogNormalize operator to reconstruct the UPDATE_BEFORE (-U) records, its state and therefore its checkpoints can grow very large. That is why the job uses RocksDB as the state backend, and incremental checkpoints (the `true` passed to `EmbeddedRocksDBStateBackend` above) are recommended so the process is not killed for exceeding its memory budget.
Then query the data in ClickHouse:
```
select * from users;

┌─id─┬─name─┬─age─┬─create_time─────────┬─modify_time─────────┐
│ 1  │ zs1  │ 1   │ 2022-09-04 15:39:00 │ 2022-09-04 15:39:00 │
│ 2  │ zs2  │ 2   │ 2022-09-04 15:39:00 │ 2022-09-04 15:39:00 │
│ 3  │ zs3  │ 3   │ 2022-09-04 15:39:00 │ 2022-09-04 15:39:00 │
│ 4  │ zs4  │ 4   │ 2022-09-04 15:39:00 │ 2022-09-04 15:39:00 │
│ 5  │ zs5  │ 5   │ 2022-09-04 15:39:00 │ 2022-09-04 15:39:00 │
└────┴──────┴─────┴─────────────────────┴─────────────────────┘

5 rows in set. Elapsed: 0.004 sec.
```
Now update a document in MongoDB:

```
db.getCollection('users').updateOne({'_id':1}, {$set:{'name':'zs1_1', 'modify_time':'2022-09-04 16:47:00'}})
```
Then query ClickHouse again:
```
select * from users;

Query id: b406561e-1e6d-4bfe-a3ae-e852516b11e0

┌─id─┬─name─┬─age─┬─create_time─────────┬─modify_time─────────┐
│ 1  │ zs1  │ 1   │ 2022-09-04 15:39:00 │ 2022-09-04 15:39:00 │
│ 2  │ zs2  │ 2   │ 2022-09-04 15:39:00 │ 2022-09-04 15:39:00 │
│ 3  │ zs3  │ 3   │ 2022-09-04 15:39:00 │ 2022-09-04 15:39:00 │
│ 4  │ zs4  │ 4   │ 2022-09-04 15:39:00 │ 2022-09-04 15:39:00 │
│ 5  │ zs5  │ 5   │ 2022-09-04 15:39:00 │ 2022-09-04 15:39:00 │
└────┴──────┴─────┴─────────────────────┴─────────────────────┘
┌─id─┬─name──┬─age─┬─create_time─────────┬─modify_time─────────┐
│ 1  │ zs1_1 │ 1   │ 2022-09-04 15:39:00 │ 2022-09-04 16:47:00 │
└────┴───────┴─────┴─────────────────────┴─────────────────────┘

6 rows in set. Elapsed: 0.005 sec.
```
The table now holds 6 rows, which is the expected result. Exactly when the ReplacingMergeTree will merge them back down to 5 rows is not predictable, so simply create a view yourself to filter out the stale versions, along the lines of the sketch below.
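A minimal sketch of such a view, assuming the `users` table defined earlier (the view name `users_latest` is made up for this example); `argMax` picks, for each id, the column values belonging to the row with the greatest modify_time:

```sql
-- Sketch: always return only the latest version of each id, regardless of merge timing.
-- Relies on modify_time being a sortable 'YYYY-MM-DD hh:mm:ss' string, as in the demo data.
CREATE VIEW test.users_latest AS
SELECT
    id,
    argMax(name, modify_time)        AS name,
    argMax(age, modify_time)         AS age,
    argMax(create_time, modify_time) AS create_time,
    max(modify_time)                 AS modify_time
FROM test.users
GROUP BY id;
```

Querying `users_latest` after the update above should return 5 rows, with id 1 already showing `zs1_1`. Alternatively, `SELECT * FROM users FINAL` forces deduplication at query time, at a higher query cost.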