第一章 Python 入门
第二章 Python基本概念
第三章 序列
第四章 控制语句
第五章 函数
第六章 面向对象基础
第七章 面向对象深入
第八章 异常机制
第九章 文件操作
第十章 模块
第十一章 GUI图形界面编程
第十二章 pygame游戏开发基础
第十三章 pyinstaller 使用详解
第十四章 并发编程初识
第十五章 并发编程三剑客-进程, 线程与协程
书写上文, 我们可以知道: 进程是资源(CPU、内存等)分配的基本单位,它是程序执行时的一个实例.
程序运行时系统就会创建一个进程,并为它分配资源,然后把该进程放入进程就绪队列,进程调度器选中它的时候就会为它分配CPU时间,程序开始真正运行. 进程切换需要的资源很最大,效率低。
进程创建有两种方式: 方法包装和类包装. 熟悉Java的人可能会发现, 类包装更符合我们原来的书写习惯
创建进程后, 我们使用start() 来启动进程
主要步骤:
实操代码
import time
from multiprocessing import Process
class MyProcess(Process):
"""进程的创建方式: 2.类包装"""
def __init__(self, name):
Process.__init__(self)
self.name = name
def run(self):
print(f"进程{self.name} 启动")
time.sleep(3)
print(f"进程{self.name} 结束")
if __name__ == "__main__":
print("创建进程")
p1 = MyProcess("p1")
p2 = MyProcess("p2")
p1.start()
p2.start()
执行结果

主要步骤:
实操代码
import os
import time
from multiprocessing import Process
def function(name):
"""进程的创建方式: 1.方法包装"""
print("当前进程ID:", os.getpid())
print("父进程ID", os.getppid())
print(f"Process:{name} start")
time.sleep(3)
print(f"Process:{name} end")
if __name__ == "__main__":
print("当前main进程ID: ", os.getppid())
# 创建进程
p1 = Process(target=function, args=("p1",))
p2 = Process(target=function, args=("p2",))
p1.start()
p2.start()
执行结果

多说一句
元组中如果只有一个元素, 是需要加逗号的!!! 这是因为括号( )既可以表示tuple,又可以表示数学公式中的小括号,
所以如果没有加逗号,那你里面放什么类型的数据那么类型就会是什么.
进程间通信有两种方式: Queue队列和Pipe管道方式
实现核心:
实操代码
from multiprocessing import Process, Queue
class MyProcess(Process):
def __init__(self, name, mq):
Process.__init__(self)
self.name = name
self.mq = mq
def run(self):
print("Process {} started".format(self.name))
print("===Queue", self.mq.get(), "===")
self.mq.put(self.name)
print("Process {} end".format(self.name))
if __name__ == "__main__":
# 创建进程列表
t_list = []
mq = Queue()
mq.put("1")
mq.put("2")
mq.put("3")
# 利用range序列重复创建进程
for i in range(3):
t = MyProcess("p{}".format(i), mq)
t.start()
t_list.append(t)
# 等待进程结束
for t in t_list:
t.join()
print(mq.get())
print(mq.get())
print(mq.get())
执行结果

Pipe 直译过来的意思是“管”或“管道”,和实际生活中的管(管道)是非常类似的.
Pipe方法返回(conn1, conn2)代表一个管道的两个端.

