先简单了解一下reactor模型:

## 具体流程如下:
1.注册读就绪事件和相应的事件处理器
2.事件分离器等待事件
3.事件到来,激活分离器,分离器调用事件对应的处理器
4.事件处理器完成实际的读操作,处理读到的数据,注册新的事件,然后返还控制权。
## 优点:
1.响应快,不必为单个同步时间所阻塞,虽然reactor本身依然是同步的
2.编程相对简单,可以最大程度的避免复杂的多线程及同步问题,并且避免了多线程/进程的切换开销
3可扩展性,可以方便的通过增加Reactor实例个数来充分利用CPU资源
4.可复用性,Reactor框架本身与具体事件处理逻辑无关,具有很高的复用性。
这里reactor有一个特殊的机制:采用回调函数,谁发生就自己调用自己去解决自己的问题;
相比于epoll的遍历消息列表,reactor的效率会大大增加。
这里先例举出一个简单的reactor框架
- #include<stdlib.h>
- #include<stdio.h>
- #include<unistd.h>
- #include<sys/socket.h>
- #include<arpa/inet.h>
- #include<poll.h>
- #include<sys/epoll.h>
- #include<pthread.h>
-
- #define MAXNUM 4096
- typedef struct reactor
- {
- int epfd;//树根
- struct epoll_event sockary[MAXNUM];
-
- }reactor;
- reactor*eventloop=NULL;
-
- typedef struct item
- {
- int sockfd;
- void(*callback)(int,int,void*);
- char szbuf[MAXNUM];
- int nbuffersize;
- }item;
-
- void recv_cb(int fd,int events,void *arg)
- {
- item*pitem=(item*)arg;
- int res=recv(fd,pitem->szbuf,MAXNUM,0);
- if(res>0)
- {
- printf("recv is :%s\n",pitem->szbuf);
- }
- else
- {
- struct epoll_event ee;
- ee.events=EPOLLIN;
- epoll_ctl(eventloop->epfd,EPOLL_CTL_DEL,fd,&ee);
- close(fd);
- free(pitem);
- }
-
- }
- void accept_cb(int fd,int events,void *arg)
- {
-
- int clientfd=accept(fd,0,0);
- printf("accept success");
- item*pitem=(item*)malloc(sizeof(item));
- pitem->sockfd=clientfd;
- pitem->callback=recv_cb;
- struct epoll_event ep;
- ep.events=EPOLLIN|EPOLLET;
- ep.data.ptr=pitem;
- epoll_ctl(eventloop->epfd,EPOLL_CTL_ADD,clientfd,&ep);
-
- }
-
- int main()
- {
- //创建套接子socket()
- int sockfd=socket(AF_INET,SOCK_STREAM,0);
- if(sockfd==-1)
- {
- perror("socket failed...");
- exit(0);
- }
- //绑定bind()
- struct sockaddr_in addr;
- addr.sin_family=AF_INET;
- addr.sin_addr.s_addr=0;
- addr.sin_port=htons(8899);
- if(-1==bind(sockfd,(struct sockaddr*)&addr,sizeof(addr)))
- {
- perror("bind failed...");
- exit(0);
- }
- //监听listen()
- if(-1==listen(sockfd,128))
- {
- perror("listen failed...");
- exit(0);
- }
- //定义struct
- eventloop=(reactor*)malloc(sizeof(reactor));
- eventloop->epfd=epoll_create(MAXNUM);
- struct epoll_event ep;
- ep.events=EPOLLIN|EPOLLET;
- item*pitem=(item*)malloc(sizeof(item));
- pitem->sockfd=sockfd;
- pitem->callback=accept_cb;
- ep.data.ptr=(void*)pitem;
- epoll_ctl(eventloop->epfd,EPOLL_CTL_ADD,sockfd,&ep);
- //连接accept()
-
- int nreadnum=0;
- while(1)
- {
- //监控的红黑数文件描述,已经发生的消息的连表,最大管理数量,不产生阻塞
- nreadnum=epoll_wait(eventloop->epfd,eventloop->sockary,MAXNUM,-1);
- int i=0;
- while(i<nreadnum)
- {
- if(eventloop->sockary[i].events&EPOLLIN)
- {
- item*pitem=(item*)eventloop->sockary[i].data.ptr;
- pitem->callback(pitem->sockfd,eventloop->sockary[i].events,pitem);
- }
- i++;
- sleep(1);
- }
-
-
- }
- close(sockfd);
-
- return 0;
- }
这个框架仅限于于客户端对话。
我们在这框架的基础上改进
简单介绍一下传输逻辑:
这里我们采用传输文件的大小,在传文件内容
在接受的时候,我们采用先接受文件大小,在接受文件内容的方式解决了粘包的问题。
改进后的代码如下
服务器:
- #include<stdlib.h>
- #include<stdio.h>
- #include<unistd.h>
- #include<sys/socket.h>
- #include<arpa/inet.h>
- #include<poll.h>
- #include<sys/epoll.h>
- #include<pthread.h>
- #include<string.h>
- #define MAXNUM 4096
- typedef struct fileinfo
- {
- int filesize;
- char filepath[1024];
- }fileinfo;
- typedef struct reactor
- {
- int epfd;//树根
- struct epoll_event sockary[MAXNUM];
-
- }reactor;
- reactor*eventloop=NULL;
-
- typedef struct item
- {
- int sockfd;
- void(*callback)(int,int,void*);
- char *szbuf;
- char filepath[MAXNUM];
- int nbuffersize;
- int noffest;
- }item;
-
- void recv_cb(int fd,int events,void *arg)
- {
- item*pitem=(item*)arg;
-
-
- //开文件看路径是否有效
- FILE*fp=fopen("/home/lzl/learn/network/test/recv/recv.txt","a");
- if(NULL==fp)
- {
- printf("fopen failed...\n");
- return;
- }
- //第一次接受包大小
- if(pitem->nbuffersize==0&&pitem->noffest==0)
- {
- int ner=recv(fd,&pitem->nbuffersize,sizeof(int),0);
- {
- if(ner>0)
- {
- if(pitem->nbuffersize>0)
- {
- printf("filesize is %d\n",pitem->nbuffersize);
- pitem->szbuf=(char*)malloc(pitem->nbuffersize);
- pitem->noffest=0;
- }
- }
- else
- {
- printf("下线了\n");
- struct epoll_event ee;
- ee.events=EPOLLIN;
- epoll_ctl(eventloop->epfd,EPOLL_CTL_DEL,fd,&ee);
- close(fd);
- free(pitem);
-
- }
- }
- }
- else
- {
- char filecont[10240];
- while(pitem->nbuffersize)
- {
- int res=recv(fd,filecont,pitem->nbuffersize,0);
- if(res>0)
- {
- printf("recv data is %s:\n",filecont);
- pitem->nbuffersize-=res;
- pitem->noffest+=res;
- }
-
-
- }
- fwrite(filecont,1,pitem->noffest,fp);
- fclose(fp);
- printf("recvdata success...\n");
- free(pitem->szbuf);
- }
-
-
- }
- void accept_cb(int fd,int events,void *arg)
- {
-
- int clientfd=accept(fd,0,0);
- printf("accept success");
- item*pitem=(item*)malloc(sizeof(item));
- pitem->sockfd=clientfd;
- pitem->callback=recv_cb;
- struct epoll_event ep;
- ep.events=EPOLLIN|EPOLLET;
- ep.data.ptr=pitem;
- epoll_ctl(eventloop->epfd,EPOLL_CTL_ADD,clientfd,&ep);
-
- }
-
- int main()
- {
- //创建套接子socket()
- int sockfd=socket(AF_INET,SOCK_STREAM,0);
- if(sockfd==-1)
- {
- perror("socket failed...");
- exit(0);
- }
- //绑定bind()
- struct sockaddr_in addr;
- addr.sin_family=AF_INET;
- addr.sin_addr.s_addr=0;
- addr.sin_port=htons(8899);
- if(-1==bind(sockfd,(struct sockaddr*)&addr,sizeof(addr)))
- {
- perror("bind failed...");
- exit(0);
- }
- //监听listen()
- if(-1==listen(sockfd,128))
- {
- perror("listen failed...");
- exit(0);
- }
- //定义struct
- eventloop=(reactor*)malloc(sizeof(reactor));
- eventloop->epfd=epoll_create(MAXNUM);
- struct epoll_event ep;
- ep.events=EPOLLIN|EPOLLET;
- item*pitem=(item*)malloc(sizeof(item));
- pitem->sockfd=sockfd;
- pitem->callback=accept_cb;
- ep.data.ptr=(void*)pitem;
- epoll_ctl(eventloop->epfd,EPOLL_CTL_ADD,sockfd,&ep);
- //连接accept()
-
- int nreadnum=0;
- while(1)
- {
- //监控的红黑数文件描述,已经发生的消息的连表,最大管理数量,不产生阻塞
- nreadnum=epoll_wait(eventloop->epfd,eventloop->sockary,MAXNUM,-1);
- int i=0;
- while(i<nreadnum)
- {
- if(eventloop->sockary[i].events&EPOLLIN)
- {
- item*pitem=(item*)eventloop->sockary[i].data.ptr;
- pitem->callback(pitem->sockfd,eventloop->sockary[i].events,pitem);
- }
- i++;
- sleep(1);
- }
-
-
- }
- close(sockfd);
-
- return 0;
- }
客户端
- #include<stdio.h>
- #include<stdlib.h>
- #include<unistd.h>
- #include<sys/socket.h>
- #include<arpa/inet.h>
- #include<string.h>
- #define MAXSIZE 1024
- #include <sys/types.h>
- #include <sys/stat.h>
- #include <fcntl.h>
-
- typedef struct file
- {
- int filesize;
- char* filecont;
- }file;
-
- int main()
- {
- //socket()
- int clientfd=socket(AF_INET,SOCK_STREAM,0);
- //connect()
- struct sockaddr_in addr;
- addr.sin_family=AF_INET;
- addr.sin_addr.s_addr=inet_addr("127.0.0.1");
- addr.sin_port=htons(8899);
- if(-1==connect(clientfd,(struct sockaddr*)&addr,sizeof(addr)))
- {
- perror("connect failed");
- exit(0);
- }
-
- size_t nerl;
- FILE*fp=fopen("/home/lzl/learn/network/test/send/send.txt","r");
-
- file sfile;
- sfile.filesize=0;
- sfile.filecont=NULL;
- int bigfilesize=0;
- fseek(fp,0,SEEK_END);
- bigfilesize=ftell(fp);
- printf("bigsize is %d\n",bigfilesize);
- fseek(fp,0,SEEK_SET);
- sfile.filecont=(char*)malloc(sizeof(char)*bigfilesize);
- while((nerl=fread(sfile.filecont,1,1024,fp))>0)
- {
-
-
- sfile.filesize=bigfilesize;
- printf("filecont is %s\n",sfile.filecont);
- printf("filesize is %d\n",sfile.filesize);
-
- if(send(clientfd,(char*)&sfile.filesize,sizeof(int),0)<0)
- {
- printf("file send failed...\n");
- break;
- }
- if(send(clientfd,(char*)sfile.filecont,bigfilesize,0)<0)
- {
- printf("file send failed...\n");
- break;
- }
-
- }sleep(1);
- free(sfile.filecont);
- fclose(fp);
- close(clientfd);
-
- return 0;
- }