• 彻底搞懂nodejs事件循环


    nodejs是单线程执行的,同时它又是基于事件驱动的非阻塞IO编程模型。这就使得我们不用等待异步操作结果返回,就可以继续往下执行代码。当异步事件触发之后,就会通知主线程,主线程执行相应事件的回调。

    以上是众所周知的内容。今天我们从源码入手,分析一下nodejs的事件循环机制

    nodejs架构

    首先,我们先看下nodejs架构,下图所示:

    如上图所示,nodejs自上而下分为

    • 用户代码 ( js 代码 )

    用户代码即我们编写的应用程序代码、npm包、nodejs内置的js模块等,我们日常工作中的大部分时间都是编写这个层面的代码。

    • binding代码或者三方插件(js 或 C/C++ 代码)

    胶水代码,能够让js调用C/C++的代码。可以将其理解为一个桥,桥这头是js,桥那头是C/C++,通过这个桥可以让js调用C/C++。
    在nodejs里,胶水代码的主要作用是把nodejs底层实现的C/C++库暴露给js环境。
    三方插件是我们自己实现的C/C++库,同时需要我们自己实现胶水代码,将js和C/C++进行桥接。

    • 底层库

    nodejs的依赖库,包括大名鼎鼎的V8、libuv。
    V8: 我们都知道,是google开发的一套高效javascript运行时,nodejs能够高效执行 js 代码的很大原因主要在它。
    libuv:是用C语言实现的一套异步功能库,nodejs高效的异步编程模型很大程度上归功于libuv的实现,而libuv则是我们今天重点要分析的。
    还有一些其他的依赖库
    http-parser:负责解析http响应
    openssl:加解密
    c-ares:dns解析
    npm:nodejs包管理器

    关于nodejs不再过多介绍,大家可以自行查阅学习,接下来我们重点要分析的就是libuv。

    libuv 架构

    我们知道,nodejs实现异步机制的核心便是libuv,libuv承担着nodejs与文件、网络等异步任务的沟通桥梁,下面这张图让我们对libuv有个大概的印象:

    这是libuv官网的一张图,很明显,nodejs的网络I/O、文件I/O、DNS操作、还有一些用户代码都是在 libuv 工作的。
    既然谈到了异步,那么我们首先归纳下nodejs里的异步事件:

    • 非I/O:
      • 定时器(setTimeout,setInterval)
      • microtask(promise)
      • process.nextTick
      • setImmediate
      • DNS.lookup
    • I/O:
      • 网络I/O
      • 文件I/O
      • 一些DNS操作

    参考nodejs进阶视频讲解:进入学习

    网络I/O

    对于网络I/O,各个平台的实现机制不一样,linux 是 epoll 模型,类 unix 是 kquene 、windows 下是高效的 IOCP 完成端口、SunOs 是 event ports,libuv 对这几种网络I/O模型进行了封装。

    文件I/O、异步DNS操作

    libuv内部还维护着一个默认4个线程的线程池,这些线程负责执行文件I/O操作、DNS操作、用户异步代码。当 js 层传递给 libuv 一个操作任务时,libuv 会把这个任务加到队列中。之后分两种情况:

    • 1、线程池中的线程都被占用的时候,队列中任务就要进行排队等待空闲线程。
    • 2、线程池中有可用线程时,从队列中取出这个任务执行,执行完毕后,线程归还到线程池,等待下个任务。同时以事件的方式通知event-loop,event-loop接收到事件执行该事件注册的回调函数。

    当然,如果觉得4个线程不够用,可以在nodejs启动时,设置环境变量UV_THREADPOOL_SIZE来调整,出于系统性能考虑,libuv 规定可设置线程数不能超过128个。

    nodejs源码

    先简要介绍下nodejs的启动过程:

    • 1、调用platformInit方法 ,初始化 nodejs 的运行环境。
    • 2、调用 performance_node_start 方法,对 nodejs 进行性能统计。
    • 3、openssl设置的判断。
    • 4、调用v8_platform.Initialize,初始化 libuv 线程池。
    • 5、调用 V8::Initialize,初始化 V8 环境。
    • 6、创建一个nodejs运行实例。
    • 7、启动上一步创建好的实例。
    • 8、开始执行js文件,同步代码执行完毕后,进入事件循环。
    • 9、在没有任何可监听的事件时,销毁 nodejs 实例,程序执行完毕。

    以上就是 nodejs 执行一个js文件的全过程。接下来着重介绍第八个步骤,事件循环。

    我们看几处关键源码:

    • 1、core.c,事件循环运行的核心文件。
    int uv_run(uv_loop_t* loop, uv_run_mode mode) {
      int timeout;
      int r;
      int ran_pending;
    //判断事件循环是否存活。
      r = uv__loop_alive(loop);
      //如果没有存活,更新时间戳
      if (!r)
        uv__update_time(loop);
    //如果事件循环存活,并且事件循环没有停止。
      while (r != 0 && loop->stop_flag == 0) {
        //更新当前时间戳
        uv__update_time(loop);
        //执行 timers 队列
        uv__run_timers(loop);
        //执行由于上个循环未执行完,并被延迟到这个循环的I/O 回调。
        ran_pending = uv__run_pending(loop); 
        //内部调用,用户不care,忽略
        uv__run_idle(loop); 
        //内部调用,用户不care,忽略
        uv__run_prepare(loop); 
    
        timeout = 0; 
        if ((mode == UV_RUN_ONCE && !ran_pending) || mode == UV_RUN_DEFAULT)
        //计算距离下一个timer到来的时间差。
          timeout = uv_backend_timeout(loop);
       //进入 轮询 阶段,该阶段轮询I/O事件,有则执行,无则阻塞,直到超出timeout的时间。
        uv__io_poll(loop, timeout);
        //进入check阶段,主要执行 setImmediate 回调。
        uv__run_check(loop);
        //进行close阶段,主要执行 **关闭** 事件
        uv__run_closing_handles(loop);
    
        if (mode == UV_RUN_ONCE) {
    
          //更新当前时间戳
          uv__update_time(loop);
          //再次执行timers回调。
          uv__run_timers(loop);
        }
        //判断当前事件循环是否存活。
        r = uv__loop_alive(loop); 
        if (mode == UV_RUN_ONCE || mode == UV_RUN_NOWAIT)
          break;
      }
    
      /* The if statement lets gcc compile it to a conditional store. Avoids   * dirtying a cache line.   */
      if (loop->stop_flag != 0)
        loop->stop_flag = 0;
    
      return r;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 2、timers 阶段,源码文件:timers.c
    void uv__run_timers(uv_loop_t* loop) {
      struct heap_node* heap_node;
      uv_timer_t* handle;
    
      for (;;) {
      //取出定时器堆中超时时间最近的定时器句柄
        heap_node = heap_min((struct heap*) &loop->timer_heap);
        if (heap_node == NULL)
          break;
    
        handle = container_of(heap_node, uv_timer_t, heap_node);
        // 判断最近的一个定时器句柄的超时时间是否大于当前时间,如果大于当前时间,说明还未超时,跳出循环。
        if (handle->timeout > loop->time)
          break;
        // 停止最近的定时器句柄
        uv_timer_stop(handle);
        // 判断定时器句柄类型是否是repeat类型,如果是,重新创建一个定时器句柄。
        uv_timer_again(handle);
        //执行定时器句柄绑定的回调函数
        handle->timer_cb(handle);
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 3、 轮询阶段 源码,源码文件:kquene.c
    void uv__io_poll(uv_loop_t* loop, int timeout) {
      /*一连串的变量初始化*/
      //判断是否有事件发生    
      if (loop->nfds == 0) {
        //判断观察者队列是否为空,如果为空,则返回
        assert(QUEUE_EMPTY(&loop->watcher_queue));
        return;
      }
    
      nevents = 0;
      // 观察者队列不为空
      while (!QUEUE_EMPTY(&loop->watcher_queue)) {
        /*    取出队列头的观察者对象    取出观察者对象感兴趣的事件并监听。    */
        ....省略一些代码
        w->events = w->pevents;
      }
    
    
      assert(timeout >= -1);
      //如果有超时时间,将当前时间赋给base变量
      base = loop->time;
      // 本轮执行监听事件的最大数量
      count = 48; /* Benchmarks suggest this gives the best throughput. */
      //进入监听循环
      for (;; nevents = 0) {
      // 有超时时间的话,初始化spec
        if (timeout != -1) {
          spec.tv_sec = timeout / 1000;
          spec.tv_nsec = (timeout % 1000) * 1000000;
        }
    
        if (pset != NULL)
          pthread_sigmask(SIG_BLOCK, pset, NULL);
        // 监听内核事件,当有事件到来时,即返回事件的数量。
        // timeout 为监听的超时时间,超时时间一到即返回。
        // 我们知道,timeout是传进来得下一个timers到来的时间差,所以,在timeout时间内,event-loop会一直阻塞在此处,直到超时时间到来或者有内核事件触发。
        nfds = kevent(loop->backend_fd,
                      events,
                      nevents,
                      events,
                      ARRAY_SIZE(events),
                      timeout == -1 ? NULL : &spec);
    
        if (pset != NULL)
          pthread_sigmask(SIG_UNBLOCK, pset, NULL);
    
        /* Update loop->time unconditionally. It's tempting to skip the update when     * timeout == 0 (i.e. non-blocking poll) but there is no guarantee that the     * operating system didn't reschedule our process while in the syscall.     */
        SAVE_ERRNO(uv__update_time(loop));
        //如果内核没有监听到可用事件,且本次监听有超时时间,则返回。
        if (nfds == 0) {
          assert(timeout != -1);
          return;
        }
    
        if (nfds == -1) {
          if (errno != EINTR)
            abort();
    
          if (timeout == 0)
            return;
    
          if (timeout == -1)
            continue;
    
          /* Interrupted by a signal. Update timeout and poll again. */
          goto update_timeout;
        }
    
        。。。
        //判断事件循环的观察者队列是否为空
        assert(loop->watchers != NULL);
        loop->watchers[loop->nwatchers] = (void*) events;
        loop->watchers[loop->nwatchers + 1] = (void*) (uintptr_t) nfds;
        // 循环处理内核返回的事件,执行事件绑定的回调函数
        for (i = 0; i < nfds; i++) {
            。。。。
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79

    uv__io_poll阶段源码最长,逻辑最为复杂,可以做个概括,如下:
    当js层代码注册的事件回调都没有返回的时候,事件循环会阻塞在poll阶段。看到这里,你可能会想了,会永远阻塞在此处吗?

    1、首先呢,在poll阶段执行的时候,会传入一个timeout超时时间,该超时时间就是poll阶段的最大阻塞时间。
    2、其次呢,在poll阶段,timeout时间未到的时候,如果有事件返回,就执行该事件注册的回调函数。timeout超时时间到了,则退出poll阶段,执行下一个阶段。

    所以,我们不用担心事件循环会永远阻塞在poll阶段。

    以上就是事件循环的两个核心阶段。限于篇幅,timers阶段的其他源码和setImmediateprocess.nextTick的涉及到的源码就不罗列了,感兴趣的童鞋可以看下源码。

    最后,总结出事件循环的原理如下,以上你可以不care,记住下面的总结就好了。

    事件循环原理
    • node 的初始化
      • 初始化 node 环境。
      • 执行输入代码。
      • 执行 process.nextTick 回调。
      • 执行 microtasks。
    • 进入 event-loop
      • 进入 timers 阶段
        • 检查 timer 队列是否有到期的 timer 回调,如果有,将到期的 timer 回调按照 timerId 升序执行。
        • 检查是否有 process.nextTick 任务,如果有,全部执行。
        • 检查是否有microtask,如果有,全部执行。
        • 退出该阶段。
      • 进入IO callbacks阶段。
        • 检查是否有 pending 的 I/O 回调。如果有,执行回调。如果没有,退出该阶段。
        • 检查是否有 process.nextTick 任务,如果有,全部执行。
        • 检查是否有microtask,如果有,全部执行。
        • 退出该阶段。
      • 进入 idle,prepare 阶段:
        • 这两个阶段与我们编程关系不大,暂且按下不表。
      • 进入 poll 阶段
        • 首先检查是否存在尚未完成的回调,如果存在,那么分两种情况。
          • 第一种情况:
            • 如果有可用回调(可用回调包含到期的定时器还有一些IO事件等),执行所有可用回调。
            • 检查是否有 process.nextTick 回调,如果有,全部执行。
            • 检查是否有 microtaks,如果有,全部执行。
            • 退出该阶段。
          • 第二种情况:
            • 如果没有可用回调。
            • 检查是否有 immediate 回调,如果有,退出 poll 阶段。如果没有,阻塞在此阶段,等待新的事件通知。
        • 如果不存在尚未完成的回调,退出poll阶段。
      • 进入 check 阶段。
        • 如果有immediate回调,则执行所有immediate回调。
        • 检查是否有 process.nextTick 回调,如果有,全部执行。
        • 检查是否有 microtaks,如果有,全部执行。
        • 退出 check 阶段
      • 进入 closing 阶段。
        • 如果有immediate回调,则执行所有immediate回调。
        • 检查是否有 process.nextTick 回调,如果有,全部执行。
        • 检查是否有 microtaks,如果有,全部执行。
        • 退出 closing 阶段
      • 检查是否有活跃的 handles(定时器、IO等事件句柄)。
        • 如果有,继续下一轮循环。
        • 如果没有,结束事件循环,退出程序。

    细心的童鞋可以发现,在事件循环的每一个子阶段退出之前都会按顺序执行如下过程:

    • 检查是否有 process.nextTick 回调,如果有,全部执行。
    • 检查是否有 microtaks,如果有,全部执行。
    • 退出当前阶段。

    记住这个规律哦。

  • 相关阅读:
    Hash 表
    10.网络编程套接字Socket
    Go Mac配置Air热加载
    LeetCode2562
    雷电9模拟器抓包
    CAD对齐的两种方式、AUTOCAD——修改坐标轴方向
    [MAUI]模仿微信“按住-说话”的交互实现
    手把手教你用 Jenkins 自动部署 SpringBoot
    ansible问题排查
    软考软件设计师基础知识—法律法规知识
  • 原文地址:https://blog.csdn.net/m0_67614517/article/details/127865062