• 异步并发怎么做?


    1、flask的异步并发

    问题

    • flask在开发环境下是单线程的,如果某个请求长时间无响应(阻塞),会导致其他请求也无法响应。
    • flask原生并不支持协程异步并发。也就是说即使使用了协程异步编程,但如果使用flask开发的app接口,外部并发请求时,flask还是会把这些 "已经异步编程的接口"当成 同步接口执行。

    解决办法

    • 1、使用多线程或者多进程的服务器,例如 Gunicorn 或者 uWSGI。这些服务器可以同时处理多个请求。
    • 2、使用异步非阻塞的服务器,例如 Tornado 或者 Twisted。这些服务器可以在处理一个请求的时候,如果遇到 IO 阻塞,就先去处理其他的请求,等 IO 完成后再回来继续处理这个请求。
    • 3、使用异步编程,例如 asyncio 或者 gevent。这些库可以让你的代码在遇到 IO 阻塞的时候,自动切换到其他的任务,从而提高整体的并发性能。
      但flask并不支持异步编程,推荐支持异步编程的 Web 框架,如 aiohttp 或 fastapi。
    • 4、对于超时的请求,可以考虑设置一些超时机制,例如使用 future 或者 promise,如果一个请求超时,就直接返回错误,不再等待它完成。这样可以避免一个请求阻塞住整个服务器。

    实现方案

    (1)falsk + 异步视图装饰器

    本质: 多线程并发。虽然宏观上看好像是协程的并发(单线程的并发)。

    虽然asynic await关键字旨在实现同一线程里的并发,但flask并不原生支持这一特性(“不支持”不是说“不能调用”)。
    所以,同一时间、同一线程、多个请求并发请求时,不管有无asynic await关键字修饰请求接口,flask都是一个一个顺序的、同步处理他们,而不是并发处理。

    这里的装饰器实现的功能:
    Making Flask async and Quart sync, Quart 的作者 PG Jones 给出了一个 Flask 异步化的代码,route 方法可加上 async 关键字@run_async 装饰

    当并发请求时,给每个请求开创一个线程单独的处理,这样并发的请求就被放到了多线程里,从宏观上看就是并发了,这样也不会因为某个接口请求阻塞而导致其他接口也无法响应的问题。
    这种异步装饰器,只是宏观上的“像”协程并发(协程:单线程下的并发),其实是多线程并发

    #!/usr/bin/env python
    # -*- coding: UTF-8 -*-
    '''
    @Project :hippo-ai-py 
    @Author  :cf
    @Date    :2024/4/2
    @Desc    : api公用信息
    '''
    
    import asyncio
    from concurrent.futures import Future, ThreadPoolExecutor
    from functools import wraps
    
    from flask import Flask, has_request_context, copy_current_request_context
    
    
    def run_async(func):
        '''
        flask异步视图装饰器。
        Args:
            func: 调用的方法
    
        Returns:
    
        '''
        @wraps(func)
        def _wrapper(*args, **kwargs):
            call_result = Future()
    
            def _run():
                loop = asyncio.new_event_loop()
                try:
                    result = loop.run_until_complete(func(*args, **kwargs))
                except Exception as error:
                    call_result.set_exception(error)
                else:
                    call_result.set_result(result)
                finally:
                    loop.close()
    
            loop_executor = ThreadPoolExecutor(max_workers=1)
            if has_request_context():
                _run = copy_current_request_context(_run)
            loop_future = loop_executor.submit(_run)
            loop_future.result()
            return call_result.result()
    
        return _wrapper
    
    
    app = Flask(__name__)
    
    async def fetch(url):
        print(f"{threading.current_thread().name}:{url}")
        return requests.get(url).text
    
    
    async def main(t):
        await asyncio.sleep(t)
        tasks = [fetch(url) for url in ["https://baidu.com", "https://bing.com", "https://yanbin.blog"]]
        return await asyncio.gather(*tasks)
    
    
    @app.route("/")
    @run_async
    async def index():
        time1 = time.time()
        responses = await main(3)
        time2 = time.time()
        print(f'{threading.current_thread().name}--3:response sizes: {[len(res) for res in responses]},耗时{time2 - time1}s\n')
       return responses 
    
    
    if __name__ == "__main__":
        # app.run(debug=False, use_reloader=False,threaded=False)  
        app.run(debug=False, use_reloader=False)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    (2)WSGI启动服务

    本质: 多线程、多进程的并发。

    如何在 uwsgi 配置中传递自定义参数?

    2、fastapi异步编程

    本质: 协程(单线程)并发。

    待续。。。。。。。。。

  • 相关阅读:
    传输中的差错检验技术
    使用jmx_exporter监控Kafka
    ACPI规范概览-2
    MySQL事务详解
    2022最新阿里Java面经,转疯了
    ODrive移植keil(五)—— 开环控制和电流变换
    【毕业设计】Stm32酒驾检查系统 - 单片机 嵌入式 物联网
    华大(现在改名小华半导体)芯片启动文件详细讲解
    「前端进阶」从多线程角度来看 Event Loop
    Jenkins权限配置和构建VUE项目
  • 原文地址:https://blog.csdn.net/qq_19072921/article/details/138166204