• miniob源码 架构概览


    miniob源码 架构概览

    整体架构

    如下图,简单描述了,observer启动后,建立监听、注册libevent事件,recv后触发各stags的handle_event、处理结果回调、threadpool运行机制等等几个方面对整体线程模型、reactor模型和各组件工作流进行分析。

    image.png

    Reactor模型

    • miniob运行框架是通过libevent实现了对网络事件的监听,当链接建立后,读缓冲区事件触发回调recv函数的处理流程。
    event_set(&client_context->read_event, client_context->fd, EV_READ | EV_PERSIST, recv, client_context);
    
    • 1

    image.png

    • 在recv函数的最后将SessionEvent添加到SessionStage的事件队列中,同时将SessionStage添加到threapool的Stage队列中。
      SessionEvent *sev = new SessionEvent(client);
      session_stage_->add_event(sev);
    
    • 1
    • 2
    • thread_pool的stage队列(thread_pool.h)
    std::deque<Stage *> run_queue_;  //< list of stages with work to do
    
    • 1
    • stage_event队列(stage.h)
    std::deque<StageEvent *> event_list_;  // event queue
    
    • 1

    线程模型

    miniob采用多线程的架构,通过Threadpool类创建线程池,根据etc/observer.ini中的配置创建线程。

    # threadpools' name, it will contain the threadpool's section
    ThreadPools=SQLThreads,IOThreads,DefaultThreads
    
    [SQLThreads]
    # the thread number of this threadpool, 0 means cpu's cores.
    # if miss the setting of count, it will use cpu's core number;
    count=3
    #count=0
    
    [IOThreads]
    # the thread number of this threadpool, 0 means cpu's cores.
    # if miss the setting of count, it will use cpu's core number;
    count=3
    #count=0
    
    [DefaultThreads]
    # If Stage haven't set threadpool, it will use this threadpool
    # This threadpool is used for backend operation, such as timer, sedastats and so on.
    # the thread number of this threadpool, 0 means cpu's cores.
    # if miss the setting of count, it will use cpu's core number;
    count=3
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    ThreadPools 配置了3个线程池:SQLThreads,IOThreads,DefaultThreads,每个线程池可以配置count,即线程的个数。

    • 创建线程池和线程,初始化时通过new ThreadPools,在构造函数中调用add_threads,根据线程个数的配置创建线程。
      // attempt to start the requested number of threads
      for (i = 0; i < threads; i++) {
        int stat = pthread_create(&pthread, &pthread_attrs, Threadpool::run_thread, (void *)this);
        if (stat != 0) {
          LOG_WARN("Failed to create one thread\n");
          break;
        }
        char tmp[32] = {};
        sprintf(tmp,"%s%u",name_.c_str(), i);
        pthread_setname_np(pthread, tmp);
      }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    注意:8,9,10三行是我后加的代码,目的是在top的时候显示线程名称。

    • 在TimerStage初始化的时候bool TimerStage::initialize()函数也创建了一个线程。
    bool TimerStage::initialize()
    {
      // The TimerStage does not send messages to any other stage.
      ASSERT(next_stage_list_.size() == 0, "Invalid NextStages list.");
    
      // Start the thread to maintain the timer
      const pthread_attr_t *thread_attrs = NULL;
      void *thread_args = (void *)this;
      int status = pthread_create(&timer_thread_id_, thread_attrs, &TimerStage::start_timer_thread, thread_args);
      if (status != 0)
        LOG_ERROR("failed to create timer thread: status=%d\n", status);
      pthread_setname_np(timer_thread_id_, "TimerStage");
      
      return (status == 0);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    注意:低12行是我后加的代码,目的是在top的时候显示线程名称。

    • 启动observer后可以通过top看到各线程情况
    top -p `ps -ef | grep observer | grep -v grep | awk '{print $2}'` -H
    
    • 1

    image.png

    注意:可以看到observer的线程情况为,主线observer + SQLThreads(3个)+ IOThreads(3个)+ DefaultThreads(3个)+ TimerStage 共11 个线程

    • 初始化stage时,安装配置etc/observer.ini为每个stage分别了不同的线程池
    [SessionStage]
    ThreadId=SQLThreads
    NextStages=PlanCacheStage
    
    [PlanCacheStage]
    ThreadId=SQLThreads
    #NextStages=OptimizeStage
    NextStages=ParseStage
    
    [ParseStage]
    ThreadId=SQLThreads
    NextStages=ResolveStage
    
    [ResolveStage]
    ThreadId=SQLThreads
    NextStages=QueryCacheStage
    
    [QueryCacheStage]
    ThreadId=SQLThreads
    NextStages=OptimizeStage
    
    [OptimizeStage]
    ThreadId=SQLThreads
    NextStages=ExecuteStage
    
    [ExecuteStage]
    ThreadId=SQLThreads
    NextStages=DefaultStorageStage,MemStorageStage
    
    [DefaultStorageStage]
    ThreadId=IOThreads
    BaseDir=./miniob
    SystemDb=sys
    
    [MemStorageStage]
    ThreadId=IOThreads
    
    [MetricsStage]
    NextStages=TimerStage
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • bug修复,在分配线程池的时候代码有逻辑错误

    image.png

    注意:低315行,317行为调整好的代码,建issue:stage线程分配问题 #75 ,PR:stage线程分配问题 #75 #76

    Stage模型

    可以把stage模型看做一个链,每个stage都是链上的节点,节点的入口是handle_event,每个handle_event都会调用下一个stage的handle_event。

    image.png

    NextStages在配置文件中给出,但是实际上代码已经写的比较固化,比如支持配置多个NextStages,但成员变量中已经固定了一个或者两个

    handle_event的参数是SQLStageEvent,SQLStageEvent中包括Stmt、Query、sql_等重要的数据结构

    private:
      SessionEvent *session_event_ = nullptr;
      std::string sql_;
      Query *query_ = nullptr;
      Stmt *stmt_ = nullptr;
    
    • 1
    • 2
    • 3
    • 4
    • 5

    其中包含SessionEvent的原因是SessionEvent 中带有的回调函数可以向客户端返回应答。

    private:
      ConnectionContext *client_;
    
      std::string response_;
    
    • 1
    • 2
    • 3
    • 4
    • SessionStage::callback_event回调函数,向客户端返回执行结果
    void SessionStage::callback_event(StageEvent *event, CallbackContext *context)
    {
      LOG_TRACE("Enter\n");
    
      SessionEvent *sev = dynamic_cast<SessionEvent *>(event);
      if (nullptr == sev) {
        LOG_ERROR("Cannot cat event to sessionEvent");
        return;
      }
    
      const char *response = sev->get_response();
      int len = sev->get_response_len();
      if (len <= 0 || response == nullptr) {
        response = "No data\n";
        len = strlen(response) + 1;
      }
      Server::send(sev->get_client(), response, len);
      if ('\0' != response[len - 1]) {
        // 这里强制性的给发送一个消息终结符,如果需要发送多条消息,需要调整
        char end = 0;
        Server::send(sev->get_client(), &end, 1);
      }
    
      // sev->done();
      LOG_TRACE("Exit\n");
      return;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    注意:低17行向客户端发送处理结果response。

    • SessionEvent是在ParseStage阶段被调用的
    void ParseStage::callback_event(StageEvent *event, CallbackContext *context)
    {
      LOG_TRACE("Enter\n");
      SQLStageEvent *sql_event = static_cast<SQLStageEvent *>(event);
      sql_event->session_event()->done_immediate();
      sql_event->done_immediate();
      LOG_TRACE("Exit\n");
      return;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    注意:第5行为回调SessionEvent,直接将结果返回客户端。

    • 词法解析、语法解析是在ParseStage的handle_event(request)进行的
    RC ParseStage::handle_request(StageEvent *event)
    {
      SQLStageEvent *sql_event = static_cast<SQLStageEvent *>(event);
      const std::string &sql = sql_event->sql();
    
      Query *query_result = query_create();
      if (nullptr == query_result) {
        LOG_ERROR("Failed to create query.");
        return RC::INTERNAL;
      }
    
      RC ret = parse(sql.c_str(), query_result);
      if (ret != RC::SUCCESS) {
        // set error information to event
        sql_event->session_event()->set_response("Failed to parse sql\n");
        query_destroy(query_result);
        return RC::INTERNAL;
      }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    注意:第12行电源parse进行语法、词法解析,生成语法树Query *query_result。

    总结

    整体架构是基于Reactor事件驱动的异步消息处理模型,使用线程池,通过配置文件编排stage链,完成SQL处理流水线。当链接建立后,libevent的epool监听读缓冲区,收到可读event后,触发recv函数事件,recv将收到的数据和session信息打包成SessionEvent,并以SessionStage形式添加到thread_pool的stage队列中,同时触发SessionStage的handle_event,启动流水线链,然后按照流水线配置进行逐步处理,最后通过SessionStage的回调函数callback_event将处理结果发送至客户端。

    整体架构应该比较清晰了,后续将逐步对各组件进行详细分析。

  • 相关阅读:
    17基于matlab卡尔曼滤波的行人跟踪算法,并给出算法估计误差结果,判断算法的跟踪精确性,程序已调通,可直接运行,基于MATLAB平台,可直接拍下。
    IEEE的论文哪里可以下载?
    Landsat数据在USGS中无法下载Surface Reflectance产品的解决方法
    【重温基础算法】内部排序之冒泡排序法
    EasyCVR视频智能分析系统如何助力广场流动摊贩监管手段升级
    【学习笔记】AGC008
    【狂神说】CSS3详解
    【机器学习】机器学习重要分支——强化学习:从理论到实践
    MYSQL之DML(数据库操作语言)
    单独使用return关键字
  • 原文地址:https://blog.csdn.net/xk_xx/article/details/126848701