• python异步编程之asyncio低阶API


    image

    1|0低阶API介绍


    asyncio中低阶API的种类很多,涉及到开发的5个方面。包括:

    1. 获取事件循环
    2. 事件循环方法集
    3. 传输
    4. 协议
    5. 事件循环策略

    本篇中只讲解asyncio常见常用的函数,很多底层函数如网络、IPC、套接字、信号等不在本篇范围。

    2|0获取事件循环


    事件循环是异步中重要的概念之一,用于驱动任务的执行。包含的低阶API如下:

    函数 功能
    asyncio.get_running_loop() 获取当前运行的事件循环首选函数。
    asyncio.get_event_loop() 获得一个事件循环实例
    asyncio.set_event_loop() 将策略设置到事件循环
    asyncio.new_event_loop() 创建一个新的事件循环

    在asyncio初识这篇中提到过事件循环,可以把事件循环当做是一个while循环,在周期性的运行并执行一些任务。这个说法比较抽象,事件循环本质上其实是能调用操作系统IO模型的模块。以Linux系统为例,IO模型有阻塞,非阻塞,IO多路复用等。asyncio 常用的是IO多路复用模型的epoolkqueue。事件循环原理涉及到异步编程的操作系统原理,后续更新一系列相关文章。

    get_event_loop()
    创建一个事件循环,用于驱动协程的执行

    import asyncio
    
    async def demo(i):
        print(f"hello {i}")
    
    def main():
        loop = asyncio.get_event_loop()
        print(loop._selector)
        task = loop.create_task(demo(1))
        loop.run_until_complete(task)
    
    main()

    结果:

     object at 0x104eabe20>
    hello 1

    可以通过loop._selector属性获取到当前事件循环使用的是kqueue模型

    获取循环

    import asyncio
    
    async def demo(i):
        res = asyncio.get_running_loop()
        print(res)
        print(f"hello {i}")
    
    
    def main():
        loop = asyncio.get_event_loop()
        task = loop.create_task(demo(1))
        loop.run_until_complete(task)
    main()

    结果:

    <_UnixSelectorEventLoop running=True closed=False debug=False>
    hello 1
    
    

    推荐使用asyncio.run 创建事件循环,底层API主要用于库的编写。

    3|0生命周期


    生命周期是用于管理任务的启停的函数,如下:

    函数 功能
    loop.run_until_complete() 运行一个期程/任务/可等待对象直到完成。
    loop.run_forever() 一直运行事件循环,直到被显示停止
    loop.stop() 停止事件循环
    loop.close() 关闭事件循环
    loop.is_running() 返回 True , 如果事件循环正在运行
    loop.is_closed() 返回 True ,如果事件循环已经被关闭
    await loop.shutdown_asyncgens() 关闭异步生成器

    run_until_complete
    运行一个期程/任务/可等待对象直到完成。run_until_complete的参数是一个futrue对象。当传入一个协程,其内部会自动封装成task。run_until_complete()是会自动关闭事件循环的函数,区别于run_forever()是需要手动关闭事件循环的函数。

    import asyncio 
    
    
    async def demo(i):
        print(f"hello {i}")
    
    
    def main():
        loop = asyncio.get_event_loop()
        
        task = loop.create_task(demo(1))
    
        # 传入的是一个任务
        loop.run_until_complete(task)
    
        # 传入的是一个协程也可以
        loop.run_until_complete(demo(20))
    
    
    main()

    结果:

    hello 1
    hello 20

    4|0调试


    函数 功能
    loop.set_debug() 开启或禁用调试模式
    loop.get_debug() 获取当前测试模式

    5|0调度回调函数


    在异步编程中回调函数是一种很常见的方法,想要在事件循环中增加一些回调函数,可以有如下方法:

    函数 功能
    loop.call_soon() 尽快调用回调。
    loop.call_soon_threadsafe() loop.call_soon() 方法线程安全的变体。
    loop.call_later() 在给定时间之后调用回调函数。
    loop.call_at() 在指定的时间调用回调函数。

    这些回调函数既可以回调普通函数也可以回调协程函数。
    call_soon
    函数原型:

    loop.call_soon(callback, *args, context=None)

    示例:

    import asyncio
    
    async def my_coroutine():
        print("协程被执行")
    
    async def other_coro():
        print("非call_soon调用")
    
    def callback_function():
        print("回调函数被执行")
    
    
    # 创建一个事件循环
    loop = asyncio.get_event_loop()
    
    # 使用create_task包装协程函数,并调度执行
    loop.call_soon(loop.create_task, my_coroutine())
    
    # 调度一个常规函数以尽快执行
    loop.call_soon(callback_function)
    
    # 启动一个事件循环
    task = loop.create_task(other_coro())
    loop.run_until_complete(task)

    结果:

    回调函数被执行
    call_soon调用
    协程被执行

    结果分析:
    call_soon调用普通函数直接传入函数名作为参数,调用协程函数需要讲协程通过loop.create_task封装成task。

    6|0线程/进程池


    函数 功能
    await loop.run_in_executor() 多线程中运行一个阻塞的函数
    loop.set_default_executor() 设置 loop.run_in_executor() 默认执行器

    asyncio.run_in_executor 用于在异步事件循环中执行一个阻塞的函数或方法。它将阻塞的调用委托给一个线程池或进程池,以确保不阻塞主事件循环。可以用于在协程中调用一些不支持异步编程的方法,不支持异步编程的模块。

    run_in_executor

    import asyncio
    import concurrent.futures
    
    def blocking_function():
        # 模拟一个阻塞的操作
        import time
        time.sleep(2)
        return "阻塞函数返回"
    
    async def async_function2():
        print("async_function2 start")
        await asyncio.sleep(1)
        print("async_function2 end")
    
    async def async_function():
        print("异步函数开始执行。。。")
    
        print("调用同步阻塞函数")
        # 使用run_in_executor调度执行阻塞函数
        result = await loop.run_in_executor(None, blocking_function)
    
        print(f"获取同步函数的结果: {result}")
    
    # 创建一个事件循环
    loop = asyncio.get_event_loop()
    
    # 运行异步函数
    loop.run_until_complete(asyncio.gather(async_function(), async_function2()))

    结果:

    异步函数开始执行。。。
    调用同步阻塞函数
    async_function2 start
    async_function2 end
    获取同步函数的结果: 阻塞函数返回

    结果分析:
    通过事件循环执行任务async_function,在async_function中通过loop.run_in_executor调用同步阻塞函数blocking_function,该阻塞函数没有影响事件循环中另一个任务async_function2的执行。
    await loop.run_in_executor(None, blocking_function)中None代表使用的是默认线程池,也可以替换成其他线程池。

    使用自定义线程池和进程池

    import asyncio
    import concurrent.futures
    
    def blocking_function():
        # 模拟一个阻塞的操作
        import time
        time.sleep(2)
        return "阻塞函数返回"
    
    async def async_function():
        print("异步函数开始执行。。。")
    
        print("调用同步阻塞函数")
    
        # 线程池
        with concurrent.futures.ThreadPoolExecutor() as pool:
            result = await loop.run_in_executor(
                pool, blocking_function)
            print('线程池调用返回结果:', result)
    
        # 进程池
        with concurrent.futures.ProcessPoolExecutor() as pool:
            result = await loop.run_in_executor(
                pool, blocking_function)
            print('进程池调用返回结果:', result)
    
    if __name__ == '__main__':
        # 创建一个事件循环
        loop = asyncio.get_event_loop()
    
        # 运行异步函数
        loop.run_until_complete(async_function())
    
    
    

    结果:

    异步函数开始执行。。。
    调用同步阻塞函数
    线程池调用返回结果: 阻塞函数返回
    进程池调用返回结果: 阻塞函数返回

    结果分析:
    通过线程池concurrent.futures.ThreadPoolExecutor()和进程池concurrent.futures.ProcessPoolExecutor()执行阻塞函数。

    7|0任务与期程


    函数 功能
    loop.create_future() 创建一个 Future 对象。
    loop.create_task() 将协程当作 Task 一样调度。
    loop.set_task_factory() 设置 loop.create_task() 使用的工厂,它将用来创建 Tasks 。
    loop.get_task_factory() 获取 loop.create_task() 使用的工厂,它用来创建 Tasks 。

    create_future
    create_future 的功能是创建一个future对象。future对象通常不需要手动创建,因为task会自动管理任务结果。相当于task是全自动,创建future是半自动。创建的future就需要手动的讲future状态设置成完成,才能表示task的状态为完成。

    import asyncio
    
    
    def foo(future, result):
        print(f"此时future的状态:{future}")
        future.set_result(result)
        print(f"此时future的状态:{future}")
    
    
    if __name__ == '__main__':
        loop = asyncio.get_event_loop()
    
        # 手动创建future对象
        all_done = loop.create_future()
    
        # 设置一个回调函数用于修改设置future的结果
        loop.call_soon(foo, all_done, "Future is done!")
    
        result = loop.run_until_complete(all_done)
    
        print("返回结果", result)
        print("获取future的结果", all_done.result())
    
    

    结果:

    此时future的状态:<Future pending cb=[_run_until_complete_cb() at /Users/lib/python3.10/asyncio/base_events.py:184]>
    此时future的状态:<Future finished result='Future is done!'>
    返回结果 Future is done!
    获取future的结果 Future is done!

    结果分析:
    future设置结果之后之后,future对象的状态就从pending变成finished状态。如果一个future没有手动设置结果,那么事件循环就不会停止。

    create_task
    将协程封装成一个task对象,事件循环主要操作的是task对象。协程没有状态,而task是有状态的。

    import asyncio 
    
    
    async def demo(i):
        print(f"hello {i}")
        await asyncio.sleep(1)
    
    def main():
        loop = asyncio.get_event_loop()
    
        # 将携程封装成task,给事件使用
        task = loop.create_task(demo(1))
    
        loop.run_until_complete(task)
    
    main()
    >>> 
    hello 1

    asyncio.create_task 和 loop.create_task的区别:
    两者实现的功能都是一样的,将协程封装成一个task,让协程拥有了生命周期。区别仅仅在于使用的方法。asyncio.create_task 是高阶API,不需要创建事件循环,而loop.create_task需要先创建事件循环再使用该方法。

    8|0小结


    以上是asyncio低阶API的使用介绍,前一篇是高阶API的使用介绍,用两篇介绍了asyncio常见的函数,以后遇到asyncio相关的代码就不会感到陌生。虽然asyncio是比较复杂的编程思想,但是有了这些函数的使用基础,能够更高效的掌握。

    连载一系列关于python异步编程的文章。包括同异步框架性能对比、异步事情驱动原理等。欢迎关注微信公众号第一时间接收推送的文章。


    __EOF__

    本文作者goldsunshine
    本文链接https://www.cnblogs.com/goldsunshine/p/17950911.html
    关于博主:评论和私信会在第一时间回复。或者直接私信我。
    版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!
    声援博主:如果您觉得文章对您有帮助,可以点击文章右下角推荐一下。您的鼓励是博主的最大动力!
  • 相关阅读:
    python脚本-网页爬虫获取网页图片
    了解HTTP协议
    如何禁用 HTTP TRACE/TRACK
    Acwing1015. 摘花生
    开创性的区块链操作系统项目——生物识别与机器学习
    18 矩阵置0
    进入vue之前需要了解的打包工具
    Android JVM学习(扎实基础版)
    AI短视频批量剪辑系统|罐头鱼AI工具|视频矩阵获客
    利用VB宏设置将多个excel表合并为一个
  • 原文地址:https://www.cnblogs.com/goldsunshine/p/17950911