concurrent.futures 模块提供异步执行可调用对象高层接口
而异步执行可以由 ThreadPoolExecutor 线程或由 ProcessPoolExecutor 使用单独的进程来实现。 两者都是实现抽像类 Executor 定义的接口。
一、ThreadPoolExecutor 线程池
线程池执行器:ThreadPoolExecutor类
- 快速导入:from concurrent.futures import ThreadPoolExecutor
如何使用:
- 创建一个线程池对象:ThreadPoolExecutor(max_workers=5)
参数:max_workers 表示最大可同时执行的线程数
- submit(任务函数)方法: 去提交任务执行
- shutdown()方法: 等待线程尺池中所有的任务执行完毕之后,在往下执行
- map(任务函数,参数)方法: 用于批量任务提交,前面参数是:任务函数 后面是任务函数参数
线程池的优点:
1.使用线程池可以避免频繁的去创建线程,销毁线程,可以减少内存的使用
ThreadPoolExecutor类基本语法使用:
# 创建一个队列对象
q = Queue()
# 生产数据
def work():
for i in range(5):
for a in range(20):
print(f"这是第{i}轮数据插入,插入的数据为:{a}")
q.put(a)
time.sleep(1)
# 创建一个线程池对象
# 参数:max_workers 表示最大可同时执行的线程数
tpool = ThreadPoolExecutor(max_workers=5)
# 1.submit(函数)方法:去提交任务
tpool.submit(work)
# 2.shutdown()方法:等待线程尺池中所有的任务执行完毕之后,在往下执行
tpool.shutdown()
print("---------------------- 结束执行 -----------------------")
ThreadPoolExecutor类上下文管理协议的语法使用:
上下文管理协议的语法:
with ThreadPoolExecutor(max_workers=5) as tpool:
tpool.submit(提交的任务函数)
使用上下文管理器后,无需使用shutdown()方法,在使用 with 语句时,会调用时将 wait 设为 True ,会一样等待
q = Queue()
# 生产数据
def work():
for i in range(5):
for a in range(20):
print(f"这是第{i}轮数据插入,,插入的数据为:{a}")
q.put(a)
time.sleep(1)
with ThreadPoolExecutor(max_workers=5) as tpool:
tpool.submit(work)
map方法(任务函数,参数): 用于批量任务提交
- map()方法去提交单个参数
def work(name):
for i in range(3):
print("========= 这是{}执行的第{}轮测试=========".format(name, i))
time.sleep(1)
with ThreadPoolExecutor(max_workers=5) as tpool:
# 1.map方法
tpool.map(work,[1,2,3,4,5])
# 与上面的map方法一样的效果
# 2.submit方法
for i in range(5):
tpool.submit(work)
print("---------------------- 结束执行 -----------------------")
- map()方法去提交多个参数
def work(name,age):
for i in range(3):
print("========= 这是{}执行的第{}轮测试,只有{}个=========".format(name,i,age))
time.sleep(1)
with ThreadPoolExecutor(max_workers=5) as tpool:
# 1.submit方法
tpool.submit(work,'henry',18)
# 2.map方法
tpool.map(work, ['henry', 'henry1'], [17, 18]) # 以数组的方式
print("---------------------- 结束执行 -----------------------")
二、ProcessPoolExecutor 进程池
进程池执行器:ProcessPoolExecutor类
- 快速导入:from concurrent.futures import ProcessPoolExecutor
如何使用:
- 创建一个进程池对象:创建一个进程池对象: ProcessPoolExecutor(max_workers=5)
参数:max_workers 表示最大可同时执行的进程数
- submit(任务函数)方法: 去提交任务执行
- shutdown()方法: 等待进程池中所有的任务执行完毕之后,在往下执行
- map(任务函数,参数)方法: 用于批量任务提交,前面参数是:任务函数 后面是任务函数参数
ProcessPoolExecutor类基本语法使用:
q = Queue()
def work():
for i in range(5):
for a in range(20):
print(f"这是第{i}轮数据插入,,插入的数据为:{a}")
q.put(a)
time.sleep(1) #
if __name__ == '__main__':
with ProcessPoolExecutor(max_workers=5) as tpool:
tpool.submit(work)
ProcessPoolExecutor类上下文管理协议的语法使用:
上下文管理协议的语法:
with ThreadPoolExecutor(max_workers=5) as tpool:
tpool.submit(提交的任务函数)
使用上下文管理器后,无需使用shutdown()方法,在使用 with 语句时,会调用时将 wait 设为 True ,会一样等待
q = Queue()
# 生产数据
def work():
for i in range(5):
for a in range(20):
print(f"这是第{i}轮数据插入,,插入的数据为:{a}")
q.put(a)
time.sleep(1)
with ProcessPoolExecutor(max_workers=5) as tpool:
tpool.submit(work)
进程池之间的数据通信问题:
- 同一个进程中多个线程之间使用: queue.Queue
import queue
q1 = queue.Queue()
- 多个进程之间数据通信的队列:multiprocessing.Queue
from multiprocessing import Queue
q1 = Queue()
- 进程池之间数据通信:multiprocessing.Manager().Queue
from multiprocessing import Manager
q2 = Manager().Queue
注意事项:
- 如果要使用Pool创建进程,就需要使用 multiprocessing.Manager()中的Queue(),而不是multiprocessing.Queue()
否则会得到一条如下的错误信息:RuntimeError: Queue objects should only be shared between processes through inheritance.
实例:
def work1(q):
for i in range(10):
q.put(i)
def work2(q):
for i in range(10):
print(q.get())
if __name__ == '__main__':
q2 = Manager().Queue()
with ProcessPoolExecutor(max_workers=2) as pool:
pool.submit(work1, q2)
pool.submit(work2, q2)