我们通过代码来验证GIL的存在
通过 创建一百个线程,正常来说这是异步操作 他们一时间取到的都是100,但是 因为有GIL的机制 让他们抢锁 所以就不是真正意义上的同时运行而是串行
from threading import Thread
money = 100
def task():
global money
money -=1
t_litst =[]
# 创建100个线程
for i in range(100):
t=Thread(target=task)
t.start()
#将线程添加进列表
t_litst.append(t)
# 将所有创建的线程 都让他能够运行完
for i in t_litst:
t.join()
#等待所有的线程运行结束 查看money 是多少
print(money)
>>>>>>
0
单进程多线程中 当一个线程先拿到到锁取到了公共数据后 进入io操作 那么 会释放解释器锁 让各线程都能轮流拿到锁 去取公共数据 而第一个拿到锁的 没有机会 在这之前 将数据实时更新
所以我们就算有解释器锁在面对io操作下 同线程 创建情况 我们还是要添加互斥锁
from threading import Thread
import time
money = 100
def task():
global money
#先获取money
price=money
#io操作 使其释放锁
time.sleep(0.01)
money=price-1
t_litst =[]
# 创建100个线程
t1=time.time()
for i in range(100):
t=Thread(target=task)
t.start()
t.join()
#将线程添加进列表
t_litst.append(t)
# 将所有创建的线程 都让他能够运行完
for i in t_litst:
t.join()
#等待所有的线程运行结束 查看money 是多少
print(time.time()-t1)
print(money)
>>>>>>
99
所以要保证数据安全 不紊乱 需要我们自己加锁
from threading import Thread, Lock
import time
mutex = Lock()
money = 100
def task():
# 抢锁
mutex.acquire()
global money
# 先获取money
price = money
# io操作 使其释放锁
time.sleep(0.01)
money = price - 1
#释放锁
mutex.release()
t_litst = []
# 创建100个线程
t1 = time.time()
for i in range(100):
t = Thread(target=task)
t.start()
# t.join()
# 将线程添加进列表
t_litst.append(t)
# 0.10970592498779297
# 将所有创建的线程 都让他能够运行完
for i in t_litst:
t.join()
# 等待所有的线程运行结束 查看money 是多少
print(time.time() - t1)
print(money)
需要分情况
cpu情况
代码情况
单个cpu
io密集型
多进程
总耗时(单进程的耗时+io+申请空间+拷贝代码)
多线程
总耗时(单进程的耗时加+io)
多线程有优势
计算密集型
多进程
申请额外的空间 消耗更多的资源(总耗时+申请空间+拷贝代码+切换)
多线程
耗时资源相对较少 通过多道技术(总耗时+切换)
多线程有优势
多个cpu
io密集型
多进程
总耗时(单个进程的耗时+io+申请空间+拷贝代码)
多线程
总耗时(单进程耗时+io)
多线程有优势
计算密集型
多进程
总耗时(单个进程的耗时)
多线程
总耗时(多个进程的综合)
多线程完胜!
模拟计算密集型
from threading import Thread
from multiprocessing import Process
import os
import time
def work():
# 计算密集型
res = 1
for i in range(1, 100000):
res *= i
if __name__ == '__main__':
# 查看当前计算机cpu个数
print(os.cpu_count())
start_time = time.time()
# p_list = []
# # 一次性创建12个进程
# for i in range(12):
# p=Process(target=work)
# p.start()
# p_list.append(p)
# #确定 所有进程全部运行完毕
# for p in p_list:
# p.join()
#多进程: 9.303270816802979
t_list=[]
for i in range(12):
t = Thread(target=work)
t.start()
t_list.append(t)
for t in t_list:
t.join()
print(time.time()-start_time)
#多线程: 27.876002550125122
模拟io密集型
from threading import Thread
from multiprocessing import Process
import os
import time
def work():
# io操作
time.sleep(2)
if __name__ == '__main__':
stat_time = time.time()
# t_list = []
# for i in range(100):
# t = Thread(target=work)
# t.start()
# for t in t_list:
# t.join()
p_list=[]
for i in range(100):
p = Process(target=work)
p.start()
for p in p_list:
p.join()
print(time.time() - stat_time)
#多线程: 0.012948989868164062
#多进程: 1.7265512943267822
虽然我们已经掌握了互斥锁的使用,但在实际项目中jing’liang
先抢锁 后释放锁
from threading import Thread,Lock
import time
#类名加括号 每执行依次 就会产生一个新的对象
mutexA = Lock()
mutexB = Lock()
class Mythread(Thread):
def run(self):
self.func1()
self.func2()
def func1(self):
mutexA.acquire()
print(f'{self.name}抢A锁')
mutexB.acquire()
print(f'{self.name}抢B')
mutexB.release()
print(f'{self.name}释放了B')
mutexA.release()
print(f'{self.name}释放了A')
def func2(self):
mutexB.acquire()
print(f'{self.name}抢到了B')
time.sleep(1)
mutexA.acquire()
print(f'{self.name}抢到了A锁')
mutexA.release()
print(f'{self.name}释放了A')
mutexB.release()
print(f'{self.name}释放了B')
for i in range(10):
t = Mythread()
t.start()
信号量本质也是 互斥锁 只不过它是多把锁
强调:
信号量在不同的知识体系中 意思可能有区别
在django中 信号量指的是达到某个条件自动触发(中间件)
我们之前使用LOCK产生的是单把锁
类似于单间厕所
信号量相当于一次性创建多间厕所
类似于公共厕所
from threading import Thread, Semaphore
import time
# 一次性产生五把锁
sp = Semaphore(5)
class Mythread(Thread):
def run(self):
sp.acquire()
print(self.name)
time.sleep(2)
sp.release()
# print()
for i in range(20):
t = Mythread()
t.start()
#一次性进去五个 然后出再进去五个 依次类推
子进程:子线程之间可以彼此等待彼此
子A运行到 某一个代码位置 发信号告诉 B开始运行
from threading import Thread, Event
import time
# 类似于 造了一个红绿灯
event = Event()
def light():
print('红灯亮着的 所有人都不能动')
# 进入io操作 等待两秒
time.sleep(2)
print('绿灯亮了 冲冲冲')
event.set()
def car(name):
print('%s等红灯' % name)
# 等待 等light执行到 set发送信号 就会继续执行
event.wait()
print('%s加油门 飙车' % name)
#异步操作 进入io就转到下面创建进程
t = Thread(target=light)
t.start()
for i in range(20):
t = Thread(target=car, args=(i,))
t.start()
多进程 多线程
在实际应用中 是不是可以无限制的开进程和线程
肯定不可以 !! 会造成内存溢出 受限于硬件水平
我们在开设多进程或线程的时候 还需要考虑硬件的承受范围
池
降低程序的执行效率 保证计算机硬件的安全
进程池
提前创建好固定个数的进程供程序使用 后续不会再创建
线程池
提前创建好 固定个数的线程提供程序使用 后续不会再创建
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from threading import current_thread
import os
import time
pool = ThreadPoolExecutor(5) # 固定产生五个线程
pool = ProcessPoolExecutor(5) # 固定产生五个进程
def task(n):
print(os.getpid())
#print(n)
time.sleep(1)
return 'f返回的结果'
# 这边会 被异步回调 传参执行
def func(*args, **kwargs):
print('func', args, kwargs)
#获取 返回值
print(args[0].result())
if __name__ == '__main__':
for i in range(20):
# 异步回调:异步任务 执行完成后有结果就会自动触发该机制
pool.submit(task,123).add_done_callback(func)
进程:资源单位
线程:执行单位
协程:
单线程下实现并发 (效率极高)
在代码层面欺骗cpu 让cpu觉得我们的代码里面没有io操作
实际上在io操作被我们自己写的代码检测 一旦有 立刻让代码执行别的
该技术完全是程序员 自己弄出来的 名字也是程序员自己起的
核心:自己写代码完全切换+保存状态
import time
from gevent import monkey;
# 固定编写 用于检测所有的IO操作(猴子补丁) 用来补全一些检测不到的io操作
monkey.patch_all()
from gevent import spawn
def func1():
print('func1 running')
time.sleep(3)
print('func1 over')
def func2():
print('func2 running')
time.sleep(5)
print('func2 over')
if __name__ == '__main__':
start_time = time.time()
#func1()
#func2()
#检测代码 一旦有IO自动切换(执行没有io的操作 变向的等待io结束)
s1 = spawn(func1)
s2 = spawn(func2)
s1.join()
s2.join()
print(time.time() - start_time)
#普通串行 8.020445108413696
#协程 5.039286136627197
import socket
# 固定编写 用于检测所有的IO操作(猴子补丁) 用来补全一些检测不到的io操作
from gevent import monkey;monkey.patch_all()
from gevent import spawn
#循环 收发
def communication(sock):
while True:
data = sock.recv(1024)
print(data.decode('utf8'))
sock.send(data.upper())
def get_server():
server = socket.socket()
server.bind(('127.0.0.1', 8080))
server.listen(5)
while True:
#等待 客户端访问 网络请求
sock, addr = server.accept()
#监测 communication的io操作
spawn(communication, sock)
#监测 get_server 的io操作 然后io操作
s1 = spawn(get_server)
s1.join()
如何不断的提升程序的运行效率
多进程下并多线程 多线程下开协程