CSDN话题挑战赛第2期
参赛话题:学习笔记
ProcessPoolExecutor:进程池
ThreadPoolExecutor:线程池
t1 = ThreadPoolExecutor(max_workers=5):创建一个线程池,线程池中最多支持同时执行多少个任务
t1.submit(函数名称):往线程池中提交执行的任务
t1.shutdown():等待线程池中所有的任务执行完毕之后,开始执行
target:指定线程执行的函数
args:给指定的任务函数传递参数(元组)
kwargs:给指定的任务函数传递的参数(字典)
name:给线程起名字
线程对象的方法
start()
join()
run()方法:线程最终执行任务函数是在线程的run方法中
import time import threading def work(name): for i in range(6): print(f"线程{i}工作") time.sleep(1) def work1(name): for i in range(5): print(f"线程{i}学习") time.sleep(1) if __name__ == '__main__': t1=threading.Thread(target=work,args=('kobe',),daemon=True) t2=threading.Thread(target=work1,args=('jimi',),daemon=True) t1.start() t2.start() t1.join() t2.join() print('主线程执行完毕')
'运行
run()方法:线程最终执行任务函数是在线程的run方法中
import time import threading class MyThread(threading.Thread): def run(self): for i in range(6): print(f"线程{i}工作") time.sleep(1) if __name__ == '__main__': t1=MyThread() t1.start() t1.join() print('主线程执行完毕')
'运行
import threading num=0 def work(): global num num+=10 print('work',num) def work1(): global num num+=100 print('work1',num) if __name__ == '__main__': t1=threading.Thread(target=work) t2=threading.Thread(target=work1) t1.start() t2.start() t1.join() t2.join() print('主线程执行结果',num)
'运行
执行结果:
work 10
work1 110
主线程执行结果 110
python全局解释器锁(GIL):
存在进程中
由于GIL存在:多线程只能并发,同一时刻只有一个线程执行任务
意味着python写的多线程无法充分利用硬件设备的资源
python中的线程在什么情况下会进行切换?(pythonGIL锁的释放)
1、程序执行遇到IO操作(耗时等待,堵塞)
2、程序执行的时间达到指定的阈值(0.005s)
import threading num=0 def work(): global num for i in range(100000): num+=1 print('work',num) def work1(): global num for i in range(100000): num+=1 print('work1',num) if __name__ == '__main__': t1=threading.Thread(target=work) t2=threading.Thread(target=work1) t1.start() t2.start() t1.join() t2.join() print('主线程执行结果',num)
'运行
执行结果:
workwork1 1853502
1000000
主线程执行结果 1853502
使用python多线程的时候,要注意的问题:
多个线程操作同一个全局的变量(资源),由于无法确定线程切换的时间,会导致数据覆盖(结果不准)
解决方案1:
加锁:将操作全局资源的代码锁起来,确保在执行这行代码时不会出现线程切换,(主动控制线程切换的节点)
解决方法2:
使用队列来存储多线程共享的数据
from threading import Lock,Thread num=0 def work(): lock.acquire() global num for i in range(1000000): num+=1 print('work', num) lock.release() def work1(): lock.acquire() global num for i in range(1000000): num+=1 print('work1', num) lock.release() if __name__ == '__main__': lock = Lock() t1=Thread(target=work) t2=Thread(target=work1) t1.start() t2.start() t1.join() t2.join() print('主线程执行结果',num)
'运行
work 1000000
work1 2000000
主线程执行结果 2000000
task_down():向队列发送一个任务执行完毕的信息
join():等待队列中的任务(数据处理完毕)执行完毕
数据处理完毕判断依据是什么?
1、队列接收到的任务执行完毕的信号,是否等于进入队列中的任务数(数据量)
调用put的方法次数和调用task_done方法要一致
2、队列为空
如果队列中有5条数据,获取了3条,发送了3个执行完毕的信号?join会堵塞
如果队列中有5条数据,获取了3条,发送了5个执行完毕的信号?join会堵塞
from queue import Queue,PriorityQueue,LifoQueue import threading import time q=Queue() for i in range(20): url=f'www.baidu{i}.com' q.put(url) def work(): while q.qsize()>0: url=q.get() print(url) time.sleep(1) #todo 向队列发送一个任务执行完毕的信息 q.task_done() def main(): for i in range(4): t=threading.Thread(target=work) t.start() #等待队列中的任务(数据处理完毕)执行完毕 q.join() ''' 数据处理完毕判断依据是什么? 1、队列接收到的任务执行完毕的信号,是否等于进入队列中的任务数(数据量) 调用put的方法次数和调用task_done方法要一致 2、队列为空 task_done方法:向队列发送一个任务执行完毕的信息 ''' main()
'运行
t1=ThreadPoolExecutor(max_workers=5):创建一个线程池,线程池中最多支持同时执行多少个任务
t1.submit(work):往线程池中提交执行的任务
t1.shutdown():等待线程池中所有的任务执行完毕之后,开始执行
from concurrent.futures.process import ProcessPoolExecutor from concurrent.futures.thread import ThreadPoolExecutor import time import threading def work(): for i in range(6): print(f"线程1工作") time.sleep(1) def work1(): for i in range(5): print(f"线程2学习") time.sleep(1) if __name__ == '__main__': #todo 创建一个线程池 t1 = ThreadPoolExecutor(max_workers=5) # todo 线程池中最多支持同时执行多少个任务 st=time.time() #todo 往线程池中提交执行的任务 t1.submit(work) t1.submit(work1) #todo 等待线程池中所有的任务执行完毕之后,开始执行 t1.shutdown() et=time.time() print('执行的时间:',et-st)
'运行
线程1工作
线程2学习
线程1工作线程2学习
线程1工作线程2学习
线程1工作线程2学习
线程1工作
线程2学习
线程1工作
执行的时间: 6.0538411140441895
from concurrent.futures.process import ProcessPoolExecutor from concurrent.futures.thread import ThreadPoolExecutor import time import threading def work(name): for i in range(6): print(f"线程1工作---{name}") time.sleep(1) def work1(name): for i in range(5): print(f"线程2学习---{name}") time.sleep(1) if __name__ == '__main__': #todo 创建一个线程池 t1 = ThreadPoolExecutor(max_workers=5) # todo 线程池中最多支持同时执行多少个任务 st=time.time() #todo 往线程池中提交执行的任务 t1.submit(work,'kobe') t1.submit(work1,'james') #todo 等待线程池中所有的任务执行完毕之后,开始执行 t1.shutdown() et=time.time() print('执行的时间:',et-st)
'运行
线程1工作—kobe
线程2学习—james
线程1工作—kobe线程2学习—james
线程2学习—james线程1工作—kobe
线程1工作—kobe线程2学习—james
线程2学习—james线程1工作—kobe
线程1工作—kobe
执行的时间: 6.051986455917358
t1.map(func1,li):批量往线程池中提交任务
from concurrent.futures.process import ProcessPoolExecutor from concurrent.futures.thread import ThreadPoolExecutor import time import threading def func1(item): for i in range(2): print('正在执行任务{},的第{}轮'.format(item,i)) time.sleep(0.25) if __name__ == '__main__': #todo 创建一个线程池 t1 = ThreadPoolExecutor(max_workers=3) # todo 线程池中最多支持同时执行多少个任务 li=[11,6,8,24,22] #todo 用例数据 #todo 批量往线程池中提交任务 t1.map(func1,li) #todo 等价于 # for i in li: # t1.submit(func1,i)
'运行
正在执行任务11,的第0轮
正在执行任务6,的第0轮
正在执行任务8,的第0轮
正在执行任务11,的第1轮
正在执行任务6,的第1轮正在执行任务8,的第1轮
正在执行任务24,的第0轮
正在执行任务22,的第0轮
正在执行任务22,的第1轮
正在执行任务24,的第1轮
from concurrent.futures.process import ProcessPoolExecutor from concurrent.futures.thread import ThreadPoolExecutor import time import threading def func1(item): for i in range(2): print('正在执行任务{},的第{}轮'.format(item,i)) time.sleep(0.25) if __name__ == '__main__': st=time.time() with ThreadPoolExecutor(max_workers=2) as tp: tp.map(func1,[11,22,33]) et=time.time() print('时长:',et-st)
'运行
正在执行任务11,的第0轮
正在执行任务22,的第0轮
正在执行任务22,的第1轮正在执行任务11,的第1轮
正在执行任务33,的第0轮
正在执行任务33,的第1轮
时长: 1.0219557285308838