• 【Python随笔】python的web开发——WSGI、ASGI、uvicorn与FastAPI


    今天这篇文章,聊一下pythonweb开发上的一些基础实现,阐述下自己理解中的WSGIASGI,以及拿uvicorn+FastAPI的组合举个ASGI应用的例子。

    WSGI

    pythonweb服务的诞生,其实追溯到一种机制,叫做WSGI,全称Web Server Gateway InterfaceWSGI的提案来源于PEP-333,可以理解为一种python-web-serverpython-web-app的接口通信标准。在这种场景下,pythonweb服务呈现以下的工作模式:

    • python-web-app,也就是web应用层,实现WSGI接口,用作web请求的handler
    • 用户向python-web-server发送web请求
    • python-web-server,又称作WSGI Server,解析请求数据,整理当前session的环境信息
    • python-web-server加载python-web-app,调用python-web-app实例的WSGI接口,处理请求
    • python-web-app处理完请求,返回结果给到python-web-server
    • python-web-server写回返回结果,给回用户

    代码上是这样的表现,以官方提案的例子为例:

    import os, sys
    
    # python-web-app
    def simple_app(environ, start_response):
        """
        python-web-app implementation
        :param environ: 由python-web-server提供,表示当前请求的环境信息
        :param start_response: 由python-web-server提供的回调,用以初始化返回结果的状态
        :return: 返回结果的数据内容
        """
        status = '200 OK'
        response_headers = [('Content-type', 'text/plain')]
        start_response(status, response_headers)
        return ['Hello world!\n']
    
    
    # python-web-server
    def run_with_cgi(application):
        """
        WSGI layer implementation
        :param application: 实现WSGI的app
        """
        environ = dict(os.environ.items())
        headers_set = []
        headers_sent = []
    
        def write(data):
            """写回数据的逻辑"""
            if not headers_set:
                 raise AssertionError("write() before start_response()")
            elif not headers_sent:
                 # Before the first output, send the stored headers
                 status, response_headers = headers_sent[:] = headers_set
                 sys.stdout.write('Status: %s\r\n' % status)
                 for header in response_headers:
                     sys.stdout.write('%s: %s\r\n' % header)
                 sys.stdout.write('\r\n')
            sys.stdout.write(data)
            sys.stdout.flush()
    
        def start_response(status, response_headers, exc_info=None):
            """初始化response的逻辑"""
            if exc_info:
                try:
                    if headers_sent:
                        raise exc_info[0], exc_info[1], exc_info[2]
                finally:
                    exc_info = None     # avoid dangling circular ref
            elif headers_set:
                raise AssertionError("Headers already set!")
            headers_set[:] = [status, response_headers]
            return write
    
        # 调用应用层的WSGI接口,获取返回数据
        result = application(environ, start_response)
        try:
            for data in result:  # 写回返回数据
                if data:    # don't send headers until body appears
                    write(data)
            if not headers_sent:
                write('')   # send headers now if body was empty
        finally:
            if hasattr(result, 'close'):
                result.close()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64

    通过WSGI,就可以实现python-web-apppython-web-server的分离,这样无论什么python-web-app,只要实现了WSGI接口标准,就能够无缝移植到其它支持WSGIpython-web-server上。

    ASGI

    python3推出异步IO实现asyncio之后,ASGI也应运而生。ASGI的目标和WSGI相同,但也有一些改进点,一方面是支持asyncio的机制,另一方面也能够解决WSGI难以支持WebSocket之类长连接模式的问题。要深入了解ASGI,可以参考这篇文档

    ASGI标准下,python-web-app需要这样的接口实现:

    async def application(scope, receive, send):
        """
        python-web-app应用层实现
        :param scope: 由python-web-server提供,表示当前连接的环境信息
        :param receive: 通过这个协程,可以收到由python-web-server发来的事件
        :param send: 通过这个协程,可以写回事件给python-web-server,比如让python-web-server处理response
        """
        event = await receive()
        ...
        await send({"type": "websocket.send", "text": "Hello world!"})
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    不论是receive到的还是send出去的event,都会包含一个type字段表示这个event的类型,一般type会有:

    • http.xxxhttp连接、请求、返回相关
    • websocket.xxxwebsocket连接、请求、返回相关
    • xxx.send/receive:收发消息相关
    • lifespan.xxxweb服务生命周期相关

    ASGI案例之uvicorn+FastAPI

    为了更加直观感受ASGI的应用,本文也顺带以uvicornFastAPI的组合,通过源码实现来看ASGI是如何串联起python-web-serverpython-web-app的。

    在笔者封装的简易http-web-app框架start-fastapi中,就支持了通过uvicorn启动FastAPI应用。其中,main.pyuvicorn实例会加载app模块下的APP这一FastAPI实例,启动web-app应用。

    # ============ start-fastapi project ============
    
    # main.py
    def main() -> None:
        uvicorn.run('app:APP', **cfg)
    
    # app/__init__.py
    APP = FastAPI(**FASTAPI_CFG)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    首先从uvicorn.run开始看起,其代码实现如下:

    # uvicorn/main.py
    def run(app: typing.Union[ASGIApplication, str], **kwargs: typing.Any) -> None:
        config = Config(app, **kwargs)  # uvicorn Config实例
        server = Server(config=config)  # uvicorn Server实例,包装Config实例
        if (config.reload or config.workers > 1) and not isinstance(app, str):
            sys.exit(1)
        if config.should_reload:  # 用watchdog监测文件改动,实时重启,开发环境用
            sock = config.bind_socket()
            ChangeReload(config, target=server.run, sockets=[sock]).run()
        elif config.workers > 1:  # spawn多个worker,实现多进程的web服务
            sock = config.bind_socket()
            Multiprocess(config, target=server.run, sockets=[sock]).run()
        else:  # 默认standalone的web服务
            server.run()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    默认会走Server实例的run方法,我们来看其中的实现:

    # uvicorn/server.py
    class Server:
        def run(self, sockets=None):
            self.config.setup_event_loop()  # 根据uvicorn配置,动态加载EventLoop的环境
            loop = asyncio.get_event_loop()  # EventLoop走asyncio的机制
            loop.run_until_complete(self.serve(sockets=sockets))  # 启动web服务
    
        async def serve(self, sockets=None):
            config = self.config
            if not config.loaded:  # 加载一次配置,即Config实例
                config.load()
            self.lifespan = config.lifespan_class(config)
            self.install_signal_handlers()  # 初始化os-signal处理逻辑
            await self.startup(sockets=sockets)  # 初始化服务
            if self.should_exit:
                return
            await self.main_loop()  # 开始主循环
            await self.shutdown(sockets=sockets)  # 终止服务
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    这里有两个重要步骤:

    • config.load:加载配置
    • startup:启动服务器

    首先看配置加载,里面会将app实例进行初始化:

    # uvicorn/config.py
    class Config:
        def load(self):
            assert not self.loaded
            # 上面略,会加载http_protocol_class/ws_protocol_class/lifespan_class
            try:
                # FastAPI走这个链路,加载到先前说的app.APP实例
                self.loaded_app = import_from_string(self.app)  
            except ImportFromStringError as exc:
                logger.error("Error loading ASGI app. %s" % exc)
                sys.exit(1)
    
            if self.interface == "auto":  # FastAPI走的是asgi3
                if inspect.isclass(self.loaded_app):
                    use_asgi_3 = hasattr(self.loaded_app, "__await__")
                elif inspect.isfunction(self.loaded_app):
                    use_asgi_3 = asyncio.iscoroutinefunction(self.loaded_app)
                else:
                    call = getattr(self.loaded_app, "__call__", None)
                    use_asgi_3 = asyncio.iscoroutinefunction(call)
                self.interface = "asgi3" if use_asgi_3 else "asgi2"
    
            self.loaded = True
    
    # fastapi/applications.py
    class FastAPI(Starlette):
        async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
            if self.root_path:
                scope["root_path"] = self.root_path
            if AsyncExitStack:
                async with AsyncExitStack() as stack:
                    scope["fastapi_astack"] = stack
                    await super().__call__(scope, receive, send)
            else:
                await super().__call__(scope, receive, send)  # pragma: no cover
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    可以看到FastAPIapp实现里,定义了ASGI,并且也在uvicornconfig.load里被识别到了。FastAPI继承了Starlette,而Starlette本身即是支持ASGIweb框架,为python-web-app提供了路由、中间件相关的应用级底层支持。FastAPI实际是对Starlette的包装,相关handlermiddleware的注册也是给到Starlette框架里面的。针对web-server发来的请求,FastAPI在设置一些环境信息后,最终也是交由Starlette底层处理。

    之后回到uvicorn,看一下startup的实现:

    # uvicorn/server.py
    class Server:
        async def startup(self, sockets: list = None) -> None:
            await self.lifespan.startup()
            if self.lifespan.should_exit:
                self.should_exit = True
                return
            config = self.config
    
            async def handler(
                reader: asyncio.StreamReader, writer: asyncio.StreamWriter
            ) -> None:  # http-handler
                await handle_http(
                    reader, writer, server_state=self.server_state, config=config
                )
    
            # 这里省略其他分支
            try:
                server = await asyncio.start_server(
                    handler,
                    host=config.host,
                    port=config.port,
                    ssl=config.ssl,
                    backlog=config.backlog,
                )
            except OSError as exc:
                logger.error(exc)
                await self.lifespan.shutdown()
                sys.exit(1)
            
            # 下略
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    startup分两步:

    • 初始化lifespan
    • 定义http-handler,通过asyncio.start_server启动http-server

    在初始化lifespan过程中,uvicorn会发送lifespan.startup事件,这个事件就会被FastAPI-appASGI捕获到,最终层层往下,会走到StarletteRouter实例:

    # starlette/routing.py
    class Router:
        async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
            assert scope["type"] in ("http", "websocket", "lifespan")
            if "router" not in scope:
                scope["router"] = self
            if scope["type"] == "lifespan":
                await self.lifespan(scope, receive, send)  # 走到这里
                return
            # 下略
    
        async def lifespan(self, scope: Scope, receive: Receive, send: Send) -> None:
            first = True
            app = scope.get("app")
            await receive()
            try:
                if inspect.isasyncgenfunction(self.lifespan_context):
                    async for item in self.lifespan_context(app):  # 调用lifespan-event
                        first = False
                        await send({"type": "lifespan.startup.complete"})
                        await receive()
                # 下略
            except Exception as e:
                pass
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    StartletteRouter检测到lifespan事件时,就会走到lifespan逻辑,其中会看lifespan的当前阶段是否有对应的hook函数,有的话就执行。当前阶段是lifespan.startup,因此如果我们在FastAPI中定义了这个协程,就可以在startup阶段执行到:

    # register startup event
    @APP.on_event('startup')
    async def start_app():
        pass
    
    • 1
    • 2
    • 3
    • 4

    lifespan.startup之后,就定义http-handler并绑到listen-server上。http-handler会解析请求数据,然后调用appASGI接口处理请求,大致是这样的链路:

    class H11Protocol(asyncio.Protocol):
        def handle_events(self):
            while True:
                if event_type is h11.Request:
                    task = self.loop.create_task(self.cycle.run_asgi(app))
    
    class RequestResponseCycle:
        async def run_asgi(self, app):
            try:
                result = await app(self.scope, self.receive, self.send)
            except Exception as e:
                pass
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    好比我们GET健康检查接口/api/v1/core/health,那么最终被FastAPI-app捕获到的请求数据里,scope长这样:

    scope = {
        "type": "http",
        "method": "GET",
        "root_path": ""
        "path": "/api/v1/core/health",
        "query_string": b""
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    根据这些信息,层层往下,就会又走到Starlette的路由逻辑:

    # starlette/routing.py
    class Router:
        async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
            # 上略
            # 有全部匹配的路由就直接处理
            for route in self.routes:
                match, child_scope = route.matches(scope)
                if match == Match.FULL:
                    scope.update(child_scope)
                    await route.handle(scope, receive, send)  # 路由实例来handle
                    return
                elif match == Match.PARTIAL and partial is None:
                    partial = route
                    partial_scope = child_scope
            # 匹配部分匹配的路由
            if partial is not None:
                scope.update(partial_scope)
                await partial.handle(scope, receive, send)  
                return
            # 重定向
            if scope["type"] == "http" and self.redirect_slashes and scope["path"] != "/":
                redirect_scope = dict(scope)
                if scope["path"].endswith("/"):
                    redirect_scope["path"] = redirect_scope["path"].rstrip("/")
                else:
                    redirect_scope["path"] = redirect_scope["path"] + "/"
                for route in self.routes:
                    match, child_scope = route.matches(redirect_scope)
                    if match != Match.NONE:
                        redirect_url = URL(scope=redirect_scope)
                        response = RedirectResponse(url=str(redirect_url))
                        await response(scope, receive, send)
                        return
            # 默认逻辑
            await self.default(scope, receive, send)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    由于我们在start-fastapi项目中,通过APIRouter定义了这个路由的handler,注册到了Starlette中:

    # ============ start-fastapi ============
    # core/handler/base.py
    ROUTER = APIRouter()
    
    
    @ROUTER.get('/api/v1/core/health')
    def health_check():
        return Resp.ok(message='ok')
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    那么/api/v1/core/health就会被完整匹配,走到对应路由实例的handle步骤:

    # starlette/routing.py
    class Route(BaseRoute):
        async def handle(self, scope: Scope, receive: Receive, send: Send) -> None:
            if self.methods and scope["method"] not in self.methods:  # 没有对应的method
                if "app" in scope:
                    raise HTTPException(status_code=405)
                else:
                    response = PlainTextResponse("Method Not Allowed", status_code=405)
                await response(scope, receive, send)
            else:  # 有method,直接处理
                await self.app(scope, receive, send)
    
                
    def request_response(func: typing.Callable) -> ASGIApp:
        is_coroutine = iscoroutinefunction_or_partial(func)
        async def app(scope: Scope, receive: Receive, send: Send) -> None:
            request = Request(scope, receive=receive, send=send)
            if is_coroutine:
                response = await func(request)
            else:
                response = await run_in_threadpool(func, request)
            await response(scope, receive, send)
        return app
    
    
    # fastapi/routing.py
    def get_request_handler() -> Callable[[Request], Coroutine[Any, Any, Response]]:
        raw_response = await run_endpoint_function(
            dependant=dependant, values=values, is_coroutine=is_coroutine
        )
        
     
    async def run_endpoint_function(
        *, dependant: Dependant, values: Dict[str, Any], is_coroutine: bool
    ) -> Any:
        assert dependant.call is not None, "dependant.call must be a function"
        if is_coroutine:
            return await dependant.call(**values)
        else:
            return await run_in_threadpool(dependant.call, **values)
        
        
    async def run_in_threadpool(
        func: typing.Callable[..., T], *args: typing.Any, **kwargs: typing.Any
    ) -> T:
        loop = asyncio.get_event_loop()
        if contextvars is not None:  # pragma: no cover
            # Ensure we run in the same context
            child = functools.partial(func, *args, **kwargs)
            context = contextvars.copy_context()
            func = context.run
            args = (child,)
        elif kwargs:  # pragma: no cover
            func = functools.partial(func, **kwargs)
        return await loop.run_in_executor(None, func, *args)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55

    由于我们对健康检查路由定义了GET方法,那么这个路由就支持处理。最终来到了FastAPIrun_endpoint_function方法,调用我们定义的Controller。由于我们是直接def health_check(),因此会走到loop.run_in_executor线程池方法,去执行Controller,然后返回结果。否则如果是async def定义的Controller的话,就直接await

    所以整个请求返回的链路就完成了,而且我们也会看到,针对需要耗时耗CPU的请求,尽量不要用async def定义FastAPIController,否则会有阻塞整个asyncio事件循环的风险,而用线程池处理就可以规避这种情况。

  • 相关阅读:
    外汇天眼:德国PPI利淡欧美镑美跌逾百点,美元涨近百点,黄金跌约20美元,关注美制造业指数
    朴素贝叶斯(基于概率论)
    真正意义上的产业互联网,其实是和互联网没有太多的关联的
    面试题解(Day04)
    什么是间谍软件恶意软件?
    含文档+PPT+源码等]精品基于Uniapp+SSM实现的校园心理健康APP[包运行成功]Android毕业设计Java项目源码论文
    JAVA毕业设计飞机航班信息查询系统计算机源码+lw文档+系统+调试部署+数据库
    41、集合
    python自动开发,基础——1
    webpack分环境打包(生产/开发两套打包)
  • 原文地址:https://blog.csdn.net/u013842501/article/details/126332364