io_uring是Linux内核在v5.1引入的一套异步IO接口,和aio不同的是,它可以提供更高的性能
io_uring 具体有三个系统调用
分别是 io_uring_setup
, io_uring_enter
, io_uring_register
,我们可以通过这三个系统调用来完成异步事件提交,收割,自己处理的流程
异步io的优点在于,不用我们自己去等待 io操作的完成,我们只需要告诉内核,我们的任务,内核来帮我们完成。这样就可以让我们的进程去干其他的事情,实现更高的吞吐量。
io_uring 利用 mmap 开辟出一块空间,让用户态和内核态的程序都可以共享的一块区域
io_uring 分为 提交队列 和 完成队列
用户提交的任务放在提交队列中,由内核去处理,处理好的东西会放在完成队列中。
内核如何处理,不是用户关系的问题,内核可以回调、轮询的方式都可以进行处理。
用户只需要设置好就可以使用
由于这三个系统调用要用好并不容易
开发作者也提供了一个liburing来给我们使用
注: 内核版本最好高一点,比如 Linux 5.4 不支持 read,但支持 readv (亲身教训)
除了 io_uring的结构要了解外,我们还需了解两个用到的东西
这两个分别代表了 完成队列和提交队列的一项元素
其中的user_data可以是一个可以由我们进行diy的指针
struct io_uring_cqe {
__u64 user_data; /* sqe->data submission passed back */
__s32 res; /* result code for this event */
__u32 flags;
/*
* If the ring is initialized with IORING_SETUP_CQE32, then this field
* contains 16-bytes of padding, doubling the size of the CQE.
*/
__u64 big_cqe[];
};
struct io_uring_sqe {
__u8 opcode; /* type of operation for this sqe */
__u8 flags; /* IOSQE_ flags */
__u16 ioprio; /* ioprio for the request */
__s32 fd; /* file descriptor to do IO on */
union {
__u64 off; /* offset into file */
__u64 addr2;
struct {
__u32 cmd_op;
__u32 __pad1;
};
};
union {
__u64 addr; /* pointer to buffer or iovecs */
__u64 splice_off_in;
};
__u32 len; /* buffer size or number of iovecs */
union {
__kernel_rwf_t rw_flags;
__u32 fsync_flags;
__u16 poll_events; /* compatibility */
__u32 poll32_events; /* word-reversed for BE */
__u32 sync_range_flags;
__u32 msg_flags;
__u32 timeout_flags;
__u32 accept_flags;
__u32 cancel_flags;
__u32 open_flags;
__u32 statx_flags;
__u32 fadvise_advice;
__u32 splice_flags;
__u32 rename_flags;
__u32 unlink_flags;
__u32 hardlink_flags;
__u32 xattr_flags;
__u32 msg_ring_flags;
};
__u64 user_data; /* data to be passed back at completion time */
/* pack this to avoid bogus arm OABI complaints */
union {
/* index into fixed buffers, if used */
__u16 buf_index;
/* for grouped buffer selection */
__u16 buf_group;
} __attribute__((packed));
/* personality to use, if used */
__u16 personality;
union {
__s32 splice_fd_in;
__u32 file_index;
struct {
__u16 addr_len;
__u16 __pad3[1];
};
};
union {
struct {
__u64 addr3;
__u64 __pad2[1];
};
/*
* If the ring is initialized with IORING_SETUP_SQE128, then
* this field is used for 80 bytes of arbitrary command data
*/
__u8 cmd[0];
};
};
我们来看两个使用 liburing的简单例子
第一个例子诠释了 io_uring 最直接的一个流程
/**
* 读取文件
**/
#include
#include
#include
char buf[1024] = {0};
int main() {
int fd = open("1.txt", O_RDONLY, 0);
io_uring ring;
io_uring_queue_init(32, &ring, 0); // 初始化
auto sqe = io_uring_get_sqe(&ring); // 从环中得到一块空位
io_uring_prep_read(sqe, fd, buf, sizeof(buf), 0); // 为这块空位准备好操作
io_uring_submit(&ring); // 提交任务
io_uring_cqe* res; // 完成队列指针
io_uring_wait_cqe(&ring, &res); // 阻塞等待一项完成的任务
assert(res);
std::cout << "read bytes: " << res->res << " \n";
std::cout << buf << std::endl;
io_uring_cqe_seen(&ring, res); // 将任务移出完成队列
io_uring_queue_exit(&ring); // 退出
return 0;
}
io_uring 有三个东西
提交队列
完成队列
任务实体
提交队列和完成队列都可以看成持有一项指针
我们得到一个 任务实体,通过 io_uring_prep_read
准备任务 和 io_uring_submit
提交任务
提交任务之后就到了提交队列中去
在提交队列里面,内核操作完以后。
任务就到了完成队列中去。
然后我们可以阻塞等待 io_uring_wait_cqe
一项任务
当然,我们也可以使用非阻塞的方式,去干其他事情
在拿到这一项任务之后,我们就可以对其进行处理,处理完成记得 从完成队列中清除
(至于 完成队列和提交队列是如何高效的且不出错的并发执行 暂且不谈)
echo_server
上述 的 io_uring写着还是比较长,我们可以把它封装一下。
比如要用 read的操作,要用accpet的操作,都给他封装一下
同时,我们在写echo_server时,我们是 几个不同的操作
可能是 ACCEPT 操作,可能是 READ 操作, 可能是 WRITE操作
并且READ和WRITE操作都要有自己的缓冲区
所以,我们定义一下我们在任务之间传递的结构体
然后把它放到 user_data 中。
__u64 user_data; /* data to be passed back at completion time */
结构体如下:
struct request {
enum STATE { ACCEPT, READ, WRITE };
int fd;
STATE state;
union {
struct {
sockaddr_in ipv4_addr;
socklen_t lens;
} addr;
char buf[BUFSIZE];
};
};
我们对可能用到的操作进行一下封装
class IOuring {
io_uring ring;
public:
IOuring(int queue_size) { io_uring_queue_init(queue_size, &ring, 0); }
~IOuring() { io_uring_queue_exit(&ring); }
void seen(io_uring_cqe* cqe) { io_uring_cqe_seen(&ring, cqe); }
int wait(io_uring_cqe** cqe) { return io_uring_wait_cqe(&ring, cqe); }
int submit() { return io_uring_submit(&ring); }
void accpet_asyn(int sock_fd, request* body) {
auto sqe = io_uring_get_sqe(&ring);
body->state = request::ACCEPT;
body->fd = sock_fd;
body->addr.lens = sizeof(sockaddr_in);
io_uring_prep_accept(sqe, sock_fd, (sockaddr*)&(body->addr.ipv4_addr),
&(body->addr.lens), 0);
io_uring_sqe_set_data(sqe, body);
}
void read_asyn(int client_fd, request* body) {
auto sqe = io_uring_get_sqe(&ring);
body->state = request::READ;
body->fd = client_fd;
io_uring_prep_read(sqe, client_fd, body->buf, sizeof(body->buf), -1);
io_uring_sqe_set_data(sqe, body);
}
void write_asyn(int client_fd, request* body) {
auto sqe = io_uring_get_sqe(&ring);
body->state = request::WRITE;
body->fd = client_fd;
io_uring_prep_write(sqe, client_fd, body->buf, sizeof(body->buf), -1);
io_uring_sqe_set_data(sqe, body);
}
};
#include
#include
#include
#include
#include
#include
const int BUFSIZE = 1024;
struct request {
enum STATE { ACCEPT, READ, WRITE };
int fd;
STATE state;
union {
struct {
sockaddr_in ipv4_addr;
socklen_t lens;
} addr;
char buf[BUFSIZE];
};
};
class IOuring {
io_uring ring;
public:
IOuring(int queue_size) { io_uring_queue_init(queue_size, &ring, 0); }
~IOuring() { io_uring_queue_exit(&ring); }
void seen(io_uring_cqe* cqe) { io_uring_cqe_seen(&ring, cqe); }
int wait(io_uring_cqe** cqe) { return io_uring_wait_cqe(&ring, cqe); }
int submit() { return io_uring_submit(&ring); }
void accpet_asyn(int sock_fd, request* body) {
auto sqe = io_uring_get_sqe(&ring);
body->state = request::ACCEPT;
body->fd = sock_fd;
body->addr.lens = sizeof(sockaddr_in);
io_uring_prep_accept(sqe, sock_fd, (sockaddr*)&(body->addr.ipv4_addr),
&(body->addr.lens), 0);
io_uring_sqe_set_data(sqe, body);
}
void read_asyn(int client_fd, request* body) {
auto sqe = io_uring_get_sqe(&ring);
body->state = request::READ;
body->fd = client_fd;
io_uring_prep_read(sqe, client_fd, body->buf, sizeof(body->buf), -1);
io_uring_sqe_set_data(sqe, body);
}
void write_asyn(int client_fd, request* body) {
auto sqe = io_uring_get_sqe(&ring);
body->state = request::WRITE;
body->fd = client_fd;
io_uring_prep_write(sqe, client_fd, body->buf, sizeof(body->buf), -1);
io_uring_sqe_set_data(sqe, body);
}
};
int main() {
/*init socket*/
int sock_fd = socket(AF_INET, SOCK_STREAM, 0);
sockaddr_in sock_addr;
sock_addr.sin_port = htons(8000);
sock_addr.sin_family = AF_INET;
sock_addr.sin_addr.s_addr = INADDR_ANY;
int ret = bind(sock_fd, (sockaddr*)&sock_addr, sizeof(sock_addr));
perror("");
listen(sock_fd, 10);
std::cout << "listen begin ..." << std::endl;
/*io_uring*/
IOuring ring(1024);
ring.accpet_asyn(sock_fd, new request);
ring.submit();
while (true) {
io_uring_cqe* cqe;
ring.wait(&cqe);
request* res = (request*)cqe->user_data;
switch (res->state) {
case request::ACCEPT:
if (cqe->res > 0) {
int client_fd = cqe->res;
ring.accpet_asyn(sock_fd, res);
ring.read_asyn(client_fd, new request);
ring.submit();
}
std::cout << cqe->res << std::endl;
break;
case request::READ:
if (cqe->res > 0) std::cout << res->buf << std::endl;
ring.write_asyn(res->fd, res);
ring.submit();
break;
case request::WRITE:
if (cqe->res > 0) {
close(res->fd);
delete res;
}
break;
default:
std::cout << "error " << std::endl;
break;
}
ring.seen(cqe);
}
return 0;
}
在这里,我们的程序只是简单的单线程,对于任务,我们可以将其放入工作线程中进行操作
这样我们的主线程主负责事件分发,工作线程负责处理逻辑
[amjieker]