Apache Paimon(以下简称 Paimon)作为支持实时更新的高性能湖存储,本用例展示了在千万数据规模下使用全量 + 增量一体化同步 MySQL 订单表到 Paimon明细表、下游计算聚合及持续消费更新的能力。整体流程如下图所示,其中 MySQL 需要提前准备 ,本机需要下载 Flink 包及 Paimon相关依赖,TPC-H 数据生成器。

数据源由 TPC-H 生成并导入 MySQL,本例选取其中订单明细表 lineitem,在写入 Paimon时以 l_shipdate 字段作为业务时间定义分区 l_year 和 l_month,时间跨度从 1992.1-1998.12,动态写入 84 个分区,详细配置如下表所示。
| Configuration | Value | Description |
|---|---|---|
| 记录数 | 59,986,052 | 全量阶段同步数据量 |
| 动态写入分区数 | 84 | `l_year` 为一级分区,`l_month` 为二级分区 |
| Checkpoint Interval | 1m | 数据更新频率 |
| Parallelism | 2 | 单机 2 个并发 |
| Paimon Bucket Number | 2 | 每个分区下生成 2 个 bucket |
经测试,在单机并发为 2,checkpoint interval 为 1 min 的配置下(每分钟更新可见)66 min 内写入 59.9 million 全量数据,每 10 min 的写入性能如下表所示,平均写入性能在 1 million/min。
| Duration(min) | Records In (million) |
|---|---|
| 10 | 10 |
| 20 | 20 |
| 30 | 31 |
| 40 | 42 |
| 50 | 50 |
| 60 | 57 |
从作业运行图中可以观测到每 10 min 作业写入的数据量,在 66 min 时全量同步完成,开始持续监听增量数据。
TPC-H 作为一个经典的 Ad-hoc query 性能测试 benchmark,其包含的数据模型与真实的商业场景十分类似。本用例选取其中订单明细表 lineitem 和针对它的单表查询 Q1(下文会有详细说明)
lineitem schema 如下表所示,每行记录在 128 bytes 左右
| 字段 | 类型 | 描述 |
|---|---|---|
| l_orderkey | INT NOT NULL | 主订单 key,即主订单 id,联合主键第一位 |
| l_partkey | INT NOT NULL | 配件 key,即商品 id |
| l_suppkey | INT NOT NULL | 供应商 key,即卖家 id |
| l_linenumber | INT NOT NULL | 子订单 key,即子订单 id,联合主键第二位 |
| l_quantity | DECIMAL(15, 2) NOT NULL | 商品数量 |
| l_extendedprice | DECIMAL(15, 2) NOT NULL | 商品价格 |
| l_discount | DECIMAL(15, 2) NOT NULL | 商品折扣 |
| l_tax | DECIMAL(15, 2) NOT NULL | 商品税 |
| l_returnflag | CHAR(1) NOT NULL | 订单签收标志,A 代表 accepted 签收,R |
| l_linestatus | CHAR(1) NOT NULL | 子订单状态,发货日期晚于 1995-06-17 之前的订单标记为 O,否则标记为 F |
| l_shipdate | DATE NOT NULL | 订单发货日期 |
| l_commitdate | DATE NOT NULL | 订单提交日期 |
| l_receiptdate | DATE NOT NULL | 收货日期 |
| l_shipinstruct | CHAR(25) NOT NULL | 收货信息,比如 DELIVER IN PERSON 本人签收,TAKE BACK RETURN 退货,COLLECT COD 货到付款 |
| l_shipmode | CHAR(10) NOT NULL | 快递模式,有 SHIP 海运,AIR 空运,TRUCK 陆运,MAIL 邮递等类型 |
| l_comment | VARCHAR(44) NOT NULL | 订单注释 |
对发货日期在一定范围内的订单,根据订单状态和收货状态统计订单数、商品数、总营业额、总利润、平均出厂价、平均折扣价、平均折扣含税价等指标。
mysql环境准备:
- DROP DATABASE IF EXISTS flink;
- CREATE DATABASE IF NOT EXISTS flink;
- USE flink;
- CREATE USER 'flink' IDENTIFIED WITH mysql_native_password BY 'flink';
- GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flink';
- FLUSH PRIVILEGES;
- -- Create Base Table
- CREATE TABLE lineitem (
- l_orderkey INTEGER NOT NULL,
- l_partkey INTEGER NOT NULL,
- l_suppkey INTEGER NOT NULL,
- l_linenumber INTEGER NOT NULL,
- l_quantity DECIMAL(15,2) NOT NULL,
- l_extendedprice DECIMAL(15,2) NOT NULL,
- l_discount DECIMAL(15,2) NOT NULL,
- l_tax DECIMAL(15,2) NOT NULL,
- l_returnflag CHAR(1) NOT NULL,
- l_linestatus CHAR(1) NOT NULL,
- l_shipdate DATE NOT NULL,
- l_commitdate DATE NOT NULL,
- l_receiptdate DATE NOT NULL,
- l_shipinstruct CHAR(25) NOT NULL,
- l_shipmode CHAR(10) NOT NULL,
- l_comment VARCHAR(44) NOT NULL) ENGINE=InnoDB DEFAULT CHARSET=utf8;
- -- Add PK Constraint
- ALTER TABLE lineitem ADD PRIMARY KEY (l_orderkey, l_linenumber);
- -- Create Delta Table
- CREATE TABLE update_lineitem LIKE lineitem;
-
- CREATE TABLE delete_lineitem (
- l_orderkey INTEGER NOT NULL) ENGINE=InnoDB DEFAULT CHARSET=utf8;
- ALTER TABLE delete_lineitem ADD PRIMARY KEY (l_orderkey);
准备TPC-H 生产文件 :添加tpch_dbgen_linux64-2.14.0.zip到服务器,解压执行命令,生产10G数据,其中lineitem大约7.3G
./dbgen -q -s 10 -T L
![]()
加载数据到mysql
添加mycreds.cnf文件,内容包含mysql的基本信息

