• Apache-atlas-kafka-hook-源码分析


    流程图

    KafkaBridge导入kafka 比较简单, 主要是导入kafka的topic名称,流程图大致如下:
    流程图

    源码梳理

    KafkaBridge.main方法

    在这里插入代码片
    public static void main(String[] args) {
    
        int exitCode = EXIT_CODE_FAILED;
        AtlasClientV2 atlasClientV2 = null;
    
        try {
    
    // 获取命令行参数
            Options options = new Options();
    // 指定topic
            options.addOption("t","topic", true, "topic");
    // 指定文件(包含主题名,每行一个主题)
            options.addOption("f", "filename", true, "filename");
    
            CommandLineParser parser        = new BasicParser();
            CommandLine       cmd           = parser.parse(options, args);
            String            topicToImport = cmd.getOptionValue("t");
            String            fileToImport  = cmd.getOptionValue("f");                 // 导入文件?
    // 获取atlas-application.properties文件
            Configuration     atlasConf     = ApplicationProperties.get();           
     
            String[]          urls          = atlasConf.getStringArray(ATLAS_ENDPOINT);// ATLAS API接口地址
            if (urls == null || urls.length == 0) {
                urls = new String[] { DEFAULT_ATLAS_URL };
            }
    
            if (!AuthenticationUtil.isKerberosAuthenticationEnabled()) {
     // 接收控制台的输入
                String[] basicAuthUsernamePassword = AuthenticationUtil.getBasicAuthenticationInput();
                atlasClientV2 = new AtlasClientV2(urls, basicAuthUsernamePassword);
            } else {
                UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
    
    
                atlasClientV2 = new AtlasClientV2(ugi, ugi.getShortUserName(), urls);
            }
    
    // 执行导入
            KafkaBridge importer = new KafkaBridge(atlasConf, atlasClientV2);
    
            if (StringUtils.isNotEmpty(fileToImport)) {
                File f = new File(fileToImport);
    
                if (f.exists() && f.canRead()) {
    
                    BufferedReader br   = new BufferedReader(new FileReader(f));
                    String         line = null;
    
                    while((line = br.readLine()) != null) {
                        topicToImport = line.trim();
                        // 导入命令行指定文件包含的topic 按行进行分割
                        importer.importTopic(topicToImport);
                    }
                    exitCode = EXIT_CODE_SUCCESS;
    
                } else {
                    LOG.error("Failed to read the file");
                }
            } else {
                // 导入命令行指定的topic
                importer.importTopic(topicToImport);
                exitCode = EXIT_CODE_SUCCESS;
            }
    
        } catch(ParseException e) {
            LOG.error("Failed to parse arguments. Error: ", e.getMessage());
            printUsage();
        } catch(Exception e) {
            System.out.println("ImportKafkaEntities failed. Please check the log file for the detailed error message");
            e.printStackTrace();
            LOG.error("ImportKafkaEntities failed", e);
        } finally {
            if (atlasClientV2 != null) {
                atlasClientV2.close();
            }
        }
    
        System.exit(exitCode);
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81

    KafkaBridge构造函数, 进行初始化

    public KafkaBridge(Configuration atlasConf, AtlasClientV2 atlasClientV2) throws Exception {
    
        String   zookeeperConnect    = getZKConnection(atlasConf);
        int      sessionTimeOutMs    = atlasConf.getInt(ZOOKEEPER_SESSION_TIMEOUT_MS, DEFAULT_ZOOKEEPER_SESSION_TIMEOUT_MS) ;
        int      connectionTimeOutMs = atlasConf.getInt(ZOOKEEPER_CONNECTION_TIMEOUT_MS, DEFAULT_ZOOKEEPER_CONNECTION_TIMEOUT_MS);
    // 创建zk 从zk中获取元数据
        ZkClient zkClient            = new ZkClient(zookeeperConnect, sessionTimeOutMs, connectionTimeOutMs, ZKStringSerializer$.MODULE$);
    
        this.atlasClientV2     = atlasClientV2;
        this.metadataNamespace = getMetadataNamespace(atlasConf);
        this.zkUtils           = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect), JaasUtils.isZkSecurityEnabled());
    // 获取所有的主题
        this.availableTopics   = scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllTopics());
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    KafkaBridge.importTopic

    public void importTopic(String topicToImport) throws Exception {
        List<String> topics = availableTopics;
    
    // topicToImport 不为空, 导入知道的topic, 否则全部导入
        if (StringUtils.isNotEmpty(topicToImport)) {
            List<String> topics_subset = new ArrayList<>();
            for(String topic : topics) {
                if (Pattern.compile(topicToImport).matcher(topic).matches()) {
                    topics_subset.add(topic);
                }
            }
            topics = topics_subset;
        }
    
    // 依次导入元数据
        if (CollectionUtils.isNotEmpty(topics)) {
            for(String topic : topics) {
                createOrUpdateTopic(topic);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    KafkaBridge.createOrUpdateTopic

    AtlasEntityWithExtInfo createOrUpdateTopic(String topic) throws Exception {
    
    //主题限定名(唯一) 
        String                 topicQualifiedName = getTopicQualifiedName(metadataNamespace, topic);
    
    // 查询
        AtlasEntityWithExtInfo topicEntity        = findTopicEntityInAtlas(topicQualifiedName);
    
        if (topicEntity == null) {
    // 新增
            System.out.println("Adding Kafka topic " + topic);
            LOG.info("Importing Kafka topic: {}", topicQualifiedName);
    
            AtlasEntity entity = getTopicEntity(topic, null);
            topicEntity = createEntityInAtlas(new AtlasEntityWithExtInfo(entity));
        } else {
    // 修改
            System.out.println("Updating Kafka topic "  + topic);
            LOG.info("Kafka topic {} already exists in Atlas. Updating it..", topicQualifiedName);
    
            AtlasEntity entity = getTopicEntity(topic, topicEntity.getEntity());
            topicEntity.setEntity(entity);
    
            topicEntity = updateEntityInAtlas(topicEntity);
        }
    
        return topicEntity;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    kafka 元数据采集进入后,可以对元数据进行治理, 比如分类和关联术语
    分类、关联术语

    思考

    kafka 元数据采集的几点思考
    1)考虑到业务系统割裂的情况,不同的系统可能使用不同的kafka集群,因此需要采集不同集群的数据
    2)目前atlas提供采集的元数据是主题名称,是否可以将主题消息的定义、主题的生产端、消费端、集群等纳入元数据的范畴?比如定义如下主题:
    kafka_topic(已有)
    kafka_topic_def
    kafka_zk(zookeeper 集群 另外一种数据源)
    kafka_borker
    kafka_producer
    kafka_consumer

  • 相关阅读:
    java-php-net-python-球鞋购物网站计算机毕业设计程序
    IDEA手动导入jar包到maven本地库
    Reggie外卖项目 —— 员工管理模块之启用/禁用员工账号功能
    【ShaderLab罪恶装备卡通角色_二次元风格_“Sol Badguy“_角色渲染(第二篇)】
    08_Express框架
    Postman 批量测试接口详细教程
    内存问题难定位,那是因为你没用ASAN
    MST2101Q2 摩托车三相磁电机调压器控制芯片
    HHSTU1050型货车转向系及前轴设计(说明书+任务书+CAD图纸)
    IDEA:开发配置(2024版 建议收藏)
  • 原文地址:https://blog.csdn.net/windydreams/article/details/127813483