当线程创建完成之后,并不会马上执行线程,而是等待某一事件发生,线程才会启动
- import threading
-
- # # 创建 event 对象
- # event = threading.Event()
- # # 重置代码中的 event 对象,使得所有该event事件都处于待命状态
- # event.clear()
- # # 阻塞线程,等待 event 指令
- # event.wait()
- # # 发送 event 指令,使得所有设置该 event 事件的线程执行
- # event.set()
-
- class MyThreading(threading.Thread):
- def __init__(self, event):
- super().__init__()
- self.event = event
-
- def run(self):
- print('线程{}已经初始化完成,随时准备启动...'.format(self.name))
- # 阻塞线程,让线程等待指令后再启动
- self.event.wait()
- print('{}开始执行...'.format(self.name))
-
- if __name__ == '__main__':
- event = threading.Event()
- # 创建 10 个自定义线程对象并放入列表
- threads = [MyThreading(event) for i in range(10)]
-
- # 重置代码中的 event 对象,使得所有该event事件都处于待命状态
- event.clear()
-
- # 执行线程
- # 执行到 run 方法中 self.event.wait() 位置,即打印了:线程{}已经初始化完成...
- [t.start() for t in threads]
-
- # 发送 event 指令,使得所有设置该 event 事件的线程执行
- # 即启动 threads 列表中的所有线程
- # 接着执行 run 方法中 self.event.wait() 后面的代码,即打印了:{}开始执行...
- event.set()
-
- [t.join() for t in threads]
- import threading
-
- # condition 对象适用于线程轮流执行,或一个线程等待另一个线程的情况,如两个人的对话等
-
- # 创建 condition 对象
- cond = threading.Condition()
-
- class ThreadA(threading.Thread):
- def __init__(self, cond, name):
- super().__init__(name=name)
- self.cond = cond
-
- def run(self):
- # 获取锁
- self.cond.acquire()
- # 线程A说了第一句话
- print(self.getName(), ':一二三四五')
- # 唤醒其他处于 wait 状态的线程(通知线程B可以说话了)
- self.cond.notify()
- # 线程A进入 wait 状态,等待线程B通知(唤醒)
- self.cond.wait()
-
- # 被线程A唤醒后说了第二句话
- print(self.name, ':山无棱,天地合,乃敢与君绝')
- self.cond.notify() # 通知线程B
- self.cond.wait() # 等待线程B通知
-
- # 被线程A唤醒后说了第三句话,最后一句话
- print(self.name, ':有钱吗?借点')
- self.cond.notify() # 通知线程B
- self.cond.release() # 释放锁
-
-
- class ThreadB(threading.Thread):
- def __init__(self, cond, name):
- super().__init__(name=name)
- self.cond = cond
-
- def run(self):
- # 获取锁
- self.cond.acquire()
- self.cond.wait() # 由于它不是第一个说话的人,所以一开始等待通知
-
- # 线程B说了第一句话
- print(self.getName(), ':上山打老虎')
- # 唤醒其他处于 wait 状态的线程(通知线程A可以说话了)
- self.cond.notify()
- # 线程B进入 wait 状态,等待线程A通知(唤醒)
- self.cond.wait()
-
- # 被线程B唤醒后说了第二句话
- print(self.name, ':海可枯,石可烂,激情永不散')
- self.cond.notify() # 通知线程A
- self.cond.wait() # 等待线程A通知
-
- # 被线程B唤醒后说了第三句话,最后一句话
- print(self.name, ':没有,滚')
- # self.cond.notify() # 已经是最后一句话,不需要通知线程A
- self.cond.release() # 释放锁
-
- if __name__ == '__main__':
- a = ThreadA(cond, 'AAA')
- b = ThreadB(cond, 'BBB')
-
- # 线程A先说话,但是不能先启动线程A
- # 因为如果启动了线程A,然后线程A说完第一句话后,通知线程B
- # 但是此时线程B没有启动,就接收不了A的通知,B就会一直处于 wait 状态,即说不了话,也通知不了A
- # A等不到B的通知,也会一直处于 wait 状态
- # a.start()
- # b.start()
-
- b.start()
- a.start()
在使用多线程的过程中,会有一种变量的使用场景: 一个变量会被所有的线程使用,但是每个线程都会对该变量设置不同的值, threading.local() 提供了这种变量
- """
- 在使用多线程的过程中,会有一种变量的使用场景:
- 一个变量会被所有的线程使用,但是每个线程都会对该变量设置不同的值
- threading.local() 提供了这种变量
- 假设有一个场景:
- 设置一个 threading.local 变量,然后新建两个线程
- 分别设置这两个 threading.local 的值
- 再分别打印这两个 threading.local 的值
- 看每个线程打印出来的 threading.local 值是否不一样
- """
- import threading
-
- # local_data 实际上是一个对象
- local_data = threading.local()
- # 设置 local_data 的名字
- local_data.name = 'local_data'
-
-
- class MyThread(threading.Thread):
- def run(self):
- print('赋值前-子线程:', threading.currentThread(), local_data.__dict__)
- # 在子线程中修改 local_data.name 的值
- local_data.name = self.getName()
- print('赋值后-子线程:', threading.currentThread(), local_data.__dict__)
-
-
- if __name__ == '__main__':
- print('开始前-主线程:', local_data.__dict__)
-
- t1 = MyThread()
- t1.start()
- t1.join()
-
- t2 = MyThread()
- t2.start()
- t2.join()
-
- print('结束后-主线程:', local_data.__dict__)
-
- """
- 输出结果:
- 开始前-主线程: {'name': 'local_data'}
- 赋值前-子线程:
{} - 赋值后-子线程:
{'name': 'Thread-1'} - 赋值前-子线程:
{} - 赋值后-子线程:
{'name': 'Thread-2'} - 结束后-主线程: {'name': 'local_data'}
- """
线程池中存放多个线程,当有业务需要线程来执行时,可以直接从线程池中获取一个线程来执行该业务, 业务执行完毕之后,线程不会释放,而是被放回线程池中,从而节省了线程的创建以及销毁的时间。 Python concurrent.futures 模块中的 ThreadPoolExecutor 就提供了线程池,该线程池有以下特点:
- from concurrent.futures import ThreadPoolExecutor
- import time
-
- # 创建线程池对象,并指定线程池中最大的线程数为 3
- # 当业务数不超过 3 的时候,ThreadPoolExecutor 就会创建一个新的线程来执行业务
- # 当超过 3 时,ThreadPoolExecutor 不会创建新的线程,而是等待执行其他业务的线程执行完毕后返回
- # 再将返回的线程分配给需要的业务
- executor = ThreadPoolExecutor(max_workers=3)
-
- # 定义一个业务
- # 假设这里模拟一个爬虫,爬取一个网页页面
- def get_html(timers):
- time.sleep(timers) # 模拟耗时操作
- print('获取网页信息{}完毕'.format(timers))
- return timers
-
- # 提交要执行的函数,即要完成的业务到线程池中,然后线程池就会自动分配线程去完成对应的业务
- # submit 方法会立即返回,不会阻塞主线程
- # get_html 的参数放在后面,即 1 会作为参数传递给 get_html() 中的 timers
- # 以下创建了四个任务
- task1 = executor.submit(get_html, 1)
- task2 = executor.submit(get_html, 2)
- task3 = executor.submit(get_html, 3)
- task4 = executor.submit(get_html, 4)
-
- bool1 = task1.done() # 检查任务是否完成,完成返回 True
- bool2 = task2.cancel() # 取消任务执行,只有该任务没有被放入线程池中才能取消成功,成功返回 True
-
- # 拿到任务执行的结果,如 get_html 的返回值
- # timeout 参数用于设置等待结果的最长等待时间,单位为秒
- # result 方法是一个阻塞方法
- timers = task3.result(timeout=10)
- print(timers)
- print(111)
- # 线程池的简单应用
- from concurrent.futures import ThreadPoolExecutor, as_completed
- import time
-
- # 创建线程池对象,并指定线程池中最大的线程数为 3
- # 当业务数不超过 3 的时候,ThreadPoolExecutor 就会创建一个新的线程来执行业务
- # 当超过 3 时,ThreadPoolExecutor 不会创建新的线程,而是等待执行其他业务的线程执行完毕后返回
- # 再将返回的线程分配给需要的业务
- executor = ThreadPoolExecutor(max_workers=3)
-
- # 定义一个业务
- # 假设这里模拟一个爬虫,爬取一个网页页面
- def get_html(timers):
- time.sleep(timers) # 模拟耗时操作
- print('获取网页信息{}完毕'.format(timers))
- return timers
-
- # 模拟要爬取的 url
- urls = [1, 2, 3]
- # 通过列表推导式构造多线程任务
- all_tasks = [executor.submit(get_html, url) for url in urls]
- # as_completed 接收一个可迭代对象
- # as_completed 是一个生成器,当任务没有完成时,它会阻塞,只有当任务结束返回结果时才会继续往下执行
- # as_completed 函数的作用:拿到所有任务执行完毕之后的结果
- # 不需要我们手动调用 done 方法不停地判断任务是否完成
- for item in as_completed(all_tasks):
- data = item.result()
- print('主线程中获取任务的返回值是{}'.format(data))
- """
- 执行结果:
- 获取网页信息1完毕
- 主线程中获取任务的返回值是1
- 获取网页信息2完毕
- 主线程中获取任务的返回值是2
- 获取网页信息3完毕
- 主线程中获取任务的返回值是3
- """
- from concurrent.futures import ThreadPoolExecutor
- import time
-
- # 创建线程池对象,并指定线程池中最大的线程数为 3
- # 当业务数不超过 3 的时候,ThreadPoolExecutor 就会创建一个新的线程来执行业务
- # 当超过 3 时,ThreadPoolExecutor 不会创建新的线程,而是等待执行其他业务的线程执行完毕后返回
- # 再将返回的线程分配给需要的业务
- executor = ThreadPoolExecutor(max_workers=3)
-
- # 定义一个业务
- # 假设这里模拟一个爬虫,爬取一个网页页面
- def get_html(timers):
- time.sleep(timers) # 模拟耗时操作
- print('获取网页信息{}完毕'.format(timers))
- return timers
-
- # 模拟要爬取的 url
- urls = [4, 2, 3]
-
-
- # map 方法和 as_complete 类似
- # map 也是一个生成器,当任务没有完成时,它会阻塞,只有当任务结束返回结果时才会继续往下执行
- # map 会自动映射 urls 中的每一个元素传递给 get_html 函数,并自动提交 ,不需要通过 submit 方法提交任务
- # map 方法直接拿到任务执行的结果
- # as_complete 和 map 都可以拿到线程池中各个线程执行的结果,但有以下区别:
- # as_complete 会根据任务完成的快慢得到结果,即哪个任务先完成就会先得到该任务的结果
- # 而 map 会严格按照任务的顺序得到结果,比如按照 urls 列表中的映射顺序得到对应的结果
- # 所以两种适用于不同的场景
- for data in executor.map(get_html, urls):
- print('主线程中获取任务的返回值是{}'.format(data))
- """
- 获取网页信息2完毕
- 获取网页信息3完毕
- 获取网页信息4完毕
- 主线程中获取任务的返回值是4
- 主线程中获取任务的返回值是2
- 主线程中获取任务的返回值是3
- """
- from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED, FIRST_COMPLETED
- import time
-
- # 创建线程池对象,并指定线程池中最大的线程数为 3
- # 当业务数不超过 3 的时候,ThreadPoolExecutor 就会创建一个新的线程来执行业务
- # 当超过 3 时,ThreadPoolExecutor 不会创建新的线程,而是等待执行其他业务的线程执行完毕后返回
- # 再将返回的线程分配给需要的业务
- executor = ThreadPoolExecutor(max_workers=3)
-
- # 定义一个业务
- # 假设这里模拟一个爬虫,爬取一个网页页面
- def get_html(timers):
- time.sleep(timers) # 模拟耗时操作
- print('获取网页信息{}完毕'.format(timers))
- return timers
-
- # 模拟要爬取的 url
- urls = [4, 2, 3]
-
- all_tasks = [executor.submit(get_html, url) for url in urls]
-
- # 让主线程阻塞,直到参数里的条件成立
- # 根据 wait 函数的参数,条件成立的情况是:所有任务执行完毕
- # ALL_COMPLETED 表示所有任务都执行完成
- # 还有其他的参数,如 FIRST_COMPLETED 表示只要有一个任务完成就条件成立
- wait(all_tasks, return_when=FIRST_COMPLETED)
-
- # 如果想等代码执行完毕之后再打印下列语句,可以使用 wait 语句
- print('代码执行完毕')
使用 concurrent.future 模块提供的 ProcessPoolExecutor 来实现进程池,用法和线程池完全一致,参考上述线程池的使用(建议使用该种方式使用进程池)
下面是基于 Pool 类实现的进程池的使用
- import multiprocessing
- import time
-
- # 定义一个业务
- # 假设这里模拟一个爬虫,爬取一个网页页面
- def get_html(n):
- time.sleep(n) # 模拟耗时操作
- print('子进程{}获取内容成功'.format(n))
- return n
-
- if __name__ == '__main__':
- # 设置进程数,一般设置为和 CPU 数量一致的比较合理
- # multiprocessing.cpu_count() 获取当前主机的 CPU 核心数
- pool = multiprocessing.Pool(multiprocessing.cpu_count())
-
- # apply_async 是一个异步方法
- # apply 是一个同步方法
- # 作用类似于 submit 方法
- result = pool.apply_async(get_html, args=(2,))
-
- pool.close() # 必须在 join 方法前调用,否则会抛出异常
- # join 方法会等待所有的子进程执行完毕之后,才会继续往下执行主进程的代码
- # 即 join 会阻塞主进程代码
- pool.join()
-
- # result.get() 拿到子进程执行结果, get 方法是一个阻塞方法
- print(result.get())
- print('end...')
- print()
-
- # map 方法的使用
- pool = multiprocessing.Pool(multiprocessing.cpu_count())
- # imap 方法会按照列表顺序输出
- # imap_unordered 方法则不会按照列表顺序执行,而是按照任务执行的快慢输出
- for result in pool.imap(get_html, [1, 2, 3]):
- print('{}休眠执行成功'.format(result))
同步信号量的作用是用于控制同时工作的线程数量,如读文件时只能同时允许两个线程读,在爬虫时控制同时爬虫的线程,防止触发网站反扒机制
- import threading
- import time
-
- # 还是模拟一个爬虫
- # HtmlSpider 类负责根据给定的 URL 去爬取网页内容
- class HtmlSpider(threading.Thread):
- def __init__(self, url, sem):
- super().__init__()
- self.url = url
- self.sem = sem
-
- def run(self):
- time.sleep(2)
- print('网页内容获取完成')
- self.sem.release() # 线程完成任务,释放锁
-
- # UrlProducer类负责给 HtmlSpider 类提供网页的 URL
- class UrlProducer(threading.Thread):
- def __init__(self, sem):
- super().__init__()
- self.sem = sem
-
- def run(self):
- for i in range(10):
- self.sem.acquire() # 获取锁,获取成功才能执行线程
- html_thread = HtmlSpider('url{}'.format(i), self.sem) # 创建HtmlSpider线程
- html_thread.start() # 启动线程
-
- if __name__ == '__main__':
- # 创建线程同步信号量
- # 参数 value 指定允许同时工作的线程数
- sem = threading.Semaphore(value=3)
- url_producer = UrlProducer(sem)
- url_producer.start()