• apache-atlas-hbase-bridge-源码分析


    元数据类型

    Hbase元数据类型, 包括命令空间、表、列族、列

    public enum HBaseDataTypes {
        // Classes
        HBASE_NAMESPACE,
        HBASE_TABLE,
        HBASE_COLUMN_FAMILY,
        HBASE_COLUMN;
    
        public String getName() {
            return name().toLowerCase();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    Hbase元数据采集实现
    1)批量采集HBaseBridge
    2)实时变更 HBaseAtlasCoprocessor
    虽然定义了HBASE_COLUMN,但是实际上是没有实现的,毕竟HBASE_COLUMN是动态添加的。

    执行流程

    HBaseBridge 执行流程如下图所示
    在这里插入图片描述

    源码分析

    HBaseBridge #main

    public class HBaseBridge {
       
    //...
        private final String        metadataNamespace;
        private final AtlasClientV2 atlasClientV2;
        private final Admin         hbaseAdmin;
    
        public static void main(String[] args) {
            int exitCode = EXIT_CODE_FAILED;
            AtlasClientV2 atlasClientV2  =null;
    
            try {
                Options options = new Options();
                options.addOption("n","namespace", true, "namespace");
                options.addOption("t", "table", true, "tablename");
                options.addOption("f", "filename", true, "filename");
    
                CommandLineParser parser            = new BasicParser();
                CommandLine       cmd               = parser.parse(options, args);
                String            namespaceToImport = cmd.getOptionValue("n");
                String            tableToImport     = cmd.getOptionValue("t");
                String            fileToImport      = cmd.getOptionValue("f");
                Configuration     atlasConf         = ApplicationProperties.get();
                String[]          urls              = atlasConf.getStringArray(ATLAS_ENDPOINT);
    
               //...
    
                if (!AuthenticationUtil.isKerberosAuthenticationEnabled()) {
                    String[] basicAuthUsernamePassword = AuthenticationUtil.getBasicAuthenticationInput();
                    atlasClientV2 = new AtlasClientV2(urls, basicAuthUsernamePassword);
                } else {
                    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
                    atlasClientV2 = new AtlasClientV2(ugi, ugi.getShortUserName(), urls);
                }
    //...
                HBaseBridge importer = new HBaseBridge(atlasConf, atlasClientV2);
                if (StringUtils.isNotEmpty(fileToImport)) {
                    File f = new File(fileToImport);
                    if (f.exists() && f.canRead()) {
                        BufferedReader br   = new BufferedReader(new FileReader(f));
                        String         line = null;
    
                        while((line = br.readLine()) != null) {
                            String val[] = line.split(":");
                            if (ArrayUtils.isNotEmpty(val)) {
                                //...
                                importer.importHBaseEntities(namespaceToImport, tableToImport);
                            }
                        }
                        exitCode = EXIT_CODE_SUCCESS;
                    } else {
                        LOG.error("Failed to read the file");
                    }
                } else {
                    importer.importHBaseEntities(namespaceToImport, tableToImport);
                    exitCode = EXIT_CODE_SUCCESS;
                }
            } catch(ParseException e) {
               //...
            } catch(Exception e) {
                //...
            }finally {
                //...
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65

    HBaseBridge#importHBaseEntities

    importHBaseEntities 只要负责处理namespaceToImport和tableToImport参数,然后执行相应的流程

    private boolean importHBaseEntities(String namespaceToImport, String tableToImport) throws Exception {
        boolean ret = false;
    
        if (StringUtils.isEmpty(namespaceToImport) && StringUtils.isEmpty(tableToImport)) {
            // when both NameSpace and Table options are not present
            importNameSpaceAndTable();
            ret = true;
        } else if (StringUtils.isNotEmpty(namespaceToImport)) {
            // When Namespace option is present or both namespace and table options are present
            importNameSpaceWithTable(namespaceToImport, tableToImport);
            ret = true;
        } else  if (StringUtils.isNotEmpty(tableToImport)) {
            importTable(tableToImport);
            ret = true;
        }
    
        return ret;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    导入所有的命名空间和表

    namespaceToImport和tableToImport均为空,导入所有的namespace和table

    private void importNameSpaceAndTable() throws Exception {
    
        NamespaceDescriptor[] namespaceDescriptors = hbaseAdmin.listNamespaceDescriptors();
    
        if (ArrayUtils.isNotEmpty(namespaceDescriptors)) {
            for (NamespaceDescriptor namespaceDescriptor : namespaceDescriptors) {
                String namespace = namespaceDescriptor.getName();
                importNameSpace(namespace);
            }
        }
    
        TableDescriptor[] htds = hbaseAdmin.listTables();
        if (ArrayUtils.isNotEmpty(htds)) {
            for (TableDescriptor htd : htds) {
                String tableName = htd.getTableName().getNameAsString();
                importTable(tableName);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    导入指定的命名空间

    namespaceToImport不为空,导入指定的namespace和namespace下的table

    private void importNameSpaceWithTable(String namespaceToImport, String tableToImport) throws Exception {
        importNameSpace(namespaceToImport);
    
        List<TableDescriptor> hTableDescriptors = new ArrayList<>();
    
        if (StringUtils.isEmpty(tableToImport)) {
    // 导入指定namespace
            List<NamespaceDescriptor> matchingNameSpaceDescriptors = getMatchingNameSpaces(namespaceToImport);
            if (CollectionUtils.isNotEmpty(matchingNameSpaceDescriptors)) {
                hTableDescriptors = getTableDescriptors(matchingNameSpaceDescriptors);
            }
        } else {
            tableToImport = namespaceToImport +":" + tableToImport;
            TableDescriptor[] htds = hbaseAdmin.listTables(Pattern.compile(tableToImport));
            hTableDescriptors.addAll(Arrays.asList(htds));
        }
    
        if (CollectionUtils.isNotEmpty(hTableDescriptors)) {
            for (TableDescriptor htd : hTableDescriptors) {
                String tblName = htd.getTableName().getNameAsString();
                importTable(tblName);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    导入指定的表

    tableToImport不为空,导入指定的table和table的命名空间。importTable会处理表、列族的实体,没有处理列

    public void importTable(final String tableName) throws Exception {
        String            tableNameStr = null;
        TableDescriptor[] htds         = hbaseAdmin.listTables(Pattern.compile(tableName));
    
        if (ArrayUtils.isNotEmpty(htds)) {
            for (TableDescriptor htd : htds) {
                String tblNameWithNameSpace    = htd.getTableName().getNameWithNamespaceInclAsString();
                String tblNameWithOutNameSpace = htd.getTableName().getNameAsString();
    
                if (tableName.equals(tblNameWithNameSpace)) {
                    tableNameStr = tblNameWithNameSpace;
                } else if (tableName.equals(tblNameWithOutNameSpace)) {
                    tableNameStr = tblNameWithOutNameSpace;
                } else {
                    // when wild cards are used in table name
                    if (tblNameWithNameSpace != null) {
                        tableNameStr = tblNameWithNameSpace;
                    } else if (tblNameWithOutNameSpace != null) {
                        tableNameStr = tblNameWithOutNameSpace;
                    }
                }
    
                byte[]                 nsByte       = htd.getTableName().getNamespace();
                String                 nsName       = new String(nsByte);
                NamespaceDescriptor    nsDescriptor = hbaseAdmin.getNamespaceDescriptor(nsName);
                AtlasEntityWithExtInfo entity       = createOrUpdateNameSpace(nsDescriptor);
                ColumnFamilyDescriptor[]    hcdts        = htd.getColumnFamilies();
    // 处理表、列族,没有处理列
                createOrUpdateTable(nsName, tableNameStr, entity.getEntity(), htd, hcdts);
            }
        } else {
            throw new AtlasHookException("No Table found for the given criteria. Table = " + tableName);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    createOrUpdateTable处理表、列族列实体,比较简单这里就不详细描述

    导入命名空间

    public void importNameSpace(final String nameSpace) throws Exception {
        List<NamespaceDescriptor> matchingNameSpaceDescriptors = getMatchingNameSpaces(nameSpace);
    
        if (CollectionUtils.isNotEmpty(matchingNameSpaceDescriptors)) {
            for (NamespaceDescriptor namespaceDescriptor : matchingNameSpaceDescriptors) {
                createOrUpdateNameSpace(namespaceDescriptor);
            }
        } else {
            throw new AtlasHookException("No NameSpace found for the given criteria. NameSpace = " + nameSpace);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    createOrUpdateNameSpace处理命名空间实体,比较简单这里就不详细描述

  • 相关阅读:
    一文说清楚前端Event Loop
    提高matlab运算效率——预分配内存
    2-分类问题 SVM 核函数
    CentOS7.9搭建NTP服务器
    ES6 | 解构、模板字符串、简化对象、箭头函数
    使用 Stable Diffusion Img2Img 生成、放大、模糊和增强
    JUC-3-并发锁
    自动驾驶做“双碳”目标下的现实主义者
    导致爬虫无法使用的原因有哪些?
    【小实验1】比较ResNet、ViT、SwinTransformer的归纳偏置(然而并没有达到预期结果)
  • 原文地址:https://blog.csdn.net/windydreams/article/details/127978562