进程间通信的目的:
管道是 Unix 中最古老的进程间通信的形式
我们把从一个进程连接到另一个进程的一个数据流称为一个管道,管道是单向的。
管道本质上也是一个文件,但是它和普通的文件不一样,普通的文件是为了向磁盘上读写数据,而管道是为了进程间通信,而且管道的操作属于内存级别。
下面简单讲述管道的原理,需要用到文件描述符的知识👉[Linux](10)系统级I/O,文件描述符,重定向,缓冲区,文件操作模拟实现_世真的博客-CSDN博客
父进程创建管道

创建子进程,子进程会继承父进程打开的文件

关闭父进程的读端 fd[0] 和子进程的写端 fd[1],保证单向性

pipe 函数:
创建管道
NAME
pipe, pipe2 - create pipe
SYNOPSIS
#include
int pipe(int pipefd[2]);
// 创建成功:返回0。出错:返回-1
pipe() 创建一个管道,它是一个可用于进程间通信的单向数据通道。数组 pipefd 用于返回引用管道末端的两个文件描述符。pipefd[0] 表示管道的读取端。pipefd[1] 表示管道的写入端。从管道写入端写入的数据将由内核缓冲,直到从管道读取端读取。
例子:
创建管道并查看pipefd的两个文件描述符:
#include
#include
#include
using namespace std;
int main()
{
int pipefd[2] = { 0 };
if (pipe(pipefd) != 0)
{
cerr << "pipe error" << endl;
return 1;
}
cout << "fd[0]: " << pipefd[0] << endl;
cout << "fd[1]: " << pipefd[1] << endl;
return 0;
}
运行结果:
fd[0]: 3
fd[1]: 4
接下来创建子进程,并通过管道让父进程给子进程发消息:
#include
#include
#include
#include
#include
#include
using namespace std;
int main()
{
// 创建管道
int pipefd[2] = { 0 };
if (pipe(pipefd) != 0)
{
cerr << "pipe error" << endl;
return 1;
}
// 创建子进程
pid_t id = fork();
if (id < 0)
{
cerr << "fork error" << endl;
return 2;
}
else if (id == 0)
{
// 子进程
// 子进程进行读取,关闭写端
close(pipefd[1]);
#define NUM 1024
char buffer[NUM];
while (true)
{
memset(buffer, 0, sizeof(buffer));
size_t s = read(pipefd[0], buffer, sizeof(buffer));
if (s > 0)
{
buffer[s] = '\0';
cout << "子进程收到消息,内容是:" << buffer << endl;
}
else if (s == 0)
{
cout << "父进程写完,子进程读完,退出" << endl;
break;
}
else
{
// nothing to do
}
}
close(pipefd[0]);
exit(0);
}
else
{
// 父进程
// 父进程进行写入,关闭读端
close(pipefd[0]);
string msg = "儿子,我是你爸爸";
int cnt = 0;
while (cnt < 5)
{
write(pipefd[1], msg.c_str(), msg.size());
sleep(1);
++cnt;
}
close(pipefd[1]);
cout << "父进程写入完毕" << endl;
}
pid_t res = waitpid(id, nullptr, 0);
if (res > 0)
{
cout << "等待子进程成功" << endl;
}
return 0;
}
上述代码,让父进程写5次消息到管道中,子进程一直读,父进程写完后等待子进程退出。
补充:read函数可以通过查看文件的引用计数来得知父进程有没有关闭描述符,如果关闭了,read就会读完然后返回0
输出结果:
[CegghnnoR@VM-4-13-centos pipe]$ ./mypipe
子进程收到消息,内容是:儿子,我是你爸爸
子进程收到消息,内容是:儿子,我是你爸爸
子进程收到消息,内容是:儿子,我是你爸爸
子进程收到消息,内容是:儿子,我是你爸爸
子进程收到消息,内容是:儿子,我是你爸爸
父进程写完,子进程读完,退出
父进程写入完毕
等待子进程成功
在观察结果的过程中很容易发现一个现象:父进程每隔1秒写入一次,子进程读消息的节奏也是1s一次。
当父进程没有写入的时候,子进程在干什么呢?
创建一个任务集合,父进程通过发送任务码让子进程完成指定的任务。
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
using namespace std;
typedef void(*functor)(); // 定义函数指针
vector<functor> functors; // 方法集合
unordered_map<uint32_t, string> info;
void f1()
{
printf("这是一个处理日志的任务,执行的进程 id [%d],执行时间是[%d]\n", getpid(), time(nullptr));
}
void f2()
{
printf("这是一个备份数据的任务,执行的进程 id [%d],执行时间是[%d]\n", getpid(), time(nullptr));
}
void f3()
{
printf("这是一个处理网络连接的任务,执行的进程 id [%d],执行时间是[%d]\n", getpid(), time(nullptr));
}
void loadFunctor()
{
info.insert({functors.size(), "处理日志的任务"});
functors.push_back(f1);
info.insert({functors.size(), "备份数据的任务"});
functors.push_back(f2);
info.insert({functors.size(), "处理网络连接的任务"});
functors.push_back(f3);
}
int main()
{
// 加载任务列表
loadFunctor();
// 创建管道
int pipefd[2] = { 0 };
if (pipe(pipefd) != 0)
{
cerr << "pipe error" << endl;
return 1;
}
// 创建子进程
pid_t id = fork();
if (id < 0)
{
cerr << "fork error" << endl;
return 2;
}
else if (id == 0)
{
// child, read
close(pipefd[1]);
while (true)
{
uint32_t operatorType = 0;
// 如果有数据,就读取,如果没有数据,就阻塞等待
ssize_t s = read(pipefd[0], &operatorType, sizeof(ssize_t));
if (s == 0)
{
cout << "父进程派发任务完毕,子进程退出" << endl;
break;
}
assert(s == sizeof(uint32_t));
(void)s;
if (operatorType < functors.size())
{
functors[operatorType]();
}
else
{
cerr << "bug? operatorType = " << operatorType << endl;
}
}
close(pipefd[0]);
exit(0);
}
else
{
srand((long long)time(nullptr));
// parent, write
close(pipefd[0]);
// 指派任务
int num = functors.size();
int cnt = 10;
while (cnt--)
{
// 形成任务码
uint32_t commandCode = rand() % num;
cout << "父进程指派任务完成,任务是:" << info[commandCode] << "任务编号:" << cnt << endl;
// 向指定的进程下达任务
write(pipefd[1], &commandCode, sizeof(uint32_t));
sleep(1);
}
close(pipefd[1]);
pid_t res = waitpid(id, nullptr ,0);
if (res) cout << "wait success" << endl;
}
return 0;
}
运行结果:
[CegghnnoR@VM-4-13-centos pipe]$ ./mypipe
父进程指派任务完成,任务是:处理网络连接的任务任务编号:9
这是一个处理网络连接的任务,执行的进程 id [11735],执行时间是[1667355875]
父进程指派任务完成,任务是:备份数据的任务任务编号:8
这是一个备份数据的任务,执行的进程 id [11735],执行时间是[1667355876]
父进程指派任务完成,任务是:处理网络连接的任务任务编号:7
这是一个处理网络连接的任务,执行的进程 id [11735],执行时间是[1667355877]
父进程指派任务完成,任务是:处理网络连接的任务任务编号:6
这是一个处理网络连接的任务,执行的进程 id [11735],执行时间是[1667355878]
父进程指派任务完成,任务是:处理网络连接的任务任务编号:5
这是一个处理网络连接的任务,执行的进程 id [11735],执行时间是[1667355879]
父进程指派任务完成,任务是:备份数据的任务任务编号:4
这是一个备份数据的任务,执行的进程 id [11735],执行时间是[1667355880]
父进程指派任务完成,任务是:处理日志的任务任务编号:3
这是一个处理日志的任务,执行的进程 id [11735],执行时间是[1667355881]
父进程指派任务完成,任务是:备份数据的任务任务编号:2
这是一个备份数据的任务,执行的进程 id [11735],执行时间是[1667355882]
父进程指派任务完成,任务是:备份数据的任务任务编号:1
这是一个备份数据的任务,执行的进程 id [11735],执行时间是[1667355883]
父进程指派任务完成,任务是:备份数据的任务任务编号:0
这是一个备份数据的任务,执行的进程 id [11735],执行时间是[1667355884]
父进程派发任务完毕,子进程退出
wait success
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
using namespace std;
typedef void(*functor)(); // 定义函数指针
vector<functor> functors; // 方法集合
unordered_map<uint32_t, string> info;
void f1()
{
printf("这是一个处理日志的任务,执行的进程 id [%d],执行时间是[%d]\n\n", getpid(), time(nullptr));
}
void f2()
{
printf("这是一个备份数据的任务,执行的进程 id [%d],执行时间是[%d]\n\n", getpid(), time(nullptr));
}
void f3()
{
printf("这是一个处理网络连接的任务,执行的进程 id [%d],执行时间是[%d]\n\n", getpid(), time(nullptr));
}
void loadFunctor()
{
info.insert({functors.size(), "处理日志的任务"});
functors.push_back(f1);
info.insert({functors.size(), "备份数据的任务"});
functors.push_back(f2);
info.insert({functors.size(), "处理网络连接的任务"});
functors.push_back(f3);
}
// int32_t:进程pid, int32_t:该进程对应的管道写端fd
typedef pair<int32_t, int32_t> elem;
vector<elem> assignMap;
int processNum = 5;
void work(int blockFd)
{
cout << "进程 [" << getpid() << "] " << "开始工作" << endl;
// 子进程核心工作的代码
while (true)
{
// 阻塞等待,获取任务信息
uint32_t operaotrCode = 0;
ssize_t s = read(blockFd, &operaotrCode, sizeof(uint32_t));
if (s == 0) break;
assert(s == sizeof(uint32_t));
(void)s;
// 处理任务
if (operaotrCode < functors.size()) functors[operaotrCode]();
}
cout << "进程 [" << getpid() << "] " << "结束工作" << endl;
}
void sendTask(const vector<elem>& processFds)
{
srand((long long)time(nullptr));
while (true)
{
sleep(1);
// 选择一个进程
uint32_t pick = rand() % processFds.size();
// 选择一个任务
uint32_t task = rand() % functors.size();
// 把任务给一个指定的进程
write(processFds[pick].second, &task, sizeof(task));
// 打印提示信息
printf("父进程指派任务: %s 给进程:%d, 编号: %d\n", info[task].c_str(), processFds[pick].first, pick);
}
}
int main()
{
loadFunctor();
// 创建processNum个进程
for (int i = 0; i < processNum; ++i)
{
int pipefd[2] = { 0 };
// 创建管道
pipe(pipefd);
// 创建子进程
pid_t id = fork();
if (id == 0)
{
// 子进程读取,r
close(pipefd[1]);
// 子进程执行
work(pipefd[0]);
exit(0);
}
// 父进程做的事情
close(pipefd[0]);
elem e(id, pipefd[1]);
assignMap.push_back(e);
}
cout << "create all process sucess" << endl;
// 父进程派发任务
sendTask(assignMap);
// 回收资源
for (int i = 0; i < processNum; ++i)
{
if (waitpid(assignMap[i].first, nullptr, 0) > 0)
printf("wait for: pid = %d, wait success, number: %d\n", assignMap[i].first, i);
close(assignMap[i].second);
}
return 0;
}
[CegghnnoR@VM-4-13-centos pipe]$ ./mypipe
进程 [19069] 开始工作
create all process sucess
进程 [19071] 开始工作
进程 [19072] 开始工作
进程 [19073] 开始工作
进程 [19070] 开始工作
父进程指派任务: 处理网络连接的任务 给进程:19069, 编号: 0
这是一个处理网络连接的任务,执行的进程 id [19069],执行时间是[1667397801]
父进程指派任务: 处理日志的任务 给进程:19073, 编号: 4
这是一个处理日志的任务,执行的进程 id [19073],执行时间是[1667397802]
父进程指派任务: 备份数据的任务 给进程:19069, 编号: 0
这是一个备份数据的任务,执行的进程 id [19069],执行时间是[1667397803]
父进程指派任务: 处理网络连接的任务 给进程:19071, 编号: 2
这是一个处理网络连接的任务,执行的进程 id [19071],执行时间是[1667397804]
父进程指派任务: 备份数据的任务 给进程:19071, 编号: 2
这是一个备份数据的任务,执行的进程 id [19071],执行时间是[1667397805]
父进程指派任务: 处理网络连接的任务 给进程:19073, 编号: 4
这是一个处理网络连接的任务,执行的进程 id [19073],执行时间是[1667397806]
父进程指派任务: 备份数据的任务 给进程:19072, 编号: 3
这是一个备份数据的任务,执行的进程 id [19072],执行时间是[1667397807]
补充:命令行中 | 也可以创建管道,| 左右的两个命令所创建的进程之间可以相互通信,这是子进程之间的通信。
上面的匿名管道只能进行有血缘关系的进程之间通信,对于毫不相干的进程间通信,需要使用命名管道。
创建命名管道的命令 mkfifo:
NAME
mkfifo - make FIFOs (named pipes)
SYNOPSIS
mkfifo [OPTION]... NAME...
使用该命令,创建一个管道文件:
[CegghnnoR@VM-4-13-centos named_pipe]$ mkfifo myfifo
[CegghnnoR@VM-4-13-centos named_pipe]$ ll
total 0
prw-rw-r-- 1 CegghnnoR CegghnnoR 0 Nov 2 22:54 myfifo
例子:
在一个终端中等待读取管道中的数据,在另一个终端中向管道写入数据,被写入的数据会在另一个终端中显示。

