• 【1++的Linux】之进程间通信


    👍作者主页:进击的1++
    🤩 专栏链接:【1++的Linux】

    一,进程间通信的目的

    1. 数据传输:一个进程需要将它的数据发送给另一个进程
    2. 资源共享:多个进程之间共享同样的资源。
    3. 通知事件:一个进程需要向另一个或一组进程发送消息,通知它(它们)发生了某种事件(如进程终止时要通知父进程)。
    4. 进程控制:有些进程希望完全控制另一个进程的执行(如Debug进程),此时控制进程希望能够拦截另一个进程的所有陷入和异常,并能够及时知道它的状态改变。

    进程间通信的必要性:
    若没有进程间通信,那么也就无法使用并发能力,无法实现进程间协同。传输数据,消息通知等。

    进程是具有独立性的,虚拟地址空间和页表保证了其独立性,因此,进程间通信的成本是比较高的。
    想要让两进程间能够通信,那么其必定要能够看到同一份 “内存” 。这份所谓的“内存”不能属于任何一个进程,它应该是共享的。

    进程间通信的发展:

    1. 管道
    2. System V进程间通信
    3. POSIX进程间通信

    管道:
    匿名管道pipe
    命名管道

    System V IPC:
    System V 消息队列
    System V 共享内存
    System V 信号量

    POSIX IPC:
    消息队列
    共享内存
    信号量
    互斥量
    条件变量
    读写锁

    二,管道

    管道是Unix中最古老的进程间通信的形式。是Linux原生就能够提供的。其有一个入口,一个出口,是单向通信的,也可以说是一种特出的半双工通信。

    管道的原理:
    我们在上面提到,两进程之间能够进行通信,那么两进程之间就得都能看到同一份资源?那么怎么让两进程看到同一份资源呢?
    在fork之后,创建出来的子进程会继承父进程的大多数内容,这其中就包括文件描述符表,那么文件对象会被拷贝给子进程吗?显然是不会的,这样做是没有意义的。
    我们在创建子进程之前分别以读写方式打开同一个文件,子进程继承之后,其也能够这个文件的文件描述符,有了文件描述符,我们是不是就能够访问这个文件了!!!我们让父进程进行写,那么就关闭其读的那个文件描述符,让子进程读,那么就关闭其写的那个文件描述符。这样,父子进程间就能够就行通信了。这样通信方式我们叫做匿名管道。
    管道的本质是一种文件。

    下面我们来简单的实现一个匿名管道:

    使用pipe系统调用来创建匿名管道。

    #include
    #include
    #include
    #include
    #include
    #include
    #include
    using namespace std;
    
    int main()
    {
        //创建匿名管道
        int pipefd[2];//0读--1写
        int n=pipe(pipefd);
        assert(n!=-1);
        cout<<"creat pipe success"<<endl;
        (void)n;
        //创建子进程
        pid_t pid=fork();
        assert(pid!=-1);
        if(pid==0)//子进程
        {
            //子进程负责读
            close(pipefd[1]);//关闭写端
            char buffer[1024];
            while(true)
            {
                //sleep(3);
                ssize_t n=read(pipefd[0],buffer,sizeof(buffer)-1);
                assert(n!=-1);
               if(n>0)
               {
                 buffer[n]='\0';
                 cout<<"child get a message["<<getpid()<<"]"<<"father#"<<buffer<<endl;
               }
                if(n==0)
                {
                    cout<<"write quite,me quite"<<endl;
                    break;
                }
            }
    
            close(pipefd[0]);//可有可无
            exit(0);
        }
    
        //父进程写
        close(pipefd[0]);//关闭读
        const char* message="I am sending message";
        int count=0;
       while(true)
       {
         ssize_t n=write(pipefd[1],message,strlen(message));
         sleep(1);
         count++;
         if(count==5) break;
       }
    
       //读写完成,退出
      close(pipefd[1]);
      pid_t ret= waitpid(pid,nullptr,0);
      assert(ret>0);
      return 0;
    
        return 0;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66

    运行结果:

    写慢读快时
    在这里插入图片描述
    我们发现写慢读快时,读端不会继续写,而是停下来等待写入。

    当我们让写快,读慢时(即读时休眠时间长一些)
    在这里插入图片描述
    一次会将管道中的所有数据都读出来。管道的大小是有限制的,当管道被写满时,便不会再写,而是等待读。

    当把写端关掉,读端进程会直接退出。

    当把读端关掉,OS会关掉写进程。

    因此管道可以让进程间协同,提供了访问控制。
    管道提供的是面向流式的通信服务,其生命周期随进程。

    从管道读数据是一次性操作,数据一旦被读,它就从管道中被抛弃,释放空间以便写更多的数据。

    在这里插入图片描述
    站在内核的角度,管道的本质就是两个进程对同一个文件对象,一个进行写入,一个进行读取。
    看待管道和看待文件一样,使用也是一样的,这也符合:Linux下一切皆文件的思想。

    一个父进程可以和一个子进程通信,那么一个父进程能否和多个子进程分别通信?—可以的!
    代码如下:

    #pragma once
    #include
    #include
    #include
    #include
    #include
    #include
    #include  
    #include
    typedef std::function<void()> func;
    std::vector<func> callbacks;
    std::unordered_map<int,std::string> desc;
    void readSQL()
    {
        std::cout << "sub process[" << getpid() << " ] 执行访问数据库的任务\n" << std::endl;
    }
    
    void execule()
    {
        std::cout << "sub process[" << getpid() << " ] 执行url解析\n" << std::endl;
    }
    
    void cal()
    {
        std::cout << "sub process[" << getpid() << " ] 执行加密任务\n" << std::endl;
    }
    
    void save()
    {
        std::cout << "sub process[" << getpid() << " ] 执行数据持久化任务\n" << std::endl;
    }
    
    void lod()
    { desc.insert({callbacks.size(), "readSQL: 读取数据库"});
        callbacks.push_back(readSQL);
    
        desc.insert({callbacks.size(), "execule: 进行url解析"});
        callbacks.push_back(execule);
    
        desc.insert({callbacks.size(), "cal: 进行加密计算"});
        callbacks.push_back(cal);
    
        desc.insert({callbacks.size(), "save: 进行数据的文件保存"});
        callbacks.push_back(save);
    
    }
    
    void showHandler()
    {
        for(auto& e:desc)
        {
            std::cout<<e.first<<'\t'<<e.second<<std::endl;
        }
    }
    
    int Handersize()
    {
        return callbacks.size();
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    #include
    #include
    #include
    #include
    #include"task.hpp"
    #define PROCESS_NUM 5
    
    using namespace std;
    
    int waitcommand(int waitfd,bool& quite)
    {
        int command=0;
        ssize_t n=read(waitfd,&command,sizeof(command));
        if(n==0)
        {
            quite=true;
            return -1;
        }
    
        return command;
    
    }
    
    void SendCommand(int who,int fd,int command)
    {
        ssize_t s=write(fd,&command,sizeof(command));
        std::cout<<"main process call"<<who<<"excule"<<desc[command]<<std::endl;
    }
    
    int main()
    {
        lod();
        //pid  :  fd
        std::vector<std::pair<int,int>> slots;
        //多个子进程
        for(int i=0;i<PROCESS_NUM;i++)
        {
            //创建管道
            int pipefd[2];
            int n=pipe(pipefd);
            assert(n!=-1);
            //创建子进程
            pid_t pid=fork();
            assert(pid!=-1);
            if(pid==0)//子进程
            {
                //子进程读
                close(pipefd[1]);
                while(true)
                {
                    //等命令
                    bool quite=false;
                    int command=waitcommand(pipefd[0],quite);//不写就等待
                    if(command==-1)
                    {
                        //std::cout<<"退出"<
                        break;
                    }
                    else if(command>=0&&command<=Handersize())
                    {
                        callbacks[command]();
                    }
                    else
                    {
                        std::cout<<"非法输入"<<std::endl;
                    }
                }
                //退出
                close(pipefd[0]);
                std::cout<<"write quite,me quite"<<std::endl;
                exit(1);
                
            }
    
            //父进程写
            close(pipefd[0]);
            slots.push_back(std::make_pair(pid,pipefd[1]));
        }
    
        //随机派发命令
        srand((unsigned)time(nullptr));
        while(true)
        {
            int command=rand()%Handersize();//选择命令
            int choice=rand()%slots.size();//选择子进程
            //指派任务
            SendCommand(slots[choice].first,slots[choice].second,command);
            sleep(2);
        }
    
        //关闭所有写
        for(auto& e:slots)
        {
            close(e.second);
        }
    
        //回收所有子进程
         for(auto& e:slots)
        {
            waitpid(e.first,nullptr,0);
        }
    
        return 0;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104

    在这里插入图片描述

    命名管道:
    命名管道与匿名管道的原理相同,都是通过让两个进程看到同一份资源,从而实现通信,但命名管道不再局限于父子进程之间,而是任意两个进程之间实现通信。
    两进程看到相同的资源,是通过管道文件的路径从而实现的。
    命名管道的本质也是一种文件,但不是普通的文件,普通的文件我们在读写时,会将内存数据刷新到磁盘中,但是我们的管道是不会的。因此其效率也是很高的。

    管道文件的创建:

    1. mkfifo filename
    2. int mkfifo(const char *filename,mode_t mode);

    下面是我们实现的命名管道的代码:

    // 服务端接收消息

    #include"comm.hpp"
    #include"Log.hpp"
    
    static void getmessage(int fd)
    {
          char buffer[1024];
       while(true)
       {
            int n=read(fd,buffer,sizeof(buffer-1));
            assert(n!=-1);
            if(n>0)
            {
                buffer[n]='\0';
                std::cout<<"["<<getpid()<<"]"<<"client say: "<<buffer<<std::endl;
            }
            else if(n==0)
            {
                std::cout<<"["<<getpid()<<"]"<<"client quit,me quit"<<std::endl;
                break;
            }
       }
    }
    
    int main()
    {
        //创建管道
        int n=mkfifo(ipc_path.c_str(),0666);
        if(n<0)
        {
            perror("mkfifo");
            exit(1);
        }
        log("管道创建成功",DEBUG)<<"step1"<<std::endl;
    
        //打开管道进行读
        int fd=open(ipc_path.c_str(),O_RDONLY);
        if(fd<0)
        {
            perror("open");
            exit(2);
        }
        log("打开管道成功",DEBUG)<<"step2"<<std::endl;
        for(int i=0;i<Process_Num;i++)
        {
            int pid=fork();
            assert(pid>=0);
            if(pid==0)
            {
                getmessage(fd);
                exit(1);
            }
    
        }
        for(int i=0;i<Process_Num;i++)
        {
            waitpid(-1,nullptr,0);
            std::cout<<"等待成功"<<std::endl;
        }
        close(fd);
        log("关闭管道成功",DEBUG)<<"step3"<<std::endl;
        unlink(ipc_path.c_str());
        log("删除管道成功",DEBUG)<<"step4"<<std::endl;
    
        return 0;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65

    //客户端发送消息

    
    #include
    #include"comm.hpp"
    #include"Log.hpp"
    #include
    int main()
    {
        int fd=open(ipc_path.c_str(),O_WRONLY);
        if(fd<0)
        {
            perror("open");
            exit(3);
        }
        log("client 打开管道成功",DEBUG)<<"step5"<<std::endl;
        std::string buffer;
        while(true)
        {
            std::cout<<"client say:"<<std::endl;
            std::getline(std::cin,buffer);
            int n=write(fd,buffer.c_str(),buffer.size());
    
        }
        close(fd);
        return 0;
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    #pragma once
    #include
    #include"comm.hpp"
    #ifndef _LOG_H_
    #define _LOG_H_
    
    #define DEBUG 0
    #define NOTICE 1
    #define WARNING 2
    #define ERROR 3
    
    std::string mes[4]={
        "DEBUG","NOTICE","WARNING","ERROR"
    };
    
    std::ostream &log(std::string message,int level)
    {
        std::cout<<"|"<<unsigned(time(nullptr))<<"|"<<mes[level]<<"|"<<message;
        return std::cout;
    }
    
    
    #endif
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    #pragma once
    #include
    #include
    #include
    #include
    #include
    #include
    #include
    #include
    #define Process_Num 4
    
    std::string ipc_path="./fifo.ipc";
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    一个普通的全局的静态函数与普通函数的区别是:用static修饰的函数,限定在本源码文件中,不能被本源码文件以外的代码文件调用。

  • 相关阅读:
    java并发之AQS详解(待更)
    ABAP VOFM定价过程的例程创建
    数仓建模—逻辑数据模型
    SpringBoot整合框架——集成SpringSecurity、shiro
    图的二种遍历-广度优先遍历和深度优先遍历
    Java8 Optional使用
    ES6新增的 Symbol
    SQL中的字符串截取函数
    python读写hive
    Redis配置和优化
  • 原文地址:https://blog.csdn.net/m0_63135219/article/details/133997957