• 6 Hive引擎集成Apache Paimon


    更多Paimon数据湖内容请关注https://edu.51cto.com/course/35051.html

    在实际工作中,我们通查会使用Flink计算引擎去读写Paimon,但是在批处理场景中,更多的是使用Hive去读写Paimon,这样操作起来更加方便。

    前面我们在Flink代码里面,借助于Hive Catalog,实现了在Flink中创建Paimon表,写入数据,并且把paimon的元数据信息保存在Hive Metastore里面,这样创建的表是可以被Hive识别并且操作的。

    但是最直接的肯定是在Hive中直接创建Paimon类型的表,并且读写数据。

    Paimon目前可以支持Hive 3.1, 2.3, 2.2, 2.1 and 2.1-cdh-6.3这些版本的操作。

    但是需要注意,如果Hive的执行引擎使用的是Tez,那么只能读取Paimon,无法向Paimon中写入数据。如果Hive的执行引擎使用的是MR,那么读写都是支持的。

    在Hive中配置Paimon依赖

    想要在Hive中操作Paimon,首先需要在Hive中配置Paimon的依赖,此时我们需要用到一个jar包:paimon-hive-connector
    我们目前使用的Hive是3.1.2版本的,所以需要下载对应版本的paimon-hive-connector jar包。

    https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-hive-connector-3.1/0.5.0-incubating/paimon-hive-connector-3.1-0.5.0-incubating.jar
    
    • 1

    将这个jar包上传到bigdata04机器(hive客户端机器)的hive安装目录中:

    [root@bigdata04 ~]# cd /data/soft/apache-hive-3.1.2-bin/
    [root@bigdata04 apache-hive-3.1.2-bin]# mkdir auxlib
    [root@bigdata04 apache-hive-3.1.2-bin]# cd auxlib/
    [root@bigdata04 auxlib]# ll
    total 34128
    -rw-r--r--. 1 root root 34945743 Sep 13  2023 paimon-hive-connector-3.1-0.5.0-incubating.jar
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    注意:需要在hive安装目录中创建auxlib目录,然后把jar包上传到这个目录中,这样会被自动加载。

    如果我们在操作Hive的时候使用的是beeline客户端,那么在Hive中配置好Paimon的环境之后,需要重启HiveServer2服务。

    在Hive中读写Paimon表

    咱们之前在Flink引擎代码中使用Hive Catalog的时候创建了一个表:p_h_t1,这个表的元数据在Hive Metastore也有存储,之前我们其实也能查看到,只是在hive中查询这个表中数据的时候报错了,其实就是因为缺少paimon-hive-connector这个jar包,现在我们再查询就可以了。

    在这里我们使用Hive的beeline客户端。
    注意:需要先启动HiveServer2服务。

    [root@bigdata04 ~]# cd /data/soft/apache-hive-3.1.2-bin/
    [root@bigdata04 apache-hive-3.1.2-bin]# bin/hiveserver2
    
    • 1
    • 2

    查看hive中目前都有哪些表:

    [root@bigdata04 apache-hive-3.1.2-bin]# bin/beeline -u  jdbc:hive2://localhost:10000 -n root
    Connecting to jdbc:hive2://localhost:10000
    Connected to: Apache Hive (version 3.1.2)
    Driver: Hive JDBC (version 3.1.2)
    Transaction isolation: TRANSACTION_REPEATABLE_READ
    Beeline version 3.1.2 by Apache Hive
    0: jdbc:hive2://localhost:10000> SHOW TABLES;
    +--------------------+
    |      tab_name      |
    +--------------------+
    | flink_stu          |
    | orders             |
    | p_h_t1             |
    | p_h_par             |
    | s1                 |
    | student_favors     |
    | student_favors_2   |
    | student_score      |
    | student_score_bak  |
    | t1                 |
    +--------------------+
    9 rows selected (1.663 seconds)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    此时可以看到之前通过Hive Catalog写入的表:p_h_t1

    查询这个表中的数据:

    0: jdbc:hive2://localhost:10000> SELECT * FROM p_h_t1;
    +--------------+-------------+
    | p_h_t1.name  | p_h_t1.age  |
    +--------------+-------------+
    | jack         | 18          |
    | tom          | 20          |
    +--------------+-------------+
    2 rows selected (5.853 seconds)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    此时就可以正常查询了。

    接着我们尝试在Hive中向这个Paimon表中插入一条数据:

    0: jdbc:hive2://localhost:10000> INSERT INTO p_h_t1(name,age) VALUES('jessic',19);
    
    • 1

    重新查询这个表中的最新数据:

    0: jdbc:hive2://localhost:10000> SELECT * FROM p_h_t1;
    +--------------+-------------+
    | p_h_t1.name  | p_h_t1.age  |
    +--------------+-------------+
    | jack         | 18          |
    | jessic       | 19          |
    | tom          | 20          |
    +--------------+-------------+
    3 rows selected (0.737 seconds)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    在通过Hive进行查询的时候,默认查询的是表中最新快照的数据,我们也可以通过时间旅行这个特性来控制查询之前的数据。

    举个例子,查询指定快照版本中的数据:

    0: jdbc:hive2://localhost:10000> SET paimon.scan.snapshot-id=1;
    No rows affected (0.011 seconds)
    0: jdbc:hive2://localhost:10000> SELECT * FROM p_h_t1;
    +--------------+-------------+
    | p_h_t1.name  | p_h_t1.age  |
    +--------------+-------------+
    | jack         | 18          |
    | tom          | 20          |
    +--------------+-------------+
    2 rows selected (0.752 seconds)
    0: jdbc:hive2://localhost:10000> SET paimon.scan.snapshot-id=2;
    No rows affected (0.009 seconds)
    0: jdbc:hive2://localhost:10000> SELECT * FROM p_h_t1;
    +--------------+-------------+
    | p_h_t1.name  | p_h_t1.age  |
    +--------------+-------------+
    | jack         | 18          |
    | jessic       | 19          |
    | tom          | 20          |
    +--------------+-------------+
    3 rows selected (0.692 seconds)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    这样就可以实现查询历史数据的查询了。

    在Hive中创建Paimon表

    前面我们操作的p_h_t1这个表其实是借助于Flink引擎创建的。
    下面我们来看一下在Hive中如何创建Piamon表:

    0: jdbc:hive2://localhost:10000> SET hive.metastore.warehouse.dir=hdfs://bigdata01:9000/paimon;
    0: jdbc:hive2://localhost:10000> CREATE TABLE IF NOT EXISTS p_h_t2(
      name STRING,
      age INT,
      PRIMARY KEY (name) NOT ENFORCED
    )STORED BY 'org.apache.paimon.hive.PaimonStorageHandler';
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    这样表就创建好了,下面我们可以在Hive中测试一下读写数据:

    0: jdbc:hive2://localhost:10000> INSERT INTO p_h_t2(name,age) VALUES('tom',20);
    0: jdbc:hive2://localhost:10000> SELECT * FROM p_h_t2;
    Error: java.io.IOException: java.lang.RuntimeException: Fails to read snapshot from path hdfs://bigdata01:9000/paimon/default.db/p_h_t2/snapshot/snapshot-2 (state=,code=0)
    
    • 1
    • 2
    • 3

    注意:此时查询报错是因为找不到snapshot-2这份快照数据,目前这个表中只添加了一次数据,所以只有snapshot-1

    那为什么会查找snapshot-2呢?

    因为我们前面在这个会话中设置了SET paimon.scan.snapshot-id=2;,这个配置在当前会话有效。

    正常情况下我们在hive中执行SET paimon.scan.snapshot-id=null;其实就可以了:

    0: jdbc:hive2://localhost:10000> SET paimon.scan.snapshot-id=null;
    No rows affected (0.008 seconds)
    0: jdbc:hive2://localhost:10000> SET paimon.scan.snapshot-id;
    +-------------------------------+
    |              set              |
    +-------------------------------+
    | paimon.scan.snapshot-id=null  |
    +-------------------------------+
    1 row selected (0.009 seconds)
    0: jdbc:hive2://localhost:10000> SELECT * FROM p_h_t2;
    Error: java.io.IOException: java.lang.RuntimeException: Fails to read snapshot from path hdfs://bigdata01:9000/paimon/default.db/p_h_t2/snapshot/snapshot-2 (state=,code=0)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    但是发现他还是会找snapshot-2

    我们尝试重新开启一个新的会话查询也不行,就算重启hiveserver2也还是不行。

    后来发现这可能是一个bug,当我们在hive会话中设置了paimon.scan.snapshot-id=2,那么之后创建的表默认就只会查询snapshot-2了,那也就意味着建表的时候会把这个参数带过去。

    为了验证这个猜想,我们在flink代码中查询这个Paimon表的详细建表语句,不要在hive命令行中查看(在Hive命令行中看不到详细的参数信息)。

    创建package:tech.xuwei.paimon.hivepaimon
    创建object:FlinkSQLReadFromPaimo

    完整代码如下:

    package tech.xuwei.paimon.cdcingestion
    
    import org.apache.flink.api.common.RuntimeExecutionMode
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
    
    /**
     * 使用FlinkSQL从Paimon表中读取数据
     * Created by xuwei
     */
    object FlinkSQLReadFromPaimon {
      def main(args: Array[String]): Unit = {
        //创建执行环境
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
        val tEnv = StreamTableEnvironment.create(env)
    
        //创建Paimon类型的Catalog
        tEnv.executeSql(
          """
            |CREATE CATALOG paimon_catalog WITH(
            |    'type'='paimon',
            |    'warehouse'='hdfs://bigdata01:9000/paimon'
            |)
            |""".stripMargin)
        tEnv.executeSql("USE CATALOG paimon_catalog")
    
        //读取Paimon表中的数据,并且打印输出结果
        tEnv.executeSql(
          """
            |SHOW CREATE TABLE p_h_t2;
            |""".stripMargin)
          .print()
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    在idea中执行代码,查看结果:

    CREATE TABLE `paimon_catalog`.`default`.`p_h_t2` (
      `name` VARCHAR(2147483647),
      `age` INT
    ) WITH (
      'path' = 'hdfs://bigdata01:9000/paimon/default.db/p_h_t2',
      'totalSize' = '0',
      'numRows' = '0',
      'rawDataSize' = '0',
      'scan.snapshot-id' = '2',
      'COLUMN_STATS_ACCURATE' = '{"BASIC_STATS":"true","COLUMN_STATS":{"age":"true","name":"true"}}',
      'numFiles' = '0',
      'bucketing_version' = '2',
      'storage_handler' = 'org.apache.paimon.hive.PaimonStorageHandler'
    )
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    在这里发现建表语句中有一个参数:'scan.snapshot-id' = '2',所以它默认会读取第2个快照。

    想要解决这个问题,有两个办法。

    • 1:在hive中删除这个表,然后执行SET paimon.scan.snapshot-id=null;,再创建这个表就行了。
    • 2:如果不想删除这个表,可以在Flink代码中修改这个表,移除scan.snapshot-id属性即可,这个功能我们之前讲过。

    第一种办法简单粗暴,不再演示,我们来看一下第二种办法:

    创建object:FlinkSQLAlterPaimonTable

    完整代码如下:

    package tech.xuwei.paimon.hivepaimon
    
    import org.apache.flink.api.common.RuntimeExecutionMode
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
    
    /**
     * 修改Paimon表属性
     * Created by xuwei
     */
    object FlinkSQLAlterPaimonTable {
      def main(args: Array[String]): Unit = {
        //创建执行环境
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
        val tEnv = StreamTableEnvironment.create(env)
    
        //创建Paimon类型的Catalog
        tEnv.executeSql(
          """
            |CREATE CATALOG paimon_catalog WITH(
            |    'type'='paimon',
            |    'warehouse'='hdfs://bigdata01:9000/paimon'
            |)
            |""".stripMargin)
        tEnv.executeSql("USE CATALOG paimon_catalog")
    
        //移除表中的scan.snapshot-id属性
        tEnv.executeSql(
          """
            |ALTER TABLE p_h_t2 RESET ('scan.snapshot-id')
            |""".stripMargin)
    
        //查看最新的表属性信息
        tEnv.executeSql(
          """
            |SHOW CREATE TABLE p_h_t2
            |""".stripMargin)
          .print()
    
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42

    执行此代码,可以看到如下结果:

    CREATE TABLE `paimon_catalog`.`default`.`p_h_t2` (
      `name` VARCHAR(2147483647),
      `age` INT
    ) WITH (
      'path' = 'hdfs://bigdata01:9000/paimon/default.db/p_h_t2',
      'totalSize' = '0',
      'numRows' = '0',
      'rawDataSize' = '0',
      'COLUMN_STATS_ACCURATE' = '{"BASIC_STATS":"true","COLUMN_STATS":{"age":"true","name":"true"}}',
      'numFiles' = '0',
      'bucketing_version' = '2',
      'storage_handler' = 'org.apache.paimon.hive.PaimonStorageHandler'
    )
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    此时表中就没有scan.snapshot-id属性了。

    这个时候我们再回到hive命令行中查询这个表:

    0: jdbc:hive2://localhost:10000> SELECT * FROM p_h_t2;
    +--------------+-------------+
    | p_h_t2.name  | p_h_t2.age  |
    +--------------+-------------+
    | tom          | 20          |
    +--------------+-------------+
    1 row selected (0.46 seconds)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    这样就可以正常查询了。

    注意:如果此时我们在hive中删除这个表,那么对应的paimon中这个表也会被删除。

    0: jdbc:hive2://localhost:10000> drop table p_h_t2;
    No rows affected (0.33 seconds)
    
    • 1
    • 2

    到hdfs中确认一下这个paimon表是否存在:

    [root@bigdata04 ~]# hdfs dfs -ls /paimon/default.db/p_h_t2
    ls: `/paimon/default.db/p_h_t2': No such file or directory
    
    • 1
    • 2

    这样就说明paimon中这个表不存在了。

    不过在hdfs中会多一个这个目录,这属于一个临时目录,没什么影响,可以手工删除,不处理也没影响。

    [root@bigdata04 ~]# hdfs dfs -ls /paimon/default.db/_tmp.p_h_t2
    
    • 1

    针对Paimon中已经存在的表,我们想要在hive中进行访问,应该如何实现呢?

    此时可以借助于Hive的外部表特性来实现。
    相当于是在hive中创建一个外部表,通过location指向paimon表的hdfs路径即可。

    我们使用前面cdc数据采集中创建的Paimon表:cdc_chinese_code,在hive中创建一个外部表映射到这个表:

    0: jdbc:hive2://localhost:10000> CREATE EXTERNAL TABLE p_h_external
    STORED BY 'org.apache.paimon.hive.PaimonStorageHandler'
    LOCATION 'hdfs://bigdata01:9000/paimon/default.db/cdc_chinese_code';
    
    • 1
    • 2
    • 3

    然后就可以在hive中查询这个表了:

    0: jdbc:hive2://localhost:10000> select * from p_h_external;
    +------------------+--------------------+
    | p_h_external.id  | p_h_external.name  |
    +------------------+--------------------+
    | 1                | 张三                 |
    | 2                | 李四                 |
    +------------------+--------------------+
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    此时如果我们在hive中删除这个外部表,不会影响paimon中的cdc_chinese_code表。

    0: jdbc:hive2://localhost:10000> drop table p_h_external;
    
    • 1

    到hdfs中确认一下,cdc_chinese_code这个paimon表还是存在的:

    [root@bigdata04 ~]# hdfs dfs -ls /paimon/default.db/cdc_chinese_code
    Found 4 items
    drwxr-xr-x   - root supergroup          0 2029-02-27 11:32 /paimon/default.db/cdc_chinese_code/bucket-0
    drwxr-xr-x   - root supergroup          0 2029-02-27 11:32 /paimon/default.db/cdc_chinese_code/manifest
    drwxr-xr-x   - root supergroup          0 2029-02-27 11:26 /paimon/default.db/cdc_chinese_code/schema
    drwxr-xr-x   - root supergroup          0 2029-02-27 11:32 /paimon/default.db/cdc_chinese_code/snapshot
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    更多Paimon数据湖内容请关注https://edu.51cto.com/course/35051.html

  • 相关阅读:
    Unity使用c#开发遇上的问题(十)(unity中使用自带体获得预制体阴影)
    Hadoop系列——HDFS常用和命令,Java API客户端day3-1
    Netty 线程工作机制—— NioEventLoop
    MySQL 性能优化
    微服务框架 SpringCloud微服务架构 10 使用Docker 10.3 容器命令介绍
    LeetCode //C - 38. Count and Say Medium Topics Companies
    springweb+vue前后端分离开发,集成部署
    FPGA优质开源项目 – UDP万兆光纤以太网通信
    同城拼车网约车顺风车小程序公众号/同城顺风车小程序/顺风车小程序/拼车小程序
    Node.js精进(11)——Socket.IO
  • 原文地址:https://blog.csdn.net/xu470438000/article/details/134329889