• 聊聊 Libuv 最近引入的 io_uring


    io_uring 是 Linux 下高性能的异步 IO 框架,网上很多相关资料,我之前也初步分析了一下它的实现,有兴趣的可以查看 https://zhuanlan.zhihu.com/p/387620810。

    Libuv 中最近加入了对 io_uring 的支持,那么为什么要把它引入 Libuv 呢?因为 epoll 不支持普通文件的 Poll 能力,所以在 Libuv 中,异步文件 IO 操作需要通过线程池来实现,具体来说就是当用户发起一个异步文件 IO 操作时,Libuv 会把这个操作放到线程池中,当子线程处理这个任务时,会执行一个阻塞式的系统调用,这个系统调用会引起线程阻塞,从而导致这个线程被消耗掉了,当 IO 操作完成后,子线程就会被唤醒,子线程再通过主线程去执行用户的回调。在 Libuv 早期的实现中,如果执行比较慢的任务过多就会把线程池中的线程消耗完,从而导致执行比较快的 IO 操作需要等待很长时间,一个例子就是 DNS 解析会阻塞文件 IO 任务。而 io_uring 可以支持普通文件 IO(当然能力不仅于此),不再需要借助线程池的能力,目前 Libuv 中部分异步文件 IO 操作已经替换成 io_uring(需要通过环境变量开启),下面来看看它的实现。

    原生 io_uring 的使用比较复杂,通常需要借助 liburing 库,但是 Libuv 中可能为了减少对第三方库的依赖,实现上使用原生的方式。

    io_uring 初始化

    在 Libuv 初始化时会进行 io_uring 的初始化。

    uv__iou_init(loop->backend_fd, &lfields->iou, 64, UV__IORING_SETUP_SQPOLL);
    
    • 1

    lfields->iou 为 io_uring 核心结构体,UV__IORING_SETUP_SQPOLL 设置内核创建线程轮询是否有任务需要处理(用户层设置),接着看看 uv__iou_init。

    static void uv__iou_init(int epollfd,
                             struct uv__iou* iou,
                             uint32_t entries,
                             uint32_t flags) {
      struct uv__io_uring_params params;
      struct epoll_event e;
      size_t cqlen;
      size_t sqlen;
      size_t maxlen;
      size_t sqelen;
      uint32_t i;
      char* sq;
      char* sqe;
      int ringfd;
      
      memset(&params, 0, sizeof(params));
      params.flags = flags;
      // UV__IORING_SETUP_SQPOLL 模式下,设置多久没有任务提交则内核线程进入 sleep 状态
      if (flags & UV__IORING_SETUP_SQPOLL)
        params.sq_thread_idle = 10;  /* milliseconds */
      // 调用系统调用初始化 io_uring
      ringfd = uv__io_uring_setup(entries, &params);
      // 映射到内核发送 / 完成队列的内存,用户层和内核可以共同操作这个队列
      sq = mmap(0,
                maxlen,
                PROT_READ | PROT_WRITE,
                MAP_SHARED | MAP_POPULATE,
                ringfd,
                0);  /* IORING_OFF_SQ_RING */
    
      sqe = mmap(0,
                 sqelen,
                 PROT_READ | PROT_WRITE,
                 MAP_SHARED | MAP_POPULATE,
                 ringfd,
                 0x10000000ull);  /* IORING_OFF_SQES */
      
      memset(&e, 0, sizeof(e));
      e.events = POLLIN;
      e.data.fd = ringfd;
      // 注册等待可读事件,io_uring 中有任务完成后就会通过 epoll
      epoll_ctl(epollfd, EPOLL_CTL_ADD, ringfd, &e);
        
      // 初始化 io_uring 结构体
      iou->sqhead = (uint32_t*) (sq + params.sq_off.head);
      iou->sqtail = (uint32_t*) (sq + params.sq_off.tail);
      iou->sqmask = *(uint32_t*) (sq + params.sq_off.ring_mask);
      iou->sqarray = (uint32_t*) (sq + params.sq_off.array);
      iou->sqflags = (uint32_t*) (sq + params.sq_off.flags);
      iou->cqhead = (uint32_t*) (sq + params.cq_off.head);
      iou->cqtail = (uint32_t*) (sq + params.cq_off.tail);
      iou->cqmask = *(uint32_t*) (sq + params.cq_off.ring_mask);
      iou->sq = sq;
      iou->cqe = sq + params.cq_off.cqes;
      iou->sqe = sqe;
      iou->sqlen = sqlen;
      iou->cqlen = cqlen;
      iou->maxlen = maxlen;
      iou->sqelen = sqelen;
      iou->ringfd = ringfd;
      iou->in_flight = 0;
      iou->flags = 0;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63

    uv__iou_init 完成了 io_uring 的初始化,并且把 io_uring 对应的 fd 注册到 epoll,当 io_uring 有任务完成时,就可以通过 epoll 感知到。接着就可以使用 io_uring 了。

    提交异步任务

    下面看一个异步文件 IO 的操作。

    int uv_fs_open(uv_loop_t* loop,
                   uv_fs_t* req,
                   const char* path,
                   int flags,
                   int mode,
                   uv_fs_cb cb) {
      INIT(OPEN);
      PATH;
      req->flags = flags;
      req->mode = mode;
      if (cb != NULL)
        if (uv__iou_fs_open(loop, req))
          return 0;
      POST;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    uv_fs_open 可以以异步的方式打开一个文件,之前时通过线程池实现的,加入 io_uring 后,就会多了一层拦截,来看看 uv__iou_fs_open。

    int uv__iou_fs_open(uv_loop_t* loop, uv_fs_t* req) {
      struct uv__io_uring_sqe* sqe;
      struct uv__iou* iou;
      // 获取 io_uring 结构体
      iou = &uv__get_internal_fields(loop)->iou;
      // 获取一个任务节点,任务节点会和 req 互相关联,回调时会用到
      sqe = uv__iou_get_sqe(iou, loop, req);
      // 设置操作上下文
      sqe->addr = (uintptr_t) req->path;
      sqe->fd = AT_FDCWD;
      sqe->len = req->mode;
      // 设置操作类型
      sqe->opcode = UV__IORING_OP_OPENAT;
      sqe->open_flags = req->flags | O_CLOEXEC;
      // 提交任务
      uv__iou_submit(iou);
    
      return 1;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    uv__iou_fs_open 中有两个核心逻辑 uv__iou_get_sqe 和 uv__iou_submit,首先来看 uv__iou_get_sqe。

    static struct uv__io_uring_sqe* uv__iou_get_sqe(struct uv__iou* iou,
                                                    uv_loop_t* loop,
                                                    uv_fs_t* req) {
      struct uv__io_uring_sqe* sqe;
      uint32_t head;
      uint32_t tail;
      uint32_t mask;
      uint32_t slot;
    
      if (iou->ringfd == -1)
        return NULL;
    
      head = atomic_load_explicit((_Atomic uint32_t*) iou->sqhead,
                                  memory_order_acquire);
      tail = *iou->sqtail;
      mask = iou->sqmask;
    
      slot = tail & mask;
      sqe = iou->sqe;
      // 从请求队列中获取一个节点
      sqe = &sqe[slot];
      memset(sqe, 0, sizeof(*sqe));
      // 任务节点关联到 req,回调时需要使用
      sqe->user_data = (uintptr_t) req;
    
      req->work_req.loop = loop;
      req->work_req.work = NULL;
      req->work_req.done = NULL;
      uv__queue_init(&req->work_req.wq);
    
      uv__req_register(loop, req);
      iou->in_flight++;
    
      return sqe;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    uv__iou_get_sqe 主要是从任务队列中获取一个空闲节点并关联上请求上下文结构体,uv__iou_get_sqe 的调用方需要设置操作上下文,比如操作类型,操作的 fd 等。通过 uv__iou_get_sqe 获取任务节点并设置了操作上下文后,这个任务就会自动被操作系统感知。因为 Libuv 是使用了 UV__IORING_SETUP_SQPOLL 模式,所以还需要判断这时候内核轮训线程是否处于睡眠状态,这就是 uv__iou_submit 的逻辑。

    static void uv__iou_submit(struct uv__iou* iou) {
      uint32_t flags;
    
      atomic_store_explicit((_Atomic uint32_t*) iou->sqtail,
                            *iou->sqtail + 1,
                            memory_order_release);
    
      flags = atomic_load_explicit((_Atomic uint32_t*) iou->sqflags,
                                   memory_order_acquire);
      // 判断内核线程是否处于睡眠状态
      if (flags & UV__IORING_SQ_NEED_WAKEUP)
        // 唤醒内核线程,说明有任务需要处理
        if (uv__io_uring_enter(iou->ringfd, 0, 0, UV__IORING_ENTER_SQ_WAKEUP))
          if (errno != EOWNERDEAD)  /* Kernel bug. Harmless, ignore. */
            perror("libuv: io_uring_enter(wakeup)");  /* Can't happen. */
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    这样就完成了任务的提交。

    任务完成

    任务完成后,io_uring 对应的 fd 就会变成可读,从而 epoll 就会感知到,来看看 epoll 的处理。下面是 epoll 处理就绪 fd 时的一段逻辑。

    if(fd == iou->ringfd) {
      uv__poll_io_uring(loop, iou);
      have_iou_events = 1;
      continue;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    如果是 io_uring 的 fd 可读,则执行 uv__poll_io_uring。

    static void uv__poll_io_uring(uv_loop_t* loop, struct uv__iou* iou) {
      struct uv__io_uring_cqe* cqe;
      struct uv__io_uring_cqe* e;
      uv_fs_t* req;
      uint32_t head;
      uint32_t tail;
      uint32_t mask;
      uint32_t i;
      uint32_t flags;
      int nevents;
      int rc;
      // 完成队列头/尾节点
      head = *iou->cqhead;
      tail = atomic_load_explicit((_Atomic uint32_t*) iou->cqtail,
                                  memory_order_acquire);
      mask = iou->cqmask;
      cqe = iou->cqe;
      nevents = 0;
      // 遍历完成队列
      for (i = head; i != tail; i++) {
        e = &cqe[i & mask];
        // 拿到操作关联的请求结构体
        req = (uv_fs_t*) (uintptr_t) e->user_data;
        uv__req_unregister(loop, req);
        iou->in_flight--;
    	// 操作返回值,表示操作是否成功
        req->result = e->res;
    	// 执行回调
        req->cb(req);
      }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    uv__poll_io_uring 的逻辑很简单,就是遍历完成队列,然后拿到对应的请求上下文结构体,最后执行它的回调。

    现代软件中大多数使用的 IO 模型是 epoll,随着 io_uring 的发展和成熟,io_uring 将会出现在更多的软件中,之前我也体验了一下 io_uring,有兴趣的可以体验下 https://github.com/theanarkh/nodejs_io_uring。

  • 相关阅读:
    浅析Spring事务实现原理
    【springboot】你了解@Autowired 和 @Resource吗?@Autowired 和 @Resource深入分析
    4.物联网射频识别,RFID开发【智能门禁项目】
    STM32时钟系统配置程序源码深入分析
    用爬虫保存文章到TXT文件丨Python爬虫实战系列(7)
    Go语言Web开发入门指南
    大厂10年经验,我对Java高并发问题方案的总结,堪称教科书级
    什么是正向代理和反向代理
    【纯手工打造】时间戳转换工具(python)
    【ROS2原理19】定时器模型
  • 原文地址:https://blog.csdn.net/THEANARKH/article/details/133935102