实现核心
conn1, conn2 = multiprocessing.Pipe()conn1.send/conn1.recv实操代码
import multiprocessing
import time
def fun1(conn1):
"""
管道结构
进程<==>conn1(管道头)==pipe==conn2(管道尾)<==>进程2
"""
sub_info = "进程向conn1发送消息, 管道另一头conn2 可以接收到消息"
print(f"进程1--{multiprocessing.current_process().pid}发送数据:{sub_info}")
time.sleep(1)
conn1.send(sub_info) # 调用conn1.send发送消息, 发送的消息会被管道的另一头接收
print(f"conn1接收消息:{conn1.recv()}") # conn1.recv接收消息, 如果没有消息可接收, recv方法会一直阻塞. 如果管道已经被关闭,那么recv方法会抛出EOFError
time.sleep(1)
def fun2(conn2):
sub_info = "进程向conn2发送消息, 管道另一头conn1 可以接收到消息"
print(f"进程2--{multiprocessing.current_process().pid}发送数据:{sub_info}")
time.sleep(1)
conn2.send(sub_info)
print(f"conn2接收消息:{conn2.recv()}")
time.sleep(1)
if __name__ == "__main__":
# 创建管道
# Pipe方法返回(conn1, conn2)代表一个管道的两个端.如果conn1带表头, conn2代表尾, conn1发送的消息只会被conn2接收, 同理conn2发送的消息也只会被conn1接收
conn1, conn2 = multiprocessing.Pipe()
# 创建子进程
# Python中,圆括号意味着调用函数. 在没有圆括号的情况下,Python会把函数当做普通对象
process1 = multiprocessing.Process(target=fun1, args=(conn1,))
process2 = multiprocessing.Process(target=fun2, args=(conn2,))
# 启动子进程
process1.start()
process2.start()
管理器提供了一种创建共享数据的方法,从而可以在不同进程中共享
实现核心
实操代码
from multiprocessing import Manager, Process
def func1(name,m_list,m_dict):
m_dict['area'] = '罗布泊'
m_list.append('钱三强')
def func2(name, m_list, m_dict):
m_dict['work'] = '造核弹'
m_list.append('邓稼先')
if __name__ == "__main__":
with Manager() as mgr:
m_list = mgr.list()
m_dict = mgr.dict()
m_list.append("钱学森")
# 两个进程不能直接互相使用对象,需要互相传递
p1 = Process(target=func1, args=('p1', m_list, m_dict))
p1.start()
p1.join() # 等p1进程结束,主进程继续执行
print(m_list)
print(m_dict)
p2 = Process(target=func2, args=('p1', m_list, m_dict))
p2.start()
p2.join() # 等p2进程结束,主进程继续执行
print(m_list)
print(m_dict)
执行结果
太过机密, 不予展示
进程池可以提供指定数量的进程给用户使用,即当有新的请求提交到进程池中时,如果池未满,则会创建一个新的进程用来执行该请求;
反之,如果池中的进程数已经达到规定最大值,那么该请求就会等待,只要池中有进程空闲下来,该请求就能得到执行
使用进程池的好处就是可以节约内存空间, 提高资源利用率
进程池相关方法
| 类/方法 | 功能 | 参数 |
|---|---|---|
| Pool(processes) | 创建进程池对象 | processes表示进程池中有多少进程 |
| pool.apply_async(func,args,kwds) | 异步执行;将事件放入到进程池队列 | func 事件函数 args 以元组形式给func传参kwds 以字典形式给func传参返回值:返回一个代表进程池事件的对象,通过返回值的get方法可以得到事件函数的返回值 |
| pool.apply(func,args,kwds) | 同步执行;将事件放入到进程池队列 | func 事件函数 args 以元组形式给func传参kwds 以字典形式给func传参 |
| pool.close() | 关闭进程池 | |
| pool.join() | 关闭进程池 | |
| pool.map(func,iter) | 类似于python的map函数,将要做的事件放入进程池 | func 要执行的函数 iter 迭代对象 |
实现核心
实操代码
from multiprocessing import Pool
import os
from time import sleep
def func1(name):
print(f"方法1输出: 当前进程的ID:{os.getpid()},{name}")
sleep(2)
return name
def func2(args):
print("方法2输出: ", args)
if __name__ == "__main__":
pool = Pool(5)
pool.apply_async(func=func1, args=('进程1',), callback=func2)
pool.apply_async(func=func1, args=('进程2',), callback=func2)
pool.apply_async(func=func1, args=('进程3',), callback=func2)
pool.apply_async(func=func1, args=('进程4',))
pool.apply_async(func=func1, args=('进程5',))
pool.apply_async(func=func1, args=('进程6',))
pool.apply_async(func=func1, args=('进程7',))
pool.close()
pool.join()
执行结果

