• Reactor



    Reactor

    Reactor 是对 I/O 多路复⽤作了⼀层封装,让使⽤者不⽤考虑底层⽹络 API 的细
    节,只需要关注应⽤代码的编写。

    Reactor 翻译过来的意思是「反应堆」,这⾥的反应指的是「对事件反应」,也就是来了⼀个事件,Reactor 就有相对应的反应/响应。
    事实上,Reactor 模式也叫 Dispatcher 模式,即 I/O 多路复⽤监听事件,收到事件后,根据事件类型分配(Dispatch)给某个进程 / 线程。

    Reactor 模式主要由 Reactor 和处理资源池这两个核⼼部分组成,它俩负责的事情如下:

    • Reactor 负责监听和分发事件,事件类型包含连接事件、读写事件;
    • 处理资源池负责处理事件,如 read -> 业务逻辑 -> send;

    Reactor 模式是灵活多变的,可以应对不同的业务场景,灵活在于:
    Reactor 的数量可以只有⼀个,也可以有多个。
    处理资源池可以是单个进程 / 线程,也可以是多个进程 /线程;

    Reactor中具体的操作步骤如下:
    读取操作:
    (1)Reactor 注册读就绪事件和相关联的回调函数
    (2)Reactor 等待事件的发生
    (3)当发生读就需事件的时候,Reactor 调用第一步注册的回调函数
    (4)Reactor 执行读取操作,将读取到的数据进行业务处理,然后注册写就绪时间


    具体的实现代码

    实现简易计算器的业务:客户端发来一段字符串,格式为"a+bXc-dX",X主要用来分隔表达式,防止出现“粘包”问题,计算完成之后,将结果发回去,也用X进行分隔。

    Sock.hpp:

    #pragma once
    
    #include
    #include
    #include
    #include
    #include
    #include
    #include
    #include
    #include
    #include
    #include
    using namespace std;
    
    class Sock{
    public:
        static int Socket()
        {
            int sock=socket(AF_INET,SOCK_STREAM,0);
            if(sock<0)
            {
                cerr<<"socket error"<<endl;
                exit(2);
            }
            return sock;
        }
        static void Bind(int sock,int port)
        {
            struct sockaddr_in local;
            bzero(&local,sizeof(local));
            local.sin_port=htons(port);
            local.sin_family=AF_INET;
            local.sin_addr.s_addr=htonl(INADDR_ANY);
    
            if(bind(sock,(struct sockaddr*)&local,sizeof(local))<0)
            {
                cerr<<"bind error"<<endl;
            }
        }
        static void Listen(int sock)
        {
            if(listen(sock,5)<0)
            {
                cerr<<"listen error"<<endl;
                exit(4);
            }
        }
        static int Accept(int sock)
        {
            struct sockaddr_in peer;
            socklen_t len=sizeof(peer);
            int fd=accept(sock,(struct sockaddr*)&peer,&len);
            if(fd<0)
            {
                cerr<<"accept error"<<endl;
            }
            return fd;
        }
        //端口复用
        static void Setsockopt(int listen_sock)
        {
            int opt=1;
            setsockopt(listen_sock,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt));
        }
    };
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67

    Reactor.hpp:

    #pragma once
    #include
    #include
    #include
    #include
    #include
    #include 
    
    const int SIZE=128;
    const int NUM=64;
    class Event;
    class Reactor;
    //一般处理IO时,我们需要三种接口:读取、写入、异常
    typedef int (*callback_t)(Event*ev);
    
    //epoll管理的基本节点
    class Event
    {
        public:
            //对应的文件描述符
            int _sock;
            //sock的输入缓冲区
            std::string _in_buffer;
            //sock的输出缓冲区
            std::string _out_buffer;
            //设置回调
            callback_t _recver;
            callback_t _sender;
            callback_t _errorer;
    
            //设置Event回指Reactor的指针
            Reactor *_p_reactor;
        public:
            Event()
            {
                _sock=-1;
                _recver=nullptr;
                _sender=nullptr;
                _errorer=nullptr;
                _p_reactor=nullptr;
            }
            void RegisterCallback(callback_t recver,callback_t sender,callback_t error)
            {
                _recver=recver;
                _sender=sender;
                _errorer=error;
    
            }
            ~Event(){}
    };
    class Reactor
    {
        private:
            int _epfd;
            //文件描述符和Event的映射
            std::unordered_map<int,Event*> _events_map;
        public:
            Reactor()
                :_epfd(-1)
            {}
            void InitReactor()
            {
                _epfd=epoll_create(SIZE);
                if(_epfd<0)
                {
                    std::cerr<<"epoll_create 失败"<<std::endl;
                    exit(1);
                }
                std::cout<<"epoll_create 成功"<<std::endl;
            }
            //插入一个事件
            bool InsertEvent(Event*p_ev,uint32_t event)
            {
                int sock=p_ev->_sock;
                //将p_ev中的sock插入到epoll中
                struct epoll_event ev;
                ev.events=event;
                ev.data.fd=sock;
                if(epoll_ctl(_epfd,EPOLL_CTL_ADD,sock,&ev)<0)
                {
                    std::cerr<<"epoll_ctl 添加事件失败"<<std::endl;
                    return false;
                }
                //将p_ev本身插入到unordered_map中
                _events_map[sock]=p_ev;
                std::cout<<"添加到epoll中成功:"<<sock<<std::endl;
            }
            //删除事件
            void DeleteEvent(Event*p_ev)
            {
               int sock=p_ev->_sock;
               epoll_ctl(_epfd,EPOLL_CTL_DEL,sock,nullptr);
               _events_map.erase(sock);
            }
            //修改事件
            void ChangeEvent(int sock,bool read,bool write)
            {
                struct epoll_event ev;
                ev.data.fd=sock;
                ev.events=(read?EPOLLIN:0)|(write?EPOLLOUT:0)|EPOLLET;
                if(epoll_ctl(_epfd,EPOLL_CTL_MOD,sock,&ev)<0)
                {
                    std::cerr<<"修改事件失败"<<std::endl;
                }
            }
    
            //派发就绪事件
            void Dispatcher(int time_out)
            {
                //获取已经就绪的事件
                struct epoll_event revs[NUM];
                int num=epoll_wait(_epfd,revs,NUM,time_out);
                for(int i=0;i<num;i++)
                {
                    int sock=revs[i].data.fd;
                    uint32_t events=revs[i].events;
                    //差错处理
                    if((events&EPOLLERR)||(events&EPOLLHUP))
                    {
                        Event* p_ev=_events_map[sock];
                        //调用回调
                        if(p_ev->_recver!=nullptr)
                        {
                            p_ev->_errorer(p_ev);
                        }
                        std::cout<<"差错处理完成"<<std::endl;
                    }
                    //读事件
                    if(events&EPOLLIN)
                    {
                        Event* p_ev=_events_map[sock];
                        //调用回调
                        if(p_ev->_recver!=nullptr)
                        {
                            p_ev->_recver(p_ev);
                        }
                    }
                    //写事件
                    if(events&EPOLLOUT)
                    {
                        Event* p_ev=_events_map[sock];
                        if(p_ev->_sender!=nullptr)
                        {
                            p_ev->_sender(p_ev);
                        }
                    }
                }
            }
    
            ~Reactor()
            {
                if(_epfd>=0)
                {
                    close(_epfd);
                }
            }
    
    };
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158

    ThreadPool.hpp

    #pragma once 
    #include 
    #include 
    #include
    #include
    #include
    #include
    #include"Reactor.hpp"
    #include"Util.hpp"
    
    namespace hjl_thread_pool
    {
        class Task
        {
        public:
            Event* p_ev; 
            Task(Event* p):p_ev(p)
            {
            }
            void Run()
            {
                std::vector<std::string>messages;
                hjl_util::StringUtil::SplitString(p_ev->_in_buffer,messages,"X");
                //针对一个个报文进行计算,比如一个报文为:1+2
                for(auto& str:messages)
                {
                    //进行业务处理,形参响应报文
                    //业务处理可以使用线程池完成 这样就实现了半同步半异步
                    int ans=hjl_util::StringUtil::calculate(str);
                    std::string response;
                    response+=str;
                    response+="=";
                    response+=std::to_string(ans);
    
                    //将响应报文放入发送缓冲区
                    p_ev->_out_buffer+=response;
                    //响应报文之间添加分隔符
                    p_ev->_out_buffer+="X";
                }
                //进行写回
                if(!p_ev->_out_buffer.empty())
                {
                    p_ev->_p_reactor->ChangeEvent(p_ev->_sock,true,true); 
                }
            }
            ~Task()
            {
            
            }
        };
    
        class ThreadPool
        {
            private:
                static ThreadPool* _ptr;
                static mutex _mtx;
    
                std::queue<Task*> q;//任务队列
                int max_num;//线程的总数
                pthread_mutex_t lock;
                pthread_cond_t cond;//只能消费者等待,因为生产者要从外部获取任务
                
                void LockQueue(){
                    pthread_mutex_lock(&lock);    
                }
                void UnlockQueue(){
                    pthread_mutex_unlock(&lock);
                }
                bool IsEmpty(){
                    return q.size()==0;
                }
                void ThreadWait(){
                    pthread_cond_wait(&cond,&lock);
                }  
                
                void ThreadWakeup(){
                    //唤醒线程
                    pthread_cond_signal(&cond);
                }
                void ThreadsWakeup(){
                    //唤醒所有的线程
                    pthread_cond_broadcast(&cond);
                }
                ThreadPool()
                {
    
                }
            public:
                static ThreadPool* GetInstance()//返回指针
                {
                    if (_ptr == nullptr)//双检查,防止多次加锁进行无关消耗
                    {
                    //初始化一次
                    unique_lock<mutex> lock(_mtx);//防止临界资源不安全,进行加锁
                    if (_ptr == nullptr)
                    {
                        _ptr = new ThreadPool;
                    }
                    }
                    return _ptr;
                }
                //必须设置为静态的,因为成员函数会有this形参
                static void*Routine(void*arg)
                {
    
                    ThreadPool*this_p=(ThreadPool*)arg;
                    while(true)
                    {
                        //从任务队列中拿任务
                        this_p->LockQueue();
                        while(this_p->IsEmpty())
                        {//如果任务队列为空则让线程等待
                            this_p->ThreadWait();
                        }
                        Task* t;
                        this_p->Get(t);
                        this_p->UnlockQueue();
                        t->Run();
                        delete t;
                    }
                }
                void ThreadPoolInit(int _max)
                {
                    max_num=_max;
                    pthread_mutex_init(&lock,nullptr);
                    pthread_cond_init(&cond,nullptr);  
                    pthread_t t;//创建一批线程
                    for(int i=0;i<max_num;i++)
                    {
                    pthread_create(&t,nullptr,Routine,this);
                    }
                }
    
                void Put(Task* &in)
                {//往任务队列中放任务
                    LockQueue();
                    q.push(in);
                    UnlockQueue();
                    ThreadWakeup();//放完任务后唤醒一个线程
    
                }
                void Get(Task*&out)
                {
                    Task*t=q.front();
                    q.pop();
                    out=t;
                }
    
                ~ThreadPool()
                {
                    pthread_mutex_destroy(&lock);
                    pthread_cond_destroy(&cond);
                }
        };
    
        ThreadPool* ThreadPool::_ptr = nullptr;//初始化为空
        mutex ThreadPool::_mtx;//初始化锁
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158

    Util.hpp:

    #pragma once
    #include
    #include
    #include
    #include
    //工具类
    namespace hjl_util
    {
        //将sock设置为非阻塞
        void SetNonBlock(int sock)
        {
            int fl=fcntl(sock,F_GETFL);
            if(fl<0)
            {
                std::cerr<<"获取标志位失败"<<std::endl;
                return;
            }
            fcntl(sock,F_SETFL,fl|O_NONBLOCK);
        }
        class StringUtil
        {
            public:
                static void SplitString(std::string&in,std::vector<std::string>&out,std::string sep)
                {
                    while(1)
                    {
                        int pos=in.find(sep);
                        if(pos==std::string::npos)
                        {
                            return;
                        }
                        std::string s=in.substr(0,pos);
                        out.push_back(s);
                        in.erase(0,pos+sep.size());
                    }
                }
                static int calculate(std::string& s)
                {
                    int result = 0, inter_res = 0, num = 0;
                    char op = '+';
                    char ch;
                            //找到第一个不为空的字符
                    for (int pos = s.find_first_not_of(' '); pos < s.size(); pos = s.find_first_not_of(' ', pos))
                    {
                        ch = s[pos];
                                    //判断是否是数字
                        if (ch >= '0' && ch <= '9')
                        {
                            int num = ch - '0';
                            //判断下一个字符是否也是数字
                            while (++pos < s.size() && s[pos] >= '0' && s[pos] <= '9')
                                num = num * 10 + s[pos] - '0';
                            switch (op)
                            {
                            case '+':
                                inter_res += num;
                                break;
                            case '-':
                                inter_res -= num;
                                break;
                            case '*':
                                inter_res *= num;
                                break;
                            case '/':
                                inter_res /= num;
                                break;
                            }
                        }
                        else
                        {
                            if (ch == '+' || ch == '-')
                            {
                                result += inter_res;
                                inter_res = 0;
                            }
                            op = s[pos++];
                        }
                    }
                    return result + inter_res;
                }   
    
        };
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83

    Accepter.hpp:

    #pragma once
    #include"Reactor.hpp"
    #include"Sock.hpp"
    #include"Util.hpp"
    #include
    #include
    #include
    #include
    #include"ThreadPool.hpp"
    
    namespace hjl_service
    {
        //返回值为0表示读取成功
        int _Recver(int sock,std::string&out)
        {
            while(1)
            {
                char buffer[128];
                int size=recv(sock,buffer,sizeof(buffer)-1,0);
                if(size<0)
                {
                    if(errno==EAGAIN||errno==EWOULDBLOCK)
                    {
                        //循环读取完毕
                        return 0;
                    }
                    else if(errno==EINTR)
                    {
                        //被信号中断了,继续读
                        continue;
                    }
                    else
                    {
                        //读取出错
                        return -1;
                    }
                }
                else if(size==0)
                {
                    std::cout<<"用户退出"<<std::endl;
                    return -1;
                }
                else
                {
                    buffer[size]=0;
                    out+=buffer;
                }
            }
        }
        int Recver(Event*p_ev)
        {
            std::cout<<"开始读取数据,文件描述符:"<<p_ev->_sock<<std::endl;
            //需要非阻塞读取
            if(_Recver(p_ev->_sock,p_ev->_in_buffer)<0)
            {
                //读取失败了
                p_ev->_errorer(p_ev);
                return -1;
            }
            //进行包和包之间的分离,防止出现粘包问题,报文内容格式举例:1+1X1*2
            std::cout<<"client#"<<p_ev->_in_buffer<<std::endl;
    
            //读到数据,交给线程池运行
            hjl_thread_pool::Task* t=new hjl_thread_pool::Task(p_ev);
            hjl_thread_pool::ThreadPool::GetInstance()->Put(t);
    
            return 0;
        }
    
        int _Sender(int sock,std::string&in)
        {
            int total=0;
            while(1)
            {
                //累计一共写入多少字节
                int s=send(sock,in.c_str()+total,in.size()-total,0);
                if(s>0)
                {
                    total+=s;
                    //已经将缓冲区写满,不能再写入
                    if(total==in.size())
                    {
                        in.erase(0,total);
                        return 0;
                    }
                }
                else if(s<0)
                {
                    if(errno==EAGAIN||errno==EWOULDBLOCK)
                    {
                        //无论是否发送完,都需要把已经发送的数据,移除缓冲区
                        in.erase(0,total);
                        return 1;
                    }
                    else if(errno==EINTR)
                    {
                        //被信号中断
                        continue;
                    }
                    else
                    {
                        //写入失败
                        return -1;
                    }
                }
    
            }
        }
        int Sender(Event*p_ev)
        {
            std::cout<<"发送的数据:"<<p_ev->_out_buffer<<std::endl;
            int ret=_Sender(p_ev->_sock,p_ev->_out_buffer);
            //发回数据
            if(ret==0)
            {
                p_ev->_p_reactor->ChangeEvent(p_ev->_sock,true,false);
            }
            else if(ret==1)
            {
                //还没写完,打开写接口继续写
                p_ev->_p_reactor->ChangeEvent(p_ev->_sock,true,true);
            }
            else
            {
                //写出错
                p_ev->_errorer(p_ev);
            }
    
        }
        int Errorer(Event*p_ev)
        {
            close(p_ev->_sock);
            p_ev->_p_reactor->DeleteEvent(p_ev);
        }
    
        int Accepter(Event*p_ev)
        {
            std::cout<<"有新的链接到来,listen_sock是:"<<p_ev->_sock<<std::endl;
            int listen_sock=p_ev->_sock;
            while(1)
            {
                int sock=Sock::Accept(listen_sock);
                if(sock<0)
                {
                    std::cout<<"Accept完成"<<std::endl;
                    break;
                }
                std::cout<<"Accept成功:"<<sock<<std::endl;
    
                hjl_util::SetNonBlock(sock);
                Event* new_p_ev=new Event();
                new_p_ev->_sock=sock;
                //回指_p_reactor
                new_p_ev->_p_reactor=p_ev->_p_reactor;
                //给sock注册读写错误回调方法
                new_p_ev->RegisterCallback(Recver,Sender,Errorer);
    
                p_ev->_p_reactor->InsertEvent(new_p_ev,EPOLLIN|EPOLLET);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161

    epoll_server.cpp:

    #include"Reactor.hpp"
    #include"Sock.hpp"
    #include"Accepter.hpp"
    #include"ThreadPool.hpp"
    
    void Usage(std::string proc)
    {
        std::cout<<"Usage:"<<proc<<" port"<<std::endl;
    }
    int main(int argc,char* argv[])
    {
        if(argc!=2)
        {
            Usage(argv[0]);
            exit(1);
        }
        hjl_thread_pool::ThreadPool::GetInstance()->ThreadPoolInit(1);
        //创建socket
        int listen_sock=Sock::Socket();
        //ET模式下,设置非阻塞
        hjl_util::SetNonBlock(listen_sock);
        Sock::Bind(listen_sock,(uint16_t)atoi(argv[1]));
        Sock::Listen(listen_sock);
        Reactor* _p_reactor=new Reactor();
        _p_reactor->InitReactor();
    
        //将listen套接字和p_reactor绑定
        Event* p_ev=new Event();
        p_ev->_sock=listen_sock;
        p_ev->_p_reactor=_p_reactor;
        p_ev->RegisterCallback(hjl_service::Accepter,nullptr,nullptr);
        _p_reactor->InsertEvent(p_ev,EPOLLIN|EPOLLET);
    
        //开始进行事件派发
        int time_out=1000;
        while(1)
        {
            _p_reactor->Dispatcher(time_out);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40

    在这里插入图片描述

  • 相关阅读:
    从零学算法264
    【第2章 Node.js基础】2.7 Node.js 的流(一) 可读流
    RabbitMQ实现延迟队列
    二进制部署1.23.4版本k8s集群-3-部署架构及根证书签发
    十 动手学深度学习v2 ——卷积神经网络之NiN + GoogLeNet
    C++中迭代器的使用
    栈与队列c++算法练习
    Matlab中脚本的运用
    妙鸭相机功能代码复现
    Pwn学习随笔
  • 原文地址:https://blog.csdn.net/qq_52670477/article/details/126238348