• poll API接口 - 访问方式(三)


    poll()执行的任务同select很相似::它等待一组文件描述符(要监视的文件描述符集在fds参数中指定,它是一个结构数组)中的一个准备好执行I/O。

    两者间的主要区别在于我们要如何指定待检查的文件描述符。

    • 在select中,我们提供三个集合,在每个集合中标明我们感兴趣的文件描述符。
    • 而在poll中我们提供一列文件描述符,并在每个文件描述符上标明我们感兴趣的事件
    1. NAME
    2. poll, ppoll - 监视并等待多个文件描述符的属性变化
    3. SYNOPSIS
    4. #include
    5. int poll(struct pollfd *fds, nfds_t nfds, int timeout);

    参数:文件描述符集

    • fds:列出了我们需要poll()来检查的文件描述符们。该参数为结构体数组,每个数组元素都是一个struct pollfd结构。可以传递多个结构体,指示 poll() 监视多个文件描述符。
    • nfds:表示fds结构体数组的长度,简单说,就是向 poll 申请的事件检测的个数。
      • 在 select 里面,文件描述符的个数已经随着 fd_set 的实现而固定,没有办法对此进行配置
      • 而在 poll 函数里,我们可以控制 pollfd 结构的数组大小,这意味着我们可以突破原来 select 函数最大描述符的限制
    • timeout:描述了 poll 的行为,单位是毫秒
      • 如果是一个 <0 的数,表示在有事件发生之前永远等待;
      • 如果是 0,表示不阻塞进程,立即返回;
      • 如果是一个 >0 的数,表示 poll 调用方等待指定的毫秒数后返回。

    struct pollfd其定义如下:

    1. struct pollfd {
    2. int fd; /* file descriptor */
    3. short events; /* requested events */
    4. short revents; /* returned events */
    5. };

    个结构体由三个部分组成,
    (1)首先是描述符 fd:每一个 pollfd 结构体指定了一个被监视的文件描述符
    (2)然后是描述符上待检测的事件类型 events

    • 注意这里的 events 可以表示多个不同的事件,具体的实现可以通过使用二进制掩码位操作来完成,例如,POLLIN 和 POLLOUT 可以表示读和写事件。
    1. #define POLLIN 0x0001 /* any readable data available */
    2. #define POLLPRI 0x0002 /* OOB/Urgent readable data */
    3. #define POLLOUT 0x0004 /* file descriptor is writeable */

    (3)和 select 非常不同的地方在于,poll 每次检测之后的结果不会修改原来的传入值,而是将结果保留在 revents 字段中,这样就不需要每次检测完都得重置待检测的描述字和感兴趣的事件。我们可以把 revents 理解成“returned events”

    • revents字段是一个输出参数,由内核用实际发生的事件填充。revent中返回的位可以包括event中指定的任何位,也可以是POLLERR、POLLHUP或POLLNVAL值之一。(这三位在events字段中没有意义,当相应的条件为真时,将在revents字段中设置。)
    • 如果任何文件描述符都没有发生请求的事件(也没有发生错误),则poll()将阻塞,直到其中一个事件发生。

    pollfd 结构体中的events和revents字段都是位掩码。

    • 调用者初始化events来指定需要为描述符fd做检查的事件。
    • 当poll返回时,revents被设定以此来表示该文件描述符上实际发生的事件。

    每个结构体的 events 域是由用户来设置,告诉内核我们关注的是什么,而 revents 域是返回时内核设置的,以说明对该描述符发生了什么事件

    下表列出来可能会出现在 events 和 revents 字段中的位掩码。

    位掩码events的输入返回到revents模式
    可读事件
    POLLIN可读取非高优先级的数据
    POLLRDNORM等同于 POLLIN
    POLLRDBAND可读取优先级数据(Linux 中不使用)
    POLLPRI有紧急数据(高优先级数据)要读取(例如,TCP套接字上的带外数据)
    POLLRDHUP对端套接字关闭
    可写事件
    POLLOUT普通数据可写
    POLLWRNORM等同于 POLLOUT
    POLLWRBAND优先级数据可写入
    设定返回的附加信息
    POLLERR有错误发生
    POLLHUP出现挂断(描述符挂起)
    POLLNVAL文件描述符未打开(请求的事件无效,可能是fd未打开(仅输出))
    POLLMSGLinux 中不使用(SUSv3 中未指定)

    位掩码(POLLERR、POLLHUP 以及 POLLNVAL)是设定在 revents 字段中用来返回有关文件描述符的附加信息(即错误事件)。

    • POLLERR表示套接字收到异步错误。
      • 在TCP中,这通常意味着已经接收或发送了RST(RST表示复位,用来异常的关闭连接)。
      • 如果文件描述符不是套接字,则POLLERR可能意味着设备不支持轮询。
    • POLLHUP装置套接字不再连接。
      • 在TCP中,这意味着已接收和发送FIN(FIN表示客户端正常关闭)
      • 如果poll监听的fd是socket,表示这个socket并没有在网络上建立连接,比如说只调用了socket()函数,但是没有进行connect。
    • 对于上述两种情况,套接字文件描述符仍处于打开状态,并且尚未关闭(但shutdown()可能已经被调用-)。
    • 从理论上讲,应该可以立即重用套接字(例如,通过另一个connect()调用)。
    • 调用shutdown()只是进行了TCP断开, 并没有释放文件描述符, close()文件描述符上的将释放仍代表套接字保留的资源。
    • POLLNVAL表示套接字文件描述符未打开

    补充:对于如下输入事件

    • POLLRDHUP(自Linux 2.6.17):
      • 流套接字peer关闭连接,或关闭写连接的一半。
      • 为了获得这个定义,必须定义_GNU_SOURCE特性测试宏(在包含任何头文件之前)。

    poll()真正关心的标志位就是 POLLIN、POLLOUT、POLLPRI、POLLRDHUP、POLLHUP 以及 POLLERR。

    • 对于可读事件,一般我们在程序里面有 POLLIN 即可。套接字可读事件和 select 的 readset 基本一致,是系统内核通知应用程序有数据可以读,通过 read 函数执行操作不会被阻塞。
    • 对于可写事件,一般我们在程序里面统一使用 POLLOUT(现在写入不会阻塞)。套接字可写事件和 select 的 writeset 基本一致,是系统内核通知套接字缓冲区已准备好,通过 write 函数执行写操作不会被阻塞。
    • 可读事件和可写事件都可以在“returned events”得到复用。但是错误事件没有办法通过poll向系统内核递交检测请求,只能通过“returned events”来加以检测

    如果我们对某个特定的文件描述符上的事件不感兴趣

    • 可以将events设为0。
    • 给fd字段指定一个负值

    这样,poll 函数将忽略这样的 events 事件,检测完成以后,所对应的“returned events”的成员值也将设置为 0。这两种方法都可以用来关闭对单个文件描述符的检查,而不需要重新建立整个fds表。

    timeout 参数

    参数 timeout 决定了 poll()的阻塞行为,具体如下。

    • 如果 timeout 等于−1,poll()会一直阻塞直到 fds 数组中列出的文件描述符有一个达到就绪态(定义在对应的 events 字段中)或者捕获到一个信号。
    • 如果 timeout 等于 0,poll()不会阻塞—只是执行一次检查看看哪个文件描述符处于就绪态。
    • 如果 timeout 大于 0,poll()至多阻塞 timeout 毫秒,直到 fds 列出的文件描述符中有一个达到就绪态,或者直到捕获到一个信号为止

    timeout参数指定poll()将阻塞的最小毫秒数。(这个间隔将被舍入到系统时钟粒度,而内核调度延迟意味着阻塞间隔可能会超出一小部分。)在timeout中指定负值意味着无限超时。指定timeout为0会导致poll()立即返回,即使没有准备好文件描述符。

    poll的返回值

    作为函数的返回值,poll会返回如下几种情况中的一种

    • 返回-1表示有错误产生。一种可能的错误是EINTR,表示该调用被一个信号处理函数例程终端(如果被信号处理例程中断,poll绝不会自动恢复)
    • 返回 0:表示该调用在指定的时间到达之前没有任何事件发生
    • 返回正整数:表示检测到的事件个数,也就是“returned events”中非 0 的描述符个数。

    注意select()同poll()返回正整数值的细小差别。如果一个文件描述符在返回的描述符集合中出现了不止一次,系统调用select()会将同一个描述符计数多次。而poll()返回的是就绪态的文件描述符个数,而且一个文件描述符只会统计一次,就算在相应的 revents 字段中设定了多个位掩码也是如此。

    看个例子

    下面程序创建了一个管道,将字节写到随机选择的管道写端,然后通过poll来检测哪个管道中有数据可用。

    1. #include <unistd.h>
    2. #include <string.h>
    3. #include <stdio.h>
    4. #include <time.h>
    5. #include <poll.h>
    6. int main(int argc, char *argv[])
    7. {
    8. int numPipes, ready, randPipe, numWrites, j;
    9. struct pollfd *pollFd;
    10. int (*pfds)[2]; /* File descriptors for all pipes */
    11. if (argc < 2 || strcmp(argv[1], "--help") == 0){
    12. printf("%s num-pipes [num-writes]\n", argv[0]);
    13. exit(EXIT_FAILURE);
    14. }
    15. /* Allocate the arrays that we use. The arrays are sized according
    16. to the number of pipes specified on command line */
    17. numPipes = atoi(argv[1]);
    18. numWrites = (argc > 2) ? atoi(argv[2]) : 1;
    19. pfds = (int(*)[2])calloc(numPipes, sizeof(int [2]));
    20. if (pfds == NULL){
    21. perror("calloc");
    22. exit(EXIT_FAILURE);
    23. }
    24. pollFd = (struct pollfd *)calloc(numPipes, sizeof(struct pollfd));
    25. if (pollFd == NULL){
    26. perror("calloc");
    27. exit(EXIT_FAILURE);
    28. }
    29. /* Create the number of pipes specified on command line */
    30. for (j = 0; j < numPipes; j++)
    31. if (pipe(pfds[j]) == -1){
    32. printf("pipe %d", j);
    33. exit(EXIT_FAILURE);
    34. }
    35. /* Perform specified number of writes to random pipes */
    36. srandom((int) time(NULL));
    37. for (j = 0; j < numWrites; j++) {
    38. randPipe = random() % numPipes;
    39. printf("Writing to fd: %3d (read fd: %3d)\n",
    40. pfds[randPipe][1], pfds[randPipe][0]);
    41. if (write(pfds[randPipe][1], "a", 1) == -1){
    42. printf("write %d", pfds[randPipe][1]);
    43. exit(EXIT_FAILURE);
    44. }
    45. }
    46. /* Build the file descriptor list to be supplied to poll(). This list
    47. is set to contain the file descriptors for the read ends of all of
    48. the pipes. */
    49. for (j = 0; j < numPipes; j++) {
    50. pollFd[j].fd = pfds[j][0];
    51. pollFd[j].events = POLLIN;
    52. }
    53. ready = poll(pollFd, numPipes, 0);
    54. if (ready == -1){
    55. printf("poll");
    56. exit(EXIT_FAILURE);
    57. }
    58. printf("poll() returned: %d\n", ready);
    59. /* Check which pipes have data available for reading */
    60. for (j = 0; j < numPipes; j++)
    61. if (pollFd[j].revents & POLLIN)
    62. printf("Readable: %3d\n", pollFd[j].fd);
    63. exit(EXIT_SUCCESS);
    64. }

    程序的命令行参数指定了应该创建 10 个管道,而写操作应该随机选择其中的 3 个管道
    在这里插入图片描述
    从上面的输出我们可知 poll()发现3个管道上有数据可读取。

    实例(待补充)

    1. #define INIT_SIZE 128
    2. int main(int argc, char **argv) {
    3. int listen_fd, connected_fd;
    4. int ready_number;
    5. ssize_t n;
    6. char buf[MAXLINE];
    7. struct sockaddr_in client_addr;
    8. listen_fd = tcp_server_listen(SERV_PORT);
    9. // 初始化 pollfd 数组,这个数组的第一个元素是 listen_fd,其余的用来记录将要连接的 connect_fd
    10. struct pollfd event_set[INIT_SIZE];
    11. event_set[0].fd = listen_fd;
    12. event_set[0].events = POLLRDNORM; // 期望系统内核检测监听套接字上的连接建立完成事件。
    13. // 用 -1 表示这个数组位置还没有被占用
    14. int i;
    15. for (i = 1; i < INIT_SIZE; i++) {
    16. event_set[i].fd = -1; // 如果对应 pollfd 里的文件描述字 fd 为负数,poll 函数将会忽略这个 pollfd
    17. }
    18. for (;;) {
    19. if ((ready_number = poll(event_set, INIT_SIZE, -1)) < 0) { //这里之所以传入 INIT_SIZE,是因为 poll 函数已经能保证可以自动忽略 fd 为 -1 的 pollfd,否则我们每次都需要计算一下 event_size 里真正需要被检测的元素大小;timeout 设置为 -1,表示在 I/O 事件发生之前 poll 调用一直阻塞。
    20. error(1, errno, "poll failed ");
    21. }
    22. if (event_set[0].revents & POLLRDNORM) { // 如果系统内核检测到监听套接字上的连接建立事件
    23. socklen_t client_len = sizeof(client_addr);
    24. connected_fd = accept(listen_fd, (struct sockaddr *) &client_addr, &client_len); // 调用 accept 函数获取了连接描述字
    25. // 找到一个可以记录该连接套接字的位置
    26. for (i = 1; i < INIT_SIZE; i++) {
    27. if (event_set[i].fd < 0) {
    28. event_set[i].fd = connected_fd; //把连接描述字 connect_fd 也加入到 event_set
    29. event_set[i].events = POLLRDNORM; // 说明我们感兴趣的事件类型为 POLLRDNORM,也就是套集字上有数据可以读
    30. break;
    31. }
    32. }
    33. if (i == INIT_SIZE) { // 如果在数组里找不到这样一个位置,说明我们的 event_set 已经被很多连接充满了,没有办法接收更多的连接了
    34. error(1, errno, "can not hold so many clients");
    35. }
    36. if (--ready_number <= 0) // 因为 poll 返回的一个整数,说明了这次 I/O 事件描述符的个数,如果处理完监听套接字之后,就已经完成了这次 I/O 复用所要处理的事情
    37. continue; // 那么我们就可以跳过后面的处理,再次进入 poll 调用。
    38. }
    39. // 接下来的循环处理是查看 event_set 里面其他的事件,也就是已连接套接字的可读事件。这是通过遍历 event_set 数组来完成的。
    40. for (i = 1; i < INIT_SIZE; i++) {
    41. int socket_fd;
    42. if ((socket_fd = event_set[i].fd) < 0) // 如果数组里的 pollfd 的 fd 为 -1,说明这个 pollfd 没有递交有效的检测,直接跳过;
    43. continue;
    44. if (event_set[i].revents & (POLLRDNORM | POLLERR)) { // 通过检测 revents 的事件类型是 POLLRDNORM 或者 POLLERR,我们可以进行读操作
    45. if ((n = read(socket_fd, buf, MAXLINE)) > 0) { // 读取数据正常之后,再通过 write 操作回显给客户端
    46. if (write(socket_fd, buf, n) < 0) {
    47. error(1, errno, "write error");
    48. }
    49. } else if (n == 0 || errno == ECONNRESET) { // 如果读到 EOF 或者是连接重置,则关闭这个连接,并且把 event_set 对应的 pollfd 重置
    50. close(socket_fd);
    51. event_set[i].fd = -1;
    52. } else {
    53. error(1, errno, "read error"); // 读取数据失败。
    54. }
    55. if (--ready_number <= 0) // 判断如果事件已经被完全处理完之后,直接跳过对 event_set 的循环处理,再次来到 poll 调用。
    56. break;
    57. }
    58. }
    59. }
    60. }

    实验

    我们启动这个服务器程序,然后通过 telnet 连接到这个服务器程序。为了检验这个服务器程序的 I/O 复用能力,我们可以多开几个 telnet 客户端,并且在屏幕上输入各种字符串。

    客户端 1:

    1. $telnet 127.0.0.1 43211
    2. Trying 127.0.0.1...
    3. Connected to 127.0.0.1.
    4. Escape character is '^]'.
    5. a
    6. a
    7. aaaaaaaaaaa
    8. aaaaaaaaaaa
    9. afafasfa
    10. afafasfa
    11. fbaa
    12. fbaa
    13. ^]
    14. telnet> quit
    15. Connection closed.

    客户端 2:

    1. telnet 127.0.0.1 43211
    2. Trying 127.0.0.1...
    3. Connected to 127.0.0.1.
    4. Escape character is '^]'.
    5. b
    6. b
    7. bbbbbbb
    8. bbbbbbb
    9. bbbbbbb
    10. bbbbbbb
    11. ^]
    12. telnet> quit
    13. Connection closed.

    可以看到,这两个客户端互不影响,每个客户端输入的字符很快会被回显到客户端屏幕上。一个客户端断开连接,也不会影响到其他客户端。

    poll与socket编程

    向套接字读数据

    1. int acl_socket_read(void *buf, size_t size) const{
    2. return read(sock_, buf, size);
    3. }
    1. /**
    2. * 从套接字读数据
    3. * @param buf {void*} 内存缓冲区地址
    4. * @param size {size_t} buf 缓冲区大小
    5. * @param timeout {size_t} 读超时时间(秒)
    6. * @return {int} 0: OK; -1: error
    7. */
    8. int ACL_VSTREAM::acl_socket_read(void *buf, size_t size, int timeout)
    9. {
    10. if (read_ready_) { // [读]已经准备好了
    11. read_ready_ = 0;
    12. } else if (timeout > 0 && socket_->acl_read_wait(timeout) < 0) {
    13. return -1;
    14. }
    15. return socket_->acl_socket_read(buf, size);
    16. }
    17. /**
    18. * 将数据读入至指定的缓冲区中
    19. * @param fp {ACL_VSTREAM*}
    20. * @param buf {void*} 目标缓冲区
    21. * @param size {size_t} buf 缓冲区大小
    22. * @return {int} 返回读到的数据,返回值如下:
    23. * > 0 当前读到缓冲区中的数据长度
    24. * == 0 对端连接关闭
    25. * < 0 在阻塞方式下表示读出错,采用非阻塞方式时也会返回 -1
    26. */
    27. int ACL_VSTREAM::read_to_buffer(void *buf, size_t size)
    28. {
    29. int read_cnt, nagain = 0;
    30. if (socket_->get_socket() == ACL_SOCKET_INVALID) {
    31. read_ready_ = 0;
    32. return -1;
    33. }
    34. AGAIN:
    35. // /* 清除系统错误号 */
    36. errno = (0);
    37. read_cnt = acl_socket_read(buf, size, rw_timeout_);
    38. //
    39. if (read_cnt > 0) {
    40. // flag_ &= ~ACL_VSTREAM_FLAG_BAD;
    41. errnum_ = 0;
    42. total_read_cnt_ += read_cnt;
    43. return read_cnt;
    44. } else if (read_cnt == 0) {
    45. flag_ = ACL_VSTREAM_FLAG_EOF;
    46. errnum_ = 0;
    47. return 0;
    48. }
    49. printf("-------------%d", errno);
    50. errnum_ = errno;
    51. if (errnum_ == EINTR) {
    52. if (nagain++ < 5) {
    53. goto AGAIN;
    54. }
    55. logger_error("[errno] %s(%d), %s: nagain: %d too much, fd: %d",
    56. __FILE__, __LINE__, __FUNCTION__, nagain, socket_->get_socket());
    57. } else if (errnum_ == ETIMEDOUT) {
    58. flag_ |= ACL_VSTREAM_FLAG_TIMEOUT;
    59. errno = ETIMEDOUT;
    60. } else if (errnum_ != EWOULDBLOCK) {
    61. flag_ |= ACL_VSTREAM_FLAG_ERR;
    62. }
    63. return -1;
    64. }
    65. int ACL_VSTREAM::read_buffed()
    66. {
    67. int n;
    68. read_ptr_ = read_buf_; // 当前缓冲区位置指向读取缓冲区的开头
    69. n = read_to_buffer(read_buf_, (size_t) read_buf_len_);
    70. read_cnt_ = n > 0 ? n : 0;
    71. return n;
    72. }
    1. /**
    2. * 将缓冲区内的数据拷贝到 vptr 中
    3. * @param vptr {void*} 用户的数据缓冲区指针地址
    4. * @param maxlen {size_t} vptr 数据缓冲区的空间大小
    5. * @return ret {int}, ret == ACL_VSTREAM_EOF: 表示出错, 应该关闭本地数据流,
    6. * ret >= 0: 成功从 fp 数据流的缓冲区中读取了 ret 个字节的数据
    7. */
    8. int acl_vstream_bfcp_some(void *vptr, size_t maxlen){
    9. int n;
    10. /* input params error */
    11. if ( vptr == NULL || maxlen == 0) {
    12. printf("[errno] %s, %s(%d): input error, vptr %s, "
    13. "maxlen %d", __FUNCTION__, __FILE__, __LINE__,
    14. vptr ? "not null" : "null",
    15. (int) maxlen);
    16. return ACL_VSTREAM_EOF;
    17. }
    18. /* internal fatal error */
    19. if (read_cnt_ < 0) {
    20. printf("[errno] %s, %s(%d): read_cnt(=%d) < 0",
    21. __FUNCTION__, __FILE__, __LINE__, (int) read_cnt_);
    22. return ACL_VSTREAM_EOF;
    23. }
    24. /* there is no any data in buf */
    25. if (read_cnt_== 0) {
    26. read_ptr_ = read_buf_; // 重新将当前指针指向缓冲区的开头
    27. return 0;
    28. }
    29. if (read_ptr_ >= read_buf_ + (int) read_buf_len_) { // 当前指针位置 超出 读取缓冲区的区域
    30. read_cnt_ = 0;
    31. read_ptr_ = read_buf_;
    32. return 0;
    33. }
    34. n = (int) read_cnt_ > (int) maxlen ? (int) maxlen : (int) read_cnt_;
    35. memcpy(vptr, read_ptr_, n);
    36. read_cnt_ -= n;
    37. offset_ += n;
    38. if (read_cnt_ == 0) {
    39. read_ptr_ = read_buf_;
    40. } else {
    41. read_ptr_ += n;
    42. }
    43. return n;
    44. }
    1. int ACL_VSTREAM::acl_vstream_getc()
    2. {
    3. if (read_cnt_ <= 0 && read_buffed() <= 0) { // 如果当前读取到的数据为空,那么就去socket流中将数据读取到缓冲区中
    4. return ACL_VSTREAM_EOF;
    5. }
    6. // 本次读取成功或者上一次有剩余的
    7. read_cnt_--; // 可用数据减少1
    8. offset_++;
    9. return *read_ptr_++;
    10. }
    11. /**
    12. * 从数据流中读取一行数据, 直到读到 "\n" 或读结束为止, 正常情况下包括 "\n"
    13. * @param fp {ACL_VSTREAM*} 数据流
    14. * @param vptr {void*} 用户所给的内存缓冲区指针
    15. * @param maxlen {size_t} vptr 缓冲区的大小
    16. * @return ret {int}, ret == ACL_VSTREAM_EOF: 读出错或对方关闭了连接,
    17. * 应该关闭本地数据流; n > 0: 读到 了 n 个字节的数据, 如果该 n 个数据
    18. * 的最后一个非 0 字符为 "\n" 表明读到了一个完整的行, 否则表明读到了 n
    19. * 个数据但对方未发送 "\n" 就关闭了连接; 还可以通过检查
    20. * (fp->flag & ACL_VSTREAM_FLAG_TAGYES)
    21. * 不等于 0 来判断是否读到了 "\n", 如果非 0 则表示读到了 "\n".
    22. */
    23. int ACL_VSTREAM::acl_vstream_gets(void *vptr, size_t maxlen){
    24. if (vptr == NULL || maxlen <= 0) {
    25. logger_error("%s(%d), %s: vptr %s, maxlen %d",
    26. __FILE__, __LINE__, __FUNCTION__,
    27. vptr ? "not null" : "null", (int) maxlen);
    28. return ACL_VSTREAM_EOF;
    29. }
    30. int n, ch;
    31. unsigned char *ptr = (unsigned char *) vptr;
    32. for ( n = 1; n < (int) maxlen; n++) {
    33. /* left one byte for '\0' */
    34. ch = acl_vstream_getc();
    35. if (ch == ACL_VSTREAM_EOF) {
    36. if (n == 1) { /* EOF, nodata read */
    37. return ACL_VSTREAM_EOF;
    38. }
    39. break; /* EOF, some data was read */
    40. }
    41. *ptr++ = ch;
    42. if (ch == '\n') {
    43. /* newline is stored, like fgets() */
    44. //fp->flag |= ACL_VSTREAM_FLAG_TAGYES;
    45. break;
    46. }
    47. }
    48. /* null terminate like fgets() */
    49. *ptr = 0;
    50. return n;
    51. }
    52. /**
    53. * 从数据流中读取一行数据, 直到读到 "\n" 或读结束为止, 返回的结果中不包括 "\n"
    54. * @param vptr {void*} 用户所给的内存缓冲区指针
    55. * @param maxlen {size_t} vptr 缓冲区的大小
    56. * @return ret {int}, ret == ACL_VSTREAM_EOF: 读出错或对方关闭了连接,
    57. * 应该关闭本地数据流, n == 0: 读到了一行数据, 但该行数据仅有 "\r\n",
    58. * n > 0: 读到 了 n 个字节的数据.
    59. */
    60. int ACL_VSTREAM::acl_vstream_gets_nonl(void *vptr, size_t maxlen){
    61. if (vptr == NULL || maxlen <= 0) {
    62. logger_error("[error] %s(%d), %s: vptr %s, maxlen %d",
    63. __FILE__, __LINE__, __FUNCTION__,
    64. vptr ? "not null" : "null", (int) maxlen);
    65. return ACL_VSTREAM_EOF;
    66. }
    67. int n, ch;
    68. unsigned char *ptr;
    69. ptr = (unsigned char *) vptr;
    70. for (n = 1; n < (int) maxlen; n++) {
    71. ch = acl_vstream_getc();
    72. if (ch == ACL_VSTREAM_EOF) {
    73. if (n == 1) /* EOF, nodata read */
    74. return ACL_VSTREAM_EOF;
    75. break; /* EOF, some data was read */
    76. }
    77. *ptr++ = ch;
    78. if (ch == '\n') {
    79. break; /* newline is stored, like fgets() */
    80. }
    81. }
    82. *ptr = 0; /* null terminate like fgets() */
    83. ptr--;
    84. while (ptr >= (unsigned char *) vptr) {
    85. if (*ptr != '\r' && *ptr != '\n')
    86. break;
    87. *ptr-- = 0;
    88. n--;
    89. }
    90. return n;
    91. }
    1. /**
    2. * 从数据流中一次性读取 n 个数据, 该 n 有可能会小于用户所需要的 maxlen
    3. * @param fp {ACL_VSTREAM*} 数据流
    4. * @param vptr {void*} 用户的数据缓冲区指针地址
    5. * @param maxlen {size_t} vptr 数据缓冲区的空间大小
    6. * @return ret {int}, ret == ACL_VSTREAM_EOF: 表示出错, 应该关闭本地数据流,
    7. * ret > 0: 表示读到了 ret 个字节的数据
    8. * 注: 如果缓冲区内有数据, 则直接把缓冲区内的数据复制到用户的缓冲区然后直接返回;
    9. * 如果缓冲区内无数据, 则需要调用系统读操作(有可能会阻塞在系统读操作上), 该
    10. * 次调用返回后则把读到数据复制到用户缓冲区返回.
    11. * 在这两种情况下都不能保证读到的字节数等于所要求的字节数, 若想读到所要求的
    12. * 字节后才返回则请调用 vstream_loop_readn() 函数.
    13. */
    14. int acl_vstream_read(void *buf, size_t buf_size){
    15. if (buf == NULL || buf_size == 0) {
    16. logger_error("[errno] %s(%d): buf: %s, size: %d",
    17. __FUNCTION__, __LINE__, buf ? "not null" : "null", (int) buf_size);
    18. return ACL_VSTREAM_EOF;
    19. }
    20. if (read_cnt_ < 0) { // 当前read_buf_还剩下的字节数
    21. logger_error("[errno] %s, %s(%d): read_cnt(%d) < 0",
    22. __FUNCTION__, __FILE__, __LINE__, (int) read_cnt_);
    23. return ACL_VSTREAM_EOF;
    24. }
    25. if (read_cnt_ > 0) {
    26. return acl_vstream_bfcp_some((unsigned char*) buf, buf_size);
    27. }
    28. /* fp->read_cnt == 0 */
    29. /* 当缓冲区较大时,则直接将数据读到该缓冲区从而避免大数据拷贝 */
    30. if (buf_size >= (size_t) read_buf_len_ / 4) {
    31. int n = this->read_to_buffer(buf, buf_size);
    32. return n <= 0 ? ACL_VSTREAM_EOF : n;
    33. }else{
    34. /* 否则将数据读到流缓冲区中,然后再拷贝,从而减少 read 次数 */
    35. int n = this->read_buffed();
    36. if (n <= 0) {
    37. return ACL_VSTREAM_EOF;
    38. }
    39. return this->acl_vstream_bfcp_some((unsigned char*) buf, buf_size);
    40. }
    41. }
    42. /**
    43. * 循环读取 maxlen 个数据, 直到读到 maxlen 个字节为止或读出错
    44. * @param fp {ACL_VSTREAM*} 数据流
    45. * @param vptr {void*} 用户的数据缓冲区指针地址
    46. * @param maxlen {size_t} vptr 数据缓冲区的空间大小
    47. * @return ret {int}, ret == ACL_VSTREAM_EOF: 读出错或对方关闭了连接, 应该
    48. * 关闭本地数据流 n > 0: 成功读取了 maxlen 个字节的数据
    49. * 如果实际读取的字节数与 maxlen 不相等也返回错误(ACL_VSTREAM_EOF)
    50. */
    51. int ACL_VSTREAM::acl_vstream_readn( void *buf, size_t size){
    52. if ( buf == NULL || size == 0) {
    53. logger_error("%s(%d): buf %s, size %d",
    54. __FUNCTION__, __LINE__, buf ? "not null" : "null", (int) size);
    55. return ACL_VSTREAM_EOF;
    56. }
    57. size_t size_saved = size;
    58. unsigned char *ptr = (unsigned char*) buf;
    59. int n;
    60. /* 如果缓冲区中有上次读残留数据时,则优先将其拷贝至目标缓冲区 */
    61. if (read_cnt_ > 0) {
    62. n = acl_vstream_bfcp_some(ptr, size);
    63. ptr += n;
    64. size -= n;
    65. if (size == 0) {
    66. return (int) size_saved;
    67. }
    68. }
    69. /* 为减少 read 次数,当输入缓冲区较小时,则自动启用双缓冲读方式 */
    70. if (size_saved < (size_t) read_buf_len_ / 4) {
    71. while (size > 0) {
    72. if (read_buffed() <= 0) {
    73. return ACL_VSTREAM_EOF;
    74. }
    75. n = acl_vstream_bfcp_some( ptr, size);
    76. ptr += n;
    77. size -= n;
    78. }
    79. }
    80. /* 否则,则直接将读到的数据存入缓冲区,从而避免大数据的二次拷贝 */
    81. else {
    82. while (size > 0) {
    83. n = read_to_buffer( ptr, size);
    84. if (n <= 0) {
    85. return ACL_VSTREAM_EOF;
    86. }
    87. size -= n;
    88. ptr += n;
    89. }
    90. }
    91. return (int) size_saved;
    92. }

    向套接字写数据

    1. /**
    2. * 向套接字写数据
    3. * @param buf {void*} 数据地址
    4. * @param size {size_t} buf 数据大小
    5. * @param timeout {int} 写超时时间(秒)
    6. * @return {int} 0: OK; -1: error
    7. */
    8. int acl_socket::acl_socket_write(const void *buf, size_t size, int timeout)
    9. {
    10. int ret;
    11. ret = (int) write(sock_, buf, size);
    12. if (ret > 0) {
    13. return ret;
    14. }
    15. if (timeout <= 0) {
    16. return ret;
    17. }
    18. // 写数据时,若一次发送的数据超过TCP发送缓冲区,则返EAGAIN/EWOULDBLOCK,表示数据没有发送完,这个时候一定要继续等待可写事件发生!!!!
    19. if (errno != EWOULDBLOCK) {
    20. return ret;
    21. }
    22. if (this->acl_write_wait(timeout) < 0) {
    23. return -1;
    24. }
    25. ret = write(sock_, buf, size);
    26. return ret;
    27. }
    28. int acl_socket::acl_write_wait(int timeout)
    29. {
    30. return acl_write_wait_ms(timeout * 1000);
    31. }
    32. /**
    33. * 写等待操作,直到套接字可写、出错或超时
    34. * @param timeout {int} 超时时间,单位为毫秒,该值分下面三种情形:
    35. * > 0 : 表示最大超时时间的秒数,
    36. * == 0 : 表示不等待,检测完后立即返回
    37. * < 0 : 时表示直接该套接字可读或出错为止
    38. * @return {int} 0: 可写; -1: 失败或超时
    39. */
    40. int acl_socket::acl_write_wait_ms(int timeout)
    41. {
    42. const char *myname = "acl_write_wait";
    43. struct pollfd fds;
    44. int delay = timeout;
    45. fds.events = POLLOUT; // 对可写事件感兴趣
    46. fds.revents = 0; // 清空返回值
    47. fds.fd = this->sock_; //监听当前socket
    48. for (;;) {
    49. switch (poll(&fds, 1, delay)) {
    50. case -1:
    51. if (errno == EINTR) {
    52. continue; //继续poll
    53. }
    54. logger_error("%s(%d), %s: poll error(%s), fd: %d",
    55. __FILE__, __LINE__, myname,
    56. strerror(errno), (int) this->sock_);
    57. return -1;
    58. case 0:
    59. errno = ETIMEDOUT;
    60. logger_error(" %s(%d), %s: poll return 0",
    61. __FILE__, __LINE__, myname);
    62. return -1; //超时就直接返回
    63. default:
    64. if (fds.revents & POLLOUT) {
    65. return 0; //变得可写了
    66. }
    67. if (!(fds.revents & (POLLHUP | POLLERR | POLLNVAL))) {
    68. logger_error("%s(%d), %s: error: %s, fd: %d",
    69. __FILE__, __LINE__, myname,
    70. strerror(errno), this->sock_);
    71. return -1; //出现错误
    72. }
    73. logger_warn(" %s(%d), %s: %s, revents=%d, %d, %d, %d",
    74. __FILE__, __LINE__, myname,
    75. strerror(errno), fds.revents,
    76. fds.revents & POLLHUP,
    77. fds.revents& POLLERR,
    78. fds.revents& POLLNVAL);
    79. return 0;
    80. }
    81. }
    82. }
    1. int ACL_VSTREAM::write_once(const void *vptr, int dlen)
    2. {
    3. int n, neintr = 0;
    4. if (vptr == NULL || dlen <= 0) {
    5. if (vptr == NULL) {
    6. printf("[error] %s, %s(%d): vptr null",
    7. __FUNCTION__, __FILE__, __LINE__);
    8. }
    9. if (dlen <= 0) {
    10. printf("[error] %s, %s(%d): dlen(%d) <= 0",
    11. __FUNCTION__, __FILE__, __LINE__, dlen);
    12. }
    13. errnum_ = EINVAL;
    14. return ACL_VSTREAM_EOF;
    15. }
    16. if (socket_->get_socket() == ACL_SOCKET_INVALID) {
    17. printf("[error] %s, %s(%d): sockfd invalid",
    18. __FUNCTION__, __FILE__, __LINE__);
    19. errnum_ = EINVAL;
    20. return ACL_VSTREAM_EOF;
    21. }
    22. TAG_AGAIN:
    23. /* 清除系统错误号 */
    24. errno = 0;
    25. n = socket_->acl_socket_write(vptr, dlen,0);
    26. if (n > 0) {
    27. total_write_cnt_ += n;
    28. return n;
    29. }
    30. errnum_ = errno;
    31. if (errnum_ == EINVAL) {
    32. if (++neintr >= 5) {
    33. flag_ |= ACL_VSTREAM_FLAG_ERR;
    34. return ACL_VSTREAM_EOF;
    35. }
    36. goto TAG_AGAIN;
    37. }
    38. if (get_errnum() == EWOULDBLOCK) {
    39. errno = EAGAIN;
    40. } else if (get_errnum() == ETIMEDOUT) {
    41. flag_ |= ACL_VSTREAM_FLAG_TIMEOUT;
    42. } else {
    43. flag_ |= ACL_VSTREAM_FLAG_ERR;
    44. }
    45. return ACL_VSTREAM_EOF;
    46. }
    47. int ACL_VSTREAM::loop_writen(const void *vptr, size_t size)
    48. {
    49. const unsigned char *ptr = (const unsigned char *) vptr;
    50. int once_dlen = 64 * 1024 * 1024; /* xxx: 以 64KB 为单位写 */
    51. int nleft = (int) size, n, len;
    52. while (nleft > 0) {
    53. len = nleft > once_dlen ? once_dlen : nleft;
    54. n = write_once(ptr, len);
    55. if (n < 0)
    56. return ACL_VSTREAM_EOF;
    57. nleft -= n;
    58. ptr += n;
    59. }
    60. return (int) (ptr - (const unsigned char *) vptr);
    61. }
    62. /**
    63. * 循环向数据流中写 dlen 个字节的数据直至写完或出错为止
    64. * @param vptr {const char*} 数据区指针地址
    65. * @param dlen {size_t} 待写的数据区数据长度
    66. * @return ret {int}, ret == ACL_VSTREAM_EOF: 表示写出错, 应该关闭本地数据流,
    67. * ret > 0: 表示成功写了 dlen 个字节的数据
    68. */
    69. int ACL_VSTREAM::acl_vstream_writen(const void *vptr, size_t dlen){
    70. if (vptr == NULL || dlen == 0) {
    71. printf("[errno] %s(%d), %s: vptr %s, dlen %d", __FILE__,
    72. __LINE__, __FUNCTION__, vptr ? "not null" : "null", (int) dlen);
    73. return ACL_VSTREAM_EOF;
    74. }
    75. return loop_writen(vptr, dlen);
    76. }
    77. /**
    78. * 带格式的流输出, 类似于 vfprintf()
    79. * @param fmt {const char*} 数据格式
    80. * @param ap {va_list}
    81. * @return ret {int}, ret == ACL_VSTREAM_EOF: 表示写出错, 应该关闭本地数据流,
    82. * ret > 0: 表示成功写了 dlen 个字节的数据
    83. */
    84. int ACL_VSTREAM::acl_vstream_vfprintf( const char *fmt, va_list ap)
    85. {
    86. if (fmt == NULL || *fmt == 0) {
    87. printf("[errno] %s, %s(%d): fmt %s", __FUNCTION__,
    88. __FILE__, __LINE__,
    89. fmt && *fmt ? "not null" : "null");
    90. return ACL_VSTREAM_EOF;
    91. }
    92. #define ACL_VSTREAM_BUFSIZE 4096
    93. char buffer[ACL_VSTREAM_BUFSIZE];
    94. memset(buffer, 0, ACL_VSTREAM_BUFSIZE);
    95. int n = vsprintf(buffer, fmt, ap);
    96. if (n <= 0 ) {
    97. printf("[fatal] %s, %s(%d): len(%d) <= 0",
    98. __FUNCTION__, __FILE__, __LINE__, n);
    99. exit(0);
    100. }else if(n > ACL_VSTREAM_BUFSIZE){
    101. printf("[fatal] %s, %s(%d): len(%d) > 4096",
    102. __FUNCTION__, __FILE__, __LINE__, n);
    103. exit(0);
    104. }
    105. n = acl_vstream_writen(buffer, n);
    106. return n;
    107. }
    108. /**
    109. * 带格式的流输出, 类似于 fprintf()
    110. * @param fmt {const char*} 数据格式
    111. * @param ... 变参序列
    112. * @return ret {int}, ret == ACL_VSTREAM_EOF: 表示写出错, 应该关闭本地数据流,
    113. * ret > 0: 表示成功写了 dlen 个字节的数据
    114. */
    115. int ACL_VSTREAM::acl_vstream_fprintf(const char *fmt, ...){
    116. if (fmt == NULL) {
    117. logger_error("[error] %s, %s(%d): input invalid",
    118. __FUNCTION__, __FILE__, __LINE__);
    119. return ACL_VSTREAM_EOF;
    120. }
    121. va_list ap;
    122. va_start(ap, fmt);
    123. int n = acl_vstream_vfprintf(fmt, ap);
    124. va_end(ap);
    125. return n;
    126. }

    1. #include <stdio.h>
    2. #include <string.h>
    3. #include <stdlib.h>
    4. #include <unistd.h>
    5. #include <sys/types.h>
    6. #include <sys/stat.h>
    7. #include <fcntl.h>
    8. #include <poll.h>
    9. #define KEY_ENTER 28
    10. //0、设计一个描述按键的数据的对象
    11. struct key_event{
    12. int code; //按键类型:home,esc,enter
    13. int value; //表状态,按下,松开
    14. };
    15. int main(int argc, char *argv[])
    16. {
    17. struct key_event event;
    18. int ret;
    19. char in_buf[128];
    20. int fd;
    21. fd = open("/dev/key0", O_RDWR);
    22. if(fd < 0)
    23. {
    24. perror("open");
    25. exit(1);
    26. }
    27. //监控多个文件fd
    28. struct pollfd pfd[2];
    29. pfd[0].fd = fd; //监控按键输入
    30. pfd[0].events = POLLIN;
    31. pfd[1].fd = 0; //标准输入:0,标准输出:1,标准出错:2
    32. pfd[1].events = POLLIN;
    33. while(1)
    34. {
    35. printf("-----------------start to poll--------------------\n");
    36. ret = poll(pfd, 2, -1); //在驱动中fops要实现poll接口
    37. if(ret > 0)
    38. {
    39. //表示2fd中至少一个发生读事件
    40. if(pfd[0].revents & POLLIN) //revents用于判断,会由内核自动填充
    41. {
    42. read(pfd[0].fd, &event, sizeof(struct key_event)); //每次读必有数据
    43. if(event.code == KEY_ENTER)
    44. {
    45. if(event.value)
    46. {
    47. printf("APP__ key enter down\n");
    48. }else{
    49. printf("APP__ key enter up\n");
    50. }
    51. }
    52. }
    53. if(pfd[1].revents & POLLIN)
    54. {
    55. fgets(in_buf, 128, stdin); //从标准输入中获取128字节存入in_buf
    56. printf("in_buf = %s\n",in_buf);
    57. }
    58. }
    59. else{
    60. perror("poll");
    61. exit(1);
    62. }
    63. printf("----------------------End poll----------------------\n");
    64. }
    65. close(pfd[0].fd);
    66. return 0;
    67. }

    简单来说,select/poll能监听多个设备的文件描述符,只要有任何一个设备满足条件,select/poll就会返回,否则将进行睡眠等待。看起来,select/poll像是一个管家了,统一负责来监听处理了。

    已经迫不及待来看看原理了,由于底层的机制大体差不多,我将选择select来做进一步分析。

  • 相关阅读:
    Postman持久化保存/设置断言详解
    当你碰到了MySQL中的死锁,你了解这些机制吗?
    Java 多线程 —— 内存合并
    多线程知识点总结之温故而知新
    计算机毕业设计ssm出租车预约系统o8i8r系统+程序+源码+lw+远程部署
    SAP UI5 应用开发教程之一百零三 - 如何在 SAP UI5 应用中消费第三方库试读版
    基于GPU的kokkos加速安装
    代码随想录算法训练营第二十一天 | LeetCode 235. 二叉搜索树的最近公共祖先、701. 二叉搜索树中的插入操作、450. 删除二叉搜索树中的节点
    公司实战 ElasticSearch+Kafka+Redis+MySQL
    脚踏实地,步步高升丨吴高升学长采访录
  • 原文地址:https://blog.csdn.net/u012294613/article/details/126787420