现在的操作系统大多采用时间片轮转的方式工作,需要频繁的切换进程,由于每个进程都占有一份独立的内存空间,所以每次切换进程时都需要切换内存空间(程序上下文),
这将造成很大的开销,此时操作系统的响应速度很慢,为了解决操作系统响应速度慢的问题,操作系统引入了更轻量的进程——线程,因为线程不占有内存空间,它包括在进程的内存空间中,共享进程的资源,所以切换线程的开销要小很多,
又由于线程相比进程更加轻量,操作系统可以启动更多的线程来执行任务(程序段),这进一步提高了操作系统的并发能力。现在的操作系统一般都是采用以进程为单位进行资源分配,以线程为单位进行调度,这样的工作方式大大提高了操作系统的响应速度。
线程(Thread):轻量级进程,是操作系统进行调度的最小单位。一个线程是一个任务(一个程序段)的一次执行过程。线程不占有内存空间,它包括在进程的内存空间中。在同一个进程内,多个线程共享进程的资源。一个进程至少有一个线程。
操作系统创建线程时,线程处于创建态,CPU调度线程时,线程处于运行态,此时其它已创建的或者时间片到的线程就处于就绪态,当然还有些线程在进行磁盘、网络等IO时就处于阻塞态,操作系统销毁线程时,线程就处于终止态。另外,线程还具有静止就绪态和静止阻塞态,处于这两种状态,说明这个线程被操作系统挂起了,操作系统挂起线程,是为了观察和分析线程状态。
创建一个线程通常有两种方法:
(1)在实例化一个线程对象时,将要执行的任务函数以参数的形式传入;
(2)继承Thread类的同时重写它的run方法。
注意: 每个进程只有一个主线程(main)。
方法一
import time
import threading
def printNumber(n: int) -> None:
while True:
print(n)
time.sleep(n)
for i in range(1, 3):
t = threading.Thread(target=printNumber, args=(i, ))
t.start()
方法二
import time
import threading
class MyThread(threading.Thread):
def __init__(self, n):
self.n = n
# 注意:一定要调用父类的初始化函数,否则否发创建线程
super().__init__()
def run(self) -> None:
while True:
print(self.n)
time.sleep(self.n)
for i in range(1, 3):
t = MyThread(i)
t.start()
守护线程(Daemon Thread)也叫后台进程,它的目的是为其他线程提供服务所以优先级比较低, 如果当前进程中所有的线程被杀死了,那么守护线程也就没有了存在的必要。因此守护线程会随着非守护线程的消亡而消亡。Thread类中,子线程被创建时默认是非守护线程,我们可以通过setDaemon(True)将一个子线程设置为守护线程。
注意: 将子线程设置为守护线程必须在调用start()方法之前,否则回引发RuntimeError异常。
t = MyThread(i)
t.setDaemon(True) # 在启动线程前设置守护线程
t.start()
位于 time 模块中的 sleep(secs) 函数,可以实现令当前执行的线程暂停 secs 秒后再继续执行。所谓暂停,即令当前线程进入阻塞状态,当达到 sleep() 函数规定的时间后,再由阻塞状态转为就绪状态,等待 CPU 调度。
sleep() 函数的语法规则如下所示: time.sleep(secs)
` 单位是秒 ,我们可以通过传小数来进行毫秒级控制
在上面的案例中我们已经使用了很多sleep这就不演示了
join()会使主线程进入等待状态(阻塞),直到调用join()方法的子线程运行结束。同时你也可以通过设置timeout参数来设定等待的时间,如:
for i in range(1, 3):
t = MyThread(i)
t.start()
t.join(3)
在threading module中,有一个非常特别的类local。一旦在主线程实例化了一个local,它会一直活在主线程中,每个线程的值将会保存在相应的子线程的字典中。可以为每个线程创建一块独立的空间,让他存放数据。线程和线程之间数据不可以共享和访问
local = threading.local() # 全局对象
def worker():
local.value = 0
local.value1=100
for i in range(100):
time.sleep(0.0001)
local.value += 1
local.value1+= 1
print(threading.current_thread(), local.value)
print(threading.current_thread(), local.value1)
if __name__ == '__main__':
for i in range(10):
threading.Thread(target=worker).start()
现在假设你创建了两个子线程操作同一个全局变量number,number被初始化为0,两个子线程通过for循环对这个number进行+1,每个子线程循环1000000次,两个子线程同时进行。如果一切正常的话,最终这个number会变成2000000,然而现实并非如此。结果可能不是2000000
这种情况称为“脏数据”。产生脏数据的原因是,当一个线程在对数据进行修改时,修改到一半时另一个线程读取了未经修改的数据并进行修改。如何避免脏数据的产生呢?一个办法就是用join方法,即先让一个线程执行完毕再执行另一个线程。但这样的本质是把多线程变成了单线程,失去了多线程的意义。另一个办法就是用线程锁
锁是为了可能出现的数据不同步或数据不对称问题的解决方法,如果需要大量的用户访问相同数据时,为了数据的一致性和安全。那么就需要加锁。
锁相当于是将用户访问需求进行队列化,即第一个用户访问时,后续用户无法进行相同数据的访问。直到第一个用户访问完成后,由队列中的第二个用户继续访问。
threading模块中提供了5种最常见的锁,下面是按照功能进行划分:
Lock锁的称呼有很多,如:
它们是什么意思呢?如下所示:
同步锁一次只能放行一个线程,一个被加锁的线程在运行时不会将执行权交出去,只有当该线程被解锁时才会将执行权通过系统调度交由其他线程。
一把锁有两个状态:locked
和 unlocked
状态。锁刚被创建的时候是处于 unlocked
状态。
lock.acquire()
会让锁从 unlocked -> locked
。
lock.release()
会让锁从 locked -> unlocked
。
如果锁已经处于 locked
状态,对它使用 acquire()
的线程会被阻塞,直到另一个线程调用了 release()
使该锁解锁。
release()
方法只能在锁locked
时别调用,并释放锁。否则会抛出RuntimeError
错误。
import time
import threading
number = 0
lock = threading.Lock() # 实例化一个锁
class MyThread(threading.Thread):
def __init__(self, n):
self.n = n
super().__init__()
def run(self) -> None:
global number
for i in range(1000000):
lock.acquire() # 开锁,只允许当前线程访问共享的数据
number += 1
lock.release() # 释放锁,允许其他线程访问共享数据
for i in range(1, 3):
t = MyThread(i)
t.start()
# 给5秒钟让两个子线程执行完毕
time.sleep(5)
# 确保两个子线程执行完毕
print("活跃的线程个数:", threading.active_count())
# 输出最终数值
print("number: ", number) # 2000000
RLock和Lock的用法相同,区别在于:Lock只能开一次然后释放一次,不能开多次,而RLock可以开多次,再进行多次释放[5]。当然需要注意的是:RLock中虽然可以开多次,但是acquire和release的次数必须对应。
import threading
#创建一个rlock对象
lock = threading.RLock()
#初始化共享资源
abce = 0
#本线程访问共享资源
lock.acquire() # 加锁
abce = abce + 1
#这个线程尝试访问共享资源
lock.acquire() # 再次加锁
abce = abce + 2
lock.release()# 释放里面的锁
lock.release() # 释放外面的锁
print(abce)
注意: acquire是可以设置锁的超时时间,来防止死锁
就像一个餐桌吃饭,桌子上有一把叉子和一份面,只有两个都拿到的人才能吃面,那么二个人抢夺的情况下就必然会出现一个人拿着叉子不松手,一个人端着面不松手。这样就僵在那了。导致的死锁现象。
特点:
1. 一个线程中,存在一把以上的锁。
2. 多把锁交替使用。
解决办法: 把要干的事情理清楚 ,最好是吧加锁的过程都理清楚, 然后把锁相交的解开就行了
由于threading.Lock()和 threading.RLock() 对象中实现了enter__()与__exit()方法,故我们可以使用with语句进行上下文管理形式的加锁和解锁操作
lock = threading.Lock()
def add():
with lock:
# 自动加锁
global num
for i in range(10_000_000):
num += 1
# 自动解锁
条件锁是在递归锁的基础上增加了能够暂停线程运行的功能。 创建方式: condtion = threading.Condition()
`
有以下几种方法:
acquire([timeout]):调用关联锁的方法
release():解锁
wait():使线程进入 Condition 的等待池等待通知并释放解锁。使用前线程必须已获得锁定,否则将抛出异常。
notify():从等待池挑选一个线或几个线程并通知,收到通知的线程将自动调用 acquire() 尝试获得,其他线程仍然在等待池中等待通知,直到该线程收到通知 调用该方法,否则将会抛出异常。
notify_all():跟notify() 一样,但这个方法对应的是所有的线程。
我们一般用的使用条件锁最多的场景就是生产者和消费者模式:
import os
import threading
import threading
import time
condtion = threading.Condition()
sheep = ['1件产品','1件产品','1件产品','1件产品','1件产品']
class Producer(threading.Thread):
def __init__(self, name):
super().__init__(name=name)
pass
def run(self):
global condtion, sheep
while True:
time.sleep(1)
condtion.acquire()
if len(sheep) < 10:
print(self.name + "生产了1件产品")
sheep.append('1件产品')
condtion.notify_all() # 通知所有消费者
else:
print("仓库满了,停止生产!")
condtion.wait()
condtion.release()
class Customer(threading.Thread):
def __init__(self, name):
super().__init__(name=name)
pass
def run(self):
global condtion, sheep
while True:
time.sleep(0.5)
condtion.acquire() # 加锁
if len(sheep) > 0:
meat = sheep.pop()
print(self.name + "购买了" + meat + "还剩多少" + str(len(sheep)) + "件")
condtion.notify_all()
else:
print("买光了,等待")
condtion.wait() # 等待生产者唤醒
condtion.release() # 解锁
if __name__ == '__main__':
p1 = Producer("1号生产车间")
p2 = Producer("2号生产车间")
p3 = Producer("3号生产车间")
p4 = Producer("4号生产车间")
p5 = Producer("5号生产车间")
p6 = Producer("6号生产车间")
p1.start()
p2.start()
p4.start()
p5.start()
p6.start()
c1 = Customer('小王')
c2 = Customer('小李')
c3 = Customer('小贾')
c4 = Customer('小沈')
c5 = Customer('小刘')
c1.start()
c2.start()
c3.start()
c4.start()
c5.start()
它内部维护了一个计数器,每一次acquire操作都会让计数器减1,每一次release操作都会让计数器加1,当计数器为0时,任何线程的acquire操作都不会成功,Semaphore确保对资源的访问有一个上限, 这样,就可以控制并发量。
如果使用Lock,RLock,那么只能有一个线程获得对资源的访问,但现实中的问题并不总是这样,假设这样一个场景,一个线程安全的操作,同一个时刻可以允许两个线程进行,如果太多了效率会降低,那么Lock,Rlock,包括Condition就不适合这种场景。
假设你写了一个多线程爬虫,起10个线程去爬页面,10个线程访问过于频繁了,目标网站对你采取反爬措施。但你把线程数量降到2两个就没问题了。那么对于这个场景,你仍然可以启动10个线程,只是向目标网站发送请求的这个操作,你可以用Semaphore来控制,使得同一个时刻只有两个线程在请求页面
在比如说厕所有三个坑,那最多只允许三个人上厕所,后面的人只能等前面的人出来才能进去。
import threading
import time
semaphore = threading.Semaphore(2) #创建并发限制锁
def worker(id):
print('thread {id} acquire semaphore'.format(id=id))
semaphore.acquire()
print('thread {id} get semaphore do something'.format(id=id))
time.sleep(2)
semaphore.release()
print('thread {id} release semaphore'.format(id=id))
for i in range(10):
t = threading.Thread(target=worker, args=(i, ))
t.start()
Python提供了Event对象用于线程间通信,它是由线程设置的信号标志,如果信号标志位真,则其他线程等待直到信号接触。Event对象实现了简单的线程通信机制,它提供了设置信号,清除信号,等待等…用于实现线程间的通信。
Event相关函数介绍
set() — 将事件标志设置为 True, 通知所有在等待状态(wait)的线程恢复运行;
isSet() — 获取标志Flag当前状态,返回True 或者 False;
wait() — 如果标志状态是False线程将会处于阻塞状态,直到等待标志的状态为True恢复运行;
clear() — 将标志设置为False
import threading
import time
import threading
event = event_obj = threading.Event() # 创建事件对象
def student(name):
print('学生%s 正在听课' % name)
event.wait(2)
print('学生%s 课间活动' % name)
def teacher(name):
print('老师%s 正在授课' % name)
time.sleep(7)
event.set()
if __name__ == '__main__':
stu1 = threading.Thread(target=student, args=('allen',))
stu2 = threading.Thread(target=student, args=('wxx',))
stu3 = threading.Thread(target=student, args=('yxx',))
t1 = threading.Thread(target=teacher, args=('winnie',))
t1.start()
stu1.start()
stu2.start()
stu3.start()
t1.join()
stu1.join()
stu2.join()
stu3.join()
系统启动一个新线程的成本是比较高的,因为它涉及与操作系统的交互。在这种情形下,使用线程池可以很好地提升性能,尤其是当程序中需要创建大量生存期很短暂的线程时,更应该考虑使用线程池。
线程池在系统启动时即创建大量空闲的线程,程序只要将一个函数提交给线程池,线程池就会启动一个空闲的线程来执行它。当该函数执行结束后,该线程并不会死亡,而是再次返回到线程池中变成空闲状态,等待执行下一个函数。
此外,使用线程池可以有效地控制系统中并发线程的数量。当系统中包含有大量的并发线程时,会导致系统性能急剧下降,甚至导致 Python 解释器崩溃,而线程池的最大线程数参数可以控制系统中并发线程的数量不超过此数。
线程池的基类是 concurrent.futures 模块中的 Executor,Executor 提供了两个子类,即 ThreadPoolExecutor 和 ProcessPoolExecutor,其中 ThreadPoolExecutor 用于创建线程池,而 ProcessPoolExecutor 用于创建进程池。
如果使用线程池/进程池来管理并发编程,那么只要将相应的 task 函数提交给线程池/进程池,剩下的事情就由线程池/进程池来搞定。
注意: ThreadPoolExecutor无法调用类中的方法,原因是实例方法不能被pickle。只能调用非类中的方法,或者类中的@staticmethod方法。
Exectuor 提供了如下常用方法:
程序将 task 函数提交(submit)给线程池后,submit 方法会返回一个 Future 对象,Future 类主要用于获取线程任务函数的返回值。由于线程任务会在新线程中以异步方式执行,因此,线程执行的函数相当于一个“将来完成”的任务,所以 Python 使用 Future 来代表。
Future 提供了如下方法:
在用完一个线程池后,应该调用该线程池的 shutdown() 方法,该方法将启动线程池的关闭序列。调用 shutdown() 方法后的线程池不再接收新任务,但会将以前所有的已提交任务执行完成。当线程池中的所有任务都执行完成后,该线程池中的所有线程都会死亡。
使用线程池来执行线程任务的步骤如下:
from concurrent.futures import ThreadPoolExecutor
import threading
import time
# 定义一个准备作为线程任务的函数
def action(max):
my_sum = 0
for i in range(max):
print(threading.current_thread().name + ' ' + str(i))
my_sum += i
return my_sum
# 创建一个包含2条线程的线程池
pool = ThreadPoolExecutor(max_workers=2)
# 向线程池提交一个task, 50会作为action()函数的参数
future1 = pool.submit(action, 50)
# 向线程池再提交一个task, 100会作为action()函数的参数
future2 = pool.submit(action, 100)
# 判断future1代表的任务是否结束
print(future1.done())
time.sleep(3)
# 判断future2代表的任务是否结束,如果结束返回True否则返回False
print(future2.done())
# 查看future1代表的任务返回的结果
print(future1.result())
# 查看future2代表的任务返回的结果
print(future2.result())
# 关闭线程池
pool.shutdown(
注意: 当程序使用 Future 的 result() 方法来获取结果时,该方法会阻塞当前线程,如果没有指定 timeout 参数,当前线程将一直处于阻塞状态,直到 Future 代表的任务返回。
wait 方法可以让主线程阻塞,直到满足设定的要求。wait方法接收3个参数,等待的任务序列、超时时间以及等待条件。
等待条件默认为ALL_COMPLETED,表明要等待所有的任务都结束。
等待条设置为 FIRST_COMPLETED,表示第一个任务完成就停止等待。
等待条件设置为 FIRST_EXCEPTION,表示某个线程中出现异常结束等待,没有异常的话等同ALL_COMPLETED
前面程序调用了 Future 的 result() 方法来获取线程任务的运回值,但该方法会阻塞当前主线程,只有等到钱程任务完成后,result() 方法的阻塞才会被解除。如果程序不希望直接调用 result() 方法阻塞线程,则可通过 Future 的 add_done_callback()
` 方法来添加回调函数,该回调函数形如 fn(future)。当线程任务完成后,程序会自动触发该回调函数,并将对应的 Future 对象作为参数传给该回调函数。
另外,由于线程池实现了上下文管理协议(Context Manage Protocol),因此,程序可以使用 with 语句来管理线程池,这样即可避免手动关闭线程池
import threading
from concurrent.futures import ThreadPoolExecutor
# 定义一个准备作为线程任务的函数
def action(max):
my_sum = 0
for i in range(max):
print(threading.current_thread().name + ' ' + str(i))
my_sum += i
return my_sum
def get_result(future):
print(future.result())
if __name__ == '__main__':
# 创建一个包含2条线程的线程池
with ThreadPoolExecutor(max_workers=2) as pool:
# 向线程池提交一个task, 50会作为action()函数的参数
future1 = pool.submit(action, 50)
# 向线程池再提交一个task, 100会作为action()函数的参数
future2 = pool.submit(action, 100)
# 为future1添加线程完成的回调函数
future1.add_done_callback(get_result)
# 为future2添加线程完成的回调函数
future2.add_done_callback(get_result)
print('--------------')
上面主程序分别为 future1、future2 添加了同一个回调函数,该回调函数会在线程任务结束时获取其返回值。
主程序的最后一行代码打印了一条横线。由于程序并未直接调用 future1、future2 的 result() 方法,因此主线程不会被阻塞,可以立即看到输出主线程打印出的横线。接下来将会看到两个新线程并发执行,当线程任务执行完成后,get_result() 函数被触发,输出线程任务的返回值。
此外,Exectuor 还提供了一个 map(func, *iterables, timeout=None, chunksize=1)
方法,该方法的功能类似于全局函数 map(),区别在于线程池的 map() 方法会为 iterables 的每个元素启动一个线程,以并发方式来执行 func 函数。这种方式相当于启动 len(iterables) 个线程,井收集每个线程的执行结果。
import time
from concurrent.futures import ThreadPoolExecutor
def get_html(i):
time.sleep(i)
print("get page {} success".format(i))
return i
if __name__ == '__main__':
executor = ThreadPoolExecutor(max_workers=2)
# map会根据参数的个数批量创建并且启动线程,并将对应的参数传递到线程方法中,然后会一直阻塞到所有方法结束并返回,(返回的顺序和参数的顺序一致)
for data in executor.map(get_html, [1,3,2,4]):
print("get {} page".format(data))