使用with 方法, 可以进行优雅的进行资源管理. 在这里是可以帮助我们优雅的关闭线程池
关于with方法
with所求值的对象必须有一个enter()方法,一个exit()方法.
紧跟with后面的语句被求值后,返回对象的__enter__()方法被调用,
这个方法的返回值将被赋值给as后面的变量。当with后面的代码块全部被执行完之后,将调用前面返回对象的exit()方法
实操代码
from multiprocessing import Pool
import os
from time import sleep
def func1(name):
print(f"方法1输出: 当前进程的ID:{os.getpid()},{name}")
sleep(2)
return name
if __name__ == "__main__":
with Pool(5) as pool:
args = pool.map(func1, ('进程1,', '进程2,', '进程3,', '进程4,', '进程5,', '进程6,', '进程7,', '进程8,'))
for a in args:
print(a)
执行结果

线程是程序执行时的最小单位,也是CPU调度和分派的基本单位.
一个进程可以由很多个线程组成,每个线程拥有自己独立的栈和共享的堆,共享堆,不共享栈,标准线程由操作系统调度
线程由CPU独立调度执行,在多CPU环境下就允许多个线程同时运行. 同样多线程也可以实现并发操作,每个请求分配一个线程来处理.
Python的标准库提供了两个模块:
_thread和threading,_thread是低级模块,threading是高级模块,
是对_thread进行了封装。多数情况下,我们只需要使用threading这个高级模块.
而线程的创建的方式有两种: 一种是方法包装, 一种是类包装
主要步骤:
实操代码
from threading import Thread
from time import sleep
def func1(name):
"""线程创建方式(方法包装)"""
for i in range(3):
print(f"thread:{name} : {i}")
sleep(1)
if __name__ == "__main__":
"""新建两个线程, 每个线程每隔1秒打印一次消息, 总共打印3次"""
print("主线程, start")
# 创建线程
t1 = Thread(target=func1, args=("t1",))
t2 = Thread(target=func1, args=("t2",))
# 启动线程
t1.start()
t2.start()
print("主线程, end")
"""
注意: 运行结果可能会出现换行问题,是因为多个线程抢夺控制台输出的IO流
"""
结果展示

主要步骤
实操代码
from threading import Thread
from time import sleep
class MyThread(Thread):
"""
类的方式创建线程(类包装)
"""
def __init__(self, name):
Thread.__init__(self)
self.name = name
def run(self):
for i in range(3):
print(f"thread : {self.name} : {i}")
sleep(1)
if __name__ == "__main__":
"""新建两个线程, 每个线程每隔1秒打印一次消息, 总共打印3次"""
print("主线程, start")
# 创建线程(类创建的方式)
t1 = MyThread("t1")
t2 = MyThread("t2")
# 启动线程
t1.start()
t2.start()
print("主线程, end")
结果展示
从结果可以看出, 启动线程对应的方法是异步的

由上图可知,主线程不会等待子线程结束(异步).
如果需要等待子线程结束后,再结束主线程,可使用join()方法
实操代码
if __name__ == "__main__":
"""新建两个线程, 每个线程每隔1秒打印一次消息, 总共打印3次"""
print("主线程, start")
# 创建线程(类创建的方式)
t1 = MyThread("t1")
t2 = MyThread("t2")
# 启动线程
t1.start()
t2.start()
# 主线程等t1, t2结束之后, 再往下执行
t1.join()
t2.join()
print("主线程, end")
在行为上还有一种叫守护线程,主要的特征是在它的生命周期中主线程死亡,它也就随之死亡.
在python中,线程通过 setDaemon(True|False)来设置是否为守护线程.
实操代码
from threading import Thread
from time import sleep
class MyThread(Thread):
"""守护线程"""
def __init__(self, name):
Thread.__init__(self)
self.name = name
def run(self):
for i in range(3):
print(f"thread : {self.name} : {i}")
sleep(1)
if __name__ == "__main__":
print("主线程, start")
# 创建线程(类创建的方式)
t1 = MyThread("t1")
# 设置为守护线程
# t1.setDaemon(True) # 该方法3.10后被废弃, 可以直接按照下面的方法设置
t1.daemon = True
# 启动线程
t1.start()
print("主线程, end")
执行结果
从结果可以看出, 主线程不会等待子线程结束(也就是执行循环体的后续循环),
因为子线程被设置成守护线程, 因此主线程执行完毕后子线程就会停止执行

