• Python子进程管理与进程信息获取


    1. Python子进程模块subprocess

    subprocess 模块允许我们启动一个新进程,并连接到它们的输入/输出/错误管道,从而获取返回值。

    (1)run 方法

    首先我们来看看 run 方法的使用,该方法的参数如下:

    • args:表示要执行的命令。必须是一个字符串或字符串参数列表。
    • stdinstdoutstderr:子进程的标准输入、输出和错误。其值可以是 subprocess.PIPEsubprocess.DEVNULL、一个已经存在的文件描述符、已经打开的文件对象或者 Nonesubprocess.PIPE 表示为子进程创建新的管道。subprocess.DEVNULL 表示使用 os.devnull。默认使用的是 None,表示什么都不做。另外,stderr 可以合并到 stdout 里一起输出。
    • timeout:设置命令超时时间。如果命令执行时间超时,子进程将被杀死,并抛出 TimeoutExpired 异常。
    • check:如果该参数设置为 True,并且进程退出状态码不是 0,则抛出 CalledProcessError 异常。
    • encoding:如果指定了该参数,则 stdinstdoutstderr 可以接收字符串数据,并以该编码方式编码。否则只接收 bytes 类型的数据。
    • shell:如果该参数为 True,将通过操作系统的 Shell 执行指定的命令。
    • capture_output:如果 capture_output = True,则将捕获 stdoutstderr,调用时内部的 Popen 对象将自动使用 stdout = PIPEstderr = PIPE 创建标准输出和标准错误对象;传递 stdoutstderr 参数时不能同时传递 capture_output 参数。如果希望捕获并将两个 stream 合并为一个,使用 stdout = PIPEstderr = STDOUT

    下面我们来看一个例子,run 方法调用方式返回 CompletedProcess 实例:

    import subprocess
    
    args1 = ['python', 'src/Python子进程测试程序1.py']
    
    ret = subprocess.run(args=args1, capture_output=True, encoding='utf-8')  # 相当于在命令行执行:python src/Python子进程测试程序1.py
    print(ret)  # CompletedProcess(args=['python', 'src/Python子进程测试程序1.py'], returncode=0, stdout='子进程输出: Hello World!\n', stderr='')
    
    if ret.returncode == 0:
        print('Success, stdout:', ret.stdout)  # Success, stdout: 子进程输出: Hello World!
    else:
        print('Error, stderr:', ret.stderr)  # Error, stderr: python: can't open file 'D:\xxx\src\Python子进程测试程序1_Wrong.py': [Errno 2] No such file or directory
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    其中,Python子进程测试程序1.py 内容如下:

    print('子进程输出: Hello World!')
    
    • 1

    (2)Popen 方法

    Popensubprocess 的核心,子进程的创建和管理都靠它处理。

    Popen 方法的参数如下:

    • args:Shell 命令,可以是字符串或者序列类型(如:列表、元组)。
    • bufsize:缓冲区大小。当创建标准流的管道对象时使用,默认为 -10 表示不使用缓冲区,1 表示行缓冲,仅当 universal_newlines = True 时可用,也就是文本模式。正数表示缓冲区大小,负数表示使用系统默认的缓冲区大小。
    • stdinstdoutstderr:分别表示程序的标准输入、输出、错误句柄。
    • preexec_fn:只在 Unix 平台下有效,用于指定一个可执行对象(callable object),它将在子进程运行之前被调用。
    • shell:如果该参数为 True,将通过操作系统的 Shell 执行指定的命令。
    • cwd:用于设置子进程的当前目录。
    • env:用于指定子进程的环境变量。如果 env = None,子进程的环境变量将从父进程中继承。

    该方法会创建一个 Popen 对象, Popen 对象有以下几种方法:

    • poll():检查进程是否终止,如果终止返回 returncode,否则返回 None
    • wait(timeout):等待子进程终止。
    • communicate(input=None, timeout=None):和子进程交互,向子进程发送和读取数据。将 input 指定数据发送到 stdin;从 stdoutstderr 读取数据,直到到达文件末尾,等待进程终止。所以,返回值是一个元组:(stdout_data, stderr_data)。如果 timeout 时间内子进程不结束,则会抛出 TimeoutExpired 异常。其中需要注意的是,捕获异常之后,可以再次调用该函数,因为子进程并没有被 KILL。因此,如果超时结束程序的话,需要现正确 KILL 子进程。
    • send_signal(singnal):发送信号到子进程。
    • terminate():停止子进程,也就是发送 SIGTERM 信号到子进程。
    • kill():杀死子进程,发送 SIGKILL 信号到子进程。

    Popen 方法的样例如下:

    args2 = ['python', 'src/Python子进程测试程序2.py']
    
    proc = subprocess.Popen(args=args2,
                            stdin=subprocess.PIPE,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE,
                            encoding='utf-8')
    print(proc)  # 
    
    stdout, stderr = proc.communicate(input='AsanoSaki')
    print('stdout:', stdout)  # stdout: 子进程输出:  AsanoSaki
    print('stderr:', stderr)  # stderr: 空
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    其中,Python子进程测试程序2.py 内容如下:

    s = input()
    print('子进程输出: ', s)
    
    • 1
    • 2

    现在我们来看一下 communicate 的用法,我们将测试程序修改为运行时间超过一秒:

    args2 = ['python', 'src/Python子进程测试程序2.py']
    
    proc = subprocess.Popen(args=args2,
                            stdin=subprocess.PIPE,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE,
                            encoding='utf-8')
    
    try:
        stdout, stderr = proc.communicate(input='AsanoSaki', timeout=1)  # 设置1s超时时间
    except subprocess.TimeoutExpired:
        print('子进程运行超时')
        proc.kill()  # 需要KILL子进程
        stdout, stderr = proc.communicate()  # 捕获异常之后,可以再次调用该函数
        print('stdout:', stdout)  # stdout: 子进程输出:  AsanoSaki
        print('stderr:', stderr)  # stderr: 空
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    现在的 Python子进程测试程序2.py 内容如下:

    s = input()
    print('子进程输出: ', s)
    
    for i in range(10**9):
        pass
    
    • 1
    • 2
    • 3
    • 4
    • 5

    2. ThreadPoolExecutor线程池

    concurrent.futures 模块是 Python3.2 中引入的新模块,用于支持异步执行,以及在多核 CPU 和网络 I/O 中进行高效的并发编程。线程池的基类是 concurrent.futures 模块中的 ExecutorExecutor 提供了两个子类,即 ThreadPoolExecutorProcessPoolExecutor ,简化了跨平台异步编程的实现。其中 ThreadPoolExecutor 用于创建线程池,而 ProcessPoolExecutor 用于创建进程池。如果使用线程池/进程池来管理并发编程,那么只要将相应的 Task 函数提交给线程池/进程池,剩下的事情就由线程池/进程池来搞定。

    首先,让我们先来理解多进程和多线程两种并发编程的方式:

    • 多进程:当通过多进程来实现并发编程时,程序会将任务分配给多个进程,这些进程可以在不同的 CPU 上同时运行。进程之间是独立的,各自有自己的内存空间等,可以实现真正的并行执行。不过,进程之间的通信比较耗时,需要使用 IPC(进程间通信)机制,而且进程之间的切换比线程之间的切换耗时,所以创建进程的代价较高。
    • 多线程:当通过多线程来实现并发编程时,程序会将任务分配给多个线程,这些线程可以在同一个进程中的不同 CPU 核上同时运行。线程之间共享进程的内存空间,因此开销比较小。但是需要注意,在 Python 解释器中,线程是无法实现真正的并行执行,因为 Python 有 GIL(全局解释器锁),它确保同时只有一个线程运行 Python 代码。因此,一个 Python 进程中的多个线程并不能并行执行,在使用多线程编程时不能完全利用多核 CPU。

    ThreadPoolExecutor 创建一个线程池,任务可以提交到这个线程池中执行。ThreadPoolExecutorProcessPoolExecutor 更容易使用,且没有像进程那样的开销。它可以让我们在一个 Python 解释器中进行跨线程异步编程,因为它规避了 GIL。

    Exectuor 提供了如下常用方法:

    • submit(fn, *args, **kwargs):将 fn 函数提交给线程池。*args 代表传给 fn 函数的参数,**kwargs 代表以关键字参数的形式为 fn 函数传入参数。
    • map(func, *iterables, timeout=None, chunksize=1):该函数类似于全局函数 map(func, *iterables),只是该函数将会启动多个线程,以异步方式立即对 iterables 执行 map 处理。
    • shutdown(wait=True):关闭线程池。

    程序将 fn 函数 submit 给线程池后,submit 方法会返回一个 Future 对象,Future 类主要用于获取线程任务函数的返回值。由于线程任务会在新线程中以异步方式执行,因此,线程执行的函数相当于一个“将来完成”的任务,所以 Python 使用 Future 来代表。

    Future 对象提供了如下方法:

    • cancel():取消该 Future 代表的线程任务。如果该任务正在执行,不可取消,则该方法返回 False;否则,程序会取消该任务,并返回 True
    • cancelled():返回 Future 代表的线程任务是否被成功取消。
    • running():如果该 Future 代表的线程任务正在执行、不可被取消,该方法返回 True
    • done():如果该 Funture 代表的线程任务被成功取消或执行完成,则该方法返回 True
    • result(timeout=None):获取该 Future 代表的线程任务最后返回的结果。如果 Future 代表的线程任务还未完成,该方法将会阻塞当前线程,其中 timeout 参数指定最多阻塞多少秒。
    • exception(timeout=None):获取该 Future 代表的线程任务所引发的异常。如果该任务成功完成,没有异常,则该方法返回 None
    • add_done_callback(fn):为该 Future 代表的线程任务注册一个“回调函数”,当该任务成功完成时,程序会自动触发该 fn 函数。

    在用完一个线程池后,应该调用该线程池的 shutdown() 方法,该方法将启动线程池的关闭序列。调用 shutdown() 方法后的线程池不再接收新任务,但会将以前所有的已提交任务执行完成。当线程池中的所有任务都执行完成后,该线程池中的所有线程都会死亡。

    使用线程池来执行线程任务的步骤如下:

    1. 调用 ThreadPoolExecutor 类的构造器创建一个线程池。
    2. 定义一个普通函数作为线程任务。
    3. 调用 ThreadPoolExecutor 对象的 submit() 方法来提交线程任务。
    4. 当不想提交任何任务时,调用 ThreadPoolExecutor 对象的 shutdown() 方法来关闭线程池。

    下面我们来看一个例子:

    from concurrent.futures import ThreadPoolExecutor
    
    def thread(num):
        print('Threads:', num)
    
    def getResult():  # 有返回结果的函数
        return 'Get Result'
    
    # 新建ThreadPoolExecutor对象并指定最大的线程数量
    with ThreadPoolExecutor(max_workers=3) as executor:
        # 提交多个任务到线程池中
        executor.submit(thread, 1)  # Threads: 1
        executor.submit(thread, 2)  # Threads: 2
        t = executor.submit(getResult)
        print(t.result())  # Get Result
    
    # 或者按如下方式实现
    threadPool = ThreadPoolExecutor(max_workers=3)
    for i in range(3):
        threadPool.submit(thread, i)
    threadPool.shutdown(wait=True)
    # Threads: 0
    # Threads: 1
    # Threads: 2
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    3. 系统信息获取模块Psutil

    现在可能会有人在想那我们如何获取子进程/线程在运行时的时间开销或者内存占用等信息呢?Python 有一个第三方模块 psutil,专门用来获取操作系统以及硬件相关的信息,比如:CPU、磁盘、网络、内存等等。

    首先我们需要安装 psutil,直接通过 pip 命令安装即可:

    pip install psutil
    
    • 1

    (1)查看 CPU 相关信息:

    import psutil
    
    print(psutil.cpu_count())  # CPU的逻辑数量:12
    
    print(psutil.cpu_count(logical=False))  # CPU的物理核心数量:6
    
    print(psutil.cpu_times())  # CPU的用户/系统/空闲时间
    # scputimes(user=26860.4375, system=10963.515624999884, idle=676060.796875, interrupt=740.875, dpc=477.75)
    
    for _ in range(3):
        # interval表示每隔0.5s刷新一次,percpu表示查看所有的CPU使用率
        print(psutil.cpu_percent(interval=0.5, percpu=True))
    # [21.2, 0.0, 32.4, 0.0, 16.1, 0.0, 0.0, 0.0, 3.2, 0.0, 0.0, 0.0]
    # [25.8, 0.0, 3.1, 0.0, 0.0, 0.0, 3.1, 3.1, 0.0, 0.0, 0.0, 18.8]
    # [32.4, 0.0, 21.9, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 6.2, 0.0]
    
    print(psutil.cpu_stats())  # CPU的统计信息,包括上下文切换、中断、软中断以及系统调用次数等
    # scpustats(ctx_switches=3399680458, interrupts=1365489476, soft_interrupts=0, syscalls=2283205750)
    
    print(psutil.cpu_freq())  # CPU的频率
    # scpufreq(current=2592.0, min=0.0, max=2592.0)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    (2)查看内存及磁盘相关信息:

    print(psutil.virtual_memory())  # 内存使用情况,分别为总内存、可用内存、内存占用率、已使用的内存大小、剩余的内存大小
    # svmem(total=17022177280, available=7125008384, percent=58.1, used=9897168896, free=7125008384)
    
    print(psutil.swap_memory())  # 交换内存信息(专门用来临时存储数据)
    # sswap(total=6030352384, used=5409898496, free=620453888, percent=89.7, sin=0, sout=0)
    
    print(psutil.disk_partitions())  # 磁盘分区、磁盘使用率和磁盘IO信息
    # [sdiskpart(device='C:\\', mountpoint='C:\\', fstype='NTFS', opts='rw,fixed', maxfile=255, maxpath=260),
    #  sdiskpart(device='D:\\', mountpoint='D:\\', fstype='NTFS', opts='rw,fixed', maxfile=255, maxpath=260),
    #  sdiskpart(device='E:\\', mountpoint='E:\\', fstype='NTFS', opts='rw,fixed', maxfile=255, maxpath=260)]
    
    print(psutil.disk_usage("C:\\"))  # 某个磁盘使用情况
    # sdiskusage(total=160253673472, used=101791543296, free=58462130176, percent=63.5)
    
    print(psutil.disk_io_counters())  # 磁盘IO统计信息,分别为读次数、写次数、读的字节数、写的字节数、读时间、写时间
    # sdiskio(read_count=1833834, write_count=1831471, read_bytes=69098376704, write_bytes=59881958400, read_time=17952, write_time=2323)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    (3)查看网络相关信息:

    print(psutil.net_io_counters())  # 网卡的网络IO统计信息
    # snetio(bytes_sent=629698806, bytes_recv=1756588411, packets_sent=1280472, packets_recv=2023810, errin=0, errout=0, dropin=0, dropout=0)
    
    print(psutil.net_io_counters(pernic=True))  # 列出所有网卡的信息
    # {'Ethernet': snetio(bytes_sent=0, bytes_recv=0, packets_sent=0, packets_recv=0, errin=0, errout=0, dropin=0, dropout=0),
    #  'Local Area Connection* 3': snetio(bytes_sent=0, bytes_recv=0, packets_sent=0, packets_recv=0, errin=0, errout=0, dropin=0, dropout=0),
    #  'Local Area Connection* 4': snetio(bytes_sent=0, bytes_recv=0, packets_sent=0, packets_recv=0, errin=0, errout=0, dropin=0, dropout=0),
    #  ......]
    
    print(psutil.net_if_addrs())  # 网络接口信息
    # {'Ethernet': [snicaddr(family=, address='04-D4-C4-74-A4-F0', netmask=None, broadcast=None, ptp=None), snicaddr(family=, address='169.254.216.112', netmask='255.255.0.0', broadcast=None, ptp=None)],
    #  'Local Area Connection* 3': [snicaddr(family=, address='38-00-25-26-9C-70', netmask=None, broadcast=None, ptp=None), snicaddr(family=, address='169.254.169.242', netmask='255.255.0.0', broadcast=None, ptp=None), snicaddr(family=, address='fe80::5e4e:63b6:7416:787b', netmask=None, broadcast=None, ptp=None)],
    #  ......]
    
    print(psutil.net_if_stats())  # 网卡的详细信息,包括是否启动、通信类型、传输速度、mtu
    # {'Ethernet': snicstats(isup=False, duplex=, speed=0, mtu=1500),
    #  'vEthernet (Default Switch)': snicstats(isup=True, duplex=, speed=4294, mtu=1500),
    #  'Loopback Pseudo-Interface 1': snicstats(isup=True, duplex=, speed=1073, mtu=1500),
    #  ......]
    
    print(psutil.net_connections())  # 当前机器的网络连接,里面接受一个参数,默认是"inet",当然我们也可以指定为其它,比如"tcp"
    # [sconn(fd=-1, family=, type=, laddr=addr(ip='127.0.0.1', port=1309), raddr=(), status='NONE', pid=7516),
    #  sconn(fd=-1, family=, type=, laddr=addr(ip='127.0.0.1', port=9100), raddr=(), status='LISTEN', pid=6004),
    #  sconn(fd=-1, family=, type=, laddr=addr(ip='::', port=49667), raddr=(), status='LISTEN', pid=3768),
    #  ......]
    
    print(psutil.users())  # 当前登录的用户信息
    # [suser(name='AsanoSaki', terminal=None, host=None, started=1694966965.3425539, pid=None)]
    
    import datetime
    print(psutil.boot_time())  # 系统的启动时间:1694912508.6818905
    print(datetime.datetime.fromtimestamp(psutil.boot_time()))  # 2023-09-17 09:01:48.681890
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    (4)查看进程相关信息:

    print(psutil.pids())  # 当前存在的所有进程的PID
    # [0, 4, 8, 140, 212, 584, 756, 1052, 1160, 1188, 1292, 1364, 1384, ...]
    
    print(psutil.pid_exists(0))  # 某个进程是否存在
    # True
    
    print(psutil.process_iter())  # 所有进程对象(Process)组成的迭代器
    # 
    
    print(psutil.Process(pid=10712))  # 根据PID获取一个进程对应的Process对象
    # psutil.Process(pid=10712, name='pycharm64.exe', status='running', started='08:56:02')
    
    p = psutil.Process(pid=10712)  # 获取该Process对象
    
    print(p.name())  # 进程名称,pycharm64.exe
    
    print(p.exe())  # 进程的exe路径:E:\PyCharm 2020.3.3\bin\pycharm64.exe
    
    print(p.cwd())  # 进程的工作目录:E:\PyCharm 2020.3.3\jbr\bin
    
    print(p.cmdline())  # 进程启动的命令行:['E:\\PyCharm 2020.3.3\\bin\\pycharm64.exe']
    
    print(p.status())  # 进程状态:running
    
    print(p.username())  # 进程用户名:LAPTOP-23NEHV3U\AsanoSaki
    
    print(p.create_time())  # 进程创建时间,返回时间戳:1694998562.3625667
    
    print(p.cpu_times())  # 进程使用的CPU时间
    # pcputimes(user=277.09375, system=34.265625, children_user=0.0, children_system=0.0)
    
    print(p.memory_info())  # 进程所使用的内存
    # pmem(rss=1507303424, vms=1790066688, num_page_faults=1466884, peak_wset=1536196608,
    #      wset=1507303424, peak_paged_pool=881616, paged_pool=876144, peak_nonpaged_pool=268096,
    #      nonpaged_pool=154024, pagefile=1790066688, peak_pagefile=1798070272, private=1790066688)
    
    print(p.num_threads())  # 进程内的线程数量,即这个进程开启了多少个线程:68
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    现在我们使用 psutil 模块实现获取 ThreadPoolExecutor 线程任务运行的时间与内存占用信息:

    args2 = ['python', 'src/Python子进程测试程序2.py']
    
    proc = subprocess.Popen(args=args2,
                            stdin=subprocess.PIPE,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE,
                            encoding='utf-8')
    
    def getProcessInfo(pid):
        p = psutil.Process(pid)  # 获取pid所代表的子进程
        start_time = time.time()
        memory = 0
        while(True):
            try:
                memory = max(memory, p.memory_info().rss)
            except:
                break
        runtime = (time.time() - start_time) * 1000
        return runtime, memory
    
    threadPool = ThreadPoolExecutor()
    task = threadPool.submit(getProcessInfo, proc.pid)
    stdout, stderr = proc.communicate(input='AsanoSaki')
    runtime, memory = task.result()
    print(runtime)  # 510.7400417327881
    print(memory)  # 40865792
    
    threadPool.shutdown(wait=True)
    proc.kill()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    其中,Python子进程测试程序2.py 内容如下:

    s = input()
    print('子进程输出: ', s)
    
    • 1
    • 2
  • 相关阅读:
    【Spring Boot】SpringBoot 2.6.6 集成 SpringDoc 1.6.9 生成swagger接口文档
    AYIT嵌入式实验室2023级C语言训练1-4章训练题
    【EI会议征稿通知】第四届电网系统与绿色能源国际学术会议(PGSGE 2024)
    重装系统后没声音如何解决
    C#详解:程序域、程序集、模块、Type、反射
    珂学送分
    光遇三周年福利【源码+教程】Java游戏开发_Java贪吃蛇小游戏_Java项目实战_Java课程设计_Java课设项目_Java初级项目_Java练手项目
    【Python笔记-设计模式】原型模式
    关于软件物料清单(SBOM),你所需要了解的一切
    python资源库
  • 原文地址:https://blog.csdn.net/m0_51755720/article/details/132948466