多个任务可以同时在几个进程中并发处理,他们之间的运行没有顺序,一旦开启也不受我们控制。
并发编程让我们能够充分利用IO资源, 但是当多个进程使用同一分述句资源的时候就会引发数据安全或者数据混乱问题.
- import os
- import time
- import random
- from multiprocessing import Process
-
- def work(n):
- print('%s: %s is running' %(n,os.getpid()))
- time.sleep(random.random())
- print('%s:%s is done' %(n,os.getpid()))
-
- if __name__ == '__main__':
- for i in range(3):
- p=Process(target=work,args=(i,))
- p.start()
- # 由并发变成了串行,牺牲了运行效率,但避免了竞争
- import os
- import time
- import random
- from multiprocessing import Process,Lock
-
- def work(lock,n):
- lock.acquire()
- print('%s: %s is running' % (n, os.getpid()))
- time.sleep(random.random())
- print('%s: %s is done' % (n, os.getpid()))
- lock.release()
- if __name__ == '__main__':
- lock=Lock()
- for i in range(3):
- p=Process(target=work,args=(lock,i))
- p.start()
上面这种情况虽然使用加锁的形式实现了顺序的执行,但是程序又重新变成串行了,这样确实会浪费了时间,却保证了数据的安全。
接下来,我们以模拟抢票为例,来看看数据安全的重要性。
- # 文件db的内容为:{"count":1}
- # 注意一定要用双引号,不然json无法识别
- # 并发运行,效率高,但竞争写同一文件,数据写入错乱
- from multiprocessing import Process,Lock
- import time,json,random
- def search():
- dic=json.load(open('db'))
- print('剩余票数%s' %dic['count'])
-
- def get():
- dic=json.load(open('db'))
- time.sleep(0.1) # 模拟读数据的网络延迟
- if dic['count'] >0:
- dic['count']-=1
- time.sleep(0.2) # 模拟写数据的网络延迟
- json.dump(dic,open('db','w'))
- print('购票成功')
-
- def task():
- search()
- get()
-
- if __name__ == '__main__':
- for i in range(100): # 模拟并发100个客户端抢票
- p=Process(target=task)
- p.start()
- # 文件db的内容为:{"count":5}
- # 注意一定要用双引号,不然json无法识别
- # 并发运行,效率高,但竞争写同一文件,数据写入错乱
- from multiprocessing import Process,Lock
- import time,json,random
- def search():
- dic=json.load(open('db'))
- print('剩余票数%s' %dic['count'])
-
- def get():
- dic=json.load(open('db'))
- time.sleep(random.random()) # 模拟读数据的网络延迟
- if dic['count'] >0:
- dic['count']-=1
- time.sleep(random.random()) # 模拟写数据的网络延迟
- json.dump(dic,open('db','w'))
- print('购票成功')
- else:
- print('购票失败')
-
- def task(lock):
- search()
- lock.acquire()
- get()
- lock.release()
-
- if __name__ == '__main__':
- lock = Lock()
- for i in range(100): # 模拟并发100个客户端抢票
- p=Process(target=task,args=(lock,))
- p.start()
加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行的修改,没错,速度是慢了,但牺牲了速度却保证了数据安全。
虽然可以用文件共享数据实现进程间通信,但问题是:
因此我们最好找寻一种解决方案能够兼顾:
效率高(多个进程共享一块内存的数据)
帮我们处理好锁问题。
这就是mutiprocessing模块为我们提供的基于消息的IPC通信机制:队列和管道。
队列和管道都是将数据存放于内存中,队列又是基于(管道+锁)实现的,可以让我们从复杂的锁问题中解脱出来,我们应该尽量避免使用共享数据,尽可能使用消息传递和队列,避免处理复杂的同步和锁问题,而且在进程数目增多时,往往可以获得更好的可获展性。
问题引入:对数据 n = 10 开启多线程将 n 减成 0
这 10 个进程是并发执行的,所消耗的时间只有大概 0.1 秒,但是结果为 9
- from threading import Thread
- import os,time
- def work():
- global n
- temp=n
- time.sleep(0.1)
- n=temp-1
- if __name__ == '__main__':
- n=10
- l=[]
- for i in range(10):
- p=Thread(target=work)
- l.append(p)
- p.start()
- for p in l:
- p.join()
-
- print(n) #结果可能为9
结果为 0,但是这 10 个进程是串行执行的,所消耗的时间大概 1 秒,
- from threading import Thread,Lock
- import os,time
- def work():
- global n
- lock.acquire()
- temp=n
- time.sleep(0.1)
- n=temp-1
- lock.release()
- if __name__ == '__main__':
- lock=Lock()
- n=10
- l=[]
- for i in range(10):
- p=Thread(target=work)
- l.append(p)
- p.start()
- for p in l:
- p.join()
-
- print(n) #结果肯定为0,由原来的并发执行变成串行,牺牲了执行效率保证了数据安全
所谓死锁: 是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程,如下就是死锁
- from threading import Lock
- import time
-
-
- mutexA=Lock()
-
- mutexA.acquire()
- mutexA.acquire()
- print(123)
- mutexA.release()
- mutexA.release()
递归锁,在Python中为了支持在同一线程中多次请求同一资源,python提供了可重入锁RLock
这个RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。上面的例子如果使用RLock代替Lock,则不会发生死锁:
- from threading import RLock
- import time
-
-
- mutexA=RLock()
-
- mutexA.acquire()
- mutexA.acquire()
- print(123)
- mutexA.release()
- mutexA.release()
互斥锁:相当于同一时间只能有一个人上厕所
信号量:相当于公共厕所,有多个位置可以抢占,但是超过规定的数量,就会阻塞抢占资源的进程
在运行时,可以明显的观察到,三个进程为一组一起运行,然后再三个进程
- from threading import Thread,Semaphore
- import time
-
-
- semaphore = Semaphore(3)
-
- def task(index):
- semaphore.acquire()
- print(f'threading {index} is running ')
- time.sleep(2)
- semaphore.release()
-
- if __name__ == '__main__':
- for i in range(9):
- t = Thread(target=task,args=(i,))
- t.start()
一些进程/线程 需要等待另外的 进程/线程 运行结束后才能运行
汽车需要遵守红绿灯的指示才能通行
- from threading import Thread,Event
- import time
-
- event = Event() # 造了一个红绿灯
-
- def light_thread():
- print('红灯亮着\n')
- # 等待绿灯亮
- time.sleep(3)
-
- print('绿灯了!!!!')
- # 需要通知汽车已经绿灯了,汽车接受这个信号
- event.set()
-
-
- def car_thread(name):
- print(f'{name}号汽车正在等红灯')
- # 等待收到绿灯信号
- event.wait()
- print(f'{name}号汽车通过')
-
-
- if __name__ == '__main__':
- # 先开启信号灯进程
- light = Thread(target=light_thread)
- light.start()
-
- for i in range(5):
- # 开启多个汽车进程
- car = Thread(target=car_thread,args=(i,))
- car.start()
-