• TCP/IP网络编程(9) 进程间通信


    1. IPC基本概念

    进程间通信(Inter process communication, IPC)以为这两个不同进程间可以交换数据,为了完成这一点,操作系统中应该提供两个进程可以同时访问的内存空间。因此,只要有两个进程可以同时访问的内存空间,就可以通过此空间来交换数据。但是,进程具有完全独立的内存结构,即使通过fork()函数创建的子进程,也不会与父进程共享内存,因此进程间通信需要通过其他的特殊方法完成。

    1.1 利用管道实现进程间通信

    为完成进程之间的通信,需要创建管道,管道不属于进程的资源(即不是fork复制的对象),而是和套接字一样,属于操作系统资源,因此通过管道首实现IPC的原理是,两个进程通过操作系统提供的内存进行通信。
     

    创建管道的方式:

    1. #include
    2. /*
    3. 成功时返回0
    4. 失败时返回-1
    5. 参数:
    6. fields[0] 通过管道接收数据时使用的文件描述符
    7. fields[1] 通过管道传输数据时使用的文件描述符
    8. */
    9. int pipe(int fields[2]);

    父进程调用pipe函数时将创建管道,同时获取对应于管道出入口的文件描述符,此时父进程可以读写同一管道,在调用fork创建子进程之后,子进程也将同时拥有管道出入口的文件描述符:

    1. #include
    2. #include
    3. #define BUFF_SIZE 30
    4. int main(int argc, char** argv)
    5. {
    6. int fds[2]; // 用于存储管道出入口的文件描述符
    7. char message[] = "Hello Process\n";
    8. char buffer[BUFF_SIZE];
    9. if (pipe(fds) == -1)
    10. {
    11. printf("Falied to create pipe.\n");
    12. return -1;
    13. }
    14. pid_t pid = fork(); // 创建进行
    15. if (pid == 0)
    16. {
    17. // 子进程 写入管道
    18. write(fds[1], message, sizeof(message));
    19. }
    20. else
    21. {
    22. // 父进程 读取管道
    23. read(fds[0], buffer, BUFF_SIZE);
    24. fputs(buffer, stdout);
    25. }
    26. return 0;
    27. }

    进程间通信的示意图图下所示:

    1.2 通过管道实现进程间双向通信

    方案1:通过一个管道实现两个进程间的双向通信

    代码实例:

    1. #include
    2. #include
    3. #include
    4. #define BUFF_SIZE 30
    5. int main(int argc, char** argv)
    6. {
    7. int fds[2]; // 用于存储管道出入口的文件描述符
    8. char message1[] = "Hello Process\n";
    9. char message2[] = "I am vac!\n";
    10. char buffer[BUFF_SIZE];
    11. memset(buffer, 0, BUFF_SIZE);
    12. if (pipe(fds) == -1)
    13. {
    14. printf("Falied to create pipe.\n");
    15. return -1;
    16. }
    17. pid_t pid = fork(); // 创建进行
    18. if (pid == 0)
    19. {
    20. // 子进程 写入管道
    21. write(fds[1], message1, sizeof(message1));
    22. sleep(2);
    23. read(fds[0], buffer, BUFF_SIZE);
    24. printf("Child proc received data: %s\n", buffer);
    25. memset(buffer, 0, BUFF_SIZE);
    26. }
    27. else
    28. {
    29. // 父进程 读取管道
    30. read(fds[0], buffer, BUFF_SIZE);
    31. printf("Parent proc received data: %s\n", buffer);
    32. write(fds[1], message2, sizeof(message2));
    33. sleep(3);
    34. memset(buffer, 0, BUFF_SIZE);
    35. }
    36. return 0;
    37. }

    运行结果:

     如果将子进程中的sleep方法注释掉,再运行代码:

    1. if (pid == 0)
    2. {
    3. // 子进程 写入管道
    4. write(fds[1], message1, sizeof(message1));
    5. //sleep(2);
    6. read(fds[0], buffer, BUFF_SIZE);
    7. printf("Child proc received data: %s\n", buffer);
    8. memset(buffer, 0, BUFF_SIZE);
    9. }

    结果如下所示:                            

     此时父进程无法再收到数据,因为子进程在写入数据后,未经过延时,立即又读取数据,因此管道中写入的数据被子进程提前取走。按照管道数据的读取机制,数据在写入管道之后,称为无主数据,哪个进程先通过read函数读取数据,哪个进程就先得到数据。结果导致父进程通过read函数再也读取不到数据,而将无限期等待管道数据。

    注:利用一个管道进行进程间数据双向通信,程序设计者需要提前预测并控制运行流程,程序实现难度很大。

    方案2:通过两个管道实现进程间相互通信

    通过创建两个管道,各自在进程间负责不同的数据流,即可方便的实现进程间双向通信,采用两个管道即可避免程序运行流程的预测或控制。

     

     代码实例:

    1. #include
    2. #include
    3. #include
    4. #define BUFF_SIZE 30
    5. int main(int argc, char** argv)
    6. {
    7. int fds1[2]; // 子进程向父进程写数据
    8. int fds2[2]; // 父进程向子进程写数据
    9. char message1[] = "Hello Process\n";
    10. char message2[] = "I am vac!\n";
    11. char buffer[BUFF_SIZE];
    12. memset(buffer, 0, BUFF_SIZE);
    13. if (pipe(fds1) == -1)
    14. {
    15. printf("Falied to create pipe.\n");
    16. return -1;
    17. }
    18. if (pipe(fds2) == -1)
    19. {
    20. printf("Falied to create pipe.\n");
    21. return -1;
    22. }
    23. pid_t pid = fork(); // 创建进行
    24. if (pid == 0)
    25. {
    26. // 子进程 写入管道
    27. write(fds1[1], message1, sizeof(message1));
    28. read(fds2[0], buffer, BUFF_SIZE);
    29. printf("Child proc received data: %s\n", buffer);
    30. memset(buffer, 0, BUFF_SIZE);
    31. }
    32. else
    33. {
    34. // 父进程 读取管道
    35. sleep(1); // 可以延时更短,稍微等待一下
    36. read(fds1[0], buffer, BUFF_SIZE);
    37. printf("Parent proc received data: %s\n", buffer);
    38. write(fds2[1], message2, sizeof(message2));
    39. memset(buffer, 0, BUFF_SIZE);
    40. }
    41. return 0;
    42. }

    运行结果:

     2. 进程间通信的应用

          保存消息的回声服务器: 在向客户端提供服务的同时,服务端能够将接收到的数据同步保存到文件中。

    客户端代码可复用《TCP/IP网络编程(8) 基于Linux的多进程服务器》中的客户端代码

    1. /*
    2. 客户端
    3. create_date: 2022-7-29
    4. */
    5. #include
    6. #include
    7. #include
    8. #include
    9. #include
    10. #include
    11. #define BUFF_SIZE 30
    12. #define ADDRESS "127.0.0.1"
    13. #define PORT 13100
    14. void readRoutine(int sock, char* buf);
    15. void writeRoutine(int sock, char* buf);
    16. int main(int argc, char** argv)
    17. {
    18. char buffer[BUFF_SIZE];
    19. struct sockaddr_in serverAddr; // 服务端地址
    20. memset(&serverAddr, 0, sizeof(serverAddr));
    21. serverAddr.sin_family = AF_INET;
    22. serverAddr.sin_addr.s_addr = inet_addr(ADDRESS);
    23. serverAddr.sin_port = htons(PORT);
    24. memset(buffer, 0, BUFF_SIZE);
    25. int socket = ::socket(PF_INET, SOCK_STREAM, 0);
    26. if (socket == -1)
    27. {
    28. printf("Failed to init socket.\n");
    29. return -1;
    30. }
    31. if (connect(socket, (struct sockaddr*)&serverAddr, sizeof(serverAddr)) == -1)
    32. {
    33. printf("Failed to connect to server.\n");
    34. return -2;
    35. }
    36. printf("Successfully connect to the server.\n");
    37. pid_t pid = fork();
    38. if (pid == 0)
    39. {
    40. // 子进程负责发送
    41. writeRoutine(socket, buffer);
    42. }
    43. else
    44. {
    45. // 父进程负责接收
    46. readRoutine(socket, buffer);
    47. }
    48. close(socket);
    49. return 0;
    50. }
    51. void readRoutine(int sock, char* buf)
    52. {
    53. while (true)
    54. {
    55. memset(buf, 0, BUFF_SIZE);
    56. int str_len = read(sock, buf, BUFF_SIZE);
    57. if (str_len == 0) // EOF
    58. {
    59. printf("Receive EOF\n");
    60. return;
    61. }
    62. printf("\nReceive data from server: %s\n", buf);
    63. }
    64. }
    65. void writeRoutine(int sock, char* buf)
    66. {
    67. while (true)
    68. {
    69. fputs("Input message(Type Q(q) to quit): ", stdout);
    70. fgets(buf, BUFF_SIZE, stdin);
    71. if (strncmp(buf, "Q\n", 2) == 0 || strncmp(buf, "q\n", 2) == 0)
    72. {
    73. shutdown(sock, SHUT_WR);
    74. return;
    75. }
    76. write(sock, buf, strlen(buf));
    77. }
    78. }

    服务端示例代码代码示例:

    1. #include
    2. #include
    3. #include
    4. #include
    5. #include
    6. #include
    7. #include
    8. #include
    9. void readChildProcess(int signo)
    10. {
    11. int status;
    12. pid_t pid = waitpid(-1, &status, WNOHANG); // 非阻塞
    13. if (WIFEXITED(status))
    14. {
    15. printf("Removed child process %d\n", WEXITSTATUS(status));
    16. }
    17. }
    18. #define BUFF_SIZE 30
    19. #define PORT 13100
    20. int main(int argc, char** argv)
    21. {
    22. int serverSocket;
    23. int clientSocket;
    24. struct sockaddr_in servAddr;
    25. struct sockaddr_in clientAddr;
    26. char buffer[BUFF_SIZE];
    27. memset(buffer, 0, BUFF_SIZE);
    28. socklen_t addrSize;
    29. int fds[2]; // 管道的文件描述符
    30. struct sigaction sigact;
    31. sigact.sa_handler = readChildProcess;
    32. sigemptyset(&sigact.sa_mask);
    33. sigact.sa_flags = 0;
    34. sigaction(SIGCHLD, &sigact, 0); // 注册信号
    35. serverSocket = socket(PF_INET, SOCK_STREAM, 0);
    36. if (serverSocket == -1)
    37. {
    38. printf("Failed to init server socket.\n");
    39. return -1;
    40. }
    41. memset(&servAddr, 0, sizeof(servAddr));
    42. servAddr.sin_family = AF_INET;
    43. servAddr.sin_addr.s_addr = htonl(INADDR_ANY);
    44. servAddr.sin_port = htons(PORT);
    45. if (bind(serverSocket, (struct sockaddr*)&servAddr, sizeof(servAddr)) == -1)
    46. {
    47. printf("Failed to bind the server socket.\n");
    48. return -1;
    49. }
    50. if (listen(serverSocket, 5) == -1)
    51. {
    52. printf("Failed to listen client.\n");
    53. return -1;
    54. }
    55. // 创建管道
    56. pipe(fds);
    57. // 创建写文件进程
    58. pid_t pid = fork();
    59. if (pid == 0)
    60. {
    61. // 写文件子进程
    62. FILE* fp = fopen("data.txt", "a");
    63. char msg[100];
    64. while (true)
    65. {
    66. /* code */
    67. usleep(1000*40);
    68. memset(msg, 0, 100);
    69. int len = read(fds[0], msg, 100);
    70. // 解析读取到的内容
    71. if (len <= 5)
    72. {
    73. break;
    74. }
    75. char magicChar1 = msg[0];
    76. char magicChar2 = msg[1];
    77. if (magicChar1 != 'M' || magicChar2 != 'F')
    78. {
    79. break;
    80. }
    81. if (msg[2] == 1)
    82. {
    83. break;
    84. }
    85. ushort dataLen = msg[3] | (msg[4] << 8);
    86. if (dataLen <= 0)
    87. {
    88. continue;
    89. }
    90. // 将解析到的内容写入文件进行保存
    91. fwrite(&msg[5], dataLen, 1, fp);
    92. }
    93. fclose(fp);
    94. return 0;
    95. }
    96. else
    97. {
    98. // 父进程
    99. while (true)
    100. {
    101. printf("Waiting connect from client...\n");
    102. addrSize = sizeof(clientAddr);
    103. clientSocket = accept(serverSocket, (sockaddr*)&clientAddr, &addrSize);
    104. if (clientSocket == -1)
    105. continue;
    106. printf("Accept new connection from %s:%d.\n", inet_ntoa(clientAddr.sin_addr), ntohs(clientAddr.sin_port));
    107. pid_t processId = fork();
    108. if (processId == 0)
    109. {
    110. // 子进程
    111. close(serverSocket);
    112. // 接受消息
    113. int recvLen = 0;
    114. // 构造报文
    115. char message[100];
    116. usleep(1000*100);
    117. while ((recvLen = read(clientSocket, buffer, BUFF_SIZE)) != 0)
    118. {
    119. /* code */
    120. write(clientSocket, buffer, recvLen);
    121. // 向管道写数据
    122. memset(message, 0, 100);
    123. message[0] = 'M';
    124. message[1] = 'F';
    125. message[2] = 0;
    126. message[3] = recvLen & 0xFF;
    127. message[4] = (recvLen >> 8) & 0xFF;
    128. memcpy(&message[5], buffer, recvLen); // 注意:memcpy里面的指针不要转换成void*
    129. write(fds[1], message, recvLen + 5);
    130. memset(buffer, 0, BUFF_SIZE);
    131. }
    132. close(clientSocket);
    133. memset(message, 0, 100);
    134. // 发送结束标志
    135. message[0] = 'M';
    136. message[1] = 'F';
    137. message[2] = 1;
    138. message[3] = 0;
    139. message[4] = 0;
    140. write(fds[1], (void*)message, 5);
    141. printf("Disconnecting from server...\n");
    142. sleep(2);
    143. return 0;
    144. }
    145. else if (processId == -1)
    146. {
    147. close(clientSocket);
    148. continue;
    149. }
    150. else
    151. {
    152. // 主进程
    153. printf("Create new process %d for client.\n", processId);
    154. close(clientSocket);
    155. memset(&clientAddr, 0, sizeof(clientAddr));
    156. continue;
    157. }
    158. }
    159. }
    160. close(serverSocket);
    161. return 0;
    162. }
    163. /*
    164. 服务器进程和文件进程之间设计通信格式
    165. 0 M // 报文头
    166. 1 F // 报文头
    167. 2 0/1 // 报文类型 0数据 / 1 结束
    168. 3 len // 数据长度 低位
    169. 4 len // 数据长度 高位
    170. 5 x // 数据
    171. 6 x // 数据
    172. 7 x // 数据
    173. . // .
    174. .
    175. .
    176. */

    客户端运行结果:

     服务端运行结果:

     通过管道进行进程间通信,将服务端收到的消息发送给写文件进程,将收到的消息保存至文件:

     -----------------------------------------//end//---------------------------------------------

  • 相关阅读:
    得物Flink内核探索实践
    一文了解io包中的discard类型
    MCE虚拟筛选化合物库
    (附源码)ssm考试题库管理系统 毕业设计 069043
    ES6 入门教程 22 Class 的基本语法 22.9 静态属性 & 22.10 私有方法和私有属性
    Xilinx IP 10 Gigabit Ethernet Subsystem IP
    物联网的挑战
    nginx的优先级和匹配方式
    Linux Vim编辑器的基本使用
    deployment html--->JDBC--->mysql
  • 原文地址:https://blog.csdn.net/zj1131190425/article/details/126082415