• Submitting Hive Iceberg jobs with the Flink SQL Client


    Environment preparation

    Component                 Version
    Flink client              1.14.4 (Scala 2.12)
    Hadoop cluster            3.1.4
    Hive client               3.1.2
    Iceberg Flink runtime     iceberg-flink-runtime-1.14-0.13.2.jar
    Iceberg Hive runtime      iceberg-hive-runtime-0.13.2.jar
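
    Both Iceberg jars must be visible to the YARN session and the SQL Client; in this setup they sit in a plugin directory that is later passed via -t / -l. A minimal staging sketch (the source paths of the jars are assumptions; the target directory matches the commands below):

    # Stage the Iceberg runtime jars into the plugin directory used by -t / -l
    mkdir -p /home/hadoop/flink/sqlplugins
    cp iceberg-flink-runtime-1.14-0.13.2.jar /home/hadoop/flink/sqlplugins/
    cp iceberg-hive-runtime-0.13.2.jar /home/hadoop/flink/sqlplugins/
    ls /home/hadoop/flink/sqlplugins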

    Preparation before starting the SQL Client

    The SQL Client can submit jobs in two modes: per-job and session.
    Session mode requires starting a YARN session first, like this:

    /home/hadoop/flink/bin/yarn-session.sh \
    -t /home/hadoop/flink/sqlplugins \
    -s 2 -jm 5120 -tm 5120 -qu default -nm iceberg_test1 -d
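
    Once the session is up, it can be confirmed on YARN before starting the SQL Client (a quick optional check; assumes the yarn CLI is on the PATH and the session name above is used):

    # The session should appear as a RUNNING application named iceberg_test1
    yarn application -list | grep iceberg_test1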
    

    Per-job mode instead requires adding the following parameter to the Flink client's flink-conf.yaml:
    execution.target: yarn-per-job
    Note:

    flink-conf.yaml also contains the following additional settings:
    classloader.resolve-order: parent-first
    
    classloader.check-leaked-classloader: false
    
    # Kerberos-related settings
    security.kerberos.login.use-ticket-cache: true
    security.kerberos.login.keytab: /bigdata/apps/test/core.keytab
    security.kerberos.login.principal: hadoop
    security.kerberos.login.contexts: Client
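
    Because HDFS and the Hive Metastore are Kerberized here, it is worth confirming that the configured keytab and principal actually work before submitting anything. A small sanity check (assumes the MIT Kerberos client tools are installed):

    # Obtain a ticket with the same keytab/principal referenced in flink-conf.yaml
    kinit -kt /bigdata/apps/test/core.keytab hadoop
    # The principal should now show up in the ticket cache
    klist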
    

    Starting the SQL Client

    -- YARN session mode
    /home/hadoop/flink/bin/sql-client.sh  embedded \
    -s appId \
    -l /home/hadoop/flink/sqlplugins \
    -i /home/hadoop/flink/script/init.sql \
    -f /home/hadoop/flink/script/insert.sql \
    shell
    
    -- YARN per-job mode
    /home/hadoop/flink/bin/sql-client.sh  embedded \
    -l /home/hadoop/flink/sqlplugins \
    -i /home/hadoop/flink/script/init.sql \
    -f /home/hadoop/flink/script/insert.sql \
    shell
    

    init.sql

    set 'sql-client.verbose'='true';
    SET 'execution.checkpointing.interval' = '60s';
    
    CREATE CATALOG ice_catalog WITH (
      'type' = 'iceberg',
      'catalog-type' = 'hive',
      'uri' = 'thrift://hdp02.bonc.com:9083',
      'warehouse' = 'hdfs://beh001/tmp/',
      'hive-conf-dir' = '/home/hadoop/flink/confdir',
      'hadoop-conf-dir' = '/home/hadoop/flink/confdir'
    );
    
    CREATE DATABASE IF NOT EXISTS ice_catalog.ice_db;
    
    CREATE TABLE IF NOT EXISTS ice_catalog.ice_db.ice_tb (
       deal_date string,
       chnl_id string,
       chnl_name string,
       region_code string,
       city_code string,
       chnl_third_class string,
       chnl_second_class string,
       chnl_first_class string,
       chnl_area_class string,
       chnl_eff_flag string,
       oper_id string,
       oper_name string,
       self_term_code string,
       air_term_code string,
       oper_eff_flag string,
       item_cls_type string,
       item_cls_desc string,
       item_grp_type string,
       item_grp_desc string,
       user_chnl_id string,
       user_chnl_name string,
       user_region_code string,
       user_city_code string,
       item_value1 decimal(14,2),
       item_value2 decimal(14,2),
      PRIMARY KEY (chnl_id ,oper_id) NOT ENFORCED
    ) WITH (
      'write.upsert.enabled' = 'true',
      'write.metadata.previous-versions-max' = '10',
      'write.metadata.delete-after-commit.enabled' = 'true',
      'commit.manifest.min-count-to-merge' = '1',
      'engine.hive.enabled' = 'true',
      'table.dynamic-table-options.enabled' = 'true',
      'format-version' = '2'
    );
    
    CREATE TABLE csvSource (
       deal_date string COMMENT 'Processing date',
       chnl_id string COMMENT 'Channel ID',
       chnl_name string COMMENT 'Channel name',
       region_code string COMMENT 'Home city code',
       city_code string COMMENT 'Home district code',
       chnl_third_class string COMMENT 'Channel level-3 type',
       chnl_second_class string COMMENT 'Channel level-2 type',
       chnl_first_class string COMMENT 'Channel level-1 type',
       chnl_area_class string COMMENT 'Channel region attribute',
       chnl_eff_flag string COMMENT 'Channel active flag',
       oper_id string COMMENT 'Operator ID',
       oper_name string COMMENT 'Operator name',
       self_term_code string COMMENT 'Self-service terminal flag',
       air_term_code string COMMENT 'Over-the-air recharge flag',
       oper_eff_flag string COMMENT 'Operator active flag',
       item_cls_type string COMMENT 'Metric category code',
       item_cls_desc string COMMENT 'Metric category name',
       item_grp_type string COMMENT 'Metric item code',
       item_grp_desc string COMMENT 'Metric item name',
       user_chnl_id string COMMENT 'User channel ID',
       user_chnl_name string COMMENT 'User channel name',
       user_region_code string COMMENT 'User home city code',
       user_city_code string COMMENT 'User home district code',
       item_value1 decimal(14,2) COMMENT 'Metric value 1',
       item_value2 decimal(14,2) COMMENT 'Metric value 2'
    ) WITH (
      'connector' = 'filesystem',
      'path' = 'hdfs://beh001/tmp/originData/csvSource.txt',
      'format' = 'csv',
      'csv.field-delimiter' = ','
    );
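
    Before (or instead of) submitting insert.sql with -f, the objects created by init.sql can be verified interactively, e.g. by starting the client with only -l and -i and running a few checks in the shell (optional sanity queries):

    SHOW CATALOGS;            -- should include ice_catalog
    USE CATALOG ice_catalog;
    SHOW DATABASES;           -- should include ice_db
    USE ice_db;
    SHOW TABLES;              -- should include ice_tb
    DESCRIBE ice_tb;          -- verify the schema and the NOT ENFORCED primary key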
    

    insert.sql

    insert into
      ice_catalog.ice_db.ice_tb
    select
       deal_date,
       chnl_id,
       chnl_name,
       region_code,
       city_code,
       chnl_third_class,
       chnl_second_class,
       chnl_first_class,
       chnl_area_class,
       chnl_eff_flag,
       oper_id,
       oper_name,
       self_term_code,
       air_term_code,
       oper_eff_flag,
       item_cls_type,
       item_cls_desc,
       item_grp_type,
       item_grp_desc,
       user_chnl_id,
       user_chnl_name,
       user_region_code,
       user_city_code,
       item_value1,
       item_value2
    from
      csvSource;
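
    After checkpoints have committed snapshots, the load can be verified by reading the Iceberg table back from the SQL Client; a simple batch check (run in the interactive shell):

    -- Switch to batch execution and count the rows written so far
    SET 'execution.runtime-mode' = 'batch';
    SELECT count(*) FROM ice_catalog.ice_db.ice_tb;

    Since the table was created with 'engine.hive.enabled' = 'true', it should also be queryable from Hive once iceberg-hive-runtime-0.13.2.jar is on Hive's classpath.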
    
    
  • Original article: https://blog.csdn.net/sxau_zhangtao/article/details/134547500