• [Linux]----进程间通信之管道通信



    前言

    首先我基于通信背景来带大家了解进程间通讯!!!

    1. 进程是具有独立性的!—>进程间想交互数据数据,成本会非常高!—>如果我们多进程需要协同处理一件事情
    2. 我们要明确一个点,进程独立了,不是彻底独立,有时候,我们需要双方能够进行一定程度的信息交互。

    正文开始!

    一、进程间通信目的

    • 数据传输:一个进程需要将他的数据发送给另一个进程
    • 资源共享:多个进程之间共享同样的资源
    • 通知事件:一个进程需要向另一个或一组进程发送消息,通知他们发生可某种事件(如进程终止是要通知父进程)
    • 进程控制:有些进程希望完全控制另一个进程的执行(如Debug进程),此时控制进程希望能够拦截另一个进程的所有陷入和异常,并能够及时知道他的状态改变。

    二、进程间通信发展

    • 管道
    • System V进程间通信
    • POSIX进程间通信

    三、进程间通信分类

    管道

    • 匿名管道pipe
    • 命名管道

    System V IPC

    • System V 消息队列
    • System V 共享内存
    • System V 信号量

    POSIX IPC

    • 消息队列
    • 共享内存
    • 信号量
    • 互斥量
    • 条件变量
    • 读写锁

    POSIX的通讯我们在进程的学习过程中带大家了解!
    本章主要给大家分享管道和共享内存来进行进程间通讯!

    四、管道

    1. 匿名管道

    在这里插入图片描述
    进程A和进程B可以同时看到一份文件来通过某种方式(接下来会将,现在先大致的理解一下)来进行通信!

    所以就要求我们在通讯之前,让不同的进程看到同一份资源(文件,内存块…)!

    我们要学的进程间通信,不是告诉我们如何通信。而是如何让两个进程先看到同一份资源!

    资源的不同决定了不同种类的通信方式!

    管道是提供共享资源的一种手段。

    在这里插入图片描述

    2. 管道内核代码

    在这里插入图片描述

    管道的特点

    1. 大部分都是单向的!
    2. 所有的管道都是传输资源的—数据

    所以进程间通信中的管道一定是单向的,为了传输数据的!!

    3. 站在文件描述符角度-深度理解管道

    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
    那么为什么父进程要分别打开读和写呢?->为了让子进程继承,让子进程不用在打开了

    为什么父子要关闭对应的读写?->因为管道是单向通信的!!

    谁决定,父子关闭什么读写?–>不是由管道本身决定的,由用户的需求决定的!

    4. 站在内核角度-管道本质

    为了实现管道的通信,内核提供了接口方便我们使用!
    在这里插入图片描述
    在这里插入图片描述

    创建管道

    #include
    #include
    #include
    #include
    
    //演示pipe通信的基本过程-----匿名管道
    using namespace std;
    int main()
    {
        //1.创建管道
        int pipefd[2]={0};
        if(pipe(pipefd)!=0)
        {
            cerr<<"pipe error"<<endl;
            return 1;
        }
        cout<<"fd[0]="<<pipefd[0]<<endl;
        cout<<"fd[1]="<<pipefd[1]<<endl;
        return 0;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    在这里插入图片描述
    因为文件描述符中0,1,2是被默认打开的,所以得到我们预期的结果3,4。

    在pipefd[2]这个数组中,pipefd[0]是管道中的读端!pipefd[1]是管道中的写端!!

    进行管道通信

    #include
    #include
    #include
    #include
    #include
    #include
    #include
    
    
    //演示pipe通信的基本过程-----匿名管道
    using namespace std;
    #define NUM 1024
    int main()
    {
        //1.创建管道
        int pipefd[2]={0};
        if(pipe(pipefd)!=0)
        {
            cerr<<"pipe error"<<endl;
            return 1;
        }
        //2.创建子进程
        pid_t id=fork();
        if(id<0)
        {
            cerr<<"fork error"<<endl;
            return 2;
        }
        else if(id==0)
        {
            //子进程
            //让子进程进行读取,子进程就应该关掉写端
            close(pipefd[1]);
            char buffer[NUM];
            while(true)
            {
                memset(buffer,0,sizeof(buffer));
                ssize_t s=read(pipefd[0],buffer,sizeof(buffer)-1);
                if(s>0)
                {
                    //读取成功
                    buffer[s]='\0';
                    cout<<"子进程收到消息,内容是: "<<buffer<<endl;
                }
                else if(s==0)
                {
                    cout<<"父进程写完了,我也退出啦"<<endl;
                    break;
                }
                else
                {
                    //Do Nothing
                }
            }
            close(pipefd[0]);
            exit(0);
        }
        else
        {
            //父进程
            //让父进程进行写入,就应该关掉读端!
            close(pipefd[0]);
            const char* msg="你好子进程,我是父进程,这次发送的信息标号是:";
            int cnt=0;
            while(cnt<5)
            {
                write(pipefd[1],sendBuffer,strlen(sendBuffer));
                sleep(1);//这里是为了一会看现象明显
                cnt++;
            }
            cout<<"父进程写完啦!"<<endl;
            close(pipefd[1]);
        }
        pid_t ret=waitpid(id,nullptr,0);
        if(ret>0)
        {
            cout<<"等待子进程成功"<<endl;
        }
        return 0;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80

    在这里插入图片描述

    因为我们让父进程发送五条信息,子进程收到五条信息后也就推出了!此时就完成了我们通过管道进行进程间通信了!!

    对父进程代码稍作修改

    在这里插入图片描述
    在这里插入图片描述

    在父进程中我们每次写入后都sleep(1);然后子进程读入的时候也是休眠1秒后在读取,所以我们可以发现:1.当父进程没有写入数据的时候,子进程会在等!所以,父进程写入之后,子进程才能read(会返回)到数据,子进程打印读取数据要以父进程的节奏为主!

    所以父进程和子进程在读写的时候,是有一定顺序的!!

    但是我们以前父子进程各自printf(向显示器写入)的时候,打印出来的语句是没有顺序的!(缺乏访问控制!)

    所以我们可以得出结论:管道内部没有数据,reader就必须阻塞等待(read)。管道内部,如果数据被写满,writer就不必须阻塞等待(write);

    阻塞等待的本质就是将当前进程的task_struct放入等待队列中!(R->S/D/T)

    在这里插入图片描述
    在这里插入图片描述

    在这里插入图片描述
    在这里插入图片描述

    所以,pipe内部自带访问控制机制!!

    父进程指派给子进程任务

    #include 
    #include 
    #include 
    #include 
    #include 
    #include 
    #include 
    #include 
    #include 
    #include 
    #include 
    
    using namespace std;
    
     typedef void (*functor)();
     vector<functor> functors; //方法集合
    
    // for debug
    unordered_map<uint32_t, string> info;
    
    void f1()
    {
        cout << "这是一个处理日志的任务,执行的进程 ID [" << getpid()
             << "],执行的时间是[" << time(nullptr) << "]\n\n" << endl;
    }
    void f2()
    {
        cout << "这是一个备份数据的任务,执行的进程 ID [" << getpid()
             << "],执行的时间是[" << time(nullptr) << "]\n\n" << endl;
    }
    void f3()
    {
        cout << "这是一个处理网络链接的任务,执行的进程 ID [" << getpid()
             << "],执行的时间是[" << time(nullptr) << "]\n\n" << endl;
    }
    
    void loadFunctor()
    {
        info.insert({functors.size(), "这是一个处理日志的任务"});
        functors.push_back(f1);
        info.insert({functors.size(), "这是一个备份数据的任务"});
        functors.push_back(f2);
        info.insert({functors.size(), "这是一个处理网络链接的任务"});
        functors.push_back(f3);
    }
    int main()
    {
        // 0.加载任务列表
        loadFunctor();
    
        // 1.创建管道
        int pipefd[2] = {0};
        if (pipe(pipefd) != 0)
        {
            cerr << "pipe error" << endl;
            return 1;
        }
    
        // 2.创建子进程
        pid_t id = fork();
        if (id < 0)
        {
            cerr << "fork error" << endl;
            return 2;
        }
        else if (id == 0)
        {
            //子进程进行读操作---read
    
            // 3.关闭不需要的文件fd
            close(pipefd[1]);
    
            // 4.业务处理
            while (true)
            {
                uint32_t opeartorType = 0;
                //如果有数据,就读取,如果没有数据,就阻塞等待,等待任务的到来
                ssize_t s = read(pipefd[0], &opeartorType, sizeof(uint32_t));
                if (s == 0)
                {
                    cout << "我要退出啦,我是子进程,父进程都退出了!" << endl;
                    break;
                }
    
                assert(s == sizeof(uint32_t));
                // assert断言,是变异有效的debug模式
                // release模式下,断言也就没有了
                //一旦断言没有了,s变量就只是被定义,没有被使用。release摸下中,可能会warning
                (void)s;
    
                if (opeartorType < functors.size())
                {
                    functors[opeartorType]();
                }
                else
                {
                    cerr << "bug opeartorType" << endl;
                }
            }
            close(pipefd[0]);
            exit(0);
        }
        else
        {
            srand((long long)time(nullptr));
            //父进程进行写操作----write
    
            // 3.关闭不需要的文件fd
            close(pipefd[0]);
            // 4.指派任务
            uint32_t num = functors.size();
            int cnt = 0;
            while (cnt < 10)
            {
                // 5.形成任务码
                int commandCode = rand() % num;
                std::cout << "父进程指派任务完成,任务是: " << info[commandCode] << "任务的编号是: " << cnt << std::endl;
                // 向指定的进程下达执行任务的操作
                write(pipefd[1], &commandCode, sizeof(uint32_t));
                sleep(1);
                cnt++;
            }
            close(pipefd[1]);
            pid_t ret = waitpid(id, nullptr, 0);
            if (ret)
            {
                cout << "wait success" << endl;
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130

    在这里插入图片描述

    父进程通过给子进程发送不同的信号,让子进程完成不同的任务!!!

    接下来如果通过父进程控制一批子进程呢??

    在这里插入图片描述

    #include 
    #include 
    #include 
    #include 
    #include 
    #include 
    #include 
    #include 
    #include 
    #include 
    #include 
    
    using namespace std;
    
    typedef void (*functor)();
    vector<functor> functors; //方法集合
    
    // for debug
    unordered_map<uint32_t, string> info;
    
    void f1()
    {
        cout << "这是一个处理日志的任务,执行的进程 ID [" << getpid()
             << "],执行的时间是[" << time(nullptr) << "]\n\n" << endl;
    }
    void f2()
    {
        cout << "这是一个备份数据的任务,执行的进程 ID [" << getpid()
             << "],执行的时间是[" << time(nullptr) << "]\n\n" << endl;
    }
    void f3()
    {
        cout << "这是一个处理网络链接的任务,执行的进程 ID [" << getpid()
             << "],执行的时间是[" << time(nullptr) << "]\n\n" << endl;
    }
    
    void loadFunctor()
    {
        info.insert({functors.size(), "这是一个处理日志的任务"});
        functors.push_back(f1);
        info.insert({functors.size(), "这是一个备份数据的任务"});
        functors.push_back(f2);
        info.insert({functors.size(), "这是一个处理网络链接的任务"});
        functors.push_back(f3);
    }
    
    // int32_t:进程pid,int32_t:该进程对应的写端fd
    typedef std::pair<int32_t, int32_t> elem;
    int processNum = 5;
    
    void work(int blockFd)
    {
        cout<<"进程 ["<<getpid()<<"] 开始工作!"<<endl;
        //子进程核心工作的代码
        while(true)
        {   
            //a.阻塞等待   b.获取任务信息
            uint32_t operatorCode=0;
             
            ssize_t s=read(blockFd,&operatorCode,sizeof(uint32_t));
    
            if(s==0)
                break;
            assert(s==sizeof(uint32_t));
            (void)s;
    
            //c.处理任务
            if(operatorCode<functors.size())
            {
                functors[operatorCode]();
            }
        }
        cout<<"进程 ["<<getpid()<<"] 结束工作!"<<endl;
    }   
    //[子进程的pid,子进程的管道fd]  
    void blanceSendTask(const vector<elem>& assignMap)
    {
        srand((long long)time(nullptr));
        while(true)
        {
            sleep(1);
            //选择一个进程,选择进程是随机的,没有压着一个进程一直给任务
            //较为均匀的将任务给所有子进程 ---负载均衡
            uint32_t pick=rand()%assignMap.size();
    
            //选择一个任务
            uint32_t task=rand()%functors.size();
    
            //把任务给一个指定的进程
            write(assignMap[pick].second,&task,sizeof(uint32_t));
    
            //打印对应的提示信息
            cout<<"父进程指派任务->"<<info[task]<<" 给进程: "<<assignMap[pick].first<<" 编号是:"<<pick<<endl;
        }
    }
    int main()
    {
        loadFunctor();
        vector<elem> assignMap;
        //创建processNum个子进程
        for (int i = 0; i < processNum; i++)
        {
            //定义保存管道的fd的对象
            int pipefd[2] = {0};
            //创建管道
            pipe(pipefd);
            //创建子进程
            pid_t id = fork();
            if (id == 0)
            {
                //子进程读取,r
                close(pipefd[1]);
                //子进程执行
                work(pipefd[0]);
                close(pipefd[0]);
                exit(0);
            }
            //父进程做的事情
            close(pipefd[0]);
            elem e(id,pipefd[1]);
            assignMap.push_back(e);
        }
        cout<<"create all process success!"<<endl;
        //父进程派发任务
        blanceSendTask(assignMap);
        //回收资源
        for(int i=0;i<processNum;i++)
        {
            if(waitpid(assignMap[i].first,nullptr,0)>0)
            {
                cout<<"wait for:pid=["<<assignMap[i].first<<"] success!"<<" number: "<<i<<endl;
            }
            close(assignMap[i].second);
        }
        return 0;
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137

    在这里插入图片描述

    接下来回归命令行的’|'字符

    在这里插入图片描述

    命令行’|',其实就是匿名管道!!

    在这里我们使用匿名管道也可以让兄弟进程进行通信!

    5. 管道的特征总结

    1. 管道只能用来进行具有血缘关系的进程之间,进行进程间通信。常用于父子通信
    2. 管道只能单向通信(内核的实现决定)—半双工的一种特殊情况
    3. 管道自带同步机制(pipe满,writer等,pipe空,reader等)—自带访问控制
    4. 管道是面向字节流的。----先写的字符,一定是先被读取的,管道内的数据没有格式边界,需要用户来定义区分内容的边界。
    5. 管道的生命周期—管道是文件—进程退出了,增加打开的文件会怎样?–退出,随进程

    五、命名管道

    那是不是只能父子(血缘)通信??—毫不相关的进程之间进行通信,可以吗???------命名管道!
    在这里插入图片描述
    在这里插入图片描述

    在这里插入图片描述

    1. 创建命名管道

    在这里插入图片描述

    命名管道:通过一个FIFO文件–>有路径–>具有唯一性—>通过路径,找到同一个资源!

    在这里插入图片描述
    代码如下
    “comm.h”

    #pragma once
    #include
    #include
    #include
    #include
    #include
    #include
    #include
    #include
    #include
    
    #define IPC_PATH "./.fifo"
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    “clientFifo.cpp”

    //写入
    #include"comm.h"
    using namespace std;
    #define NUM 1024
    int main()
    {
        int pipeFd=open(IPC_PATH,O_WRONLY);
        if(pipeFd<0)
        {
            cerr<<"open: "<<strerror(errno)<<endl;
            return 1;
        }
        char line[NUM];
        while(true)
        {
            printf("请输入你的消息# ");
            fflush(stdout);
            memset(line,0,sizeof(line));
            if(fgets(line,sizeof(line),stdin)!=nullptr)
            {
                line[strlen(line)-1]='\0';
                write(pipeFd,line,strlen(line));
            }
            else
            {
                break;
            }
        }
        close(pipeFd);
        cout<<"客户端退出啦!"<<endl;
        unlink(IPC_PATH);
        return 0;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    “serverFifo.cpp”

    //读取
    #include"comm.h"
    using namespace std;
    #define NUM 1024
    
    int main()
    {
        extern int error;
        if(mkfifo(IPC_PATH,0666)!=0)
        {
            cerr<<"mkfifo error"<<endl;
            return 1;
        }
        int pipeFd=open(IPC_PATH,O_RDONLY);
        if(pipeFd<0)
        {
            cerr<<"open error"<<endl;
            return 2;
        }
    
        //正常的通信过程
        char buffer[NUM];
        while(true)
        {
            ssize_t s=read(pipeFd,buffer,sizeof(buffer)-1);
            if(s>0)
            {
                buffer[s]='\0';
                cout<<"客户端->服务器# "<<buffer<<endl;
            }
            else if(s==0)
            {
                cout<<"客户端退出啦,我也退出吧!"<<endl;
                break;
            }
            else
            {
                //do nothing
                cout<<"read "<<strerror(errno)<<endl;
                break;
            }
        }
        close(pipeFd);
        cout<<"服务端退出啦!"<<endl;
        return 0;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46

    在这里插入图片描述
    在这里插入图片描述


    总结

    (本章完!)

  • 相关阅读:
    linux 内核协助的探测
    Java的反射慢在哪?
    React 网络请求
    FreeSWITCH 1.10.10 简单图形化界面7-记录一次配置讯时网关的问题
    让国内顶尖程序员社区“牛客网”低头的这份Java面试手册真的强
    ESP8266与STC8H8K单片机联动——天气时钟
    【Go】Go语言中的数组基本语法与应用实战
    .net 项目使用 JSON Schema
    uniapp微信小程序 提示消息 上传文件
    Telnet 测试 UDP 端口?
  • 原文地址:https://blog.csdn.net/m0_61560468/article/details/127935923