• 2 快速上手使用Paimon数据湖


    2.1 基于Flink SQL操作Paimon

    在这里我们基于Flink 1.15(ON YARN)、Paimon 0.5版本开发一个案例。

    注意:想要使用Paimon是非常简单的,不需要复杂的安装部署,只需要使用一个jar包即可对它进行操作。

    我们在使用Paimon的时候其实也可以把它简单理解为Hive,这样便于理解。但是我们要知道,他们两个底层其实是不一样的,一个是数据仓库,一个是数据湖。

    目前Paimon主要提供的是SQL层面的API,所以我们在使用Flink操作Paimon的时候需要用到Flink SQL

    还有一点需要注意:Paimon 目前只支持 Flink 1.17、1.16、1.15 和 1.14,低版本的Flink暂时无法使用。

    https://paimon.apache.org/docs/master/engines/flink/
    在这里插入图片描述

    在本案例中,我们使用Flink 1.15版本,同时我们需要使用Flink 1.15版本对应的Paimon jar包。

    注意:Paimon 目前有0.5和 0.6版本,0.5是稳定版本,0.6属于正在开发中的版本,目前建议大家使用0.5版本。

    0.5稳定版本下载地址如下:

    https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-flink-1.15/0.5.0-incubating/paimon-flink-1.15-0.5.0-incubating.jar
    
    • 1

    使用Flink SQL操作Paimon的时候,可以在Flink SQL代码中操作,也可以在sql-client.sh中操作。

    2.1.1 在Flink sql-client.sh中操作Paimon

    下面我们首先来看一下如何在sql-client.sh中操作Paimon。

    (1)将这个Paimon jar包下载下来之后,上传到flink客户端节点中flink的lib目录里面。

    [root@bigdata04 ~]# cd /data/soft/flink-1.15.0/lib/
    [root@bigdata04 lib]# ll paimon-flink-1.15-0.5.0-incubating.jar 
    -rw-r--r--. 1 root root 26756622 May 22  2023 paimon-flink-1.15-0.5.0-incubating.jar
    
    • 1
    • 2
    • 3

    (2)确认这个Flink客户端节点中是否有Hadoop的相关环境,有没有配置HADOOP_CLASSPATH环境变量。
    在工作中,基本上Flink客户端节点上面也会有Hadoop的相关环境,HADOOP_CLASSPATH我们之前也配置过了。
    所以这一步就不需要额外做什么操作了。
    (3)启动Hadoop集群。
    因为我们要使用Flink ON YARN模式,所以需要启动Hadoop集群。

    [root@bigdata01 ~]# cd /data/soft/hadoop-3.2.0/
    [root@bigdata01 hadoop-3.2.0]# sbin/start-all.sh 
    
    • 1
    • 2

    (4)启动sql-client客户端
    由于使用的是Flink ON YARN模式,所以需要先使用yarn-session.sh脚本在YARN上启动一个Flink集群。

    [root@bigdata04 flink-1.15.0]# bin/yarn-session-1-15.sh -jm 1024m -tm 1024m -d
    
    • 1

    启动sql-client

    [root@bigdata04 flink-1.15.0]# bin/sql-client-1-15.sh 
    2028-12-14 17:19:02,842 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                [] - Found Yarn properties file under /tmp/.yarn-properties-root.
    
    • 1
    • 2

    Flink SQL>

    (5)创建Paimon类型的Catalog

    Flink SQL> CREATE CATALOG paimon_catalog WITH (
        'type'='paimon',
        'warehouse'='hdfs://bigdata01:9000/paimon'
    );
    Flink SQL> USE CATALOG paimon_catalog;
    
    • 1
    • 2
    • 3
    • 4
    • 5

    注意:Paimon中的Catalog也可以支持多种管理元数据的方式,目前我们使用的是默认的filesystem这种Metastore,也就是说Paimon的元数据目前会存储到我们在warehouse中指定的hdfs路径中。

    除了这种Metastore之外,Paimon中的Catalog还可以支持Hive Metastore,也就是Paimon共用Hive的Metastore,这块内容后面我们再详细讲解。

    此时到HDFS中查看一下,可以看到在/paimon目录下会自动创建default.dbdefault.db相当于是一个默认的数据库了。

    [root@bigdata04 ~]# hdfs dfs -ls /paimon
    Found 1 items
    drwxr-xr-x   - root supergroup          0 2028-11-07 10:35 /paimon/default.db
    
    • 1
    • 2
    • 3

    (6)创建表。
    首先创建一个数据源表,这个表负责模拟产生实时数据。

    Flink SQL> CREATE TABLE word_source (
        word STRING
    ) WITH (
        'connector' = 'datagen',
    'fields.word.length' = '1',
    'rows-per-second' = '1'
    );
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    此时会看到如下错误信息:

    [ERROR] Could not execute SQL statement. Reason:
    org.apache.flink.table.catalog.exceptions.CatalogException: Paimon Catalog only supports paimon tables , and you don't need to specify  'connector'= 'paimon' when using Paimon Catalog
     You can create TEMPORARY table instead if you want to create the table of other connector.
    
    • 1
    • 2
    • 3

    解释:此时创建的word_source表不是Paimon类型的表,但是却放在了Paimon类型的Catalog里面,所以就报错了。
    当我们在Paimon类型的Catalog里面创建表的时候,表默认会使用'connector'= 'paimon',可以省略不写。

    针对这个问题,有两种解决方案:

    • 1:不在Paimon类型的Catalog里面创建这个表
    • 2:在建表语句中增加TEMPORARY关键字来创建一个临时表,这样在建表的时候可以指定其他类型的connector。
    Flink SQL> CREATE TEMPORARY TABLE word_source (
        word STRING
    ) WITH (
        'connector' = 'datagen',
    'fields.word.length' = '1',
    'rows-per-second' = '1'
    );
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    然后创建一个结果表,这个表负责存储结果数据。

    Flink SQL> CREATE TABLE wc_sink (
        word STRING PRIMARY KEY NOT ENFORCED,
        cnt BIGINT
    );
    
    • 1
    • 2
    • 3
    • 4

    注意:此时创建这个表的时候不需要在WITH里面指定'connector',因为我们在Paimon Catalog里面创建的表默认都是paimon类型的表。

    此时可以到HDFS中查看到这个表对应的hdfs目录。

    [root@bigdata04 ~]# hdfs dfs -ls /paimon/default.db
    Found 1 items
    drwxr-xr-x   - root supergroup          0 2028-11-07 10:47 /paimon/default.db/wc_sink
    
    • 1
    • 2
    • 3

    wc_sink目录下面会有一个schema目录,里面维护的是表的schema信息。

    [root@bigdata04 ~]# hdfs dfs -ls /paimon/default.db/wc_sink/schema
    Found 1 items
    -rw-r--r--   2 root supergroup        265 2028-11-07 10:47 /paimon/default.db/wc_sink/schema/schema-0
    
    • 1
    • 2
    • 3

    咱们前面说了,目前这个paimon类型的catalog使用的metastore是默认的filesystem,所以表的元数据信息会存储在我们指定的hdfs路径里面。

    问题:为什么刚才创建的word_source表的元数据信息没有存储在这里呢?

    答案:因为word_source表是TEMPORARY(临时)类型的表。

    解释:这里面的schema-0表示是这个表的第1个schema,因为表的schema的信息可能会发生变化,所以后期可能会有schema-1schema-2等等。

    查看schema-0中的详细内容:

    [root@bigdata04 ~]# hdfs dfs -cat /paimon/default.db/wc_sink/schema/schema-0
    {
      "id" : 0,
      "fields" : [ {
        "id" : 0,
        "name" : "word",
        "type" : "STRING NOT NULL"
      }, {
        "id" : 1,
        "name" : "cnt",
        "type" : "BIGINT"
      } ],
      "highestFieldId" : 1,
      "partitionKeys" : [ ],
      "primaryKeys" : [ "word" ],
      "options" : { }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    解释:

    • id:对应的就是schema文件的编号。
    • fields:对应的是表中的字段列表,以json数组形式存储,里面包含了id、name、type,分别表示字段的位置编号,字段名称,字段类型。
    • highestFieldId:最大的字段位置编号。
    • partitionKeys:表中的分区字段。
    • primaryKeys:表中的主键字段。
    • options:表的扩展配置。

    (7)执行计算逻辑,向结果表中写入数据。

    Flink SQL> SET 'execution.checkpointing.interval' = '10 s';
    Flink SQL> INSERT INTO wc_sink SELECT word, COUNT(*) FROM word_source GROUP BY word;
    
    • 1
    • 2

    注意:在流处理模式中,操作Paimon表时需要开启Checkpoint

    此时可以到HDFS中查看一下:

    [root@bigdata04 ~]# hdfs dfs -ls /paimon/default.db/wc_sink
    Found 4 items
    drwxr-xr-x   - root supergroup          0 2028-11-07 11:35 /paimon/default.db/wc_sink/bucket-0
    drwxr-xr-x   - root supergroup          0 2028-11-07 11:35 /paimon/default.db/wc_sink/manifest
    drwxr-xr-x   - root supergroup          0 2028-11-07 10:47 /paimon/default.db/wc_sink/schema
    drwxr-xr-x   - root supergroup          0 2028-11-07 11:35 /paimon/default.db/wc_sink/snapshot
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    在这里可以看到snapshot、manifest、bucket-0等信息,这些其实就是Paimon中最核心的东西了。这些文件后续会有一个独立章节详细分析,在这大家先有一个大致的概念即可。

    (8)OLAP查询。
    OLAP查询其实就是离线查询了。

    Flink SQL> -- 设置结果数据显示格式
    Flink SQL> SET 'sql-client.execution.result-mode' = 'tableau';
    
    Flink SQL> -- 切换到批处理模式
    Flink SQL> RESET 'execution.checkpointing.interval';
    Flink SQL> SET 'execution.runtime-mode' = 'batch';
    
    Flink SQL> -- 执行OLAP查询
    Flink SQL> SELECT * FROM wc_sink;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    结果如下:

    +------+--------+
    | word |    cnt |
    +------+--------+
    |    0 | 594057 |
    |    1 | 594887 |
    |    2 | 594064 |
    |    3 | 594812 |
    |    4 | 595013 |
    |    5 | 594375 |
    |    6 | 594052 |
    |    7 | 593309 |
    |    8 | 594334 |
    |    9 | 594878 |
    |    a | 596356 |
    |    b | 593656 |
    |    c | 592675 |
    |    d | 595513 |
    |    e | 594268 |
    |    f | 593751 |
    +------+--------+
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    注意:在这里多次执行这个SQL语句,可以发现结果是不一样的,因为表中的结果数据是一直在变化的,每次执行查询的时候都会读取最新快照中的数据。

    (9)流式查询。

    Flink SQL> -- 切换到流处理模式
    Flink SQL> SET 'execution.runtime-mode' = 'streaming';
    
    Flink SQL> -- 执行流式查询
    Flink SQL> SELECT * FROM wc_sink;
    
    • 1
    • 2
    • 3
    • 4
    • 5

    结果如下:

    +----+--------------------------------+----------------------+
    | op |                           word |                  cnt |
    +----+--------------------------------+----------------------+
    | +I |                              0 |               637764 |
    | +I |                              1 |               638705 |
    | +I |                              2 |               637742 |
    | +I |                              3 |               638773 |
    | +I |                              4 |               638829 |
    | +I |                              5 |               638090 |
    | +I |                              6 |               637866 |
    | +I |                              7 |               637241 |
    | +I |                              8 |               638128 |
    | +I |                              9 |               638221 |
    | +I |                              a |               640101 |
    | +I |                              b |               637347 |
    | +I |                              c |               636275 |
    | +I |                              d |               639562 |
    | +I |                              e |               637851 |
    | +I |                              f |               637505 |
    | -U |                              0 |               637764 |
    | +U |                              0 |               643996 |
    | -U |                              1 |               638705 |
    | +U |                              1 |               644960 |
    | -U |                              2 |               637742 |
    | +U |                              2 |               644017 |
    | -U |                              3 |               638773 |
    | +U |                              3 |               645018 |
    | -U |                              4 |               638829 |
    | +U |                              4 |               645143 |
    | -U |                              5 |               638090 |
    | +U |                              5 |               644230 |
    | -U |                              6 |               637866 |
    | +U |                              6 |               644086 |
    | -U |                              7 |               637241 |
    | +U |                              7 |               643529 |
    | -U |                              8 |               638128 |
    | +U |                              8 |               644379 |
    | -U |                              9 |               638221 |
    | +U |                              9 |               644502 |
    | -U |                              a |               640101 |
    | +U |                              a |               646362 |
    | -U |                              b |               637347 |
    | +U |                              b |               643531 |
    | -U |                              c |               636275 |
    | +U |                              c |               642611 |
    | -U |                              d |               639562 |
    | +U |                              d |               645845 |
    | -U |                              e |               637851 |
    | +U |                              e |               644111 |
    | -U |                              f |               637505 |
    | +U |                              f |               643680 |
    | -U |                              0 |               643996 |
    | +U |                              0 |               650285 |
    | -U |                              1 |               644960 |
    | +U |                              1 |               651196 |
    | -U |                              2 |               644017 |
    | +U |                              2 |               650205 |
    | -U |                              3 |               645018 |
    | +U |                              3 |               651225 |
    | -U |                              4 |               645143 |
    | +U |                              4 |               651405 |
    | -U |                              5 |               644230 |
    | +U |                              5 |               650612 |
    | -U |                              6 |               644086 |
    | +U |                              6 |               650322 |
    | -U |                              7 |               643529 |
    | +U |                              7 |               649680 |
    | -U |                              8 |               644379 |
    | +U |                              8 |               650659 |
    | -U |                              9 |               644502 |
    | +U |                              9 |               650721 |
    | -U |                              a |               646362 |
    | +U |                              a |               652679 |
    | -U |                              b |               643531 |
    | +U |                              b |               649683 |
    | -U |                              c |               642611 |
    | +U |                              c |               648862 |
    | -U |                              d |               645845 |
    | +U |                              d |               652217 |
    | -U |                              e |               644111 |
    | +U |                              e |               650447 |
    | -U |                              f |               643680 |
    | +U |                              f |               649802 |
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83

    此时可以看到结果数据是一直在发生变化的,因为数据源是一直源源不断在产生数据的。

    在Flink SQL控制台,按ctrl+c停止此流式查询任务。

    (10)停止任务,退出sql-client
    到Flink任务界面中停止Flink核心计算逻辑对应的任务
    在这里插入图片描述

    然后退出sql-client。

    Flink SQL>exit;
    
    • 1

    最后停止YARN中的Flink集群。

    [root@bigdata04 flink-1.15.0]# yarn application -kill application_1857176567822_0001
    
    • 1

    2.1.2 在Flink SQL代码中操作Paimon

    创建一个maven项目:db_paimon

    在项目中创建一个scala目录。

    pom.xml中引入相关依赖:

    
        org.apache.flink
        flink-streaming-scala_2.12
        1.15.0
    
    
        org.apache.flink
        flink-clients
        1.15.0
    
    
    
        org.slf4j
        slf4j-api
        1.7.10
    
    
        org.slf4j
        slf4j-log4j12
        1.7.10
    
    
        org.apache.flink
        flink-table-api-scala-bridge_2.12
        1.15.0
    
    
        org.apache.flink
        flink-table-planner_2.12
        1.15.0
    
    
        org.apache.paimon
        paimon-flink-1.15
        0.5
        system
        ${project.basedir}/lib/paimon-flink-1.15-0.5.0-incubating.jar
    
    
    
        org.apache.hadoop
        hadoop-client
        3.2.0
    
    
        org.apache.flink
        flink-connector-files
        1.15.0
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49

    注意:由于目前在maven仓库里面还无法查到paimon的依赖,所以通过本地jar包的形式引入paimon的依赖。

    resource目录中添加log4j.properties日志配置文件:

    log4j.rootLogger=warn,stdout
    
    log4j.appender.stdout = org.apache.log4j.ConsoleAppender
    log4j.appender.stdout.Target = System.out
    log4j.appender.stdout.layout=org.apache.log4j.PatternLayout 
    log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c] [%p] - %m%n
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    首先来看一下如何使用Flink SQL代码向Paimon表中写入数据。

    创建package:tech.xuwei.paimon.sql
    创建object:FlinkSQLWriteToPaimon

    代码如下:

    package tech.xuwei.paimon.sql
    
    import org.apache.flink.api.common.RuntimeExecutionMode
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
    
    /**
     * 使用FlinkSQL向Paimon表中写入数据
     * Created by xuwei
     */
    object FlinkSQLWriteToPaimon {
      def main(args: Array[String]): Unit = {
        //创建执行环境
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
        val tEnv = StreamTableEnvironment.create(env)
    
        //注意:在流处理模式中,操作Paimon表时需要开启Checkpoint。
        env.enableCheckpointing(5000)
    
        //创建数据源表-普通表
        //注意:此时这个表是在Flink SQL中默认的Catalog里面创建的
        tEnv.executeSql(
          """
            |CREATE TABLE word_source (
            |    word STRING
            |) WITH (
            |    'connector' = 'datagen',
            |    'fields.word.length' = '1',
            |    'rows-per-second' = '1'
            |)
            |""".stripMargin)
    
        //创建Paimon类型的Catalog
        tEnv.executeSql(
          """
            |CREATE CATALOG paimon_catalog WITH (
            |    'type'='paimon',
            |    'warehouse'='hdfs://bigdata01:9000/paimon'
            |)
            |""".stripMargin)
        tEnv.executeSql("USE CATALOG paimon_catalog")
    
        //创建目的地表-Paimon表
        tEnv.executeSql(
          """
            |CREATE TABLE IF NOT EXISTS wc_sink_sql (
            |    word STRING,
            |    cnt BIGINT,
            |    PRIMARY KEY (word) NOT ENFORCED
            |)
            |""".stripMargin)
    
        //向目的地表中写入数据
        tEnv.executeSql(
          """
            |INSERT INTO `paimon_catalog`.`default`.`wc_sink_sql`
            |SELECT
            |    word,
            |    COUNT(*) as cnt
            |FROM `default_catalog`.`default_database`.`word_source`
            |GROUP BY word
            |""".stripMargin).print()
    
      }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67

    注意:在这里我们创建表word_source的时候没有创建临时表,因为我们不是在Paimon Catalog里面创建的。

    运行代码,此时可以在hdfs中看到表中的相关文件内容

    [root@bigdata04 ~]# hdfs dfs -ls /paimon/default.db/wc_sink_sql
    Found 4 items
    drwxr-xr-x   - yehua supergroup          0 2028-11-28 17:08 /paimon/default.db/wc_sink_sql/bucket-0
    drwxr-xr-x   - yehua supergroup          0 2028-11-28 17:08 /paimon/default.db/wc_sink_sql/manifest
    drwxr-xr-x   - yehua supergroup          0 2028-11-28 17:04 /paimon/default.db/wc_sink_sql/schema
    drwxr-xr-x   - yehua supergroup          0 2028-11-28 17:08 /paimon/default.db/wc_sink_sql/snapshot
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    停止代码。

    接下来我们来使用Flink SQL代码从Paimon表中读取数据。
    创建object:FlinkSQLReadFromPaimon

    代码如下:

    package tech.xuwei.paimon.sql
    
    import org.apache.flink.api.common.RuntimeExecutionMode
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
    
    /**
     * 使用FlinkSQL从Paimon表中读取数据
     * Created by xuwei
     */
    object FlinkSQLReadFromPaimon {
      def main(args: Array[String]): Unit = {
        //创建执行环境
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
        val tEnv = StreamTableEnvironment.create(env)
    
        //创建Paimon类型的Catalog
        tEnv.executeSql(
          """
            |CREATE CATALOG paimon_catalog WITH (
            |    'type'='paimon',
            |    'warehouse'='hdfs://bigdata01:9000/paimon'
            |)
            |""".stripMargin)
        tEnv.executeSql("USE CATALOG paimon_catalog")
    
        //读取Paimon表中的数据,并且打印输出结果
        tEnv.executeSql(
          """
            |SELECT * FROM  `paimon_catalog`.`default`.`wc_sink_sql`
            |""".stripMargin)
          .print()
    
      }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    运行代码,此时可以看到类似这样的数据:

    +----+--------------------------------+----------------------+
    | op |                           word |                  cnt |
    +----+--------------------------------+----------------------+
    | +I |                              1 |                   26 |
    | +I |                              7 |                   24 |
    | +I |                              e |                   14 |
    | +I |                              2 |                   20 |
    | +I |                              4 |                   23 |
    | +I |                              6 |                   23 |
    | +I |                              c |                   26 |
    | +I |                              0 |                   21 |
    | +I |                              3 |                   28 |
    | +I |                              8 |                   23 |
    | +I |                              5 |                   20 |
    | +I |                              d |                   20 |
    | +I |                              a |                   14 |
    | +I |                              b |                   19 |
    | +I |                              f |                   25 |
    | +I |                              9 |                   18 |
    .....
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    此时再启动FlinkSQLWriteToPaimon代码向Paimon表中写入数据,可以在控制台看到如下数据:

    | -U |                              2 |                   20 |
    | +U |                              2 |                    4 |
    | -U |                              4 |                   23 |
    | +U |                              4 |                    2 |
    | -U |                              6 |                   23 |
    | +U |                              6 |                    1 |
    | -U |                              c |                   26 |
    | +U |                              c |                    3 |
    | -U |                              1 |                   26 |
    | +U |                              1 |                    4 |
    | -U |                              7 |                   24 |
    | +U |                              7 |                    2 |
    | -U |                              e |                   14 |
    | +U |                              e |                    2 |
    | -U |                              a |                   14 |
    | +U |                              a |                    2 |
    | -U |                              b |                   19 |
    | +U |                              b |                    3 |
    | -U |                              5 |                   20 |
    | +U |                              5 |                    3 |
    | -U |                              d |                   20 |
    | +U |                              d |                    1 |
    | -U |                              0 |                   21 |
    | +U |                              0 |                    2 |
    | -U |                              8 |                   23 |
    | +U |                              8 |                    2 |
    | -U |                              9 |                   18 |
    | +U |                              9 |                    1 |
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    这样就可以看到表中数据的实时变更情况。

    基于Flink DataStream API 操作Paimon

    Pamion虽然没有提供DataStream API,但是可以借助于Flink中DataStreamTable的转换来操作Pamion。

    下面我们来具体演示一下:

    首先来看一下如何使用Flink DataStreamAPI向Paimon表中写入数据。

    创建package:tech.xuwei.paimon.datastream
    创建object:FlinkDataStreamWriteToPaimon

    代码如下:

    package tech.xuwei.paimon.datastream
    
    import org.apache.flink.api.common.typeinfo.Types
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
    import org.apache.flink.table.api.{DataTypes, Schema}
    import org.apache.flink.table.connector.ChangelogMode
    import org.apache.flink.types.{Row, RowKind}
    
    /**
     * 使用Flink DataStream API向Paimon表中写入数据
     * Created by xuwei
     */
    object FlinkDataStreamWriteToPaimon {
      def main(args: Array[String]): Unit = {
        //获取执行环境
        val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
        val tEnv = StreamTableEnvironment.create(env)
    
        //手工构造一个Changelog DataStream 数据流
        val dataStream = env.fromElements(
          Row.ofKind(RowKind.INSERT, "jack", Int.box(10)),//+I
          Row.ofKind(RowKind.INSERT, "tom", Int.box(20)),//+I
          Row.ofKind(RowKind.UPDATE_BEFORE, "jack", Int.box(10)),//-U
          Row.ofKind(RowKind.UPDATE_AFTER, "jack", Int.box(11))//+U
        )(Types.ROW_NAMED(Array("name", "age"),Types.STRING,Types.INT))
    
    
        //将DataStream转换为Table
        val schema = Schema.newBuilder()
          .column("name", DataTypes.STRING().notNull())//主键非空
          .column("age", DataTypes.INT())
          .primaryKey("name")//指定主键
          .build()
        val table = tEnv.fromChangelogStream(dataStream,schema,ChangelogMode.all())
    
        //创建Paimon类型的Catalog
        tEnv.executeSql(
          """
            |CREATE CATALOG paimon_catalog WITH (
            |    'type'='paimon',
            |    'warehouse'='hdfs://bigdata01:9000/paimon'
            |)
            |""".stripMargin)
        tEnv.executeSql("USE CATALOG paimon_catalog")
    
        //注册临时表
        tEnv.createTemporaryView("t1",table)
    
        //创建Paimon类型的表
        tEnv.executeSql(
          """
            |-- 注意:这里的表名使用反引号进行转义,否则会导致SQL DDL语句解析失败。
            |CREATE TABLE IF NOT EXISTS `user` (
            |    name STRING,
            |    age INT,
            |    PRIMARY KEY (name) NOT ENFORCED
            |) WITH (
            |    'changelog-producer' = 'input'
            |)
            |""".stripMargin)
    
        //向Paimon表中写入数据
        tEnv.executeSql(
          """
            |INSERT INTO `user`
            |SELECT name,age FROM t1
            |""".stripMargin)
      }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72

    运行代码,可以将changlog DataStream数据写入到Paimon表中。

    接下来开发一个从Paimon表中读取数据的代码。
    创建object:FlinkDataStreamReadFromPaimon

    代码如下:

    package tech.xuwei.paimon.datastream
    
    import org.apache.flink.api.common.RuntimeExecutionMode
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
    
    /**
     * 使用Flink DataStream API从Paimon表中读取数据
     * Created by xuwei
     */
    object FlinkDataStreamReadFromPaimon {
      def main(args: Array[String]): Unit = {
        //获取执行环境
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
        val tEnv = StreamTableEnvironment.create(env)
    
        //创建Paimon类型的Catalog
        tEnv.executeSql(
          """
            |CREATE CATALOG paimon_catalog WITH (
            |    'type'='paimon',
            |    'warehouse'='hdfs://bigdata01:9000/paimon'
            |)
            |""".stripMargin)
        tEnv.executeSql("USE CATALOG paimon_catalog")
    
        //将计算结果Table转换为DataStream
        val execSql =
          """
            |SELECT * FROM `user` -- 此时默认只能查到数据的最新值
            | /*+ OPTIONS('scan.mode'='from-snapshot','scan.snapshot-id' = '1') */ -- 通过动态表选项来指定数据读取(扫描)模式,以及从哪里开始读取
            |""".stripMargin
        val table = tEnv.sqlQuery(execSql)
    
        //将结果数据转为Changlog DataStream数据流
        val dataStream = tEnv.toChangelogStream(table)
    
        //将DataStream中的数据输出打印到控制台
        dataStream.print().setParallelism(1)
    
        //执行任务
        env.execute("FlinkDataStreamReadFromPaimon")
      }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46

    运行代码。
    注意:如果没有手工指定数据读取模式,那么最终的结果数据是类似这样的,看不到数据的历史变化,只能看到最新的数据:

    +U[jack, 11]
    +I[tom, 20]
    
    • 1
    • 2

    此时如果想要查看到数据历史的变化情况,需要通过动态表选项来指定数据读取(扫描)模式为from-snapshot

    val execSql =
          """
            |SELECT * FROM `user` -- 此时默认只能查到数据的最新值
            |/*+ OPTIONS('scan.mode'='from-snapshot','scan.snapshot-id' = '1') */ -- 通过动态表选项来指定数据读取(扫描)模式,以及从哪里开始读取
            |""".stripMargin
    
    • 1
    • 2
    • 3
    • 4
    • 5

    运行代码,结果如下:

    +I[jack, 10]
    -U[jack, 10]
    +U[jack, 11]
    +I[tom, 20]
    
    • 1
    • 2
    • 3
    • 4

    这个结果其实和最开始我们构造的changelog datastream数据流是一致的。


    更多Paimon数据湖内容请关注https://edu.51cto.com/course/35051.html

  • 相关阅读:
    【单片机毕业设计】【mcuclub-hj-003】基于单片机的温湿度控制的设计
    【SpringCloud原理】Ribbon核心组件以及运行原理万字源码剖析
    React报错之Functions are not valid as a React child
    零基础学习CANoe Panel(1)—— 新建 Panel
    零售业变革下,数智化供应链系统精细化库存管理,构建企业数字化供应链体系
    连接数据库
    深度解析:Web 3.0和元宇宙
    COLING 2022 | CSL-大规模中文科学文献数据集
    FileZilla Server1.5使用入门
    常见首屏优化
  • 原文地址:https://blog.csdn.net/xu470438000/article/details/134282851