每个Python程序都是一个进程,每个进程有一个默认线程称之为主线程,该线程用于执行程序指令,事实上,每个进程都是Pthon解释器的一个实例对象,它执行Python指令。前面介绍了python Threads and ThreadPools ,这里继续介绍进程相关内容。Python通过multiprocessing.Process
提供真正系统级别的进程。操作系统是如何控制新建一个进程的呢?都是由Python解释器来管理的。新建的子进程可以并行执行,也可能不能并行执行,即使是创建的多线程支持并发执行的,导致上述问题的原因比较多,比如底层硬件不支持并行执行,这就是并行执行和并发执行的区别:
为什么需要多进程呢,比如以下场景:
python中的进程历经三个阶段,创建新进程
,运行进程
,终止进程
。在程序运行时,进程可能处于正在执行代码,或者被阻塞blocked,或者等待其他进程/外部资源等等场景。
如何在进程中运行一个函数呢?
...
# create a process
process = multiprocessing.Process(target=task)
如果task有参数,可以传递参数
process = multiprocessing.Process(target=task, args=(arg1, arg2))
创建进程之后调用启动进程的函数start(),调用该函数立即返回,一旦启动,操作系统将在单独的进程中执行该task函数
...
# run the new process
process.start()
使用join()
函数显示的等待新建的集成执行完成
# wait for the process to finish
print('Waiting for the process...')
process.join()
下面我们给出具体的例子,代码如下所示:
from time import sleep
from multiprocessing import Process
# a custom function that blocks for a moment
def task():
# block for a moment
sleep(1)
# display a message
print('This is from another process')
# entry point
if __name__ == '__main__':
# create a process
process = Process(target=task)
# run the process
process.start()
# wait for the process to finish
print('Waiting for the process...')
process.join()
输出结果如下所示:
Waiting for the process...
This is from another process
目标函数如下所示:
def task(sleep_time, message):
# block for a moment
sleep(sleep_time)
# display a message
print(message)
调用multiprocessing.Process
构造函数指定task函数和args参数
# create a process
process = Process(target=task, args=(1.5, 'New message from another process'))
这里给出完整的实例,代码如下所示:
from time import sleep
from multiprocessing import Process
# a custom function that blocks for a moment
def task(sleep_time, message):
# block for a moment
sleep(sleep_time)
# display a message
print(message)
# entry point
if __name__ == '__main__':
# create a process
process = Process(target=task, args=(1.5, 'New message from another process'))
# run the process
process.start()
# wait for the process to finish
print('Waiting for the process...')
process.join()
输出结果如下所示:
Waiting for the process...
New message from another process
ProcessPoolExecutor也是Executor下的一个子类,原理和使用方式几乎和ThreadPoolExecutor一样。可以参考concurrent.futures,进程池用于自动管理进程的一个模块,通过配置max_worker设置固定的进程数。[ProcessPoolExecutor](https://superfastpython.com/processpoolexecutor-in-python/)
类通过concurrent.futures
模块在python3.2开始提供的,参考线程池的介绍很容易写出如下的代码
import time
from concurrent.futures import ProcessPoolExecutor
def task(sleep_sec=10, tag='test'):
print('[%s] start sleep' % tag)
time.sleep(sleep_sec)
print('[%s] finish sleep' % tag)
return 100
def main():
process_pool = ProcessPoolExecutor(max_workers=3)
future = process_pool.submit(task, 3, tag='TEST')
ret = future.result()
print('result is %s' % str(ret))
process_pool.shutdown()
if __name__ == '__main__':
main()
ProcessPoolExecutor
扩展了Executor类,在调用时返回Future对象,Executor类定义了3个基本方法用于进程池的管理
submit():分配要执行的函数并返回Future兑现
map():将函数应用于到可迭代的元素
shutdown():关闭进程池
使用ProcessPoolExecutor
创建进程的生命步骤如下:
Create
:使用ProcessPoolExecutor构造函数创建进程池
Submit
:将task函数提交到submit函数并返回futures对象,也可以使用map()
Wait
:等待并得到执行结果,该步骤是可选的
Shutdown
:调用shutdown()关闭进程池
使用ProcessPoolExecutor
初始化实例时,必须使用池中固定数量的进程,出丝滑每个进程时指定要调用的函数以及函数的参数信息。该池的进程由每一个CPU来创建。故而进程数等于CPU数量
executor = ProcessPoolExecutor()
# 手动设置max_workers
# executor = ProcessPoolExecutor(max_workers=4)
一旦创建了进程池,就可以提交异步任务到池中,提交task任务给Executor对象有两种方式,map()和submit()
results = executor.map(my_task, my_items)
创建之后,任务会在池中进行排队,在进程池中有进程可用的时候就被调度。map()函数将立即返回一个可迭代对象,该迭代对象用于访问目标任务函数task的结果
for result in executor.map(my_task, my_items):
print(result)
也可以设置超时时间timeout.
for result in executor.map(my_task, my_items, timeout=5):
# wait for task to complete or timeout expires
print(result)
submit()
函数将一个任务提交到进程池,该函数获取要被调用的函数task和所有参数,然后立即返回一个Future对象
future = executor.submit(my_task, arg1, arg2)
可以通过Future对象的result()
函数获取任务的结果,该调用时阻塞的,调用时会等待任务完成。
...
# get the result from a future
result = future.result() # blocks
当调用result()可以设置timeout参数来确定限定每个任务完成的时间,单位是s秒,超时之后引发timeout报错
# wait for task to complete or timeout expires
result = future.result(timeout=5) # blocks
concurrent.futures
模块通过Future对象提供了两种方法等待目标任务被调度完成。wait
是可选的。 只有调用submit()将任务推送到进程池是,才会创建Future对象。在调用map()或者submit()后直接等待结果,或者等待进程池中的所有任务完成。这两个模块函数wait()
用于等待Future对象完成,as_completed()
用于在任务完成时获取Future对象
可以将两个函数与一个或者多个进程池创建的Future对象一起使用。
futures = [executor.submit(my_task, my_data) for my_data in my_datalist]
对比TreadPoolExecutor,可以设置return_when参数
# wait until we get the first result
done, not_done = wait(futures, return_when=FIRST_COMPLETED)
done, not_done = wait(futures, return_when=ALL_COMPLETED)
done, not_done = wait(futures, return_when=FIRST_EXCEPTION)
并发执行的好处在于可以在任务可用时获取结果,而不用等待所有任务都完成了再去获取任务的结果。as_completed() 函数将在任务在进程池中完成时返回任务的Future对象。我们可以调用该函数,并为其提供一个通过调用submit()创建的Future对象列表,当Future对象以任何顺序完成时,它都会返回Future对象. 通常在调用submit()时创建的Futures列表上循环使用as_completed()函数
...
# iterate over all submitted tasks and get results as they are available
for future in as_completed(futures):
# get the result for the next completed task
result = future.result() # blocks
📢注意:
map()和submit()的结果不同。map()在对象返回迭代器而不是在Futures,其次map()是按照任务提交的顺序返回结果,而不是按照任务完成的顺序返回结果
当所有任务都完成后,需要关闭进程池,可以调用shutdown()
函数。
...
# shutdown the process pool
executor.shutdown() # blocks
默认情况下等待所有任务完成后关闭进程池即wait
参数为Ture,所以这里也说为什么是阻塞block的,当然也可以设置wait=False
...
# shutdown the process pool
executor.shutdown(wait=False) # does not blocks
将ProcessPoolExecutor
与上下文管理器一起使用“with
”关键字创建一个模块,在该模块中可以使用进程池执行任务并获得结果。任务完成之后,进程池自动关闭,在with管理器内部,上下文管理器使用默认参数调用shutdown()
函数,等待所有排队等待执行和正在执行的任务完成,然后返回并继续下文。
...
# create a process pool
with ProcessPoolExecutor(max_workers=10) as pool:
# submit tasks and get results
# ...
# automatically shutdown the process pool...
# the pool is shutdown at this point