本章将以通俗易懂、贴合实际的方式介绍以下内容:
协程、线程池和多进程都是并发编程的方式,但它们在实现方式、适用场景和性能方面有一些区别。下面是它们的主要区别:
协程
为什么协程比多线程效率高?
因为在传统的多线程环境中,操作系统需要在不同的线程之间切换,这涉及保存和恢复线程状态、上下文切换等操作,这些操作会引入一定的开销
相比之下,协程的切换则是在用户空间内完成的,不需要操作系统的干预,因此切换的开销非常小。这使得协程可以在一个线程内并发执行多个任务,而不需要频繁地切换线程。
当一个协程被阻塞(例如等待 I/O 操作)时,事件循环可以迅速切换到另一个协程,而无需承担完整线程切换的开销。这使得协程切换比线程切换更轻量级。
协程的效率提升来自于在切换时不依赖操作系统进行线程管理,避免了线程切换的开销。
线程池
多进程
多进程和多线程及协程都是通过任务切换来实现并发,只不过协程的任务切换开销较小,而go是真正的在一时刻执行多个任务。
在异步编程中,事件通常与异步操作的完成相关。如网络请求、文件读写的完成或用户输入发生时。例如,当网络请求完成、文件读写操作完成时,这些都可以被视为事件。这些事件会恢复之前等待的协程,以便程序能够继续执行后续的操作。
事件循环(Event Loop)是异步编程中的一个核心概念。它描述了程序中的一个持续运行的循环,这个循环等待并处理事件。事件循环不断地查看这些事件,并根据这些事件调用相应的处理函数(通常是异步函数或回调函数)。举个例子:
想象一下你正在一个餐厅工作,你是服务员。餐厅里有很多顾客,他们各自点了菜(这就像是异步操作),而你的任务就是等待这些菜做好(等待异步操作完成),然后把菜送到对应的顾客桌上(调用处理函数)。
事件循环就相当于你这个服务员。你不停地查看是否有顾客的菜已经做好(检查事件),一旦有菜做好,你就立刻把菜送到对应的顾客桌上(处理事件)。这个过程是持续不断的,直到餐厅关门(程序结束)。
在Python的asyncio库中,事件循环是一个具体的对象,它负责调度和执行异步任务(协程),等待 I/O 操作完成。你可以使用asyncio.get_event_loop()来获取这个对象,并调用它的方法来运行协程或等待某个协程完成。
asyncio是Python标准库中的模块,用于编写并发的异步代码,允许在单线程中同时执行多个任务。
下面主要介绍如何定义和运行一个异步函数
- import asyncio
-
- async def say_hello():
- print("hello")
- await asyncio.sleep(1)
- print("world")
-
- if __name__ == "__main__":
- loop = asyncio.get_event_loop() #获取当前的事件循环
- loop.run_until_complete(say_hello()) #启动事件循环并运行给定的协程,直到该协程完成。
- loop.close() #关闭事件循环
从Python 3.7起,推荐使用asyncio.run(say_hello())来替代获取事件循环、运行协程和关闭事件循环的步骤。在 Python 3.10 及以后的版本中,asyncio.get_event_loop() 和相关函数已被弃用。
- import asyncio
-
- async def say_hello():
- print("hello")
- await asyncio.sleep(1)
- print("world")
-
- if __name__ == "__main__":
- asyncio.run(say_hello())
在windows下,某些时候使用asyncio.run()会报错:https://www.yuque.com/dafcs-dqalz/uz9oso
上面的say_hello 协程案例并没有直接展示协程在并发方面的优势。为了演示并发,我们可以创建多个协程并使用asyncio.gather() 运行它们,比如模拟多个异步任务同时进行的场景。
await asyncio.gather(*tasks) 用于并发地运行多个协程(coroutine)并等待它们全部完成。详细解释:
- import asyncio, time
-
- async def say_hello():
- print("hello")
- await asyncio.sleep(2)
- print("world")
-
- async def main():
-
- # 创建并运行三个协程,放入列表中
- tasks = [
- say_hello(),
- say_hello(),
- say_hello()
- ]
- await asyncio.gather(*tasks)
-
- if __name__ == "__main__":
- start_time = time.time() # 记录开始时间
- asyncio.run(main())
- end_time = time.time() # 记录结束时间
- print(f"程序运行时间: {end_time - start_time:.2f} 秒") #输出 程序运行时间: 2.00 秒
由于 say_hello 协程没有返回值,所以 await asyncio.gather(*tasks) 的结果将是一个包含三个 None 的元组

