• 159 Linux C++ 通讯架构实战14,epoll 函数代码实战


    ngx_epoll_init函数的调用

        //(3.2)ngx_epoll_init函数的调用(要在子进程中执行)
        //四章,四节 project1.cpp:nginx中创建worker子进程;
        //nginx中创建worker子进程
        //官方nginx ,一个master进程,创建了多个worker子进程;
        // master process ./nginx 
        // worker process
        //(i)ngx_master_process_cycle()        //创建子进程等一系列动作
        //(i)    ngx_setproctitle()            //设置进程标题    
        //(i)    ngx_start_worker_processes()  //创建worker子进程   
        //(i)        for (i = 0; i < threadnums; i++)   //master进程在走这个循环,来创建若干个子进程
        //(i)            ngx_spawn_process(i,"worker process");
        //(i)                pid = fork(); //分叉,从原来的一个master进程(一个叉),分成两个叉(原有的master进程,以及一个新fork()出来的worker进程
        //(i)                //只有子进程这个分叉才会执行ngx_worker_process_cycle()
        //(i)                ngx_worker_process_cycle(inum,pprocname);  //子进程分叉
        //(i)                    ngx_worker_process_init();  //ngx_epoll_init函数的调用
        //(i)                        sigemptyset(&set);  
        //(i)                        sigprocmask(SIG_SETMASK, &set, NULL); //允许接收所有信号
        //(i)                        g_socket.ngx_epoll_init();  //初始化epoll相关内容,同时 往监听socket上增加监听事件,从而开始让监听端口履行其职责
        //(i)                            m_epollhandle = epoll_create(m_worker_connections); 
        //(i)                            ngx_epoll_add_event((*pos)->fd....);
        //(i)                                epoll_ctl(m_epollhandle,eventtype,fd,&ev);
        //(i)                    ngx_setproctitle(pprocname);          //重新为子进程设置标题为worker process
        //(i)                    for ( ;; ) {

                                            ngx_process_events_and_timers(); //处理网络事件和定时器事件

                                  }. ....                   //子进程开始在这里不断的死循环

        //(i)    sigemptyset(&set); 
        //(i)    for ( ;; ) {}.                //父进程[master进程]会一直在这里循环

    ngx_epoll_init 函数内容

    该函数从完成的事情有:

    1.epoll_create  创建红黑树

    2.m_pconnections = new ngx_connection_t[m_connection_n];  创建连接池 并初始化连接池

    3.遍历所有监听socket【监听端口】,我们为每个监听socket增加一个 连接池中的连接

    for(pos = m_ListenSocketList.begin(); pos != m_ListenSocketList.end(); ++pos){

    c = ngx_get_connection((*pos)->fd); //从连接池中获取一个空闲连接对象

    c->listening = (*pos);   //连接对象 和监听对象关联,方便通过连接对象找监听对象

    (*pos)->connection = c;  //监听对象 和连接对象关联,方便通过监听对象找连接对象

            //对监听端口的读事件设置处理方法,因为监听端口是用来等对方连接的发送三路握手的,所以监听端口关心的就是读事件

            c->rhandler = &CSocekt::ngx_event_accept;

    1. //往监听socket上增加监听事件,从而开始让监听端口履行其职责【如果不加这行,虽然端口能连上,但不会触发ngx_epoll_process_events()里边的epoll_wait()往下走】
    2. if(ngx_epoll_add_event((*pos)->fd, //socekt句柄
    3. 1,0, //读,写【只关心读事件,所以参数2:readevent=1,而参数3:writeevent=0
    4. 0, //其他补充标记
    5. EPOLL_CTL_ADD, //事件类型【增加,还有删除/修改】
    6. c //连接池中的连接
    7. ) == -1)
    8. {
    9. exit(2); //有问题,直接退出,日志 已经写过了
    10. }

    }

    4.ngx_epoll_add_event 函数 本质上就是epoll_ctl。并设置了回调函数。

    1. #和网络相关
    2. [Net]
    3. #监听的端口数量,一般都是1个,当然如果支持多于一个也是可以的
    4. ListenPortCount = 2
    5. #ListenPort+数字【数字从0开始】,这种ListenPort开头的项有几个,取决于ListenPortCount的数量,
    6. ListenPort0 = 80
    7. ListenPort1 = 443
    8. #epoll连接的最大数【是每个worker进程允许连接的客户端数】,实际其中有一些连接要被监听socket使用,实际允许的客户端连接数会比这个数小一些
    9. worker_connections = 1024

    1. //初始化函数【fork()子进程之前干这个事】
    2. //成功返回true,失败返回false
    3. bool CSocekt::Initialize()
    4. {
    5. ReadConf(); //读配置项
    6. bool reco = ngx_open_listening_sockets(); //打开监听端口
    7. return reco;
    8. }
    9. //专门用于读各种配置项
    10. void CSocekt::ReadConf()
    11. {
    12. CConfig *p_config = CConfig::GetInstance();
    13. m_worker_connections = p_config->GetIntDefault("worker_connections",m_worker_connections); //epoll连接的最大项数
    14. m_ListenPortCount = p_config->GetIntDefault("ListenPortCount",m_ListenPortCount); //取得要监听的端口数量
    15. return;
    16. }

        //连接池: 数组,元素数量就是worker_connections【1024】,每个数组元素类型为 ngx_connection_t【结构】; ---结构数组;
         //为什么要引入这个数组:  2个监听套接字, 用户连入进来,每个用户多出来一个套接字;
           //把 套接字数字跟一块内存捆绑,达到的效果就是将来我通过这个套接字,就能够把这块内存拿出来;

    初始化 连接池

    当执行完成如下的代码后,参考图

    1. do
    2. {
    3. i--; //注意i是数字的末尾,从最后遍历,i递减至数组首个元素
    4. //好从屁股往前来---------
    5. c[i].data = next; //设置连接对象的next指针,注意第一次循环时next = NULL;
    6. c[i].fd = -1; //初始化连接,无socket和该连接池中的连接【对象】绑定
    7. c[i].instance = 1; //失效标志位设置为1【失效】,此句抄自官方nginx,这句到底有啥用,后续再研究
    8. c[i].iCurrsequence = 0; //当前序号统一从0开始
    9. //----------------------
    10. next = &c[i]; //next指针前移
    11. } while (i); //循环直至i为0,即数组首地址
    12. m_pfree_connections = next; //设置空闲连接链表头指针,因为现在next指向c[0],注意现在整个链表都是空的
    13. m_free_connection_n = m_connection_n; //空闲连接链表长度,因为现在整个链表都是空的,这两个长度相等;

    ngx_get_connection()重要函数:从连接池中找空闲连接;注意在这段代码中 应该只是给 监听fd 一个空闲连接对象。

    1. for(pos = m_ListenSocketList.begin(); pos != m_ListenSocketList.end(); ++pos)
    2. {
    3. c = ngx_get_connection((*pos)->fd); //从连接池中获取一个空闲连接对象

    理论上,后面应该要给每一个connectfd也要给一个 空闲连接对象。

        //c)ngx_epoll_add_event() ************
        //    epoll_ctl();
        //d)ev.data.ptr = (void *)( (uintptr_t)c | c->instance);    把一个指针和一个位 合二为一,塞到一个void *中去,
             //后续能够把这两个值全部取出来,如何取,取出来干嘛,后续再说;

        //ps -eo pid,ppid,sid,tty,pgrp,comm,stat,cmd | grep -E 'bash|PID|nginx'

        //如下命令用root权限执行
        //sudo su       获得root权限
        //lsof -i:80    列出哪些进程在监听80端口
        //netstat -tunlp | grep 80

        //总结:
        //a)epoll_create();epoll_ctl();
        //b)连接池技巧ngx_get_connection(),ngx_free_connection();学习这种编程方法;
        //c)同时传递 一个指针和一个二进制数字技巧;

        //(3.2)ngx_epoll_init函数的调用(要在子进程中执行)
        //四章,四节 project1.cpp:nginx中创建worker子进程;
        //nginx中创建worker子进程
        //官方nginx ,一个master进程,创建了多个worker子进程;
        // master process ./nginx 
        // worker process
        //(i)ngx_master_process_cycle()        //创建子进程等一系列动作
        //(i)    ngx_setproctitle()            //设置进程标题    
        //(i)    ngx_start_worker_processes()  //创建worker子进程   
        //(i)        for (i = 0; i < threadnums; i++)   //master进程在走这个循环,来创建若干个子进程
        //(i)            ngx_spawn_process(i,"worker process");
        //(i)                pid = fork(); //分叉,从原来的一个master进程(一个叉),分成两个叉(原有的master进程,以及一个新fork()出来的worker进程
        //(i)                //只有子进程这个分叉才会执行ngx_worker_process_cycle()
        //(i)                ngx_worker_process_cycle(inum,pprocname);  //子进程分叉
        //(i)                    ngx_worker_process_init();
        //(i)                        sigemptyset(&set);  
        //(i)                        sigprocmask(SIG_SETMASK, &set, NULL); //允许接收所有信号
        //(i)                        g_socket.ngx_epoll_init();  //初始化epoll相关内容,同时 往监听socket上增加监听事件,从而开始让监听端口履行其职责
        //(i)                            m_epollhandle = epoll_create(m_worker_connections); 
        //(i)                            ngx_epoll_add_event((*pos)->fd....);
        //(i)                                epoll_ctl(m_epollhandle,eventtype,fd,&ev);
        //(i)                    ngx_setproctitle(pprocname);          //重新为子进程设置标题为worker process
        //(i)                    for ( ;; ) {}. ....                   //子进程开始在这里不断的死循环

        //(i)    sigemptyset(&set); 
        //(i)    for ( ;; ) {}.                //父进程[master进程]会一直在这里循环

    上述,我们已经完成了epoll_init,对于 listenfd的 epoll_ctl

    那么epoll_wait是什么时候呢?以及对于connectfd的 epoll_ctl以及对于 connectfd的epoll_wait在哪里呢?

    ngx_process_events_and_timers(); 这个函数就是做epoll_wait的 //处理网络事件和定时器事件

        g_socket.ngx_epoll_process_events(-1); //-1表示卡着等待把。 -1表示阻塞等待,为什么要阻塞等待,这是 epoll模型决定的,多路IO复用,listenfd wait的时候是阻塞等待。

    1. //等待事件,事件会返回到m_events里,最多返回NGX_MAX_EVENTS个事件【因为我只提供了这些内存】;
    2. //阻塞timer这么长时间除非:a)阻塞时间到达 b)阻塞期间收到事件会立刻返回c)调用时有事件也会立刻返回d)如果来个信号,比如你用kill -1 pid测试
    3. //如果timer为-1则一直阻塞,如果timer为0则立即返回,即便没有任何事件
    4. //返回值:有错误发生返回-1,错误在errno中,比如你发个信号过来,就返回-1,错误信息是(4: Interrupted system call)
    5. // 如果你等待的是一段时间,并且超时了,则返回0
    6. // 如果返回>0则表示成功捕获到这么多个事件【返回值里】
    7. int events = epoll_wait(m_epollhandle,m_events,NGX_MAX_EVENTS,timer);

    如果是读事件,注意这里是的读事件,是listenfd的读事件,对应的是 客户端有连接过来了。

    对应的回调函数是:   ngx_event_accept; 该设定在前面

    1. if(revents & EPOLLIN) //如果是读事件
    2. {
    3. //一个客户端新连入,这个会成立
    4. //c->r_ready = 1; //标记可以读;【从连接池拿出一个连接时这个连接的所有成员都是0
    5. (this->* (c->rhandler) )(c); //注意括号的运用来正确设置优先级,防止编译出错;【如果是个新客户连入
    6. //如果新连接进入,这里执行的应该是CSocekt::ngx_event_accept(c)】
    7. //如果是已经连入,发送数据到这里,则这里执行的应该是 CSocekt::ngx_wait_request_handler
    8. }

            c->rhandler = &CSocekt::ngx_event_accept;

    void CSocekt::ngx_event_accept(lpngx_connection_t oldc) 函数调用:

    通过accept4函数得到 connectfd。注意这里都是NON_BLOCK,取到值

    1. if(use_accept4)
    2. {
    3. s = accept4(oldc->fd, &mysockaddr, &socklen, SOCK_NONBLOCK); //从内核获取一个用户端连接,最后一个参数SOCK_NONBLOCK表示返回一个非阻塞的socket,节省一次ioctl【设置为非阻塞】调用
    4. }
    5. else
    6. {
    7. s = accept(oldc->fd, &mysockaddr, &socklen);
    8. }

    设置copnnectfd的 回调函数为 ngx_wait_request_handler函数

    通过ngx_epoll_add_event,将 connectfd 加入红黑树。 这时候如果客户端有数据发送过来,就会调用 ngx_wait_request_handler 函数

    1. newc->rhandler = &CSocekt::ngx_wait_request_handler; //设置数据来时的读处理函数,其实官方nginx中是ngx_http_wait_request_handler()
    2. //客户端应该主动发送第一次的数据,这里将读事件加入epoll监控
    3. if(ngx_epoll_add_event(s, //socket句柄
    4. 1,0, //读,写
    5. EPOLLET, //其他补充标记【EPOLLET(高速模式,边缘触发ET)】
    6. EPOLL_CTL_ADD, //事件类型【增加,还有删除/修改】
    7. newc //连接池中的连接
    8. ) == -1)

    也就是说,当客户端有数据发送过来的时候,会调用 ngx_wait_Request_handler函数。在这个函数中

    1. 收包,这里就要涉及到一个问题,当我们

  • 相关阅读:
    Spring事务简介说明
    微信小程序上拉触底事件
    机器学习---支持向量机的初步理解
    SELECT和GROUP BY语句
    React高频面试题(附答案)
    【Arduino+ESP32专题】串口的简单使用
    字符流学习14.3
    JSON+<boost/property_tree/json_parser.hpp>+<boost/property_tree/ptree.hpp>
    我们为什么要阅读webpack源码
    优化算法 - Adadelta
  • 原文地址:https://blog.csdn.net/hunandede/article/details/137269326