• linux下的单线程reactor文件传输


    先简单了解一下reactor模型:

     

    ## 具体流程如下:

    1.注册读就绪事件和相应的事件处理器

    2.事件分离器等待事件

    3.事件到来,激活分离器,分离器调用事件对应的处理器

    4.事件处理器完成实际的读操作,处理读到的数据,注册新的事件,然后返还控制权。

    ## 优点:

    1.响应快,不必为单个同步时间所阻塞,虽然reactor本身依然是同步的

    2.编程相对简单,可以最大程度的避免复杂的多线程及同步问题,并且避免了多线程/进程的切换开销

    3可扩展性,可以方便的通过增加Reactor实例个数来充分利用CPU资源

    4.可复用性,Reactor框架本身与具体事件处理逻辑无关,具有很高的复用性。

    这里reactor有一个特殊的机制:采用回调函数,谁发生就自己调用自己去解决自己的问题;

    相比于epoll的遍历消息列表,reactor的效率会大大增加。

    这里先例举出一个简单的reactor框架

    1. #include<stdlib.h>
    2. #include<stdio.h>
    3. #include<unistd.h>
    4. #include<sys/socket.h>
    5. #include<arpa/inet.h>
    6. #include<poll.h>
    7. #include<sys/epoll.h>
    8. #include<pthread.h>
    9. #define MAXNUM 4096
    10. typedef struct reactor
    11. {
    12. int epfd;//树根
    13. struct epoll_event sockary[MAXNUM];
    14. }reactor;
    15. reactor*eventloop=NULL;
    16. typedef struct item
    17. {
    18. int sockfd;
    19. void(*callback)(int,int,void*);
    20. char szbuf[MAXNUM];
    21. int nbuffersize;
    22. }item;
    23. void recv_cb(int fd,int events,void *arg)
    24. {
    25. item*pitem=(item*)arg;
    26. int res=recv(fd,pitem->szbuf,MAXNUM,0);
    27. if(res>0)
    28. {
    29. printf("recv is :%s\n",pitem->szbuf);
    30. }
    31. else
    32. {
    33. struct epoll_event ee;
    34. ee.events=EPOLLIN;
    35. epoll_ctl(eventloop->epfd,EPOLL_CTL_DEL,fd,&ee);
    36. close(fd);
    37. free(pitem);
    38. }
    39. }
    40. void accept_cb(int fd,int events,void *arg)
    41. {
    42. int clientfd=accept(fd,0,0);
    43. printf("accept success");
    44. item*pitem=(item*)malloc(sizeof(item));
    45. pitem->sockfd=clientfd;
    46. pitem->callback=recv_cb;
    47. struct epoll_event ep;
    48. ep.events=EPOLLIN|EPOLLET;
    49. ep.data.ptr=pitem;
    50. epoll_ctl(eventloop->epfd,EPOLL_CTL_ADD,clientfd,&ep);
    51. }
    52. int main()
    53. {
    54. //创建套接子socket()
    55. int sockfd=socket(AF_INET,SOCK_STREAM,0);
    56. if(sockfd==-1)
    57. {
    58. perror("socket failed...");
    59. exit(0);
    60. }
    61. //绑定bind()
    62. struct sockaddr_in addr;
    63. addr.sin_family=AF_INET;
    64. addr.sin_addr.s_addr=0;
    65. addr.sin_port=htons(8899);
    66. if(-1==bind(sockfd,(struct sockaddr*)&addr,sizeof(addr)))
    67. {
    68. perror("bind failed...");
    69. exit(0);
    70. }
    71. //监听listen()
    72. if(-1==listen(sockfd,128))
    73. {
    74. perror("listen failed...");
    75. exit(0);
    76. }
    77. //定义struct
    78. eventloop=(reactor*)malloc(sizeof(reactor));
    79. eventloop->epfd=epoll_create(MAXNUM);
    80. struct epoll_event ep;
    81. ep.events=EPOLLIN|EPOLLET;
    82. item*pitem=(item*)malloc(sizeof(item));
    83. pitem->sockfd=sockfd;
    84. pitem->callback=accept_cb;
    85. ep.data.ptr=(void*)pitem;
    86. epoll_ctl(eventloop->epfd,EPOLL_CTL_ADD,sockfd,&ep);
    87. //连接accept()
    88. int nreadnum=0;
    89. while(1)
    90. {
    91. //监控的红黑数文件描述,已经发生的消息的连表,最大管理数量,不产生阻塞
    92. nreadnum=epoll_wait(eventloop->epfd,eventloop->sockary,MAXNUM,-1);
    93. int i=0;
    94. while(i<nreadnum)
    95. {
    96. if(eventloop->sockary[i].events&EPOLLIN)
    97. {
    98. item*pitem=(item*)eventloop->sockary[i].data.ptr;
    99. pitem->callback(pitem->sockfd,eventloop->sockary[i].events,pitem);
    100. }
    101. i++;
    102. sleep(1);
    103. }
    104. }
    105. close(sockfd);
    106. return 0;
    107. }

    这个框架仅限于于客户端对话。

    我们在这框架的基础上改进

    简单介绍一下传输逻辑:

    这里我们采用传输文件的大小,在传文件内容

    在接受的时候,我们采用先接受文件大小,在接受文件内容的方式解决了粘包的问题。

    改进后的代码如下

    服务器:

    1. #include<stdlib.h>
    2. #include<stdio.h>
    3. #include<unistd.h>
    4. #include<sys/socket.h>
    5. #include<arpa/inet.h>
    6. #include<poll.h>
    7. #include<sys/epoll.h>
    8. #include<pthread.h>
    9. #include<string.h>
    10. #define MAXNUM 4096
    11. typedef struct fileinfo
    12. {
    13. int filesize;
    14. char filepath[1024];
    15. }fileinfo;
    16. typedef struct reactor
    17. {
    18. int epfd;//树根
    19. struct epoll_event sockary[MAXNUM];
    20. }reactor;
    21. reactor*eventloop=NULL;
    22. typedef struct item
    23. {
    24. int sockfd;
    25. void(*callback)(int,int,void*);
    26. char *szbuf;
    27. char filepath[MAXNUM];
    28. int nbuffersize;
    29. int noffest;
    30. }item;
    31. void recv_cb(int fd,int events,void *arg)
    32. {
    33. item*pitem=(item*)arg;
    34. //开文件看路径是否有效
    35. FILE*fp=fopen("/home/lzl/learn/network/test/recv/recv.txt","a");
    36. if(NULL==fp)
    37. {
    38. printf("fopen failed...\n");
    39. return;
    40. }
    41. //第一次接受包大小
    42. if(pitem->nbuffersize==0&&pitem->noffest==0)
    43. {
    44. int ner=recv(fd,&pitem->nbuffersize,sizeof(int),0);
    45. {
    46. if(ner>0)
    47. {
    48. if(pitem->nbuffersize>0)
    49. {
    50. printf("filesize is %d\n",pitem->nbuffersize);
    51. pitem->szbuf=(char*)malloc(pitem->nbuffersize);
    52. pitem->noffest=0;
    53. }
    54. }
    55. else
    56. {
    57. printf("下线了\n");
    58. struct epoll_event ee;
    59. ee.events=EPOLLIN;
    60. epoll_ctl(eventloop->epfd,EPOLL_CTL_DEL,fd,&ee);
    61. close(fd);
    62. free(pitem);
    63. }
    64. }
    65. }
    66. else
    67. {
    68. char filecont[10240];
    69. while(pitem->nbuffersize)
    70. {
    71. int res=recv(fd,filecont,pitem->nbuffersize,0);
    72. if(res>0)
    73. {
    74. printf("recv data is %s:\n",filecont);
    75. pitem->nbuffersize-=res;
    76. pitem->noffest+=res;
    77. }
    78. }
    79. fwrite(filecont,1,pitem->noffest,fp);
    80. fclose(fp);
    81. printf("recvdata success...\n");
    82. free(pitem->szbuf);
    83. }
    84. }
    85. void accept_cb(int fd,int events,void *arg)
    86. {
    87. int clientfd=accept(fd,0,0);
    88. printf("accept success");
    89. item*pitem=(item*)malloc(sizeof(item));
    90. pitem->sockfd=clientfd;
    91. pitem->callback=recv_cb;
    92. struct epoll_event ep;
    93. ep.events=EPOLLIN|EPOLLET;
    94. ep.data.ptr=pitem;
    95. epoll_ctl(eventloop->epfd,EPOLL_CTL_ADD,clientfd,&ep);
    96. }
    97. int main()
    98. {
    99. //创建套接子socket()
    100. int sockfd=socket(AF_INET,SOCK_STREAM,0);
    101. if(sockfd==-1)
    102. {
    103. perror("socket failed...");
    104. exit(0);
    105. }
    106. //绑定bind()
    107. struct sockaddr_in addr;
    108. addr.sin_family=AF_INET;
    109. addr.sin_addr.s_addr=0;
    110. addr.sin_port=htons(8899);
    111. if(-1==bind(sockfd,(struct sockaddr*)&addr,sizeof(addr)))
    112. {
    113. perror("bind failed...");
    114. exit(0);
    115. }
    116. //监听listen()
    117. if(-1==listen(sockfd,128))
    118. {
    119. perror("listen failed...");
    120. exit(0);
    121. }
    122. //定义struct
    123. eventloop=(reactor*)malloc(sizeof(reactor));
    124. eventloop->epfd=epoll_create(MAXNUM);
    125. struct epoll_event ep;
    126. ep.events=EPOLLIN|EPOLLET;
    127. item*pitem=(item*)malloc(sizeof(item));
    128. pitem->sockfd=sockfd;
    129. pitem->callback=accept_cb;
    130. ep.data.ptr=(void*)pitem;
    131. epoll_ctl(eventloop->epfd,EPOLL_CTL_ADD,sockfd,&ep);
    132. //连接accept()
    133. int nreadnum=0;
    134. while(1)
    135. {
    136. //监控的红黑数文件描述,已经发生的消息的连表,最大管理数量,不产生阻塞
    137. nreadnum=epoll_wait(eventloop->epfd,eventloop->sockary,MAXNUM,-1);
    138. int i=0;
    139. while(i<nreadnum)
    140. {
    141. if(eventloop->sockary[i].events&EPOLLIN)
    142. {
    143. item*pitem=(item*)eventloop->sockary[i].data.ptr;
    144. pitem->callback(pitem->sockfd,eventloop->sockary[i].events,pitem);
    145. }
    146. i++;
    147. sleep(1);
    148. }
    149. }
    150. close(sockfd);
    151. return 0;
    152. }

    客户端

    1. #include<stdio.h>
    2. #include<stdlib.h>
    3. #include<unistd.h>
    4. #include<sys/socket.h>
    5. #include<arpa/inet.h>
    6. #include<string.h>
    7. #define MAXSIZE 1024
    8. #include <sys/types.h>
    9. #include <sys/stat.h>
    10. #include <fcntl.h>
    11. typedef struct file
    12. {
    13. int filesize;
    14. char* filecont;
    15. }file;
    16. int main()
    17. {
    18. //socket()
    19. int clientfd=socket(AF_INET,SOCK_STREAM,0);
    20. //connect()
    21. struct sockaddr_in addr;
    22. addr.sin_family=AF_INET;
    23. addr.sin_addr.s_addr=inet_addr("127.0.0.1");
    24. addr.sin_port=htons(8899);
    25. if(-1==connect(clientfd,(struct sockaddr*)&addr,sizeof(addr)))
    26. {
    27. perror("connect failed");
    28. exit(0);
    29. }
    30. size_t nerl;
    31. FILE*fp=fopen("/home/lzl/learn/network/test/send/send.txt","r");
    32. file sfile;
    33. sfile.filesize=0;
    34. sfile.filecont=NULL;
    35. int bigfilesize=0;
    36. fseek(fp,0,SEEK_END);
    37. bigfilesize=ftell(fp);
    38. printf("bigsize is %d\n",bigfilesize);
    39. fseek(fp,0,SEEK_SET);
    40. sfile.filecont=(char*)malloc(sizeof(char)*bigfilesize);
    41. while((nerl=fread(sfile.filecont,1,1024,fp))>0)
    42. {
    43. sfile.filesize=bigfilesize;
    44. printf("filecont is %s\n",sfile.filecont);
    45. printf("filesize is %d\n",sfile.filesize);
    46. if(send(clientfd,(char*)&sfile.filesize,sizeof(int),0)<0)
    47. {
    48. printf("file send failed...\n");
    49. break;
    50. }
    51. if(send(clientfd,(char*)sfile.filecont,bigfilesize,0)<0)
    52. {
    53. printf("file send failed...\n");
    54. break;
    55. }
    56. }sleep(1);
    57. free(sfile.filecont);
    58. fclose(fp);
    59. close(clientfd);
    60. return 0;
    61. }

  • 相关阅读:
    .net framework中webapi使用swagger进行接口文档展示
    js数组介绍:创建、length的用法、冒泡排序法、选择排序法
    别用Excel、Python了,我找到了实现报表自动化最快的方法
    只依赖OPENCV的工作服安全帽检测YOLOV8S
    多层神经网络和激活函数
    jdk 8-future 异步处理-轮询isDone判断完成-springBoot示例
    网络安全攻防:ZigBee安全
    Hexo博客使用aplayer音乐播放插件
    如何用jxTMS开发一个功能(二)
    物理内存 虚拟内存 页映射模式
  • 原文地址:https://blog.csdn.net/van9527/article/details/127769530