• aiohttp从入门到精通


    I. 概述

    A. 异步编程概述

    异步编程是一种编写并发代码的方法,它能够提高程序的性能和响应速度。在传统的同步编程模型中,程序会等待一个函数或任务完成才能继续执行下一个任务,这可能会导致长时间的等待和阻塞。而在异步编程模型中,程序可以同时处理多个任务,并且不会阻塞主线程。

    异步编程通常基于回调、事件循环和协程来实现。回调是指当一个函数完成时,调用另一个函数来处理结果。事件循环是一种机制,通过在一个无限循环中不断地监听和处理事件,来实现异步操作和并发处理。而协程则是一种轻量级的线程,在运行时可以暂停和继续执行,从而允许程序更加高效地利用计算资源和IO资源。

    异步编程的优势在于可以提高程序的吞吐量和响应速度,在处理大量IO密集型任务时特别有效。在Python中,asyncio库是一种实现异步编程的方式。aiohttp就是基于asyncio库实现的HTTP客户端/服务器框架。

    B. aiohttp详解

    aiohttp是一个Python的HTTP客户端/服务器框架,它基于asyncio库实现异步编程模型,可以支持高性能和高并发的HTTP通信。aiohttp用于编写异步的Web服务器、Web应用程序或HTTP客户端,以提供对Web和HTTP资源的访问和操作。

    在aiohttp中,HTTP客户端提供了一种发送HTTP请求和处理响应的异步方式,而HTTP服务器则提供了一种异步处理HTTP请求和响应的方式。在使用aiohttp编写Web应用程序时,我们可以选择使用内置的路由器和视图功能来处理HTTP请求,并使用异步模板引擎来渲染HTML页面。

    aiohttp支持WebSocket协议,使得我们可以轻松地在应用程序中实现实时通信和数据推送。aiohttp的API设计简单、易用,与Python开发者熟悉的asyncio风格一致。因此,即使没有使用过aiohttp也可以较快上手。 aipyhttp适用于那些需要高性能、高吞吐量、高并发的Python网络应用场景,如实时聊天、在线游戏、大数据分析等。

    C. aiohttp优势

    1. 高性能

    在CPU密集型任务下,异步编程相较于同步编程能够更好地利用CPU资源,从而实现高性能和高吞吐量。在IO密集型任务下,异步编程能够有效地避免等待I/O操作时的线程阻塞,从而降低了系统负载,提高了响应速度。

    1. 高并发

    aiohttp支持异步编程模型,使得它能够同时处理多个请求。通过使用协程和事件循环机制,aiohttp可以轻松处理大量并发请求,提高了应用程序的并发处理能力。

    1. 轻量级

    aiohttp的API简单易用,框架本身也非常轻便,没有过多的依赖,因此容易学习和部署。

    1. WebSocket支持

    aiohttp支持WebSocket协议,并提供了WebSocket相关的API,开发者可以在应用程序中方便地实现实时通信和数据推送功能。

    II. 安装和配置

    pip install aiohttp

    一个简单的案例

    import aiohttp
    
    async def main():
        headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3'}
        timeout = aiohttp.ClientTimeout(total=10) # 设置总超时时间为10秒
        async with aiohttp.ClientSession(headers=headers, timeout=timeout) as session:
            async with session.get('https://www.example.com/') as response:
                html = await response.text()
                print(html)
    
    if __name__ == '__main__':
        asyncio.run(main())

    III. 请求与响应

    A. 发送POST请求

    开发一个POST请求服务,并实现请求发送的客户端的示例代码:

    Web服务器端代码:

    from aiohttp import web
    
    async def handle(request):
        data = await request.post()
        name = data.get('name', "Anonymous")
        age = data.get('age', "Unknown")
        text = f"Hello, {name}, you're {age} years old."
        return web.Response(text=text)
    
    app = web.Application()
    app.add_routes([web.get('/', handle),
                    web.post('/', handle)])
    
    if __name__ == '__main__':
        web.run_app(app)

    客户端代码:

    import aiohttp
    import asyncio
    
    async def post_data(session, url):
        data = {'name': 'John', 'age': 30}
        async with session.post(url, data=data) as response:
            return await response.text()
    
    async def main():
        async with aiohttp.ClientSession() as session:
            html = await post_data(session, 'http://localhost:8080/')
            print(html)
    
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

    这个示例中,我们首先创建了一个Web服务器,它能够处理POST请求并返回"Hello, {name}, you're {age} years old."的字符串。然后,我们编写了一个客户端程序,用于向服务器发送POST请求,并接收响应。

    在客户端程序中,我们使用ClientSession对象来创建会话,并调用post_data函数来发送POST请求。其中,我们通过data参数来指定POST请求的数据。最后,我们将得到的响应正文打印到控制台上。

    需要注意的是,在上述代码中,我们使用了 await request.post()来读取POST请求数据。在客户端程序中,我们使用了async with session.post(url, data=data)来发送POST请求。

    B. 发送GET请求

    开发一个GET请求服务,并实现请求发送的客户端的示例代码:

    Web服务器端代码:

    from aiohttp import web
    
    async def handle(request):
        name = request.match_info.get('name', "Anonymous")
        text = f"Hello, {name}"
        return web.Response(text=text)
    
    app = web.Application()
    app.add_routes([web.get('/', handle),
                    web.get('/{name}', handle)])
    
    if __name__ == '__main__':
        web.run_app(app)

    客户端代码:

    import aiohttp
    import asyncio
    
    async def fetch(session, url):
        async with session.get(url) as response:
            return await response.text()
    
    async def main():
        async with aiohttp.ClientSession() as session:
            html = await fetch(session, 'http://localhost:8080/John')
            print(html)
    
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

    首先创建了一个Web服务器,它能够处理GET请求并返回"Hello, {name}"的字符串。然后,我们编写了一个客户端程序,用于向服务器发送GET请求,并接收响应。

    在客户端程序中,我们使用ClientSession对象来创建会话,并调用fetch函数来发送GET请求。最后,我们将得到的响应正文打印到控制台上。

    需要注意的是,在客户端程序中,我们指定了服务器的URL为"http://localhost:8080/John",其中"John"是URL路径中的参数。如果我们想请求不同的资源,只需要修改URL即可。

    C. 获取响应信息

    下面是获取响应各部分信息的示例代码:

    import aiohttp
    import asyncio
    
    async def fetch(session, url):
        async with session.get(url) as response:
            # 获取状态码
            status = response.status
            
            # 获取响应头
            headers = response.headers
            
            # 以文本形式获取响应正文
            text = await response.text()
            
            # 以字节形式获取响应正文
            content = await response.read()
            
            return status, headers, text, content
    
    async def main():
        async with aiohttp.ClientSession() as session:
            status, headers, text, content = await fetch(session, 'https://www.example.com')
            print(status)
            print(headers)
            print(text)
            print(content)
    
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

    在这个示例中,我们定义了一个fetch函数,它用于向指定URL发送GET请求,并返回响应的状态码、响应头、响应正文(以文本和字节形式)等信息。

    main函数中,我们使用ClientSession对象来创建会话,并调用fetch函数来发送GET请求。然后,我们将得到的响应状态码、响应头、响应正文打印到控制台上。

    需要注意的是,在上述代码中,我们首先使用await response.text()await response.read()两种方式获取响应正文。其中,text()方法返回字符串类型的响应正文,而`read()`方法返回字节类型的响应正文。

    IV. Cookies和Session

    在aiohttp中,操作Cookies和Session非常容易。以下是一个示例代码,演示如何使用aiohttp进行Cookies和Session的操作:

    from aiohttp import web
    
    async def handle(request):
        session = await request.session()
        count = session.get('count', 0)
        count += 1
        session['count'] = count
        text = f"Visited {count} times."
        return web.Response(text=text)
    
    app = web.Application()
    app.add_routes([web.get('/', handle)])
    
    if __name__ == '__main__':
        web.run_app(app)

    在这个示例中,我们创建了一个Web服务器,它能够记录网页被访问的次数。当请求到达时,我们首先获取或创建一个会话对象,然后从会话对象中读取count值(如果不存在,则默认为0),并将其加1。最后,我们将更新后的count值存储回会话对象中,并返回带有访问次数信息的响应。

    在上面的示例中,我们使用了aiohttp提供的request.session()方法来创建或获取一个会话对象。在这个会话对象中,我们可以使用Python字典的语法来访问和修改任意键值对。例如,我们可以使用session['key'] = value来设置一个键值对,使用session.get('key', default)来读取一个键的值(如果不存在,则返回默认值)。

    需要注意的是,在使用会话对象时,必须启用Web服务器的Sessions功能。要启用Sessions功能,只需在创建Web服务器实例时,设置session_factory参数即可。例如:

    from aiohttp import web
    
    async def handle(request):
        session = await request.session()
        count = session.get('count', 0)
        count += 1
        session['count'] = count
        text = f"Visited {count} times."
        return web.Response(text=text)
    
    app = web.Application()
    app.add_routes([web.get('/', handle)])
    app.cleanup_ctx.append(lambda app: setup_session(app))
    
    async def setup_session(app):
        app['session'] = await aiohttp_session.setup(
            app, aiohttp_session.SimpleCookieStorage())
    
    if __name__ == '__main__':
        web.run_app(app)

    在这个示例中,我们使用了aiohttp-session库来管理Sessions。我们首先在cleanup_ctx列表中注册了一个setup_session函数,用于在Web服务器启动前设置Sessions。在setup_session函数中,我们创建了一个SimpleCookieStorage对象,并将其作为会话存储器传递给aiohttp_session.setup()方法。这样,在请求到达时,我们就可以通过await request.session()方法来获取或创建会话对象了。

    需要注意的是,如果要在Web服务器中使用Sessions功能,还需要按照官方文档说明正确地配置aiohttp_session库和相关依赖模块。

    V. 异步处理

    A. 协程

    aiohttp使用协程来实现异步处理。协程可以看作是一种轻量级的线程,能够在单个线程中运行多个任务。在aiohttp中,使用async/await语法来定义协程函数。

    例如,下面是一个简单的aiohttp协程示例:

    import aiohttp
    import asyncio
    
    async def fetch(session, url):
        async with session.get(url) as response:
            return await response.text()
    
    async def main():
        async with aiohttp.ClientSession() as session:
            html = await fetch(session, 'https://www.example.com')
            print(html)
    
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

    在上面的示例中,我们使用aiohttp的ClientSession类创建一个HTTP会话,并定义了一个fetch函数来获取URL的响应内容。然后,在main函数中,我们调用fetch函数并等待其完成。通过在loop中运行main函数,我们可以异步地获取URL的响应内容。

    B. 回调函数

    除了使用async/await语法来定义协程函数外,aiohttp还支持使用回调函数来处理异步请求。我们可以使用add_done_callback方法向请求对象添加一个回调函数,在请求完成时自动调用该回调函数。

    例如,下面是一个使用回调函数处理异步请求的aiohttp示例:

    import aiohttp
    import asyncio
    
    def handle_response(response):
        print(response.status)
        asyncio.get_event_loop().stop()
    
    async def main():
        async with aiohttp.ClientSession() as session:
            url = 'https://www.example.com'
            async with session.get(url) as response:
                response.add_done_callback(handle_response)
                await asyncio.sleep(1)
    
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

    在上面的示例中,我们定义了一个handle_response函数来处理异步请求的响应内容。然后,在main函数中,我们向HTTP请求对象添加一个回调函数,并使用asyncio.sleep方法来等待异步请求完成。

    VI. SSL认证

    在aiohttp中,我们可以使用SSL/TLS协议来保护HTTP通信的安全性。SSL证书是一种数字证书,用于验证服务器身份并加密数据传输。在aiohttp中,我们可以通过配置SSL证书和可信机构列表来进行SSL认证。

    下面是一个简单的示例代码,演示如何在aiohttp中启用SSL认证:

    import aiohttp
    import asyncio
    
    async def main():
        async with aiohttp.ClientSession() as session:
            async with session.get('https://www.example.com') as response:
                print(await response.text())
    
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

    在上面的示例中,我们使用aiohttp的ClientSession类创建一个HTTP会话,并向URL发起一个HTTPS请求。由于该URL使用了有效的SSL证书,因此该请求将成功执行。

    但是,如果我们尝试向一个使用无效或过期SSL证书的URL发起HTTPS请求,则会引发SSL错误。为了解决这个问题,我们需要指定要使用的SSL证书和可信机构列表。

    例如,下面是一个使用自签名SSL证书的aiohttp示例:

    import aiohttp
    import asyncio
    import ssl
    
    async def main():
    
        ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        ssl_context.load_verify_locations('/path/to/cert.pem')
    
        async with aiohttp.ClientSession() as session:
            async with session.get('https://www.example.com', ssl=ssl_context) as response:
                print(await response.text())
    
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

    在上面的示例中,我们通过ssl.SSLContext类创建了一个SSL上下文对象,并指定要使用的TLS协议版本。然后,我们使用load_verify_locations方法加载自签名证书,并将该证书用于SSL认证。

    需要注意的是,在使用自定义SSL证书时,我们需要将证书文件传递给load_verify_locations方法。如果证书文件不是PEM格式的,我们还需要指定证书文件类型。

    VII. WebSockets支持

    A. WebSocket介绍

    WebSocket是一种协议,用于在客户端和服务器之间进行实时双向通信。它允许数据在客户端和服务器之间以全双工方式流动,并且不需要使用HTTP请求/响应周期来发送数据。

    WebSocket通过在客户端和服务器之间建立持久连接来实现实时通信。这个连接是基于TCP协议的,但是与HTTP请求/响应不同,它只需要一个握手过程,就可以在客户端和服务器之间创建长期的连接。

    B. 使用aiohttp实现WebSocket

    aiohttp提供了对WebSockets的完整支持,允许我们创建和管理WebSocket连接,并在客户端和服务器之间进行实时双向通信。下面是一个简单的aiohttp WebSocket示例:

    import aiohttp
    import asyncio
    
    async def websocket_handler(request):
    
        ws = aiohttp.web.WebSocketResponse()
        await ws.prepare(request)
    
        async for msg in ws:
            if msg.type == aiohttp.WSMsgType.TEXT:
                await ws.send_str('Hello, ' + msg.data)
            elif msg.type == aiohttp.WSMsgType.ERROR:
                break
    
        return ws
    
    app = aiohttp.web.Application()
    app.add_routes([aiohttp.web.get('/', websocket_handler)])
    
    if __name__ == '__main__':
        aiohttp.web.run_app(app)

    在上面的示例中定义了一个websocket_handler函数来处理WebSocket请求。在这个函数中,我们使用aiohttp的WebSocketResponse类创建一个WebSocket响应对象,并调用它的prepare方法来准备WebSocket连接。

  • 相关阅读:
    第2-1-3章 docker-compose安装FastDFS,实现文件存储服务
    LeetCode刷题笔记:165.输出二叉树
    Au@Ag(金核银壳纳米颗粒)修饰巯基SH/氨基/NH2/羧基COOH/PEG
    xauth: file /home/oracle/.Xauthority does not exist解决方案
    Python+Appium自动化测试-编写自动化脚本
    Presto 之 explain and explain analyze的实现
    【预测模型-RBF】基于径向基神经网络实现数据分类附matlab代码
    计算机网络——数据链路层介质访问控制
    Docker打包python镜像(Windows)
    项目中遇到的error
  • 原文地址:https://blog.csdn.net/csd11311/article/details/130903563