事件Event主要用于唤醒正在阻塞等待状态的线程
注意:
Event() 可以创建一个事件管理标志,该标志(event)默认为False,event对象主要有四种方法可以调用:
| 方法名 | 说明 |
|---|---|
event.wait(timeout=None) | 调用该方法的线程会被阻塞,如果设置了timeout参数,超时后,线程会停止阻塞继续执行; |
event.set() | 将event的标志设置为True,调用wait方法的所有线程将被唤醒 |
event.clear() | 将event的标志设置为False,调用wait方法的所有线程将被阻塞 |
event.is_set() | 判断event的标志是否为True |
实操代码
import time
import threading
def chihuoguo(name):
"""利用事件实现就餐, 吃饭"""
# 等待时间, 进入阻塞等待状态
print(f'{name} 已经到餐厅')
print(f'小伙伴{name} 已经进入就餐状态!')
event.wait()
print(f'{name} 收到通知了.')
print(f'小伙伴{name} 开始吃咯!')
if __name__ == "__main__":
event = threading.Event()
# 创建新线程
threa1 = threading.Thread(target=chihuoguo, args=("玛奇玛",))
threa2 = threading.Thread(target=chihuoguo, args=("电次",))
# 开启线程
threa1.start()
threa2.start()
time.sleep(10)
# 发送时间通知
print('---->>>主线程(波吉塔)通知小伙伴开吃咯!')
event.set()
执行结果
event.wait() 之前的代码段都能够正常执行,event.set(), event.wait()之后的代码才会被执行
event.wait() 这段代码注释后, 由于主线程没有变更事件的标识, 即事件标识一直为false
在python中,无论你有多少核,在Cpython解释器中永远都是假象.
无论你是4核,8核,还是16核,同一时间执行的线程只有一个线程.
这个是python开发设计的一个缺陷,所以说python中的线程是“含有水分的线程”.
全局锁, 又称GIL, 全称是: Global Interpreter Lock. 他是用来保证同一时刻只有一个线程在运行.

注意:
处理多线程问题时,多个线程访问同一个对象,并且某些线程还想修改这个对象. 这时候就需要用到“线程同步”.
线程同步其实就是一种等待机制,多个需要同时访问此对象的线程进入这个对象的等待池形成队列,等待前面的线程使用完毕后,下一个线程再使用
实操代码
from threading import Thread
from time import sleep
class Account:
def __init__(self, money, name):
self.money = money
self.name = name
# 模拟提款操作
class Drawing(Thread):
def __init__(self, drawingNum, account):
Thread.__init__(self)
self.drawingNum = drawingNum
self.account = account
self.expenseTotal = 0
def run(self):
if self.account.money - self.drawingNum < 0:
return
sleep(1) # 判断完后阻塞。其他线程开始运行
self.account.money -= self.drawingNum
self.expenseTotal += self.drawingNum
print(f"账户:{self.account.name},余额是:{self.account.money}")
print(f"账户:{self.account.name},总共取了:{self.expenseTotal}")
if __name__ == "__main__":
a1 = Account(100, "timepause")
# 定义取钱线程对象
draw1 = Drawing(80, a1)
# 定义取钱线程对象
draw2 = Drawing(80, a1)
draw1.start()
draw2.start()
执行结果
可以看出, 没有线程同步机制,两个线程同时操作同一个账户对象,可以从只有100元的账户,轻松取出80*2=160元,账户余额变为-60. 但银行一般不会同意用户账户为负的.

