• Linux之epoll理解


    IO多路复用有几种实现方式:select poll和epoll。本篇文章对epoll进行总结理解。

    IO多路复用的含义,我个人的理解是通过一个线程实现对多个socket的侦听,epoll与select和poll的区别是epoll效率最高。select的最高管理1024个socket并且是通过轮询的方式实现的管理,管理的socket个数越多,耗时越长而epoll则没有1024这个限制并且不是通过轮询的方式实现,这也是epoll应用于高并发的场景的原因所在。

    epoll是一种IO事件通知机制。

    IO多路复用不同实现方式对比
    selectpollepoll
    性能随着连接数的增加,性能急剧下降,处理成千上万的并发连接数时,性能很差随着连接数的增加,性能急剧下降,处理成千上万的并发连接数时,性能很差随着连接数的增加,性能基本没有变化
    连接数一般1024无限制无限制
    内存拷贝每次调用select拷贝每次调用poll拷贝fd首次调用epoll_ctl拷贝,每次调用epoll_wait不拷贝
    数据结构bitmap数组红黑树
    内在处理机制线性轮询线性轮询FD挂在红黑树,通过事件回调callback
    时间复杂度O(n)O(n)O(log(n))

    epoll是IO多路复用的一种实现方式,也是目前主流的高并发实现方案。

    epoll的作用

    经常看到epoll的作用,也知道他是IO多路复用的一种实现形式,但是由于过往经历使用select比较多,对epoll总是知其然,而不知其所以然。

    epoll主要用于对socket进行侦听,实现一个线程对多个socket的管理,相对于select和poll能够有效的减少系统开销,性能稳定

    epoll的API接口

    1. int epoll_create(int size);
    2. 功能:该函数生成一个 epoll 专用的文件描述符。
    3. 参数size: 用来告诉内核这个监听的数目一共有多大,参数 size 并不是限制了 epoll 所能监听的描述符最大个数,只是对内核初始分配内部数据结构的一个建议。自从 linux 2.6.8 之后,size 参数是被忽略的,也就是说可以填只有大于 0 的任意值。返回值:如果成功,返回poll 专用的文件描述符,否者失败,返回-1
    1. int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
    2. 功能:epoll 的事件注册函数,它不同于 select() 是在监听事件时告诉内核要监听什么类型的事件,而是在这里先注册要监听的事件类型。参数epfd: epoll 专用的文件描述符,epoll_create()的返回值参数op: 表示动作,用三个宏来表示:
    3. EPOLL_CTL_ADD:注册新的 fd 到 epfd 中;
    4. EPOLL_CTL_MOD:修改已经注册的fd的监听事件;
    5. EPOLL_CTL_DEL:从 epfd 中删除一个 fd;
    6. 参数fd: 需要监听的文件描述符参数event: 告诉内核要监听什么事件,struct epoll_event 结构如:events 可以是以下几个宏的集合:
    7. EPOLLIN :表示对应的文件描述符可以读(包括对端 SOCKET 正常关闭);
    8. EPOLLOUT:表示对应的文件描述符可以写;
    9. EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);
    10. EPOLLERR:表示对应的文件描述符发生错误;
    11. EPOLLHUP:表示对应的文件描述符被挂断;
    12. EPOLLET :将 EPOLL 设为边缘触发(Edge Trigger)模式,这是相对于水平触发(Level Trigger)来说的。
    13. EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个 socket 的话,需要再次把这个 socket 加入到 EPOLL 队列里返回值:0表示成功,-1表示失败。
    1. int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
    2. 功能:等待事件的产生,收集在 epoll 监控的事件中已经发送的事件,类似于 select() 调用。参数epfd: epoll 专用的文件描述符,epoll_create()的返回值参数events: 分配好的 epoll_event 结构体数组,epoll 将会把发生的事件赋值到events 数组中(events 不可以是空指针,内核只负责把数据复制到这个 events 数组中,不会去帮助我们在用户态中分配内存)。参数maxevents: maxevents 告之内核这个 events 有多少个 。参数timeout: 超时时间,单位为毫秒,为 -1 时,函数为阻塞。返回值:
    3. 如果成功,表示返回需要处理的事件数目
    4. 如果返回0,表示已超时
    5. 如果返回-1,表示失败

    epoll为什么高效?

    说到epoll为什么高效,还是要从IO多路复现的实现历史说起,IO多路复用的实现最初是select,然后select有几个问题:

    1. 默认的select实现管理的socket数量一般为1024,数量存在限制,虽然可以修改,但是需要重新编译内核
    2. 每次调用select接口,都会将侦听的fd的数组从用户态内存拷贝到内核态缓冲区;另外当有socket可读或者可写时也会将fd数组从内核态缓冲区拷贝至用户态内存。用户态至内核态或者内核态至用户态数据的拷贝,这样的拷贝对于资源的消耗是很大的。
    3. 无论是内核态还是用户态由于保存fd的是一个数组,都需要通过轮询的方式遍历fd数组,找到可读或者可写的fd,当fd数量增大时,性能是下降的。

    select运行原理示意视频:

    select-CSDN直播

    针对select存在这样的问题,后续发展出了poll,但是poll相对于select的优化有限,仅仅只改善了select管理socket上线的问题,其余两点都没有进行优化。

    再往后就发展了出了epoll,epoll相对于select和poll出现了跨越式的改进,将select涉及的问题都做了响应的改进:

    1. 管理的socket无上限,而且是通过函数传参的形式指定管理的socket个数,而select是通过头文件中的FD_SIZE来指定的。不言而喻,通过函数传参的方式更灵活。
    2. epoll内部管理fd的数据结构是红黑树,查找、修改和删除的时间复杂度都很优秀。
    3. epoll_wait的每次调用不会向select调用一样,每次都会产生用户态到内核态的拷贝,从而减少资源消耗
    4. 当内核检测到某个fd的可读或者可写事件时,会自动调用该fd的poll回调函数,将该fd的信息拷贝到数组中
    5. epoll仅会将检测到可读可写的事件fd写入到数组中,传递到用户态内存中,这一点与select是不同的,select是要所有监听的fd的集合拷贝到用户区中。

    总结起来就是:

    • 管理的socket无上限
    • 用户态内存和内核缓冲区内存拷贝次数减少
    • 传递出的可读或者可写的事件仅包含这些可读可写的fd,这一点也是与select不同的,select传出的是所有fd的集合。

    epoll运行原理示意视频:

    epoll-CSDN直播

    epoll的触发方式

    epoll有两种触发方式,一种是水平触发,一种是边缘触发。

    • 水平触发,这种触发方式的含义是只要读缓冲区存在数据,epoll会一直提示该fd有可读事件;当为写缓冲区时,如果写缓冲区空间不满,则epoll_wait会提示用户该fd有可写事件。epoll默认的触发方式是水平触发。

    对于读操作,只要缓冲内容不为空,LT模式返回读就绪。

    对于写操作,只要缓冲区还不满,LT模式会返回写就绪。

    当被监控的文件描述符上有可读写事件发生时,epoll_wait()会通知处理程序去读写。如果这次没有把数据一次性全部读写完(如读写缓冲区太小),那么下次调用 epoll_wait()时,它还会通知你在尚没读写完的文件描述符上继续读写,当然如果你一直不去读写,它会一直通知你。如果系统中有大量你不需要读写的就绪文件描述符,而它们每次都会返回,这样会大大降低处理程序检索自己关心的就绪文件描述符的效率。

    • 边缘触发,只有当缓冲区的状态发生变化的时候才会触发可读可写事件。例如读缓冲区内由无数据变为有数据,只有此种情况下才会触发可读事件,也就是说对于读缓冲区,读缓冲区从数据变为有数据,只会发送一次可读事件,至于读缓冲区内的事件是否读完不太关心,需要用户自己去处理;若为写缓冲区,写缓冲区由不可写入变为可以写入的情况下会触发可写事件,其余情况不会触发该事件。若要修改边沿触发模式,则需要调用epoll_ctl接口修改,在event参数中添加EPOLLET即可。

    对于读操作

    当缓冲区由不可读变为可读的时候,即缓冲区由空变为不空的时候。

    当有新数据到达时,即缓冲区中的待读数据变多的时候。

    当缓冲区有数据可读,且应用进程对相应的描述符进行EPOLL_CTL_MOD 修改EPOLLIN事件时。

    对于写操作

    当缓冲区由不可写变为可写时。

    当有旧数据被发送走,即缓冲区中的内容变少的时候。

    当缓冲区有空间可写,且应用进程对相应的描述符进行EPOLL_CTL_MOD 修改EPOLLOUT事件时。

    当被监控的文件描述符上有可读写事件发生时,epoll_wait()会通知处理程序去读写。如果这次没有把数据全部读写完(如读写缓冲区太小),那么下次调用epoll_wait()时,它不会通知你,也就是它只会通知你一次,直到该文件描述符上出现第二次可读写事件才会通知你。这种模式比水平触发效率高,系统不会充斥大量你不关心的就绪文件描述符。

    在ET模式下, 缓冲区从不可读变成可读,会唤醒应用进程,缓冲区数据变少的情况,则不会再唤醒应用进程。

     对于水平触发和边缘触发更形象的解释:

    水平触发:0为无数据,1为有数据。缓冲区有数据则一直为1,则一直触发。

    边缘触发发:0为无数据,1为有数据,只要在0变到1的上升沿才触发。

    JDK并没有实现边缘触发,Netty重新实现了epoll机制,采用边缘触发方式;另外像Nginx也采用边缘触发。

    JDK在Linux已经默认使用epoll方式,但是JDK的epoll采用的是水平触发,而Netty重新实现了epoll机制,采用边缘触发方式,netty epoll transport 暴露了更多的nio没有的配置参数,如 TCP_CORK, SO_REUSEADDR等等;另外像Nginx也采用边缘触发。

    1、对于非阻塞socket,如果epoll使用边缘模式检测事件可读,那么一旦触发,一定要一次性把socket上数据收取干净,即循环调用recv函数直到recv出错 

    1. bool recvEtMode()
    2. {
    3. // 每次只收取256个字节
    4. char buf[256];
    5. while (true) {
    6. int nRecv = ::recv(clientfd, buf, 256, 0);
    7. if (nRecv == -1) {
    8. if (errno == EWOULDBLOCK) {
    9. return true;
    10. } else if (errno == EINTR) {
    11. continue;
    12. } else {
    13. return false;
    14. }
    15. }
    16. else if (nRecv == 0) {
    17. // 对端关闭了socket
    18. return false;
    19. } else {
    20. inputBuffer.add(buf, (size_t)nRecv);
    21. }
    22. }
    23. return true;
    24. }

     2、如果是水平模式,可以根据业务一次性收取固定字节数
    下面总结一下两者在编码上需要注意的地方:
    1、LT模式下,读事件触发后可以按需收取想要的字节数,不用把本次数据收取干净;
    ET模式下,读事件必须把数据收取干净,因为我们不一定再有机会收取数据了。
    2、LT模式下,不需要写事件时一定要及时移除,避免不必要地触发且浪费CPU资源。
    ET模式下,写事件触发后,如果还需要下一次的写事件触发来驱动任务(例如发送上次剩余的数据),则我们需要继续注册一次检测可写事件
    3、LT会导致多次触发,ET优点是触发次数少

    epoll代码运行实例

    1. /*************************************************************************
    2. # > File Name:server.c
    3. # > Author: Jay
    4. # > Mail: billysturate@gmail.com
    5. # > Created Time: Tue 08 Nov 2022 02:07:34 PM CST
    6. ************************************************************************/
    7. #include
    8. #include
    9. #include
    10. #include
    11. #include
    12. #include
    13. #include
    14. #include
    15. #include
    16. #include
    17. #include
    18. #include
    19. #include
    20. #include
    21. using namespace std;
    22. typedef struct socketinfo
    23. {
    24. int fd; //要操作的文件描述符
    25. int epfd; //红黑树实例
    26. } SocketInfo;
    27. void *acceptConn(void *arg)
    28. {
    29. //printf("acception tid: %ld\n", pthread_self());
    30. cout<<"acception tid: "<<pthread_self()<
    31. SocketInfo *info = (SocketInfo *)arg;
    32. cout<<"acceptConn 1111111"<
    33. // 建立新的连接
    34. int cfd = accept(info->fd, NULL, NULL);
    35. cout<<"#########acceptConn cfd : "<
    36. // 将文件描述符设置为非阻塞
    37. // 得到文件描述符的属性
    38. int flag = fcntl(cfd, F_GETFL);
    39. flag |= O_NONBLOCK;
    40. fcntl(cfd, F_SETFL, flag);
    41. // 新得到的文件描述符添加到epoll模型中, 下一轮循环的时候就可以被检测了
    42. // 通信的文件描述符检测读缓冲区数据的时候设置为边沿模式
    43. struct epoll_event ev;
    44. ev.events = EPOLLIN | EPOLLET; // 读缓冲区是否有数据
    45. ev.data.fd = cfd;
    46. int ret = epoll_ctl(info->epfd, EPOLL_CTL_ADD, cfd, &ev);
    47. if (ret == -1)
    48. {
    49. perror("epoll_ctl-accept");
    50. exit(0);
    51. }
    52. free(info);
    53. return NULL;
    54. }
    55. void *communication(void *arg)
    56. {
    57. printf("communication tid: %ld\n", pthread_self());
    58. SocketInfo *info = (SocketInfo *)arg;
    59. int curfd = info->fd;
    60. int epfd = info->epfd;
    61. // 处理通信的文件描述符
    62. // 接收数据
    63. char buf[5];
    64. char temp[1024];
    65. memset(buf, 0, sizeof(buf));
    66. bzero(temp, sizeof(temp));
    67. // 循环读数据
    68. while (1)
    69. {
    70. int len = recv(curfd, buf, sizeof(buf), 0);
    71. if (len == 0)
    72. {
    73. // 非阻塞模式下和阻塞模式是一样的 => 判断对方是否断开连接
    74. printf("客户端断开了连接...\n");
    75. // 将这个文件描述符从epoll模型中删除
    76. epoll_ctl(epfd, EPOLL_CTL_DEL, curfd, NULL);
    77. close(curfd);
    78. break;
    79. }
    80. else if (len > 0)
    81. {
    82. // 通信
    83. // 接收的数据打印到终端
    84. for (int i = 0; i < len; i++)
    85. {
    86. buf[i] = toupper(buf[i]);
    87. }
    88. strncat(temp + strlen(temp), buf, len);
    89. write(STDOUT_FILENO, buf, len);
    90. // 发送数据
    91. // send(curfd, buf, len, 0);
    92. }
    93. else
    94. {
    95. // len == -1
    96. if (errno == EAGAIN)
    97. {
    98. printf("数据读完了...\n");
    99. //发送数据
    100. send(curfd, temp, strlen(temp) + 1, 0);
    101. break;
    102. }
    103. else
    104. {
    105. perror("recv error");
    106. break;
    107. // exit(0); //不能exit因为会结束整个程序
    108. }
    109. }
    110. }
    111. free(info);
    112. return NULL;
    113. }
    114. // server
    115. int main(int argc, const char *argv[])
    116. {
    117. //printf("begin\n");
    118. std::cout<<"begin"<
    119. // 创建监听的套接字
    120. int lfd = socket(AF_INET, SOCK_STREAM, 0);
    121. printf("create socket = %d\n", lfd);
    122. if (lfd == -1)
    123. {
    124. perror("socket error");
    125. exit(1);
    126. }
    127. // 绑定
    128. struct sockaddr_in serv_addr;
    129. memset(&serv_addr, 0, sizeof(serv_addr));
    130. serv_addr.sin_family = AF_INET;
    131. serv_addr.sin_port = htons(9527);
    132. serv_addr.sin_addr.s_addr = htonl(INADDR_ANY); // 本地多有的IP
    133. // 127.0.0.1
    134. // inet_pton(AF_INET, "127.0.0.1", &serv_addr.sin_addr.s_addr);
    135. // 设置端口复用
    136. int opt = 1;
    137. setsockopt(lfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
    138. // 绑定端口
    139. int ret = bind(lfd, (struct sockaddr *)&serv_addr, sizeof(serv_addr));
    140. if (ret == -1)
    141. {
    142. perror("bind error");
    143. exit(1);
    144. }
    145. // 监听
    146. ret = listen(lfd, 64);
    147. if (ret == -1)
    148. {
    149. perror("listen error");
    150. exit(1);
    151. }
    152. printf("已完成初始化\n");
    153. // 现在只有监听的文件描述符
    154. // 所有的文件描述符对应读写缓冲区状态都是委托内核进行检测的epoll
    155. // 创建一个epoll模型
    156. int epfd = epoll_create(100);
    157. if (epfd == -1)
    158. {
    159. perror("epoll_create");
    160. exit(0);
    161. }
    162. // 往epoll实例中添加需要检测的节点, 现在只有监听的文件描述符
    163. struct epoll_event ev;
    164. ev.events = EPOLLIN; // 检测lfd读读缓冲区是否有数据
    165. ev.data.fd = lfd;
    166. ret = epoll_ctl(epfd, EPOLL_CTL_ADD, lfd, &ev);
    167. if (ret == -1)
    168. {
    169. perror("epoll_ctl");
    170. exit(0);
    171. }
    172. struct epoll_event evs[1024];
    173. int size = sizeof(evs) / sizeof(struct epoll_event);
    174. // 持续检测
    175. while (1)
    176. {
    177. // 调用一次, 检测一次
    178. std::cout<<" epoll_wait "<
    179. int num = epoll_wait(epfd, evs, size, -1);
    180. //printf("==== num: %d\n", num);
    181. std::cout<<"==== num: "<
    182. pthread_t tid;
    183. for (int i = 0; i < num; ++i)
    184. {
    185. // 取出当前的文件描述符
    186. int curfd = evs[i].data.fd;
    187. SocketInfo *info = (SocketInfo *)malloc(sizeof(SocketInfo));
    188. info->fd = curfd;
    189. info->epfd = epfd;
    190. // 判断这个文件描述符是不是用于监听的
    191. printf("curfd = %d, lfd=%d\n", curfd, lfd);
    192. if (curfd == lfd)
    193. {
    194. pthread_create(&tid, NULL, acceptConn, (void *)info);
    195. pthread_detach(tid);
    196. }
    197. else
    198. {
    199. pthread_create(&tid, NULL, communication, (void *)info);
    200. pthread_detach(tid);
    201. }
    202. }
    203. }
    204. return 0;
    205. }

    客户端代码

    1. #include
    2. //int main(int argc, char *argv[])
    3. //{
    4. // QCoreApplication a(argc, argv);
    5. // return a.exec();
    6. //}
    7. /*************************************************************************
    8. # > File Name:client.c
    9. # > Author: Jay
    10. # > Mail: billysturate@gmail.com
    11. # > Created Time: Tue 08 Nov 2022 03:10:51 PM CST
    12. ************************************************************************/
    13. #include
    14. #include
    15. #include
    16. #include
    17. #include
    18. #include
    19. #define MAXLINE 80
    20. #define SERV_PORT 9527
    21. int main(int argc, char *argv[])
    22. {
    23. struct sockaddr_in servaddr;
    24. char buf[MAXLINE];
    25. int sockfd, n;
    26. sockfd = socket(AF_INET, SOCK_STREAM, 0);
    27. if(sockfd < 0)
    28. {
    29. perror("create failed");
    30. exit(1);
    31. }
    32. bzero(&servaddr, sizeof(servaddr));
    33. servaddr.sin_family = AF_INET;
    34. inet_pton(AF_INET, "127.0.0.1", &servaddr.sin_addr);
    35. servaddr.sin_port = htons(SERV_PORT);
    36. int i = connect(sockfd, (struct sockaddr *)&servaddr, sizeof(servaddr));
    37. if (i < 0)
    38. {
    39. perror("connect failed");
    40. exit(1);
    41. }
    42. int num = 0;
    43. printf("服务器连接成功\n");
    44. while (1)
    45. {
    46. sprintf(buf, "hello, world, %d\n", num++);
    47. printf("send msg: %s\n", buf);
    48. write(sockfd, buf, strlen(buf) + 1);
    49. recv(sockfd, buf, sizeof(buf), 0);
    50. printf("recv msg:%s\n", buf);
    51. usleep(10000);
    52. }
    53. recv(sockfd, buf, sizeof(buf), 0);
    54. printf("-------recv msg:%s\n", buf);
    55. printf("-------over-----------\n");
    56. close(sockfd);
    57. return 0;
    58. }

    epoll使用步骤介绍

    1、创建侦听socket

    2、设置socket属性(可选)

    3、调用bind listen接口

    4、通过epoll_create接口创建epoll实例

    5、调用epoll_ctl接口注册侦听socket,注册可读事件

    6、在通过accept接口创建子socket后,再次通过epoll_ctl注册子socket的可读事件

    7、epoll_wait收到可读可写事件后,进行根据输出参数中的fd值或者事件类型进行区别操作,这里读写操作可以分开。

    上面的的例子都是注册的可读事件,下面给出一个即注册可读事件,又注册可写事件的例子:

    1. /**********************server***********************/
    2. #include
    3. #include
    4. #include
    5. #include
    6. #include
    7. #include
    8. #include
    9. #include
    10. #include
    11. #define SERVER_CONSUMER "./server_consumer"
    12. #define CLIENT_SERVER "./client_server"
    13. #define N_THREAD 3 //线程数
    14. #define DATA_SIZE 5 //数据大小
    15. #define BUFFER_SIZE 20 //缓冲区大小
    16. int fd[N_THREAD]; //server-consumer-pipe描述符
    17. //定义循环队列缓冲区
    18. typedef struct Queue{
    19. int rear;
    20. int front;
    21. int elem[BUFFER_SIZE];
    22. }Queue;
    23. void initQueue(Queue* q)
    24. {
    25. memset(q, 0, sizeof(Queue));
    26. }
    27. int isEmpty(Queue* q)
    28. {
    29. return q->rear == q->front;
    30. }
    31. int isFull(Queue* q)
    32. {
    33. return (q->rear + 1) % BUFFER_SIZE == q->front;
    34. }
    35. int push(Queue* q, int data)
    36. {
    37. if(isFull(q))
    38. return 0;
    39. q->elem[q->rear] = data;
    40. q->rear = (q->rear + 1) % BUFFER_SIZE;
    41. return 1;
    42. }
    43. int pop(Queue* q, int* data)
    44. {
    45. if(isEmpty(q))
    46. return 0;
    47. *data = q->elem[q->front];
    48. q->front = (q->front + 1) % BUFFER_SIZE;
    49. return 1;
    50. }
    51. //创建消费者任务,其消费者通过id区分
    52. void* consumer(void *arg)
    53. {
    54. int id = *((int*)arg);
    55. char buf[DATA_SIZE] = {0};
    56. while(1){
    57. memset(buf, 0, sizeof(buf));
    58. read(fd[id], buf, sizeof(buf));
    59. sleep(rand() % 3 + 1);
    60. int data;
    61. sscanf(buf, "%d", &data);
    62. printf("id:%d data:%d\n", id, data);
    63. }
    64. return nullptr;
    65. }
    66. int epollserver()
    67. {
    68. //初始化环形队列
    69. Queue buffer;
    70. initQueue(&buffer);
    71. //创建并打开server-consumer-pipe
    72. for(int i = 0; i < N_THREAD; ++i){
    73. char path[128] = {0};
    74. sprintf(path, "%s%d", SERVER_CONSUMER, i);
    75. mkfifo(path, 0666);
    76. fd[i] = open(path, O_RDWR);
    77. }
    78. //打开client-server-pipe(由client创建)
    79. int cs = open(CLIENT_SERVER, O_RDONLY);
    80. //创建N个消费者子线程
    81. pthread_t tid[N_THREAD];
    82. int id[N_THREAD]; //线程标识
    83. for(int i = 0; i < N_THREAD; ++i){
    84. id[i] = i;
    85. pthread_create(&tid[i], NULL, consumer, id + i);
    86. }
    87. //创建epoll实例
    88. int epfd = epoll_create(N_THREAD + 1);
    89. struct epoll_event event[N_THREAD + 1];
    90. for(int i = 0; i < N_THREAD; ++i){
    91. event[i].data.fd = fd[i];
    92. event[i].events = EPOLLOUT; //监听写事件
    93. epoll_ctl(epfd, EPOLL_CTL_ADD, fd[i], event + i);
    94. }
    95. event[N_THREAD].data.fd = cs;
    96. event[N_THREAD].events = EPOLLIN; //监听读事件
    97. epoll_ctl(epfd, EPOLL_CTL_ADD, cs, event + N_THREAD);
    98. //监听epoll,等待事件可读可写的事件返回
    99. struct epoll_event wait_event[N_THREAD + 1];
    100. while(1){
    101. int n = epoll_wait(epfd, wait_event, N_THREAD + 1, -1);
    102. char buf[DATA_SIZE] = {0};
    103. for(int i = 0; i < n; ++i){
    104. if(wait_event[i].data.fd == cs){
    105. memset(buf, 0, sizeof(buf));
    106. read(cs, buf, sizeof(buf));
    107. if(!isFull(&buffer)){
    108. int data;
    109. sscanf(buf, "%d", &data);
    110. push(&buffer, data);
    111. }
    112. }
    113. else{
    114. if(!isEmpty(&buffer)){
    115. int data;
    116. pop(&buffer, &data);
    117. memset(buf, 0, sizeof(buf));
    118. sprintf(buf, "%d", data);
    119. write(wait_event[i].data.fd, buf, sizeof(buf));
    120. }
    121. }
    122. }
    123. }
    124. //等待线程退出
    125. for(int i = 0; i < N_THREAD; ++i){
    126. pthread_join(tid[i], NULL);
    127. }
    128. //关闭文件句柄
    129. for(int i = 0; i < N_THREAD; ++i){
    130. close(fd[i]);
    131. }
    132. return 0;
    133. }

    客户端例子:

    1. /**********************client***********************/
    2. #include
    3. #include
    4. #include
    5. #include
    6. #include
    7. #include
    8. #include
    9. #include
    10. #include
    11. #define PRODUCER_CLIENT "./producer_client"
    12. #define CLIENT_SERVER "./client_server"
    13. #define N_THREAD 3 //线程数
    14. #define DATA_SIZE 5 //数据大小
    15. #define BUFFER_SIZE 20//缓冲区大小
    16. int cp[N_THREAD]; //client-producer-pipe描述符
    17. //定义缓冲区
    18. typedef struct Queue{
    19. int rear;
    20. int front;
    21. int elem[BUFFER_SIZE];
    22. }Queue;
    23. void* producer(void *arg)
    24. {
    25. int id = *((int*)arg);
    26. char buf[DATA_SIZE] = {0};
    27. while(1){
    28. sleep(rand() % 3 + 1);
    29. int data = rand() % 1000;
    30. sprintf(buf, "%d", data);
    31. write(cp[id], buf, sizeof(buf));
    32. printf("id:%d data:%d\n", id, data);
    33. memset(buf, 0, sizeof(buf));
    34. }
    35. }
    36. int isEmpty(Queue* q)
    37. {
    38. return q->rear == q->front;
    39. }
    40. int isFull(Queue* q)
    41. {
    42. return (q->rear + 1) % BUFFER_SIZE == q->front;
    43. }
    44. void initQueue(Queue* q)
    45. {
    46. memset(q, 0, sizeof(Queue));
    47. }
    48. int push(Queue* q, int data)
    49. {
    50. if(isFull(q))
    51. return 0;
    52. q->elem[q->rear] = data;
    53. q->rear = (q->rear + 1) % BUFFER_SIZE;
    54. return 1;
    55. }
    56. int pop(Queue* q, int* data)
    57. {
    58. if(isEmpty(q))
    59. return 0;
    60. *data = q->elem[q->front];
    61. q->front = (q->front + 1) % BUFFER_SIZE;
    62. return 1;
    63. }
    64. int epoolclient(int argc, char *argv[])
    65. {
    66. Queue buffer;
    67. initQueue(&buffer);
    68. //创建并打开client-producer-pipe
    69. for(int i = 0; i < N_THREAD; ++i){
    70. char path[128] = {0};
    71. sprintf(path, "%s%d", PRODUCER_CLIENT, i);
    72. mkfifo(path, 0666);
    73. cp[i] = open(path, O_RDWR);
    74. }
    75. //创建并打开client-server-pipe
    76. mkfifo(CLIENT_SERVER, 0666);
    77. int cs = open(CLIENT_SERVER, O_WRONLY);
    78. //创建生产者子线程
    79. pthread_t tid[N_THREAD];
    80. int id[N_THREAD]; //线程标识
    81. for(int i = 0; i < N_THREAD; ++i){
    82. id[i] = i;
    83. pthread_create(&tid[i], NULL, producer, id + i);
    84. }
    85. //创建epoll
    86. int epfd = epoll_create(N_THREAD + 1);
    87. struct epoll_event event[N_THREAD + 1];
    88. for(int i = 0; i < N_THREAD; ++i){
    89. event[i].data.fd = cp[i];
    90. event[i].events = EPOLLIN;
    91. epoll_ctl(epfd, EPOLL_CTL_ADD, cp[i], event + i);
    92. }
    93. event[N_THREAD].data.fd = cs;
    94. event[N_THREAD].events = EPOLLOUT;
    95. epoll_ctl(epfd, EPOLL_CTL_ADD, cs, event + N_THREAD);
    96. //监听epoll
    97. struct epoll_event wait_event[N_THREAD + 1];
    98. char buf[DATA_SIZE] = {0};
    99. while(1){
    100. int n = epoll_wait(epfd, wait_event, N_THREAD + 1, -1);
    101. for(int i = 0; i < n; ++i){
    102. if(wait_event[i].data.fd == cs){
    103. if(!isEmpty(&buffer)){
    104. int data;
    105. pop(&buffer, &data);
    106. memset(buf, 0, sizeof(buf));
    107. sprintf(buf, "%d", data);
    108. write(cs, buf, sizeof(buf));
    109. }
    110. }
    111. else{
    112. memset(buf, 0, sizeof(buf));
    113. read(wait_event[i].data.fd, buf, sizeof(buf));
    114. if(!isFull(&buffer)){
    115. int data;
    116. sscanf(buf, "%d", &data);
    117. push(&buffer, data);
    118. }
    119. }
    120. }
    121. }
    122. for(int i = 0; i < N_THREAD; ++i){
    123. pthread_join(tid[i], NULL);
    124. }
    125. for(int i = 0; i < N_THREAD; ++i){
    126. close(cp[i]);
    127. }
    128. close(cs);
    129. return 0;
    130. }

    epoll与设计模式的关系

    待补充

    参考链接

    epoll详解

    不同的IO多路复用具体实现

  • 相关阅读:
    Ubuntu 22.04 | 20.04 |18.04 上安装 PHP 8.1
    LeetCode刷题复盘笔记——47. 全排列 II(一文搞懂回溯解决全排列问题下篇)
    AD域安全攻防实践(附攻防矩阵图)
    Spring 6 提前编译:AOT
    麦芽糖-聚乙二醇-顺铂 cisplatin-PEG-maltose
    第一篇 项目基本情况介绍
    java项目-第140期ssm高校二手交易平台-ssm毕业设计_计算机毕业设计
    【华为OD机试真题 JS】玩牌高手
    vue3+vite配置eslint&prettier
    一文解读所有HashMap的面试题
  • 原文地址:https://blog.csdn.net/iqanchao/article/details/133645673