• 浅谈Coroutine使用方法


    1.概述

    了解携程可以看一下携程和任务
    那什么是携程,先从官方的解释入手看一下什么事Coroutines

    Coroutines are a more generalized form of subroutines. Subroutines are entered at one point and exited at another point. Coroutines can be entered, exited, and resumed at many different points. They can be implemented with the async def statement. See also PEP 492 .

    Glossary — Python 3.8.2 documentation

    Coroutine是一个可以暂停执行将执行权让给其他Coroutine或者Awaitables obj的函数,等待执行完成后继续执行,并可以多次进行这样的暂停与继续。功能有点像cpu任务调度的时间片的概念,并发执行任务时,cpu调用给每一个任务分配了一定的时间片,时间片到了之后挂起当前的线程转给其他线程使用。

    Asyncio is a library to write concurrent code using the async/await
    syntax. asyncio is used as a foundation for multiple Python
    asynchronous frameworks that provide high-performance network and
    web-servers, database connection libraries, distributed task queues,
    etc.

    asyncio — Asynchronous I/O — Python 3.8.2 documentation

    那如果要判斷一個函數是不是 Coroutine?

    • 可以使用 asyncio.iscoroutinefunction(func) 方法判別。
    • 如果要判斷一個函數返回的是不是 Coroutine 對象? 可以使用 asyncio.iscoroutine(obj) 方法判別。

    2.asyncio使用

    环境要求:

    1. Python 3.5+ 開始可以使用 asyncio 標準庫和 await syntax 語法糖來寫 Coroutines:PEP 492 — Coroutines with async and await syntax | Python.org
    2. Python 3.7+ 開始可以使用 asyncio.run( ):cpython/runners.py at 3.8 · python/cpython · GitHub
    # @File : demo1.py
    import asyncio
    import time
    
    now = lambda : time.time()
    
    async def dosomething(num):
        print(f'第{num}任务,第一步')
        await asyncio.sleep(2)
        print(f'第{num}任务,第二步')
    
    if __name__ == '__main__':
        start = now()
        tasks = [dosomething(i) for i in range(5)]
        asyncio.run(asyncio.wait(tasks))
        print('Time: ', now() - start)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    输出结果如下:

    4任务,第一步
    第1任务,第一步
    第3任务,第一步
    第0任务,第一步
    第2任务,第一步
    第4任务,第二步
    第1任务,第二步
    第3任务,第二步
    第0任务,第二步
    第2任务,第二步
    Time:  2.003861904144287
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    如果只是使用普通的函数调用,在函数dosomething中sleep(2),则运行完成需要耗时10s+

    2.1 async / await

    async / await是 Python3.5+之后出现的语法糖,让Coroutine和Coroutine之间的调度更加清晰。总体来说:

    • async: 用来告知function有异步的功能
    • await:用来标记Coroutine切换暂停和继续的点,类似一个tag

    使用 async 来宣告一个 native Coroutine:
    注意:async def func 内无法与 yield 或 yield from 共同使用,会引发SyntaxError 错误
    使用方法:将 async 加在 function 前面,如下所示:

    async def read_data(db):
        pass
    
    • 1
    • 2

    使用await让Coroutine挂起:
    注意: await 后面必须接一个 Coroutine 对象或是 awaitable 类型的对象
    await 的目的是将控制权回传给事件循环 (event loop) 并等待返回,而背后实现暂停挂起函数操作的是 yield
    使用方法:加在要等待的 function 前面,如下所示:

    async def read_data(db):
        data = await db.fetch('SELECT ...')
    
    • 1
    • 2

    那什么事Awaitables特性?
    有三种主要类型:coroutines、 Tasks 、Futures

    1. coroutines:一个 async def 函数就是 coroutine,也因为 awaitables 特性所以可以等待其他 coroutine。
    2. tasks:tasks 是用来调度 coroutines,可通过 asyncio.create_task( ) 來打包 coroutines。
    3. futures:futures 是一个异步操作 (asynchronous operation) 返回的結果

    2.2 如何建事件循环?

    Python 3.5+ 使用 asyncio.get_event_loop 先建立一个 event_loop,然后再将 Coroutine 放进 run_until_complete() 里面,直到所有 Coroutine 运行结束

    import asyncio
    
    async def hello_world(x):
        print('hello_world x' + str(x))
        await asyncio.sleep(x)
    
    loop = asyncio.get_event_loop()
    loop.run_until_complete(hello_world(3))
    loop.close()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    Python 3.7推出更简单的方法:
    Python 3.7+ 之后将 loop 封裝,只需要使用 asyncio.run() 一行程式就結束,不用在建立 event_loop 结束时也不需要 loop.close,因为他都帮你做完了,有兴趣可以參考:cpython/runners.py at 3.8 · python/cpython · GitHub

    import asyncio
    
    async def hello_world(x):
        print('hello_world x' + str(x))
        await asyncio.sleep(x)
    
    asyncio.run(hello_world(2))
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    2.3 如何建立任务Task

    建立任务有两种方法:

    • asyncio.create_task() : Python 3.7+以上可以使用
    • asyncio.ensure_future():可读性较差
    # In Python 3.7+
    task = asyncio.create_task(main())
    
    # This works in all Python versions but is less readable
    task = asyncio.ensure_future(main())
    
    • 1
    • 2
    • 3
    • 4
    • 5

    这里给出一个完整的示例:

    # @File : demo2.py
    import asyncio
    import time
    
    async def dosomething(num):
        print('start{}'.format(num))
        await asyncio.sleep(num)
        print('sleep{}'.format(num))
    
    async def main():
        task1 = asyncio.create_task(dosomething(1))
        task2 = asyncio.create_task(dosomething(2))
        task3 = asyncio.create_task(dosomething(3))
        await task1
        await task2 
        await task3
    
    if __name__ == '__main__':
        time_start = time.time()
        asyncio.run(main())
        print(time.time() - time_start)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    输出结果如下:

    start1
    start2
    start3
    sleep1
    sleep2
    sleep3
    3.000866651535034
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    2.4 如何同时运行多个Tasks任务

    使用 asyncio.gather(),可同时放入多个Coroutinues或者 awaitable object进入事件循环(event loop),等待Coroutinues都结束后,并依续收集其回传的值。
    asyncio.gather( *aws, loop=None, return_exceptions=False)

    *aws :可传入多个 awaitable objects
    Loop:此参数将会在 Python version 3.10 移除
    return_exceptions:default 是 False,当发生 exception 时会立即中断 task,如果设定为 True 则发生错误的信息回与其他成功信息一起回传(如下示例,最终的 results 结果里面包含了 ValueError() 結果)

    # @File : demo3.py
    import asyncio
    import time
    
    now = lambda: time.time()
    
    async def dosomething(num):
        print('第 {} 任务,第一步'.format(num))
        await asyncio.sleep(2)
        print('第 {} 任务,第二步'.format(num))
        return '第 {} 任务完成'.format(num)
    
    async def raise_error(num):
        raise ValueError
        print('这里不会被执行')
    
    async def main():
        tasks = [dosomething(i) for i in range(5)]
        tasks1 = [raise_error(i) for i in range(5)]
    
        results = await asyncio.gather(*tasks, *tasks1, return_exceptions=True)
        print(results)
    
    
    if __name__ == "__main__":
    
    
        start = now()
        asyncio.run(main())
        print('TIME: ', now() - start)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    输出结果如下:

    0 任务,第一步
    第 1 任务,第一步
    第 2 任务,第一步
    第 3 任务,第一步
    第 4 任务,第一步
    第 0 任务,第二步
    第 1 任务,第二步
    第 2 任务,第二步
    第 3 任务,第二步
    第 4 任务,第二步
    ['第 0 任务完成', '第 1 任务完成', '第 2 任务完成', '第 3 任务完成', '第 4 任务完成', ValueError(), ValueError(), ValueError(), ValueError(), ValueError()]
    TIME:  2.001753091812134
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    2.5 并发请求示例

    从并发编程谈协程
    使用asyncio并发get请求
    How could I use requests in asyncio?

    参考:协程与任务

  • 相关阅读:
    Google Earth Engine(GEE)——建立一个图表(ui.Chart.array.values)chart散点图
    LeetCode:5. 最长回文子串
    黑客必备一款API泄露的利用工具
    全真模拟题!PMP提分必练
    《C++避坑神器·二十一》回调函数使用
    3.4向量范数与矩阵范数&3.5线性方程组的迭代解法
    基于SpringBoot和Vue2.0的轻量级博客开发
    【深入浅出 Yarn 架构与实现】5-3 Yarn 调度器资源抢占模型
    如何设计一个好的游戏剧情(Part 1:主题的设定)
    证书管理:从手工到平台化
  • 原文地址:https://blog.csdn.net/rhx_qiuzhi/article/details/127452923