下面介绍了两种协程创建方式之间的差异:直接调用协程 vs asyncio.create_task()
在上面我们直接调用了say_hello()协程函数,并将返回的协程对象放入了tasks列表中
- # 创建并运行三个协程,放入列表中
- tasks = [
- say_hello(),
- say_hello(),
- say_hello()
- ]
- await asyncio.gather(*tasks)
还有一种是使用asyncio.create_task()显式地创建协程任务。这个函数会立即调度协程的执行(虽然它不会立即完成,因为协程是异步的),并返回一个Task对象,这个对象可以被用来取消任务、检查任务状态等。
- # 创建并运行三个协程,放入列表中
- tasks = []
- for _ in range(3):
- task = asyncio.create_task(say_hello()) # 显式地创建协程任务
- tasks.append(task)
-
- await asyncio.gather(*tasks) # 等待所有任务完成
asyncio.create_task() 与 直接调用协程的具体差异:
综上所述,使用asyncio.create_task()来创建和管理协程任务通常是一个更好的选择,因为它提供了更多的灵活性、可读性和资源管理功能。然而,在某些简单的场景下,直接调用协程也是可行的。
下面主要介绍使用asyncio.Semaphore()限制协程并发数量。
如下,当要执行100个携程时,程序默认会尝试同时运行它们,所耗时间也为2秒。然而,如果我们不希望程序同时运行过多协程以节省资源或避免性能问题,我们可以通过控制协程的并发数量来优化程序的执行。这样做可以确保在任何时候只有一定数量的协程在运行,从而更有效地管理资源并提高程序的稳定性和响应性。
- import asyncio, time
-
- async def say_hello():
- print("hello")
- await asyncio.sleep(2)
- print("world")
-
- async def main():
-
- tasks = []
- for _ in range(100):
- task = asyncio.create_task(say_hello())
- tasks.append(task)
-
- await asyncio.gather(*tasks)
-
- if __name__ == "__main__":
- start_time = time.time() # 记录开始时间
- asyncio.run(main())
- end_time = time.time() # 记录结束时间
- print(f"程序运行时间: {end_time - start_time:.2f} 秒") #输出 程序运行时间: 2.02 秒
要控制协程的并发数量,可以使用asyncio.Semaphore。Semaphore是一个基于计数器的同步原语,用于限制对共享资源的并发访问数量。在异步编程中,你可以用它来限制同时运行的协程数量。
限制同时运行的携程数量最大为20,程序所耗时间由原来的2s变为2 x 5 = 10s
- import asyncio, time
-
- async def say_hello(sem):
- async with sem:
- print("hello")
- await asyncio.sleep(2)
- print("world")
-
- async def main():
- sem = asyncio.Semaphore(20)
- tasks = []
- for _ in range(100):
- task = asyncio.create_task(say_hello(sem))
- tasks.append(task)
-
- await asyncio.gather(*tasks)
-
- if __name__ == "__main__":
- start_time = time.time() # 记录开始时间
- asyncio.run(main())
- end_time = time.time() # 记录结束时间
- print(f"程序运行时间: {end_time - start_time:.2f} 秒") #输出 程序运行时间: 10.06 秒
当你使用 asyncio 编写协程代码并希望发送HTTP请求时,你应该使用异步的HTTP库,如 aiohttp。而requests 库是一个同步的HTTP库,它不支持在协程中直接使用,因为它会阻塞事件循环,导致协程并发执行的优势无法体现。
aiohttp是基于asyncio的第三方库,专注于异步HTTP请求和Web应用的开发。它们的结合使得编写高性能的异步网络通信变得更加方便和灵活。
主要特点:
1. 实现方式
2. 性能特点
3. 适用场景
4. 其他功能
首先安装aiohttp
pip install aiohttp
对列表中的url批量请求,如果响应码为200则使用print输出。
- import asyncio
- import aiohttp
-
- async def fetch(session, url):
- async with session.get(url) as response:
- print(url, response.status)
-
- async def main():
- async with aiohttp.ClientSession() as session:
- # 创建一个任务列表
- tasks = []
- urls = [
- 'http://127.0.0.1',
- 'http://192.168.59.132',
- 'http://www.baidu.com',
- 'http://127.0.0.1',
- 'http://192.168.59.132',
- 'http://www.baidu.com',
- 'http://127.0.0.1',
- 'http://192.168.59.132',
- 'http://www.baidu.com'
- ]
-
- # 为每个URL创建一个fetch任务
- for url in urls:
- task = asyncio.create_task(fetch(session, url))
- tasks.append(task)
-
- # 等待所有任务完成
- await asyncio.gather(*tasks)
-
- if __name__ == "__main__":
- asyncio.run(main())
可以看到cmd中的输出并不会像线程池一样乱序,print时无需使用with lock。因为协程的执行顺序是由事件循环控制的

async with aiohttp.ClientSession() as session 用于创建一个 HTTP 会话(session)对象,并自动管理其生命周期。主要特点如下
关于复用 TCP 连接解释:
上面async with aiohttp.ClientSession() as session放在了main函数中,我们也可以fetch函数中,都能正常工作。但是它们之间的区别是什么?
放在main函数中:
放在fetch函数中:
建议:
- import asyncio
- import aiohttp
-
- async def fetch(url):
- async with aiohttp.ClientSession() as session:
- async with session.get(url) as response:
- print(url, response.status)
-
- async def main():
-
- # 创建一个任务列表
- tasks = []
- urls = [
- 'http://127.0.0.1',
- 'http://192.168.59.132',
- 'http://www.baidu.com',
- 'http://127.0.0.1',
- 'http://192.168.59.132',
- 'http://www.baidu.com',
- 'http://127.0.0.1',
- 'http://192.168.59.132',
- 'http://www.baidu.com'
- ]
-
- # 为每个URL创建一个fetch任务
- for url in urls:
- task = asyncio.create_task(fetch(url))
- tasks.append(task)
-
- # 等待所有任务完成
- await asyncio.gather(*tasks)
-
- if __name__ == "__main__":
- asyncio.run(main())
“async with session.get(url) as response”在Python的异步编程中用于发起一个HTTP GET请求,并管理HTTP响应对象的生命周期。
该请求会返回一个HTTP响应对象,这个对象包含了HTTP响应的所有信息,如状态码、响应头、响应体等。
- header = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.75 Safari/537.36'}
- session.get(url=url, headers=header)
- #表示整个请求(包括连接和读取)的超时时间为15秒。
- session.get(url, timeout=aiohttp.ClientTimeout(total=15))
- #默认情况下,allow_redirects 参数的值为 True,即自动重定向。
- session.get(allow_redirects=False)
- ##所有http或https的url都会走8080代理端口
- session.get(url=url, proxy='http://127.0.0.1:8080')
- connector = aiohttp.TCPConnector(ssl=False)
- async with aiohttp.ClientSession(connector=connector) as session:
async with session.get(url) as response 语句中的 response 对象是一个HTTP响应的对象,它提供了多种方法和属性来访问响应的不同部分。
- #获取一个异步上下文管理器,response 就是异步上下文管理器返回的响应对象
- async with session.get(url) as response:
- #获取响应码,会自动处理重定向。即获取的是重定向后的状态码
- code = response.status
- #以字符串的形式返回响应体的内容。或者使用await response.read()来获取原始的字节响应体。
- await response.text()
- #返回一个包含响应内容的 bytes 对象。你可以使用它来获取响应的原始字节内容。
- content = response.read()
- #响应头
- headers = response.headers
和普通的异常处理一样,如下捕获http请求的错误
- async def fetch(session, url):
- try:
- async with session.get(url) as response:
- print(url, response.status)
- except Exception as e:
- print(str(e))
如下手动实现重试逻辑,当max_retries = 2时,当请求错误时会自动重试两次
- import asyncio
- import aiohttp
-
- max_retries = 2 # 设置最大重试次数
-
- async def fetch(session, url, retries=0):
- try:
- async with session.get(url, proxy='http://127.0.0.1:8080', timeout=aiohttp.ClientTimeout(total=3)) as response:
- print(url, response.status)
- except Exception as e:
- if retries < max_retries:
- # 递归调用自身,增加重试次数
- await asyncio.sleep(0.5) # 可以添加等待时间,避免过于频繁的请求
- return await fetch(session, url, retries + 1)
- else:
- pass
-
- async def main():
- async with aiohttp.ClientSession() as session:
- # 创建一个任务列表
- tasks = []
- urls = [
- 'http://192.168.59.132',
- 'http://192.168.59.134'
- ]
-
- # 为每个URL创建一个fetch任务
- for url in urls:
- task = asyncio.create_task(fetch(session, url))
- tasks.append(task)
-
- # 等待所有任务完成
- await asyncio.gather(*tasks)
-
- if __name__ == "__main__":
- asyncio.run(main())
在批量请求时,往往需要将结果保存,如保存到列表中,此时我们需要使用同步机制来确保资源在任一时刻只有一个协程可以访问和修改列表。当一个协程获得了锁并准备修改列表时,其他尝试获取同一锁的协程将被阻塞,直到锁被释放为止。
在asyncio中,通常通过asyncio.Lock实现。
- import asyncio
- import aiohttp
-
- max_retries = 2 # 设置最大重试次数
- alive_code = [200, 301, 302, 303, 304, 401, 403]
- results = []
- # 创建一个锁来保护对results的写入
- lock = asyncio.Lock()
-
- async def dirfuzzMain(session, sem, path, retries=0):
- async with sem:
- try:
- async with session.get(path, proxy='http://127.0.0.1:8080', timeout=aiohttp.ClientTimeout(total=3)) as response:
- code = response.status
- if code in alive_code:
- print(path, code)
- # 使用锁来保护对results的写入
- async with lock:
- results.append((path, code))
-
- except Exception as e:
- if retries < max_retries:
- # 递归调用自身,增加重试次数
- await asyncio.sleep(0.5) # 可以添加等待时间,避免过于频繁的请求
- return await dirfuzzMain(session, sem, path, retries + 1)
- else:
- pass
-
- async def main():
- url = "http://192.168.59.132"
- dicc = [line.strip().rstrip('/') for line in open("dicc.txt", "r", encoding="utf-8")]
-
- sem = asyncio.Semaphore(60)
- connector = aiohttp.TCPConnector(ssl=False)
- async with aiohttp.ClientSession(connector=connector) as session:
- # 创建一个任务列表
- tasks = []
- # 为每个URL创建一个fetch任务
- for path in dicc:
- path = url + "/" + path
- task = asyncio.create_task(dirfuzzMain(session, sem, path))
- tasks.append(task)
-
- # 等待所有任务完成
- await asyncio.gather(*tasks)
- print(results)
-
- if __name__ == "__main__":
- loop = asyncio.get_event_loop() #获取当前的事件循环
- loop.run_until_complete(main()) #启动事件循环并运行给定的协程,直到该协程完成。
tqdm 通常用于在 Python 中显示一个进度条,特别是在循环或迭代过程中。它可以帮助用户直观地了解代码的执行进度。
在协程中使用 tqdm
使用 with 语句
- import asyncio
- import aiohttp
- from tqdm.asyncio import tqdm # 导入 tqdm 的异步版本
-
- max_retries = 1 # 设置最大重试次数
- alive_code = [200, 301, 302, 303, 304, 401, 403]
- results = []
- lock = asyncio.Lock()
-
- async def dirfuzzMain(session, sem, path, progress_bar, retries=0):
- async with sem:
- try:
- async with session.get(path, proxy='http://127.0.0.1:8080', timeout=aiohttp.ClientTimeout(total=3)) as response:
- code = response.status
- if code in alive_code:
- tqdm.write(path + " " + str(code))
- progress_bar.update()
- async with lock:
- results.append((path, code))
- else:
- progress_bar.update()
-
- except Exception as e:
- if retries < max_retries:
- # 递归调用自身,增加重试次数
- await asyncio.sleep(0.5) # 可以添加等待时间,避免过于频繁的请求
- return await dirfuzzMain(session, sem, path, progress_bar, retries + 1)
- else:
- progress_bar.update()
-
- async def main():
- url = "http://192.168.59.132"
- dicc = [line.strip().rstrip('/') for line in open("dicc.txt", "r", encoding="utf-8")]
-
- sem = asyncio.Semaphore(3)
- connector = aiohttp.TCPConnector(ssl=False)
- with tqdm(total=len(dicc), desc="Requesting", bar_format='{desc}: {percentage:.0f}% ({n_fmt}/{total_fmt}) {elapsed}') as progress_bar:
- async with aiohttp.ClientSession(connector=connector) as session:
- # 创建一个任务列表
- tasks = []
- # 为每个URL创建一个fetch任务
- for path in dicc:
- path = url + "/" + path
- task = asyncio.create_task(dirfuzzMain(session, sem, path, progress_bar))
- tasks.append(task)
- await asyncio.gather(*tasks)
-
- if __name__ == "__main__":
- loop = asyncio.get_event_loop() #获取当前的事件循环
- loop.run_until_complete(main()) #启动事件循环并运行给定的协程,直到该协程完成。

syncio 协程运行在事件循环中,而事件循环本身可以捕获到 KeyboardInterrupt 并优雅地停止,因为它控制着协程的执行。当在协程中发生 KeyboardInterrupt 时,事件循环会开始取消所有挂起的任务,并等待它们完成或取消。
然而,线程池中的线程则不会直接捕获 Ctrl+C 信号,因为它们通常不直接与信号处理系统交互。Ctrl+C 会被主线程捕获,并触发一个 KeyboardInterrupt 异常。但是,这个异常只会在主线程中抛出,并不会自动传播到线程池中的其他线程。因此,线程池中的线程通常不会直接响应 Ctrl+C。
从 Python 3.7 开始,asyncio.run() 会自动处理 KeyboardInterrupt,并且会尝试优雅地取消并停止所有正在运行的任务。

- try:
- asyncio.run(main())
- except KeyboardInterrupt:
- print("Exiting due to Ctrl+C...")

如何在捕获ctrl+c的时候,暂停程序并提示用户是否继续还是退出?在协程中涉及到更复杂的处理逻辑。
请求1000多个url


虽在https://www.yuque.com/dafcs-dqalz/uz9oso/kd4dgha3mk62trse#cqjcb中推荐使用asyncio.run()来运行协程,但是在windows下某些时候它会报错

解决办法是还是将 asyncio.run(main())替换为如下旧的运行方式
- loop = asyncio.get_event_loop()
- loop.run_until_complete(main())
