• 模拟Proactor模式实现 I/O 处理单元


    编写main.cpp

    1.socket通信

    服务器应用程序可以通过读取和写入 Socket 对象

    • 监听来自客户端的请求
    • 并向客户端返回响应
    1. #define MAX_FD 65536 // 最大的文件描述符个数
    2. #define MAX_EVENT_NUMBER 10000 // 监听的最大的事件数量
    3. // 添加信号捕捉
    4. void addsig(int sig, void( handler )(int)){//信号处理函数
    5. struct sigaction sa;//创建信号量
    6. memset( &sa, '\0', sizeof( sa ) );
    7. sa.sa_handler = handler;
    8. sigfillset( &sa.sa_mask );//设置信号临时阻塞等级
    9. assert( sigaction( sig, &sa, NULL ) != -1 );//注册信号
    10. }
    1. int main( int argc, char* argv[] ) {
    2. // 判断参数个数,至少要传递一个端口号
    3. if( argc <= 1 ) {
    4. printf( "usage: %s port_number\n", basename(argv[0]));
    5. return 1;
    6. }
    7. // 获取端口号,转换成整数
    8. int port = atoi( argv[1] );
    9. addsig( SIGPIPE, SIG_IGN );
    10. // 使用socketAPI编写Reactor组件,通过监听socket文件描述符获取连接请求
    11. int listenfd = socket( PF_INET, SOCK_STREAM, 0 ); // 创建用于监听的socket文件描述符
    12. int ret = 0;
    13. // 存放服务器的地址信息
    14. struct sockaddr_in address;
    15. address.sin_family = AF_INET;//使用IPv4协议
    16. address.sin_addr.s_addr = INADDR_ANY; //监听所有网卡的连接请求
    17. address.sin_port = htons( port );//将端口号(大端小端)转换为网络字节序,并保存到address结构体中
    18. // 端口复用
    19. int reuse = 1;
    20. // 让多个进程绑定同一个端口,从而实现负载均衡或者高可用等功能
    21. setsockopt( listenfd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof( reuse ) );
    22. // 绑定服务器的地址信息
    23. ret = bind( listenfd, ( struct sockaddr* )&address, sizeof( address ) );
    24. ret = listen( listenfd, 5 );
    25. ...
    26. return 0;
    27. }

     2.初始化线程池

    1. // 创建线程池,并初始化
    2. // 来一个任务之后,要封装成一个任务对象,交给线程池去处理
    3. threadpool< http_conn >* pool = NULL;
    4. try {
    5. pool = new threadpool;
    6. } catch( ... ) {
    7. return 1;
    8. }
    • 事件处理器:在初始化线程池时,输入的参数是http_conn对象,这个就是用于处理任务的任务类
    1. // 创建一个数组用于保存所有的客户端信息
    2. // 每当有新连接进来时,都会在 users 数组中找到一个未使用的 http_conn 对象,
    3. // 进行初始化并保存该连接对应的信息
    4. http_conn* users = new http_conn[ MAX_FD ];

    3.创建 epoll 对象 和事件数组 events

    • 监听的文件描述符 listenfd 添加到 epoll 对象
    1. // 创建epoll对象,和事件数组,添加
    2. epoll_event events[ MAX_EVENT_NUMBER ];
    3. int epollfd = epoll_create( 5 );//创建epoll对象,通过该文件描述符对 epoll 进行控制和管理(监听)
    4. // 将监听的文件描述符添加到 epoll 对象中
    5. addfd( epollfd, listenfd, false );
    6. http_conn::m_epollfd = epollfd;//赋值
    1. // 添加文件描述符到 epoll 中
    2. extern void addfd( int epollfd, int fd, bool one_shot );
    • 其中,在http_conn.cpp 中编写 addfd 函数 
    1. // 向epoll中添加需要监听的文件描述符
    2. void addfd( int epollfd, int fd, bool one_shot ) {
    3. epoll_event event;
    4. event.data.fd = fd;
    5. event.events = EPOLLIN | EPOLLRDHUP;
    6. if(one_shot)
    7. {
    8. // 防止同一个通信被不同的线程处理
    9. event.events |= EPOLLONESHOT;
    10. }
    11. epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
    12. // 设置文件描述符非阻塞
    13. setnonblocking(fd);
    14. }

    4.同步 I/O 模拟 Proactor 模式

    1. // 创建epoll实例,通过一棵红黑树管理待检测集合
    2. // 参数 size 从 Linux 2.6.8 以后就不再使用,但是必须设置一个大于 0 的值。epoll_create 函数调用成功返回一个非负值的 epollfd,调用失败返回 -1。
    3. int epoll_create(int size);
    4. // 对epoll实例进行管理:添加文件描述符信息,删除信息,修改信息
    5. int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
    6. - 参数:
    7. - epfd : epoll实例对应的文件描述符
    8. - op : 要进行什么操作
    9. EPOLL_CTL_ADD: 添加
    10. EPOLL_CTL_MOD: 修改
    11. EPOLL_CTL_DEL: 删除
    12. - fd : 要检测的文件描述符
    13. - event : 检测文件描述符什么事情
    14. // 检测函数----检测epoll树中是否有就绪的文件描述符
    15. // 创建了epfd,设置好某个fd上需要检测事件并将该fd绑定到epfd上去后,就可以调用epoll_wait
    16. // 检测事件了
    17. int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
    18. - 参数:
    19. - epfd : epoll实例对应的文件描述符
    20. - events : 传出参数,保存了发送了变化的文件描述符的信息
    21. - maxevents : 第二个参数结构体数组的大小
    22. - timeout : 阻塞时间
    23. - 0 : 不阻塞
    24. - -1 : 阻塞,直到检测到fd数据发生变化,解除阻塞
    25. - > 0 : 阻塞的时长(毫秒)
    26. - 返回值:
    27. - 成功,返回发送变化的文件描述符的个数 > 0
    28. - 失败 -1
    29. struct epoll_event {
    30. uint32_t events; /* Epoll events */
    31. epoll_data_t data; /* User data variable */
    32. };
    33. events描述事件类型,其中epoll事件类型有以下几种
    34. - EPOLLIN:表示对应的文件描述符可以读(包括对端SOCKET正常关闭)
    35. - EPOLLOUT:表示对应的文件描述符可以写
    36. - EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来)
    37. - EPOLLERR:表示对应的文件描述符发生错误
    38. - EPOLLHUP:表示对应的文件描述符被挂断;
    39. - EPOLLET:将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)而言的
    40. - EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里

    epoll() 多路复用 和 两种工作模式_呵呵哒( ̄▽ ̄)"的博客-CSDN博客icon-default.png?t=N7T8https://blog.csdn.net/weixin_41987016/article/details/132523789?spm=1001.2014.3001.5501

    epoll 基于多线程的边沿非阻塞处理_呵呵哒( ̄▽ ̄)"的博客-CSDN博客icon-default.png?t=N7T8https://blog.csdn.net/weixin_41987016/article/details/132539393?spm=1001.2014.3001.5501 

    同步 I/O 模拟 Proactor 模式的工作流程
    1. // 同步 I/O 模拟 Proactor 模式的工作流程
    2. while(true){
    3. /*
    4. 1、阻塞等待文件描述符监听到的事件
    5. 2、遍历事件数组,判断事件类型,进行对应处理
    6. */
    7. }
    • while循环不断检测有无事件发生,具体就是使用 epoll_wait 获取监听 socket 的文件描述符所返回的事件数量 
    • epoll_wait()函数不断地检测文件描述符epollfd上是否有I/O事件发生
    • 当有可读或可写事件发生时,epoll_wait()函数 会返回一个events数组,并将其中的事件信息填充到数组中,遍历数组中的所有事件,根据事件类型进行相应的处理。
    1. // 模拟 Proactor 模式
    2. while(true) {
    3. // 具体来说就是使用epoll_wait获取监听socket的文件描述符所返回得到事件数量
    4. int number = epoll_wait( epollfd, events, MAX_EVENT_NUMBER, -1 );
    5. if ( ( number < 0 ) && ( errno != EINTR ) ) {
    6. printf( "epoll failure\n" );
    7. break;
    8. }
    9. // 循环遍历事件数组
    10. for ( int i = 0; i < number; i++ ) {
    11. int sockfd = events[i].data.fd;
    12. if( sockfd == listenfd ) {
    13. // 有客户端连接进来
    14. ...
    15. }else if( events[i].events & ( EPOLLRDHUP | EPOLLHUP | EPOLLERR ) ) {
    16. // 对方异常断开或错误异常
    17. ...
    18. }else if(events[i].events & EPOLLIN) {
    19. // 判断是否有读事件发生
    20. ...
    21. } else if( events[i].events & EPOLLOUT ) {
    22. // 判断是否有写事件发生
    23. ...
    24. }
    25. }
    26. }

    **************************************************************************************************************

    以下总结来自这篇文章:【从0开始编写webserver·基础篇#02】服务器的核心---I/O处理单元和任务类 - dayceng - 博客园 (cnblogs.com)

    • epoll_wait 是一个系统调用函数,用于等待文件描述符上的I/O事件;
    • epollfd 是通过 epoll_create函数 创建的 epoll实例的文件描述符它用于管理需要监视的文件描述符集合;
    • listenfd 是服务器应用程序使用的套接字文件描述符,它与 epollfd 关联,并使用 epoll_ctl函数 ,将其添加到 epollfd 所管理的文件描述符集合中。

    epollfd 代表了一个 epoll实例,负责管理需要监视的文件描述符集合,而 listenfd 则是需要被监视的文件描述符之一,它被添加到 epollfd 所管理的文件描述符集合中,以便在有新的客户端连接请求时能够及时通知服务器程序。epoll_wait 函数返回时,它会将事件列表填入events数组中,告诉服务器哪些文件描述符发生了I/O事件,然后服务器应用程序根据这些事件来执行相应的操作。

    • EPOLLRDHUP表示TCP连接的远程端关闭半关闭连接,即对方关闭了socket连接或者shutdown写操作。
    • EPOLLHUP表示挂起的连接或监听套接字已经关闭。它可能是一个错误,也可能是一个正常情况,因为它只代表文件描述符不再可用,而不是一定有错误。
    • EPOLLERR表示错误事件。例如:socket被对端重置(rst);对于udp的epoll来说,他可以支持多个端口绑定,当然你不能bind两次同一个端口,那么第二次就会返回-1并且errno会被设置为EADDRINUSE;还有就是当读取时没有数据则返回-1并且errno被设置为EAGAIN

    **************************************************************************************************************

    (1)listenfd 有读事件发生
    • 有新的客户端连接进来,接受客户端连接 accept() ,将新的客户端数据初始化并存储到http_conn 类型的 users 数组中 
    1. if( sockfd == listenfd ) {
    2. // 有客户端连接进来
    3. struct sockaddr_in client_address;
    4. socklen_t client_addrlength = sizeof( client_address );
    5. int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength );
    6. if ( connfd < 0 ) {
    7. printf( "errno is: %d\n", errno );
    8. continue;
    9. }
    10. if( http_conn::m_user_count >= MAX_FD ) {
    11. // 目前连接满了
    12. printf("服务器正忙...\n");
    13. close(connfd);
    14. continue;
    15. }
    16. // 将新的客户的数据初始化,放到数组中
    17. users[connfd].init( connfd, client_address);
    18. }
    • 在服务器中,通常会使用一个监听socket(listenfd)来接受客户端的连接请求
    • 当有新的客户端连接到来时,服务器会使用accept函数创建一个新的连接socket(connfd)
    • 这个新的socket会与客户端的socket建立起通信连接
    (2)sockfdEPOLLRDHUPEPOLLHUP EPOLLERR事件
    1. // 对方异常断开或错误异常
    2. else if( events[i].events & ( EPOLLRDHUP | EPOLLHUP | EPOLLERR ) ) {
    3. // http_conn类型的users数组中该客户端的状态设置为关闭
    4. users[sockfd].close_conn();
    5. }
    (3)sockfd 有读事件发生
    • events中存储的事件为sockfd可读事件时,表示该socket有数据可读, 此时主线程一次性读取完所有请求数据read(),成功读完后要交给工作线程处理。
    • 将读取到的数据封装成一个请求对象并插入请求队列。调用线程池,追加任务。线程池执行 run 函数,不断从队列去取。取到就做业务处理,解析、生成响应数据
    1. else if(events[i].events & EPOLLIN) {
    2. // 判断是否有读事件发生
    3. if(users[sockfd].read()) {// 一次性读取完所有请求数据,read()
    4. // 成功读完后要交给工作线程处理
    5. // 调用线程池,追加任务
    6. // 线程池执行 run 函数,不断从队列去取
    7. // 取到就做业务处理,解析、生成响应数据
    8. pool->append(users + sockfd);
    9. } else {//读失败,关闭
    10. users[sockfd].close_conn();
    11. }
    12. }
    • users 数组中的每个元素都代表一个客户端连接, 数组的下标是该客户端的文件描述符 fd
    • users + sockfd 就是获取到该客户端连接的 http_conn 对象的指针
    • 调用线程池对象的 append 函数,该指针作为参数
    • 将该指针所指向的 http_conn 对象添加到线程池的任务队列中, 等待线程池的工作线程来处理
    (4)sockfd有写事件发生
    • 主线程调用 epoll_wait 等待 socket 可写
    • 当 socket 可写时,epoll_wait 通知主线程。主线程往 socket 上写入服务器处理客户请求的结果
    1. else if( events[i].events & EPOLLOUT ) {
    2. // 将响应数据发送给客户端,若发送成功则继续等待下一个写事件发生,
    3. // 否则关闭该链接
    4. if( !users[sockfd].write() ) {
    5. users[sockfd].close_conn();
    6. }
    7. }

    5.完整代码

    1. #include
    2. #include
    3. #include
    4. #include
    5. #include
    6. #include
    7. #include
    8. #include
    9. #include
    10. #include
    11. #include "locker.h"
    12. #include "threadpool.h"
    13. #include "http_conn.h"
    14. #define MAX_FD 65536 // 最大的文件描述符个数
    15. #define MAX_EVENT_NUMBER 10000 // 监听的最大的事件数量
    16. /*
    17. 自定义函数 addfd,需要把监听的文件描述符 listenfd 添加到 epoll 对象中,
    18. 即将它加入到内核事件表中
    19. */
    20. // 添加文件描述符到 epoll 中
    21. extern void addfd( int epollfd, int fd, bool one_shot );
    22. extern void removefd( int epollfd, int fd );
    23. // 添加信号捕捉
    24. void addsig(int sig, void( handler )(int)){//信号处理函数
    25. struct sigaction sa;//创建信号量
    26. memset( &sa, '\0', sizeof( sa ) );
    27. sa.sa_handler = handler;
    28. sigfillset( &sa.sa_mask );//设置信号临时阻塞等级
    29. assert( sigaction( sig, &sa, NULL ) != -1 );//注册信号
    30. }
    31. /*
    32. 模拟 proactor 模式,主线程监听事件
    33. 当有读事件产生,在主线程中一次性读出来,封装成一个任务对象(用任务类)
    34. 然后交给子线程(线程池队列中的工作线程),线程池再去取任务做任务
    35. */
    36. int main( int argc, char* argv[] ) {
    37. // 判断参数个数,至少要传递一个端口号
    38. if( argc <= 1 ) {
    39. printf( "usage: %s port_number\n", basename(argv[0]));
    40. return 1;
    41. }
    42. // 获取端口号,转换成整数
    43. int port = atoi( argv[1] );
    44. addsig( SIGPIPE, SIG_IGN );
    45. // 创建线程池,并初始化
    46. // 来一个任务之后,要封装成一个任务对象,交给线程池去处理
    47. threadpool< http_conn >* pool = NULL;
    48. try {
    49. pool = new threadpool;
    50. } catch( ... ) {
    51. return 1;
    52. }
    53. // 创建一个数组用于保存所有的客户端信息
    54. // 每当有新连接进来时,都会在 users 数组中找到一个未使用的 http_conn 对象,进行初始化并保存该连接对应的信息
    55. http_conn* users = new http_conn[ MAX_FD ];
    56. // 使用socketAPI编写Reactor组件,通过监听socket文件描述符获取连接请求
    57. int listenfd = socket( PF_INET, SOCK_STREAM, 0 ); // 创建用于监听的socket文件描述符
    58. int ret = 0;
    59. // 存放服务器的地址信息
    60. struct sockaddr_in address;
    61. address.sin_family = AF_INET;//使用IPv4协议
    62. address.sin_addr.s_addr = INADDR_ANY; //监听所有网卡的连接请求
    63. address.sin_port = htons( port );//将端口号(大端小端)转换为网络字节序,并保存到address结构体中
    64. // 端口复用
    65. int reuse = 1;
    66. setsockopt( listenfd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof( reuse ) );//让多个进程绑定同一个端口,从而实现负载均衡或者高可用等功能
    67. // 绑定服务器的地址信息
    68. ret = bind( listenfd, ( struct sockaddr* )&address, sizeof( address ) );
    69. ret = listen( listenfd, 5 );
    70. // 创建epoll对象,和事件数组,添加
    71. epoll_event events[ MAX_EVENT_NUMBER ];
    72. int epollfd = epoll_create( 5 );//创建epoll对象,通过该文件描述符对 epoll 进行控制和管理(监听)
    73. // 将监听的文件描述符添加到 epoll 对象中
    74. addfd( epollfd, listenfd, false );
    75. http_conn::m_epollfd = epollfd;//赋值
    76. // 编写Reactor组件
    77. while(true) {
    78. // 具体来说就是使用epoll_wait获取监听socket的文件描述符所返回得到事件数量
    79. int number = epoll_wait( epollfd, events, MAX_EVENT_NUMBER, -1 );
    80. if ( ( number < 0 ) && ( errno != EINTR ) ) {
    81. printf( "epoll failure\n" );
    82. break;
    83. }
    84. // 循环遍历事件数组
    85. for ( int i = 0; i < number; i++ ) {
    86. int sockfd = events[i].data.fd;
    87. if( sockfd == listenfd ) {
    88. // 有客户端连接进来
    89. struct sockaddr_in client_address;
    90. socklen_t client_addrlength = sizeof( client_address );
    91. int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength );
    92. if ( connfd < 0 ) {
    93. printf( "errno is: %d\n", errno );
    94. continue;
    95. }
    96. if( http_conn::m_user_count >= MAX_FD ) {
    97. // 目前连接满了
    98. printf("服务器正忙...\n");
    99. close(connfd);
    100. continue;
    101. }
    102. // 将新的客户的数据初始化,放到数组中
    103. users[connfd].init( connfd, client_address);
    104. } else if( events[i].events & ( EPOLLRDHUP | EPOLLHUP | EPOLLERR ) ) {
    105. // 对方异常断开或错误异常
    106. users[sockfd].close_conn();
    107. } else if(events[i].events & EPOLLIN) {
    108. // 判断是否有读事件发生
    109. if(users[sockfd].read()) {// 一次性读出数据,read()
    110. // 成功读完后要交给工作线程处理
    111. // 调用线程池,追加任务
    112. // 线程池执行 run 函数,不断从队列去取
    113. // 取到就做业务处理,解析、生成响应数据
    114. pool->append(users + sockfd);
    115. } else {//读失败,关闭
    116. users[sockfd].close_conn();
    117. }
    118. } else if( events[i].events & EPOLLOUT ) {
    119. if( !users[sockfd].write() ) {
    120. users[sockfd].close_conn();
    121. }
    122. }
    123. }
    124. }
    125. close( epollfd );
    126. close( listenfd );
    127. delete [] users;
    128. delete pool;
    129. return 0;
    130. }

    推荐和参考此文章:

    【从0开始编写webserver·基础篇#02】服务器的核心---I/O处理单元和任务类 - dayceng - 博客园 (cnblogs.com)

  • 相关阅读:
    c编译器学习02:chibicc文档翻译
    FFmpeg入门详解之6:VLC播放器简介
    京东面试官:在Mybatis中 Dao接口和XML文件的SQL如何建立关联?
    青少年软件编程C++一级题库(31-40)
    b站黑马JavaScript的Ajax案例代码——评论列表案例
    如何编辑扫描的PDF文件?
    Java 并发编程解析 | 如何正确理解Java领域中的并发锁,我们应该具体掌握到什么程度?
    Java集成云打印机(芯烨云)——文档篇
    c++ primer plus 笔记 第十六章 string类和标准模板库
    Palantir,硅谷创业新黑帮?
  • 原文地址:https://blog.csdn.net/weixin_41987016/article/details/132718191