- mysql --defaults-extra-file=mycreds.cnf -D flink --local-infile=1 -e "
- LOAD DATA LOCAL INFILE 'lineitem.tbl' INTO TABLE lineitem FIELDS TERMINATED BY '|' LINES TERMINATED BY '|\n';"
本用例会在第一步中将全量订单数据(约 59.9 million)导入 MySQL container,预计耗时 15 min,在此期间您可以准备好 Flink 及 Paimon 等环境,等待数据导入完毕,然后启动 Flink 作业。然后使用脚本开始持续触发 TPC-H 产生 RF1(新增订单)和 RF2(删除已有订单)来模拟增量更新(每组新增和删除之间间隔 20s)。以 10 组更新为例,将会产生 6 million 新增订单和 1.5 million 删除订单(注:TPC-H 产生的删除订单为主订单 ID,由于 lineitem 存在联合主键,故实际删除数据量稍大于 1.5 million)。
Demo 运行使用 Flink 1.17.0 版本( flink-1.17.0 ),需要的其它依赖如下
下载cdcjar并放置于本地 flink-1.17.0/lib 目录下,也可以自行下载及编译
上述步骤完成后,lib 目录结构如图所示
- lib
- -rw-r--r-- 1 hadoop hadoop 196487 3月 17 20:07 flink-cep-1.17.0.jar
- -rw-r--r-- 1 hadoop hadoop 542616 3月 17 20:10 flink-connector-files-1.17.0.jar
- -rw-r--r-- 1 hadoop hadoop 102468 3月 17 20:14 flink-csv-1.17.0.jar
- -rw-r--r-- 1 hadoop hadoop 135969953 3月 17 20:22 flink-dist-1.17.0.jar
- -rw-r--r-- 1 hadoop hadoop 180243 3月 17 20:13 flink-json-1.17.0.jar
- -rw-r--r-- 1 hadoop hadoop 21043313 3月 17 20:20 flink-scala_2.12-1.17.0.jar
- -rw-rw-r-- 1 hadoop hadoop 36327707 5月 19 15:12 flink-shaded-hadoop-2-uber-2.6.5-7.0.jar
- -rw-rw-r-- 1 hadoop hadoop 22096298 5月 22 10:30 flink-sql-connector-mysql-cdc-2.2.1.jar
- -rw-r--r-- 1 hadoop hadoop 15407474 3月 17 20:21 flink-table-api-java-uber-1.17.0.jar
- -rw-r--r-- 1 hadoop hadoop 37975208 3月 17 20:15 flink-table-planner-loader-1.17.0.jar
- -rw-r--r-- 1 hadoop hadoop 3146205 3月 17 20:07 flink-table-runtime-1.17.0.jar
- -rw-r--r-- 1 hadoop hadoop 208006 3月 17 17:31 log4j-1.2-api-2.17.1.jar
- -rw-r--r-- 1 hadoop hadoop 301872 3月 17 17:31 log4j-api-2.17.1.jar
- -rw-r--r-- 1 hadoop hadoop 1790452 3月 17 17:31 log4j-core-2.17.1.jar
- -rw-r--r-- 1 hadoop hadoop 24279 3月 17 17:31 log4j-slf4j-impl-2.17.1.jar
- -rw-rw-r-- 1 hadoop hadoop 26745559 5月 19 15:14 paimon-flink-1.17-0.5-20230512.001824-7.jar
vim flink-1.17.0/conf/flink-conf.yaml 文件,按如下配置修改
jobmanager.memory.process.size: 4096m taskmanager.memory.process.size: 4096m taskmanager.numberOfTaskSlots: 8 parallelism.default: 2 execution.checkpointing.interval: 1min state.backend: rocksdb state.backend.incremental: true execution.checkpointing.checkpoints-after-tasks-finish.enabled: true
若想观察 Paimon的异步合并、Snapshot 提交及流读等信息,可以在 flink-1.17.0/conf 目录下修改 log4j.properties 文件,按需增加如下配置
- # Log FTS
- logger.commit.name = org.apache.flink.table.store.file.operation.FileStoreCommitImpl
- logger.commit.level = DEBUG
-
- logger.compaction.name = org.apache.flink.table.store.file.mergetree.compact
- logger.compaction.level = DEBUG
-
- logger.enumerator.name = org.apache.flink.table.store.connector.source.ContinuousFileSplitEnumerator
- logger.enumerator.level = DEBUG
这里我们只开启提交的 DEBUG,然后在 flink-1.17.0 目录下执行 ./bin/start-cluster.sh
./bin/sql-client.sh
执行SQL
- -- 设置使用流模式
- SET 'execution.runtime-mode' = 'streaming';
-
- -- 创建并使用 Paimon Catalog
- CREATE CATALOG `my_catalog` WITH (
- 'type' = 'paimon',
- 'warehouse' = 'hdfs://tmp/table-store-101'
- );
-
- USE CATALOG `my_catalog`;
-
- -- ODS table schema
-
- -- 注意在 PaimonCatalog 下,创建使用其它连接器的表时,需要将表声明为临时表
- CREATE TEMPORARY TABLE `ods_lineitem` (
- `l_orderkey` INT NOT NULL,
- `l_partkey` INT NOT NULL,
- `l_suppkey` INT NOT NULL,
- `l_linenumber` INT NOT NULL,
- `l_quantity` DECIMAL(15, 2) NOT NULL,
- `l_extendedprice` DECIMAL(15, 2) NOT NULL,
- `l_discount` DECIMAL(15, 2) NOT NULL,
- `l_tax` DECIMAL(15, 2) NOT NULL,
- `l_returnflag` CHAR(1) NOT NULL,
- `l_linestatus` CHAR(1) NOT NULL,
- `l_shipdate` DATE NOT NULL,
- `l_commitdate` DATE NOT NULL,
- `l_receiptdate` DATE NOT NULL,
- `l_shipinstruct` CHAR(25) NOT NULL,
- `l_shipmode` CHAR(10) NOT NULL,
- `l_comment` VARCHAR(44) NOT NULL,
- PRIMARY KEY (`l_orderkey`, `l_linenumber`) NOT ENFORCED
- ) WITH (
- 'connector' = 'mysql-cdc',
- 'hostname' = '127.0.0.1', -- 如果想使用 host,可以修改宿主机 /etc/hosts 加入 127.0.0.1 mysql.docker.internal
- 'port' = '3306',
- 'username' = 'flink',
- 'password' = 'flink',
- 'database-name' = 'flink',
- 'table-name' = 'lineitem'
- );
-
-
- -- DWD table schema
- -- 以 `l_shipdate` 为业务日期,创建以 `l_year` + `l_month` 的二级分区表,注意所有 partition key 都需要声明在 primary key 中
- CREATE TABLE IF NOT EXISTS `dwd_lineitem` (
- `l_orderkey` INT NOT NULL,
- `l_partkey` INT NOT NULL,
- `l_suppkey` INT NOT NULL,
- `l_linenumber` INT NOT NULL,
- `l_quantity` DECIMAL(15, 2) NOT NULL,
- `l_extendedprice` DECIMAL(15, 2) NOT NULL,
- `l_discount` DECIMAL(15, 2) NOT NULL,
- `l_tax` DECIMAL(15, 2) NOT NULL,
- `l_returnflag` CHAR(1) NOT NULL,
- `l_linestatus` CHAR(1) NOT NULL,
- `l_shipdate` DATE NOT NULL,
- `l_commitdate` DATE NOT NULL,
- `l_receiptdate` DATE NOT NULL,
- `l_shipinstruct` CHAR(25) NOT NULL,
- `l_shipmode` CHAR(10) NOT NULL,
- `l_comment` VARCHAR(44) NOT NULL,
- `l_year` BIGINT NOT NULL,
- `l_month` BIGINT NOT NULL,
- PRIMARY KEY (`l_orderkey`, `l_linenumber`, `l_year`, `l_month`) NOT ENFORCED
- ) PARTITIONED BY (`l_year`, `l_month`) WITH (
- -- 每个 partition 下设置 2 个 bucket
- 'bucket' = '2',
- -- 设置 changelog-producer 为 'input',这会使得上游 CDC Source 不丢弃 update_before,并且下游消费 dwd_lineitem 时没有 changelog-normalize 节点
- 'changelog-producer' = 'input'
- );
-
- -- ADS table schema
- -- 基于 TPC-H Q1,对已发货的订单,根据订单状态和收货状态统计订单数、商品数、总营业额、总利润、平均出厂价、平均折扣价、平均折扣含税价等指标
- CREATE TABLE IF NOT EXISTS `ads_pricing_summary` (
- `l_returnflag` CHAR(1) NOT NULL,
- `l_linestatus` CHAR(1) NOT NULL,
- `sum_quantity` DOUBLE NOT NULL,
- `sum_base_price` DOUBLE NOT NULL,
- `sum_discount_price` DOUBLE NOT NULL,
- `sum_charge_vat_inclusive` DOUBLE NOT NULL,
- `avg_quantity` DOUBLE NOT NULL,
- `avg_base_price` DOUBLE NOT NULL,
- `avg_discount` DOUBLE NOT NULL,
- `count_order` BIGINT NOT NULL,
- PRIMARY KEY (`l_returnflag`, `l_linestatus`) NOT ENFORCED
- ) WITH (
- 'bucket' = '2',
- 'merge-engine'='partial-update',
- 'changelog-producer'='full-compaction'
- );
在全量数据导入到 MySQL lineitem 表后,我们启动全量同步作业,这里以结果表作为作业名,方便标识
任务1:通过 Flink MySQL CDC 同步 ods_lineitem 到 dwd_lineitem
- -- 设置作业名
- SET 'pipeline.name' = 'dwd_lineitem';
- INSERT INTO dwd_lineitem
- SELECT
- `l_orderkey`,
- `l_partkey`,
- `l_suppkey`,
- `l_linenumber`,
- `l_quantity`,
- `l_extendedprice`,
- `l_discount`,
- `l_tax`,
- `l_returnflag`,
- `l_linestatus`,
- `l_shipdate`,
- `l_commitdate`,
- `l_receiptdate`,
- `l_shipinstruct`,
- `l_shipmode`,
- `l_comment`,
- YEAR(`l_shipdate`) AS `l_year`,
- MONTH(`l_shipdate`) AS `l_month`
- FROM `ods_lineitem`;
可以在 Flink Web UI 观察全量同步阶段的 rps、checkpoint 等信息,也可以切换到hdfs的 /tmp/table-store-101/default.db/dwd_lineitem 目录下,查看生成的 snpashot、manifest 和 sst 文件。

