• Node.js C++ 层的任务管理


    我们都知道 Node.js 是基于事件循环来运行的,本质上是一个生产者 / 消费者模型,所以就少不了任务的管理机制,不过本文不是介绍事件循环中的任务管理,而是 C++ 层的任务管理。本文主要介绍 SetImmediate、SetImmediateThreadsafe、RequestInterrupt、AddCleanupHook 这四个 API 产生的任务。时间关系,随便写写,权当笔记。

    任务管理机制的初始化

    首先来看一下 Node.js 启动的过程中,和任务管理相关的逻辑。

    uv_check_start(immediate_check_handle(), CheckImmediate)
    uv_async_init(
    	event_loop(),
    	&task_queues_async_,
    	[](uv_async_t* async) {
    	  Environment* env = ContainerOf(&Environment::task_queues_async_, async);
    	  env->RunAndClearNativeImmediates();
    	})
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    CheckImmediate 是在 check 阶段执行的函数,task_queues_async_ 则用于线程间通信,即当子线程往主线程提交任务时,通过 task_queues_async_ 通知主线程,然后主线程执行 uv_async_init 注册的回调。上面的代码就是消费者的逻辑。后面再详细分析里面的处理流程。

    提交任务

    接下来逐个看一下生产者的逻辑。

    template <typename Fn>
    void Environment::SetImmediate(Fn&& cb, CallbackFlags::Flags flags) {
      auto callback = native_immediates_.CreateCallback(std::move(cb), flags);
      native_immediates_.Push(std::move(callback));
      // ...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    SetImmediate 用于同线程的代码提交任务。

    template <typename Fn>
    void Environment::SetImmediateThreadsafe(Fn&& cb, CallbackFlags::Flags flags) {
      auto callback = native_immediates_threadsafe_.CreateCallback(
          std::move(cb), flags);
      {
        Mutex::ScopedLock lock(native_immediates_threadsafe_mutex_);
        native_immediates_threadsafe_.Push(std::move(callback));
        if (task_queues_async_initialized_)
          uv_async_send(&task_queues_async_);
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    SetImmediateThreadsafe 用于子线程给主线程提交任务,所以需要加锁。

    template <typename Fn>
    void Environment::RequestInterrupt(Fn&& cb) {
      auto callback = native_immediates_interrupts_.CreateCallback(
          std::move(cb), CallbackFlags::kRefed);
      {
        Mutex::ScopedLock lock(native_immediates_threadsafe_mutex_);
        native_immediates_interrupts_.Push(std::move(callback));
        if (task_queues_async_initialized_)
          uv_async_send(&task_queues_async_);
      }
      RequestInterruptFromV8();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    RequestInterrupt 用于子线程给主线程提交代码,他和 SetImmediateThreadsafe 有一个很重要的区别是调用了 RequestInterruptFromV8。

    void Environment::RequestInterruptFromV8() {
      isolate()->RequestInterrupt([](Isolate* isolate, void* data) {
        std::unique_ptr<Environment*> env_ptr { static_cast<Environment**>(data) };
        Environment* env = *env_ptr;
        env->RunAndClearInterrupts();
      }, interrupt_data);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    RequestInterrupt 可以使得提交的代码在 JS 代码死循环时依然会被执行。接着看 AddCleanupHook。

    void Environment::AddCleanupHook(CleanupQueue::Callback fn, void* arg) {
      cleanup_queue_.Add(fn, arg);
    }
    
    • 1
    • 2
    • 3

    AddCleanupHook 用于注册线程退出前的回调。生产者的逻辑都比较简单,就是往任务队列里插入一个任务,如果是涉及到线程间的任务,则通知主线程。

    消费者

    接下来看一下消费者的逻辑,根据前面的分析可以知道,消费者有几个:CheckImmediate,task_queues_async_ 的处理函数、RequestInterrupt 注册的函数、退出前回调处理函数。先看 CheckImmediate。

    void Environment::CheckImmediate(uv_check_t* handle) {
      Environment* env = Environment::from_immediate_check_handle(handle);
      env->RunAndClearNativeImmediates();
    }
    
    void Environment::RunAndClearNativeImmediates(bool only_refed) {
      RunAndClearInterrupts();
    
      auto drain_list = [&](NativeImmediateQueue* queue) {
        while (auto head = queue->Shift()) {
            head->Call(this);
        }
        return false;
      };
      while (drain_list(&native_immediates_)) {}
      NativeImmediateQueue threadsafe_immediates;
      if (native_immediates_threadsafe_.size() > 0) {
        Mutex::ScopedLock lock(native_immediates_threadsafe_mutex_);
        threadsafe_immediates.ConcatMove(std::move(native_immediates_threadsafe_));
      }
      while (drain_list(&threadsafe_immediates)) {}
    }
    
    void Environment::RunAndClearInterrupts() {
      while (native_immediates_interrupts_.size() > 0) {
        NativeImmediateQueue queue;
        {
          Mutex::ScopedLock lock(native_immediates_threadsafe_mutex_);
          queue.ConcatMove(std::move(native_immediates_interrupts_));
        }
        while (auto head = queue.Shift())
          head->Call(this);
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    CheckImmediate 函数中处理了SetImmediate、SetImmediateThreadsafe 和 RequestInterrupt 产生的任务。但是如果主线程阻塞在 Poll IO 阶段时,只有子线程提交任务时会唤醒主线程,具体是通过 task_queues_async_ 结构体,看一下处理函数。

    env->RunAndClearNativeImmediates();
    
    • 1

    可以看到这时候也是处理了SetImmediate、SetImmediateThreadsafe 和 RequestInterrupt 产生的任务。最后来看一下处理退出前回调的函数,具体时机是 FreeEnvironment 函数中的 env->RunCleanup()。

    void Environment::RunCleanup() {
      RunAndClearNativeImmediates(true);
      while (!cleanup_queue_.empty() || principal_realm_->HasCleanupHooks() ||
             native_immediates_.size() > 0 ||
             native_immediates_threadsafe_.size() > 0 ||
             native_immediates_interrupts_.size() > 0) {
        // 见 CleanupQueue::Drain
        cleanup_queue_.Drain();
        RunAndClearNativeImmediates(true);
      }
    }
    
    // cleanup_queue_.Drain();
    void CleanupQueue::Drain() {
      std::vector<CleanupHookCallback> callbacks(cleanup_hooks_.begin(),
                                                 cleanup_hooks_.end());
      std::sort(callbacks.begin(),
                callbacks.end(),
                [](const CleanupHookCallback& a, const CleanupHookCallback& b) {
                  return a.insertion_order_counter_ > b.insertion_order_counter_;
                });
    
      for (const CleanupHookCallback& cb : callbacks) {
        cb.fn_(cb.arg_);
        cleanup_hooks_.erase(cb);
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    RunCleanup 中同时处理了 SetImmediate、SetImmediateThreadsafe、 RequestInterrupt 产生的任务和注册的退出前回调。

  • 相关阅读:
    缓存与数据一致性问题
    【日拱一卒行而不辍20220918】自制操作系统
    如何用一款产品推动「品牌的惊险一跃」?
    Ribbon的随机算法,为什么能难倒这么多的微服务专家?
    2022/6/27 Quartz(定时任务)讲解+入门案例
    R语言中的函数19:openxlsx::read.xlsx(), write.xlsx(), writeData(), writeDataTable()
    Pytorch -> ONNX -> TensorRT 模型转换与部署
    TypeScript 泛型
    Flink部署模式及核心概念
    并发冲突:记一次导致流量放大的生产问题
  • 原文地址:https://blog.csdn.net/THEANARKH/article/details/128154139