• Flink Hive Catalog操作案例


    在此对Flink读写Hive表操作进行逐步记录,需要指出的是,其中操作Hive分区表和非分区表的DDL有所不同,以下分别记录。

    基础环境

    Hive-3.1.3
    Flink-1.17.1

    基本操作与准备

    1、上传依赖jar包到flink/lib目录下

    cp flink-sql-connector-hive-3.1.3_2.12-1.17.1.jar
    cp mysql-connector-j-8.1.0.jar
    
    • 1
    • 2

    2、更换planner依赖(Hive集成的推荐设置)

    mv /usr/sft/flink-1.17.1/opt/flink-table-planner_2.12-1.17.1.jar /usr/sft/flink-1.17.1/lib/
    mv /usr/sft/flink-1.17.1/lib/flink-table-planner-loader-1.17.1.jar /usr/sft/flink-1.17.1/opt/
    
    • 1
    • 2

    3、启动Hive MetaStore

    nohup hive --service metastore 2>&1 &
    
    • 1

    4、启动flink集群和sql-client

    yarn-session.sh -d -nm flink-cluster
    sql-client.sh embedded -s yarn-session
    
    • 1
    • 2

    5、在flink sql-client中创建hive catalog

    CREATE CATALOG hive WITH (
        'type' = 'hive',
        'default-database' = 'sty',
        'hive-conf-dir' = '/usr/sft/hive-3.1.3/conf'
    );
    
    • 1
    • 2
    • 3
    • 4
    • 5

    分区表读写

    1、Hive中建表并插入数据

    create table behavior(
    username string,
    behavior string
    );
    insert into behavior values('lisi','buy'),('zhangsan','read');
    
    • 1
    • 2
    • 3
    • 4
    • 5

    2、使用hive catalog

    use catalog hive;
    
    • 1

    2、flink sql-client中执行数据插入与数据查询(和常规sql一致)

    insert into behavior values('sisi','buy'),('tracy','read');
    select *from behavior;
    
    • 1
    • 2

    在这里插入图片描述

    分区表读写

    这里和非分区表有所不同,主要体现在建表层面,参考博客:https://www.jianshu.com/p/295066a24092

    写入到hive分区表
    streamEnv需要开启checkpoint,保证flink写入hive分区表的写入一致性
    hive表ddl中需要指定以下TBLPROPERTIES:
    sink.partition-commit.trigger:分区提交触发器,单选,可选值为partition-time、process-time(默认), 其中partition-time需要根据当前数据的watermark来判断分区是否需要提交,当watermark + delay大于等于分区上的时间时就会提交该分区元数据;process-time的话根据当前系统处理时间来判断分区是否需要提交,当系统处理时间大于等于分区上的时间就会提交该分区元数据
    partition.time-extractor.timestamp-pattern:使用partition-time触发器时使用该配置项。表示从表字段中提取出表达某个分区的时间的格式,需要提取到的时间必须为yyyy-MM-dd HH:mm:ss的格式。比如字段dt的格式为yyyy-MM-dd,则配置为$dt 00:00:00则表示分区时间取值为dt的value的0点0分0秒,可以选择多个表字段组合。当表字段无法抽取出符合的格式时,则使用自定义提取器partition.time-extractor.class。
    sink.partition-commit.delay: 表示watermark允许event time的最大乱序时间,使用partition-time触发器时可以使用,默认为0s
    sink.partition-commit.policy.kind:分区提交方式,多选,可选值为metastore、success-file、custom,metastore表示写入元数据库,success-file表示往hdfs分区目录写入一个标志文件,custom表示使用自定义提交方式,通常使用metastore,success-file组合
    partition.time-extractor.kind:当要使用自定义分区时间提取器时需要配置此项,值配置为custom
    partition.time-extractor.class:当要使用自定义分区时间提取器时需要配置此项,值配置为自定义提取器的类路径。在集群中运行时,需要把该类打成jar包放到flink lib目录下。
    某个分区触发提交后,后续再有此分区的数据进来,仍然会写入hive该分区。
    作者:spongebobZ
    链接:https://www.jianshu.com/p/295066a24092
    来源:简书

    1、hive创建分区表并插入数据

    create table userinfo(
    name string,
    age int
    )
    partitioned by (dt string)
    stored as orc
    tblproperties(
        'sink.partition-commit.trigger' = 'partition-time',
        'sink.partition-commit.policy.kind'='metastore,success-file',
        'partition.time-extractor.timestamp-pattern' ='yyyy-MM-dd HH:mm:ss',
        'sink.partition-commit.delay' = '10'
    );
    
    insert into table userInfo partition(dt='2023-10-26') values('zhangsan',23);
    insert into table userInfo partition(dt='2023-10-26') values('lisi',26),('wangwu',27);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    注意:若建表时未在tblproperties中配置恰当的sink.partition-commit.policy.kind,flink sql-client插入数据时将遇到如下报错:

    Could not execute SQL statement. Reason:
    org.apache.flink.connectors.hive.FlinkHiveException: Streaming write to partitioned hive table `hive`.`sty`.`userInfo` without providing a commit policy. Make sure to set a proper value for sink.partition-commit.policy.kind
    
    • 1
    • 2

    2、flink sql-client插入与查询数据

    insert into  userinfo partition(dt='2023-10-24') values('tracy',26),('lily',27);
    select *from userinfo;
    
    • 1
    • 2

    在这里插入图片描述

  • 相关阅读:
    Halo-Theme-Hao文档:如何设置导航栏?
    Web3:价值投资的范式转移
    MySQL介绍与安装(超详细)
    【容器化】Kubernetes(k8s)
    网络协议:透彻解析HTTP协议
    社区分享|中南民族大学基于JumpServer构建规范、便利的运维安全体系
    SSL单向认证原理
    【WSL2】CENTOS7 安装与配置
    高NA傅里叶显微镜单分子成像
    vue3学习笔记
  • 原文地址:https://blog.csdn.net/qq_34901049/article/details/134060047