
Linux进程间通信(IPC,Inter-Process Communication)的主要目的是使不同的进程能够协同工作、共享信息和资源,以便完成各种任务。IPC 在多进程或多线程的应用程序中是至关重要的,因为它允许进程之间进行数据传输、同步和协调操作。以下是一些主要的目的和应用场景:
进程间通信(IPC)是操作系统中的一个重要概念,它允许不同的进程之间交换数据和协作。IPC 方法的发展经历了不同的阶段和技术。在 Linux 和其他类 Unix 操作系统中,主要有以下三个重要的 IPC 机制:管道、System V IPC 和 POSIX IPC。这些机制有不同的特点和用途,它们一起构成了进程间通信的演进历史。
| 来连接命令。这篇文章我们先介绍管道通信中的匿名管道pipe
在计算机科学中,管道(Pipe)是一种进程间通信(IPC)机制,用于在一个进程的输出流直接传递到另一个进程的输入流,实现数据的传输和协作。管道通常用于将多个命令或进程连接在一起,其中一个命令的输出成为另一个命令的输入。这种机制非常有用,因为它可以将多个小任务组合成一个更大的任务,实现数据流的传递和处理
在 Unix/Linux 操作系统中,管道通常通过管道操作符 | 来实现,它将两个或多个命令连接在一起,使其执行时将数据从一个命令传递到下一个命令。以下是一个简单的示例:
假设你有一个名为 input.txt 的文本文件,其中包含一些文本数据。您希望将该数据按行排序,然后查找包含特定关键字的行。您可以使用管道将 sort 命令和 grep 命令连接起来完成这个任务:
cat input.txt | sort | grep "关键字"
在这个示例中,cat 命令用于将文件的内容读取并将其传递给 sort 命令,sort 命令对文本进行排序,并将结果传递给 grep 命令,grep 命令查找包含特定关键字的行。整个操作是通过管道实现的,数据从一个命令流向下一个命令,形成数据处理流。
管道的主要特点包括:
- 数据传递:管道允许数据在不同命令或进程之间传递,使它们可以进行处理。
- 连接命令:多个命令可以通过管道连接在一起,实现协作处理。
- 实时处理:数据流是实时的,不需要等待整个数据文件被处理完。
- 节省资源:不需要在磁盘上创建中间文件,可以节省磁盘和内存资源。
管道是 Unix/Linux 系统中强大的工具,它可以用于构建复杂的数据处理流程,从简单的文本处理到复杂的系统管理任务。它在 shell 脚本和命令行中被广泛使用,以提高任务的效率和可组合性。

匿名管道(Anonymous Pipe)是一种轻量级的进程间通信(IPC)机制,用于在同一计算机上的两个相关进程之间传递数据。匿名管道被称为“匿名”是因为它们没有独立的文件系统路径或标识符,而是在内存中创建的通道,用于实现进程之间的数据传递。
匿名管道通常是单向的,支持从一个进程的输出流传递数据到另一个进程的输入流。它们是进程通信的一种基本方式,特别适用于父子进程之间的通信。
在 Unix/Linux 操作系统中,匿名管道通常由 pipe() 系统调用创建。一旦管道被创建,两个相关的进程(通常是父子进程)可以使用管道进行通信。一个进程可以将数据写入管道,而另一个进程可以从管道中读取数据。
匿名管道的典型用途包括:
| 来将一个命令的输出连接到另一个命令的输入,这背后就是匿名管道的工作机制。请注意,匿名管道只能在有关的进程之间使用,通常用于协作性任务。如果需要在不相关的进程之间进行通信,或者需要更复杂的通信机制,可以考虑其他进程间通信机制,如命名管道、System V IPC 或网络套接字。
#include
功能:创建一无名管道
原型
int pipe(int fd[2]);
参数
fd:文件描述符数组,其中fd[0]表示读端, fd[1]表示写端
返回值:成功返回0,失败返回错误代码
pipe 函数的原型如下:
#include
int pipe(int filedes[2]);
参数 filedes 是一个包含两个整数的数组,用于存储管道的文件描述符。filedes[0] 是管道的读端,filedes[1] 是管道的写端。
下面是一个简单的示例,演示如何使用 pipe 函数创建管道并在父子进程之间传递数据:
#include
#include
#include
int main() {
int filedes[2];
char buffer[30];
pid_t child_pid;
// 创建管道
if (pipe(filedes) == -1) {
perror("pipe");
exit(EXIT_FAILURE);
}
child_pid = fork(); // 创建子进程
if (child_pid == -1) {
perror("fork");
exit(EXIT_FAILURE);
}
if (child_pid == 0) { // 子进程
close(filedes[1]); // 关闭管道写端
read(filedes[0], buffer, sizeof(buffer));
printf("子进程读取的数据: %s\n", buffer);
close(filedes[0]);
} else { // 父进程
close(filedes[0]); // 关闭管道读端
write(filedes[1], "Hello, child!", 13);
close(filedes[1]);
}
return 0;
}
输出结果
子进程读取的数据: Hello, child!
在此示例中,pipe 函数创建了一个管道,然后通过 fork 创建了一个子进程。父进程向管道写入数据,子进程从管道中读取数据。通过这种方式,父子进程之间实现了数据传递。

