• hivehook 表血缘与字段血缘的解析


    代码

    import org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext;
    import org.apache.hadoop.hive.ql.hooks.HookContext;
    import org.apache.hadoop.hive.ql.hooks.LineageInfo;
    import org.apache.hadoop.hive.metastore.api.Table;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.*;
    
    public class HiveServerQueryLogHook implements ExecuteWithHookContext {
    	static final Logger LOG = LoggerFactory.getLogger(HiveServerQueryLogHook.class);
    	
        @Override
        public void run(HookContext hookContext) throws Exception {
            printLineageInfo(hookContext);
        }
        private void printLineageInfo(HookContext hookContext) {
            // 输出表
            Set<String> inputTables = new HashSet<>();;
    
            // 输入表
            Set<String> outputTables = new HashSet<>();;
    
            // 字段血缘 Map
            // key为输出字段,value为来源字段数组
            Map<String, ArrayList<String>> fieldLineage = new HashMap<>();
    		
    		// 从 `hookContext` 中获取 `Linfo` 并返回其 entry set,这意味着我们会获取到一个包含键值对的集合;遍历 `hookContext` 中 `Linfo` 的 entry set
            for(Map.Entry<LineageInfo.DependencyKey, LineageInfo.Dependency> dep: hookContext.getLinfo().entrySet()){
                // 表血缘
                // 将 `dep.getKey()` 转换为一个 Optional 对象,以防止空指针异常。
                Optional.ofNullable(dep.getKey())
                		// 如果 `dep.getKey()` 不为空,则将其转换为 `DataContainer` 对象。
                        .map(LineageInfo.DependencyKey::getDataContainer)
                        // 如果 `DataContainer` 不为空,则获取其表信息。
                        .map(LineageInfo.DataContainer::getTable)
                        // 将表信息传递给 `dealOutputTable` 方法进行处理。
                        .map(this::dealOutputTable)
                        // 如果处理后的结果不为空,则将其添加到 `outputTables` 集合中。
                        .ifPresent(outputTables::add);
                Optional.ofNullable(dep.getValue())
                        .map(LineageInfo.Dependency::getBaseCols)
                        .ifPresent(items -> items.stream().map(LineageInfo.BaseColumnInfo::getTabAlias)
                                .map(LineageInfo.TableAliasInfo::getTable)
                                .map(this::dealOutputTable)
                                .forEach(inputTables::add));
    
                // 字段血缘
                // 将 `dep.getKey()` 转换为一个 Optional 对象,以防止空指针异常。
                String column = Optional.ofNullable(dep.getKey())
                		// 如果 `dep.getKey()` 不为空,则将其传递给 `dealDepOutputField` 方法进行处理。
                        .map(this::dealDepOutputField)
                        // 如果处理后的结果不为空,则将其作为键放入 `fieldLineage` 中,并关联一个空的 ArrayList,然后将该键返回给 `column` 变量。
    
                        .map(aimField -> {
                            fieldLineage.put(aimField, new ArrayList<>());
                            return aimField;
                        // 如果处理后的结果为空,则将 `column` 设置为 null。
                        }).orElse(null);
                        // 将 `dep.getValue()` 转换为一个 Optional 对象,以防止空指针异常。
                Optional.ofNullable(dep.getValue())
                		// 如果 `dep.getValue()` 不为空,则获取其基础列信息。
    
                        .map(LineageInfo.Dependency::getBaseCols)
                        // 如果基础列信息不为空,则将其转换为流并依次处理,将处理后的结果添加到 `fieldLineage` 中对应 `column` 的列表中。
                        .ifPresent(items -> items.stream()
                                .map(this::dealBaseOutputField)
                                .forEach(item -> {
                                    fieldLineage.get(column).add(item);
                                }));
            }
            LOG.info("inputTables : {} ",inputTables);
            LOG.info("outputTables : {} ",outputTables);
            LOG.info("fieldLineage : {} ",fieldLineage.toString());
        }
        // 处理表的格式为 库.表
        private String dealOutputTable(Table table) {
            String dbName = table.getDbName();
            String tableName = table.getTableName();
            return dbName != null ? String.format("%s.%s", dbName, tableName) : tableName;
        }
    
        // 处理输出字段的格式
        private String dealDepOutputField(LineageInfo.DependencyKey dependencyKey) {
            try{
                String tableName = dealOutputTable(dependencyKey.getDataContainer().getTable());
                String field = dependencyKey.getFieldSchema().getName();
                return String.format("%s.%s", tableName, field);
            }catch (Exception e) {
                LOG.error("deal dep output field error" + e.getMessage());
                return null;
            }
        }
    
        // 处理来源字段的格式
        private String dealBaseOutputField(LineageInfo.BaseColumnInfo baseColumnInfo) {
            try{
                String tableName = dealOutputTable(baseColumnInfo.getTabAlias().getTable());
                String field = baseColumnInfo.getColumn().getName();
                return String.format("%s.%s", tableName, field);
            }catch (Exception e) {
                LOG.error("deal base output field error" + e.getMessage());
                return null;
            }
        }
    
    

    配置

    编译后生成jar文件添加到hive运行环境,设置hook

    1)jar放置/disk1/hive-jars/hook
    
    2)设置env,conf/hive-env.sh
    pushd /disk1/hive-jars/hook
    export HOOK_DEPS=$(ls *.jar| xargs -Ixx echo "`pwd`/xx" | sort | tr '\n' ':')
    popd
    export HIVE_AUX_JARS_PATH=${xxx%:}:${HOOK_DEPS%:}
    
    3)修改hive-site.xml
      
        hive.exec.post.hooks</name>
        com.xxx.xxx.HiveServerQueryLogHook</value>
        
          Comma-separated list of post-execution hooks to be invoked for each statement.
          A post-execution hook is specified as the name of a Java class which implements the
          org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext interface.
        </description>
      </property>
    
    

    测试

    sql

    use db_dev;
    CREATE TABLE IF NOT EXISTS `all_report_creator`(
    `project_id` INT COMMENT '项目组id',
    `report_id` INT COMMENT '报告id',
    `creator_id` INT COMMENT '报告创建者id',
    `nick` STRING COMMENT 'nick名字'
    ) 
    STORED AS PARQUET;
    insert overwrite table all_report_creator
    select
    t1.project_id,t1.id,t2.id,t2.nick from db.new_report t1 left join db.bigviz_user t2 on t1.creator_id = t2.id where t1.project_id in(7,24)
    

    血缘

    24/07/12 13:34:31 INFO xxx.HiveServerQueryLogHook: inputTables : [db.new_report, db.bigviz_user]
    24/07/12 13:34:31 INFO xxx.HiveServerQueryLogHook: outputTables : [db_dev.all_report_creator]
    24/07/12 13:34:31 INFO xxx.HiveServerQueryLogHook: fieldLineage : {db_dev.all_report_creator.project_id=[db.new_report.project_id], db_dev.all_report_creator.report_id=[db.new_report.id], db_dev.all_report_creator.creator_id=[db.bigviz_user.id], db_dev.all_report_creator.nick=[db.bigviz_user.nick]}
    

    参考

    HIVE源码学习-hivehook尝试表血缘与字段血缘的解析
    http://ganjiacheng.cn/article/2020/article_16_HIVE%E6%BA%90%E7%A0%81%E5%AD%A6%E4%B9%A0-hivehook%E5%B0%9D%E8%AF%95%E8%A1%80%E7%BC%98%E8%A7%A3%E6%9E%90/

  • 相关阅读:
    软考系统架构师知识点集锦四:信息安全技术基础知识
    ASM使用小抄
    开源相机管理库Aravis例程学习(五)——camera-api
    Etcd 学习 安装教程
    SpringBoot+Redis 防止用户重复登录
    Java数据结构技巧
    CSS 小技巧:如何保留 hover 的状态?
    第二证券|小鹏持续萎靡,理想蔚来逆势反弹破月销记录
    给运行中的docker容器挂载目录——筑梦之路
    WPF-封装自定义雷达图控件
  • 原文地址:https://blog.csdn.net/szw_yx/article/details/140378615