创建命名管道的函数接口:
NAME
mkfifo - make a FIFO special file (a named pipe)
SYNOPSIS
#include
#include
int mkfifo(const char *pathname, mode_t mode);
// 创建成功:返回0,失败:返回-1
pathname:管道文件路径
mode:文件权限
需要注意的是:命名管道的通信依然是在内存里进行的,数据不会刷新到磁盘,所谓的管道文件,只是代表一种符号。
例子:
编写两个程序,一个表示客户端,一个表示服务端。让客户端输入的信息能够在服务端显示。
comm.h
#pragma once
#include
#include
#include
#include
#include
#include
#include
#include
#include
#define IPC_PATH "./.fifo"
clientFifo.cpp
// 写入
#include "comm.h"
using namespace std;
int main()
{
int pipeFd = open(IPC_PATH, O_WRONLY);
if (pipeFd < 0)
{
cerr << "open: " << strerror(errno) << endl;
return 1;
}
#define NUM 1024
char line[NUM];
while (true)
{
printf("请输入你的消息# ");
fflush(stdout);
memset(line, 0, sizeof(line));
if (fgets(line, sizeof(line), stdin) != nullptr)
{
write(pipeFd, line, strlen(line));
}
else
{
break;
}
}
close(pipeFd);
cout << "客户端退出" << endl;
return 0;
}
serverFifo.cpp
// 读取
#include "comm.h"
using namespace std;
int main()
{
umask(0);
if (mkfifo(IPC_PATH, 0600) != 0)
{
cerr << "mkfifo error" << endl;
return 1;
}
int pipeFd = open(IPC_PATH, O_RDONLY);
if (pipeFd < 0)
{
cerr << "open fifo error" << endl;
return 2;
}
// 通信过程...
#define NUM 1024
char buffer[NUM];
while (true)
{
ssize_t s = read(pipeFd, buffer, sizeof(buffer) - 1);
if (s > 0)
{
buffer[s] = '\0';
cout << "客户端->服务器# " << buffer << endl;
}
else if (s == 0)
{
cout << "客户退出了,我服务也退出" << endl;
break;
}
else
{
//错误
cout << strerror(errno) << endl;
break;
}
}
close(pipeFd);
cout << "服务端退出" << endl;
return 0;
}
运行:
打开两个终端,下图分别为客户端(左)和服务端(右),先运行服务端创建管道并等待输入,然后运行客户端输入信息。

可以看到这两个程序是毫无关系的,但是它们可以通过命名管道实现通信。
最后一个小知识点:
使用unlink删除文件。
NAME
unlink - delete a name and possibly the file it refers to
SYNOPSIS
#include
int unlink(const char *pathname);
当你在程序运行结束想删除管道文件,可以使用 unlink
//...
close(pipeFd);
cout << "服务端退出" << endl;
unlink(IPC_PATH); // 在程序运行结束就把管道文件删掉
return 0;
}