在全量同步完成后,我们启动聚合作业,实时写入 ads 表 (注:如有需要在历史全量数据不全的情况下也展示聚合结果,可以不用等待全量同步完成)
任务2:写入结果表 ads_pricing_summary
- -- 设置作业名
- SET 'pipeline.name' = 'ads_pricing_summary';
- INSERT INTO `ads_pricing_summary`
- SELECT
- `l_returnflag`,
- `l_linestatus`,
- SUM(`l_quantity`) AS `sum_quantity`,
- SUM(`l_extendedprice`) AS `sum_base_price`,
- SUM(`l_extendedprice` * (1-`l_discount`)) AS `sum_discount_price`, -- aka revenue
- SUM(`l_extendedprice` * (1-`l_discount`) * (1+`l_tax`)) AS `sum_charge_vat_inclusive`,
- AVG(`l_quantity`) AS `avg_quantity`,
- AVG(`l_extendedprice`) AS `avg_base_price`,
- AVG(`l_discount`) AS `avg_discount`,
- COUNT(*) AS `count_order`
- FROM `dwd_lineitem`
- WHERE (`l_year` < 1998 OR (`l_year` = 1998 AND `l_month`<= 9))
- AND `l_shipdate` <= DATE '1998-12-01' - INTERVAL '90' DAY
- GROUP BY
- `l_returnflag`,
- `l_linestatus`;

