• 【Linux】进程间通信 -- 管道


    对于进程间通信的理解

    首先,进程间通信的本质是,让不同的进程看到同一份资源(这份资源不能隶属于任何一个进程,即应该是共享的)。而进程间通信的目的是为了实现多进程之间的协同。
    但由于进程运行具有独立性(虚拟地址空间+页表 保证了进程运行的独立性),所以要实现进程间的通行难度会比较大。
    管道通信作为进程间通信的一种方式,Linux原生就能提供。其通信方式又分为两种:匿名管道 和 命名管道。

    匿名管道

    匿名管道通信常用于父子进程间的通信。通过fork创建子进程,让具有血缘关系的进程能够进行通信。
    其实现通信的步骤主要有3步:

    1. 父进程分别以读和写方式打开同一个文件
    2. fork()创建子进程
    3. 父子进程各自关闭自己不需要的文件描述符

    在这里插入图片描述
    如上图看管道本质还是文件。
    既然管道通信,首先要能够创建出管道。pipe系统接口可以帮助创建管道。其参数pipefd是一个数组,

    // pipefd[0]对应读端的文件描述符
    // pipefd[1]对应写端的文件描述符
    int pipe(int pipefd[2]);
    
    • 1
    • 2
    • 3
    // 匿名管道通信测试
    void Test1()
    {
        // 1.创建管道
        int pipefd[2] = {0};
        int ret = pipe(pipefd);
        if(ret != 0)
        {
            perror("pipe");
            exit(1);
        }
        // 测试打开的文件描述符
        cout << "pipefd[0]: " << pipefd[0] << endl;
        cout << "pipefd[1]: " << pipefd[1] << endl;
    
        // 2.创建子进程
        pid_t pid = fork();
    
        if(pid > 0)
        {
        	// 3.构建单向通行的信道,父进程写入,子进程读取
            // 3.1.父进程 -- 写
            
            // 关闭读端
            close(pipefd[0]);
            int count = 0;
            while(true)
            {
            	// 不断写如变化的信息
                string msg = "hello world" + to_string(count++);
                write(pipefd[1], msg.c_str(), msg.size());
                sleep(1);
    
                if(count > 5)
                {
                    cout << "write quit" << endl;
                    break;
                }
            }
            // 关闭写端
            close(pipefd[1]);
    		
    		// 4.等待子进程
            pid_t wpid = waitpid(pid, nullptr, 0);
            if(wpid == -1)
            {
                perror("waitpid");
                exit(3);
            }
        }
        else if(pid == 0)
        {
            // 3.2.子进程 -- 读
    		
    		// 关闭写端
            close(pipefd[1]);
    		
    		// 不断读取信息
            char receive[128] = {0};
            while(true)
            {
                ssize_t size = read(pipefd[0], receive, 127);
                if(size > 0)
                {
                    cout << "receive: " << receive << endl;
                }
                else if(size == 0) 
                {
                    cout << "write quit, read quit" << endl;
                    break;
                }
                else
                {
                    perror("read");
                    exit(4);
                }
            }
            // 关闭读端
            close(pipefd[0]);
        }
        else 
        {
            perror("fork");
            exit(2);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86

    在这里插入图片描述
    通过匿名管道我们还可以模拟进程池的设计。

    // 简单的进程池设计
    #define PROCESS_NUM 5
    
    using f = function<void()>;
    unordered_map<int, f> task;
    
    void load()
    {
        task[1] = [](){cout << "sub process[" << getpid() << "]->void Task1()" << endl;};
        task[2] = [](){cout << "sub process[" << getpid() << "]->void Task2()" << endl;};
        task[3] = [](){cout << "sub process[" << getpid() << "]->void Task3()" << endl;};
        task[4] = [](){cout << "sub process[" << getpid() << "]->void Task4()" << endl;};
    }
    
    void sendTask(int fd, pid_t pid, int task_num)
    {
        write(fd, &task_num, sizeof(task_num));
        cout << "process[" << pid << "] execute " << "task" << task_num << " by " << fd << endl;
    }
    
    int waitTask(int fd)
    {
        int task_num = 0;
        ssize_t size = read(fd, &task_num, sizeof(task_num));
        if(size == 0)
        {
            return 0;
        }
        if(size == sizeof(task_num))
        {
            return task_num;
        }
        return -1;
    }
    
    void Test2()
    {
        load();
        vector<pair<int, pid_t>> process;
        // 创建多个进程
        for(int i = 0; i < PROCESS_NUM; ++i)
        {
            // 创建管道
            int pipefd[2] = {0};
            int ret = pipe(pipefd);
            if(ret != 0)
            {
                perror("pipe");
                exit(1);
            }
    
            // 创建子进程
            pid_t pid = fork();
    
            if(pid == 0)
            {
                // 子进程 -- 读
                close(pipefd[1]);
    
                while(true)
                {
                    // 等待任务
                    int task_num = waitTask(pipefd[0]);
                    if(task_num == 0)
                    {
                        break;
                    }
                    else if(task_num >= 1 && task_num <= task.size())
                    {
                        task[task_num]();
                    }
                    else
                    {
                        perror("waitTask");
                        exit(3);
                    }
                }
                exit(0);
            }
            else if (pid < 0)
            {
                perror("fork");
                exit(2);
            }
    
            // 父进程读端关闭
            close(pipefd[0]);
            process.emplace_back(pipefd[1], pid);
        }
    
        // 父进程 -- 写
        srand((unsigned int)time(0));
        
        while(true)
        {
            // 选择一个进程 -- 随机数方式的负载均衡
            int process_num = rand() % process.size();
            // 选择一个任务
            // int task_num = rand() % task.size() + 1;
            int task_num = 0;
            cout << "please enter your task num: ";
            cin >> task_num;
    
            // 派发任务
            sendTask(process[process_num].first, process[process_num].second, task_num);
        }
    
        // 关闭fd
        for(const auto& e : process)
        {
            close(e.first);
        }
    
        // 回收子进程
        for(const auto& e : process)
        {
            waitpid(e.second, nullptr, 0);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119

    在这里插入图片描述

    命名管道

    可以用mkfifo命令创建一个命名管道。如下图是一个命名管道的小实验。
    在这里插入图片描述
    也可以通过mkfifo接口进行命名管道文件的创建。
    在这里插入图片描述
    命名管道通信的测试。

    // 1. log.hpp
    #include 
    
    enum ErrLevel
    {
        lev_0,
        lev_1,
        lev_2,
        lev_3,
        lev_4
    };
    
    const std::string error[] = {
        "err_0",
        "err_1",
        "err_2",
        "err_3",
        "err_4"
    };
    
    std::ostream& Log(const std::string& msg, int level)
    {
        std::cout << " | " << (unsigned int)time(0) << " | " << error[level] << " | " << msg << " |";
        return std::cout;
    }
    
    // 2. comm.hpp
    #include 
    #include 
    #include 
    #include 
    #include 
    #include 
    
    using namespace std;
    
    #include "log.hpp"
    
    #define MODE 0666
    #define SIZE 128
    
    // 命名管道,通过文件路径,让不同进程能看到这同一份资源
    string named_pipe_path = "/home/zs/linux/testcpp/fifo.ipc";
    
    // 3. server.cpp
    static void getMsg(int fd)
    {
        char buffer[SIZE];
        while(true)
        {
            memset(buffer, '\0', sizeof(buffer));
            ssize_t size = read(fd, buffer, sizeof(buffer) - 1); // ssize_t - long int
            if(size > 0)
            {
                cout << "[" << getpid() << "]" << "client say:" << buffer << endl;
            }
            else if(size == 0)
            {
                cerr << "[" << getpid() << "]" << "read end of file, client quit, then server quit" << endl;
                break;
            }
            else
            {
                perror("read");
                break;
            }
        }
    }
    
    void test()
    {
        // 1.创建管道文件
        if(0 != mkfifo(named_pipe_path.c_str(), MODE))
        {
            perror("mkfifo");
            exit(1);
        }
        Log("创建管道文件成功", lev_0) << endl;
    
        // 2.文件操作
        int fd = open(named_pipe_path.c_str(), O_RDONLY);
        if(fd < 0)
        {
            perror("open");
            exit(2);
        }
        Log("打开管道文件成功", lev_0) << endl;
    
        for(int i = 0; i < 3; ++i)
        {
            pid_t pid = fork();
            if(pid == 0)
            {
                // 3.通信
                getMsg(fd);
                exit(0);
            }
        }
    
        for(int i = 0; i < 3; ++i)
        {
            waitpid(-1, nullptr, 0);
        }
    
        // 4.关闭文件
        close(fd);
        Log("关闭管道文件成功", lev_0) << endl;
        unlink(named_pipe_path.c_str()); // 通信完毕,删除管道文件
        Log("删除管道文件成功", lev_0) << endl;
    }
    
    // 4. client.cpp
    void test()
    {
        // 1.获取管道文件
        int fd = open(named_pipe_path.c_str(), O_WRONLY);
        if(fd < 0)
        {
            perror("open");
            exit(1);
        }
        
        // 2.通信
        string message;
        while(true)
        {
            cout << "please enter your message: ";
            getline(cin, message);
            write(fd, message.c_str(), message.size());
        }
    
        // 关闭
        close(fd);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134

    在这里插入图片描述

    管道通信总结

    1. 管道常用来进行具有血缘关系的进程间的通信
    2. 管道让进程间协同,提供了访问控制
    3. 管道提供的是面向流式的通信服务
    4. 管道是基于文件的,文件的生命周期跟随进程,管道的生命周期也跟随进程
    5. 管道用于单向通信,属于半双工通信的一种特殊情况

    管道本质是文件,又和传统的文件又不一样。管道文件不会将数据刷新到磁盘。
    匿名管道通过父子继承的方式看到同一份资源,命名管道通过文件路径的唯一性看到同一份资源,从而达到不同进程间通信的目的。
    对于管道文件:
    如果写的一方很快,读的一方很慢,当管道写满时,写端必须等待;
    如果写的一方很慢,读的一方很快,当管道没有数据时,读端必须等待;
    如果写端先被关闭了,读端会读到文件结尾;
    如果读端先被关闭了,操作系统会终止写端进程。

  • 相关阅读:
    FlinkSql中的join操作详解
    Tomcat安装与配置
    基于微信小程序的奶茶在线预定点单管理系统
    Mysql之数据处理及数据汇总函数
    CICD命令
    Mybatis参数传递方式
    11个开源测试自动化框架,如何选?
    如何使用C#在Excel中插入分页符
    mysql主从同步
    SCT1270FQAR/TPS61089RNRR
  • 原文地址:https://blog.csdn.net/weixin_62172209/article/details/134449408