在内核将数据准备好之前,系统调用会一直等待,所有的套接字默认都是阻塞方式。
如果内核还未将数据准备好,系统调用仍然会直接返回,并且返回EWOULDBLOCK错误码。
非阻塞IO往往需要程序员循环的方式反复尝试读写文件描述符,这个过程称为轮询,不过这对CPU来说浪费较大,一般只有特定的场景下才使用。
内核将数据准备好的时候,使用SIGIO信号通知程序进行IO操作。
IO多路转接能够同时等待多个文件描述符的就绪状态。
由内核在数据拷贝完成时,通知应用程序(信号驱动时告诉应用程序何时可以开始拷贝数据,这里的异步IO是内核已经完成拷贝)。
同步和异步关注的是消息通知机制。
阻塞和非阻塞关注的是程序在等待调用结果(消息,返回值)时的状态
任何IO过程中,都包含两个步骤,第一是等待,第二是拷贝。实际应用中,等待消耗的时间往往都远远高于拷贝的时间,让IO更高效,最核心的办法就是让等待的时间尽量少。
非阻塞IO,纪录锁,系统V流机制,I/O多路转接(也叫I/O多路复用),readv和writev函数以及存储映射IO(mmap),这些统称为高级IO.
设定非阻塞
对一个文件描述符进行指定命令式的操作,如果失败返回-1。
轮询方式读取标准输入(非阻塞)
#include
#include
#include
#include
// 设置非阻塞接口
bool SetNonBlock(int fd)
{
int fl = fcntl(fd, F_GETFL); // 在底层获取当前fd对应的文件读写标志位,放在fl
if (fl < 0)
return false;
fcntl(fd, F_SETFL, fl | O_NONBLOCK); // 设置选项,将fl设置为非阻塞,这个非阻塞属性是新增的
return true;
}
int main()
{
// 标准输入 0 默认是阻塞的
SetNonBlock(0); // 只要设置一次,后续都是非阻塞
char buffer[1024];
while (true)
{
sleep(1);
errno = 0;
// 非阻塞的时候,我们是以出错的形式返回,告知上层数据没有就绪
// a. 如何甄别是真的出错了
// b. 还是数据仅仅没有就绪呢
// 数据有的话,正常读取就行
ssize_t s = read(0, buffer, sizeof(buffer) - 1); // 出错,不仅仅是错误返回值,errno变量也会被设置,表明出错原因
if (s > 0)
{
buffer[s - 1] = 0;
std::cout << "echo# " << buffer << " errno[---]: " << errno << " errstring: " << strerror(errno) << std::endl;
}
else
{
// 如果失败的errno值是11,就代表其实没错,只不过是底层数据没就绪
// std::cout << "read \"error\" " << " errno: " << errno << " errstring: " << strerror(errno) << std::endl;
if(errno == EWOULDBLOCK || errno == EAGAIN){
std::cout << "当前0号fd数据没有就绪, 请下一次再来试试吧" <<std::endl;
continue;
}
else if(errno == EINTR){
std::cout << "当前IO可能被中断, 再试一试吧" << std::endl;
continue;
}
else{
//进行差错处理
}
sleep(1);
}
}
return 0;
}
系统提供select函数来实现多路复用输入/输出模型
recv/recvfrom/read
等进行数据读取struct timeval *timeout
的意义:本质是一个位图结构
以readfds
为例:
输入时:用户 -> 内核,比特位的位置:文件描述符值,比特位内容:是否关心
00001010 则表示关心2号比特位和4号比特位的文件描述符的读事件,不关心其他比特位的文件描述符的读事件。
输出时:内核 -> 用户,比特位的位置:文件描述符值,比特位内容:是否就绪
0000 1000 则表示,后续用户可以直接读取3号,而不会被阻塞。
关于timeout:用户和内核都会修改同一个位图结构,因此这个参数用一次之后,一定需要进行重新设定。
struct timeval timeout = {5, 0};
关于nfds:随着获取的sock越来越多,添加到select的sock也会越来越多,此时nfds一定是动态变化,所以要对nfds进行动态计算
rfds/writefds/exceptfds:都是输入输出型参数,输入输出不一定一致,因此每一次要对rfds进行重新添加
由于2.3点,则:
#define BITS 8
#define NUM (sizeof(fd_set)*BITS)
#define FD_NONE -1
int _fd_array[NUM];
//初始化
for(int i = 0; i < NUM; i++) _fd_array[i] = FD_NONE;
fd_set rfds;
FD_ZERO(&rfds); // 初始化 清空
int maxfd = _listensock;
for (int i = 0; i < NUM; i++)
{
if (_fd_array[i] == FD_NONE)
continue;
FD_SET(_fd_array[i], &rfds);
if (maxfd < _fd_array[i])
maxfd = _fd_array[i];
}
int n = select(maxfd + 1, &rfds, nullptr, nullptr, &timeout);
void HandlerEvent(const fd_set &rfds)
{
for (int i = 0; i < NUM; i++)
{
if (_fd_array[i] == FD_NONE)
continue;
if (FD_ISSET(_fd_array[i], &rfds))
{
// 指定的fd,读事件就绪
// 读事件就绪:连接事件到来,accept
if (_fd_array[i] == _listensock) Accepter();
else Recver(i);
}
}
}
server.hpp
#ifndef __SELECT_SVR_H__
#define __SELECT_SVR_H__
#include
#include
#include
#include
#include
#include "Log.hpp"
#include "Sock.hpp"
#define BITS 8
#define NUM (sizeof(fd_set) * BITS)
#define FD_NONE -1
using namespace std;
// select 只完成读取,写入和异常不做处理 -- epoll(写完整)
class SelectServer
{
public:
SelectServer(const uint16_t &port = 8080) : _port(port)
{
_listensock = Sock::Socket();
Sock::Bind(_listensock, _port);
Sock::Listen(_listensock);
logMessage(DEBUG, "%s", "create base socket success");
for (int i = 0; i < NUM; i++)
_fd_array[i] = FD_NONE;
// 规定_fd_array[0] = _listensock;
_fd_array[0] = _listensock;
}
void Start()
{
while (true)
{
// struct timeval timeout = {0, 0};
// int sock = Sock::Accept(_listensock); //不能直接调用accept
// 加入select中,让select等
// FD_SET(_listensock, &rfds);
DebugPrint();
fd_set rfds;
FD_ZERO(&rfds); // 初始化 清空
int maxfd = _listensock;
for (int i = 0; i < NUM; i++)
{
if (_fd_array[i] == FD_NONE)
continue;
FD_SET(_fd_array[i], &rfds);
if (maxfd < _fd_array[i])
maxfd = _fd_array[i];
}
int n = select(maxfd + 1, &rfds, nullptr, nullptr, nullptr);
switch (n)
{
case 0:
logMessage(DEBUG, "%s", "time out...");
break;
case -1:
logMessage(WARNING, "select error: %d : %s", errno, strerror(errno));
default:
// 成功的
logMessage(DEBUG, "get a new link event...");
HandlerEvent(rfds);
break;
}
}
}
~SelectServer()
{
if (_listensock >= 0)
close(_listensock);
}
private:
void HandlerEvent(const fd_set &rfds)
{
for (int i = 0; i < NUM; i++)
{
if (_fd_array[i] == FD_NONE)
continue;
if (FD_ISSET(_fd_array[i], &rfds))
{
// 指定的fd,读事件就绪
// 读事件就绪:连接事件到来,accept
if (_fd_array[i] == _listensock) Accepter();
else Recver(i);
}
}
}
void Accepter()
{
string clientip;
uint16_t clientport = 0;
// listensock上面的读事件就绪了,表示可以读取了
// 获取新连接
int sock = Sock::Accept(_listensock, &clientip, &clientport);
if (sock < 0)
{
logMessage(WARNING, "accept error");
return;
}
logMessage(DEBUG, "get a new line success : [%s:%d] : %d", clientip.c_str(), clientport, sock);
int pos = 1;
for (; pos < NUM; pos++){
if (_fd_array[pos] == FD_NONE)
break;
}
if (pos == NUM){
logMessage(WARNING, "%s:%d", "select server already full, close: %d", sock);
close(sock);
}else{
_fd_array[pos] = sock;
}
}
void Recver(int pos){
// 读事件就绪:INPUT事件到来,recv,read
logMessage(DEBUG, "messsage in, get IO event: %d", _fd_array[pos]);
// 先不考虑阻塞
char buffer[1024];
int n = recv(_fd_array[pos], buffer, sizeof(buffer)-1, 0);
if(n > 0){
buffer[n] = 0;
logMessage(DEBUG, "client[%d]# %s", _fd_array[pos], buffer);
}
else if(n == 0){
logMessage(DEBUG, "cilent[%d] quit, me too...", _fd_array[pos]);
// 1. 不让select关心当前的fd了
close(_fd_array[pos]);
_fd_array[pos] = FD_NONE;
}else{
logMessage(WARNING, "%d sock recv error, %d : %s", _fd_array[pos], errno, strerror(errno));
// 1. 不让select关心当前的fd了
close(_fd_array[pos]);
_fd_array[pos] = FD_NONE;
}
}
void DebugPrint()
{
cout << "_fd_array[]: ";
for (int i = 0; i < NUM; i++)
{
if (_fd_array[i] == FD_NONE)
continue;
cout << _fd_array[i] << " ";
}
cout << endl;
}
private:
uint16_t _port;
int _listensock;
int _fd_array[NUM];
};
#endif
通过编写代码可知select的优缺点:
优点:
缺点:
sizeof(fd_set)
的值,字节长度乘以8比特则为支持的最大文件描述数//pollServer.hpp
#ifndef __POLL_SVR_H__
#define __POLL_SVR_H__
#include
#include
#include
#include
#include
#include
#include "Log.hpp"
#include "Sock.hpp"
#define FD_NONE -1
using namespace std;
// select 只完成读取,写入和异常不做处理 -- epoll(写完整)
class PollServer
{
public:
static const int nfds = 100;
public:
PollServer(const uint16_t &port = 8080) : _port(port), _nfds(nfds)
{
_listensock = Sock::Socket();
Sock::Bind(_listensock, _port);
Sock::Listen(_listensock);
logMessage(DEBUG, "%s", "create base socket success");
_fds = new struct pollfd[_nfds];
for(int i = 0; i < _nfds; i++){
_fds[i].fd = FD_NONE;
_fds[i].events = _fds[i].revents = 0;
}
_fds[0].fd = _listensock;
_fds[0].events = POLLIN;
_timeout = 1000;
}
void Start()
{
while (true)
{
int n = poll(_fds, _nfds, _timeout);
switch (n)
{
case 0:
logMessage(DEBUG, "%s", "time out...");
break;
case -1:
logMessage(WARNING, "select error: %d : %s", errno, strerror(errno));
default:
// 成功的
HandlerEvent();
break;
}
}
}
~PollServer()
{
if (_listensock >= 0)
close(_listensock);
if (_fds) delete [] _fds;
}
private:
void HandlerEvent()
{
for (int i = 0; i < _nfds; i++)
{
if (_fds[i].fd == FD_NONE)
continue;
if (_fds[i].revents & POLLIN)
{
// 指定的fd,读事件就绪
// 读事件就绪:连接事件到来,accept
if (_fds[i].fd == _listensock) Accepter();
else Recver(i);
}
}
}
void Accepter()
{
string clientip;
uint16_t clientport = 0;
// listensock上面的读事件就绪了,表示可以读取了
// 获取新连接
int sock = Sock::Accept(_listensock, &clientip, &clientport);
if (sock < 0)
{
logMessage(WARNING, "accept error");
return;
}
logMessage(DEBUG, "get a new line success : [%s:%d] : %d", clientip.c_str(), clientport, sock);
int pos = 1;
for (; pos < _nfds; pos++){
if (_fds[pos].fd == FD_NONE)
break;
}
if (pos == _nfds){
// 对struct pollfd进行自动扩容
logMessage(WARNING, "%s:%d", "select server already full, close: %d", sock);
close(sock);
}else{
_fds[pos].fd = sock;
_fds[pos].events = POLLIN;
}
}
void Recver(int pos){
// 读事件就绪:INPUT事件到来,recv,read
logMessage(DEBUG, "messsage in, get IO event: %d", _fds[pos]);
// 先不考虑阻塞
char buffer[1024];
int n = recv(_fds[pos].fd, buffer, sizeof(buffer)-1, 0);
if(n > 0){
buffer[n] = 0;
logMessage(DEBUG, "client[%d]# %s", _fds[pos].fd, buffer);
}
else if(n == 0){
logMessage(DEBUG, "cilent[%d] quit, me too...", _fds[pos].fd);
// 1. 不让select关心当前的fd了
close(_fds[pos].fd);
_fds[pos].fd = FD_NONE;
_fds[pos].events = 0;
}else{
logMessage(WARNING, "%d sock recv error, %d : %s", _fds[pos].fd, errno, strerror(errno));
// 1. 不让select关心当前的fd了
close(_fds[pos].fd);
_fds[pos].fd = FD_NONE;
_fds[pos].events = 0;
}
}
void DebugPrint()
{
cout << "_fd_array[]: ";
for (int i = 0; i < _nfds; i++)
{
if (_fds[i].fd == FD_NONE)
continue;
cout << _fds[i].fd << " ";
}
cout << endl;
}
private:
uint16_t _port;
int _listensock;
struct pollfd *_fds;
int _nfds;
int _timeout;
};
#endif
优点:
缺点:
创建一个epoll句柄,用完之后必须调用close()关闭
对创建的epoll模型进行相关操作
不同于select()是在监听事件时告诉内核要监听什么类型的事件,而是先在这里注册要监听的事件类型
op有三个取值:
struct epoll_event的结构:
其中events可以是以下几个宏的集合:
- EPOLLIN : 表示对应的文件描述符可以读 (包括对端SOCKET正常关闭);
- EPOLLOUT : 表示对应的文件描述符可以写;
- EPOLLPRI : 表示对应的文件描述符有紧急的数据可读 (这里应该表示有带外数据到来);
- EPOLLERR : 表示对应的文件描述符发生错误;
- EPOLLHUP : 表示对应的文件描述符被挂断;
- EPOLLET : 将EPOLL设为边缘触发(Edge Triggered)模式, 这是相对于水平触发(Level Triggered)来说的.
- EPOLLONESHOT:只监听一次事件, 当监听完这次事件之后, 如果还需要继续监听这个socket的话, 需要再次把这个socket加入到EPOLL队列里.
在特定的epoll当中获取已经就绪的事件,
关于epoll_wait的返回值:
设epoll_wait的返回值为n,每一次返回的n为就绪的事件的数量,并且存储在*events中,从0下标开始,按顺序存储。因此每次读 0 ~ (n-1) 即可读完就绪的事件,不会造成其他的浪费。
接口使用方便,虽然拆分成了三个函数,但是反而使用起来更方便高效,不需要每次循环都设置关注的文件描述符,也做到了输入输出参数分离
数据拷贝轻量,只在合适的时候调用EPOLL_CTL_ADD
将文件描述符结构拷贝到内核中,这个操作并不频繁(而select/poll)是每次循环都有进行拷贝
事件回调机制:避免使用遍历,而是使用回调函数的方式,将就绪的文件描述符结构加入到就绪队列中,epoll_wait返回直接访问就绪队列就知道哪些文件描述符就绪,这个操作时间复杂度O(1),即时文件描述符数目很多,效率也不会受到影响
可监听文件描述符数目无上限
OS如何知道,网卡里面有数据,或者键盘有用户输入?以从网卡读入为例,下面是模拟示意图
用户告诉内核需要维护的fd已经相关事件,内核会维护一个红黑树用来存储。同时,会创造一个就绪队列。
在系统中,每一个epoll模型对应的都是一个eventpoll结构。
注:
由于epoll中关心的文件描述符都必须是合法的文件描述符,因此当客户端断开连接时,首先应该在epoll中移除对该sock文件描述符的关心,再close该文件描述符,否则会出错。
为了保证每一回合的正确读取,每一个socket都要有自己的缓冲区。
epoll有两种工作方式,水平触发(LT)和边缘触发(ET)。select,poll,epoll的默认模式都是LT模式。
1. LT(Level Triggered,水平触发)
工作模式 在LT模式下,当epoll_wait()检测到描述符上有事件发生时,会重复通知应用程序,直到应用程序处理完所有的事件并将相应的文件描述符设置为非阻塞状态后,epoll_wait()才会返回。这意味着,如果有一个文件描述符上有多个事件发生,但应用程序没有一次性处理完所有的事件,那么epoll_wait()将继续通知应用程序该文件描述符上尚未处理的事件。
LT模式适用于处理普通的I/O事件,即不需要立即响应的事件。使用LT模式,应用程序可以在任何时候处理事件,而不必担心错过任何事件。
2. ET(Edge Triggered,边缘触发)工作模式
在ET模式下,当epoll_wait()检测到描述符上有事件发生时,只会通知应用程序一次,直到应用程序处理完该描述符上所有待处理事件之后,才会再次通知应用程序有新的事件发生。在ET模式下,应用程序需要立即响应事件,否则将会错过事件。
ET模式适用于处理高速、高流量的I/O事件,即需要尽可能快地响应事件。使用ET模式,应用程序可以尽可能地多次处理事件,从而提高效率。在ET模式下,应用程序需要对每个文件描述符上的所有事件进行处理,否则将会错过事件。
ET模式更高效:
select
优点:
缺点:
poll
优点:
缺点:
epoll
优点:
缺点:
综上所述,epoll相较于select和poll,在性能和扩展性方面具有明显优势,但在跨平台和编程复杂性方面存在一些限制。因此,在选择使用哪种多路复用机制时,需要根据具体的应用场景和需求进行权衡和选择。