通信的本质就是IO;
关于IO的效率问题(以读取为例):
因此,IO可以理解为等 + 数据拷贝;
低效的IO:单位时间,大部分的IO类接口其实都在等;
高效的IO:单位时间,让IO接口等的比重降低;
IO接口在缓冲区数据准备好之前,会一直阻塞,等待数据的就绪;是最普通且最常见的IO模型;

如果内核还未将数据准备好,系统调用依然会直接返回,并且返回EWOULEBLOCK错误码,表示数据还未准备好,该进程不会阻塞等待数据;
非阻塞IO往往需要程序员循环的方式反复尝试读写文件描述符,这个过程称为轮询;这对CPU来说是较大的浪费,一般只有特定场景下才使用;

进程调用sigaction检查信号的状态,然后立即返回,当内核将数据准备好的时候,使用SIGIO信号通知进程,进程再调用IO接口进行IO操作;

IO多路转接是指IO接口能够同时等待多个文件描述符的就绪状态;

进程在调用了IO接口后,若无数据准备好,就立即返回,在内核将数据准备好之后,直接拷贝到缓冲区中,通过信号通知该进程,拷贝完毕;

如果一个进程(线程)全程参与了IO(等+拷贝),我们就称之为同步IO;
阻塞和非阻塞关注的是程序在等待调用结果(消息, 返回值)时的状态;
在设置IO接口的状态或网络套接字状态的时候,有一个NONBLOCK状态,这就是非阻塞状态;

有两种方式设置套接字为非阻塞:


由于标准输入的文件描述符是默认阻塞状态的,因此可以用它来进行实验,代码如下:
阻塞IO
#include
#include
#include
#include
#include
#include
#include
using namespace std;
int main()
{
char buffer[1024];
while(true)
{
sleep(1);
ssize_t s = read(0, buffer, sizeof(buffer) - 1);
if(s > 0)
{
buffer[s] = 0;
cout << "echo# " << buffer << " errno[---]: " << errno << " errstring: " << strerror(errno) << endl;
}
}
return 0;
}
运行结果:

当我们不从键盘输入数据的时候,进程就会一直阻塞;
非阻塞IO
#include
#include
#include
#include
#include
#include
#include
using namespace std;
//将文件描述符设置为非阻塞
bool SetNonBlock(int fd)
{
int fl = fcntl(fd, F_GETFL); // 在底层获取当前fd对应的文件读写标志位
if(fl < 0)
{
return false;
}
fcntl(fd, F_SETFL, fl | O_NONBLOCK); // 设置非阻塞
return true;
}
int main()
{
SetNonBlock(0); // 设置标准输入为非阻塞,只要设置一次,后续就都是非阻塞了
char buffer[1024];
while(true)
{
sleep(1);
errno = 0;
// 非阻塞的时候,我们是以出错的形式返回,告知上层数据没有就绪:
// 我们如何甄别是真的出错了,还是仅仅是数据没有就绪呢?
// 数据就绪了的话,我们就正常读取就行
ssize_t s = read(0, buffer, sizeof(buffer) - 1);
//出错,不仅仅是错误返回值,errno变量也会被设置,表明出错原因
if(s > 0)
{
buffer[s] = 0;
cout << "echo# " << buffer << " errno[---]: " << errno << " errstring: " << strerror(errno) << endl;
}
else
{
// 如果失败的errno值是11,就代表其实没错,只不过是底层数据没就绪
if(errno == EWOULDBLOCK || errno == EAGAIN)
{
cout << "当前0号fd的数据没有就绪,请下次再来试一试" << endl;
continue;
}
else if(errno == EINTR)
{
cout << "当前IO可能被中断,请下次再来试一试" << endl;
continue;
}
else
{
//进行差错处理
}
}
}
return 0;
}
运行结果:

可以看到,设置套接字为非阻塞后,当进程检测到缓冲区没有数据就绪时,进程不会阻塞,而是会一直循环执行,并轮询检测缓冲区,直到数据就绪;
系统提供select函数来实现多路复用输入/输出模型:
select解决的问题是等的问题,帮助用户一次等待多个文件sock;当某些sock就绪了,select就要通知用户,就绪的sock有哪些,然后用户再调用recv/recvfrom/read接口进行数据读取;

参数:
后面四个参数全都是输入输出型参数;
readfds,writefds,exceptfds这三个参数:
在输入时,用户告诉内核,需要帮忙关心哪些sock的哪一种事件;
在输出时,内核告诉用户,内核所关心的sock中,哪些sock上的哪类时间已经就绪了;
这三个参数都是fd_set类型的,这是一种位图结构,代表文件描述符集,需要使用匹配的方法对fd_set类型进行操作:

timeout:
类型是struct timeval结构体,可以用于获取时间:

两个成员分别是单位为秒和微妙的值;
根据timeout参数能选择slect的等待方式:
返回值:若返回值为0,代表timeout返回;若返回值为-1,代表select错误;其他返回值代表select返回成功;
以readfds参数为例,分析一下select过程:
Log.hpp
#pragma once
#include
#include
#include
#include
#include
// 日志是有日志级别的
#define DEBUG 0
#define NORMAL 1
#define WARNING 2
#define ERROR 3
#define FATAL 4
const char *gLevelMap[] = {
"DEBUG",
"NORMAL",
"WARNING",
"ERROR",
"FATAL"
};
#define LOGFILE "./http.log"
// 完整的日志功能,至少: 日志等级 时间 支持用户自定义(日志内容, 文件行,文件名)
void logMessage(int level, const char *format, ...)
{
#ifndef DEBUG_SHOW
if(level == DEBUG) return;
#endif
// va_list ap;
// va_start(ap, format);
// while()
// int x = va_arg(ap, int);
// va_end(ap); //ap=nullptr
char stdBuffer[1024]; //标准部分
time_t timestamp = time(nullptr);
// struct tm *localtime = localtime(×tamp);
snprintf(stdBuffer, sizeof stdBuffer, "[%s] [%ld] ", gLevelMap[level], timestamp);
char logBuffer[1024]; //自定义部分
va_list args;
va_start(args, format);
// vprintf(format, args);
vsnprintf(logBuffer, sizeof logBuffer, format, args);
va_end(args);
//FILE *fp = fopen(LOGFILE, "a");
printf("%s%s\n", stdBuffer, logBuffer);
//fprintf(fp, "%s%s\n", stdBuffer, logBuffer);
//fclose(fp);
}
Sock.hpp
#pragma once
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include "Log.hpp"
class Sock
{
private:
const static int gbacklog = 20;
public:
Sock() {}
static int Socket()
{
int listensock = socket(AF_INET, SOCK_STREAM, 0);
if (listensock < 0)
{
logMessage(FATAL, "create socket error, %d:%s", errno, strerror(errno));
exit(2);
}
logMessage(NORMAL, "create socket success, listensock: %d", listensock);
return listensock;
}
static void Bind(int sock, uint16_t port, std::string ip = "0.0.0.0")
{
struct sockaddr_in local;
memset(&local, 0, sizeof local);
local.sin_family = AF_INET;
local.sin_port = htons(port);
inet_pton(AF_INET, ip.c_str(), &local.sin_addr);
if (bind(sock, (struct sockaddr *)&local, sizeof(local)) < 0)
{
logMessage(FATAL, "bind error, %d:%s", errno, strerror(errno));
exit(3);
}
}
static void Listen(int sock)
{
if (listen(sock, gbacklog) < 0)
{
logMessage(FATAL, "listen error, %d:%s", errno, strerror(errno));
exit(4);
}
logMessage(NORMAL, "init server success");
}
// 一般经验
// const std::string &: 输入型参数
// std::string *: 输出型参数
// std::string &: 输入输出型参数
static int Accept(int listensock, std::string *ip, uint16_t *port)
{
struct sockaddr_in src;
socklen_t len = sizeof(src);
int servicesock = accept(listensock, (struct sockaddr *)&src, &len);
if (servicesock < 0)
{
logMessage(ERROR, "accept error, %d:%s", errno, strerror(errno));
return -1;
}
if(port) *port = ntohs(src.sin_port);
if(ip) *ip = inet_ntoa(src.sin_addr);
return servicesock;
}
static bool Connect(int sock, const std::string &server_ip, const uint16_t &server_port)
{
struct sockaddr_in server;
memset(&server, 0, sizeof(server));
server.sin_family = AF_INET;
server.sin_port = htons(server_port);
server.sin_addr.s_addr = inet_addr(server_ip.c_str());
if(connect(sock, (struct sockaddr*)&server, sizeof(server)) == 0) return true;
else return false;
}
~Sock() {}
};
main.cc
#include "selectServer.hpp"
#include
int main()
{
// 1. fd_set是一个固定大小位图,直接决定了select能同时关心的fd的个数是有上限的!
// std::cout << sizeof(fd_set) * 8 << std::endl;
std::unique_ptr<SelectServer> svr(new SelectServer);
svr->Start();
return 0;
}
selectServer.hpp
这段代码只是完成了用select接口同时等待多个文件描述符就绪,文件描述符就绪后的读取工作还未完成;
#ifndef __SELECT_SVR_H__
#define __SELECT_SVR_H__
#include
#include
#include
#include
#include
#include "Log.hpp"
#include "Sock.hpp"
using namespace std;
class SelectServer
{
public:
SelectServer(const uint16_t &port = 8080)
: _port(port)
{
_listensock = Sock::Socket();
Sock::Bind(_listensock, _port);
Sock::Listen(_listensock);
logMessage(DEBUG, "%s", "create base socket success");
}
void Start()
{
fd_set rfds;
FD_ZERO(&rfds);// 将rfds清零
while(true)
{
//struct timeval timeout = {0, 0};
// 如何看待listensock? 获取新连接,我们把它依旧看做成为IO,input事件,如果没有连接到来呢?阻塞
//不能直接调用accept了
FD_SET(_listensock, &rfds); // 将listensock添加到读文件描述符集中
//int n = select(_listensock + 1, &rfds, nullptr, nullptr, &timeout);
int n = select(_listensock + 1, &rfds, nullptr, nullptr, nullptr);
switch(n)
{
case 0:
logMessage(DEBUG, "%s", "timeout");
break;
case -1:
logMessage(WARNING, "select error: %d : %s", errno, strerror(errno));
break;
default:
//select成功
logMessage(DEBUG, "%s", "get a new link event");
HandlerEvent(rfds);//对就绪的fd进行处理
break;
}
}
}
void HandlerEvent(const fd_set& rfds)
{
string clientip;
uint16_t clientport = 0;
if(FD_ISSET(_listensock, &rfds))
{
//listensock上面的读事件就绪了,表示可以读取了
//获取新连接了
int sock = Sock::Accept(_listensock, &clientip, &clientport); // 在这里进行accept是不会阻塞的
if(sock < 0)
{
logMessage(WARNING, "%s", "accept error");
return;
}
logMessage(DEBUG, "get a new link success : [%s:%d] : %d", clientip.c_str(), clientport, sock);
}
}
private:
uint16_t _port;
int _listensock;
};
#endif
运行结果:

能够成功获取链接,但是此时还不能对该fd进行读取;
但是我们在Start中调用了HandlerEvent方法来获取连接,获取成功后如果还需要重新向select中添加新的fd,就很困难,因此需要更新编写代码的模式;
select的一般代码编写模式:
完整的selectServer.hpp代码
#ifndef __SELECT_SVR_H__
#define __SELECT_SVR_H__
#include
#include
#include
#include
#include
#include "Log.hpp"
#include "Sock.hpp"
#define BITS 8
#define NUM (sizeof(fd_set) * BITS) // fd_set能够管理的fd的最大值
#define FD_NONE -1 // 文件描述符初始化状态
using namespace std;
class SelectServer
{
public:
SelectServer(const uint16_t &port = 8080)
: _port(port)
{
_listensock = Sock::Socket();
Sock::Bind(_listensock, _port);
Sock::Listen(_listensock);
logMessage(DEBUG, "%s", "create base socket success");
// 初始化数组
for (int i = 0; i < NUM; i++)
{
_fd_array[i] = FD_NONE;
}
// 规定:_fd_array[0] = _listensock
_fd_array[0] = _listensock;
}
void Start()
{
while (true)
{
// struct timeval timeout = {0, 0};
// 如何看待listensock? 获取新连接,我们把它依旧看做成为IO,input事件,如果没有连接到来呢?阻塞
// 不能直接调用accept了
// FD_SET(_listensock, &rfds); // 将listensock添加到读文件描述符集中
// int n = select(_listensock + 1, &rfds, nullptr, nullptr, &timeout);
// 1. nfds: 随着我们获取的sock越来越多,随着我们添加到select的sock越来越多,注定了nfds每一次都可能要变化,我们需要对它动态计算
// 2. rfds/writefds/exceptfds:都是输入输出型参数,输入输出不一定以一样的,所以注定了我们每一次都要对rfds进行重新添加
// 3. timeout: 都是输入输出型参数,每一次都要进行重置,前提是你要的话
// 1,2 => 注定了我们必须自己将合法的文件描述符需要单独全部保存起来 用来支持:1. 更新最大fd 2.更新位图结构
DebugPrint();
fd_set rfds;
FD_ZERO(&rfds); // 将rfds清零
int maxfd = _listensock;
// 将_fd_array中的需要关注的fd更新到rfds中
for (int i = 0; i < NUM; i++)
{
if (_fd_array[i] == FD_NONE)
{
continue;
}
FD_SET(_fd_array[i], &rfds);
if (maxfd < _fd_array[i])
{
maxfd = _fd_array[i];
}
}
// rfds未来一定有两类sock:listensock和普通sock
int n = select(maxfd + 1, &rfds, nullptr, nullptr, nullptr);
switch (n)
{
case 0:
logMessage(DEBUG, "%s", "timeout");
break;
case -1:
logMessage(WARNING, "select error: %d : %s", errno, strerror(errno));
break;
default:
// select成功
logMessage(DEBUG, "%s", "get a new link event");
HandlerEvent(rfds); // 对就绪的fd进行处理
break;
}
}
}
~SelectServer()
{
if (_listensock >= 0)
{
close(_listensock);
}
}
private:
fd_set 是一个集合,里面可能会存在多个sock,不同种的sock需要进行不同的处理,不能在这个函数中只有一种处理
void HandlerEvent(const fd_set &rfds)
{
for (int i = 0; i < NUM; i++)
{
// 1.去掉不合法fd
if (_fd_array[i] == FD_NONE)
{
continue;
}
// 2.合法fd也不一定就绪了
if (FD_ISSET(_fd_array[i], &rfds))
{
// 指定的fd,读事件就绪
// 读事件就绪:连接事件到来,accept
if (_fd_array[i] == _listensock)
{
Accepter(); // listensock需要进行accept
}
else
{
Recver(i); // 普通sock进行recv
}
}
}
}
void Accepter()
{
string clientip;
uint16_t clientport = 0;
// listensock上面的读事件就绪了,表示可以读取了
// 获取新连接了
int sock = Sock::Accept(_listensock, &clientip, &clientport); // 在这里进行accept是不会阻塞的
if (sock < 0)
{
logMessage(WARNING, "%s", "accept error");
return;
}
logMessage(DEBUG, "get a new link success : [%s:%d] : %d", clientip.c_str(), clientport, sock);
// read / recv? 不能!为什么不能?我们不清楚该sock上面数据什么时候到来,此时只是建立连接成功 ,recv、read就有可能先被阻塞,IO = 等+数据拷贝
// 谁可能最清楚呢?select!
// 得到新连接的时候,此时我们应该考虑的是,将新的sock托管给select,让select帮我们进行检测sock上是否有新的数据
// 有了数据select,读事件就绪,select就会通知我,我们在进行读取,此时我们就不会被阻塞了
// 要将sock添加 给 select, 其实我们只要将fd放入到数组中即可!
int pos = 1;
for (; pos < NUM; pos++)
{
if (_fd_array[pos] == FD_NONE) // 找出_fd_array中未设置合法fd的位置
{
break;
}
}
if (pos == NUM) // 数组满了
{
logMessage(WARNING, "%s:%d", "select server already full,close: %d", sock);
close(sock);
}
else
{
_fd_array[pos] = sock; // 将sock加入_fd_array数组
}
}
void Recver(int pos)
{
// 读事件就绪:INPUT事件到来,recv,read
logMessage(DEBUG, "message in, get IO event: %d", _fd_array[pos]);
// 此时select已经帮我们进行了事件检测,fd上的数据一定是就绪的,即 本次 不会被阻塞
// 这样读取有bug吗?有的,你怎么保证以读到了一个完整报文呢?
char buffer[1024];
int n = recv(_fd_array[pos], buffer, sizeof(buffer) - 1, 0);
if (n > 0)
{
buffer[n] = 0;
logMessage(DEBUG, "client[%d]# %s", _fd_array[pos], buffer);
}
else if (n == 0) // 对端关闭连接
{
logMessage(DEBUG, "client[%d] quit, me too...", _fd_array[pos]);
// 1.我们也要关闭不需要的fd
close(_fd_array[pos]);
// 2.不要让select帮我关心当前的fd了
_fd_array[pos] = FD_NONE;
}
else
{
logMessage(WARNING, "%d sock recv error, %d : %s", _fd_array[pos], errno, strerror(errno));
// 1.我们也要关闭不需要的fd
close(_fd_array[pos]);
// 2.不要让select帮我关心当前的fd了
_fd_array[pos] = FD_NONE;
}
}
void DebugPrint()
{
cout << "_fd_array[]: ";
for (int i = 0; i < NUM; i++)
{
if (_fd_array[i] == FD_NONE)
continue;
cout << _fd_array[i] << " ";
}
cout << endl;
}
private:
uint16_t _port;
int _listensock;
int _fd_array[NUM]; // 第三方数组,用来保存有所得合法fd
};
#endif
运行结果:

可以看出select服务器可以同时关心多个fd的事件,是一个高并发的服务器;
优点:
缺点: