• python爬虫-异步爬虫


    注:本文章为学习过程中对知识点的记录,供自己复习使用,也给大家做个参考,如有错误,麻烦指出,大家共同探讨,互相进步。
    借鉴出处:
    该文章的路线和主要内容:崔庆才(第2版)python3网络爬虫开发实战

    前言:爬虫属于IO密集型任务,例如使用request库来爬取某个站点,当发出一个请求后,程序必须等待网站返回响应,才能接着运行,而在等待响应的过程中,整个爬虫程序是一直在等待的,实际上没有做任何事情。

    1、协程的基本原理

    1.1 基础知识

    • 阻塞
      阻塞状态指程序未得到所需计算资源时被挂起的状态。程序在等待某个操作完成期间,自身无法继续干别的事情,则称该程序在操作上是阻塞的。
    • 非阻塞
      程序在等待某操作的过程中,自身不被阻塞,可以继续干别的事情,则称该程序在该操作上是非阻塞的。
      非阻塞并不是在任何程序级别、任何情况下都存在的。仅当程序封装的级别可以囊括独立的子程序单元时,程序才可能存在非阻塞状态。
      非阻塞因阻塞的存在而存在,正因为阻塞导致程序运行的耗时增加与效率低下,我们才要把它变成非阻塞的。
    • 同步
      不同程序单元为了共同完成某个任务,在执行过程中需要靠某种通信方式保持协调一致,此时这些程序单元是同步执行的。
      例如在购物系统中更新商品库存时,需要用“行锁”作为通信信号,强制让不同的更新请求排队并按顺序执行,这里的更新库存操作就是同步的。
    • 异步
      为了完成某个任务,有时不同程序单元之间无须通信协调也能完成任务,此时不相关的程序单元之间可以是异步的。
    • 多进程
      多进程就是利用CPU多核的优势,在同一时间并执行多个任务,可以大大提高执行效率。
    • 协程
      协程(coroutine),又称作微线程、纤程,是一种运行在用户态的轻量级线程。
      协程拥有自己的寄存器上下文和栈。协程在调度切换时,将寄存器上下文和栈保存到其他地方,等切换回来的时候,再恢复先前保存的寄存器上下文和栈。因此,协程能保留上一次调用时的状态。
      协程本质上是个单进程,相对于多进程,它没有线程上下文切换的开销,没有原子操作锁定及同步的开销,编程模型也非常简单。

    1.2 协程的用法

    Python中使用协程最常用的库为asyncio

    • event_loop:事件循环,相当于一个无限虚幻,我们可以把一些函数注册到这个事情循环上,当满足发生条件时,就调用对应的处理方法。
    • coroutine:中文翻译叫协程,在Python中常指代协程对象类型,我们可以将协程对象注册到时间循环中,它会被事件训话调用。我们可以使用async关键字来定义一个方法,这个方法在调用时不会立即被执行,而是会返回一个协程对象。
    • task:任务,这是对协程对昂的进一步封装,包含协程对象的各个状态。
    • future:代表将来执行或者没有执行的任务的结果,实际上和task没有本质区别。
    1.2.1 定义协程

    例子1:了解协程对象、事件循环
    输入:

    import asyncio
    # async定义的方法会变成一个无法直接执行的协程对象,必须将此对象注册到事件循环中才可以执行。
    async def execute(x):
        print('Number:', x)
    coroutine = execute(1)
    # 此时直接调用async定义的方法,返回的只是一个协程对象
    print('Coroutine:', coroutine)
    print('after execute')
    # 使用get_event_loop()方法创建一个事件循环loop,并调用loop对象的run_until_complete方法将协程对象注册到了事件循环中,才会触发定义的方法。
    loop = asyncio.get_event_loop()
    loop.run_until_complete(coroutine)
    print('after loop')
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    输出:

    Coroutine: <coroutine object execute at 0x000001E452DDAB40>
    after execute
    Number: 1
    after loop
    
    • 1
    • 2
    • 3
    • 4

    例子2:了解task任务

    import asyncio
    # async定义的方法会变成一个无法直接执行的协程对象,必须将此对象注册到事件循环中才可以执行。
    async def execute(x):
        print('Number:', x)
        return x
    coroutine = execute(1)
    # 此时直接调用async定义的方法,返回的只是一个协程对象
    print('Coroutine:', coroutine)
    print('after execute')
    # 使用get_event_loop()方法创建一个事件循环loop
    loop = asyncio.get_event_loop()
    # 将协程对象转化为task任务,此时的任务还是pending状态
    task = loop.create_task(coroutine)
    print('Task:', task)
    # 将task任务注册到事件循环中,然后task状态变为了finished,result=1是execute()执行的结果
    loop.run_until_complete(task)
    print('Task:', task)
    print('after loop')
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    输出:

    Coroutine: <coroutine object execute at 0x000002776288AB40>
    after execute
    Task: <Task pending name='Task-1' coro=<execute() running at D:\Project\scrape\urllib\haha.py:3>>
    Number: 1
    Task: <Task finished name='Task-1' coro=<execute() done, defined at D:\Project\scrape\urllib\haha.py:3> result=1>
    after loop
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    创建任务的另一种方式:

    task = ascynic.ensure_future(coroutine)
    
    • 1
    1.2.2 绑定回调

    绑定回调的作用就是当协程对象执行完毕之后,就去执行声明的回调函数。
    输入:

    import asyncio
    import requests
    # async定义的方法会变成一个无法直接执行的协程对象,必须将此对象注册到事件循环中才可以执行。
    async def request():
        url = 'http://www.baidu.com'
        status = requests.get(url)
        return status
    # 定义的回调函数
    def callback(task):
        print('Status:', task.result())
    
    coroutine = request()
    task = asyncio.ensure_future(coroutine)
    # 通过add_done_callback()函数实现协程对象执行完毕后再去执行声明的callback方法的关联。
    task.add_done_callback(callback)
    print('Task:', task)
    # 使用get_event_loop()方法创建一个事件循环loop
    loop = asyncio.get_event_loop()
    # 将task任务注册到事件循环中
    loop.run_until_complete(task)
    print('Task:', task)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    输出:

    Task: <Task pending name='Task-1' coro=<request() running at D:\Project\scrape\urllib\haha.py:4> cb=[callback() at D:\Project\scrape\urllib\haha.py:9]>
    Status: <Response [200]>
    Task: <Task finished name='Task-1' coro=<request() done, defined at D:\Project\scrape\urllib\haha.py:4> result=<Response [200]>>
    
    • 1
    • 2
    • 3

    分析:实际上,即使不适用回调方法,在task运行完毕之后,也可以直接调用result方法获取结果。

    1.2.2 多任务协程

    执行多次请求,可以定义一个task列表,然后使用asyncio包中的wait方法执行。
    输入:

    import asyncio
    import requests
    # async定义的方法会变成一个无法直接执行的协程对象,必须将此对象注册到事件循环中才可以执行。
    async def request():
        url = 'https://www.baidu.com'
        status = requests.get(url)
        return status
    
    tasks = [asyncio.ensure_future(request()) for _ in range(5)]
    print('Task1:', tasks)
    
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))
    for task in tasks:
        print('Task Result:', task.result())
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    输出:

    Task1: [<Task pending name='Task-1' coro=<request() running at D:\Project\scrape\urllib\haha.py:4>>, <Task pending name='Task-2' coro=<request() running at D:\Project\scrape\urllib\haha.py:4>>, <Task pending name='Task-3' coro=<requ
    est() running at D:\Project\scrape\urllib\haha.py:4>>, <Task pending name='Task-4' coro=<request() running at D:\Project\scrape\urllib\haha.py:4>>, <Task pending name='Task-5' coro=<request() running at D:\Project\scrape\urllib\haha
    .py:4>>]
    Task Result: <Response [200]>
    Task Result: <Response [200]>
    Task Result: <Response [200]>
    Task Result: <Response [200]>
    Task Result: <Response [200]>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    分析:
    loop函数不要放在定义函数内,否则会报错。

    await后面的对象必须是如下格式之一:
    1、一个原生协程对象;
    2、一个由types.coroutine修饰的生成器,这个生成器可以返回协程对象;
    3、由一个包含__await__方法的对象返回的一个迭代器。

    上面声明的方式比较复杂,aiohttp是一个支持异步请求的库,它和asyncio配合使用,可以非常方便地实现异步请求操作。

    2、aiohttp的使用

    安装pip install aiohttp

    2.1 以一个例子开始aiohttp

    输入:

    import asyncio
    import aiohttp
    import time
    
    start = time.time()
    
    async def get(url):
        session = aiohttp.ClientSession()
        response = await session.get(url)
        await response.text()
        await session.close()
        return response
    
    async def request(id):
        url = f'https://www.httpbin.org/deplay/{id}'
        print('Waiting for::::::::',url)
        # 执行get方法时,会被挂起,但get方法内第一步时非阻塞的,挂起后会被立马唤醒
        response =  await get(url)
        print('Get response from:::::::', url, 'response::::::::', response)
    
    tasks = [asyncio.ensure_future(request(i)) for i in range(5)]
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))
    
    end = time.time()
    print('耗时:::', end-start)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    输入:

    Waiting for:::::::: https://www.httpbin.org/deplay/0
    Waiting for:::::::: https://www.httpbin.org/deplay/1
    Waiting for:::::::: https://www.httpbin.org/deplay/2
    Waiting for:::::::: https://www.httpbin.org/deplay/3
    Waiting for:::::::: https://www.httpbin.org/deplay/4
    Get response from::::::: https://www.httpbin.org/deplay/1 response:::::::: <ClientResponse(https://www.httpbin.org/deplay/1) [404 NOT FOUND]>
    <CIMultiDictProxy('Date': 'Tue, 01 Nov 2022 15:04:54 GMT', 'Content-Type': 'text/html', 'Content-Length': '233', 'Connection': 'keep-alive', 'Server': 'gunicorn/19.9.0', 'Access-Control-Allow-Origin': '*', 'Access-Control-Allow-Cred
    entials': 'true')>
    
    Get response from::::::: https://www.httpbin.org/deplay/3 response:::::::: <ClientResponse(https://www.httpbin.org/deplay/3) [404 NOT FOUND]>
    <CIMultiDictProxy('Date': 'Tue, 01 Nov 2022 15:04:54 GMT', 'Content-Type': 'text/html', 'Content-Length': '233', 'Connection': 'keep-alive', 'Server': 'gunicorn/19.9.0', 'Access-Control-Allow-Origin': '*', 'Access-Control-Allow-Cred
    entials': 'true')>
    
    Get response from::::::: https://www.httpbin.org/deplay/0 response:::::::: <ClientResponse(https://www.httpbin.org/deplay/0) [404 NOT FOUND]>
    <CIMultiDictProxy('Date': 'Tue, 01 Nov 2022 15:04:54 GMT', 'Content-Type': 'text/html', 'Content-Length': '233', 'Connection': 'keep-alive', 'Server': 'gunicorn/19.9.0', 'Access-Control-Allow-Origin': '*', 'Access-Control-Allow-Cred
    entials': 'true')>
    
    Get response from::::::: https://www.httpbin.org/deplay/2 response:::::::: <ClientResponse(https://www.httpbin.org/deplay/2) [404 NOT FOUND]>
    <CIMultiDictProxy('Date': 'Tue, 01 Nov 2022 15:04:54 GMT', 'Content-Type': 'text/html', 'Content-Length': '233', 'Connection': 'keep-alive', 'Server': 'gunicorn/19.9.0', 'Access-Control-Allow-Origin': '*', 'Access-Control-Allow-Cred
    entials': 'true')>
    
    Get response from::::::: https://www.httpbin.org/deplay/4 response:::::::: <ClientResponse(https://www.httpbin.org/deplay/4) [404 NOT FOUND]>
    <CIMultiDictProxy('Date': 'Tue, 01 Nov 2022 15:04:54 GMT', 'Content-Type': 'text/html', 'Content-Length': '233', 'Connection': 'keep-alive', 'Server': 'gunicorn/19.9.0', 'Access-Control-Allow-Origin': '*', 'Access-Control-Allow-Cred
    entials': 'true')>
    
    耗时::: 1.092451810836792
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    分析:时间循环会运行第一个task,执行第一个get方法时,会被挂起,但get方法第一步是创建了ClientSession对象,是非阻塞的,挂起后会被立马唤醒。接着执行await session.get是会被挂起等待,此期间事件循环会寻找当前未被挂起的协程继续进行。都被挂起后,请求还没有响应,就继续等待直到获取到结果。

    2.2 基本介绍

      asynic模块内部实现了对TCP、UDP、SSL协议的异步操作,但是对于HTTP请求来说,就需要用aiohttp实现了。
      aiphttp是一个基于asynico的异步HTTP网络模块,它既提供了服务端,有提供了客户端。其中,我们用服务器可以搭建一个支持异步处理的处理器,这个服务器就是用来处理请求并返回响应的,类似于Django、Flask等一些Web服务器。而客户端可以用来发送请求,类似于使用requests发起一个HTTP请求然后获得响应,但requests发起的是通的网络请求,aiohttp则是异步的
    输入:
    aiohttp客户端例子

    import asyncio
    import aiohttp
    
    async def fetch(session,url):
        async with session.get(url) as response:
            return await response.text(), response.status
    
    async def main():
        async with aiohttp.ClientSession() as session:
            html,status =  await fetch(session, 'http://www.baidu.com')
            print(f'html:\n{html}','\n',f'status:{status}')
    
    if __name__ == '__main__':
        loop = asyncio.get_event_loop()
        loop.run_until_complete(main())
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    输出:

    html:
    ............
    status: 200
    
    • 1
    • 2
    • 3

    分析:

      aiohttp实现的异步爬取,与之前的定义有明显的区别,主要包括以下几点:

    • aiohttp是对http请求进行异步爬取的库,实现异步爬取,需要启动协程,而协程则需要借助asynico里面的事件循环才能执行。
    • 每个异步方法的前面都要统一+async来修饰。(async定义的方法会变成一个无法直接执行的协程对象,必须将此对象注册到事件循环中才可以执行)
    • with as前面加上async代表声明一个支持异步的上下文管理器。(with as能够自动分配和释放资源)
    • 对于一些返回协程对象的操作,官方API文档里,response.text返回的是client对象,所以要在前面加上await;response.status返回的是数值,因此前面不要加await。参考官方文档https://docs.aiohttp.org/en/stable/client_reference.html
    • 最后定义的协程对象要调用事件循环。

    注:3.7版本以后可以直接用asynico.run(main())代替上方显示声明事件循环,run方法内部会自动启动一个事件循环。

    2.3 URL参数设置

    # 借助params参数传参
    params = {'name':'Jack', 'age':24}
    aiohttp.ClientSession().get(url, params=params)
    
    #aiohttp支持其他请求类型,如POST、PUT、DELETE等,这些和requests的使用方法类似
     #- 对于POST表单提交,其对应的请求头中的Content-Type为application/x-www-form-urlencoded
    .post(url, data=data)
     #- 对应POST JSON数据提交,其对应的请求头中的Content-Type为application/json
     .post(url, json=data)
    .put(url, data=data)
    .delete(url)
    .head(url)
    .options(url)
    .patch(url, data=data)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    响应,与requests响应基本一致。需要加await的要查看响应类型是否是协程对象(如async修饰的方法),具体查看apihttp的API官方文档。

    设置超时

    # 设置超时1s
    timeout = apihttp.ClientTimout(total=1)
    aiohttp.ClientTimeout(timeout=timeout)
    # 如果超时,则会抛出TImeoutError异常,其类型为asynico.TimeoutError。
    
    • 1
    • 2
    • 3
    • 4

    ClientTimeout对象还有其他connect、socket_connect等参数,详细API可以参考官方文档:https://docs.aiohttp.org/en/stable/client_quickstart.html#timeouts

    并发限制
      由于aiohttp可以支持非常高的并发量,百万量都是能做到的,所以部分网站如果响应不过来,有瞬间将目标网站爬挂掉的危,这是就要限制爬取的并发量。
      一般情况下,借助asynico的Semaphore来控制并发量

    import asyncio
    import aiohttp
    
    CONCURRENCY = 5
    URL = 'https://www.baidu.com'
    # 创建信号量对象,用来控制并发量大小
    semaphore = asyncio.Semaphore(CONCURRENCY)
    session = None
    
    async def scrape_api():
        # 把semaphore直接放置在了对应的爬取方法里,使用async with语句将semaphore作为上下文对象即可。
        async with semaphore:
            print('正在爬取',URL)
            async with session.get(URL) as reponse:
                await asyncio.sleep(1)
                return await reponse.text()
    
    async def main():
        global session
        session = aiohttp.ClientSession()
        scrape_index_tasks = [asyncio.ensure_future(scrape_api()) for _ in range(10000)]
        await asyncio.gather(*scrape_index_tasks)
    
    if __name__ == '__main__':
        asyncio.get_event_loop().run_until_complete(main())
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    分析:
    asyncio.await && asyncio.gather
    相同:从功能上看,asyncio.wait 和 asyncio.gather 实现的效果是相同的,都是把所有 Task 任务结果收集起来。
    不同:asyncio.wait 会返回两个值:done 和 pending,done 为已完成的协程 Task,pending 为超时未完成的协程 Task,需通过 future.result 调用 Task 的 result;而asyncio.gather 返回的是所有已完成 Task 的 result,不需要再进行调用或其他操作,就可以得到全部结果。

    3、aiohttp异步爬取实战

    爬取目标
    1、爬取地址:https://spa5.scrape.center/
    2、使用aiohttp爬取全站的图书数据;
    3、将数据存储到数据库或独立文件中;

    import asyncio
    import aiohttp
    import logging
    import json
    from motor.motor_asyncio import AsyncIOMotorClient
    logging.basicConfig(level=logging.INFO,format='%(asctime)s - %(levelname)s:%(message)s')
    
    BASE_URL = 'https://spa5.scrape.center/api/book/?limit=18&offset={offset}'
    DETAIL_URL = 'https://spa5.scrape.center/api/book/{book_id}/'
    
    PAGE_SIZE = 18
    PAGE_NUMBER = 10
    COUCURRENCY = 20
    
    session = None
    
    semaphore = asyncio.Semaphore(COUCURRENCY)
    
    # 连接MongoDB
    MONGO_CONNECTION_STRING = 'mongodb://localhost:27017'
    MONGO_DB_NAME = 'books'
    MONGO_CONNECTION_NAME = 'books'
    client = AsyncIOMotorClient(MONGO_CONNECTION_STRING)
    db = client[MONGO_DB_NAME]
    collection = db[MONGO_CONNECTION_NAME]
    # 数据存储
    async def save_data(data):
        logging.info('保存数据 %s', data)
        if data:
            return await collection.update_one({
                'id': data.get('id')
            },{
                '$set': data
            }, upsert=True)
    
    # 定义爬取url返回json数据
    async def scrape_api(url):
        async with semaphore:
            try:
                logging.info("爬取路径:%s", url)
                async with session.get(url) as response:
                    return await response.json()
            except aiohttp.ClientError:
                logging.error("爬取%s出现了错误", url, exc_info=True)
    
    # 爬取列表页
    async def scrape_base(page):
        url = BASE_URL.format(offset = PAGE_SIZE * (page-1))
        return await scrape_api(url)
    
    # 爬取详情页
    async def scrape_detail(id):
        url = DETAIL_URL.format(book_id = id)
        data =  await scrape_api(url)
        await save_data(data)
    
    async def main():
        global session
        session = aiohttp.ClientSession()
        scrape_base_tasks = [asyncio.ensure_future(scrape_base(page)) for page in range(1,PAGE_NUMBER+1)]
        results = await asyncio.gather(*scrape_base_tasks)
        logging.info('results结果为 \n %s', json.dumps(results, ensure_ascii=False, indent=2))
        # 提取所有book_id
        ids = []
        for result in results:
            if not result:
                continue
            for item in result.get('results'):
                ids.append(item.get('id'))
        scrape_detail_tasks = [asyncio.ensure_future(scrape_detail(id)) for id in ids]
        await asyncio.wait(scrape_detail_tasks)
        await session.close()
    
    if __name__ == '__main__':
        asyncio.get_event_loop().run_until_complete(main())
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
  • 相关阅读:
    ES6 - 剩余参数,Array的扩展方法,String的扩展方法
    php中识别url被篡改并阻止访问的实现方式是什么
    【机器学习习题】估计一个模型在未见过的数据上的性能
    Spring Boot整合Spring Fox生成Swagger文档
    DDoS攻击与CC攻击:网络安全的两大挑战
    Docker 数据存储及持久化应用
    Nacos启动报错 db.num is null
    数据库数字类型与日期类型
    【牛客 - 剑指offer】JZ11 旋转数组的最小数字 Java实现
    达尔优EK87键盘说明书
  • 原文地址:https://blog.csdn.net/dayexiaofan/article/details/127525080