• python 基于aiohttp的异步爬虫实战


    钢铁知识库,一个学习python爬虫、数据分析的知识库。人生苦短,快用python。

    之前我们使用requests库爬取某个站点的时候,每发出一个请求,程序必须等待网站返回响应才能接着运行,而在整个爬虫过程中,整个爬虫程序是一直在等待的,实际上没有做任何事情。

    像这种占用磁盘/内存IO、网络IO的任务,大部分时间是CPU在等待的操作,就叫IO密集型任务。对于这种情况有没有优化方案呢,当然有,那就是使用aiohttp库实现异步爬虫。

    aiohttp是什么

    我们在使用requests请求时,只能等一个请求先出去再回来,才会发送下一个请求。明显效率不高阿,这时候如果换成异步请求的方式,就不会有这个等待。一个请求发出去,不管这个请求什么时间响应,程序通过await挂起协程对象后直接进行下一个请求。

    解决方法就是通过 aiohttp + asyncio,什么是aiohttp?一个基于 asyncio 的异步 HTTP 网络模块,可用于实现异步爬虫,速度明显快于 requests 的同步爬虫。

    requests和aiohttp区别

    区别就是一个同步一个是异步。话不多说直接上代码看效果。

    安装aiohttp

    pip install aiohttp
    
    • 1
    • requests同步示例:
    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    # author: 钢铁知识库
    import time
    import requests
    
    # 同步请求
    def main():
        start = time.time()
        for i in range(5):
            res = requests.get('http://httpbin.org/delay/2')
            print(f'当前时间:{datetime.datetime.now()}, status_code = {res.status_code}')
        print(f'requests同步耗时:{time.time() - start}')
        
    if __name__ == '__main__':
        main()
    
    '''
    当前时间:2022-09-05 15:44:51.991685, status_code = 200
    当前时间:2022-09-05 15:44:54.528918, status_code = 200
    当前时间:2022-09-05 15:44:57.057373, status_code = 200
    当前时间:2022-09-05 15:44:59.643119, status_code = 200
    当前时间:2022-09-05 15:45:02.167362, status_code = 200
    requests同步耗时:12.785893440246582
    '''
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    可以看到5次请求总共用12.7秒,再来看同样的请求异步多少时间。

    • aiohttp异步示例:
    #!/usr/bin/env python
    # file: day6-9同步和异步.py
    # author: 钢铁知识库
    import asyncio
    import time
    import aiohttp
    
    async def async_http():
        # 声明一个支持异步的上下文管理器
        async with aiohttp.ClientSession() as session:
            res = await session.get('http://httpbin.org/delay/2')
            print(f'当前时间:{datetime.datetime.now()}, status_code = {res.status}')
    
    tasks = [async_http() for _ in range(5)]
    start = time.time()
    # Python 3.7 及以后,不需要显式声明事件循环,可以使用 asyncio.run()来代替最后的启动操作
    asyncio.run(asyncio.wait(tasks))
    print(f'aiohttp异步耗时:{time.time() - start}')
    
    '''
    当前时间:2022-09-05 15:42:32.363966, status_code = 200
    当前时间:2022-09-05 15:42:32.366957, status_code = 200
    当前时间:2022-09-05 15:42:32.374973, status_code = 200
    当前时间:2022-09-05 15:42:32.384909, status_code = 200
    当前时间:2022-09-05 15:42:32.390318, status_code = 200
    aiohttp异步耗时:2.5826876163482666
    '''
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    两次对比可以看到执行过程,时间一个是顺序执行,一个是同时执行。这就是同步和异步的区别。

    aiohttp使用介绍

    接下来我们会详细介绍aiohttp库的用法和爬取实战。aiohttp 是一个支持异步请求的库,它和 asyncio 配合使用,可以使我们非常方便地实现异步请求操作。asyncio模块,其内部实现了对TCP、UDP、SSL协议的异步操作,但是对于HTTP请求,就需要aiohttp实现了。

    aiohttp分为两部分,一部分是Client,一部分是Server。下面来说说aiohttp客户端部分的用法。

    基本实例

    先写一个简单的案例

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    # @Author  : 钢铁知识库
    import asyncio
    import aiohttp
    
    async def get_api(session, url):
        # 声明一个支持异步的上下文管理器
        async with session.get(url) as response:
            return await response.text(), response.status
    
    async def main():
        async with aiohttp.ClientSession() as session:
            html, status = await get_api(session, 'http://httpbin.org/delay/2')
            print(f'html: {html[:50]}')
            print(f'status : {status}')
    
    if __name__ == '__main__':
        #  Python 3.7 及以后,不需要显式声明事件循环,可以使用 asyncio.run(main())来代替最后的启动操作
        asyncio.get_event_loop().run_until_complete(main())
    '''
    html: {
      "args": {}, 
      "data": "", 
      "files": {}, 
      
    status : 200
    
    Process finished with exit code 0
    '''
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    aiohttp请求的方法和之前有明显区别,主要包括如下几点:

    1. 除了导入aiohttp库,还必须引入asyncio库,因为要实现异步,需要启动协程。
    2. 异步的方法定义不同,前面都要统一加async来修饰。
    3. with as用于声明上下文管理器,帮我们自动分配和释放资源,加上async代码支持异步。
    4. 对于返回协程对象的操作,前面需要加await来修饰。response.text()返回的是协程对象。
    5. 最后运行启用循环事件

    注意:Python3.7及以后的版本中,可以使用asyncio.run(main())代替最后的启动操作。

    URL参数设置

    对于URL参数的设置,我们可以借助params设置,传入一个字典即可,实例如下:

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    # @Author  : 钢铁知识库
    import aiohttp
    import asyncio
    
    async def main():
        params = {'name': '钢铁知识库', 'age': 23}
        async with aiohttp.ClientSession() as session:
            async with session.get('https://www.httpbin.org/get', params=params) as res:
                print(await res.json())
    
    if __name__ == '__main__':
        asyncio.get_event_loop().run_until_complete(main())
    '''
    {'args': {'age': '23', 'name': '钢铁知识库'}, 'headers': {'Accept': '*/*', 'Accept-Encoding': 'gzip, deflate', 'Host': 'www.httpbin.org', 'User-Agent': 'Python/3.8 aiohttp/3.8.1', 'X-Amzn-Trace-Id': 'Root=1-63162e34-1acf7bde7a6d801368494c72'}, 'origin': '122.55.11.188', 'url': 'https://www.httpbin.org/get?name=钢铁知识库&age=23'}
    '''
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    可以看到实际请求的URL后面带了后缀,这就是params的内容。

    请求类型

    除了get请求,aiohttp还支持其它请求类型,如POST、PUT、DELETE等,和requests使用方式类似。

    session.post('http://httpbin.org/post', data=b'data')
    session.put('http://httpbin.org/put', data=b'data')
    session.delete('http://httpbin.org/delete')
    session.head('http://httpbin.org/get')
    session.options('http://httpbin.org/get')
    session.patch('http://httpbin.org/patch', data=b'data')
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    要使用这些方法,只需要把对应的方法和参数替换一下。用法和get类似就不再举例。

    响应的几个方法

    对于响应来说,我们可以用如下方法分别获取其中的响应情况。状态码、响应头、响应体、响应体二进制内容、响应体JSON结果,实例如下:

    #!/usr/bin/env python
    # @Author  : 钢铁知识库
    import aiohttp
    import asyncio
    
    async def main():
        data = {'name': '钢铁知识库', 'age': 23}
        async with aiohttp.ClientSession() as session:
            async with session.post('https://www.httpbin.org/post', data=data) as response:
                print('status:', response.status)  # 状态码
                print('headers:', response.headers)  # 响应头
                print('body:', await response.text())  # 响应体
                print('bytes:', await response.read())  # 响应体二进制内容
                print('json:', await response.json())  # 响应体json数据
    
    if __name__ == '__main__':
        asyncio.get_event_loop().run_until_complete(main())
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    '''
    status: 200
    headers: 
    body: {
      "args": {}, 
      "data": "", 
      "files": {}, 
      "form": {
        "age": "23", 
        "name": "\u94a2\u94c1\u77e5\u8bc6\u5e93"
      }, 
      "headers": {
        "Accept": "*/*", 
        "Accept-Encoding": "gzip, deflate", 
        "Content-Length": "57", 
        "Content-Type": "application/x-www-form-urlencoded", 
        "Host": "www.httpbin.org", 
        "User-Agent": "Python/3.8 aiohttp/3.8.1", 
        "X-Amzn-Trace-Id": "Root=1-631691dc-6aa1b2b85045a1a0481d06e1"
      }, 
      "json": null, 
      "origin": "122.55.11.188", 
      "url": "https://www.httpbin.org/post"
    }
    
    bytes: b'{\n  "args": {}, \n  "data": "", \n  "files": {}, \n  "form": {\n    "age": "23", \n    "name": "\\u94a2\\u94c1\\u77e5\\u8bc6\\u5e93"\n  }, \n  "headers": {\n    "Accept": "*/*", \n    "Accept-Encoding": "gzip, deflate", \n    "Content-Length": "57", \n    "Content-Type": "application/x-www-form-urlencoded", \n    "Host": "www.httpbin.org", \n    "User-Agent": "Python/3.8 aiohttp/3.8.1", \n    "X-Amzn-Trace-Id": "Root=1-631691dc-6aa1b2b85045a1a0481d06e1"\n  }, \n  "json": null, \n  "origin": "122.5.132.196", \n  "url": "https://www.httpbin.org/post"\n}\n'
    json: {'args': {}, 'data': '', 'files': {}, 'form': {'age': '23', 'name': '钢铁知识库'}, 'headers': {'Accept': '*/*', 'Accept-Encoding': 'gzip, deflate', 'Content-Length': '57', 'Content-Type': 'application/x-www-form-urlencoded', 'Host': 'www.httpbin.org', 'User-Agent': 'Python/3.8 aiohttp/3.8.1', 'X-Amzn-Trace-Id': 'Root=1-631691dc-6aa1b2b85045a1a0481d06e1'}, 'json': None, 'origin': '122.55.11.188', 'url': 'https://www.httpbin.org/post'}
    '''
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    可以看到有些字段前面需要加await,因为其返回的是一个协程对象(如async修饰的方法),那么前面就要加await。

    超时设置

    我们可以借助ClientTimeout对象设置超时,例如要设置1秒的超时时间,可以这么实现:

    #!/usr/bin/env python
    # @Author  : 钢铁知识库
    import aiohttp
    import asyncio
    
    async def main():
        # 设置 1 秒的超时 
        timeout = aiohttp.ClientTimeout(total=1)
        data = {'name': '钢铁知识库', 'age': 23}
        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.get('https://www.httpbin.org/delay/2', data=data) as response:
                print('status:', response.status)  # 状态码
    
    if __name__ == '__main__':
        asyncio.get_event_loop().run_until_complete(main())
    '''
    Traceback (most recent call last):
    ####中间省略####
        raise asyncio.TimeoutError from None
    asyncio.exceptions.TimeoutError
    '''
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    这里设置了超时1秒请求延时2秒,发现抛出异常asyncio.TimeoutError,如果正常则响应200。

    并发限制

    aiohttp可以支持非常高的并发量,但面对高并发网站可能会承受不住,随时有挂掉的危险,这时需要对并发进行一些控制。现在我们借助asyncio 的Semaphore来控制并发量,实例如下:

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    # @Author  : 钢铁知识库
    import asyncio
    from datetime import datetime
    import aiohttp
    
    # 声明最大并发量
    semaphore = asyncio.Semaphore(2)
    
    async def get_api():
        async with semaphore:
            print(f'scrapting...{datetime.now()}')
            async with session.get('https://www.baidu.com') as response:
                await asyncio.sleep(2)
                # print(f'当前时间:{datetime.now()}, {response.status}')
    
    async def main():
        global session
        session = aiohttp.ClientSession()
        tasks = [asyncio.ensure_future(get_api()) for _ in range(1000)]
        await asyncio.gather(*tasks)
        await session.close()
    
    if __name__ == '__main__':
        asyncio.get_event_loop().run_until_complete(main())
    '''
    scrapting...2022-09-07 08:11:14.190000
    scrapting...2022-09-07 08:11:14.292000
    scrapting...2022-09-07 08:11:16.482000
    scrapting...2022-09-07 08:11:16.504000
    scrapting...2022-09-07 08:11:18.520000
    scrapting...2022-09-07 08:11:18.521000
    '''
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    在main方法里,我们声明了1000个task,如果没有通过Semaphore进行并发限制,那这1000放到gather方法后会被同时执行,并发量相当大。有了信号量的控制之后,同时运行的task数量就会被控制,这样就能给aiohttp限制速度了。

    aiohttp异步爬取实战

    接下来我们通过异步方式练手一个小说爬虫,需求如下:

    需求页面:https://dushu.baidu.com/pc/detail?gid=4308080950

    目录接口:https://dushu.baidu.com/api/pc/getCatalog?data={“book_id”:“4308080950”}

    详情接口:https://dushu.baidu.com/api/pc/getChapterContent?data={"book_id":"4295122774","cid":"4295122774|116332"}

    关键参数:book_id:小说ID、cid:章节id

    采集要求:使用协程方式写入,数据存放进mongo

    需求分析:点开需求页面,通过F12抓包可以发现两个接口。一个目录接口,一个详情接口。
    首先第一步先请求目录接口拿到cid章节id,然后将cid传递给详情接口拿到小说数据,最后存入mongo即可。

    话不多说,直接上代码:

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    # @Author  : 钢铁知识库
    # 不合适就是不合适,真正合适的,你不会有半点犹豫。
    import asyncio
    import json,re
    import logging
    import aiohttp
    import requests
    from utils.conn_db import ConnDb
    
    # 日志格式
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s: %(message)s')
    
    # 章节目录api
    b_id = '4308080950'
    url = 'https://dushu.baidu.com/api/pc/getCatalog?data={"book_id":"'+b_id+'"}'
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) "
                      "Chrome/104.0.0.0 Safari/537.36"
    }
    # 并发声明
    semaphore = asyncio.Semaphore(5)
    
    async def download(title,b_id, cid):
        data = {
            "book_id": b_id,
            "cid": f'{b_id}|{cid}',
        }
        data = json.dumps(data)
        detail_url = 'https://dushu.baidu.com/api/pc/getChapterContent?data={}'.format(data)
        async with semaphore:
            async with aiohttp.ClientSession(headers=headers) as session:
                async with session.get(detail_url) as response:
                    res = await response.json()
                    content = {
                        'title': title,
                        'content': res['data']['novel']['content']
                    }
                    # print(title)
                    await save_data(content)
    
    async def save_data(data):
        if data:
            client = ConnDb().conn_motor_mongo()
            db = client.baidu_novel
            collection = db.novel
            logging.info('saving data %s', data)
            await collection.update_one(
                {'title': data.get('title')},
                {'$set': data},
                upsert=True
            )
    
    async def main():
        res = requests.get(url, headers=headers)
        tasks = []
        for re in res.json()['data']['novel']['items']:     # 拿到某小说目录cid
            title = re['title']
            cid = re['cid']
            tasks.append(download(title, b_id, cid))    # 将请求放到列表里,再通过gather执行并发
        await asyncio.gather(*tasks)
    
    if __name__ == '__main__':
        asyncio.run(main())
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65

    至此,我们就使用aiohttp完成了对小说章节的爬取。

    要实现异步处理,得先要有挂起操作,当一个任务需要等待 IO 结果的时候,可以挂起当前任务,转而去执行其他任务,这样才能充分利用好资源,要实现异步,需要了解 await 的用法,使用 await 可以将耗时等待的操作挂起,让出控制权。当协程执行的时候遇到 await,时间循环就会将本协程挂起,转而去执行别的协程,直到其他的协程挂起或执行完毕。

    await 后面的对象必须是如下格式之一:

    • A native coroutine object returned from a native coroutine function,一个原生 coroutine 对象。
    • A generator-based coroutine object returned from a function decorated with types.coroutine,一个由 types.coroutine 修饰的生成器,这个生成器可以返回 coroutine 对象。
    • An object with an await method returning an iterator,一个包含 await 方法的对象返回的一个迭代器。

    ---- 20220909 钢铁知识库

    总结

    以上就是借助协程async和异步aiohttp两个主要模块完成异步爬虫的内容,
    aiohttp 以异步方式爬取网站的耗时远小于 requests 同步方式,以上列举的例子希望对你有帮助。

    注意,线程和协程是两个概念,后面找机会我们再聊聊进程和线程、线程和协程的关系。

  • 相关阅读:
    zero-shot, one-shot和few-shot
    使用Python将MySQL查询结果导出到Excel(xlsxwriter)
    字节码文件(.class文件)
    iOS自动化测试方案(二):Xcode开发者工具构建WDA应用到iphone
    日立F-4700FL数据分析实验报告
    中国牛仔服装行业市场深度调研及投资价值研究报告
    python 自动化学习(三) 句柄获取、模拟按键、opencv安装
    python+django社区医院诊所医疗管理系统_6t4o8
    图上问题训练题解
    ChatGPT追祖寻宗:GPT-2论文要点解读
  • 原文地址:https://blog.csdn.net/u011463397/article/details/126778588