• Python进程间的通信之管道通信:os.pipe


    前言

      进程(process)是系统进行资源分配和调度的基本单位,每个进程都有自己的地址(内存)空间(由CPU分配),处于安全的考虑,不同进程之间的内存空间是相互隔离的,也就是说 进程A 是不能直接访问 进程B 的内存空间。但某些场景下,不同进程间需要相互通信,该怎么办呢?即然进程间不能直接通信,那就借助第三方来完成通信,这个第三方就是系统的内核。
      内核提供了多种方式来完成进程间的通信,比如管道、消息队列、共享内存、信号量等。本文主要介绍 管道(pipe) 的原理及os.pipe()实现。

      ps: 本文中的代码需要在linux系统中运行。

    1. 模拟管道通信

      管道本质上就是内核中的一个缓存,当进程创建一个管道后,系统会返回两个文件描述符,可以通过这两个描述符往管道写入数据或者读取数据。管道是一种单向通信,即管道的一端只用于读数据,另一端只用于写数据,只有写完数据之后另一个进程才能去读。
      模拟一下管道的读数据和写数据:

    import os
    
    input_data = 'hello world!'.encode('utf-8')
    
    if __name__ == '__main__':
        r, w = os.pipe()	# 创建管道
    
        os.write(w, input_data)		# 向管道写入数据
        # os.close(w)
    
        # 如果没有数据会进入等待状态(阻塞)
        output_data = os.read(r, len(input_data))	# 从管道中读数据
        print(output_data.decode('utf-8'))
        # os.close(r)
    # hello world!
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

      os.pipe() 创建一个管道,返回一对分别用于读取和写入的文件描述符(r, w),新的文件描述符是 不可继承 的。
      os.read(fd, n) 从文件描述符 fd 中读取至多 n 个字节,返回所读取字节的字节串(bytestring),如果到达了 fd 指向的文件末尾,则返回空字节对象。
      os.write(fd, str)str 中的字节串(bytestring)写入文件描述符 fd,返回实际写入的字节数。

      os.read(fd, n)os.write(fd, str) 适用于低级 I/O 操作,必须用于 os.open()pipe() 返回的文件描述符。

      除了上述对管道进行读写外,os库还提供了fdopen()函数来创建文件对象,再使用对象来进行相关操作。

    import os
    import struct
    
    input_data = 'hello world!'.encode('utf-8')
    
    if __name__ == '__main__':
        r, w = os.pipe()
    
        writer = os.fdopen(w, 'wb')
        writer.write(input_data)	# 写入数据
        writer.flush()  # 刷新(写入数据之后必须手动刷新)
        # writer.close()
    
        reader = os.fdopen(r, 'rb')
        output_data = reader.read(len(input_data))		# 读取数据
        print(output_data.decode('utf-8'))
        # reader.close()
    # hello world!
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

      os.fdopen(fd, *args, **kwargs) 返回打开文件描述符 fd 对应文件的对象,类似 open() 函数,二者接受同样的参数,不同之处在于 fdopen() 第一个参数是整数(文件描述符是整数类型的)。

    2. 实现进程间的单向通信

      进程间的管道通信过程大致如下:
      (1) 父进程使用 pipe函数 通过系统调用创建一个管道;
      (2) 父进程使用 fork 函数 通过系统调用创建两个子进程;
      (3) 两个子进程可以通过管道进行通信。

    在这里插入图片描述

      这里简化生产者与消费者的例子来模拟进程间的单向通信,main 代码实现如下:

    # main.py
    import os
    import sys
    import subprocess
    
    
    if __name__ == '__main__':
        r, w = os.pipe()
    
        cmd1 = [sys.executable, "-m", "consumer", str(r)]
        cmd2 = [sys.executable, "-m", "producer", str(w)]
    
        proc1 = subprocess.Popen(cmd1, pass_fds=(r, ))     # 在一个新的进程中执行子程序
        proc2 = subprocess.Popen(cmd2, pass_fds=(w, ))
        
        print('parent process pid: ', os.getpid())
        print('child process pid(proc1): ', proc1.pid)
        print('child process pid(proc2): ', proc2.pid)
    
        proc1.wait()   # 等待子进程被终止
        proc2.wait()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

      producer 代码实现如下:

    """负责写数据"""
    import os
    import sys
    import struct
    
    writer = os.fdopen(int(sys.argv[1]), "wb")
    
    input_data = 'hello world!'.encode('utf-8')
    
    writer.write(struct.pack(", len(input_data)))  # 小端模式, 低地址存储 b'\x05\x00\x00\x00'
    writer.write(input_data)
    writer.flush()  # 刷新(写入数据之后必须手动刷新)
    
    print('input data: ', input_data.decode('utf-8'))
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

      consumer 代码实现如下:

    """负责读数据"""
    import os
    import sys
    import struct
    
    
    reader = os.fdopen(int(sys.argv[1]), "rb")
    
    len_data = reader.read(4)  # int占用4个字节
    recv_bytes = struct.unpack(", len_data)[0]
    output_data = reader.read(recv_bytes)
    
    print('output data: ', output_data.decode('utf-8'))
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

      运行结果如下:

    在这里插入图片描述

    3. 实现进程间的双向通信

      如果使用os.pipe()函数实现双向通信,则需要创建两个管道。

    在这里插入图片描述

      main 代码实现如下:

    # main2.py
    import os
    import sys
    import subprocess
    
    if __name__ == '__main__':
        # connect subprocess with a pair of pipes
        worker1_read, worker2_write = os.pipe()
        worker2_read, worker1_write = os.pipe()
    
        cmd1 = [sys.executable, "-m", "client", str(worker1_read), str(worker1_write)]
        cmd2 = [sys.executable, "-m", "server", str(worker2_read), str(worker2_write)]
    
        proc1 = subprocess.Popen(cmd1, pass_fds=(worker1_read, worker1_write))  # 在一个新的进程中执行子程序
        proc2 = subprocess.Popen(cmd2, pass_fds=(worker2_read, worker2_write))
    
        print('parent process pid: ', os.getpid())
        print('child process pid(proc1): ', proc1.pid)
        print('child process pid(proc2): ', proc2.pid)
    
        proc1.wait()  # 等待子进程被终止
        proc2.wait()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

      client 代码实现如下:

    import os
    import sys
    import struct
    
    reader = os.fdopen(int(sys.argv[1]), "rb")
    writer = os.fdopen(int(sys.argv[2]), "wb")
    
    pid = os.getpid()
    send_info = '[{}]hello server!'.format(pid).encode('utf-8')
    
    print('pid[{}] send info: {}'.format(pid, send_info.decode()))
    writer.write(struct.pack(", len(send_info)))  # 小端模式, 低地址存储 b'\x05\x00\x00\x00'
    writer.write(send_info)
    writer.flush()  # 刷新(写入数据之后必须手动刷新)
    
    len_data = reader.read(4)  # int占用4个字节
    recv_bytes = struct.unpack(", len_data)[0]
    recv_data = reader.read(recv_bytes)
    print('pid[{}] recv info: {}'.format(pid, recv_data.decode()))
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

      server 代码实现如下:

    import os
    import sys
    import struct
    
    reader = os.fdopen(int(sys.argv[1]), "rb")
    writer = os.fdopen(int(sys.argv[2]), "wb")
    
    pid = os.getpid()
    send_info = '[{}]hello client!'.format(pid).encode('utf-8')
    
    len_data = reader.read(4)  # int占用4个字节
    recv_bytes = struct.unpack(", len_data)[0]
    recv_data = reader.read(recv_bytes)
    print('pid[{}] recv info: {}'.format(pid, recv_data.decode()))
    
    print('pid[{}] send info: {}'.format(pid, send_info.decode()))
    writer.write(struct.pack(", len(send_info)))  # 小端模式, 低地址存储 b'\x05\x00\x00\x00'
    writer.write(send_info)
    writer.flush()  # 刷新(写入数据之后必须手动刷新)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

      运行结果如下:

    在这里插入图片描述

    结束语

      os.open()函数与open()函数的区别:
        os.open()是通过os库调用操作系统来打开文件,而open()函数是 python 自带的函数,它是通过 python 程序来打开文件的,可以理解为是对低阶os.open()的封装。

  • 相关阅读:
    【论文 01】《Attention is all you need》
    SpringBoot日志链路追踪实现
    64线LiDAR上速度可达120Hz!一种基于图像表示的快速精确的LiDAR地面分割算法
    Vue3配置router路由步骤
    「程序员必须掌握的算法」动态规划「上篇」
    springcloud11:Hystrix服务降级(断路器)
    实操新项目丨手把手带你开发疫情防控系统
    EBS JVM 内存优化攻略
    使用新版Visual Studio编译老项目部分报错处理
    ※基本数据类型的包装类、String类、String增强类StringBuilder和StringBuffer、BigDecimal类、Math类
  • 原文地址:https://blog.csdn.net/qq_42730750/article/details/127729762