• python异步编程之asyncio高阶API


    image

    1|0asyncio 高阶API列表


    asyncio中函数可以分为高阶函数和低阶函数。低阶函数用于调用事件循环、linux 套接字、信号等更底层的功能,高阶函数是屏蔽了更多底层细节的任务并发,任务执行函数。通常开发中使用更多的是高阶函数。本篇主要介绍asyncio中常用的高阶函数。
    由于asyncio在不同的版本中有差异,本文以及本系列都以python3.10为准。

    函数 功能
    run() 创建事件循环,运行一个协程,关闭事件循环。
    create_task() 创建一个asyncio的Task对象
    await sleep() 休眠几秒
    await gather() 并发执行所有事件的调度和等待
    await wait_for() 有超时控制的运行
    await shield() 屏蔽取消操作
    await wait() 完成情况的监控器
    current_task() 返回当前Task对象
    all_tasks() 返回事件循环中所有的task对象
    Task Task对象
    to_thread() 在不同的 OS 线程中异步地运行一个函数
    run_coroutine_threadsafe() 从其他OS线程中调度一个协程
    for in as_completed() 用 for 循环监控完成情况

    2|0run


    函数原型:

    asyncio.run(coro, *, debug=False)

    功能:创建事件循环,运行传入的协程。该函数总是会创建一个新的事件循环并在结束时关闭它,应该被当做asyncio程序的主入口点。run() 函数是用来创建事件,将task加入事件,运行事件的函数。

    async def main():
        await asyncio.sleep(1)
        print('hello')
    
    asyncio.run(main())

    run() 从功能上等价于以下低阶API。获取一个事件循环,创建一个task,加入事件循环。

    loop = asyncio.get_event_loop()
    task = loop.create_task(main())
    loop.run_until_complete(task)

    3|0create_task


    函数原型:

    asyncio.create_task(coro, *, name=None)

    功能:将协程函数封装成一个Task。协程函数没有生命周期,但是Task有生命周期。
    将协程打包为一个 Task 并自动寻找事件循环加入。返回 Task 对象。该任务会在 get_running_loop() 返回的循环中执行,如果当前线程没有在运行的循环则会引发 RuntimeError。

    async def coro():
        await asyncio.sleep(1)
        print("i am coro")
    
    
    async def main():
        task = asyncio.create_task(coro())
        print(f"task状态:{task._state}")
        await asyncio.sleep(2)
        print(f"task状态:{task._state}")
        print("i am main")
    
    asyncio.run(main())

    结果:

    task状态:PENDING
    i am coro
    task状态:FINISHED
    i am main

    结果分析:
    可以看到task运行中的状态和结束的生命周期状态

    4|0gather


    函数原型:

    asyncio.gather(*aws, return_exceptions=False)

    功能:
    并发执行所有可等待对象,收集任务结果,返回所有已经完成的task的结果。结果将是一个由所有返回值组成的列表。结果值的顺序与传入的task的顺序一致。可等待对象可以是协程和task。
    如果序列中是协程而不是task,那么会将其自动封装成task加入事件循环。

    import asyncio
    
    async def coro(value):
        print(f"hello coro{value}")
        return f"coro{value}"
    
    async def main():
        tasks = [coro(i) for i in range(5)]
        res = await asyncio.gather(*tasks)
        for i in res:
            print(i)
    
    asyncio.run(main())

    结果:

    hello coro0
    hello coro1
    hello coro2
    hello coro3
    hello coro4
    coro0
    coro1
    coro2
    coro3
    coro4

    结果分析:
    获取了所有协程的返回值,并且返回的顺序和任务的顺序一致。

    5|0wait


    函数原型:

    asyncio.wait(aws, *, timeout=None, return_when=ALL_COMPLETED)

    功能:
    并发地运行序列中的可等待对象,并进入阻塞状态直到满足 return_when 所指定的条件。将task任务结果收集起来,返回两个 Task/Future 集合: (done, pending)。done是已经完成的任务,pending是未完成的任务,未完成的原因可能是超时或return_when策略。
    aws:
    aws中保存的是task而不是协程,从3.8起不建议传入协程,3.11将不再支持传入协程。
    timeout:
    如指定 timeout (float 或 int 类型) 则它将被用于控制返回之前等待的最长秒数。
    请注意此函数不会引发 asyncio.TimeoutError。当超时发生时,未完成的 Future 或 Task 将不会继续执行,不会返回结果。
    return_when:
    return_when 指定此函数应在何时返回。它必须为以下参数之一:

    参数 描述
    FIRST_COMPLETED 函数将在任意可等待对象结束或取消时返回。
    FIRST_EXCEPTION 函数将在任意可等待对象因引发异常而结束时返回。当没有引发任何异常时它就相当于 ALL_COMPLETED。
    ALL_COMPLETED 函数将在所有可等待对象结束或取消时返回。

    基础使用示例

    import asyncio
    
    
    async def coro(value):
        print(f"hello coro{value}")
        return f"coro{value}"
    
    async def main():
        tasks = [asyncio.create_task(coro(i)) for i in range(5)]
        done, pending = await asyncio.wait(tasks)
        for i in done:
            print(i.result())
    
    asyncio.run(main())

    结果:

    hello coro0
    hello coro1
    hello coro2
    hello coro3
    hello coro4
    coro1
    coro2
    coro0
    coro3
    coro4

    结果分析:
    返回结果和执行顺序并不是一致的

    指定超时时间

    import asyncio
    from asyncio import FIRST_COMPLETED
    
    async def coro(value):
        print(f"hello coro{value}")
        await asyncio.sleep(value)
        return f"coro{value}"
    
    async def main():
        tasks = [asyncio.create_task(coro(i)) for i in range(5)]
        done, pending = await asyncio.wait(tasks, timeout=3)
    
        print("---------finish----------")
        for i in done:
            print(i.result())
    
        print("---------pending----------")
        for i in pending:
            print(i)
    
    asyncio.run(main())
    hello coro0
    hello coro1
    hello coro2
    hello coro3
    hello coro4
    ---------finish----------
    coro1
    coro2
    coro0
    ---------pending----------
    <Task pending name='Task-5' coro=<coro() running at /Users/ljk/Documents/code/daily_dev/async_demo/wait_demo.py:6> wait_for=<Future finished result=None>>
    <Task pending name='Task-6' coro=<coro() running at /Users/ljk/Documents/code/daily_dev/async_demo/wait_demo.py:6> wait_for=<Future pending cb=[Task.task_wakeup()]>>
    
    

    结果分析:
    超时未完成的task会保存在pending中,未完成的task在超时之后不会继续执行,没有返回结果。

    return_when配置任意任务完成就返回

    import asyncio
    from asyncio import FIRST_COMPLETED
    
    async def coro(value):
        print(f"hello coro{value}")
        await asyncio.sleep(value)
        return f"coro{value}"
    
    async def main():
        tasks = [asyncio.create_task(coro(i)) for i in range(5)]
        done, pending = await asyncio.wait(tasks, return_when=FIRST_COMPLETED)
    
        print("---------finish----------")
        for i in done:
            print(i.result())
    
        print("---------pending----------")
        for i in pending:
            print(i)
    
    asyncio.run(main())
    
    
    

    结果:

    hello coro0
    hello coro1
    hello coro2
    hello coro3
    hello coro4
    ---------finish----------
    coro0
    ---------pending----------
    <Task pending name='Task-5' coro=<coro() running at /Users/ljk/Documents/code/daily_dev/async_demo/wait_demo.py:6> wait_for=<Future pending cb=[Task.task_wakeup()]>>
    <Task pending name='Task-3' coro=<coro() running at /Users/ljk/Documents/code/daily_dev/async_demo/wait_demo.py:6> wait_for=<Future pending cb=[Task.task_wakeup()]>>
    <Task pending name='Task-4' coro=<coro() running at /Users/ljk/Documents/code/daily_dev/async_demo/wait_demo.py:6> wait_for=<Future pending cb=[Task.task_wakeup()]>>
    <Task pending name='Task-6' coro=<coro() running at /Users/ljk/Documents/code/daily_dev/async_demo/wait_demo.py:6> wait_for=<Future pending cb=[Task.task_wakeup()]>>
    
    

    结果分析:
    获取到任意结果就返回,未完成的task保存在pending中。未完成的task在超时之后不会继续执行。

    6|0as_completed


    函数原型:

    asyncio.as_completed(aws, *, timeout=None)

    说明:并发执行aws中保存的可等待对象,返回一个协程的迭代器。可以从迭代器中取出最先执行完成的task的结果。返回结果和执行顺序不一致。aws中可以是task或协程序列。

    import asyncio
    
    
    async def coro(value):
        print(f"hello coro{value}")
        return f"coro{value}"
    
    async def main():
        tasks = [coro(i) for i in range(5)]
        for item in asyncio.as_completed(tasks):
            res = await item
            print(res)
    
    
    asyncio.run(main())
    
    

    结果:

    hello coro2
    hello coro3
    hello coro4
    hello coro1
    hello coro0
    coro2
    coro3
    coro4
    coro1
    coro0
    
    

    结果分析:
    所有任务都会执行完成,没有超时配置。返回顺序和执行顺序无关。

    7|0gather、wait、as_completed 异同点小结


    asyncio协程体系中可以实现创建多个任务并发执行的函数有以下三个:

    • asyncio.gather
    • asyncio.wait
    • asyncio.as_completed

    不同之处比较:

    特性/函数 gather wait as_completed
    入参 同时支持task和协程序列 只支持task序列 同时支持task和协程序列
    获取结果顺序 有序,和并发序列顺序相同 无序,和并发序列无关 无序,和并发序列无关
    返回 返回结果列表,保存的是函数返回值。 返回元组done、pending。元组中保存的是task,而非task 的函数返回值 返回一个迭代器,从中可迭代出函数返回值。

    8|0wait for


    函数原型:

    asyncio.wait_for(aw, timeout)

    功能:执行单个可等待对象,指定 timeout 秒数后超时
    等待可等待对象完成,指定timeout秒数后超时。和gather类似,可以自动将协程转化成任务加入循环。
    timeout 可以为 None,也可以为 float 或 int 型数值表示的等待秒数。如果 timeout 为 None,则等待直到完成。
    如果发生超时,任务将取消并引发 asyncio.TimeoutError。

    async def coro():
        # 睡眠5s
        await asyncio.sleep(3600)
        print('finish!')
    
    async def main():
        # Wait for at most 1 second
        try:
            await asyncio.wait_for(coro(), timeout=1.0)
        except asyncio.TimeoutError:
            print('timeout!')
    
    asyncio.run(main())

    结果:

    timeout!

    高阶API中常用的函数基本就是这些,下一篇分析低阶函数。

    连载一系列关于python异步编程的文章。包括同异步框架性能对比、异步事情驱动原理等。欢迎关注微信公众号第一时间接收推送的文章。


    __EOF__

    本文作者goldsunshine
    本文链接https://www.cnblogs.com/goldsunshine/p/17949341.html
    关于博主:评论和私信会在第一时间回复。或者直接私信我。
    版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!
    声援博主:如果您觉得文章对您有帮助,可以点击文章右下角推荐一下。您的鼓励是博主的最大动力!
  • 相关阅读:
    Django框架之模板层template的一些介绍和使用
    远程桌面连接时license错误解决方法
    虚拟标签做添加点击事件,e.target 方法
    开源流式湖仓服务 Arctic 详解:并非另一套 Table Format
    MariaDB Tutorial
    Vue3 路由优化,使页面初次渲染效率翻倍
    2023年中国负极材料分类、产量及市场规模分析[图]
    JVM:运行时数据区-方法区
    【中秋特辑】赛博朋克『静夜思』,金属摇滚『月饼』,落霞秋水『思乡』,AI画笔下的中秋长这样!赠你 8400 个月亮 | ShowMeAI资讯日报
    android 11.0 获取当前界面的APP ,在APP的界面禁止灭屏
  • 原文地址:https://www.cnblogs.com/goldsunshine/p/17949341