• 基于apache paimon实时数仓全增量一体实时入湖


    用例简介

    Apache Paimon(以下简称 Paimon)作为支持实时更新的高性能湖存储,本用例展示了在千万数据规模下使用全量 + 增量一体化同步 MySQL 订单表到 Paimon明细表、下游计算聚合及持续消费更新的能力。整体流程如下图所示,其中  MySQL 需要提前准备 ,本机需要下载 Flink 包及 Paimon相关依赖,TPC-H 数据生成器

    数据源TPC-H 生成并导入 MySQL,本例选取其中订单明细表 lineitem,在写入 Paimon时以 l_shipdate 字段作为业务时间定义分区 l_yearl_month,时间跨度从 1992.1-1998.12,动态写入 84 个分区,详细配置如下表所示。

    ConfigurationValueDescription
    记录数59,986,052全量阶段同步数据量
    动态写入分区数84`l_year` 为一级分区,`l_month` 为二级分区
    Checkpoint Interval1m数据更新频率
    Parallelism2单机 2 个并发
    Paimon Bucket Number2每个分区下生成 2 个 bucket

    经测试,在单机并发为 2,checkpoint interval 为 1 min 的配置下(每分钟更新可见)66 min 内写入 59.9 million 全量数据,每 10 min 的写入性能如下表所示,平均写入性能在 1 million/min。

    Duration(min)Records In (million)
    1010
    2020
    3031
    4042
    5050
    6057

    从作业运行图中可以观测到每 10 min 作业写入的数据量,在 66 min 时全量同步完成,开始持续监听增量数据。

    关于数据生成

    TPC-H 作为一个经典的 Ad-hoc query 性能测试 benchmark,其包含的数据模型与真实的商业场景十分类似。本用例选取其中订单明细表 lineitem 和针对它的单表查询 Q1(下文会有详细说明)

    lineitem schema 如下表所示,每行记录在 128 bytes 左右

    字段类型描述
    l_orderkeyINT NOT NULL主订单 key,即主订单 id,联合主键第一位
    l_partkeyINT NOT NULL配件 key,即商品 id
    l_suppkeyINT NOT NULL供应商 key,即卖家 id
    l_linenumberINT NOT NULL子订单 key,即子订单 id,联合主键第二位
    l_quantityDECIMAL(15, 2) NOT NULL商品数量
    l_extendedpriceDECIMAL(15, 2) NOT NULL商品价格
    l_discountDECIMAL(15, 2) NOT NULL商品折扣
    l_taxDECIMAL(15, 2) NOT NULL商品税
    l_returnflagCHAR(1) NOT NULL订单签收标志,A 代表 accepted 签收,R 代表 returned 拒收,N 代表 none 未知
    l_linestatusCHAR(1) NOT NULL子订单状态,发货日期晚于 1995-06-17 之前的订单标记为 O,否则标记为 F
    l_shipdateDATE NOT NULL订单发货日期
    l_commitdateDATE NOT NULL订单提交日期
    l_receiptdateDATE NOT NULL收货日期
    l_shipinstructCHAR(25) NOT NULL收货信息,比如 DELIVER IN PERSON 本人签收,TAKE BACK RETURN 退货,COLLECT COD 货到付款
    l_shipmodeCHAR(10) NOT NULL快递模式,有 SHIP 海运,AIR 空运,TRUCK 陆运,MAIL 邮递等类型
    l_commentVARCHAR(44) NOT NULL订单注释

    业务需求(TPC-H Q1)

    对发货日期在一定范围内的订单,根据订单状态和收货状态统计订单数、商品数、总营业额、总利润、平均出厂价、平均折扣价、平均折扣含税价等指标。

    快速开始

    步骤简介

    mysql环境准备:

    1. DROP DATABASE IF EXISTS flink;
    2. CREATE DATABASE IF NOT EXISTS flink;
    3. USE flink;
    4. CREATE USER 'flink' IDENTIFIED WITH mysql_native_password BY 'flink';
    5. GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flink';
    6. FLUSH PRIVILEGES;
    7. -- Create Base Table
    8. CREATE TABLE lineitem (
    9. l_orderkey INTEGER NOT NULL,
    10. l_partkey INTEGER NOT NULL,
    11. l_suppkey INTEGER NOT NULL,
    12. l_linenumber INTEGER NOT NULL,
    13. l_quantity DECIMAL(15,2) NOT NULL,
    14. l_extendedprice DECIMAL(15,2) NOT NULL,
    15. l_discount DECIMAL(15,2) NOT NULL,
    16. l_tax DECIMAL(15,2) NOT NULL,
    17. l_returnflag CHAR(1) NOT NULL,
    18. l_linestatus CHAR(1) NOT NULL,
    19. l_shipdate DATE NOT NULL,
    20. l_commitdate DATE NOT NULL,
    21. l_receiptdate DATE NOT NULL,
    22. l_shipinstruct CHAR(25) NOT NULL,
    23. l_shipmode CHAR(10) NOT NULL,
    24. l_comment VARCHAR(44) NOT NULL) ENGINE=InnoDB DEFAULT CHARSET=utf8;
    25. -- Add PK Constraint
    26. ALTER TABLE lineitem ADD PRIMARY KEY (l_orderkey, l_linenumber);
    27. -- Create Delta Table
    28. CREATE TABLE update_lineitem LIKE lineitem;
    29. CREATE TABLE delete_lineitem (
    30. l_orderkey INTEGER NOT NULL) ENGINE=InnoDB DEFAULT CHARSET=utf8;
    31. ALTER TABLE delete_lineitem ADD PRIMARY KEY (l_orderkey);

    准备TPC-H 生产文件 :添加tpch_dbgen_linux64-2.14.0.zip到服务器,解压执行命令,生产10G数据,其中lineitem大约7.3G

    ./dbgen -q -s 10 -T L

     

     加载数据到mysql

    添加mycreds.cnf文件,内容包含mysql的基本信息

    1. mysql --defaults-extra-file=mycreds.cnf -D flink --local-infile=1 -e "
    2. LOAD DATA LOCAL INFILE 'lineitem.tbl' INTO TABLE lineitem FIELDS TERMINATED BY '|' LINES TERMINATED BY '|\n';"

    本用例会在第一步中将全量订单数据(约 59.9 million)导入 MySQL container,预计耗时 15 min,在此期间您可以准备好 Flink 及 Paimon 等环境,等待数据导入完毕,然后启动 Flink 作业。然后使用脚本开始持续触发 TPC-H 产生 RF1(新增订单)和 RF2(删除已有订单)来模拟增量更新(每组新增和删除之间间隔 20s)。以 10 组更新为例,将会产生 6 million 新增订单和 1.5 million 删除订单(注:TPC-H 产生的删除订单为主订单 ID,由于 lineitem 存在联合主键,故实际删除数据量稍大于 1.5 million)。

    第二步:下载 Flink、Paimon 及其他所需依赖

    Demo 运行使用 Flink 1.17.0 版本( flink-1.17.0 ),需要的其它依赖如下

    • Flink MySQL CDC connector
    • 基于 Flink 1.17 编译的 FTS(Paimon Jar File)
    • Hadoop Bundle Jar

    下载cdcjar并放置于本地 flink-1.17.0/lib 目录下,也可以自行下载及编译

    上述步骤完成后,lib 目录结构如图所示

    1. lib
    2. -rw-r--r-- 1 hadoop hadoop 196487 3月 17 20:07 flink-cep-1.17.0.jar
    3. -rw-r--r-- 1 hadoop hadoop 542616 3月 17 20:10 flink-connector-files-1.17.0.jar
    4. -rw-r--r-- 1 hadoop hadoop 102468 3月 17 20:14 flink-csv-1.17.0.jar
    5. -rw-r--r-- 1 hadoop hadoop 135969953 3月 17 20:22 flink-dist-1.17.0.jar
    6. -rw-r--r-- 1 hadoop hadoop 180243 3月 17 20:13 flink-json-1.17.0.jar
    7. -rw-r--r-- 1 hadoop hadoop 21043313 3月 17 20:20 flink-scala_2.12-1.17.0.jar
    8. -rw-rw-r-- 1 hadoop hadoop 36327707 5月 19 15:12 flink-shaded-hadoop-2-uber-2.6.5-7.0.jar
    9. -rw-rw-r-- 1 hadoop hadoop 22096298 5月 22 10:30 flink-sql-connector-mysql-cdc-2.2.1.jar
    10. -rw-r--r-- 1 hadoop hadoop 15407474 3月 17 20:21 flink-table-api-java-uber-1.17.0.jar
    11. -rw-r--r-- 1 hadoop hadoop 37975208 3月 17 20:15 flink-table-planner-loader-1.17.0.jar
    12. -rw-r--r-- 1 hadoop hadoop 3146205 3月 17 20:07 flink-table-runtime-1.17.0.jar
    13. -rw-r--r-- 1 hadoop hadoop 208006 3月 17 17:31 log4j-1.2-api-2.17.1.jar
    14. -rw-r--r-- 1 hadoop hadoop 301872 3月 17 17:31 log4j-api-2.17.1.jar
    15. -rw-r--r-- 1 hadoop hadoop 1790452 3月 17 17:31 log4j-core-2.17.1.jar
    16. -rw-r--r-- 1 hadoop hadoop 24279 3月 17 17:31 log4j-slf4j-impl-2.17.1.jar
    17. -rw-rw-r-- 1 hadoop hadoop 26745559 5月 19 15:14 paimon-flink-1.17-0.5-20230512.001824-7.jar

    第三步:修改 flink-conf 配置文件并启动集群

    vim flink-1.17.0/conf/flink-conf.yaml 文件,按如下配置修改

    jobmanager.memory.process.size: 4096m
    taskmanager.memory.process.size: 4096m
    taskmanager.numberOfTaskSlots: 8
    parallelism.default: 2
    execution.checkpointing.interval: 1min
    state.backend: rocksdb
    state.backend.incremental: true
    execution.checkpointing.checkpoints-after-tasks-finish.enabled: true

    若想观察 Paimon的异步合并、Snapshot 提交及流读等信息,可以在 flink-1.17.0/conf 目录下修改 log4j.properties 文件,按需增加如下配置

    1. # Log FTS
    2. logger.commit.name = org.apache.flink.table.store.file.operation.FileStoreCommitImpl
    3. logger.commit.level = DEBUG
    4. logger.compaction.name = org.apache.flink.table.store.file.mergetree.compact
    5. logger.compaction.level = DEBUG
    6. logger.enumerator.name = org.apache.flink.table.store.connector.source.ContinuousFileSplitEnumerator
    7. logger.enumerator.level = DEBUG

    这里我们只开启提交的 DEBUG,然后在 flink-1.17.0 目录下执行 ./bin/start-cluster.sh

    第四步:初始化表 schema 并启动 Flink SQL CLI

    ./bin/sql-client.sh
    执行SQL
    
    1. -- 设置使用流模式
    2. SET 'execution.runtime-mode' = 'streaming';
    3. -- 创建并使用 Paimon Catalog
    4. CREATE CATALOG `my_catalog` WITH (
    5. 'type' = 'paimon',
    6. 'warehouse' = 'hdfs://tmp/table-store-101'
    7. );
    8. USE CATALOG `my_catalog`;
    9. -- ODS table schema
    10. -- 注意在 PaimonCatalog 下,创建使用其它连接器的表时,需要将表声明为临时表
    11. CREATE TEMPORARY TABLE `ods_lineitem` (
    12. `l_orderkey` INT NOT NULL,
    13. `l_partkey` INT NOT NULL,
    14. `l_suppkey` INT NOT NULL,
    15. `l_linenumber` INT NOT NULL,
    16. `l_quantity` DECIMAL(15, 2) NOT NULL,
    17. `l_extendedprice` DECIMAL(15, 2) NOT NULL,
    18. `l_discount` DECIMAL(15, 2) NOT NULL,
    19. `l_tax` DECIMAL(15, 2) NOT NULL,
    20. `l_returnflag` CHAR(1) NOT NULL,
    21. `l_linestatus` CHAR(1) NOT NULL,
    22. `l_shipdate` DATE NOT NULL,
    23. `l_commitdate` DATE NOT NULL,
    24. `l_receiptdate` DATE NOT NULL,
    25. `l_shipinstruct` CHAR(25) NOT NULL,
    26. `l_shipmode` CHAR(10) NOT NULL,
    27. `l_comment` VARCHAR(44) NOT NULL,
    28. PRIMARY KEY (`l_orderkey`, `l_linenumber`) NOT ENFORCED
    29. ) WITH (
    30. 'connector' = 'mysql-cdc',
    31. 'hostname' = '127.0.0.1', -- 如果想使用 host,可以修改宿主机 /etc/hosts 加入 127.0.0.1 mysql.docker.internal
    32. 'port' = '3306',
    33. 'username' = 'flink',
    34. 'password' = 'flink',
    35. 'database-name' = 'flink',
    36. 'table-name' = 'lineitem'
    37. );
    38. -- DWD table schema
    39. -- 以 `l_shipdate` 为业务日期,创建以 `l_year` + `l_month` 的二级分区表,注意所有 partition key 都需要声明在 primary key 中
    40. CREATE TABLE IF NOT EXISTS `dwd_lineitem` (
    41. `l_orderkey` INT NOT NULL,
    42. `l_partkey` INT NOT NULL,
    43. `l_suppkey` INT NOT NULL,
    44. `l_linenumber` INT NOT NULL,
    45. `l_quantity` DECIMAL(15, 2) NOT NULL,
    46. `l_extendedprice` DECIMAL(15, 2) NOT NULL,
    47. `l_discount` DECIMAL(15, 2) NOT NULL,
    48. `l_tax` DECIMAL(15, 2) NOT NULL,
    49. `l_returnflag` CHAR(1) NOT NULL,
    50. `l_linestatus` CHAR(1) NOT NULL,
    51. `l_shipdate` DATE NOT NULL,
    52. `l_commitdate` DATE NOT NULL,
    53. `l_receiptdate` DATE NOT NULL,
    54. `l_shipinstruct` CHAR(25) NOT NULL,
    55. `l_shipmode` CHAR(10) NOT NULL,
    56. `l_comment` VARCHAR(44) NOT NULL,
    57. `l_year` BIGINT NOT NULL,
    58. `l_month` BIGINT NOT NULL,
    59. PRIMARY KEY (`l_orderkey`, `l_linenumber`, `l_year`, `l_month`) NOT ENFORCED
    60. ) PARTITIONED BY (`l_year`, `l_month`) WITH (
    61. -- 每个 partition 下设置 2 个 bucket
    62. 'bucket' = '2',
    63. -- 设置 changelog-producer 为 'input',这会使得上游 CDC Source 不丢弃 update_before,并且下游消费 dwd_lineitem 时没有 changelog-normalize 节点
    64. 'changelog-producer' = 'input'
    65. );
    66. -- ADS table schema
    67. -- 基于 TPC-H Q1,对已发货的订单,根据订单状态和收货状态统计订单数、商品数、总营业额、总利润、平均出厂价、平均折扣价、平均折扣含税价等指标
    68. CREATE TABLE IF NOT EXISTS `ads_pricing_summary` (
    69. `l_returnflag` CHAR(1) NOT NULL,
    70. `l_linestatus` CHAR(1) NOT NULL,
    71. `sum_quantity` DOUBLE NOT NULL,
    72. `sum_base_price` DOUBLE NOT NULL,
    73. `sum_discount_price` DOUBLE NOT NULL,
    74. `sum_charge_vat_inclusive` DOUBLE NOT NULL,
    75. `avg_quantity` DOUBLE NOT NULL,
    76. `avg_base_price` DOUBLE NOT NULL,
    77. `avg_discount` DOUBLE NOT NULL,
    78. `count_order` BIGINT NOT NULL,
    79. PRIMARY KEY (`l_returnflag`, `l_linestatus`) NOT ENFORCED
    80. ) WITH (
    81. 'bucket' = '2',
    82. 'merge-engine'='partial-update',
    83. 'changelog-producer'='full-compaction'
    84. );

    第五步:提交同步任务

    在全量数据导入到 MySQL lineitem 表后,我们启动全量同步作业,这里以结果表作为作业名,方便标识

    任务1:通过 Flink MySQL CDC 同步 ods_lineitemdwd_lineitem

    1. -- 设置作业名
    2. SET 'pipeline.name' = 'dwd_lineitem';
    3. INSERT INTO dwd_lineitem
    4. SELECT
    5. `l_orderkey`,
    6. `l_partkey`,
    7. `l_suppkey`,
    8. `l_linenumber`,
    9. `l_quantity`,
    10. `l_extendedprice`,
    11. `l_discount`,
    12. `l_tax`,
    13. `l_returnflag`,
    14. `l_linestatus`,
    15. `l_shipdate`,
    16. `l_commitdate`,
    17. `l_receiptdate`,
    18. `l_shipinstruct`,
    19. `l_shipmode`,
    20. `l_comment`,
    21. YEAR(`l_shipdate`) AS `l_year`,
    22. MONTH(`l_shipdate`) AS `l_month`
    23. FROM `ods_lineitem`;

    可以在 Flink Web UI 观察全量同步阶段的 rps、checkpoint 等信息,也可以切换到hdfs的 /tmp/table-store-101/default.db/dwd_lineitem 目录下,查看生成的 snpashot、manifest 和 sst 文件。

    第六步:计算聚合指标并查询结果

    在全量同步完成后,我们启动聚合作业,实时写入 ads 表 (注:如有需要在历史全量数据不全的情况下也展示聚合结果,可以不用等待全量同步完成)

    任务2:写入结果表 ads_pricing_summary

    1. -- 设置作业名
    2. SET 'pipeline.name' = 'ads_pricing_summary';
    3. INSERT INTO `ads_pricing_summary`
    4. SELECT
    5. `l_returnflag`,
    6. `l_linestatus`,
    7. SUM(`l_quantity`) AS `sum_quantity`,
    8. SUM(`l_extendedprice`) AS `sum_base_price`,
    9. SUM(`l_extendedprice` * (1-`l_discount`)) AS `sum_discount_price`, -- aka revenue
    10. SUM(`l_extendedprice` * (1-`l_discount`) * (1+`l_tax`)) AS `sum_charge_vat_inclusive`,
    11. AVG(`l_quantity`) AS `avg_quantity`,
    12. AVG(`l_extendedprice`) AS `avg_base_price`,
    13. AVG(`l_discount`) AS `avg_discount`,
    14. COUNT(*) AS `count_order`
    15. FROM `dwd_lineitem`
    16. WHERE (`l_year` < 1998 OR (`l_year` = 1998 AND `l_month`<= 9))
    17. AND `l_shipdate` <= DATE '1998-12-01' - INTERVAL '90' DAY
    18. GROUP BY
    19. `l_returnflag`,
    20. `l_linestatus`;

    我们切换到 batch 模式并且将结果展示切换为 tableau 模式

    1. SET 'execution.runtime-mode' = 'batch';
    2. SET 'sql-client.execution.result-mode' = 'tableau';

    然后查询刚才聚合的结果,可以多运行几次来观测指标的变化 (查询间隔应大于所查上游表的 checkpoint 间隔)

    1. SET 'pipeline.name' = 'Pricing Summary';
    2. SELECT * FROM ads_pricing_summary;

    除了查询聚合指标外,FTS 同时支持查询明细数据。假设我们发现 1998 年 12 月发生退货的子订单指标有问题,想通过订单明细进一步排查,可在 batch 模式下进行如下查询

    SELECT `l_orderkey`, `l_returnflag`, `l_linestatus`, `l_shipdate` FROM `dwd_lineitem` WHERE `l_year` = 1998 AND `l_month` = 12 AND `l_linenumber` = 2 AND `l_shipinstruct` = 'TAKE BACK RETURN';

    使用spark查询数据

    下载spark-3.1.2-bin-hadoop2.7

    在jars里面放入paimon-spark-3.1-0.5-SNAPSHOT.jar

    spark-sql --conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog --conf spark.sql.catalog.paimon.warehouse=hdfs://tmp/table-store-101
    1. USE paimon.default;
    2. select count(*) from dwd_lineitem;
    3. SELECT `l_orderkey`, `l_returnflag`, `l_linestatus`, `l_shipdate` FROM `dwd_lineitem` WHERE `l_year` = 1998 AND `l_month` = 12 AND `l_linenumber` = 2 AND `l_shipinstruct` = 'TAKE BACK RETURN';

     

    第七步:观测更新数据

    生成更新数据

    编写脚本

    1. start_pair=1
    2. total_pair=10
    3. MYSQL_DATABASE=gmall
    4. for i in `seq ${start_pair} ${total_pair}`; do
    5. if [[ `expr ${i} % 10` -eq 0 ]]; then
    6. echo "$(date +"%Y-%m-%d %H:%M:%S") Start to apply Old Sales Refresh Function (RF2) for pair ${i}"
    7. fi
    8. # This refresh function removes old sales information from the database.
    9. mysql --defaults-extra-file=mycreds.cnf -D ${MYSQL_DATABASE} -e "TRUNCATE delete_lineitem"
    10. mysql --defaults-extra-file=mycreds.cnf -D ${MYSQL_DATABASE} --local-infile=1 -e "SET UNIQUE_CHECKS = 0;" -e "
    11. LOAD DATA LOCAL INFILE 'delete.${i}' INTO TABLE delete_lineitem FIELDS TERMINATED BY '|' LINES TERMINATED BY '|\n';"
    12. mysql --defaults-extra-file=mycreds.cnf -D ${MYSQL_DATABASE} -e "
    13. BEGIN;
    14. DELETE FROM lineitem WHERE l_orderkey IN (SELECT l_orderkey FROM delete_lineitem);
    15. COMMIT;"
    16. sleep 20s
    17. if [[ `expr ${i} % 10` -eq 0 ]]; then
    18. echo "$(date +"%Y-%m-%d %H:%M:%S") Start to apply New Sales Refresh Function (RF1) for pair ${i}"
    19. fi
    20. # This refresh function adds new sales information to the database.
    21. mysql --defaults-extra-file=mycreds.cnf -D ${MYSQL_DATABASE} -e "TRUNCATE update_lineitem"
    22. mysql --defaults-extra-file=mycreds.cnf -D ${MYSQL_DATABASE} --local-infile=1 -e "SET UNIQUE_CHECKS = 0;" -e "
    23. LOAD DATA LOCAL INFILE 'lineitem.tbl.u${i}' INTO TABLE update_lineitem FIELDS TERMINATED BY '|' LINES TERMINATED BY '|\n';"
    24. mysql --defaults-extra-file=mycreds.cnf -D ${MYSQL_DATABASE} -e "
    25. BEGIN;
    26. INSERT INTO lineitem
    27. SELECT * FROM update_lineitem;
    28. COMMIT;"
    29. done
    30. start_pair=`expr ${total_pair} + 1`
    31. total_pair=`expr ${total_pair} \* 2`
    32. sleep 2m

    参考

    基于 Apache Flink Table Store 的全增量一体实时入湖 - 简书

    Apache Flink Table Store 全增量一体 CDC 实时入湖

  • 相关阅读:
    webservice与springboot联合使用
    I/O设备管理
    多测师肖sir_高级讲师_第2个月第4讲python之判断循环语句
    大语言模型(LLM)综述(二):开发大语言模型的公开可用资源
    Typescript模块的导入导出与继承
    centos服务器命令行安装虚拟机并ssh连接
    2G大小的GPU对深度学习的加速效果如何?
    文件IO(Linux)
    刷题——Python篇(3)字符串
    中软国际:战略携手三大伙伴,三线出击收割AI红利
  • 原文地址:https://blog.csdn.net/xiaosannimei/article/details/130846255