Nebula Graph介绍和SpringBoot环境连接和查询
转载请注明来源 https://www.cnblogs.com/milton/p/16784098.html
说明
当前Nebula Graph的最新版本是3.2.1, 根据官方的文档进行配置
https://docs.nebula-graph.io/3.2.1/14.client/4.nebula-java-client/
Nebula Graph 的一些特点
- 支持分布式. 相对于Neo4j, TigerGraph这些图数据库, Nebula 是面向分布式设计的, 因此对集群的支持比较完备, 在规模上上限要高很多. 在实际项目中存储了180亿的点边, 这个数量对于Neo4j和TigerGraph是比较困难的.
- 支持图空间. 各个图空间的ID是互不干扰的, 但是在同一个图空间里ID的类型和长度必须一致. 注意这个一致约束的是所有的点和边. Nebula 可以使用int64作为ID, 也可以用字符串, 但是字符串需要指定一个长度, 例如64个字节. 相对于只能用长整数的Neo4j, ID设计上更自由灵活.
- 点对应的类型叫TAG, 边对应的类型叫EDGE
- TAG和EDGE都会对应一组的属性(map, 或者说dict)
- 一个点可以对多个TAG, 每个TAG一组属性, 多组属性. 项目中建议一开始不要用多TAG, 在整个图结构稳定后, 再做合并
- 一个边只对应一个EDGE, 一组属性
- Nebula 用的是自定义的查询语法 GQL, 和 cypher 语法基本一样
- 除了点边的ID和关联关系外, 只有带索引的属性可以查询. 这点和其它图数据库不一样, 其它数据库即使没有索引, 慢是慢点但是不报错, Nebula直接给你返回错误.
- 对于返回数量较大的查询, Nebula会强制查询必须带limit
- Nebula 单节点稳定性是有问题的, 在3.2.1版本中观察到偶尔会出现服务自行退出, 如果在生产环境使用, 需要有后台监控进行心跳检测和自动启动
GQL 常用查询
下面列出一些常用的查询
-- 列出图空间 SHOW SPACES; -- 列出tag(点类型)和edge(边类型), 需要先 USE 一个图空间 SHOW TAGS; SHOW EDGES;
列出某一类型的点和边
MATCH ()-[e:follow]-() RETURN e MATCH (v:player) RETURN v
带条件的查询, 在结果数量较多时必须带limit, 否则Nebula会报错
match (v:ADDRESS)-[e]-() where id(v)==\"ADD:82388116\" return v,e limit 100
基础配置和使用
在上面的链接中, 提供了最小的配置和测试代码
pom.xml 增加包依赖
对于Nebula Graph 3.2.1, 需要使用3.0.0的版本. client的每个版本只能对应特定的一两个服务端版本
<dependency> <groupId>com.vesoftgroupId> <artifactId>clientartifactId> <version>3.0.0version> dependency>
Java调用
Java调用主要是三部分, 创建连接池, 创建会话, 执行查询
创建 NebulaPool 连接池
连接到地址127.0.0.1, 端口9669, 连接池大小100. 注意地址和端口是一个列表, Nebula是支持集群的. 连接时不需要用户和密码
NebulaPool pool = new NebulaPool(); try { NebulaPoolConfig nebulaPoolConfig = new NebulaPoolConfig(); nebulaPoolConfig.setMaxConnSize(100); List addresses = Arrays.asList(new HostAddress("127.0.0.1", 9669)); Boolean initResult = pool.init(addresses, nebulaPoolConfig); if (!initResult) { log.error("pool init failed."); return; } } catch () //...
创建 Session 会话
创建会话时需要用户名和密码
Session session = pool.getSession("root", "nebula", false);
执行查询
创建一个SPACE, 然后使用这个SPACE, 创建一个TAG person, 创建一个EDGE like
String createSchema = "CREATE SPACE IF NOT EXISTS test(vid_type=fixed_string(20)); " + "USE test;" + "CREATE TAG IF NOT EXISTS person(name string, age int);" + "CREATE EDGE IF NOT EXISTS like(likeness double)"; ResultSet resp = session.execute(createSchema); if (!resp.isSucceeded()) { log.error(String.format("Execute: `%s', failed: %s", createSchema, resp.getErrorMessage())); System.exit(1); }
添加一个点记录
String insertVertexes = "INSERT VERTEX person(name, age) VALUES " + "'Bob':('Bob', 10), " + "'Lily':('Lily', 9), " + "'Tom':('Tom', 10), " + "'Jerry':('Jerry', 13), " + "'John':('John', 11);"; ResultSet resp = session.execute(insertVertexes); if (!resp.isSucceeded()) { log.error(String.format("Execute: `%s', failed: %s", insertVertexes, resp.getErrorMessage())); System.exit(1); }
查询
String query = "GO FROM \"Bob\" OVER like " + "YIELD $^.person.name, $^.person.age, like.likeness"; ResultSet resp = session.execute(query); if (!resp.isSucceeded()) { log.error(String.format("Execute: `%s', failed: %s", query, resp.getErrorMessage())); System.exit(1); } printResult(resp);
在 SpringBoot 项目中使用 Nebula Graph
pom.xml 增加包依赖
<dependency> <groupId>com.vesoftgroupId> <artifactId>clientartifactId> <version>3.0.0version> dependency>
Session工厂: NebulaSessionFactory.java
配合@Bean(destroyMethod = "close")
, 创建一个工厂类, 接收pool并实现close()方法
public class NebulaSessionFactory { private final NebulaPool pool; private final String username; private final String password; public NebulaSessionFactory(NebulaPool pool, String username, String password) { this.pool = pool; this.username = username; this.password = password; } public Session getSession() { try { return pool.getSession(username, password, false); } catch (NotValidConnectionException|IOErrorException|AuthFailedException|ClientServerIncompatibleException e) { throw new RuntimeException("Nebula session exception", e); } } public void close() { pool.close(); } }
为什么不直接将 NebulaPool 配置为Bean? 因为 Session 每次创建时需要带用户名密码, 将密码作为config注入到每个Service中肯定是大家都不愿意看到的.
配置修改: application.yml
- 这里的值如果不打算使用profile配置, 可以直接写入
- hosts是逗号分隔的地址端口列表, 例如
10.22.33.33:9669,10.22.33.34:9669
myapp: nebula: hosts: @nebula.hosts@ username: @nebula.username@ password: @nebula.password@ max-conn: @nebula.max-conn@
Spring启动配置: NebulaGraphConfig.java
应用启动时读取配置, 创建 NebulaPool, 并实例化 NebulaSessionFactory, destroyMethod = "close"
, 这个表示在项目shutdown时会调用Bean的close方法释放资源.
@Configuration public class NebulaGraphConfig { @Value("${myapp.nebula.hosts}") private String hosts; @Value("${myapp.nebula.max-conn}") private int maxConn; @Value("${myapp.nebula.username}") private String username; @Value("${myapp.nebula.password}") private String password; @Bean(destroyMethod = "close") public NebulaSessionFactory nebulaSessionFactory() { List hostAddresses = new ArrayList<>(); String[] hostList = hosts.split(",[ ]*"); for (String host : hostList) { String[] hostParts = host.split(":"); if (hostParts.length != 2 || !hostParts[1].matches("\\d+")) { throw new RuntimeException("Invalid host name set for Nebula: " + host); } hostAddresses.add(new HostAddress(hostParts[0], Integer.parseInt(hostParts[1]))); } NebulaPoolConfig poolConfig = new NebulaPoolConfig(); poolConfig.setMaxConnSize(maxConn); NebulaPool pool = new NebulaPool(); try { pool.init(hostAddresses, poolConfig); } catch (UnknownHostException e) { throw new RuntimeException("Unknown Nebula hosts"); } return new NebulaSessionFactory(pool, username, password); } }
Service调用
在 Service 中进行调用
@Service @Slf4j public class GraphServiceImpl implements GraphService { @Autowired private NebulaSessionFactory sessionFactory; @Override public NebulaResult query(String graphSpace, String gql) { Session session = null; try { log.info("GQL: {}", gql); session = sessionFactory.getSession(); NebulaResult res = query(session, "USE " + graphSpace); if (!res.isSuccess() || res.getResults() == null || res.getResults().size() == 0) { log.error("Failed to use space:{}", graphSpace); return null; } if (!graphSpace.equals(res.getResults().get(0).getSpaceName())) { log.error("Failed to use space:{}, result:{}", graphSpace, res.getResults().get(0).getSpaceName()); return null; } return query(session, gql); } catch (IOErrorException e) { log.error(e.getMessage(), e); return null; } finally { if (session != null) { session.release(); } } } private NebulaResult query(Session session, String gql) throws IOErrorException { String json = session.executeJson(gql); return JacksonUtil.extractByType(json, new TypeReference<>() {}); } }
辅助类 NebulaResult.java 等
外层结构
这里定义了 json 格式响应的外层结构
@Data public class NebulaResult implements Serializable { private List errors; private List> results; @JsonIgnore public boolean isSuccess() { return (errors != null && errors.size() == 1 && errors.get(0).getCode() == 0); } @Data public static class Error implements Serializable { private int code; } @Data @JsonIgnoreProperties(ignoreUnknown = true) @JsonInclude(JsonInclude.Include.NON_NULL) public static class Result implements Serializable { private String spaceName; private List> data; private List columns; private Error errors; private long latencyInUs; } @Data public static class Element implements Serializable { private List> meta; private List row; } @Data public static class Meta implements Serializable { private String type; private T id; } }
内层因为区分Edge和Vertex, 结构不一样. 如果是混合返回的结果, 可以用 Serializable
String gql = "match (v:ADDR)-[e]-() where id(v)==\"ADD:123123\" return v,e limit 100"; NebulaResult res = graphService.query("insurance", gql); log.info(JacksonUtil.compress(res)); Assertions.assertThat(res).isNotNull();
对于边, 需要使用结构化的ID
@Data @JsonIgnoreProperties(ignoreUnknown = true) @JsonInclude(JsonInclude.Include.NON_NULL) public class EdgeId implements Serializable { private int ranking; private int type; private String dst; private String src; private String name; }
用这个结构进行查询
NebulaResult res3 = graphService.query("t_test1", "MATCH ()-[e:follow]-() RETURN e");
对于点, ID就是String
NebulaResult res2 = graphService.query("t_test1", "MATCH (v:player) RETURN v");
工具类 JacksonUtil.java
public class JacksonUtil { private static final Logger log = LoggerFactory.getLogger(JacksonUtil.class); private static final ObjectMapper MAPPER = createObjectMapper(); private JacksonUtil() {} private static ObjectMapper createObjectMapper() { ObjectMapper objectMapper = new ObjectMapper(); objectMapper.configure(JsonGenerator.Feature.AUTO_CLOSE_TARGET, false); objectMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES); SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); objectMapper.setDateFormat(df); return objectMapper; } public static ObjectMapper getObjectMapper() { return MAPPER; } public static String compressByView(Object o, Class> clazz) { if (o == null) { return null; } try { return MAPPER.writerWithView(clazz).writeValueAsString(o); } catch (JsonProcessingException e) { log.error(e.getMessage(), e); } return null; } public static JsonNode readTree(String json) { if (json == null || json.length() == 0) {return null;} try { return MAPPER.readTree(json); } catch (JsonProcessingException e) { log.error(e.getMessage(), e); } return null; } public static JsonNode readTree(Object obj) { if (obj == null) {return null;} return MAPPER.valueToTree(obj); } public static T extractByType(JsonNode jsonNode, TypeReference tp) { if (jsonNode == null) {return null;} try { return MAPPER.readerFor(tp).readValue(jsonNode); } catch (IOException e) { log.error(e.getMessage(), e); } return null; } public static T extractByType(String json, TypeReference tp) { if (json == null || json.length() == 0) {return null;} try { return MAPPER.readValue(json, tp); } catch (JsonProcessingException e) { log.error(e.getMessage(), e); } return null; } public static T extractByType(JsonNode jsonNode, Type type) { if (jsonNode == null) {return null;} try { JavaType javaType = MAPPER.getTypeFactory().constructType(type); return MAPPER.readerFor(javaType).readValue(jsonNode); } catch (IOException e) { log.error(e.getMessage(), e); } return null; } public static T extractByType(String json, Type type) { JavaType javaType = MAPPER.getTypeFactory().constructType(type); return extractByType(json, javaType); } public static T extractByType(String json, JavaType type) { if (json == null || json.length() == 0) {return null;} try { return MAPPER.readValue(json, type); } catch (JsonProcessingException e) { log.error(e.getMessage(), e); } return null; } public static T extractByClass(JsonNode jsonNode, Class clazz) { if (jsonNode == null) {return null;} try { return MAPPER.treeToValue(jsonNode, clazz); } catch (JsonProcessingException e) { log.error(e.getMessage(), e); } return null; } public static T extractByClass(String json, Class clazz) { if (json == null || json.length() == 0) {return null;} try { return MAPPER.readValue(json, clazz); } catch (JsonProcessingException e) { log.error(e.getMessage(), e); } return null; } public static T extractByClass(Map map, Class clazz) { if (map == null) {return null;} return MAPPER.convertValue(map, clazz); } public static String compress(T object) { if (object == null) {return null;} try { return MAPPER.writer().writeValueAsString(object); } catch (JsonProcessingException e) { log.error(e.getMessage(), e); } return null; } }