我们切换到 batch 模式并且将结果展示切换为 tableau 模式
- SET 'execution.runtime-mode' = 'batch';
-
- SET 'sql-client.execution.result-mode' = 'tableau';
然后查询刚才聚合的结果,可以多运行几次来观测指标的变化 (查询间隔应大于所查上游表的 checkpoint 间隔)
- SET 'pipeline.name' = 'Pricing Summary';
-
- SELECT * FROM ads_pricing_summary;
除了查询聚合指标外,FTS 同时支持查询明细数据。假设我们发现 1998 年 12 月发生退货的子订单指标有问题,想通过订单明细进一步排查,可在 batch 模式下进行如下查询
SELECT `l_orderkey`, `l_returnflag`, `l_linestatus`, `l_shipdate` FROM `dwd_lineitem` WHERE `l_year` = 1998 AND `l_month` = 12 AND `l_linenumber` = 2 AND `l_shipinstruct` = 'TAKE BACK RETURN';
使用spark查询数据
下载spark-3.1.2-bin-hadoop2.7
在jars里面放入paimon-spark-3.1-0.5-SNAPSHOT.jar
spark-sql --conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog --conf spark.sql.catalog.paimon.warehouse=hdfs://tmp/table-store-101
- USE paimon.default;
- select count(*) from dwd_lineitem;
- SELECT `l_orderkey`, `l_returnflag`, `l_linestatus`, `l_shipdate` FROM `dwd_lineitem` WHERE `l_year` = 1998 AND `l_month` = 12 AND `l_linenumber` = 2 AND `l_shipinstruct` = 'TAKE BACK RETURN';