互斥锁: 对共享数据进行锁定,保证同一时刻只能有一个线程去操作
注意:
threading 模块中定义了 Lock 变量,这个变量本质上是一个函数,通过调用这个函数可以获取一把互斥锁实现步骤
lock1.acquire()获得锁之后进行加锁, 然后调用 lock1.release() 释放锁实操代码
from threading import Thread, Lock
from time import sleep
class Account:
def __init__(self, money, name):
self.money = money
self.name = name
# 模拟提款操作
class Drawing(Thread):
def __init__(self, drawingNum, account):
Thread.__init__(self)
self.drawingNum = drawingNum
self.account = account
self.expenseTotal = 0
def run(self):
# 获得锁
lock1.acquire()
if self.account.money - self.drawingNum < 0:
return
sleep(1) # 判断完后阻塞。其他线程开始运行
self.account.money -= self.drawingNum
self.expenseTotal += self.drawingNum
# 释放锁
lock1.release()
print(f"账户:{self.account.name},余额是:{self.account.money}")
print(f"账户:{self.account.name},总共取了:{self.expenseTotal}")
if __name__ == "__main__":
a1 = Account(100, "timepause")
# 创建互斥锁
lock1 = Lock()
# 定义取钱线程对象
draw1 = Drawing(80, a1)
# 定义取钱线程对象
draw2 = Drawing(80, a1)
draw1.start()
draw2.start()
执行结果
可以看到, 加锁之后, 解决了因线程未同步引起的结果紊乱的问题

互斥锁使用后,一个资源同时只有一个线程访问。如果某个资源,我们同时想让N个(指定数值)线程访问?
这时候可以使用信号量. 信号量控制同时访问资源的数量.
信号量和锁相似,锁同一时间只允许一个对象(进程)通过,信号量同一时间允许多个对象(进程)通过
应用场景
实操代码
from threading import Lock, Semaphore, Thread
from time import sleep
def home(name, se):
"""
信号量控制同时访问资源的数量。信号量和锁相似,锁同一时间只允许一个对象(进程)通过,信号量同一时间允许多个对象(进程)通过
"""
# 拿到信号量
se.acquire()
print(f"{name}进入了厕所")
sleep(3)
print(f'******************{name}走出了厕所')
# 归还信号量
se.release()
if __name__ == "__main__":
"""
一个房间一次只允许两个人使用: 若不使用信号量,会造成所有人都进入这个房子; 若只允许一人通过可以用锁Lock()
"""
# 创建信号量的对象,有两个坑位
se = Semaphore(2)
print("欢迎光临本厕所, 目前只有两个坑位(联系商务即可扩增~~~)")
for i in range(7):
p = Thread(target=home, args=(f'tom{i}', se)) # 前面加f的作用是令{}里面的变量引用可以生效
p.start()
执行结果
可以看到, 在使用信号量之后, 每次只能有两个人使用厕所. 在释放信号量之后, 后面的人可以继续使用厕所

在多线程程序中,死锁问题很大一部分是由于一个线程同时获取多个锁造成的
实操代码
from threading import Lock, Thread
from time import sleep
def run1():
# 获取锁
lock1.acquire()
print("幸平创真拿到菜刀")
# lock1.release()
# print("幸平创真释放菜刀")
lock2.acquire()
print("幸平创真拿到锅")
# lock2.release()
# print("幸平创真释放锅")
def run2():
lock2.acquire()
print("卫宫士郎拿到锅")
# lock2.release()
# print("卫宫士郎释放锅")
lock1.acquire()
print("卫宫士郎拿到菜刀")
# lock1.release()
# print("卫宫士郎释放菜刀")
if __name__ == "__main__":
"""
死锁是由于“同步块需要同时持有多个锁造成”的,要解决这个问题思路很简单,就是:同一个代码块,不要同时持有两个对象锁
"""
lock1 = Lock()
lock2 = Lock()
t1 = Thread(target=run1)
t2 = Thread(target=run2)
t1.start()
t2.start()
执行结果
可以看到, 在幸平创真拿到菜刀, 卫宫士郎拿到锅之后, 两人都不愿意释放手中的家伙, 因此无法继续进行做饭, 线程阻塞

放开注释的代码段之后, 由于每个代码块支持有一个对象锁, 因此不会发送死锁

多线程环境下,我们经常需要多个线程的并发和协作. 这个时候,就需要了解一个重要的多线程并发协作模型“生产者/消费者模式

