• python Process and ProcessPools


    每个Python程序都是一个进程,每个进程有一个默认线程称之为主线程,该线程用于执行程序指令,事实上,每个进程都是Pthon解释器的一个实例对象,它执行Python指令。前面介绍了python Threads and ThreadPools ,这里继续介绍进程相关内容。Python通过multiprocessing.Process提供真正系统级别的进程。操作系统是如何控制新建一个进程的呢?都是由Python解释器来管理的。新建的子进程可以并行执行,也可能不能并行执行,即使是创建的多线程支持并发执行的,导致上述问题的原因比较多,比如底层硬件不支持并行执行,这就是并行执行和并发执行的区别:

    • 并行执行Concurrent:Code that can be executed out of order
    • 并发执行Parallel:Capability to execute code simultaneously

    为什么需要多进程呢,比如以下场景:

    • 同时creating和starting流程
    • 同时run和start流程
    • 同时blocked和terminated流程

    python中的进程历经三个阶段,创建新进程运行进程终止进程。在程序运行时,进程可能处于正在执行代码,或者被阻塞blocked,或者等待其他进程/外部资源等等场景。
    如何在进程中运行一个函数呢?

    1.多进程无参函数并发请求

    ...
    # create a process
    process = multiprocessing.Process(target=task)
    
    • 1
    • 2
    • 3

    如果task有参数,可以传递参数

    process = multiprocessing.Process(target=task, args=(arg1, arg2))
    
    • 1

    创建进程之后调用启动进程的函数start(),调用该函数立即返回,一旦启动,操作系统将在单独的进程中执行该task函数

    ...
    # run the new process
    process.start()
    
    • 1
    • 2
    • 3

    使用join()函数显示的等待新建的集成执行完成

    # wait for the process to finish
    print('Waiting for the process...')
    process.join()
    
    • 1
    • 2
    • 3

    下面我们给出具体的例子,代码如下所示:

    from time import sleep
    from multiprocessing import Process
    
    
    # a custom function that blocks for a moment
    def task():
        # block for a moment
        sleep(1)
        # display a message
        print('This is from another process')
    
    
    # entry point
    if __name__ == '__main__':
        # create a process
        process = Process(target=task)
        # run the process
        process.start()
        # wait for the process to finish
        print('Waiting for the process...')
        process.join()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    输出结果如下所示:

    Waiting for the process...
    This is from another process
    
    • 1
    • 2

    2.多进程有参函数并发请求

    目标函数如下所示:

    def task(sleep_time, message):
        # block for a moment
        sleep(sleep_time)
        # display a message
        print(message)
    
    • 1
    • 2
    • 3
    • 4
    • 5

    调用multiprocessing.Process 构造函数指定task函数和args参数

    # create a process
    process = Process(target=task, args=(1.5, 'New message from another process'))
    
    • 1
    • 2

    这里给出完整的实例,代码如下所示:

    from time import sleep
    from multiprocessing import Process
     
    # a custom function that blocks for a moment
    def task(sleep_time, message):
        # block for a moment
        sleep(sleep_time)
        # display a message
        print(message)
     
    # entry point
    if __name__ == '__main__':
        # create a process
        process = Process(target=task, args=(1.5, 'New message from another process'))
        # run the process
        process.start()
        # wait for the process to finish
        print('Waiting for the process...')
        process.join()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    输出结果如下所示:

    Waiting for the process...
    New message from another process
    
    • 1
    • 2

    3.ProcessPoolExecutor进程池

    ProcessPoolExecutor也是Executor下的一个子类,原理和使用方式几乎和ThreadPoolExecutor一样。可以参考concurrent.futures,进程池用于自动管理进程的一个模块,通过配置max_worker设置固定的进程数。[ProcessPoolExecutor](https://superfastpython.com/processpoolexecutor-in-python/)类通过concurrent.futures 模块在python3.2开始提供的,参考线程池的介绍很容易写出如下的代码

    import time
    from concurrent.futures import ProcessPoolExecutor
    
    def task(sleep_sec=10, tag='test'):
        print('[%s] start sleep' % tag)
        time.sleep(sleep_sec)
        print('[%s] finish sleep' % tag)
        return 100
    
    
    def main():
        process_pool = ProcessPoolExecutor(max_workers=3)
        future = process_pool.submit(task, 3, tag='TEST')
        ret = future.result()
        print('result is %s' % str(ret))
        process_pool.shutdown()
    
    
    if __name__ == '__main__':
        main()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    ProcessPoolExecutor扩展了Executor类,在调用时返回Future对象,Executor类定义了3个基本方法用于进程池的管理

    • submit():分配要执行的函数并返回Future兑现

    • map():将函数应用于到可迭代的元素

    • shutdown():关闭进程池
      使用ProcessPoolExecutor创建进程的生命步骤如下:

    • Create:使用ProcessPoolExecutor构造函数创建进程池

    • Submit:将task函数提交到submit函数并返回futures对象,也可以使用map()

    • Wait:等待并得到执行结果,该步骤是可选的

    • Shutdown:调用shutdown()关闭进程池

    3.1设置进程数

    使用ProcessPoolExecutor初始化实例时,必须使用池中固定数量的进程,出丝滑每个进程时指定要调用的函数以及函数的参数信息。该池的进程由每一个CPU来创建。故而进程数等于CPU数量

    • Default Total Processes = (Total CPUs)
      创建进程数时,如果有4个CPU,每个CPU都有超线程(大多数现代CPU都有),那么Python将看到8个CPU,并且默认情况下将为池分配6个进程。
      对于一些计算密集型任务,可以通过将进程数设置等于物理CPU核数而不是逻辑CPU数来实现最佳性能。
    executor = ProcessPoolExecutor()
    # 手动设置max_workers
    # executor = ProcessPoolExecutor(max_workers=4)
    
    • 1
    • 2
    • 3

    3.2 map()提交任务到进程池

    一旦创建了进程池,就可以提交异步任务到池中,提交task任务给Executor对象有两种方式,map()和submit()

    results = executor.map(my_task, my_items) 
    
    • 1

    创建之后,任务会在池中进行排队,在进程池中有进程可用的时候就被调度。map()函数将立即返回一个可迭代对象,该迭代对象用于访问目标任务函数task的结果

    for result in executor.map(my_task, my_items):
    	print(result)
    
    • 1
    • 2

    也可以设置超时时间timeout.

    for result in executor.map(my_task, my_items, timeout=5):
    	# wait for task to complete or timeout expires
    	print(result)
    
    • 1
    • 2
    • 3

    3.3 submit()提交任务到进程池

    submit()函数将一个任务提交到进程池,该函数获取要被调用的函数task和所有参数,然后立即返回一个Future对象

    future = executor.submit(my_task, arg1, arg2) 
    
    • 1

    可以通过Future对象的result()函数获取任务的结果,该调用时阻塞的,调用时会等待任务完成。

    ...
    # get the result from a future
    result = future.result() # blocks
    
    • 1
    • 2
    • 3

    当调用result()可以设置timeout参数来确定限定每个任务完成的时间,单位是s秒,超时之后引发timeout报错

    # wait for task to complete or timeout expires
    result = future.result(timeout=5) # blocks
    
    • 1
    • 2

    3.4 wait for Tasks to complete(optional)

    concurrent.futures 模块通过Future对象提供了两种方法等待目标任务被调度完成。wait是可选的。 只有调用submit()将任务推送到进程池是,才会创建Future对象。在调用map()或者submit()后直接等待结果,或者等待进程池中的所有任务完成。这两个模块函数wait()用于等待Future对象完成,as_completed()用于在任务完成时获取Future对象

    • wait(): 等待一个或者多个Future对象完成
    • as_completed(): 在所有任务完成时从collection中返回Future对象

    可以将两个函数与一个或者多个进程池创建的Future对象一起使用。

    futures = [executor.submit(my_task, my_data) for my_data in my_datalist]
    
    • 1

    对比TreadPoolExecutor,可以设置return_when参数

    # wait until we get the first result
    done, not_done = wait(futures, return_when=FIRST_COMPLETED)
    
    • 1
    • 2
    done, not_done = wait(futures, return_when=ALL_COMPLETED)
    
    • 1
    done, not_done = wait(futures, return_when=FIRST_EXCEPTION)
    
    • 1

    并发执行的好处在于可以在任务可用时获取结果,而不用等待所有任务都完成了再去获取任务的结果。as_completed() 函数将在任务在进程池中完成时返回任务的Future对象。我们可以调用该函数,并为其提供一个通过调用submit()创建的Future对象列表,当Future对象以任何顺序完成时,它都会返回Future对象. 通常在调用submit()时创建的Futures列表上循环使用as_completed()函数

    ...
    # iterate over all submitted tasks and get results as they are available
    for future in as_completed(futures):
    	# get the result for the next completed task
    	result = future.result() # blocks
    
    • 1
    • 2
    • 3
    • 4
    • 5

    📢注意:

    map()和submit()的结果不同。map()在对象返回迭代器而不是在Futures,其次map()是按照任务提交的顺序返回结果,而不是按照任务完成的顺序返回结果
    
    • 1

    3.5 关闭进程池

    当所有任务都完成后,需要关闭进程池,可以调用shutdown()函数。

    ...
    # shutdown the process pool
    executor.shutdown() # blocks
    
    • 1
    • 2
    • 3

    默认情况下等待所有任务完成后关闭进程池即wait参数为Ture,所以这里也说为什么是阻塞block的,当然也可以设置wait=False

    ...
    # shutdown the process pool
    executor.shutdown(wait=False) # does not blocks
    
    • 1
    • 2
    • 3

    3.6 上下文管理器

    ProcessPoolExecutor与上下文管理器一起使用“with”关键字创建一个模块,在该模块中可以使用进程池执行任务并获得结果。任务完成之后,进程池自动关闭,在with管理器内部,上下文管理器使用默认参数调用shutdown()函数,等待所有排队等待执行和正在执行的任务完成,然后返回并继续下文。

    ...
    # create a process pool
    with ProcessPoolExecutor(max_workers=10) as pool:
    	# submit tasks and get results
    	# ...
    	# automatically shutdown the process pool...
    # the pool is shutdown at this point
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
  • 相关阅读:
    软件运维常见面试题
    python-基本数据类型-笔记
    电脑更新win10黑屏解决方法
    【C++ 提高编程】- 泛型编程之模板(类型参数化)
    简化的 Java 六边形架构
    【Qt-19】按Q退出应用程序
    【uniapp】确认弹出框,选择确定和取消
    力扣197. 上升的温度
    HTML躬行记(2)——WebRTC基础实践
    增加软件投入的重要性:提升自动化程度与用户界面设计的价值
  • 原文地址:https://blog.csdn.net/rhx_qiuzhi/article/details/128169942