• Nebula graph 源码 学习笔记


    一、概述

    Nebula Graph 是一个开源的分布式图数据库,采用了存算分离的设计,存储采用rocksdb。

    数据库内核:
    1)graph : 用于计算
    2)storage:用于存储
    3)meta:用于存储元数据

    此外,nebula还提供了周边工具,如数据导入(exchange)、监控、可视化、图计算等。

    1.1、总体架构

    nebula的

    在这里插入图片描述
    meta:负责数据管理,例如 Schema 操作、集群管理和用户权限管理等。

    Graph :服务负责处理计算请求

    Storage: 服务负责存储数据

    graph 和 storage 通过心跳定期向 meta汇报自身信息, 心跳通过 fbthrift实现,graph 和storage 进程中的成员变量 metaClient_

    1.2、查询计算引擎架构

    查询引擎采用无状态设计,可轻松实现横向扩展,分为语法分析、语义分析、优化器、执行引擎等几个主要部分。
    在这里插入图片描述

    1.3、存储引擎设计

    Storage 包含两个部分:
    1、meta 相关的存储, 称之为 Meta Service
    2、data 相关的存储, 称之为 Storage Service。

    Storage Service 共有三层:
    1)最底层是 Store Engine;
    2)之上便是 Consensus 层,实现了 Multi Group Raft;
    3)最上层,是 Storage interfaces,这一层定义了一系列和图相关的 API。

    在这里插入图片描述

    二、源码概览

    Nebula的源码地址在:https://github.com/vesoft-inc
    vesoft(欧若数网科技有限公司) 是2018年成立于杭州的公司,nebula graph是他们开发的。

    2.1、源码模块分类

    1、计算查询引擎
    https://github.com/vesoft-inc/nebula-graph

    2、存储引擎
    https://github.com/vesoft-inc/nebula-storage

    3、内核工具包
    https://github.com/vesoft-inc/nebula-common

    4、客户端SDK
    1)Java:https://github.com/vesoft-inc/nebula-java
    2)C++:https://github.com/vesoft-inc/nebula-cpp
    3)Golang:https://github.com/vesoft-inc/nebula-go
    4)Python:https://github.com/vesoft-inc/nebula-python

    5、工具
    1)importer(基于 Go 客户端实现的 csv–>nebula 导入工具):https://github.com/vesoft-inc/nebula-importer
    2)基于spark的导入工具,包括Nebula Exchange、Nebula Spark Connector 和 Nebula Algorithm:https://github.com/vesoft-inc/nebula-spark-utils
    3)备份恢复工具:https://github.com/vesoft-inc/nebula-br
    4)部署工具:https://github.com/vesoft-inc/nebula-ansible,https://github.com/vesoft-inc/nebula-operator(用于容器)

    6、测试工具
    1)压力与性能测试:https://github.com/vesoft-inc/nebula-bench
    2)混沌测试:https://github.com/vesoft-inc/nebula-chaos

    7、编译工具
    1)Nebula Graph 图数据库内核依赖的第三方包:https://github.com/vesoft-inc/nebula-third-party
    2)Nebula Graph 图数据库内核工具链:https://github.com/vesoft-inc/nebula-gears

    8、可视化工具
    https://github.com/vesoft-inc/nebula-studio

    2.2、重点模块结构

    2.2.1、计算查询引擎Nebula Graph

    ├── cmake
    ├── conf
    ├── LICENSES
    ├── package
    ├── resources
    ├── scripts
    ├── src
    │ ├── context
    │ ├── daemons
    │ ├── executor
    │ ├── optimizer
    │ ├── parser
    │ ├── planner
    │ ├── scheduler
    │ ├── service
    │ ├── session
    │ ├── stats
    │ ├── util
    │ ├── validator
    │ └── visitor
    └── tests
    ├── admin
    ├── bench
    ├── common
    ├── data
    ├── job
    ├── maintain
    ├── mutate
    ├── query
    └── tck

    conf/:查询引擎配置文件目录
    package/:graph 打包脚本
    resources/:资源文件
    scripts/:启动脚本
    src/:查询引擎源码目录

    src/context/:查询的上下文信息,包括 AST(抽象语法树),Execution Plan(执行计划),执行结果以及其他计算相关的资源。
    src/daemons/:查询引擎主进程
    src/executor/:执行器,各个算子的实现
    src/optimizer/:RBO(基于规则的优化)实现,以及优化规则
    src/parser/:词法解析,语法解析,:AST结构定义
    src/planner/:算子,以及执行计划生成
    src/scheduler/:执行计划的调度器
    src/service/:查询引擎服务层,提供鉴权,执行 Query 的接口
    src/session/:Session 管理
    src/stats/:执行统计,比如 P99、慢查询统计等
    src/util/:工具函数
    src/validator/:语义分析实现,用于检查语义错误,并进行一些简单的改写优化
    src/visitor/:表达式访问器,用于提取表达式信息,或者优化
    tests/:基于 BDD 的集成测试框架,测试所有 Nebula Graph 提供的功能

    2.2.2、存储引擎Nebula Storage

    ├── cmake
    ├── conf
    ├── docker
    ├── docs
    ├── LICENSES
    ├── package
    ├── scripts
    └── src
    ├── codec
    ├── daemons
    ├── kvstore
    ├── meta
    ├── mock
    ├── storage
    ├── tools
    ├── utils
    └── version

    conf/:存储引擎配置文件目录
    package/:storage 打包脚本
    scripts/:启动脚本
    src/:存储引擎源码目录

    src/codec/:序列化反序列化工具
    src/daemons/:存储引擎和元数据引擎主进程
    src/kvstore/:基于 raft 的分布式 KV 存储实现
    src/meta/:基于 KVStore 的元数据管理服务实现,用于管理元数据信息,集群管理,长耗时任务管理等
    src/storage/:基于 KVStore 的图数据存储引擎实现
    src/tools/:一些小工具实现
    src/utils/:代码工具函数

    2.2.3、内核工具包Nebula Common

    ├── cmake
    │ └── nebula
    ├── LICENSES
    ├── src
    │ └── common
    │ ├── algorithm
    │ ├── base
    │ ├── charset
    │ ├── clients
    │ ├── concurrent
    │ ├── conf
    │ ├── context
    │ ├── cpp
    │ ├── datatypes
    │ ├── encryption
    │ ├── expression
    │ ├── fs
    │ ├── function
    │ ├── graph
    │ ├── hdfs
    │ ├── http
    │ ├── interface
    │ ├── meta
    │ ├── network
    │ ├── plugin
    │ ├── process
    │ ├── session
    │ ├── stats
    │ ├── test
    │ ├── thread
    │ ├── thrift
    │ ├── time
    │ ├── version
    │ └── webservice
    └── third-party

    Nebula Common 仓库代码是 Nebula 内核代码的工具包,提供一些常用工具的高效实现。这里只对其中和图数据库密切相关的目录进行说明。

    src/common/clients/:meta,storage 客户端的 CPP 实现
    src/common/datatypes/:Nebula Graph 中数据类型及计算的定义,比如 string,int,bool,float,Vertex,Edge 等。
    rc/common/expression/:nGQL 中表达式的定义
    src/common/function/:nGQL 中的函数的定义
    src/common/interface/:graph、meta、storage 服务的接口定义

    二、Storage源码

    2.1、Storage概览回顾

    2.1.1、Storage进程

    storage 进程分为三层

    1、storage interface

    这一层定义了一系列和图相关的 API。 这些 API 请求会在这一层被翻译成一组针对相应 Partition 的 kv 操作。比如:getBeightors, Add/delete vertes/edge等等

    2、Consensus

    实现了 Multi Group Raft,每一个 Partition 都对应了一组 Raft Group,这里的 Partition 就是数据分片。目前 Nebula 的分片策略采用了 静态hash的方式;

    用户在创建 SPACE 时需指定 Partition 数,Partition 数量一旦设置便不可更改,一般来讲,Partition 数目要能满足业务将来的扩容需求。

    3、Store Engine

    单机版 local store engine,提供了对本地数据的 get / put / scan / delete 操作,相关的接口放在 KVStore / KVEngine.h 文件里面,目前 Nebula 提供了基于 RocksDB 实现的 Store Engine。

    2.1.2、代码结构

    ├── cmake
    ├── conf
    ├── docker
    ├── docs
    ├── LICENSES
    ├── package
    ├── scripts
    └── src
    ├── codec
    ├── daemons
    ├── kvstore
    ├── meta
    ├── mock
    ├── storage
    ├── tools
    ├── utils
    └── version

    conf/:存储引擎配置文件目录
    package/:storage 打包脚本
    scripts/:启动脚本
    src/:存储引擎源码目录

    src/codec/:序列化反序列化工具
    src/daemons/:存储引擎和元数据引擎主进程
    src/kvstore/:基于 raft 的分布式 KV 存储实现
    src/meta/:基于 KVStore 的元数据管理服务实现,用于管理元数据信息,集群管理,长耗时任务管理等
    src/storage/:基于 KVStore 的图数据存储引擎实现
    src/tools/:一些小工具实现
    src/utils/:代码工具函数

    2.2、源码阅读

    2.2.1、StorageServer类(storage目录下)

    2.2.1.1、成员变量

    1、std::shared_ptrfolly::IOThreadPoolExecutor ioThreadPool_;
    线程池用来处理网络IO任务,从连接中读取请求数据并反序列化

    2、std::shared_ptrapache::thrift::concurrency::ThreadManager workers_;
    负责处理用户的业务逻辑

    3、std::unique_ptrnebula::WebService webSvc_;
    监听port: HTTP[19779], HTTP2[19780]
    接收外部http命令,比如显示一些flags

    4、std::unique_ptrapache::thrift::ThriftServer storageServer_;
    监听port:9779
    与graph的通信,raft一致性协议使用thrift进行通信,addvertex/edge等
    client端提供给grapth进程

    5、std::unique_ptrapache::thrift::ThriftServer adminServer_;
    监听port:9778
    执行任务进程,比如balance,addpart等
    client端提供给meta进程

    6、std::unique_ptrapache::thrift::ThriftServer internalStorageServer_;
    监听port:9777
    toss一致性
    client端提供给peer storage进程

    7、std::unique_ptrmeta::MetaClient metaClient_;
    与meta的通信,得到schema信息,心跳等

    8、std::unique_ptr interClient_;
    InternalStorageService 的客户端,toss一致性

    2.2.1.2、StorageServer启动过程

    进程启动的入口在:storage/StorageServer.cpp bool StorageServer::start() 函数

    主要流程:

    1、创建IO线程池,用于处理网络IO

    ioThreadPool_ = std::make_shared<folly::IOThreadPoolExecutor>(FLAGS_num_io_threads);
    
    • 1

    2、创建works_,用于用户的业务请求

    workers_ = apache::thrift::concurrency::PriorityThreadManager::newPriorityThreadManager(
          numWorkerThreads);
    
    • 1
    • 2

    3、创建meta客户端

    metaClient_用来与meta进行通信,metaClient_→waitForMetadReady() 会在里面注册发送心跳的请求
    storage进行的indexmanger 和 schemamanager去得到相关的index和schma 内部都是用的metaClient的接口

    metaClient_ = std::make_unique<meta::MetaClient>(ioThreadPool_, metaAddrs_, options);
    
    • 1

    4、初始化schemaManager

    schemaMan_ = meta::ServerBasedSchemaManager::create(metaClient_.get());
    
    • 1

    5、初始化indexManager

    indexMan_ = meta::ServerBasedIndexManager::create(metaClient_.get());
    
    • 1

    6、初始化kvstore
    kvstore用于实现raft协议. 端口用的9780,包括多个storage 副本之间的数据同步,心跳,将数据写入到rocksdb中.

    kvstore_ = getStoreInstance();
    
    • 1

    7、初始化interClient_

    interClient_ = std::make_unique<InternalStorageClient>(ioThreadPool_, metaClient_.get());
    
    • 1

    8、创建txnMan_

    txnMan_ = std::make_unique<TransactionManager>(env_.get());
    
    • 1

    9、创建taskMgr_

    taskMgr_ = AdminTaskManager::instance(env_.get());
    
    • 1

    10、初始化storageServer_

    storageServer_ = getStorageServer();
    
    • 1

    11、初始化adminServer_

    adminServer_ = getAdminServer();
    
    • 1

    12、internalStorageServer_

    internalStorageServer_ = getInternalServer();
    
    • 1

    关于ioThreadPool_和workers_线程的区分:
    1)创建的三个thrift的server对象 StorageServer, Adminserver。interServe 都是以workers_ 为业务线程,ioThreadPool_为IO线程
    2)创建kvstore的raftserver也是以workers_ 为业务线程,ioThreadPool_为IO线程

    2.2.2、NebulaStore类(kvstore目录下)

    2.2.2.1、成员变量

    1、SpacePartInfo

    SpacePartInfo结构体,用于存储:
    1)partitionId–> Part的映射
    Part类继承自RaftPart类,其属性包括:

       * @param spaceId
       * @param partId
       * @param localAddr Local address of the Part
       * @param walPath Listener's wal path
       * @param engine Pointer of kv engine
       * @param pool IOThreadPool for listener
       * @param workers Background thread for listener
       * @param handlers Worker thread for listener
       * @param snapshotMan Snapshot manager
       * @param clientMan Client manager
       * @param diskMan Disk manager
       * @param vIdLen Vertex id length of space
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    注意: 这里的engine,也就是下面engines_里面的engine

    2)vector<std::unique_ptr> engines_;
    存储有序集合: engines_
    每新创建一个space,都会对应rocksdb的一个instance,这个engines_里就保存了每个space的rocksdb instance。

    struct SpacePartInfo {
      ~SpacePartInfo() {
        parts_.clear();
        engines_.clear();
      }
    
      std::unordered_map<PartitionID, std::shared_ptr<Part>> parts_;
      std::vector<std::unique_ptr<KVEngine>> engines_;
    };
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    2、SpaceListenerInfo结构体

    SpaceListenerInfo用于存储partitionId–> ListenerMap的映射

    struct SpaceListenerInfo {
      std::unordered_map<PartitionID, ListenerMap> listeners_;
    };
    
    • 1
    • 2
    • 3

    这是ListenerMap的定义:

    using ListenerMap = std::unordered_map<meta::cpp2::ListenerType, std::shared_ptr<Listener>>;
    
    • 1
    3、其他成员变量
       * @param options
       * @param ioPool IOThreadPool
       * @param serviceAddr Address of NebulaStore, used in raft
       * @param workers Worker thread
    
    • 1
    • 2
    • 3
    • 4

    2.2.2.2、NebulaStore初始化过程

    NebulaStore初始化入口在 bool NebulaStore::init() 函数,其主要过程为:

    1、启动处理线程

    bgWorkers_ = std::make_shared<thread::GenericThreadPool>();
    bgWorkers_->start(FLAGS_num_workers, "nebula-bgworkers");
    storeWorker_ = std::make_shared<thread::GenericWorker>();
    
    • 1
    • 2
    • 3

    2、启动raftService

    raftService_ = raftex::RaftexService::createService(ioPool_, workers_, raftAddr_.port);
    
    • 1

    3、初始化导入

      if (!isListener()) {
        // 如果当前storage不是一个Listener
        // TODO(spw): need to refactor, we could load data from local regardless of partManager,
        // then adjust the data in loadPartFromPartManager.
        loadPartFromDataPath();
        loadPartFromPartManager();
        loadRemoteListenerFromPartManager();
      } else {
        // 如果当前storage是一个Listener
        loadLocalListenerFromPartManager();
      }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    下面重点看loadPartFromDataPath()和loadPartFromPartManager()的过程:

    loadPartFromDataPath()的过程

    Step1:扫描本地路径,并初始化spaces_

    1)把rootPath赋值为:配置文件中的data_path + "/nebula"目录,如/data1/mmdb/storage/nebula
    2)对rootPath目录下的所有目录做遍历,/data1/mmdb/storage/nebula的下级目录是每个space的spaceId
    3)对spaceId为0的目录跳过
    4)为每个spaceId都newEngine,即创建一个rocksdb instance:

    auto engine = newEngine(spaceId, path, options_.walPath_);
    
    • 1

    Step2:为保存到本地的part信息做负载均衡
    过程待补充

    Step3:load保存到本地engine的part ids
    过程待补充

    Step4:创建part并加入到space
    过程待补充

    loadPartFromPartManager()过程

    1)根据storeSvcAddr_从cache中获取partsMap
    2)遍历partsMap,拿到spaceId后, addSpace(spaceId);
    3)把每个peerPart存入 vector partIds,并排序
    4)对partIds遍历,并执行 addPart(spaceId, partId, false, partPeers[partId].hosts_);

    addSpace的过程

    newEngine有两个地方被触发:
    1) loadPartFromDataPath()的过程
    2)loadPartFromPartManager() 中addSpace当engine不存在时

    newEngine的过程:会创建一个rocksdb的instance并返回

    return std::make_unique<RocksEngine>(
            spaceId, vIdLen, dataPath, walPath, options_.mergeOp_, cfFactory);
    
    • 1
    • 2

    addSpace的过程:
    Step1:加写锁

    Step2:如果当前storage不是Listener角色:则遍历所有engine以确保每一个dataPath都有一个engine
    1、如果spaceId在std::unordered_map<GraphSpaceID, std::shared_ptr> spaces_中:
    1)拼装出dataPath:配置中的dataPath + “/nebula/” + spaceId
    2)从RocksDBEngine.h缓存dataPath_中查询auto dPath = (*iter)->getDataRoot();
    3)比较二者是否一致,从而判断engine是否存在
    4)如果engine不存在,则newEngine,并新创建的Engine即rocksdb instance存放到SpacePartInfo结构体中的std::vector<std::unique_ptr> engines_;中。
    2、如果spaceId不在std::unordered_map<GraphSpaceID, std::shared_ptr> spaces_中:
    则newEngine,并新创建的Engine即rocksdb instance存放到SpacePartInfo结构体中的std::vector<std::unique_ptr> engines_;中。

    Step3:如果当前storage是Listener角色
    1、如果spaceId在std::unordered_map<GraphSpaceID, std::shared_ptr> spaces_中:
    打印spaceId已存在,然后返回
    2、如果spaceId不在std::unordered_map<GraphSpaceID, std::shared_ptr> spaces_中:
    创建SpaceListenerInfo实例

    2.2.2.3、NebulaStore的ingest过程

    NebulaStore的ingest过程在NebulaStore.cpp的nebula::cpp2::ErrorCode NebulaStore::ingest(GraphSpaceID spaceId)函数。

    ingest指根据spaceId找到Engine并读取.sst文件写入nebula的过程:

    Step1:根据spaceid去std::unordered_map<GraphSpaceID, std::shared_ptr> spaces_; 这个map中找到SpacePartInfo结构体

    Step2:从SpacePartInfo结构体中找到他的成员vector<std::unique_ptr> engines_;

    Step3:对vector<std::unique_ptr> engines_遍历,找出每个RocksEngine

    Step4:对每个Engine,列出其所有的partition

    Step5:对其列出的每个partition遍历
    1)从std::unordered_map<GraphSpaceID, std::shared_ptr> spaces_; 中根据spaceid找到shared_ptr,
    从SpacePartInfo中找到std::unordered_map<PartitionID, std::shared_ptr> parts_;
    从parts_中找到KVEngine,即为RocksEngine
    2)先拼装出其下载的目录:如/data1/mmdb/storage/download/{spaceid}
    3)遍历目录下的所有文件,做rocksdb的IngestExternalFile(files, options)

  • 相关阅读:
    3.2-Docker Image概述
    Linux 文件访问权限说明
    JAVA byte类型转String类型
    Databend 源码阅读:配置管理
    通信原理学习笔记6-4:数字解调——抽样判决的译码准则(最大后验概率准则MAP、最大似然准则ML、最小二乘/最小平方准则LS、最小距离准则)
    Python --解析xml
    xss漏洞和分析
    新星微前端MicroApp的基础教程
    机器人路径规划:基于A*算法的机器人路径规划(提供Python代码)
    360安全大模型为什么是“非卖品”?
  • 原文地址:https://blog.csdn.net/shijinghan1126/article/details/125455241