• Apache Paimon Flink Engine Guide


    Paimon supports Flink 1.18, 1.17, 1.16, 1.15 and 1.14. Paimon currently provides two kinds of jars: a bundled jar that supports reading and writing data, and an action jar that supports other operations (e.g. compaction).

    Version	      Type	        Jar
    Flink 1.18	  Bundled Jar	  paimon-flink-1.18-0.7.0-incubating.jar
    Flink 1.17	  Bundled Jar	  paimon-flink-1.17-0.7.0-incubating.jar
    Flink 1.16	  Bundled Jar	  paimon-flink-1.16-0.7.0-incubating.jar
    Flink 1.15	  Bundled Jar	  paimon-flink-1.15-0.7.0-incubating.jar
    Flink 1.14	  Bundled Jar	  paimon-flink-1.14-0.7.0-incubating.jar
    Flink Action	Action Jar	  paimon-flink-action-0.7.0-incubating.jar
    
    1. Environment Setup

    Download Flink, then extract it:

    tar -xzf flink-*.tgz
    

    Copy the Paimon bundled jar into Flink's lib directory:

    cp paimon-flink-*.jar <FLINK_HOME>/lib/
    

    Copy the Hadoop bundled jar into Flink's lib directory:

    cp flink-shaded-hadoop-2-uber-*.jar <FLINK_HOME>/lib/
    

    To run multiple Flink jobs at the same time, adjust the cluster configuration in <FLINK_HOME>/conf/flink-conf.yaml:

    taskmanager.numberOfTaskSlots: 2
    

    Start a local Flink cluster:

    <FLINK_HOME>/bin/start-cluster.sh
    

    Check the Web UI to verify that the cluster is up and running:

    localhost:8081
    

    Start the Flink SQL client to run SQL statements:

    <FLINK_HOME>/bin/sql-client.sh
    
    2. Create a Paimon Catalog and Table

    Create the catalog and a table:

    -- if you're trying out Paimon in a distributed environment,
    -- the warehouse path should be set to a shared file system, such as HDFS or OSS
    CREATE CATALOG my_catalog WITH (
        'type'='paimon',
        'warehouse'='file:/tmp/paimon'
    );
    
    USE CATALOG my_catalog;
    
    -- create a word count table
    CREATE TABLE word_count (
        word STRING PRIMARY KEY NOT ENFORCED,
        cnt BIGINT
    );
    
    3. Create Tables with Flink's Generic Catalog

    To use Flink's generic catalog, a Hive metastore is required; you can then use all tables from Paimon, Hive, and Flink generic tables (Kafka and other connectors).

    In this mode, the 'connector' option should be used when creating tables.

    Paimon uses hive.metastore.warehouse.dir from hive-site.xml; it must be a path with a scheme, e.g. hdfs://.... Otherwise, Paimon uses a local path.

    CREATE CATALOG my_catalog WITH (
        'type'='paimon-generic',
        'hive-conf-dir'='...',
        'hadoop-conf-dir'='...'
    );
    
    USE CATALOG my_catalog;
    
    -- create a word count table
    CREATE TABLE word_count (
        word STRING PRIMARY KEY NOT ENFORCED,
        cnt BIGINT
    ) WITH (
        'connector'='paimon'
    );
    
    4. Create a Source Table
    -- create a word data generator table
    CREATE TEMPORARY TABLE word_table (
        word STRING
    ) WITH (
        'connector' = 'datagen',
        'fields.word.length' = '1'
    );
    
    -- paimon requires checkpoint interval in streaming mode
    SET 'execution.checkpointing.interval' = '10 s';
    
    -- write streaming data to dynamic table
    INSERT INTO word_count SELECT word, COUNT(*) FROM word_table GROUP BY word;
    
    5. Query Examples

    OLAP query

    -- use tableau result mode
    SET 'sql-client.execution.result-mode' = 'tableau';
    
    -- switch to batch mode
    RESET 'execution.checkpointing.interval';
    SET 'execution.runtime-mode' = 'batch';
    
    -- olap query the table
    SELECT * FROM word_count;
    

    Streaming query

    -- switch to streaming mode
    SET 'execution.runtime-mode' = 'streaming';
    
    -- track the changes of table and calculate the count interval statistics
    SELECT `interval`, COUNT(*) AS interval_cnt FROM
        (SELECT cnt / 10000 AS `interval` FROM word_count) GROUP BY `interval`;
    
    6. Exit the Flink SQL Client and Stop the Flink Cluster

    Exit the Flink SQL client:

    -- uncomment the following line if you want to drop the dynamic table and clear the files
    -- DROP TABLE word_count;
    
    -- exit sql-client
    EXIT;
    

    Stop the Flink cluster:

    ./bin/stop-cluster.sh
    
    7. Triggering Savepoints and Recovery

    Because Paimon has its own snapshot management, it may conflict with Flink's checkpoint management and cause exceptions when recovering from a savepoint (this will not corrupt the storage).

    The following approaches are recommended for taking savepoints:

    Use stop-with-savepoint.
    Use tag-with-savepoint, and run rollback-to-tag before recovering from the savepoint (see the sketch below).
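
    A minimal sketch of the tag-based flow, using the create_tag and rollback_to procedures from section 12 (the job id, table name, tag name and snapshot id below are placeholders, not values from this article; STOP JOB in the SQL client requires Flink 1.17+):

    -- stop the streaming job and take a savepoint (placeholder job id)
    STOP JOB '<job_id>' WITH SAVEPOINT;

    -- tag the snapshot that the savepoint corresponds to
    CALL sys.create_tag('default.word_count', 'savepoint_tag', 10);

    -- before restoring from the savepoint, roll the table back to that tag
    CALL sys.rollback_to('default.word_count', 'savepoint_tag');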

    8. Using the Action Jar

    After the local Flink cluster has started, run the action jar with the following command:

    <FLINK_HOME>/bin/flink run \
     /path/to/paimon-flink-action-0.7.0-incubating.jar \
     <action> \
     <args>
    

    Compact a table:

    <FLINK_HOME>/bin/flink run \
     /path/to/paimon-flink-action-0.7.0-incubating.jar \
     compact \
     --path <TABLE_PATH>
    
    9. Supported Flink Data Types

    All Flink data types are supported, except that:

    • MULTISET is not supported.
    • MAP is not supported as a primary key (see the sketch below).
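
    To illustrate the second restriction, a minimal sketch (the table and columns here are made up for this example, not from the article): a MAP column works as a regular field, while the primary key must use a supported type such as STRING.

    -- hypothetical table: MAP is allowed as a value column, but not as (part of) the primary key
    CREATE TABLE user_props (
        user_id STRING PRIMARY KEY NOT ENFORCED,  -- supported primary key type
        props MAP<STRING, STRING>,                -- MAP as a normal column
        cnt BIGINT
    );
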
    10. Using Flink Managed Memory

    Paimon tasks can create memory pools based on executor memory that is managed by the Flink executor, such as the managed memory in the Flink task manager.

    Having the executor manage the writer buffers of multiple tasks improves the stability and performance of sinks.

    The configuration for using Flink managed memory is as follows:

    Option: sink.use-managed-memory-allocator (default: false)
        If true, the Flink sink will use managed memory for the merge tree; otherwise, it creates an independent memory allocator, which means each task allocates and manages its own memory pool (heap memory). If there are too many tasks in one executor, this may cause performance issues and even OOM.

    Option: sink.managed.writer-buffer-memory (default: 256M)
        Weight of the writer buffer in managed memory. Flink computes the memory size for the writer according to this weight; the actual memory used depends on the running environment. The memory size defined by this property equals the memory allocated to the write buffer at runtime.

    Set the memory weight for Flink managed memory in SQL; the Flink sink operator will then obtain the memory pool size and create an allocator for the Paimon writer.

    INSERT INTO paimon_table /*+ OPTIONS('sink.use-managed-memory-allocator'='true', 'sink.managed.writer-buffer-memory'='256M') */
    SELECT * FROM ....;
    
    11. Setting Dynamic Options

    When interacting with a Paimon table, table options can be tuned without changing the options in the catalog.

    Paimon picks up job-level dynamic options that take effect in the current session. The format of a dynamic option is:

    paimon.${catalogName}.${dbName}.${tableName}.${config_key}
    
    catalogName/dbName/tableName can be *
    

    For example:

    -- set scan.timestamp-millis=1697018249000 for the table mycatalog.default.T
    SET 'paimon.mycatalog.default.T.scan.timestamp-millis' = '1697018249000';
    SELECT * FROM T;
    
    -- set scan.timestamp-millis=1697018249000 for the table default.T in any catalog
    SET 'paimon.*.default.T.scan.timestamp-millis' = '1697018249000';
    SELECT * FROM T;
    
    12. Procedures

    Flink 1.18 and later support Call Statements, which make it possible to operate on the data and metadata of Paimon tables by writing SQL.

    Note: when calling a procedure, arguments must be passed in order; if you do not want to pass some arguments, use '' as a placeholder.

    For example, to compact the table default.t with a parallelism of 4, without specifying partitions or a sort strategy, the call statement should be:
    CALL sys.compact('default.t', '', '', '', 'sink.parallelism=4')

    Specifying partitions: a string is used to represent the partition filter; "," means "AND" and ";" means "OR".

    For example, to specify the two partitions date=01 or date=02, write 'date=01;date=02'; to specify one partition with both date=01 and day=01, write 'date=01,day=01'.

    Table options syntax: a string is used to represent table options, in the format 'key1=value1,key2=value2,...' (see the example below).
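
    Putting the partition filter and table options syntax together, a small illustrative example (the table name and partition values are made up): the following call compacts the date=01 and date=02 partitions of default.t with a sink parallelism of 4.

    -- ";" between partition specs means OR; table options are passed as 'key=value' pairs
    CALL sys.compact('default.t', 'date=01;date=02', '', '', 'sink.parallelism=4');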

    compact
        Usage:
            CALL [catalog.]sys.compact('identifier')
            CALL [catalog.]sys.compact('identifier', 'partitions')
            CALL [catalog.]sys.compact('identifier', 'partitions', 'order_strategy', 'order_columns', 'table_options')
        Explanation: To compact a table. Arguments:
            identifier: the target table identifier. Cannot be empty.
            partitions: partition filter.
            order_strategy: 'order', 'zorder' or 'none'. Left empty for 'none'.
            order_columns: the columns that need to be sorted. Left empty if 'order_strategy' is 'none'.
            table_options: additional dynamic options of the table.
        Example: CALL sys.compact('default.T', 'p=0', 'zorder', 'a,b', 'sink.parallelism=4')

    compact_database
        Usage:
            CALL [catalog.]sys.compact_database()
            CALL [catalog.]sys.compact_database('includingDatabases')
            CALL [catalog.]sys.compact_database('includingDatabases', 'mode')
            CALL [catalog.]sys.compact_database('includingDatabases', 'mode', 'includingTables')
            CALL [catalog.]sys.compact_database('includingDatabases', 'mode', 'includingTables', 'excludingTables')
            CALL [catalog.]sys.compact_database('includingDatabases', 'mode', 'includingTables', 'excludingTables', 'tableOptions')
        Explanation: To compact databases. Arguments:
            includingDatabases: to specify databases; regular expressions are supported.
            mode: compact mode. "divided": start a sink for each table; detecting a new table requires restarting the job. "combined" (default): start a single combined sink for all tables; new tables are detected automatically.
            includingTables: to specify tables; regular expressions are supported.
            excludingTables: to specify tables that are not compacted; regular expressions are supported.
            tableOptions: additional dynamic options of the table.
        Example: CALL sys.compact_database('db1|db2', 'combined', 'table_.*', 'ignore', 'sink.parallelism=4')

    create_tag
        Usage: CALL [catalog.]sys.create_tag('identifier', 'tagName', snapshotId)
        Explanation: To create a tag based on the given snapshot. Arguments:
            identifier: the target table identifier. Cannot be empty.
            tagName: name of the new tag.
            snapshotId (Long): id of the snapshot on which the new tag is based.
        Example: CALL sys.create_tag('default.T', 'my_tag', 10)

    delete_tag
        Usage: CALL [catalog.]sys.delete_tag('identifier', 'tagName')
        Explanation: To delete a tag. Arguments:
            identifier: the target table identifier. Cannot be empty.
            tagName: name of the tag to be deleted.
        Example: CALL sys.delete_tag('default.T', 'my_tag')

    merge_into
        Usage:
            -- when matched then upsert
            CALL [catalog.]sys.merge_into('identifier', 'targetAlias', 'sourceSqls', 'sourceTable', 'mergeCondition', 'matchedUpsertCondition', 'matchedUpsertSetting')
            -- when matched then upsert; when not matched then insert
            CALL [catalog.]sys.merge_into('identifier', 'targetAlias', 'sourceSqls', 'sourceTable', 'mergeCondition', 'matchedUpsertCondition', 'matchedUpsertSetting', 'notMatchedInsertCondition', 'notMatchedInsertValues')
            -- when matched then delete
            CALL [catalog.]sys.merge_into('identifier', 'targetAlias', 'sourceSqls', 'sourceTable', 'mergeCondition', 'matchedDeleteCondition')
            -- when matched then upsert + delete; when not matched then insert
            CALL [catalog.]sys.merge_into('identifier', 'targetAlias', 'sourceSqls', 'sourceTable', 'mergeCondition', 'matchedUpsertCondition', 'matchedUpsertSetting', 'notMatchedInsertCondition', 'notMatchedInsertValues', 'matchedDeleteCondition')
        Explanation: To perform the "MERGE INTO" syntax. See the merge_into action for details of the arguments.
        Example:
            -- for matched order rows, increase the price; if there is no match, insert the order from the source table
            CALL sys.merge_into('default.T', '', '', 'default.S', 'T.id=S.order_id', '', 'price=T.price+20', '', '*')

    remove_orphan_files
        Usage:
            CALL [catalog.]sys.remove_orphan_files('identifier')
            CALL [catalog.]sys.remove_orphan_files('identifier', 'olderThan')
        Explanation: To remove orphan data files and metadata files. Arguments:
            identifier: the target table identifier. Cannot be empty.
            olderThan: to avoid deleting newly written files, this procedure only deletes orphan files older than 1 day by default; this argument changes that interval.
        Example: CALL remove_orphan_files('default.T', '2023-10-31 12:00:00')

    reset_consumer
        Usage:
            -- reset the next snapshot id of the consumer
            CALL [catalog.]sys.reset_consumer('identifier', 'consumerId', nextSnapshotId)
            -- delete the consumer
            CALL [catalog.]sys.reset_consumer('identifier', 'consumerId')
        Explanation: To reset or delete a consumer. Arguments:
            identifier: the target table identifier. Cannot be empty.
            consumerId: consumer to be reset or deleted.
            nextSnapshotId (Long): the new next snapshot id of the consumer.
        Example: CALL sys.reset_consumer('default.T', 'myid', 10)

    rollback_to
        Usage:
            -- rollback to a snapshot
            CALL sys.rollback_to('identifier', snapshotId)
            -- rollback to a tag
            CALL sys.rollback_to('identifier', 'tagName')
        Explanation: To roll back the target table to a specific version. Arguments:
            identifier: the target table identifier. Cannot be empty.
            snapshotId (Long): id of the snapshot to roll back to.
            tagName: name of the tag to roll back to.
        Example: CALL sys.rollback_to('default.T', 10)

    expire_snapshots
        Usage: CALL sys.expire_snapshots('identifier', retainMax)
        Explanation: To expire snapshots. Arguments:
            identifier: the target table identifier. Cannot be empty.
            retainMax: the maximum number of completed snapshots to retain.
        Example: CALL sys.expire_snapshots('default.T', 2)
  • Original article: https://blog.csdn.net/m0_50186249/article/details/136370154