• linux篇【9】:进程间通信——<前序>


    目录

    一.通信背景

    3.进程间通信目的

    4.进程间通信发展

    5.进程间通信分类

    二.管道——匿名管道

    1.匿名管道原理 

    2.匿名管道特点

    3.打开进程读写的系统接口:pipe

    4.管道的代码-演示pipe通信的基本过程-匿名管道

    5.实现业务处理的管道代码-父进程控制子进程

    6.进程池应用-多进程业务

     进程池应用代码:

    7.匿名管道特征总结:

    三.管道——命名管道

    1.命名管道原理

    2.mkfifo 创建命名管道

    3.命名管道应用代码:

    makefile

    comm.h

    clientFifo.cpp

    serverFifo.cpp


    一.通信背景

    1.进程是具有独立性的!——导致进程间想交互数据,成本会非常高
    进程为什么要通信:需要多进程进行协同处理一件事情
    2.不要以为,进程独立了,就是彻底独立,有时候,我们需要双方能够进行一定程度的信息交互

    3.进程间通信目的

    数据传输:一个进程需要将它的数据发送给另一个进程
    资源共享:多个进程之间共享同样的资源。
    通知事件:一个进程需要向另一个或一组进程发送消息,通知它(它们)发生了某种事件(如进程终止时要通知父进程)。
    进程控制:有些进程希望完全控制另一个进程的执行(如Debug进程),此时控制进程希望能够拦截另一个进程的所有陷入和异常,并能够及时知道它的状态改变

    4.进程间通信发展

    管道
    System V进程间通信
    POSIX进程间通信

    5.进程间通信分类

    典型进程间通信方式: 管道,共享内存,消息队列,信号量,网络通信,文件 等多种方式
    管道
            匿名管道pipe
            命名管道
    System V IPC
            System V 消息队列
            System V 共享内存
            System V 信号量
    POSIX IPC
            消息队列
            共享内存
            信号量
            互斥量
            条件变量
            读写锁

    二.管道——匿名管道

    因为进程有独立性。
    所以通信之前,我们需要让不同的进程看到同一份资源(文件,内存块..)
    我们要学的进程间通信,不是告诉我们如何通信。而是解决:他们两个如何先看到同一份资源!
    资源的不同决定了不同种类的通信方式!

    第一种通信方式—>管道(提供共享资源的一种手段)

    1.匿名管道原理 

    父进程写入文件的缓冲区,子进程去缓冲区里读,就实现了两个进程看到同一份资源,这个用于通信的内存级文件就叫做管道(普通级文件是为了保存数据到磁盘的)。struct file中有inode,inode中有一个联合体可以来表示自己是管道/块设备/文件/字符

    2.匿名管道特点

    都是单向的!
    管道传输资源的——这个资源就是 数据
    进程间通信管道——>单向的,传输数据的!

    进程间通信管道是单向的过程图:

    ①为什么父进程要分别打开读和写? ——答:为了让子进程继承,让子进程不用再打开了!(如果父进程只是打开写,子进程也会继承写,那就没人读了,让子进程读的话还得再打开读)
    ②为什么父子要关闭对应的读写?——答:管道必须是单向通信的。父进程关闭读,子进程关闭写,就构成单向的父写子读了。
    ③谁决定,父子关闭什么读写? ——答:不是由管道本身决定的, 是由你的需求决定的!

    一个进程一旦退出了,进程中打开的所有描述符都会关闭,释放资源。只要不是循环,理论上打开的文件描述符不close也可以,因为进程退出会自动关闭文件描述符,但是尽量保持良好代码规范,不用了就释放。
    举个例子,拆房,拆房的时候不要的家具不需要额外拉出来拆,因为房子一旦拆了所有的东西会一并当作垃圾回收,但是如果没拆房之前有家具不用了就要赶紧拉出去扔了,要不然家里就摆满了

    3.打开进程读写的系统接口:pipe

    man 2 pipe  :pipe底层封装open,open了两次,第一次O_RDONLY,第二次O_WRONLY,把读写描述符分别放在数组pipefd[0],pipefd[1]。 返回值 ——> 成功返回0,失败返回-1

    4.管道的代码-演示pipe通信的基本过程-匿名管道

    ①当父进程没有写入数据的时候,子进程在等。所以,父进程写入之后,子进程才能read(会返回)到数据,子进程打印读取数据要以父进程的节奏为主!
    ②父进程和子进程读写管道的时候(是有一定的顺序性的!)。那么父子进程各自printf的时候,会有顺序吗?——答:无序。printf就是向显示器写入,也是文件,但是缺乏访问控制。
    ③父进程和子进程读写管道的时候:
    ——管道内部,没有数据,reader就必须阻塞等待(read),等管道有数据(阻塞等待就是当前进程的 task_ struct 放入等待队列中,R->S/D/T)
    ——管道内部,如果数据被写满,writer就必须阻塞等待(write)等待管道中有空间
    因为pipe内部自带访问控制机制——同步和互斥机制

    1. #include
    2. #include
    3. #include
    4. #include
    5. #include
    6. #include
    7. //演示pipe通信的基本过程 -- 匿名管道
    8. int main()
    9. {
    10. // 1. 创建管道
    11. int pipefd[2] = {0};
    12. if(pipe(pipefd) != 0)
    13. {
    14. cerr << "pipe error" << endl;
    15. return 1;
    16. }
    17. // 2. 创建子进程
    18. pid_t id = fork();
    19. if(id < 0)
    20. {
    21. cerr << "fork error" << endl;
    22. return 2;
    23. }
    24. else if (id == 0)
    25. {
    26. // child
    27. // 子进程来进行读取, 子进程就应该关掉写端
    28. close(pipefd[1]);
    29. #define NUM 1024
    30. char buffer[NUM];
    31. while(true)
    32. {
    33. cout << "时间戳: " << (uint64_t)time(nullptr) << endl;
    34. // 子进程没有带sleep,为什么子进程你也会休眠呢??
    35. memset(buffer, 0, sizeof(buffer));
    36. sleep(100);
    37. ssize_t s = read(pipefd[0], buffer, sizeof(buffer) - 1);
    38. if(s > 0)
    39. {
    40. //读取成功
    41. buffer[s] = '\0';
    42. cout << "子进程收到消息,内容是: " << buffer << endl;
    43. }
    44. else if(s == 0) //当read返回0时说明写端已关闭,所以要退出
    45. {
    46. cout << "父进程写完了,我也退出啦" << endl;
    47. break;
    48. }
    49. else{
    50. //Do Nothing
    51. }
    52. }
    53. close(pipefd[0]);
    54. exit(0);
    55. }
    56. else
    57. {
    58. // parent
    59. // 父进程来进行写入,就应该关掉读端
    60. close(pipefd[0]);
    61. const char *msg = "你好子进程,我是父进程, 这次发送的信息编号是: ";
    62. int cnt = 0;
    63. // while(cnt < 5)
    64. while(1)
    65. {
    66. char sendBuffer[1024];
    67. sprintf(sendBuffer, "%s : %d", msg, cnt);
    68. //sleep(30); // 这里是为了一会看现象明显
    69. write(pipefd[1], sendBuffer, strlen(sendBuffer)); //要不要+1 1,0
    70. cnt++;
    71. cout << "cnt: " << cnt << endl;
    72. }
    73. close(pipefd[1]);
    74. cout << "父进程写完了" << endl;
    75. }
    76. pid_t res = waitpid(id, nullptr, 0);
    77. if(res > 0)
    78. {
    79. cout << "等待子进程成功" << endl;
    80. }
    81. // 0 -> 嘴巴 -> 读(读书)
    82. // 1 -> 笔 -> 写的
    83. // cout << "fd[0]: " << pipefd[0] << endl;
    84. // cout << "fd[1]: " << pipefd[1] << endl;
    85. return 0;
    86. }

    5.实现业务处理的管道代码-父进程控制子进程

    assert处只是换了一种写法,原写法是:

    1. #include
    2. #include
    3. #include
    4. #include
    5. #include
    6. #include
    7. #include
    8. #include
    9. #include
    10. #include
    11. #include
    12. using namespace std;
    13. typedef void (*functor)();
    14. vector functors; // 方法集合
    15. // for debug
    16. unordered_map<uint32_t, string> info;
    17. void f1()
    18. {
    19. cout << "这是一个处理日志的任务, 执行的进程 ID [" << getpid() << "]"
    20. << "执行时间是[" << time(nullptr) << "]\n" << endl;
    21. //
    22. }
    23. void f2()
    24. {
    25. cout << "这是一个备份数据任务, 执行的进程 ID [" << getpid() << "]"
    26. << "执行时间是[" << time(nullptr) << "]\n" << endl;
    27. }
    28. void f3()
    29. {
    30. cout << "这是一个处理网络连接的任务, 执行的进程 ID [" << getpid() << "]"
    31. << "执行时间是[" << time(nullptr) << "]\n" << endl;
    32. }
    33. void loadFunctor()
    34. {
    35. info.insert({functors.size(), "处理日志的任务"});
    36. functors.push_back(f1);
    37. info.insert({functors.size(), "备份数据任务"});
    38. functors.push_back(f2);
    39. info.insert({functors.size(), "处理网络连接的任务"});
    40. functors.push_back(f3);
    41. }
    42. 2. 父进程控制子进程
    43. int main()
    44. {
    45. // 0. 加载任务列表
    46. loadFunctor();
    47. // 1. 创建管道
    48. int pipefd[2] = {0};
    49. if (pipe(pipefd) != 0)
    50. {
    51. cerr << "pipe error" << endl;
    52. return 1;
    53. }
    54. // 2. 创建子进程
    55. pid_t id = fork();
    56. if (id < 0)
    57. {
    58. cerr << " fork error " << endl;
    59. return 2;
    60. }
    61. else if (id == 0)
    62. {
    63. // 3. 关闭不需要的文件fd
    64. // child,read
    65. close(pipefd[1]);
    66. // 4. 业务处理
    67. while (true)
    68. {
    69. uint32_t operatorType = 0;
    70. // 如果有数据,就读取,如果没有数据,就阻塞等待, 等待任务的到来
    71. ssize_t s = read(pipefd[0], &operatorType, sizeof(uint32_t));
    72. if (s == 0)
    73. {
    74. cout << "我要退出啦,我是给人打工的,老板都走了...." << endl;
    75. break;
    76. }
    77. //这里if(s == sizeof(uint32_t));换一种断言的写法
    78. assert(s == sizeof(uint32_t));
    79. // debug 模式下:assert断言,是有效的
    80. // release 模式:断言就没有了
    81. // 一旦断言没有了,s变量就是:只被定义但没有被使用。
    82. //release模式下只定义不使用,可能会有warning
    83. (void)s; //保证s变量被使用过
    84. if (operatorType < functors.size())
    85. {
    86. functors[operatorType]();
    87. }
    88. else
    89. {
    90. cerr << "bug? operatorType = " << operatorType << std::endl;
    91. }
    92. }
    93. close(pipefd[0]);
    94. exit(0);
    95. }
    96. else
    97. {
    98. srand((long long)time(nullptr));
    99. // parent,write - 操作
    100. // 3. 关闭不需要的文件fd
    101. close(pipefd[0]);
    102. // 4. 指派任务
    103. int num = functors.size();
    104. int cnt = 10;
    105. while (cnt--)
    106. {
    107. // 5. 形成任务码
    108. uint32_t commandCode = rand() % num;
    109. std::cout << "父进程指派任务完成,任务是: " << info[commandCode] << " 任务的编号是: " << cnt << std::endl;
    110. // 向指定的进程下达执行任务的操作
    111. write(pipefd[1], &commandCode, sizeof(uint32_t));
    112. sleep(1);
    113. }
    114. close(pipefd[1]);
    115. pid_t res = waitpid(id, nullptr, 0);
    116. if (res)
    117. cout << "wait success" << endl;
    118. }
    119. return 0;
    120. }

    6.进程池应用-多进程业务

     进程池应用代码:

     注意:关闭写段在这里统一在最后回收资源时关闭。
       

    1. #include
    2. #include
    3. #include
    4. #include
    5. #include
    6. #include
    7. #include
    8. #include
    9. #include
    10. #include
    11. #include
    12. using namespace std;
    13. typedef void (*functor)();
    14. vector functors; // 方法集合
    15. // for debug
    16. unordered_map<uint32_t, string> info;
    17. void f1()
    18. {
    19. cout << "这是一个处理日志的任务, 执行的进程 ID [" << getpid() << "]"
    20. << "执行时间是[" << time(nullptr) << "]\n" << endl;
    21. //
    22. }
    23. void f2()
    24. {
    25. cout << "这是一个备份数据任务, 执行的进程 ID [" << getpid() << "]"
    26. << "执行时间是[" << time(nullptr) << "]\n" << endl;
    27. }
    28. void f3()
    29. {
    30. cout << "这是一个处理网络连接的任务, 执行的进程 ID [" << getpid() << "]"
    31. << "执行时间是[" << time(nullptr) << "]\n" << endl;
    32. }
    33. void loadFunctor()
    34. {
    35. info.insert({functors.size(), "处理日志的任务"});
    36. functors.push_back(f1);
    37. info.insert({functors.size(), "备份数据任务"});
    38. functors.push_back(f2);
    39. info.insert({functors.size(), "处理网络连接的任务"});
    40. functors.push_back(f3);
    41. }
    42. // int32_t: 进程pid, int32_t: 该进程对应的管道写端fd
    43. typedef std::pair<int32_t, int32_t> elem;
    44. int processNum = 5;
    45. void work(int blockFd)
    46. {
    47. cout << "进程[" << getpid() << "]" << " 开始工作" << endl;
    48. // 子进程核心工作的代码
    49. while (true)
    50. {
    51. // a.阻塞等待 b. 获取任务信息
    52. uint32_t operatorCode = 0;
    53. ssize_t s = read(blockFd, &operatorCode, sizeof(uint32_t));
    54. if(s == 0) break;
    55. assert(s == sizeof(uint32_t));
    56. (void)s;
    57. // c. 处理任务
    58. if(operatorCode < functors.size()) functors[operatorCode]();
    59. }
    60. cout << "进程[" << getpid() << "]" << " 结束工作" << endl;
    61. }
    62. // [子进程的pid, 子进程的管道fd]
    63. void blanceSendTask(const vector &processFds)
    64. {
    65. srand((long long)time(nullptr));
    66. while(true)
    67. {
    68. sleep(1);
    69. // 选择一个进程, 选择进程是随机的,没有压着一个进程给任务
    70. // 较为均匀的将任务给所有的子进程 --- 负载均衡
    71. uint32_t pick = rand() % processFds.size();
    72. // 选择一个任务
    73. uint32_t task = rand() % functors.size();
    74. // 把任务给一个指定的进程
    75. write(processFds[pick].second, &task, sizeof(task));
    76. // 打印对应的提示信息
    77. cout << "父进程指派任务->" << info[task] << "给进程: " << processFds[pick].first << " 编号: " << pick << endl;
    78. }
    79. }
    80. int main()
    81. {
    82. loadFunctor();
    83. vector assignMap;
    84. // 创建processNum个进程
    85. for (int i = 0; i < processNum; i++)
    86. {
    87. // 定义保存管道fd的对象
    88. int pipefd[2] = {0};
    89. // 创建管道
    90. pipe(pipefd);
    91. // 创建子进程
    92. pid_t id = fork();
    93. if (id == 0)
    94. {
    95. // 子进程读取, r -> pipefd[0]
    96. close(pipefd[1]);
    97. // 子进程执行
    98. work(pipefd[0]);
    99. close(pipefd[0]);
    100. exit(0);
    101. }
    102. //父进程做的事情, pipefd[1]
    103. close(pipefd[0]);
    104. elem e(id, pipefd[1]);
    105. assignMap.push_back(e);
    106. }
    107. cout << "create all process success!" << std::endl;
    108. // 父进程, 派发任务
    109. blanceSendTask(assignMap);
    110. // 回收资源
    111. for (int i = 0; i < processNum; i++)
    112. {
    113. if (waitpid(assignMap[i].first, nullptr, 0) > 0)
    114. cout << "wait for: pid=" << assignMap[i].first << " wait success!"
    115. << "number: " << i << "\n";
    116. close(assignMap[i].second); 注意!!关闭写段在这里统一关闭
    117. }
    118. }

    7.匿名管道特征总结:

    (命令行管道|,其实就是匿名管道)
    (1)管道只能用来进行具有血缘关系的进程之间,进行进程间通信。常用于父子通信
    (2)管道只能单向通信(内核实现决定的),半双工的一 种特殊情况(半双工:某时某刻只要一个人在说一个人在听)
    (3)管道自带同步机制(pipe满—> writer等 ;  pipe空—>  reader等) --自带访问控制
    (4)管道是面向字节流的----现在还解释不清楚--先写的字符,一定是先被读取的,没有格式边界,需要用户来定义区分内容的边界
    [sizeof (uint32_ t)] ---- 网络tcp,我们自定义协议的
    (5)管道的生命周期--管道是文件--进程退出了,曾经打开的文件会怎么办?退出--管道的生命周期随进程。与该文件相关的所有进程退出,此文件的引用计数--到0,文件也会关闭。

    三.管道——命名管道

    只能父子(血缘)通信? ?毫不相干的进程之间进行通信,可以吗? ?——可以,用命名管道

    • 匿名管道只能用于具有亲缘关系的进程间通信,命名管道可用于同一主机上的任意进程间通信
    • 匿名管道的本质是内核中的缓冲区,命名管道文件是缓冲区的标识

    1.命名管道原理

    进程间通信的本质是:不同的进程要看到同一份资源。
    匿名管道原理:通过利用 fork创建子进程,子进程能继承父进程的文件描述符表的特性 实现的。

    命名管道原理:磁盘中的内存把 文件描述的结构体struct_file 加载到内存中,进程1打开此文件,引用计数加1;进程2打开此文件,遍历内核发现文件存在,引用计数变为2,两个进程可以看到同一份资源。两进程通信时在内存中通信,数据不会刷新到磁盘上,此文件只是一种符号

    通过一个fifo文件->有路径->具有唯一性->通过路径,找到同一个资源!
    磁盘

    2.mkfifo 创建命名管道

    int mkfifo(const char *pathname, mode_t mode);   

    作用:创建一个文件(命名管道),路径pathname就是这个文件的路径(./.fifo)。

    pathname:你要制作的文件的文件路径。mode:你要制作的文件的权限

    返回值:创建成功返回0,创建失败返回-1

    3.命名管道应用代码:

    makefile

    1. .PHONY:all
    2. all: clientFifo serverFifo
    3. clientFifo:clientFifo.cpp
    4. g++ -Wall -o $@ $^ -std=c++11
    5. serverFifo:serverFifo.cpp
    6. g++ -Wall -o $@ $^ -std=c++11
    7. .PHONY:clean
    8. clean:
    9. rm -rf clientFifo serverFifo .fifo

    comm.h

    1. #pragma once
    2. #include
    3. #include
    4. #include
    5. #include
    6. #include
    7. #include
    8. #include
    9. #include
    10. #include
    11. #define IPC_PATH "./.fifo"

    clientFifo.cpp

    1. //写入
    2. #include "comm.h"
    3. using namespace std;
    4. int main()
    5. {
    6. int pipeFd = open(IPC_PATH, O_WRONLY);
    7. if(pipeFd < 0)
    8. {
    9. cerr << "open: " << strerror(errno) << endl;
    10. return 1;
    11. }
    12. #define NUM 1024
    13. char line[NUM];
    14. while(true)
    15. {
    16. printf("请输入你的消息# ");
    17. fflush(stdout);
    18. memset(line, 0, sizeof(line));
    19. // fgets -> C -> line结尾自动添加\0
    20. if(fgets(line, sizeof(line), stdin) != nullptr)
    21. {
    22. //abcd\n\0
    23. line[strlen(line) - 1] = '\0';
    24. write(pipeFd, line, strlen(line));
    25. }
    26. else
    27. {
    28. break;
    29. }
    30. }
    31. close(pipeFd);
    32. cout << "客户端退出啦" << endl;
    33. return 0;
    34. }

    serverFifo.cpp

    1. //读取
    2. #include "comm.h"
    3. using namespace std;
    4. int main()
    5. {
    6. umask(0);
    7. if(mkfifo(IPC_PATH, 0600) != 0)
    8. {
    9. cerr << "mkfifo error" << endl;
    10. return 1;
    11. }
    12. int pipeFd = open(IPC_PATH, O_RDONLY);
    13. if(pipeFd < 0)
    14. {
    15. cerr << "open fifo error" << endl;
    16. return 2;
    17. }
    18. #define NUM 1024
    19. //正常的通信过程
    20. char buffer[NUM];
    21. while(true)
    22. {
    23. ssize_t s = read(pipeFd, buffer, sizeof(buffer)-1);
    24. if(s > 0)
    25. {
    26. buffer[s] = '\0';
    27. cout << "客户端->服务器# " << buffer << endl;
    28. }
    29. else if(s == 0)
    30. {
    31. cout << "客户退出啦,我也退出把";
    32. break;
    33. }
    34. else
    35. {
    36. //do nothing
    37. cout << "read: " << strerror(errno) << endl;
    38. break;
    39. }
    40. }
    41. close(pipeFd);
    42. cout << "服务端退出啦" << endl;
    43. unlink(IPC_PATH);
    44. return 0;
    45. }

    注意点①:serverFifo.cpp中最后几句容易忘写:
        close(pipeFd);
        cout << "服务端退出啦" << endl;
        unlink(IPC_PATH);

  • 相关阅读:
    Java多线程_多线程的补充
    C++---多态原理
    软考(高级)是否需要报班,大家有什么建议?
    【Arthas案例】某应用依赖两个GAV不同但包含两个相同全限定类名StaticLoggerBinder,引起log4j.Level类找不到异常
    用智能文字识别技术赋能古彝文数字化之路
    如何制作我的第一个Phaser.js游戏
    企业网络规划与优化
    React教程(详细)
    AIGC:使用生成对抗网络GAN实现MINST手写数字图像生成
    gridControlExport.ExportToXls(fileName) dev gridcontrol导出excel列宽问题
  • 原文地址:https://blog.csdn.net/zhang_si_hang/article/details/127664447