首先我们建立一个mypipe.cpp文件
写入需要用到的头文件并展开std命名空间
#include
#include
#include
#include
#include
#include
#include
#include
using namespace std;
主函数中我们先创建管道
int pipefd[2] = {0};
int n = pipe(pipefd);
assert(n != -1); // debug assert
(void)n;
int pipefd[2] = {0};:在栈上创建一个名为 pipefd 的整数数组,其中有两个元素。这两个元素分别代表管道的读端(pipefd[0])和写端(pipefd[1])。int n = pipe(pipefd);:使用 pipe 函数创建管道。如果创建成功,pipe 函数会将读端和写端的文件描述符填充到 pipefd 数组中。assert(n != -1);:assert 是一个宏,用于在运行时检查条件是否为真。如果条件为假,它会终止程序执行,并在标准错误输出中打印一条错误消息。在这里,它用于检查 pipe 函数是否成功创建了管道。如果 pipe 函数返回 -1(表示创建失败),则程序将终止。(void)n 这一行代码是为了在编译器的严格性检查下避免未使用的变量 n 导致的编译警告。在这段代码中,n 是用于接收 pipe 函数的返回值的变量,但在后续代码中并没有实际使用 n,因此编译器可能会生成未使用变量的警告。
将 (void)n 添加到代码中的目的是明确告诉编译器,你有意不使用变量 n,并且这是一个有意的决策。这可以帮助消除编译器关于未使用变量的警告,以保持代码的整洁性。
要注意的是,这样的代码通常是为了编程风格和规范的目的,以确保代码清晰和易于维护。实际上,你也可以在不使用 (void)n 的情况下编译代码,但这可能会导致一些编译器产生警告信息,或者在某些情况下可能导致不必要的干扰。
加入条件编译观察管道的文件描述符的值
#ifdef DEBUG
cout << "pipefd[0]: " << pipefd[0] << endl;
cout << "pipefd[1]: " << pipefd[1] << endl;
#endif
通常,DEBUG 宏会在调试模式下定义,以便在开发和测试时提供更多的信息,而在生产模式下将其禁用,以减少不必要的输出和性能开销。例如,在编译时可以使用 -D 选项定义 DEBUG 宏,如下所示:
g++ -o my_program my_program.cpp -DDEBUG
这将在编译过程中定义 DEBUG 宏,允许在代码中执行条件编译的调试输出。如果不需要调试输出,可以省略 -DDEBUG 选项,或者在代码中注释掉 #define DEBUG。
如果你使用makefile编译的话可以参考下面的文件代码
pipe:pipe.cc
g++ -o $@ $^ -std=c++11 -DDEBUG
.PHONY:clean
clean:
rm -f pipe
创建子进程
pid_t id = fork();
assert(id != -1);
if (id == 0)
{
close(pipefd[1]);
char buffer[1024 * 8];
while (true)
{
ssize_t s = read(pipefd[0], buffer, sizeof(buffer) - 1);
if (s > 0)
{
buffer[s] = 0;
cout << "child get a message[" << getpid() << "] Father# " << buffer << endl;
}
else if(s == 0)
{
cout << "writer quit(father), me quit!!!" << endl;
break;
}
}
exit(0);
}
pid_t id = fork();:这行代码创建一个子进程,并将子进程的 PID(进程ID)存储在 id 变量中。如果 fork 失败,id 将是 -1,因此接下来的 assert 用于检查 fork 是否成功。父子进程将根据 fork 的返回值分别执行不同的代码。if (id == 0):这是一个条件语句,用于判断当前代码块是否在子进程中执行。如果 id 等于 0,说明当前代码块在子进程中执行。close(pipefd[1]);:子进程关闭管道的写端,这是因为子进程只负责从管道中读取数据。关闭写端有助于确保管道的正确使用。char buffer[1024 * 8];:定义一个字符数组 buffer 用于存储从管道中读取的数据。while (true):进入一个无限循环,子进程将一直尝试从管道中读取数据。ssize_t s = read(pipefd[0], buffer, sizeof(buffer) - 1);:子进程使用 read 函数从管道的读端读取数据,并将读取的字节数存储在 s 中。数据将存储在 buffer 中,但要确保不超过 buffer 的大小减 1,以便在末尾添加 null 终止字符。if (s > 0):如果成功读取了数据,进入这个条件,然后子进程将打印接收到的消息。else if (s == 0):如果 read 返回值为 0,这表示管道的写端已经被关闭,通常意味着父进程已经完成了数据的写入。子进程在这种情况下打印一条消息,然后退出。exit(0);:子进程退出,结束其执行。父进程写入
close(pipefd[0]);
string message = "我是父进程,我正在给你发消息";
int count = 0;
char send_buffer[1024 * 8];
while (true)
{
snprintf(send_buffer, sizeof(send_buffer), "%s[%d] : %d",message.c_str(), getpid(), count++);
write(pipefd[1], send_buffer, strlen(send_buffer));
sleep(1);
cout << count << endl;
if (count == 5){
cout << "writer quit(father)" << endl;
break;
}
}
close(pipefd[1]);
pid_t ret = waitpid(id, nullptr, 0);
cout << "id : " << id << " ret: " << ret <<endl;
assert(ret > 0);
(void)ret;
close(pipefd[0]);:父进程关闭管道的读端,因为父进程只负责向管道写入数据,关闭读端可以确保管道的正确使用。string message = "我是父进程,我正在给你发消息";:定义一个字符串 message,其中包含要发送给子进程的消息。int count = 0;:初始化一个计数器 count,用于给消息编号。char send_buffer[1024 * 8];:定义一个字符数组 send_buffer 用于存储要发送的消息。while (true):进入一个无限循环,父进程将不断发送消息。snprintf(send_buffer, sizeof(send_buffer), "%s[%d] : %d", message.c_str(), getpid(), count++);:使用 snprintf 函数将消息格式化到 send_buffer 中,包括消息文本、父进程的 PID 和消息编号。write(pipefd[1], send_buffer, strlen(send_buffer));:使用 write 函数将消息写入管道的写端。这将把消息发送给子进程。sleep(1);:父进程休眠 1 秒,然后继续发送下一条消息。if (count == 5):当 count 达到 5 时,父进程输出一条消息表示它将退出,并且终止循环。close(pipefd[1]);:父进程在退出之前关闭管道的写端,以确保子进程知道不会再有更多的数据传入。pid_t ret = waitpid(id, nullptr, 0);:父进程使用 waitpid 函数等待子进程的终止。这确保了在子进程退出之前,父进程不会提前退出。assert(ret > 0);:使用 assert 检查 waitpid 函数的返回值 ret 是否大于 0,以确保子进程已经正常退出。如果 waitpid 返回负值,表示等待出现错误,可能需要进一步处理。编译后执行
child get a message[11049] Father# 我是父进程,我正在给你发消息[11048] : 0
1
child get a message[11049] Father# 我是父进程,我正在给你发消息[11048] : 1
2
child get a message[11049] Father# 我是父进程,我正在给你发消息[11048] : 2
3
child get a message[11049] Father# 我是父进程,我正在给你发消息[11048] : 3
4
child get a message[11049] Father# 我是父进程,我正在给你发消息[11048] : 4
5
writer quit(father)
writer quit(father), me quit!!!
id : 11049 ret: 11049
全部代码
#include
#include
#include
#include
#include
#include
#include
#include
using namespace std;
int main()
{
//1.创建管道
int pipefd[2]={0};
int n=pipe(pipefd);
assert(n!=-1);
(void)n;
#ifdef DEBUG
cout << "pipefd[0]: " << pipefd[0] << endl;
cout << "pipefd[1]: " << pipefd[1] << endl;
#endif
// 2. 创建子进程
pid_t id = fork();
assert(id != -1);
if (id == 0)
{
close(pipefd[1]);
char buffer[1024 * 8];
while (true)
{
ssize_t s = read(pipefd[0], buffer, sizeof(buffer) - 1);
if (s > 0)
{
buffer[s] = 0;
cout << "child get a message[" << getpid() << "] Father# " << buffer << endl;
}
else if(s == 0)
{
cout << "writer quit(father), me quit!!!" << endl;
break;
}
}
exit(0);
}
close(pipefd[0]);
string message = "我是父进程,我正在给你发消息";
int count = 0;
char send_buffer[1024 * 8];
while (true)
{
snprintf(send_buffer, sizeof(send_buffer), "%s[%d] : %d",message.c_str(), getpid(), count++);
write(pipefd[1], send_buffer, strlen(send_buffer));
sleep(1);
cout << count << endl;
if (count == 5){
cout << "writer quit(father)" << endl;
break;
}
}
close(pipefd[1]);
pid_t ret = waitpid(id, nullptr, 0);
cout << "id : " << id << " ret: " << ret <<endl;
assert(ret > 0);
(void)ret;
return 0;
}
用fork来共享管道原理

