The import process consists of the following steps:
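To define suitable metadata types, you first need to understand the Atlas type system, which is introduced below.
Atlas allows users to define a model for the metadata objects they want to manage. The model is composed of definitions called "types". Instances of "types", called "entities", represent the actual metadata objects being managed. The type system is the component that lets users define and manage types and entities. All metadata objects that Atlas manages out of the box (for example, Hive tables) are modeled using types and represented as entities.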
One example of a type that Atlas defines natively is a Hive table. A Hive table is defined with the following attributes:
Name: hive_table
TypeCategory: Entity
SuperTypes: DataSet
Attributes:
name: string
db: hive_db
owner: string
createTime: date
lastAccessTime: date
comment: string
retention: int
sd: hive_storagedesc
partitionKeys: array<hive_column>
aliases: array<string>
columns: array<hive_column>
parameters: map<string>
viewOriginalText: string
viewExpandedText: string
tableType: string
temporary: boolean
The example above illustrates the following points:
A type in Atlas is uniquely identified by its name, such as hive_table above.
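A type has a metatype. Atlas supports the following metatypes:
Primitive metatypes: boolean, byte, short, int, long, float, double, biginteger, bigdecimal, string, date
Enum metatypes
Collection metatypes: array, map
Composite metatypes: Entity, Struct, Classification, Relationship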
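A type whose metatype is "Entity", "Struct", "Classification", or "Relationship" can have a collection of attributes. Each attribute has a name (for example, 'name') and some other associated properties. An attribute can be referred to with the expression type_name.attribute_name. It is also worth noting that attributes themselves are defined using Atlas metatypes.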
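To make the metatype categories concrete, here is a minimal sketch that classifies the type strings used in the hive_table definition above (for example string, array<hive_column>, hive_db). The class AttributeTypeKinds and its methods are hypothetical helpers written for illustration. They are not part of the Atlas API, and the parsing is deliberately simplistic.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper, not an Atlas class: sorts an attribute's type string
// into one of three rough categories based on the metatypes listed above.
final class AttributeTypeKinds {
    enum Kind { PRIMITIVE, COLLECTION, NAMED_TYPE }

    private static final Set<String> PRIMITIVES = new HashSet<>(Arrays.asList(
            "boolean", "byte", "short", "int", "long", "float", "double",
            "biginteger", "bigdecimal", "string", "date"));

    static Kind classify(String typeName) {
        String t = typeName.trim();
        if (PRIMITIVES.contains(t)) {
            return Kind.PRIMITIVE;
        }
        if ((t.startsWith("array<") || t.startsWith("map<")) && t.endsWith(">")) {
            return Kind.COLLECTION;
        }
        // Anything else names another type (an entity, struct, enum, ...),
        // e.g. hive_db in hive_table.db.
        return Kind.NAMED_TYPE;
    }

    // Returns the text between the angle brackets of a collection type,
    // e.g. "hive_column" for "array<hive_column>".
    static String innerType(String typeName) {
        String t = typeName.trim();
        if (classify(t) != Kind.COLLECTION) {
            throw new IllegalArgumentException("Not a collection type: " + typeName);
        }
        return t.substring(t.indexOf('<') + 1, t.length() - 1);
    }
}
```

Under these assumptions, classify("hive_db") yields NAMED_TYPE, which is the case where an attribute points at another type rather than holding a plain value.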
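Here, hive_table.name is a string, hive_table.aliases is an array of strings, hive_table.db refers to an instance of a type named hive_db, and so on.
Type references in attributes (such as hive_table.db) are particularly interesting. With such attributes we can define arbitrary relationships between two types defined in Atlas and thereby build rich models. An attribute type can also be a collection of references (for example hive_table.columns, which represents a list of references from hive_table to the hive_column type).
With the official documentation and the bundled model examples as a guide, we next define our own metadata model: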
{
  "enumDefs": [],
  "structDefs": [],
  "classificationDefs": [],
  "entityDefs": [
    {
      "name": "jdbc_instance",
      "description": "Instance of a JDBC data source",
      "superTypes": ["DataSet"],
      "serviceType": "jdbc",
      "typeVersion": "1.0",
      "attributeDefs": [
        {
          "name": "url",
          "typeName": "string",
          "isOptional": true,
          "cardinality": "SINGLE",
          "isUnique": false,
          "isIndexable": true
        },
        {
          "name": "userName",
          "typeName": "string",
          "isOptional": true,
          "cardinality": "SINGLE",
          "isUnique": false,
          "isIndexable": false
        },
        {
          "name": "productName",
          "typeName": "string",
          "isOptional": true,
          "cardinality": "SINGLE",
          "isUnique": false,
          "isIndexable": true
        },
        {
          "name": "productVersion",
          "typeName": "string",
          "isOptional": true,
          "cardinality": "SINGLE",
          "isUnique": false,
          "isIndexable": false
        },
        {
          "name": "driverName",
          "typeName": "string",
          "isOptional": true,
          "cardinality": "SINGLE",
          "isUnique": false,
          "isIndexable": false
        },
        {
          "name": "driverVersion",
          "typeName": "string",
          "isOptional": true,
          "cardinality": "SINGLE",
          "isUnique": false,
          "isIndexable": false
        },
        {
          "name": "isReadOnly",
          "typeName": "boolean",
          "isOptional": true,
          "cardinality": "SINGLE",
          "isUnique": false,
          "isIndexable": false
        }
      ]
    },
    {
      "name": "jdbc_db",
      "description": "A database (schema) in a JDBC data source",
      "superTypes": ["DataSet"],
      "serviceType": "jdbc",
      "typeVersion": "1.0",
      "attributeDefs": [
        {
          "name": "catalogName",
          "typeName": "string",
          "isOptional": true,
          "cardinality": "SINGLE",
          "isUnique": false,
          "isIndexable": true
        },
        {
          "name": "schemaName",
          "typeName": "string",
          "isOptional": true,
          "cardinality": "SINGLE",
          "isUnique": false,
          "isIndexable": false
        }
      ]
    }
  ],
  "relationshipDefs": [
    {
      "name": "jdbc_instance_databases",
      "serviceType": "jdbc",
      "typeVersion": "1.0",
      "relationshipCategory": "COMPOSITION",
      "relationshipLabel": "__jdbc_instance.databases",
      "endDef1": {
        "type": "jdbc_instance",
        "name": "databases",
        "isContainer": true,
        "cardinality": "SET",
        "isLegacyAttribute": true
      },
      "endDef2": {
        "type": "jdbc_db",
        "name": "instance",
        "isContainer": false,
        "cardinality": "SINGLE",
        "isLegacyAttribute": true
      },
      "propagateTags": "NONE"
    }
  ]
}
Once the model is defined, it also has to be created in Atlas. This can be done by calling the official API:
Endpoint: /v2/types/typedefs
Method: POST
Content type: application/json
Example call:
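@Test
public void testAtlasTypeDefs() throws AtlasServiceException {
    // getAtlasClientV2(), loadTypeDefsFile() and JsonUtil are project helpers not shown in this post
    AtlasClientV2 atlasClientV2 = getAtlasClientV2();
    // Read the JSON model file and deserialize it into an AtlasTypesDef
    String str = loadTypeDefsFile("src/main/resources/2022-jdbc_model.json");
    AtlasTypesDef atlasTypesDef = JsonUtil.toBean(str, AtlasTypesDef.class);
    // Register the new types with Atlas
    atlasClientV2.createAtlasTypeDefs(atlasTypesDef);
}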
Result: the newly created types now appear in the system.
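For readers who are not using AtlasClientV2, the same call can be sketched with the JDK's built-in HTTP client. This is only an illustration: the class TypeDefsRequest is hypothetical, and the base URL and credentials in the usage note are assumptions about a local test installation, not values taken from this post.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper: builds (but does not send) the POST request
// that submits a type definition JSON document to Atlas.
final class TypeDefsRequest {
    static HttpRequest build(String atlasBaseUrl, String typeDefsJson,
                             String user, String password) {
        // HTTP Basic authentication header; assumes the server accepts basic auth
        String credentials = Base64.getEncoder().encodeToString(
                (user + ":" + password).getBytes(StandardCharsets.UTF_8));
        return HttpRequest.newBuilder()
                .uri(URI.create(atlasBaseUrl + "/v2/types/typedefs"))
                .header("Content-Type", "application/json")
                .header("Authorization", "Basic " + credentials)
                .POST(HttpRequest.BodyPublishers.ofString(typeDefsJson, StandardCharsets.UTF_8))
                .build();
    }
}
```

The request could then be sent with HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()), passing for example "http://localhost:21000/api/atlas" as the base URL if that matches your deployment.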
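There are two main ways to collect the metadata:
Query the tables of MySQL's built-in information_schema database
Use the JDBC DatabaseMetaData interface
For collecting other kinds of metadata, see the documentation: https://docs.oracle.com/javase/8/docs/api/java/sql/DatabaseMetaData.html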
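public static Connection getConnection() {
    String driver = "com.mysql.cj.jdbc.Driver";
    String url = "jdbc:mysql://localhost:3306";
    String user = "root";
    String password = "****";
    try {
        // Explicit driver loading is optional with JDBC 4+ drivers, but harmless
        Class.forName(driver);
        Connection conn = DriverManager.getConnection(url, user, password);
        conn.setAutoCommit(true);
        return conn;
    } catch (Exception e) {
        // Fail fast instead of returning null, which would only surface later as a NullPointerException
        throw new IllegalStateException("Failed to connect to " + url, e);
    }
}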
@Test
public void getDataBaseInfo() throws SQLException {
    // try-with-resources closes the connection when the test finishes
    try (Connection conn = getConnection()) {
        DatabaseMetaData dbmd = conn.getMetaData();
        System.out.println("Database URL: " + dbmd.getURL());
        System.out.println("Database user name: " + dbmd.getUserName());
        System.out.println("Database product name: " + dbmd.getDatabaseProductName());
        System.out.println("Database product version: " + dbmd.getDatabaseProductVersion());
        System.out.println("Driver name: " + dbmd.getDriverName());
        System.out.println("Driver version: " + dbmd.getDriverVersion());
    }
}
@Test
public void getCatalogInfo() throws SQLException {
    // Both the connection and the result set are closed automatically
    try (Connection conn = getConnection();
         ResultSet rs = conn.getMetaData().getCatalogs()) {
        while (rs.next()) {
            // getCatalogs() returns a single column, TABLE_CAT, holding the catalog name
            String catalogName = rs.getString("TABLE_CAT");
            System.out.println(catalogName);
        }
    }
}
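For comparison, the first approach listed earlier (querying information_schema directly) might look like the sketch below. The class InformationSchemaReader is hypothetical. It assumes a MySQL connection such as the one returned by getConnection() and filters out MySQL's built-in system schemas.

```java
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;

// Hypothetical helper: lists user databases by reading information_schema.SCHEMATA.
final class InformationSchemaReader {
    static final String SCHEMATA_QUERY =
            "SELECT SCHEMA_NAME FROM information_schema.SCHEMATA ORDER BY SCHEMA_NAME";

    // Schemas that ship with MySQL itself and usually hold no business metadata
    private static final Set<String> SYSTEM_SCHEMAS = new HashSet<>(Arrays.asList(
            "information_schema", "mysql", "performance_schema", "sys"));

    static boolean isSystemSchema(String name) {
        return SYSTEM_SCHEMAS.contains(name.toLowerCase(Locale.ROOT));
    }

    static List<String> listUserSchemas(Connection conn) throws SQLException {
        List<String> result = new ArrayList<>();
        try (Statement st = conn.createStatement();
             ResultSet rs = st.executeQuery(SCHEMATA_QUERY)) {
            while (rs.next()) {
                String name = rs.getString("SCHEMA_NAME");
                if (!isSystemSchema(name)) {
                    result.add(name);
                }
            }
        }
        return result;
    }
}
```

This variant is tied to MySQL's catalog tables, whereas DatabaseMetaData works across JDBC drivers, which is one reason to prefer the latter when several database products need to be supported.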
Convert the metadata into Atlas entities
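The conversion class below takes a DataSourceMeta and a list of CatalogMeta objects, neither of which is shown in this post. The following is a guess at their minimal shape, inferred only from the getters that the class calls. The field types, the constructors, and the from(DatabaseMetaData) collector are assumptions.

```java
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical holder for one catalog; only the getter names come from the post.
class CatalogMeta {
    private final String catalogName;
    private final String schemaName;

    CatalogMeta(String catalogName, String schemaName) {
        this.catalogName = catalogName;
        this.schemaName = schemaName;
    }

    String getCatalogName() { return catalogName; }
    String getSchemaName() { return schemaName; }
}

// Hypothetical holder for the data source level metadata.
class DataSourceMeta {
    private final String url;
    private final String userName;
    private final String productName;
    private final String productVersion;
    private final String driverName;
    private final boolean readOnly;
    private final List<CatalogMeta> catalogMetas;

    DataSourceMeta(String url, String userName, String productName, String productVersion,
                   String driverName, boolean readOnly, List<CatalogMeta> catalogMetas) {
        this.url = url;
        this.userName = userName;
        this.productName = productName;
        this.productVersion = productVersion;
        this.driverName = driverName;
        this.readOnly = readOnly;
        this.catalogMetas = catalogMetas;
    }

    // Collects the values printed by the tests above into a single object.
    static DataSourceMeta from(DatabaseMetaData dbmd) throws SQLException {
        List<CatalogMeta> catalogs = new ArrayList<>();
        try (ResultSet rs = dbmd.getCatalogs()) {
            while (rs.next()) {
                // The schema name is left null here; getCatalogs() does not provide it
                catalogs.add(new CatalogMeta(rs.getString("TABLE_CAT"), null));
            }
        }
        return new DataSourceMeta(dbmd.getURL(), dbmd.getUserName(),
                dbmd.getDatabaseProductName(), dbmd.getDatabaseProductVersion(),
                dbmd.getDriverName(), dbmd.isReadOnly(), catalogs);
    }

    String getUrl() { return url; }
    String getUserName() { return userName; }
    String getProductName() { return productName; }
    String getProductVersion() { return productVersion; }
    String getDriverName() { return driverName; }
    boolean getIsReadOnly() { return readOnly; }
    List<CatalogMeta> getCatalogMetas() { return catalogMetas; }
}
```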
public class DataSourceEntity extends BaseEntity {
    public void createJdbcInstance(
            DataSourceMeta dataSourceMeta,
            String... classificationNames
    ) throws Exception {
        AtlasEntity datasourceEntity = new AtlasEntity(JDBC_INSTANCE);
        // set attributes
        // the instance name is the host:port part of the JDBC URL
        String name = Utils.getIpAndPort(dataSourceMeta.getUrl()).get();
        String qualifiedName = name + "@" + dataSourceMeta.getProductName().toLowerCase();
        String description = "instance of jdbc datasource";
        String owner = dataSourceMeta.getUserName();
        datasourceEntity.setAttribute(NAME, name);
        datasourceEntity.setAttribute(QUALIFIED_NAME, qualifiedName);
        datasourceEntity.setAttribute(DESCRIPTION, description);
        datasourceEntity.setAttribute(OWNER, owner);
        datasourceEntity.setAttribute("url", dataSourceMeta.getUrl());
        datasourceEntity.setAttribute("userName", dataSourceMeta.getUserName());
        datasourceEntity.setAttribute("productName", dataSourceMeta.getProductName());
        datasourceEntity.setAttribute("productVersion", dataSourceMeta.getProductVersion());
        datasourceEntity.setAttribute("driverName", dataSourceMeta.getDriverName());
        datasourceEntity.setAttribute("isReadOnly", dataSourceMeta.getIsReadOnly());
        // set classifications
        datasourceEntity.setClassifications(toAtlasClassifications(classificationNames));
        // create the instance first, then the databases that belong to it
        AtlasEntity instance = createInstance(datasourceEntity);
        createCatalogEntities(qualifiedName, instance, dataSourceMeta.getCatalogMetas(), classificationNames);
    }
    private void createCatalogEntities(
            String cluster,
            AtlasEntity datasource,
            List<CatalogMeta> catalogMetas,
            String... classificationNames
    ) throws Exception {
        // create one jdbc_db entity per catalog, each linked back to the instance
        for (CatalogMeta catalogMeta : catalogMetas) {
            createCatalogEntity(cluster, datasource, catalogMeta, classificationNames);
        }
    }
    private AtlasEntity createCatalogEntity(
            String cluster,
            AtlasEntity datasource,
            CatalogMeta catalogMeta,
            String... classificationNames
    ) throws Exception {
        AtlasEntity entity = new AtlasEntity(BaseEntity.JDBC_DB);
        String qualifiedName = cluster + "." + catalogMeta.getCatalogName();
        // set attributes
        entity.setAttribute(NAME, catalogMeta.getCatalogName());
        entity.setAttribute(QUALIFIED_NAME, qualifiedName);
        entity.setAttribute("catalogName", catalogMeta.getCatalogName());
        entity.setAttribute("schemaName", catalogMeta.getSchemaName());
        // "instance" is the endDef2 name of the jdbc_instance_databases relationship
        entity.setRelationshipAttribute("instance", toAtlasRelatedObjectId(datasource));
        // set classifications
        entity.setClassifications(toAtlasClassifications(classificationNames));
        return createInstance(entity);
    }
}
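The class above builds its qualifiedName values from a helper, Utils.getIpAndPort, that the post does not show. The sketch below illustrates one way such naming could work. QualifiedNames and its methods are hypothetical, and the URL parsing only handles URLs of the form jdbc:<product>://host:port/..., which is an assumption that does not hold for every JDBC driver.

```java
import java.net.URI;
import java.util.Locale;
import java.util.Optional;

// Hypothetical stand-in for the naming logic used in DataSourceEntity.
final class QualifiedNames {
    // Extracts "host:port" from URLs such as jdbc:mysql://localhost:3306/db.
    static Optional<String> getIpAndPort(String jdbcUrl) {
        if (jdbcUrl == null || !jdbcUrl.startsWith("jdbc:")) {
            return Optional.empty();
        }
        try {
            // Drop the "jdbc:" prefix so the rest parses as a regular URI
            URI uri = URI.create(jdbcUrl.substring("jdbc:".length()));
            if (uri.getHost() == null) {
                return Optional.empty();
            }
            return Optional.of(uri.getPort() < 0
                    ? uri.getHost()
                    : uri.getHost() + ":" + uri.getPort());
        } catch (IllegalArgumentException e) {
            return Optional.empty();
        }
    }

    // Mirrors: name + "@" + productName.toLowerCase()
    static String instanceQualifiedName(String hostAndPort, String productName) {
        return hostAndPort + "@" + productName.toLowerCase(Locale.ROOT);
    }

    // Mirrors: cluster + "." + catalogName
    static String databaseQualifiedName(String instanceQualifiedName, String catalogName) {
        return instanceQualifiedName + "." + catalogName;
    }
}
```

Because the database name is prefixed with the instance's qualifiedName, two databases with the same name on different servers still get distinct qualifiedName values.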
Call the API to import the metadata:
Endpoint: /v2/entity
Method: POST
Content type: application/json
Example call:
protected AtlasEntity createInstance(AtlasEntity entity) throws Exception {
    return createInstance(new AtlasEntity.AtlasEntityWithExtInfo(entity));
}

protected AtlasEntity createInstance(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) throws Exception {
    AtlasEntity ret = null;
    // Submit the entity to Atlas
    EntityMutationResponse response = atlasClientV2.createEntity(entityWithExtInfo);
    List<AtlasEntityHeader> entities = response.getEntitiesByOperation(EntityMutations.EntityOperation.CREATE);
    if (CollectionUtils.isNotEmpty(entities)) {
        // Fetch the created entity back by GUID so the caller gets the stored version
        AtlasEntity.AtlasEntityWithExtInfo getByGuidResponse = atlasClientV2.getEntityByGuid(entities.get(0).getGuid());
        ret = getByGuidResponse.getEntity();
    }
    // ret stays null when the response reports no CREATE operation
    return ret;
}
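To show roughly what the client sends to /v2/entity, the sketch below assembles a request body by hand: an "entity" object carrying a typeName and an attributes map. EntityPayload is a hypothetical helper for illustration only. Its escaping covers just quotes and backslashes, so real code should use a JSON library or the Atlas client instead.

```java
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper: renders a minimal entity payload as a JSON string.
final class EntityPayload {
    static String toJson(String typeName, Map<String, Object> attributes) {
        StringJoiner attrs = new StringJoiner(",", "{", "}");
        for (Map.Entry<String, Object> e : attributes.entrySet()) {
            attrs.add(quote(e.getKey()) + ":" + value(e.getValue()));
        }
        return "{\"entity\":{\"typeName\":" + quote(typeName)
                + ",\"attributes\":" + attrs + "}}";
    }

    private static String value(Object v) {
        if (v == null) {
            return "null";
        }
        if (v instanceof Boolean || v instanceof Number) {
            return v.toString();
        }
        return quote(v.toString());
    }

    // Simplified escaping: handles only backslashes and double quotes
    private static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }
}
```

Passing "jdbc_instance" with the attributes set in createJdbcInstance would produce a body of the same general shape that AtlasClientV2.createEntity serializes on the caller's behalf.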
Result: