• 导入MySQL元数据到Atlas


    导入MySQL元数据到Atlas

    元数据导入流程

    导入流程分为一下几个步骤:

    1. 定义MySQL元数据模型
    2. 采集MySQL元数据
    3. 将元数据导入到Atlas

    想要定义合适的元数据类型,就必须先了解Atlas的类型系统,见下面介绍。

    Atlas 类型系统

    Atlas 允许用户为他们想要管理的元数据对象定义一个模型。该模型由称为“types”的定义组成。被称为“entities”的“types”实例代表被管理的实际元数据对象。类型系统是一个允许用户定义和管理类型和实体的组件。由 Atlas 开箱即用的管理的所有元数据对象(例如 Hive tables)都使用类型建模并表示为实体。

    Atlas 原生定义的类型的一个示例: Hive table。使用以下属性定义 Hive tables

    Name:         hive_table
    TypeCategory: Entity
    SuperTypes:   DataSet
    Attributes:
        name:             string
        db:               hive_db
        owner:            string
        createTime:       date
        lastAccessTime:   date
        comment:          string
        retention:        int
        sd:               hive_storagedesc
        partitionKeys:    array<hive_column>
        aliases:          array<string>
        columns:          array<hive_column>
        parameters:       map<string>
        viewOriginalText: string
        viewExpandedText: string
        tableType:        string
        temporary:        boolean
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    从上面的例子可以看出以下几点:

    • Atlas 中的typename唯一标识,如上所述的 hive_table
    • A type has a metatype. Atlas 具有以下元类型:
      • Primitive metatypes: boolean、byte、short、int、long、float、double、biginteger、bigdecimal、string、date
      • Enum metatypes
      • Collection metatypes: array, map
      • Composite metatypes: Entity, Struct, Classification, Relationship
    • 实体和分类类型可以从其他类型中 “extend”,称为 “supertype”–凭借这一点,它也将获得包括超类型中定义的属性。这允许建模者在一组相关类型中定义共同的属性等。这又类似于面向对象语言为一个类定义超类的概念。Atlas中的一个类型也有可能从多个超类型中扩展出来。
      • 在这个例子中,每个hive table 都是从一个预定义的超级类型 "DataSet "延伸而来。
    • 具有 “Entity”、“Struct”、"Classification"或 "Relationship"元类型的type可以有一个attributes的集合。每个attribute 都有一个名称(例如’name’)和其他一些相关的属性。一个属性可以使用表达式type_name.attribute_name来引用。同样值得注意的是,属性本身是用Atlas元类型定义的。
      • 在这个例子中,hive_table.name是一个字符串,hive_table.aliases是一个字符串数组,hive_table.db是指一个名为hive_db的类型的实例,等等。
    • 属性中的类型引用,(如hive_table.db)是特别有趣的。注意,使用这样的属性,我们可以在Atlas中定义的两个类型之间定义任意的关系,从而建立丰富的模型。请注意,我们也可以收集一个引用列表作为属性类型(比如hive_table.columns,它代表了一个从hive_tablehive_column类型的引用列表)

    一、定义MySQL元数据模型

    通过了解官方文档,以及提供的模型示例,接下来定义自己的元数据模型:

    {
        "enumDefs": [],
        "structDefs": [],
        "classificationDefs": [],
        "entityDefs": [
            {
                "name": "jdbc_instance",
                "description": "Instance that the jdbc datasource",
                "superTypes": ["DataSet"],
                "serviceType": "jdbc",
                "typeVersion": "1.0",
                "attributeDefs": [
                    {
                        "name": "url",
                        "typeName": "string",
                        "isOptional": true,
                        "cardinality": "SINGLE",
                        "isUnique": false,
                        "isIndexable": true
                    },
                    {
                        "name": "userName",
                        "typeName": "string",
                        "isOptional": true,
                        "cardinality": "SINGLE",
                        "isUnique": false,
                        "isIndexable": false
                    },
                    {
                        "name": "productName",
                        "typeName": "string",
                        "isOptional": true,
                        "cardinality": "SINGLE",
                        "isUnique": false,
                        "isIndexable": true
                    },
                    {
                        "name": "productVersion",
                        "typeName": "string",
                        "isOptional": true,
                        "cardinality": "SINGLE",
                        "isUnique": false,
                        "isIndexable": false
                    },
                    {
                        "name": "driverName",
                        "typeName": "string",
                        "isOptional": true,
                        "cardinality": "SINGLE",
                        "isUnique": false,
                        "isIndexable": false
                    },
                    {
                        "name": "driverVersion",
                        "typeName": "string",
                        "isOptional": true,
                        "cardinality": "SINGLE",
                        "isUnique": false,
                        "isIndexable": false
                    },
                    {
                        "name": "isReadOnly",
                        "typeName": "boolean",
                        "isOptional": true,
                        "cardinality": "SINGLE",
                        "isUnique": false,
                        "isIndexable": false
                    }
                ]
            },
            {
                "name": "jdbc_db",
                "description": "a database (schema) in an jdbc",
                "superTypes": ["DataSet"],
                "serviceType": "jdbc",
                "typeVersion": "1.0",
                "attributeDefs": [
                    {
                        "name": "catalogName",
                        "typeName": "string",
                        "isOptional": true,
                        "cardinality": "SINGLE",
                        "isUnique": false,
                        "isIndexable": true
                    },
                    {
                        "name": "schemaName",
                        "typeName": "string",
                        "isOptional": true,
                        "cardinality": "SINGLE",
                        "isUnique": false,
                        "isIndexable": false
                    }
                ]
            }
        ],
        "relationshipDefs": [
            {
                "name": "jdbc_instance_databases",
                "serviceType": "jdbc",
                "typeVersion": "1.0",
                "relationshipCategory": "COMPOSITION",
                "relationshipLabel":    "__jdbc_instance.databases",
                "endDef1": {
                    "type": "jdbc_instance",
                    "name": "databases",
                    "isContainer": true,
                    "cardinality": "SET",
                    "isLegacyAttribute": true
                },
                "endDef2": {
                    "type": "jdbc_db",
                    "name": "instance",
                    "isContainer": false,
                    "cardinality": "SINGLE",
                    "isLegacyAttribute": true
                },
                "propagateTags": "NONE"
            }
        ]
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121

    模型定义完毕,当然也必须在Atlas中创建该模型,可以调用官方提供的API接口:

    接口地址:/v2/types/typedefs

    请求方式:POST

    请求数据类型:application/json

    调用示例:

    @Test
    public void testAtlasTypeDefs() throws AtlasServiceException {
        AtlasClientV2 atlasClientV2 = getAtlasClientV2();
        String str = loadTypeDefsFile("src/main/resources/2022-jdbc_model.json");
    
        AtlasTypesDef atlasTypesDef = JsonUtil.toBean(str, AtlasTypesDef.class);
    
        atlasClientV2.createAtlasTypeDefs(atlasTypesDef);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    调用结果:可以发现系统中新增该类型

    在这里插入图片描述

    二、采集MySQL元数据

    主要有如下两种方式:

    1. 使用MySQL内部数据库information_schema表查询实现
    2. 通过jdbc接口DatabaseMetaData方式获取元数据

    采用方式2获取元数据

    采集其他元数据信息见文档:https://docs.oracle.com/javase/8/docs/api/java/sql/DatabaseMetaData.html

        public static Connection getConnection(){
            Connection conn = null;
            String driver = "com.mysql.cj.jdbc.Driver";
            String url = "jdbc:mysql://localhost:3306";
            String user = "root";
            String password = "****";
            try {
                Class.forName(driver);
                conn = DriverManager.getConnection(url, user, password);
                conn.setAutoCommit(true);
            } catch (Exception e) {
                e.printStackTrace();
            }
            return conn;
        }
    
        @Test
        public void getDataBaseInfo() throws SQLException {
            Connection conn =  getConnection();
            DatabaseMetaData dbmd = conn.getMetaData();
            System.out.println("数据库URL: " + dbmd.getURL());
            System.out.println("数据库已知的用户: "+ dbmd.getUserName());
            System.out.println("数据库的产品名称:" + dbmd.getDatabaseProductName());
            System.out.println("数据库的版本:" + dbmd.getDatabaseProductVersion());
            System.out.println("驱动程序的名称:" + dbmd.getDriverName());
            System.out.println("驱动程序的版本:" + dbmd.getDriverVersion());
        }
        @Test
        public void getCatalogInfo() throws SQLException {
            Connection conn =  getConnection();
            ResultSet rs;
            DatabaseMetaData dbmd = conn.getMetaData();
            rs = dbmd.getCatalogs();
            while (rs.next()){
                String tableSchem = rs.getString("TABLE_CAT");
                System.out.println(tableSchem);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38

    导入元数据到Atals

    将元数据转换成Atlas Entity

    public class DataSourceEntity extends BaseEntity {
    
        public void createJdbcInstance(
                DataSourceMeta dataSourceMeta,
                String... classificationNames
        ) throws Exception {
    
            AtlasEntity datasourceEntity = new AtlasEntity(JDBC_INSTANCE);
            // set attributes
            String name = Utils.getIpAndPort(dataSourceMeta.getUrl()).get();
            String qualifiedName = name + "@" + dataSourceMeta.getProductName().toLowerCase();
            String description = "instance of jdbc datasource";
            String owner = dataSourceMeta.getUserName();
    
            datasourceEntity.setAttribute(NAME, name);
            datasourceEntity.setAttribute(QUALIFIED_NAME, qualifiedName);
            datasourceEntity.setAttribute(DESCRIPTION, description);
            datasourceEntity.setAttribute(OWNER, owner);
    
            datasourceEntity.setAttribute("url", dataSourceMeta.getUrl());
            datasourceEntity.setAttribute("userName", dataSourceMeta.getUserName());
            datasourceEntity.setAttribute("productName", dataSourceMeta.getProductName());
            datasourceEntity.setAttribute("productVersion", dataSourceMeta.getProductVersion());
            datasourceEntity.setAttribute("driverName", dataSourceMeta.getDriverName());
            datasourceEntity.setAttribute("isReadOnly", dataSourceMeta.getIsReadOnly());
    
            // set classifications
            datasourceEntity.setClassifications(toAtlasClassifications(classificationNames));
    
            AtlasEntity instance = createInstance(datasourceEntity);
    
            createCatalogEntities(qualifiedName, instance, dataSourceMeta.getCatalogMetas(),classificationNames);
        }
    
    
        private void createCatalogEntities(
                String cluster,
                AtlasEntity datasource,
                List<CatalogMeta> catalogMetas,
                String... classificationNames
        ) throws Exception {
            for (CatalogMeta catalogMeta : catalogMetas) {
                AtlasEntity columnEntity = createCatalogEntity(cluster, datasource, catalogMeta, classificationNames);
            }
        }
    
        private AtlasEntity createCatalogEntity(
                String cluster,
                AtlasEntity datasource,
                CatalogMeta catalogMeta,
                String... classificationNames
        ) throws Exception {
            AtlasEntity entity = new AtlasEntity(BaseEntity.JDBC_DB);
    
            String qualifiedName = cluster + "." + catalogMeta.getCatalogName();
            // set attributes
            entity.setAttribute(NAME, catalogMeta.getCatalogName());
            entity.setAttribute(QUALIFIED_NAME, qualifiedName);
    
            entity.setAttribute("catalogName", catalogMeta.getCatalogName());
            entity.setAttribute("schemaName", catalogMeta.getSchemaName());
    
            entity.setRelationshipAttribute("instance", toAtlasRelatedObjectId(datasource));
    
            // set classifications
            entity.setClassifications(toAtlasClassifications(classificationNames));
    
            return createInstance(entity);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70

    调用API接口导入元数据:

    接口地址:/v2/entity

    请求方式:POST

    请求数据类型:application/json

    调用示例:

        protected AtlasEntity createInstance(AtlasEntity entity) throws Exception {
            return createInstance(new AtlasEntity.AtlasEntityWithExtInfo(entity));
        }
    
        protected AtlasEntity createInstance(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) throws Exception {
            AtlasEntity             ret      = null;
            EntityMutationResponse  response = atlasClientV2.createEntity(entityWithExtInfo);
            List<AtlasEntityHeader> entities = response.getEntitiesByOperation(EntityMutations.EntityOperation.CREATE);
    
            if (CollectionUtils.isNotEmpty(entities)) {
                AtlasEntity.AtlasEntityWithExtInfo getByGuidResponse = atlasClientV2.getEntityByGuid(entities.get(0).getGuid());
    
                ret = getByGuidResponse.getEntity();
            }
            return ret;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    调用结果:

    在这里插入图片描述

    在这里插入图片描述

    在这里插入图片描述

    测试查询导入的元数据

    在这里插入图片描述

    在这里插入图片描述

  • 相关阅读:
    如何利用python来提取SQL语句中的表名称
    《大师级引导-应对困境的工具与技术》读书笔记1
    完成原型设计的五个步骤
    使用开源库libyuv中替换开源汇编接口,解决汇编接口中的崩溃问题
    Api接口封装
    android 固定进度环形刷新效果
    WorkTool企微机器人APP分享自定义链接
    java-net-php-python-jsp果蔬预约种植管理系统计算机毕业设计程序
    国内用ChatGPT可以吗
    JVM——类的生命周期(加载阶段,连接阶段,初始化阶段)
  • 原文地址:https://blog.csdn.net/maohuihua123/article/details/126318925