内存和外设的交互叫做IO,网络IO就是将数据在内存和网卡间拷贝。
IO本质就是等待和拷贝,一般等待耗时往往远高于拷贝耗时。所以提高IO效率就是尽可能减少等待时间的比重。
IO模型 | 简单对比解释 |
---|---|
阻塞IO | 阻塞等待数据到来 |
非阻塞IO | 轮询等待数据到来 |
信号驱动 | 信号递达时再来读取或写入数据 |
多路转接 | 让大批线程等待,自身读取数据 |
异步通信 | 让其他进程或线程进行等待和读取,自身获取结果 |
执行流在某个文件描述符下读取数据时,执行流一直等待IO条件就绪后读取数据,这就是阻塞IO。
执行流会以循环的方式反复尝试读取数据,如果IO条件未就绪,执行流会直接返回继续其他任务。
可通过fcntl设置文件的状态。
非阻塞读取时,数据未就绪是以出错的形式返回的,错误码为EAGIN
或EWOULDBLOCK
,信号导致读取未成功错误码为EINTR
。
void set_nonblock(int fd)
{
int fl = fcntl(fd, F_GETFL);
if (fl < 0) {
perror("fcntl failed");
return;
}
if (fcntl(fd, F_SETFL, fl | O_NONBLOCK) < 0) {
perror("fcntl failed");
return;
}
}
int main() {
set_nonblock(0);
char buf[64] = {0};
while (true) {
ssize_t n = read(0, buf, sizeof(buf) - 1);
if (n > 0)
{
buf[n - 1] = 0;
std::cout << buf << std::endl;
}
else if (n == 0)
{
perror("end of file");
break;
}
else
{
if (errno == EAGAIN || errno == EWOULDBLOCK) // 非阻塞数据未就绪返回
continue;
else if (errno == EINTR) // IO被信号中断返回
continue;
else
{
perror("read error");
break;
}
}
}
return 0;
}
较为鸡肋,一般不用。
IO事件就绪时,内核通过SIGIO信号通知进程。等待的过程是异步的,但拷贝数据是同步的,所以我们认为信号驱动也是同步IO。
但信号处理是异步的,所以数据提取可能不及时。
内核提供select、poll、epoll等多路转接方案,最高可同时等待几百个文件。拷贝数据的任务仍由进程完成,等待数据的任务交给内核。
只要自身完全没有参与IO等待和拷贝就是异步通信,否则就是同步。
将缓冲区提供给异步接口,接口等待并拷贝将数据至缓冲区,最后通知进程。进程不参与IO可直接处理数据,所以是异步的。
异步IO系统提供有一些对应的系统接口,但大多使用复杂,也不建议使用。异步IO也有更好的替代方案。
IO事件就绪可分为读事件就绪和写事件就绪。
一般接收缓冲区设有高水位,高于该水位读事件就绪,发送缓冲区设有低水位,低于该水位写事件就绪。
因为频繁读写内核缓冲区需要状态切换,会附带一系列的处理工作,导致效率下降。
Linux下多路转接的方案常见的有三种:select、poll、epoll,select出现是最早的,使用也是最繁琐的。
select能够等待多个fd的IO条件是否就绪。
#include
int select(int nfds, fd_set* rfds, fd_set* wfds, fd_set* efds, struct timeval* timeout);
struct timeval {
time_t tv_sec; /* seconds */
suseconds_t tv_usec; /* microseconds */
};
参数 | 解释 |
---|---|
nfds | fd的总个数,select遍历fdset结构的范围(被等待的fd的最大值+1) |
readfds | 调用时表示需要关注的读事件,返回时表示那些事件已经就绪 |
writefds | 调用时表示需要关注的写事件,返回时表示那些事件已经就绪 |
exceptfds | 调用时表示需要关注的异常事件,返回时表示那些事件已经就绪。如对端关闭,读写异常等 |
timeout | 调用时表示本次调用阻塞等待时间,返回时表示此次返回剩余的等待时间 |
返回值 | 大于0表示就绪fd的个数,为0表示本次调用结束,–1表示出错 |
fd_set
是文件描述符的位图结构,下标表示文件描述符,比特位内容表示是否需要等待。
// fd_set操作函数
void FD_CLR (int fd, fd_set *set); // 清除
int FD_ISSET(int fd, fd_set *set); // 检测
void FD_SET (int fd, fd_set *set); // 设置
void FD_ZERO ( fd_set *set); // 置零
const int GPORT = 8080;
const int GSIZE = 10;
enum event_type {
read_event = 0x1 << 1,
write_event = 0x1 << 2,
except_event = 0x1 << 3,
};
struct fd_collection {
fd_collection() {}
fd_collection(const fd_collection& fds) {
_rfds = fds._rfds, _wfds = fds._wfds, _efds = fds._efds, _maxfd = fds._maxfd;
}
bool set(int event, int fd) {
if (_fdarr.size() >= GSIZE) return false;
if (event & read_event) _rfds.set(fd);
if (event & write_event) _wfds.set(fd);
if (event & except_event) _wfds.set(fd);
_fdarr.push_back(fd);
if (_maxfd < fd) _maxfd = fd;
return true;
}
void clear(int fd) {
_rfds.clear(fd);
_wfds.clear(fd);
_efds.clear(fd);
for (int i = 0; i < _fdarr.size(); i++)
if (_fdarr[i] == fd) _fdarr[i] = -1;
}
class file_descptrs {
public:
file_descptrs() { bzero(); }
~file_descptrs() {}
void set (int fd) { FD_SET(fd, &_set); }
void clear(int fd) { FD_CLR(fd, &_set); }
bool isset(int fd) { return FD_ISSET(fd, &_set); }
void bzero() { FD_ZERO(&_set); }
fd_set* get() { return &_set; }
private:
fd_set _set;
};
file_descptrs _rfds;
file_descptrs _wfds;
file_descptrs _efds;
std::vector<int> _fdarr;
int _maxfd = -1;
};
class select_server : public inet::tcp::server {
public:
select_server(uint16_t port) : server(port), _wouldblock(true)
{}
select_server(uint16_t port, int sec, int usec) : server(port), _timeout({sec, usec})
{}
void start() {
_fds.set(read_event, _sock);
while (true) {
int n = 0;
struct timeval timeout = _timeout;
fd_collection fds_cp(_fds);
if (_wouldblock)
n = select(fds_cp._maxfd+1, fds_cp._rfds.get(), fds_cp._wfds.get(), fds_cp._efds.get(), nullptr);
else
n = select(fds_cp._maxfd+1, fds_cp._rfds.get(), fds_cp._wfds.get(), fds_cp._efds.get(), &timeout);
switch (n) {
case 0: INFO("time out: %.2f", timeout.tv_sec + timeout.tv_usec / 1.0 / 1000);
break;
case -1: ERROR("select error, %d %s", errno, strerror(errno));
break;
default: handler_event(fds_cp);
break;
}
}
}
private:
void handler_event(fd_collection& resfds) {
for (auto fd : _fds._fdarr) {
if (fd == -1) continue;
if (resfds._rfds.isset(fd)) {
if (fd == _sock) {
acceptor();
} else {
std::string buf;
recver(fd, &buf);
}
}
if (resfds._wfds.isset(fd)) {
std::string msg = "test";
sender(fd, msg);
}
if (resfds._efds.isset(fd)) {
WARN("excepton event occurred, fd: %d", fd);
}
}
}
void acceptor() {
std::string cip;
uint16_t cport;
int sock = accept(&cip, &cport);
INFO("a connect %d has been accepted [%s:%d]", sock, cip.c_str(), cport);
// if (!_fds.set(read_event | write_event | except_event, sock))
if (!_fds.set(read_event, sock)) {
close(sock);
WARN("connect close, fd array is full");
}
}
void recver(int fd, std::string* buf) {
ssize_t s = recv(fd, buf, 1024);
if (s > 0) {
std::cout << *buf << std::endl;
}
else {
if (s == 0) INFO("client quit");
else WARN("recv error, %d %s", errno, strerror(errno));
_fds.clear(fd);
close(fd);
}
}
void sender(int fd, const std::string& msg) {
size_t s = send(fd, msg);
if (s <= 0) {
if (s == 0) INFO("client quit");
else WARN("send error, %d %s", errno, strerror(errno));
_fds.clear(fd);
close(fd);
}
}
private:
bool _wouldblock;
struct timeval _timeout;
fd_collection _fds;
};
优点 |
---|
一次等待多个fd,使IO等待时间重叠,一定程度上提高IO效率 |
缺点 |
调用前要重新设置fd集,调用后要遍历检测就绪fd,需要额外数组 |
select能够检测fd的个数上限太小 |
频繁地将用户数据拷贝到内核中 |
select内部遍历fd_set结构以检测就绪 |
poll相比select在使用和实现上都有进步。不过重点是epoll。
#include
int poll(struct pollfd* fds, nfds_t nfds, int timeout);
struct pollfd {
int fd; /* file descriptor */
short events; /* events to look for */
short revents; /* events returned */
};
参数 | 解释 |
---|---|
timeout | 阻塞等待时间,不过采用整数单位是毫秒。 |
struct pollfd* 和nfds_t | pollfd结构体数组以及数据长度 |
struct pollfd.fd :关注的文件描述符 | |
struct pollfd.events :关注的事件类型 | |
struct pollfd.revents :就绪的事件类型 |
事件类型 | 描述 |
---|---|
POLLIN | 数据(包括普通数据和优先数据)可读 |
POLLRDNORM | 普通数据可读 |
POLLRDBAND | 优先级带数据可读(Linux 不支持) |
POLLPRI | 高优先级数据可读,比如 TCP 带外数据 |
POLLOUT | 数据(包括普通数据和优先数据)可写 |
POLLWRNORM | 普通数据可写 |
POLLWRBAND | 优先级带数据可写 |
POLLRDHUP | TCP 连接被对方关闭,或者对方关闭了写操作,它由GNU引入 |
POLLERR | 错误 |
POLLHUP | 挂起。比如管道的写端被关闭后,读端描述符将收到 POLLHUP 事件 |
POLLNVAL | 文件描述符没有打开 |
const int default_port = 8080;
const int default_size = 20;
const int default_timeout = -1;
const int default_fd = -1;
const short default_event = 0;
class poll_server : public inet::tcp::server {
public:
poll_server(uint16_t port) : server(port), _fds(new struct pollfd[default_size])
, _cap(0), _timeout(default_timeout) {
pollfd_arr_init();
}
void pollfd_arr_init() {
for (int i = 0; i < default_size; i++) pollfd_init(_fds[i]);
}
void pollfd_init(struct pollfd& pf) {
pf.fd = default_fd;
pf.events = default_event;
pf.revents = default_event;
}
void pollfd_clear(struct pollfd& pf) {
pf.fd = default_fd;
pf.events = default_event;
pf.revents = default_event;
}
void start() {
_fds[0].fd = _sock;
_fds[0].events = POLLIN;
++_cap;
while (true) {
int timeout = _timeout;
switch (poll(_fds.get(), _cap, timeout)) {
case 0: INFO("time out: %d", timeout); break;
case -1: ERROR("select error, %d %s", errno, strerror(errno)); break;
default: event_handler(); break;
}
}
}
private:
void event_handler() {
for (int i = 0; i < _cap; i++) {
auto& fd = _fds[i].fd;
auto& revents = _fds[i].revents;
if (revents & POLLIN) {
if (fd == _sock) {
acceptor();
} else {
std::string buf;
recver(i, &buf);
}
}
if (revents & POLLOUT) {
std::string msg = "test";
sender(i, msg);
}
if (revents & POLLERR){
WARN("excepton event occurred, fd: %d", fd);
}
}
}
void acceptor() {
std::string cip;
uint16_t cport;
int newfd = accept(&cip, &cport);
if (_cap >= default_size) {
close(newfd);
WARN("connect close, fd array is full");
return;
}
for (int i = 0; i < default_size; i++) {
if (_fds[i].fd == default_fd) {
_fds[i].fd = newfd;
_fds[i].events = POLLIN | POLLOUT;
_cap++;
break;
}
}
INFO("a connect %d has been accepted [%s:%d]", newfd, cip.c_str(), cport);
}
void recver(int i, std::string* buf) {
ssize_t s = recv(_fds[i].fd, buf, 1024);
if (s > 0) {
std::cout << *buf << std::endl;
} else {
if (s == 0) INFO("client quit");
else WARN("recv error, %d %s", errno, strerror(errno));
close(_fds[i].fd);
pollfd_clear(_fds[i]);
--_cap;
}
}
void sender(int i, const std::string& msg) {
size_t s = send(_fds[i].fd, msg);
if (s <= 0) {
if (s == 0) INFO("client quit");
else WARN("send error, %d %s", errno, strerror(errno));
close(_fds[i].fd);
pollfd_clear(_fds[i]);
--_cap;
}
}
private:
std::unique_ptr<struct pollfd[]> _fds;
int _cap;
int _timeout;
};
优点 |
---|
监视fd的个数无上限 |
将事件输入输出分离,避免原始数据被修改 |
缺点 |
返回后仍需要遍历数组检测就绪事件 |
poll内部仍需要内核自己遍历检测就绪事件 |
每次调用都要将pollfd结构从内核空间拷贝到用户空间 |
#include
int epoll_create(int size);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
typedef union epoll_data {
void* ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;
struct epoll_event {
uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
epoll_create | 负责创建epoll模型 |
---|---|
size | 目前size被忽略,为兼容可写128/256 |
返回值 | epoll句柄 |
epoll_ctl | 负责用户告诉内核那些事件需要关注 |
epfd | epoll句柄 |
op | 指定相关操作 |
EPOLL_CTL_ADD :添加事件 | |
EPOLL_CTL_MOD :修改事件 | |
EPOLL_CTL_DEL :删除事件 | |
fd | 事件关注的文件描述符 |
epoll_event | 用来指定fd上关注的事件 |
epoll_wait | 负责内核告诉用户那些事件就绪 |
epfd | epoll句柄 |
epoll_event | 输出缓冲区,存放已就绪的事件 |
maxevents | 缓冲区的长度 |
timeout | 阻塞等待的时间 |
返回值 | 就绪事件的个数 |
events宏常量取值 | 解释 |
---|---|
EPOLLIN | 表示对应的文件描述符可以读(包括对端SOCKET正常关闭) |
EPOLLOUT | 表示对应的文件描述符可以写 |
EPOLLPRI | 表示对应的文件描述符有紧急的数据可读(带外数据) |
EPOLLERR | 表示对应的文件描述符发生错误 |
EPOLLHUP | 表示对应的文件描述符被挂断 |
EPOLLET | 将EPOLL设为边缘触发(Edge Triggered)模式 |
EPOLLONESHOT | 只监听一次事件,本次之后自动将该fd删去 |
epoll有两种工作方式,分别是水平触发LT和边缘触发ET。
事件就绪时,可以不立刻处理或只部分处理。
只要事件处于就绪状态,每次调用epoll_wait都会通知该事件就绪,直到处理完毕处于未就绪状态。
设置事件为EPOLLET,表示对于该事件使用ET模式。
事件就绪时必须一次性处理清空数据,否则下次是不会通知该事件就绪的,直到该事件再次就绪。
数据剩余ET不会提醒,所以必须一次性读取所有数据,但如果读取时刚好无数据就会被阻塞。 所以ET必须采用非阻塞读写。
LT模式事件就绪时读取一定不会被阻塞,因为一定有数据。
一般ET的效率>=LT的效率。原因如下:
ET高IO,LT高响应。
优点 | 解释 |
---|---|
接口分离解耦 | 每次调用不需要重新设置事件集,做到输入输出事件分离 |
使用简单高效 | 调用后用户不需要遍历,内核提供就绪事件缓冲区 |
轻量数据拷贝 | 不需要频繁的进行将数据从内核和用户之间的拷贝 |
无遍历效率高 | 底层不需要遍历,利用回调将就绪事件添加到就绪队列中 |
没有数量限制 | 文件描述符数目无上限 |
一般构建响应后,直接发送数据,只有当缓冲区满的时候,再将没写完的数据交给epoll处理。
select、poll、epoll都是如此,但epoll的ET模式可以常设置写事件。