生成更新数据
编写脚本
- start_pair=1
- total_pair=10
- MYSQL_DATABASE=gmall
- for i in `seq ${start_pair} ${total_pair}`; do
-
- if [[ `expr ${i} % 10` -eq 0 ]]; then
- echo "$(date +"%Y-%m-%d %H:%M:%S") Start to apply Old Sales Refresh Function (RF2) for pair ${i}"
- fi
- # This refresh function removes old sales information from the database.
- mysql --defaults-extra-file=mycreds.cnf -D ${MYSQL_DATABASE} -e "TRUNCATE delete_lineitem"
- mysql --defaults-extra-file=mycreds.cnf -D ${MYSQL_DATABASE} --local-infile=1 -e "SET UNIQUE_CHECKS = 0;" -e "
- LOAD DATA LOCAL INFILE 'delete.${i}' INTO TABLE delete_lineitem FIELDS TERMINATED BY '|' LINES TERMINATED BY '|\n';"
- mysql --defaults-extra-file=mycreds.cnf -D ${MYSQL_DATABASE} -e "
- BEGIN;
- DELETE FROM lineitem WHERE l_orderkey IN (SELECT l_orderkey FROM delete_lineitem);
- COMMIT;"
-
- sleep 20s
- if [[ `expr ${i} % 10` -eq 0 ]]; then
- echo "$(date +"%Y-%m-%d %H:%M:%S") Start to apply New Sales Refresh Function (RF1) for pair ${i}"
- fi
- # This refresh function adds new sales information to the database.
- mysql --defaults-extra-file=mycreds.cnf -D ${MYSQL_DATABASE} -e "TRUNCATE update_lineitem"
- mysql --defaults-extra-file=mycreds.cnf -D ${MYSQL_DATABASE} --local-infile=1 -e "SET UNIQUE_CHECKS = 0;" -e "
- LOAD DATA LOCAL INFILE 'lineitem.tbl.u${i}' INTO TABLE update_lineitem FIELDS TERMINATED BY '|' LINES TERMINATED BY '|\n';"
- mysql --defaults-extra-file=mycreds.cnf -D ${MYSQL_DATABASE} -e "
- BEGIN;
- INSERT INTO lineitem
- SELECT * FROM update_lineitem;
- COMMIT;"
-
- done
- start_pair=`expr ${total_pair} + 1`
- total_pair=`expr ${total_pair} \* 2`
-
- sleep 2m
参考