站在文件描述符角度-深度理解管道

站在内核角度-管道本质

所以,看待管道,就如同看待文件一样!管道的使用和文件一致,迎合了“Linux一切皆文件思想”
#pragma once
#include
#include
#include
#include
#include
#include
typedef std::function<void()> func;
std::vector<func> callbacks;
std::unordered_map<int, std::string> desc;
void register_web()
{
std::cout << "sub process[" << getpid() << " ] 注册网站\n" << std::endl;
}
void login_web()
{
std::cout << "sub process[" << getpid() << " ] 登录网站\n" << std::endl;
}
void cal_sql()
{
std::cout << "sub process[" << getpid() << " ] 调用数据库\n" << std::endl;
}
void save_sql()
{
std::cout << "sub process[" << getpid() << " ] 保存数据库\n" << std::endl;
}
void load()
{
desc.insert({callbacks.size(), "register_web: 注册网站"});
callbacks.push_back(register_web);
desc.insert({callbacks.size(), "login_web: 登陆网站"});
callbacks.push_back(login_web);
desc.insert({callbacks.size(), "cal_sql: 调用数据库"});
callbacks.push_back(cal_sql);
desc.insert({callbacks.size(), "save_sql: 保存数据库"});
callbacks.push_back(save_sql);
}
void showHandler()
{
for(const auto &iter : desc )
{
std::cout << iter.first << "\t" << iter.second << std::endl;
}
}
int handlerSize()
{
return callbacks.size();
}
typedef std::function func; :这行代码定义了一个函数类型别名 func,它表示一个没有参数和没有返回值的函数。std::vector callbacks; :这是一个 std::vector,用于存储回调函数。每个元素都是一个函数对象,表示一个回调函数。std::unordered_map desc; :这是一个无序映射,用于存储回调函数的描述。它将回调函数的索引(在 callbacks 中的位置)映射到相应的描述字符串。void register_web(), void login_web(), void cal_sql(), void save_sql():这些是实际的回调函数,每个函数执行不同的操作,并在标准输出中打印相应的消息,指示它们的操作。void load():load 函数用于初始化回调函数和它们的描述,将它们添加到 callbacks 和 desc 中。每个回调函数在添加到 callbacks 时都会分配一个唯一的索引,这个索引也用于关联到描述。void showHandler():showHandler 函数用于显示所有回调函数的描述和它们的索引。int handlerSize():handlerSize 函数返回 callbacks 中回调函数的数量。#include
#include
#include
#include
#include
#include
#include
#include
#include "task.hpp"
#define PROCESS_NUM 5
using namespace std;
int waitCommand(int waitFd, bool &quit) //如果不发,就阻塞
{
uint32_t command = 0;
ssize_t s = read(waitFd, &command, sizeof(command));
if (s == 0)
{
quit = true;
return -1;
}
assert(s == sizeof(uint32_t));
return command;
}
void sendAndWakeup(pid_t who, int fd, uint32_t command)
{
write(fd, &command, sizeof(command));
cout << "main process: call process " << who << " execute " << desc[command] << " through " << fd << endl;
}
int main()
{
load();
vector<pair<pid_t, int>> slots;
// 先创建多个进程
for (int i = 0; i < PROCESS_NUM; i++)
{
// 创建管道
int pipefd[2] = {0};
int n = pipe(pipefd);
assert(n == 0);
(void)n;
pid_t id = fork();
assert(id != -1);
// 子进程进行读取
if (id == 0)
{
// 关闭写端
close(pipefd[1]);
// child
while (true)
{
// 等命令
bool quit = false;
int command = waitCommand(pipefd[0], quit); //如果不发,就阻塞
if (quit)
break;
// 执行对应的命令
if (command >= 0 && command < handlerSize())
{
callbacks[command]();
}
else
{
cout << "非法command: " << command << endl;
}
}
exit(1);
}
// father,进行写入,关闭读端
close(pipefd[0]);
slots.push_back(pair<pid_t, int>(id, pipefd[1]));
}
// 父进程派发任务
srand((unsigned long)time(nullptr) ^ getpid() ^ 23323123123L); // 让数据源更随机
while (true)
{
// 选择一个任务, 如果任务是从网络里面来的?
int command = rand() % handlerSize();
// 选择一个进程 ,采用随机数的方式,选择进程来完成任务,随机数方式的负载均衡
int choice = rand() % slots.size();
// 把任务给指定的进程
sendAndWakeup(slots[choice].first, slots[choice].second, command);
sleep(1);
}
// 关闭fd, 所有的子进程都会退出
for (const auto &slot : slots)
{
close(slot.second);
}
// 回收所有的子进程信息
for (const auto &slot : slots)
{
waitpid(slot.first, nullptr, 0);
}
}
PROCESS_NUM,它表示要创建的子进程的数量。waitCommand 函数:
command 变量中。quit 为 true,表示退出。sendAndWakeup 函数:
write 向管道写入命令,然后在标准输出上打印消息,表示主进程正在调用子进程执行命令。main 函数:
load() 函数被调用,其中包括回调函数的注册和初始化。vector 类型的容器 slots,用于存储子进程的 PID 和管道的写端文件描述符。for 循环,创建了 PROCESS_NUM 个子进程。每个子进程都有一个管道,一个用于读取命令,另一个用于写入命令。waitCommand 函数等待从父进程发送的命令,并执行相应的回调函数。如果命令是退出命令,子进程退出。sendAndWakeup 函数将任务发送给一个随机选择的子进程,以实现任务派发和负载均衡。main process: call process 11485 execute cal_sql: 调用数据库 through 6
sub process[11485 ] 调用数据库
main process: call process 11487 execute register_web: 注册网站 through 8
sub process[11487 ] 注册网站
main process: call process 11483 execute register_web: 注册网站 through 4
sub process[11483 ] 注册网站
main process: call process 11484 execute save_sql: 保存数据库 through 5
sub process[11484 ] 保存数据库
main process: call process 11483 execute cal_sql: 调用数据库 through 4
sub process[11483 ] 调用数据库
main process: call process 11485 execute login_web: 登陆网站 through 6
sub process[11485 ] 登录网站
main process: call process 11487 execute login_web: 登陆网站 through 8
sub process[11487 ] 登录网站
main process: call process 11487 execute register_web: 注册网站 through 8
sub process[11487 ] 注册网站
......
O_NONBLOCK 未启用(O_NONBLOCK disable),read 调用将阻塞,即进程会一直等待,直到管道中有数据可读取为止。O_NONBLOCK 启用(O_NONBLOCK enable),read 调用将立即返回-1,同时 errno 的值将被设置为 EAGAIN,表示没有数据可读。O_NONBLOCK 未启用(O_NONBLOCK disable),write 调用将阻塞,直到有进程从管道中读取数据,腾出足够的空间。O_NONBLOCK 启用(O_NONBLOCK enable),write 调用将立即返回-1,同时 errno 的值将被设置为 EAGAIN,表示管道已满。read 返回0:
read 调用将返回0,表示已达到文件结束(EOF)状态。这表明不会再有数据写入管道。write 操作可能会产生信号 SIGPIPE:
write 操作可能会产生 SIGPIPE 信号。这是因为写入数据没有接收方,因此内核会向写进程发送 SIGPIPE 信号,通常会导致写进程终止。这可以用来处理管道通信中的异常情况。PIPE_BUF 时,Linux 将保证写入是原子性的。这意味着写入操作要么完全成功,要么完全失败,不会出现部分写入的情况。 PIPE_BUF 的值可以通过 pathconf 或 _PC_PIPE_BUF 获取,通常情况下为4096字节。PIPE_BUF,Linux将不再保证写入的原子性,可能会出现部分写入的情况,这需要编程时进行适当的处理。fork 调用创建的父进程和子进程之间可以使用该管道进行通信。