缓冲区和queue对象
从一个线程向另一个线程发送数据最安全的方式可能就是使用queue 库中的队列了。
创建一个被多个线程共享的 Queue对象,这些线程通过使用 put() 和 get() 操作来向队列中添加或者删除元素。
Queue对象已经包含了必要的锁,所以你可以通过它在多个线程间多安全地共享数据
Queue对象类似进程中的Manager管理器, 本质都是创建了共享数据, 然后在不同进程/线程之间共享
实现步骤:
queue.put()将数据放入queue队列, 消费者通过调用queue.get() 获取queue中的数据实操代码
import queue
from threading import Thread
from time import sleep
def producer():
num = 1
while True:
if queue.qsize() < 5:
print(f"生产:{num}号消息")
queue.put(f"消费: {num}号消息")
num += 1
else:
print("消息队列已满, 等待被消费")
sleep(1)
def consumer():
while True:
print(f"获取消息:{queue.get()}")
sleep(1)
if __name__ == "__main__":
queue = queue.Queue()
# 注意这里 target=函数名, 而不是 target=函数名()
t = Thread(target=producer)
t.start()
c = Thread(target=consumer)
c.start()
c2 = Thread(target=consumer)
c2.start()
执行结果

协程也叫作纤程(Fiber),是一种在线程中,比线程更加轻量级的存在,由程序员自己写程序来管理.
我们可以将协程理解为运行在线程上的代码块, 协程挂起并不会引起线程阻塞, 他的作用是提高线程的利用率…
协程之间可以依靠邮箱来进行通信和数据共享, 了避免内存共享数据而带来的线程安全问题.
因为其轻量和高利用率的特点, 即使创建上千个线程也不会对系统造成很大负担, 而线程则恰恰相反.
协程是一种设计思想,不仅仅局限于某一门语言. 在Go, Java, Python 等语言中均有实现
协程的优点
协程的缺点
实操代码
不使用协程时
# 不使用协程执行多个任务
import time
def fun1():
for i in range(3):
print(f'原子弹:第{i}次爆炸啦')
time.sleep(1)
return "fun1执行完毕"
def fun2():
for k in range(3):
print(f'氢弹:第{k}次爆炸了')
time.sleep(1)
return "fun2执行完毕"
def main():
fun1()
fun2()
if __name__ == "__main__":
start_time = time.time()
main()
end_time = time.time()
print(f"耗时{end_time - start_time}") # 不使用协程,耗时6秒
使用使用yield协程,实现任务切换
# 不使用协程执行多个任务
import time
def fun1():
for i in range(3):
print(f'原子弹:第{i}次爆炸啦')
yield # 只要方法包含了yield,就变成一个生成器
time.sleep(1)
return "fun1执行完毕"
def fun2():
g = fun1() # fun1是一个生成器,fun1()就不会直接调用,需要通过next()或for循环调用
print(type(g))
for k in range(3):
print(f'氢弹:第{k}次爆炸了')
next(g) # 继续执行fun1的代码
time.sleep(1)
return "fun2执行完毕"
def main():
fun1()
fun2()
if __name__ == "__main__":
start_time = time.time()
main()
end_time = time.time()
print(f"耗时{end_time - start_time}") # 耗时5.0秒,效率差别不大
使用asyncio异步IO的典型使用方式实现协程
实现步骤:
await asyncio.gather(fun1(), fun2())asyncio.run(main()) 来运行main方法实操代码
# 不使用协程执行多个任务
import asyncio
import time
async def fun1(): # async表示方法是异步的
for i in range(3):
print(f'原子弹:第{i}次爆炸啦')
# await异步执行func1方法
await asyncio.sleep(1)
return "fun1执行完毕"
async def fun2():
for k in range(3):
print(f'氢弹:第{k}次爆炸了')
# await异步执行func2方法
await asyncio.sleep(1)
return "fun2执行完毕"
async def main():
res = await asyncio.gather(fun1(), fun2())
# 返回值为函数的返回值列表,本例为["func1执行完毕", "func2执行完毕"]
print(res)
if __name__ == "__main__":
start_time = time.time()
asyncio.run(main())
end_time = time.time()
print(f"耗时{end_time - start_time}") # 耗时3秒,效率极大提高
实操结果
从这里可以看到, 这里进用了3.02s左右, 只比理论最短用时3s多了0.02s左右, 从这里可以看出使用协程的巨大优势
