poll()执行的任务同select很相似::它等待一组文件描述符(要监视的文件描述符集在fds参数中指定,它是一个结构数组)中的一个准备好执行I/O。
两者间的主要区别在于我们要如何指定待检查的文件描述符。
- NAME
- poll, ppoll - 监视并等待多个文件描述符的属性变化
-
- SYNOPSIS
- #include
-
- int poll(struct pollfd *fds, nfds_t nfds, int timeout);
struct pollfd其定义如下:
- struct pollfd {
- int fd; /* file descriptor */
- short events; /* requested events */
- short revents; /* returned events */
- };
个结构体由三个部分组成,
(1)首先是描述符 fd:每一个 pollfd 结构体指定了一个被监视的文件描述符
(2)然后是描述符上待检测的事件类型 events
- #define POLLIN 0x0001 /* any readable data available */
- #define POLLPRI 0x0002 /* OOB/Urgent readable data */
- #define POLLOUT 0x0004 /* file descriptor is writeable */
(3)和 select 非常不同的地方在于,poll 每次检测之后的结果不会修改原来的传入值,而是将结果保留在 revents 字段中,这样就不需要每次检测完都得重置待检测的描述字和感兴趣的事件。我们可以把 revents 理解成“returned events”
pollfd 结构体中的events和revents字段都是位掩码。
revents
被设定以此来表示该文件描述符上实际发生的事件。每个结构体的 events 域是由用户来设置,告诉内核我们关注的是什么,而 revents 域是返回时内核设置的,以说明对该描述符发生了什么事件
下表列出来可能会出现在 events 和 revents 字段中的位掩码。
位掩码 | events的输入 | 返回到revents | 模式 |
---|---|---|---|
可读事件 | |||
POLLIN | ● | ● | 可读取非高优先级的数据 |
POLLRDNORM | ● | ● | 等同于 POLLIN |
POLLRDBAND | ● | ● | 可读取优先级数据(Linux 中不使用) |
POLLPRI | ● | ● | 有紧急数据(高优先级数据)要读取(例如,TCP套接字上的带外数据) |
POLLRDHUP | ● | ● | 对端套接字关闭 |
可写事件 | |||
POLLOUT | ● | ● | 普通数据可写 |
POLLWRNORM | ● | ● | 等同于 POLLOUT |
POLLWRBAND | ● | ● | 优先级数据可写入 |
设定返回的附加信息 | |||
POLLERR | ● | 有错误发生 | |
POLLHUP | ● | 出现挂断(描述符挂起) | |
POLLNVAL | ● | 文件描述符未打开(请求的事件无效,可能是fd未打开(仅输出)) | |
POLLMSG | Linux 中不使用(SUSv3 中未指定) |
位掩码(POLLERR、POLLHUP 以及 POLLNVAL)是设定在 revents 字段中用来返回有关文件描述符的附加信息(即错误事件)。
- 对于上述两种情况,套接字文件描述符仍处于打开状态,并且尚未关闭(但shutdown()可能已经被调用-)。
- 从理论上讲,应该可以立即重用套接字(例如,通过另一个connect()调用)。
- 调用shutdown()只是进行了TCP断开, 并没有释放文件描述符, close()文件描述符上的将释放仍代表套接字保留的资源。
补充:对于如下输入事件
poll()真正关心的标志位就是 POLLIN、POLLOUT、POLLPRI、POLLRDHUP、POLLHUP 以及 POLLERR。
如果我们对某个特定的文件描述符上的事件不感兴趣
这样,poll 函数将忽略这样的 events 事件,检测完成以后,所对应的“returned events”的成员值也将设置为 0。这两种方法都可以用来关闭对单个文件描述符的检查,而不需要重新建立整个fds表。
参数 timeout 决定了 poll()的阻塞行为,具体如下。
timeout参数指定poll()将阻塞的最小毫秒数。(这个间隔将被舍入到系统时钟粒度,而内核调度延迟意味着阻塞间隔可能会超出一小部分。)在timeout中指定负值意味着无限超时。指定timeout为0会导致poll()立即返回,即使没有准备好文件描述符。
作为函数的返回值,poll会返回如下几种情况中的一种
注意select()同poll()返回正整数值的细小差别。如果一个文件描述符在返回的描述符集合中出现了不止一次,系统调用select()会将同一个描述符计数多次。而poll()返回的是就绪态的文件描述符个数,而且一个文件描述符只会统计一次,就算在相应的 revents 字段中设定了多个位掩码也是如此。
下面程序创建了一个管道,将字节写到随机选择的管道写端,然后通过poll来检测哪个管道中有数据可用。
- #include <unistd.h>
- #include <string.h>
- #include <stdio.h>
- #include <time.h>
- #include <poll.h>
- int main(int argc, char *argv[])
- {
- int numPipes, ready, randPipe, numWrites, j;
- struct pollfd *pollFd;
- int (*pfds)[2]; /* File descriptors for all pipes */
-
- if (argc < 2 || strcmp(argv[1], "--help") == 0){
- printf("%s num-pipes [num-writes]\n", argv[0]);
- exit(EXIT_FAILURE);
- }
-
-
- /* Allocate the arrays that we use. The arrays are sized according
- to the number of pipes specified on command line */
-
- numPipes = atoi(argv[1]);
- numWrites = (argc > 2) ? atoi(argv[2]) : 1;
-
- pfds = (int(*)[2])calloc(numPipes, sizeof(int [2]));
- if (pfds == NULL){
- perror("calloc");
- exit(EXIT_FAILURE);
- }
-
- pollFd = (struct pollfd *)calloc(numPipes, sizeof(struct pollfd));
- if (pollFd == NULL){
- perror("calloc");
- exit(EXIT_FAILURE);
- }
- /* Create the number of pipes specified on command line */
-
- for (j = 0; j < numPipes; j++)
- if (pipe(pfds[j]) == -1){
- printf("pipe %d", j);
- exit(EXIT_FAILURE);
- }
-
-
- /* Perform specified number of writes to random pipes */
-
- srandom((int) time(NULL));
- for (j = 0; j < numWrites; j++) {
- randPipe = random() % numPipes;
- printf("Writing to fd: %3d (read fd: %3d)\n",
- pfds[randPipe][1], pfds[randPipe][0]);
- if (write(pfds[randPipe][1], "a", 1) == -1){
- printf("write %d", pfds[randPipe][1]);
- exit(EXIT_FAILURE);
- }
-
- }
-
- /* Build the file descriptor list to be supplied to poll(). This list
- is set to contain the file descriptors for the read ends of all of
- the pipes. */
-
- for (j = 0; j < numPipes; j++) {
- pollFd[j].fd = pfds[j][0];
- pollFd[j].events = POLLIN;
- }
-
- ready = poll(pollFd, numPipes, 0);
- if (ready == -1){
- printf("poll");
- exit(EXIT_FAILURE);
- }
-
-
- printf("poll() returned: %d\n", ready);
-
- /* Check which pipes have data available for reading */
-
- for (j = 0; j < numPipes; j++)
- if (pollFd[j].revents & POLLIN)
- printf("Readable: %3d\n", pollFd[j].fd);
-
- exit(EXIT_SUCCESS);
- }
程序的命令行参数指定了应该创建 10 个管道,而写操作应该随机选择其中的 3 个管道
从上面的输出我们可知 poll()发现3个管道上有数据可读取。
- #define INIT_SIZE 128
-
- int main(int argc, char **argv) {
- int listen_fd, connected_fd;
- int ready_number;
- ssize_t n;
- char buf[MAXLINE];
- struct sockaddr_in client_addr;
-
- listen_fd = tcp_server_listen(SERV_PORT);
-
- // 初始化 pollfd 数组,这个数组的第一个元素是 listen_fd,其余的用来记录将要连接的 connect_fd
- struct pollfd event_set[INIT_SIZE];
- event_set[0].fd = listen_fd;
- event_set[0].events = POLLRDNORM; // 期望系统内核检测监听套接字上的连接建立完成事件。
-
- // 用 -1 表示这个数组位置还没有被占用
- int i;
- for (i = 1; i < INIT_SIZE; i++) {
- event_set[i].fd = -1; // 如果对应 pollfd 里的文件描述字 fd 为负数,poll 函数将会忽略这个 pollfd
- }
-
- for (;;) {
- if ((ready_number = poll(event_set, INIT_SIZE, -1)) < 0) { //这里之所以传入 INIT_SIZE,是因为 poll 函数已经能保证可以自动忽略 fd 为 -1 的 pollfd,否则我们每次都需要计算一下 event_size 里真正需要被检测的元素大小;timeout 设置为 -1,表示在 I/O 事件发生之前 poll 调用一直阻塞。
- error(1, errno, "poll failed ");
- }
-
- if (event_set[0].revents & POLLRDNORM) { // 如果系统内核检测到监听套接字上的连接建立事件
- socklen_t client_len = sizeof(client_addr);
- connected_fd = accept(listen_fd, (struct sockaddr *) &client_addr, &client_len); // 调用 accept 函数获取了连接描述字
-
- // 找到一个可以记录该连接套接字的位置
- for (i = 1; i < INIT_SIZE; i++) {
- if (event_set[i].fd < 0) {
- event_set[i].fd = connected_fd; //把连接描述字 connect_fd 也加入到 event_set 里
- event_set[i].events = POLLRDNORM; // 说明我们感兴趣的事件类型为 POLLRDNORM,也就是套集字上有数据可以读
- break;
- }
- }
-
- if (i == INIT_SIZE) { // 如果在数组里找不到这样一个位置,说明我们的 event_set 已经被很多连接充满了,没有办法接收更多的连接了
- error(1, errno, "can not hold so many clients");
- }
-
- if (--ready_number <= 0) // 因为 poll 返回的一个整数,说明了这次 I/O 事件描述符的个数,如果处理完监听套接字之后,就已经完成了这次 I/O 复用所要处理的事情
- continue; // 那么我们就可以跳过后面的处理,再次进入 poll 调用。
- }
-
- // 接下来的循环处理是查看 event_set 里面其他的事件,也就是已连接套接字的可读事件。这是通过遍历 event_set 数组来完成的。
- for (i = 1; i < INIT_SIZE; i++) {
- int socket_fd;
- if ((socket_fd = event_set[i].fd) < 0) // 如果数组里的 pollfd 的 fd 为 -1,说明这个 pollfd 没有递交有效的检测,直接跳过;
- continue;
- if (event_set[i].revents & (POLLRDNORM | POLLERR)) { // 通过检测 revents 的事件类型是 POLLRDNORM 或者 POLLERR,我们可以进行读操作
- if ((n = read(socket_fd, buf, MAXLINE)) > 0) { // 读取数据正常之后,再通过 write 操作回显给客户端
- if (write(socket_fd, buf, n) < 0) {
- error(1, errno, "write error");
- }
- } else if (n == 0 || errno == ECONNRESET) { // 如果读到 EOF 或者是连接重置,则关闭这个连接,并且把 event_set 对应的 pollfd 重置
- close(socket_fd);
- event_set[i].fd = -1;
- } else {
- error(1, errno, "read error"); // 读取数据失败。
- }
-
- if (--ready_number <= 0) // 判断如果事件已经被完全处理完之后,直接跳过对 event_set 的循环处理,再次来到 poll 调用。
- break;
- }
- }
- }
- }
实验
我们启动这个服务器程序,然后通过 telnet 连接到这个服务器程序。为了检验这个服务器程序的 I/O 复用能力,我们可以多开几个 telnet 客户端,并且在屏幕上输入各种字符串。
客户端 1:
- $telnet 127.0.0.1 43211
- Trying 127.0.0.1...
- Connected to 127.0.0.1.
- Escape character is '^]'.
- a
- a
- aaaaaaaaaaa
- aaaaaaaaaaa
- afafasfa
- afafasfa
- fbaa
- fbaa
- ^]
-
-
- telnet> quit
- Connection closed.
客户端 2:
- telnet 127.0.0.1 43211
- Trying 127.0.0.1...
- Connected to 127.0.0.1.
- Escape character is '^]'.
- b
- b
- bbbbbbb
- bbbbbbb
- bbbbbbb
- bbbbbbb
- ^]
-
-
- telnet> quit
- Connection closed.
可以看到,这两个客户端互不影响,每个客户端输入的字符很快会被回显到客户端屏幕上。一个客户端断开连接,也不会影响到其他客户端。
- int acl_socket_read(void *buf, size_t size) const{
- return read(sock_, buf, size);
- }
- /**
- * 从套接字读数据
- * @param buf {void*} 内存缓冲区地址
- * @param size {size_t} buf 缓冲区大小
- * @param timeout {size_t} 读超时时间(秒)
- * @return {int} 0: OK; -1: error
- */
- int ACL_VSTREAM::acl_socket_read(void *buf, size_t size, int timeout)
- {
- if (read_ready_) { // [读]已经准备好了
- read_ready_ = 0;
- } else if (timeout > 0 && socket_->acl_read_wait(timeout) < 0) {
- return -1;
- }
-
-
- return socket_->acl_socket_read(buf, size);
- }
-
- /**
- * 将数据读入至指定的缓冲区中
- * @param fp {ACL_VSTREAM*}
- * @param buf {void*} 目标缓冲区
- * @param size {size_t} buf 缓冲区大小
- * @return {int} 返回读到的数据,返回值如下:
- * > 0 当前读到缓冲区中的数据长度
- * == 0 对端连接关闭
- * < 0 在阻塞方式下表示读出错,采用非阻塞方式时也会返回 -1
- */
- int ACL_VSTREAM::read_to_buffer(void *buf, size_t size)
- {
- int read_cnt, nagain = 0;
-
- if (socket_->get_socket() == ACL_SOCKET_INVALID) {
- read_ready_ = 0;
- return -1;
- }
-
- AGAIN:
-
- // /* 清除系统错误号 */
- errno = (0);
-
- read_cnt = acl_socket_read(buf, size, rw_timeout_);
- //
- if (read_cnt > 0) {
- // flag_ &= ~ACL_VSTREAM_FLAG_BAD;
- errnum_ = 0;
- total_read_cnt_ += read_cnt;
- return read_cnt;
- } else if (read_cnt == 0) {
- flag_ = ACL_VSTREAM_FLAG_EOF;
- errnum_ = 0;
- return 0;
- }
-
- printf("-------------%d", errno);
- errnum_ = errno;
-
- if (errnum_ == EINTR) {
- if (nagain++ < 5) {
- goto AGAIN;
- }
-
- logger_error("[errno] %s(%d), %s: nagain: %d too much, fd: %d",
- __FILE__, __LINE__, __FUNCTION__, nagain, socket_->get_socket());
- } else if (errnum_ == ETIMEDOUT) {
- flag_ |= ACL_VSTREAM_FLAG_TIMEOUT;
- errno = ETIMEDOUT;
- } else if (errnum_ != EWOULDBLOCK) {
- flag_ |= ACL_VSTREAM_FLAG_ERR;
- }
-
- return -1;
- }
-
-
- int ACL_VSTREAM::read_buffed()
- {
- int n;
- read_ptr_ = read_buf_; // 当前缓冲区位置指向读取缓冲区的开头
- n = read_to_buffer(read_buf_, (size_t) read_buf_len_);
- read_cnt_ = n > 0 ? n : 0;
- return n;
- }
- /**
- * 将缓冲区内的数据拷贝到 vptr 中
- * @param vptr {void*} 用户的数据缓冲区指针地址
- * @param maxlen {size_t} vptr 数据缓冲区的空间大小
- * @return ret {int}, ret == ACL_VSTREAM_EOF: 表示出错, 应该关闭本地数据流,
- * ret >= 0: 成功从 fp 数据流的缓冲区中读取了 ret 个字节的数据
- */
- int acl_vstream_bfcp_some(void *vptr, size_t maxlen){
- int n;
-
- /* input params error */
- if ( vptr == NULL || maxlen == 0) {
- printf("[errno] %s, %s(%d): input error, vptr %s, "
- "maxlen %d", __FUNCTION__, __FILE__, __LINE__,
- vptr ? "not null" : "null",
- (int) maxlen);
- return ACL_VSTREAM_EOF;
- }
-
- /* internal fatal error */
- if (read_cnt_ < 0) {
- printf("[errno] %s, %s(%d): read_cnt(=%d) < 0",
- __FUNCTION__, __FILE__, __LINE__, (int) read_cnt_);
- return ACL_VSTREAM_EOF;
- }
-
- /* there is no any data in buf */
- if (read_cnt_== 0) {
- read_ptr_ = read_buf_; // 重新将当前指针指向缓冲区的开头
- return 0;
- }
-
- if (read_ptr_ >= read_buf_ + (int) read_buf_len_) { // 当前指针位置 超出 读取缓冲区的区域
- read_cnt_ = 0;
- read_ptr_ = read_buf_;
- return 0;
- }
-
-
- n = (int) read_cnt_ > (int) maxlen ? (int) maxlen : (int) read_cnt_;
-
- memcpy(vptr, read_ptr_, n);
-
- read_cnt_ -= n;
- offset_ += n;
-
- if (read_cnt_ == 0) {
- read_ptr_ = read_buf_;
- } else {
- read_ptr_ += n;
- }
-
- return n;
- }
-
- int ACL_VSTREAM::acl_vstream_getc()
- {
- if (read_cnt_ <= 0 && read_buffed() <= 0) { // 如果当前读取到的数据为空,那么就去socket流中将数据读取到缓冲区中
- return ACL_VSTREAM_EOF;
- }
-
- // 本次读取成功或者上一次有剩余的
- read_cnt_--; // 可用数据减少1
- offset_++;
- return *read_ptr_++;
- }
-
- /**
- * 从数据流中读取一行数据, 直到读到 "\n" 或读结束为止, 正常情况下包括 "\n"
- * @param fp {ACL_VSTREAM*} 数据流
- * @param vptr {void*} 用户所给的内存缓冲区指针
- * @param maxlen {size_t} vptr 缓冲区的大小
- * @return ret {int}, ret == ACL_VSTREAM_EOF: 读出错或对方关闭了连接,
- * 应该关闭本地数据流; n > 0: 读到 了 n 个字节的数据, 如果该 n 个数据
- * 的最后一个非 0 字符为 "\n" 表明读到了一个完整的行, 否则表明读到了 n
- * 个数据但对方未发送 "\n" 就关闭了连接; 还可以通过检查
- * (fp->flag & ACL_VSTREAM_FLAG_TAGYES)
- * 不等于 0 来判断是否读到了 "\n", 如果非 0 则表示读到了 "\n".
- */
- int ACL_VSTREAM::acl_vstream_gets(void *vptr, size_t maxlen){
- if (vptr == NULL || maxlen <= 0) {
- logger_error("%s(%d), %s: vptr %s, maxlen %d",
- __FILE__, __LINE__, __FUNCTION__,
- vptr ? "not null" : "null", (int) maxlen);
- return ACL_VSTREAM_EOF;
- }
-
-
-
- int n, ch;
- unsigned char *ptr = (unsigned char *) vptr;
- for ( n = 1; n < (int) maxlen; n++) {
- /* left one byte for '\0' */
-
- ch = acl_vstream_getc();
- if (ch == ACL_VSTREAM_EOF) {
- if (n == 1) { /* EOF, nodata read */
- return ACL_VSTREAM_EOF;
- }
- break; /* EOF, some data was read */
- }
-
- *ptr++ = ch;
- if (ch == '\n') {
- /* newline is stored, like fgets() */
- //fp->flag |= ACL_VSTREAM_FLAG_TAGYES;
- break;
- }
- }
-
- /* null terminate like fgets() */
- *ptr = 0;
- return n;
- }
-
-
-
- /**
- * 从数据流中读取一行数据, 直到读到 "\n" 或读结束为止, 返回的结果中不包括 "\n"
- * @param vptr {void*} 用户所给的内存缓冲区指针
- * @param maxlen {size_t} vptr 缓冲区的大小
- * @return ret {int}, ret == ACL_VSTREAM_EOF: 读出错或对方关闭了连接,
- * 应该关闭本地数据流, n == 0: 读到了一行数据, 但该行数据仅有 "\r\n",
- * n > 0: 读到 了 n 个字节的数据.
- */
- int ACL_VSTREAM::acl_vstream_gets_nonl(void *vptr, size_t maxlen){
- if (vptr == NULL || maxlen <= 0) {
- logger_error("[error] %s(%d), %s: vptr %s, maxlen %d",
- __FILE__, __LINE__, __FUNCTION__,
- vptr ? "not null" : "null", (int) maxlen);
- return ACL_VSTREAM_EOF;
- }
-
-
- int n, ch;
- unsigned char *ptr;
-
- ptr = (unsigned char *) vptr;
- for (n = 1; n < (int) maxlen; n++) {
- ch = acl_vstream_getc();
-
- if (ch == ACL_VSTREAM_EOF) {
- if (n == 1) /* EOF, nodata read */
- return ACL_VSTREAM_EOF;
- break; /* EOF, some data was read */
- }
-
- *ptr++ = ch;
- if (ch == '\n') {
- break; /* newline is stored, like fgets() */
- }
- }
-
-
- *ptr = 0; /* null terminate like fgets() */
- ptr--;
- while (ptr >= (unsigned char *) vptr) {
- if (*ptr != '\r' && *ptr != '\n')
- break;
-
- *ptr-- = 0;
- n--;
- }
-
- return n;
- }
- /**
- * 从数据流中一次性读取 n 个数据, 该 n 有可能会小于用户所需要的 maxlen
- * @param fp {ACL_VSTREAM*} 数据流
- * @param vptr {void*} 用户的数据缓冲区指针地址
- * @param maxlen {size_t} vptr 数据缓冲区的空间大小
- * @return ret {int}, ret == ACL_VSTREAM_EOF: 表示出错, 应该关闭本地数据流,
- * ret > 0: 表示读到了 ret 个字节的数据
- * 注: 如果缓冲区内有数据, 则直接把缓冲区内的数据复制到用户的缓冲区然后直接返回;
- * 如果缓冲区内无数据, 则需要调用系统读操作(有可能会阻塞在系统读操作上), 该
- * 次调用返回后则把读到数据复制到用户缓冲区返回.
- * 在这两种情况下都不能保证读到的字节数等于所要求的字节数, 若想读到所要求的
- * 字节后才返回则请调用 vstream_loop_readn() 函数.
- */
- int acl_vstream_read(void *buf, size_t buf_size){
- if (buf == NULL || buf_size == 0) {
- logger_error("[errno] %s(%d): buf: %s, size: %d",
- __FUNCTION__, __LINE__, buf ? "not null" : "null", (int) buf_size);
- return ACL_VSTREAM_EOF;
- }
-
- if (read_cnt_ < 0) { // 当前read_buf_还剩下的字节数
- logger_error("[errno] %s, %s(%d): read_cnt(%d) < 0",
- __FUNCTION__, __FILE__, __LINE__, (int) read_cnt_);
- return ACL_VSTREAM_EOF;
- }
-
- if (read_cnt_ > 0) {
- return acl_vstream_bfcp_some((unsigned char*) buf, buf_size);
- }
-
- /* fp->read_cnt == 0 */
- /* 当缓冲区较大时,则直接将数据读到该缓冲区从而避免大数据拷贝 */
- if (buf_size >= (size_t) read_buf_len_ / 4) {
- int n = this->read_to_buffer(buf, buf_size);
- return n <= 0 ? ACL_VSTREAM_EOF : n;
- }else{
- /* 否则将数据读到流缓冲区中,然后再拷贝,从而减少 read 次数 */
- int n = this->read_buffed();
- if (n <= 0) {
- return ACL_VSTREAM_EOF;
- }
- return this->acl_vstream_bfcp_some((unsigned char*) buf, buf_size);
- }
-
- }
- /**
- * 循环读取 maxlen 个数据, 直到读到 maxlen 个字节为止或读出错
- * @param fp {ACL_VSTREAM*} 数据流
- * @param vptr {void*} 用户的数据缓冲区指针地址
- * @param maxlen {size_t} vptr 数据缓冲区的空间大小
- * @return ret {int}, ret == ACL_VSTREAM_EOF: 读出错或对方关闭了连接, 应该
- * 关闭本地数据流 n > 0: 成功读取了 maxlen 个字节的数据
- * 如果实际读取的字节数与 maxlen 不相等也返回错误(ACL_VSTREAM_EOF)
- */
- int ACL_VSTREAM::acl_vstream_readn( void *buf, size_t size){
- if ( buf == NULL || size == 0) {
- logger_error("%s(%d): buf %s, size %d",
- __FUNCTION__, __LINE__, buf ? "not null" : "null", (int) size);
- return ACL_VSTREAM_EOF;
- }
-
- size_t size_saved = size;
- unsigned char *ptr = (unsigned char*) buf;
- int n;
- /* 如果缓冲区中有上次读残留数据时,则优先将其拷贝至目标缓冲区 */
- if (read_cnt_ > 0) {
- n = acl_vstream_bfcp_some(ptr, size);
- ptr += n;
- size -= n;
- if (size == 0) {
- return (int) size_saved;
- }
- }
-
- /* 为减少 read 次数,当输入缓冲区较小时,则自动启用双缓冲读方式 */
-
- if (size_saved < (size_t) read_buf_len_ / 4) {
- while (size > 0) {
- if (read_buffed() <= 0) {
- return ACL_VSTREAM_EOF;
- }
- n = acl_vstream_bfcp_some( ptr, size);
- ptr += n;
- size -= n;
- }
- }
- /* 否则,则直接将读到的数据存入缓冲区,从而避免大数据的二次拷贝 */
- else {
- while (size > 0) {
- n = read_to_buffer( ptr, size);
- if (n <= 0) {
- return ACL_VSTREAM_EOF;
- }
- size -= n;
- ptr += n;
- }
- }
-
- return (int) size_saved;
- }
- /**
- * 向套接字写数据
- * @param buf {void*} 数据地址
- * @param size {size_t} buf 数据大小
- * @param timeout {int} 写超时时间(秒)
- * @return {int} 0: OK; -1: error
- */
- int acl_socket::acl_socket_write(const void *buf, size_t size, int timeout)
- {
- int ret;
-
- ret = (int) write(sock_, buf, size);
- if (ret > 0) {
- return ret;
- }
-
- if (timeout <= 0) {
- return ret;
- }
-
-
- // 写数据时,若一次发送的数据超过TCP发送缓冲区,则返EAGAIN/EWOULDBLOCK,表示数据没有发送完,这个时候一定要继续等待可写事件发生!!!!
- if (errno != EWOULDBLOCK) {
- return ret;
- }
-
- if (this->acl_write_wait(timeout) < 0) {
- return -1;
- }
-
- ret = write(sock_, buf, size);
-
- return ret;
- }
- int acl_socket::acl_write_wait(int timeout)
- {
- return acl_write_wait_ms(timeout * 1000);
- }
- /**
- * 写等待操作,直到套接字可写、出错或超时
- * @param timeout {int} 超时时间,单位为毫秒,该值分下面三种情形:
- * > 0 : 表示最大超时时间的秒数,
- * == 0 : 表示不等待,检测完后立即返回
- * < 0 : 时表示直接该套接字可读或出错为止
- * @return {int} 0: 可写; -1: 失败或超时
- */
- int acl_socket::acl_write_wait_ms(int timeout)
- {
- const char *myname = "acl_write_wait";
- struct pollfd fds;
- int delay = timeout;
-
- fds.events = POLLOUT; // 对可写事件感兴趣
- fds.revents = 0; // 清空返回值
- fds.fd = this->sock_; //监听当前socket
-
- for (;;) {
- switch (poll(&fds, 1, delay)) {
- case -1:
- if (errno == EINTR) {
- continue; //继续poll
- }
- logger_error("%s(%d), %s: poll error(%s), fd: %d",
- __FILE__, __LINE__, myname,
- strerror(errno), (int) this->sock_);
- return -1;
- case 0:
- errno = ETIMEDOUT;
- logger_error(" %s(%d), %s: poll return 0",
- __FILE__, __LINE__, myname);
- return -1; //超时就直接返回
- default:
- if (fds.revents & POLLOUT) {
- return 0; //变得可写了
- }
-
- if (!(fds.revents & (POLLHUP | POLLERR | POLLNVAL))) {
- logger_error("%s(%d), %s: error: %s, fd: %d",
- __FILE__, __LINE__, myname,
- strerror(errno), this->sock_);
- return -1; //出现错误
- }
-
-
- logger_warn(" %s(%d), %s: %s, revents=%d, %d, %d, %d",
- __FILE__, __LINE__, myname,
- strerror(errno), fds.revents,
- fds.revents & POLLHUP,
- fds.revents& POLLERR,
- fds.revents& POLLNVAL);
-
- return 0;
- }
- }
- }
- int ACL_VSTREAM::write_once(const void *vptr, int dlen)
- {
- int n, neintr = 0;
-
- if (vptr == NULL || dlen <= 0) {
- if (vptr == NULL) {
- printf("[error] %s, %s(%d): vptr null",
- __FUNCTION__, __FILE__, __LINE__);
- }
- if (dlen <= 0) {
- printf("[error] %s, %s(%d): dlen(%d) <= 0",
- __FUNCTION__, __FILE__, __LINE__, dlen);
- }
- errnum_ = EINVAL;
- return ACL_VSTREAM_EOF;
- }
-
- if (socket_->get_socket() == ACL_SOCKET_INVALID) {
- printf("[error] %s, %s(%d): sockfd invalid",
- __FUNCTION__, __FILE__, __LINE__);
- errnum_ = EINVAL;
- return ACL_VSTREAM_EOF;
- }
-
- TAG_AGAIN:
-
- /* 清除系统错误号 */
- errno = 0;
-
-
-
- n = socket_->acl_socket_write(vptr, dlen,0);
-
- if (n > 0) {
- total_write_cnt_ += n;
- return n;
- }
-
- errnum_ = errno;
- if (errnum_ == EINVAL) {
- if (++neintr >= 5) {
- flag_ |= ACL_VSTREAM_FLAG_ERR;
- return ACL_VSTREAM_EOF;
- }
-
- goto TAG_AGAIN;
- }
-
- if (get_errnum() == EWOULDBLOCK) {
- errno = EAGAIN;
- } else if (get_errnum() == ETIMEDOUT) {
- flag_ |= ACL_VSTREAM_FLAG_TIMEOUT;
- } else {
- flag_ |= ACL_VSTREAM_FLAG_ERR;
- }
-
- return ACL_VSTREAM_EOF;
- }
-
- int ACL_VSTREAM::loop_writen(const void *vptr, size_t size)
- {
- const unsigned char *ptr = (const unsigned char *) vptr;
- int once_dlen = 64 * 1024 * 1024; /* xxx: 以 64KB 为单位写 */
- int nleft = (int) size, n, len;
-
- while (nleft > 0) {
- len = nleft > once_dlen ? once_dlen : nleft;
- n = write_once(ptr, len);
- if (n < 0)
- return ACL_VSTREAM_EOF;
-
- nleft -= n;
- ptr += n;
- }
-
- return (int) (ptr - (const unsigned char *) vptr);
- }
- /**
- * 循环向数据流中写 dlen 个字节的数据直至写完或出错为止
- * @param vptr {const char*} 数据区指针地址
- * @param dlen {size_t} 待写的数据区数据长度
- * @return ret {int}, ret == ACL_VSTREAM_EOF: 表示写出错, 应该关闭本地数据流,
- * ret > 0: 表示成功写了 dlen 个字节的数据
- */
- int ACL_VSTREAM::acl_vstream_writen(const void *vptr, size_t dlen){
- if (vptr == NULL || dlen == 0) {
- printf("[errno] %s(%d), %s: vptr %s, dlen %d", __FILE__,
- __LINE__, __FUNCTION__, vptr ? "not null" : "null", (int) dlen);
- return ACL_VSTREAM_EOF;
- }
- return loop_writen(vptr, dlen);
- }
-
-
- /**
- * 带格式的流输出, 类似于 vfprintf()
- * @param fmt {const char*} 数据格式
- * @param ap {va_list}
- * @return ret {int}, ret == ACL_VSTREAM_EOF: 表示写出错, 应该关闭本地数据流,
- * ret > 0: 表示成功写了 dlen 个字节的数据
- */
- int ACL_VSTREAM::acl_vstream_vfprintf( const char *fmt, va_list ap)
- {
- if (fmt == NULL || *fmt == 0) {
- printf("[errno] %s, %s(%d): fmt %s", __FUNCTION__,
- __FILE__, __LINE__,
- fmt && *fmt ? "not null" : "null");
- return ACL_VSTREAM_EOF;
- }
-
- #define ACL_VSTREAM_BUFSIZE 4096
- char buffer[ACL_VSTREAM_BUFSIZE];
- memset(buffer, 0, ACL_VSTREAM_BUFSIZE);
- int n = vsprintf(buffer, fmt, ap);
- if (n <= 0 ) {
- printf("[fatal] %s, %s(%d): len(%d) <= 0",
- __FUNCTION__, __FILE__, __LINE__, n);
- exit(0);
- }else if(n > ACL_VSTREAM_BUFSIZE){
- printf("[fatal] %s, %s(%d): len(%d) > 4096",
- __FUNCTION__, __FILE__, __LINE__, n);
- exit(0);
- }
-
- n = acl_vstream_writen(buffer, n);
- return n;
- }
- /**
- * 带格式的流输出, 类似于 fprintf()
- * @param fmt {const char*} 数据格式
- * @param ... 变参序列
- * @return ret {int}, ret == ACL_VSTREAM_EOF: 表示写出错, 应该关闭本地数据流,
- * ret > 0: 表示成功写了 dlen 个字节的数据
- */
- int ACL_VSTREAM::acl_vstream_fprintf(const char *fmt, ...){
- if (fmt == NULL) {
- logger_error("[error] %s, %s(%d): input invalid",
- __FUNCTION__, __FILE__, __LINE__);
- return ACL_VSTREAM_EOF;
- }
-
- va_list ap;
- va_start(ap, fmt);
- int n = acl_vstream_vfprintf(fmt, ap);
- va_end(ap);
- return n;
- }
-
- #include <stdio.h>
- #include <string.h>
- #include <stdlib.h>
- #include <unistd.h>
- #include <sys/types.h>
- #include <sys/stat.h>
- #include <fcntl.h>
- #include <poll.h>
-
-
- #define KEY_ENTER 28
-
- //0、设计一个描述按键的数据的对象
- struct key_event{
- int code; //按键类型:home,esc,enter
- int value; //表状态,按下,松开
- };
-
-
- int main(int argc, char *argv[])
- {
- struct key_event event;
- int ret;
- char in_buf[128];
-
- int fd;
- fd = open("/dev/key0", O_RDWR);
- if(fd < 0)
- {
- perror("open");
- exit(1);
- }
-
- //监控多个文件fd
- struct pollfd pfd[2];
- pfd[0].fd = fd; //监控按键输入
- pfd[0].events = POLLIN;
-
- pfd[1].fd = 0; //标准输入:0,标准输出:1,标准出错:2
- pfd[1].events = POLLIN;
-
- while(1)
- {
- printf("-----------------start to poll--------------------\n");
- ret = poll(pfd, 2, -1); //在驱动中fops要实现poll接口
-
- if(ret > 0)
- {
- //表示2个fd中至少一个发生读事件
- if(pfd[0].revents & POLLIN) //revents用于判断,会由内核自动填充
- {
- read(pfd[0].fd, &event, sizeof(struct key_event)); //每次读必有数据
- if(event.code == KEY_ENTER)
- {
- if(event.value)
- {
- printf("APP__ key enter down\n");
- }else{
-
- printf("APP__ key enter up\n");
- }
- }
- }
-
- if(pfd[1].revents & POLLIN)
- {
- fgets(in_buf, 128, stdin); //从标准输入中获取128字节存入in_buf
- printf("in_buf = %s\n",in_buf);
- }
- }
- else{
- perror("poll");
- exit(1);
- }
- printf("----------------------End poll----------------------\n");
- }
-
-
- close(pfd[0].fd);
-
- return 0;
- }
简单来说,select/poll
能监听多个设备的文件描述符,只要有任何一个设备满足条件,select/poll
就会返回,否则将进行睡眠等待。看起来,select/poll
像是一个管家了,统一负责来监听处理了。
已经迫不及待来看看原理了,由于底层的机制大体差不多,我将选择select
来做进一步分析。