• 【Python工程师之高性能爬虫】


    前言

    如何在spiders中使用异步操作实现高性能的数据爬取
    首先讲解一下异步爬虫的方式:

    1. 多线程、多进程(不建议):
      弊端:无法无限制的开启多线程或者多进程
      优势:可以为相关阻塞的方法类单独开启线程进程,从而实现异步执行脚本
    2. 线程池、进程池(适当的使用):
      弊端:线程池或进程池中的数量是有上限的。
      优势:固定了线程和进程的数量,从而降低系统对进程或者线程创建和销毁次数,可以很好地降低系统的开销
    3. 单线程 + 异步协程(推荐):
      一些概念和两个关键字:
      ①event_loop(事件循环):相当于一个无限循环,我们可以把一些函数注册到这个事件循环上,当满足某些条件时,函数就会被循环执行。
      ②coroutline(协程对象):我们可以将协程对象注册到事件循环中,它会被事件循环调用。我们可以使用async关键字来定义一个方法,这个方法在调用时不会被立即被执行,而是返回一个协程对象。
      ③task(任务):,它是对协程对象的进一步封装,包含了任务的各个状态。
      ④future(任务):代表将来执行或还没有执行的任务,实际上和task没有本质区别。
      ⑤async(协程):定义一个协程。
      ⑥await(等待执行):用来挂起阻塞方法的执行

    tips:

    await

    await语句后必须是一个 可等待对象 ,可等待对象主要有三种:Python协程,Task,Future。通常情况下没有必要在应用层级的代码中创建 Future 对象。

    Coroutine

    协程(Coroutine),又称微线程,纤程。通常我们认为线程是轻量级的进程,因此我们也把协程理解为轻量级的线程即微线程。
    协程的作用是在执行函数A时可以随时中断去执行函数B,然后中断函数B继续执行函数A(可以自由切换)。
    这里的中断,不是函数的调用,而是有点类似CPU的中断。这一整个过程看似像多线程,然而协程只有一个线程执行。
    
    协程的优势
    执行效率极高,因为是子程序(函数)切换不是线程切换,由程序自身控制,没有切换线程的开销。所以与多线程相比,线程的数量越多,
    协程的性能优势越明显。
    
    不需要锁机制,因为只有一个线程,也不存在同时写变量冲突,在控制共享资源时也不需要加锁,只需要判断状态,因此执行效率高的多。
    
    协程可以处理IO密集型程序的效率问题,但不适合处理CPU密集型程序,如要充分发挥CPU利用率应结合多进程+协程。
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    asyncio

    asyncio是Python3.4引入的一个标准库,直接内置了对异步IO的支持。asyncio模块提供了使用协程构建并发应用的工具。它使用一种单线程
    单进程的的方式实现并发,应用的各个部分彼此合作, 可以显示的切换任务,一般会在程序阻塞I/O操作的时候发生上下文切换如等待读写文件,
    或者请求网络。同时asyncio也支持调度代码在将来的某个特定事件运行,从而支持一个协程等待另一个协程完成,以处理系统信号和识别其
    他一些事件。
    在 asyncio 程序中使用同步代码虽然并不会报错,但是也失去了并发的意义,例如网络请求,如果使用仅支持同步的 requests,
    在发起一次请求后在收到响应结果之前不能发起其他请求,这样要并发访问多个网页时,即使使用了 asyncio,在发送一次请求
    后, 切换到其他协程还是会因为同步问题而阻塞,并不能有速度上的提升,这时候就需要其他支持异步操作的请求库如 aiohttp.
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    单线程爬虫

    这里使用requests 请求,requests是一个同步请求的类库

    import requests
    
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.0.0 Safari/537.36'
    }
    
    
    def get_content(url):
        response = requests.get(url=url, headers=headers)
        if response.status_code == 200:
            return response.content
    
    
    def parse_content(content):
        print('相应数据的长度为:', len(content))
    
    
    if __name__ == "__main__":
        urls = [
            'https://item.jd.com/100030771664.html',
            'https://item.jd.com/100030771664.html',
            'https://item.jd.com/100030771664.html',
        ]
        for url in urls:
            content = get_content(url)
            parse_content(content)
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    协程

    asyncio 是 Python 中的异步IO库,用来编写并发协程,适用于IO阻塞且需要大量并发的场景,例如爬虫、文件读写。

    asyncio 在 Python3.4 被引入,经过几个版本的迭代,特性、语法糖均有了不同程度的改进,这也使得不同版本的 Python 在 asyncio 的用法上各不相同,显得有些杂乱,以前使用的时候也是本着能用就行的原则,在写法上走了一些弯路,现在对 Python3.7+ 和 Python3.6 中 asyncio 的用法做一个梳理,以便以后能更好的使用

    import asyncio
    
    
    async def request(url):
        return url
    
    
    c = request('www.baidu.com')
    
    
    def callback_func(task):
        print(task.result())
    
    
    # 绑定回调
    loop = asyncio.get_event_loop()
    task = asyncio.ensure_future(c)
    # 将回调函数绑定到任务对象中
    task.add_done_callback(callback_func)
    loop.run_until_complete(task)
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    单线程异步协程实现

    在request的基础上 使用异步IO库的asyncio

    import requests
    import asyncio
    import time
    
    
    start = time.time()
    urls = [
        'http://127.0.0.1:5000/111',
        'http://127.0.0.1:5000/222',
        'http://127.0.0.1:5000/333',
    ]
    
    
    async def get_page(url):
        print('正在下载', url)
        response = requests.get(url)
        print('下载完毕', response.text)
    
    tasks = []
    for url in urls:
        c = get_page(url)
        task = asyncio.ensure_future(c)
        tasks.append(task)
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))
    print('总耗时:', time.time()-start)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    线程池爬数据

    from multiprocessing.dummy import Pool as Pool
    import time
    
    
    def func(msg):
        print('msg:', msg)
        time.sleep(2)
        print('end:')
        
    # 三个线程
    pool = Pool(processes=3)
    for i in range(1, 5):
        msg = 'hello %d' % (i)
        # 非阻塞
        pool.apply_async(func, (msg,))
        # 阻塞,apply()源自内建函数,用于间接的调用函数,并且按位置把元祖或字典作为参数传入。
        # pool.apply(func,(msg,))
        # 非阻塞, 注意与apply传的参数的区别
        # pool.imap(func,[msg,])
        # 阻塞
        # pool.map(func, [msg, ])
    
    print('start~~~~~~~~~~~~~~~')
    pool.close()
    pool.join()
    print('done~~~~~~~~~~~~~~~')
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    这里演示一个aiohttp实现多任务异步协程

    aiohttp是一个建立在asyncio上的,既支持http又支持websocket的一个库。并且同时支持客户端和服务端。

    import asyncio
    import logging
    import time
    import json
    from threading import Thread
    from aiohttp import ClientSession, ClientTimeout, TCPConnector, BasicAuth
    
    import base64
    
    from urllib.parse import unquote, quote
    # 默认请求头
    HEADERS = {
        'accept': 'text/javascript, text/html, application/xml, text/xml, */*',
        # "User-Agent": "curl/7.x/line",
        'accept-encoding': 'gzip, deflate, br',
        'accept-language': 'zh-CN,zh;q=0.9',
        'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.108 Safari/537.36',
    }
    
    # 默认超时时间
    TIMEOUT = 15
    
    
    def start_loop(loop):
        asyncio.set_event_loop(loop)
        loop.run_forever()
    
    
    class AioCrawl:
    
        def __init__(self):
            self.logger = logging.getLogger(__name__)
    
            self.proxyServer = None
    
            # 启动事件循环
            self.event_loop = asyncio.new_event_loop()
            self.t = Thread(target=start_loop, args=(self.event_loop,))
            self.t.setDaemon(True)
            self.t.start()
    
            self.concurrent = 0  # 记录并发数
    
        async def fetch(self, url, method='GET', headers=None, timeout=TIMEOUT, cookies=None, data=None, proxy=None):
            """采集纤程
            :param url: str
            :param method: 'GET' or 'POST'
            :param headers: dict()
            :param timeout: int
            :param cookies:
            :param data: dict()
            :param proxy: str
            :return: (status, content)
            """
    
            method = 'POST' if method.upper() == 'POST' else 'GET'
            headers = headers if headers else HEADERS
            timeout = ClientTimeout(total=timeout)
            cookies = cookies if cookies else None
            data = data if data and isinstance(data, dict) else {}
            proxy = proxy if proxy else self.proxyServer
    
            tcp_connector = TCPConnector(limit=64)  # 禁用证书验证
    
            async with ClientSession(headers=headers, timeout=timeout, cookies=cookies, connector=tcp_connector) as session:
                try:
                    if method == 'GET':
                        async with session.get(url, proxy=proxy) as response:
                            content = await response.read()
                            return response.status, content
                    else:
                        async with session.post(url, data=data, proxy=proxy) as response:
                            content = await response.read()
                            return response.status, content
                except Exception as e:
                    raise e
    
        def callback(self, future):
            """回调函数
            1.处理并转换成Result对象
            2.写数据库
            """
            msg = str(future.exception()) if future.exception() else 'success'
            code = 1 if msg == 'success' else 0
            status = future.result()[0] if code == 1 else None
            data = future.result()[1] if code == 1 else b''  # 空串
    
            data_len = len(data) if data else 0
            if code == 0 or (status is not None and status != 200):  # 打印小异常
                self.logger.warning('<url="{}", code={}, msg="{}", status={}, data(len):{}>'.format(
                    future.url, code, msg, status, data_len))
    
            self.concurrent -= 1  # 并发数-1
    
            return data
    
        def add_tasks(self, tasks, method='GET', data=None, headers=None):
            """添加任务
            :param tasks: list <class Task>
            :return: future
            """
            resultList = []
            for task in tasks:
                headers = headers if headers else HEADERS
                # asyncio.run_coroutine_threadsafe 接收一个协程对象和,事件循环对象
                future = asyncio.run_coroutine_threadsafe(self.fetch(task, method=method, data=data, headers=headers), self.event_loop)
                future.add_done_callback(self.callback)  # 给future对象添加回调函数
                self.concurrent += 1  # 并发数加 1
                result = future.result()
                # print(result)
                resultList.append(str(result[1], encoding="utf-8"))
            return resultList
    
        def add_one_tasks(self, task, headers=None, method='GET', data=None, proxy=None):
            """添加任务
            :param tasks: list <class Task>
            :return: future
            """
            future = asyncio.run_coroutine_threadsafe(self.fetch(task, method=method, data=data, headers=headers, proxy=proxy), self.event_loop)
            future.add_done_callback(self.callback)  # 给future对象添加回调函数
            result = future.result()
            return [str(result[1], encoding="utf-8")]
    
        def getProductParm(self, productguid):
    
            base = '{"productguid":"%s","areacode":"","referer":"https://zc.plap.mil.cn/productdetail.html?productguid=%s"}' % (
                productguid, productguid)
            # 编码
            base_d = quote(base)
    
            return str(base64.b64encode(base_d.encode("utf-8")), "utf-8")
    
    if __name__ == '__main__':
        a = AioCrawl()
        headers = {
            "Host": "api.erp.idodb.com",
            "Accept": "application/json",
            "Content-Type": "application/json;charset=UTF-8",
            "token": "f62f837d0c9fda331fd6ce35d0017a16",
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/94.0.4606.71 Safari/537.36"
                }
        data = {"ware_name": "口罩", "ware_model": "", "ware_brand_name": "汉盾", "pagesize": 10, "pageindex": 2,
                   "sc_id": "4A6F7946-0704-41B2-8027-2CC13B6E96F2"}
        result = a.add_one_tasks(
            task='https://zc.plap.mil.cn/productdetail.html?productguid=118fc555-e384-11eb-89a9-fefcfe9556b7',
            data=json.dumps(data),
            headers=headers,
            method="POST")  # 模拟动态添加任务
        print(result)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
  • 相关阅读:
    4.cmake-更好的hello-world
    android 11后文件读写访问权限申请
    connection_reset解决方案
    地理信息系统概论复习重点
    MATLAB SAC算法reward震荡问题
    ubuntu用Dockerfile配置ros+cuda+torch镜像及rviz可视化
    别看了,这就是你的题呀(四)
    Kubernetes集群部署
    使用 Rosetta 建立 Ubuntu x86 开发环境
    cgroup version jdk version k8s
  • 原文地址:https://blog.csdn.net/weixin_36723038/article/details/125544874