• Python异步编程——asyncio、协程


    Python异步编程——asyncio、协程

    Python asyncio高性能异步编程

    • 异步非阻塞、asyncio
    • fastapi、django3.x asgi、aiohttp:通过异步提升性能

    具体内容

    • 协程
    • Python3.4内置模块:asyncio

    协程(coroutine)

    计算机提供了进程和线程,协程则是人为创造的,是用户态上下文切换的一种技术,也叫微线程;

    实现方法:

    • greenlet:第三方的模块 稍早的代码
    • yield关键字:保存状态 切换后 继续执行
    • asyncio装饰器(py3.4)
    • async/await关键字(py3.5)
    def func1():
        print(1)
        
        print(2)
        
    def func2():
        print(3)
        
        print(4)
    
    func1()
    func2()
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    1
    2
    3
    4
    
    • 1
    • 2
    • 3
    • 4

    greenlet

    pip install greenlet

    同一个线程在执行代码,并完成在不同函数之间的切换执行

    def func1():
        print(1) # 2:
        gr2.switch() # 3:切换到func2
        print(2) # 6:
        gr2.switch() # 7:切换到func2
        
    def func2():
        print(3) # 4:
        gr1.switch() # 5:切换到func1
        print(4) # 8:
    
    gr1 = greenlet(func1())
    gr2 = greenlet(func2())
    
    gr1.switch() # 1:执行func1
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    yield(了解)

    
    
    def func1():
        yield 1
        yield from func2()
        yield 2
        
    def func2():
        yield 3
        
        yield 4
    
    f1 = func1() # 生成器
    for item in f1:
        print(item)
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    1
    3
    4
    2
    
    • 1
    • 2
    • 3
    • 4

    asyncio

    !python
    
    • 1
    Python 3.7.2 (default, Dec 29 2018, 00:00:04) 
    [Clang 4.0.1 (tags/RELEASE_401/final)] :: Anaconda, Inc. on darwin
    Type "help", "copyright", "credits" or "license" for more information.
    >>> 
    KeyboardInterrupt
    >>> 
    >>> 
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    import asyncio
    
    @asyncio.coroutine
    def func1():
        print(1)
        yield from asyncio.sleep(2) # 遇到io耗时操作 自动切换到tasks中的其他任务
        print(2)
        
    @asyncio.coroutine
    def func2():
        print(3)
        yield from asyncio.sleep(2) # 遇到io耗时操作 自动切换到tasks中的其他任务
        print(4)
    
    tasks = [
        asyncio.ensure_future( func1() ),
        asyncio.ensure_future( func2() )
    ]
    
    loop = asyncio.get_event_loop()
    # 同一个线程在执行代码,并完成在不同函数之间的切换执行
    # 协程函数 必须以这样的方式执行
    loop.run_until_complete(asyncio.wait(tasks))
    
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    ---------------------------------------------------------------------------
    
    RuntimeError                              Traceback (most recent call last)
    
     in ()
         21 # 同一个线程在执行代码,并完成在不同函数之间的切换执行
         22 # 协程函数 必须以这样的方式执行
    ---> 23 loop.run_until_complete(asyncio.wait(tasks))
         24 
    
    
    ~/anaconda3/lib/python3.7/asyncio/base_events.py in run_until_complete(self, future)
        569         future.add_done_callback(_run_until_complete_cb)
        570         try:
    --> 571             self.run_forever()
        572         except:
        573             if new_task and future.done() and not future.cancelled():
    
    
    ~/anaconda3/lib/python3.7/asyncio/base_events.py in run_forever(self)
        524         self._check_closed()
        525         if self.is_running():
    --> 526             raise RuntimeError('This event loop is already running')
        527         if events._get_running_loop() is not None:
        528             raise RuntimeError(
    
    
    RuntimeError: This event loop is already running
    
    
    1
    3
    2
    4
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    解决This event loop is already running问题

    !pip install nest_asyncio
    
    • 1
    Collecting nest_asyncio
      Downloading nest_asyncio-1.5.5-py3-none-any.whl (5.2 kB)
    Installing collected packages: nest-asyncio
    Successfully installed nest-asyncio-1.5.5
    [33mWARNING: You are using pip version 20.3.3; however, version 22.2.2 is available.
    You should consider upgrading via the '/Users/huaqiang/anaconda3/bin/python -m pip install --upgrade pip' command.[0m
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    async & await关键字(推荐)

    
    
    import asyncio
    
    async def func1():
        print(1)
        await asyncio.sleep(2) # 遇到io耗时操作 自动切换到tasks中的其他任务
        print(2)
        
    async def func2():
        print(3)
        await asyncio.sleep(2) # 遇到io耗时操作 自动切换到tasks中的其他任务
        print(4)
    
    tasks = [
        asyncio.ensure_future( func1() ),
        asyncio.ensure_future( func2() )
    ]
    
    loop = asyncio.get_event_loop()
    # 同一个线程在执行代码,并完成在不同函数之间的切换执行
    # 协程函数 必须以这样的方式执行
    loop.run_until_complete(asyncio.wait(tasks))
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    ---------------------------------------------------------------------------
    
    RuntimeError                              Traceback (most recent call last)
    
     in ()
         22 # 同一个线程在执行代码,并完成在不同函数之间的切换执行
         23 # 协程函数 必须以这样的方式执行
    ---> 24 loop.run_until_complete(asyncio.wait(tasks))
    
    
    ~/anaconda3/lib/python3.7/asyncio/base_events.py in run_until_complete(self, future)
        569         future.add_done_callback(_run_until_complete_cb)
        570         try:
    --> 571             self.run_forever()
        572         except:
        573             if new_task and future.done() and not future.cancelled():
    
    
    ~/anaconda3/lib/python3.7/asyncio/base_events.py in run_forever(self)
        524         self._check_closed()
        525         if self.is_running():
    --> 526             raise RuntimeError('This event loop is already running')
        527         if events._get_running_loop() is not None:
        528             raise RuntimeError(
    
    
    RuntimeError: This event loop is already running
    
    
    1
    3
    2
    4
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    协程的意义:

    • 在一个线程中如果遇到IO等待时间,线程不会傻傻等,而是利用空闲的时间再去干点其他事情;

    Python3.4内置模块:asyncio

    图片下载示例

    # pip install requests
    
    import requests
    import re
    
    def download_image(url, file_name=None):
        response = requests.get(url)
        if file_name == None:        
            originstr = url.split('/')[-1]
            if '?' in url:
                bstr = r'(((?!\?).)*)\?' 
            else:
                bstr = r'(((?!\?).)*)$' 
            b = re.compile(bstr ,  re.DOTALL)
            str_list = b.findall(originstr)
            file_name = str_list[0][0]
        
        with open('./' + file_name, mode='wb') as file:
            file.write(response.content)
    
      
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    images = ['https://picx.zhimg.com/v2-2c501d26e3a3511ae71057008c4f3984_720w.jpg?source=7e7ef6e2',
    'https://pic1.zhimg.com/v2-e18dd0d67c5f6074b33628b456ced6a0_b.jpg',
    'https://pic3.zhimg.com/v2-4137087ac7fd7bb11c2ab15a438afbce_b.jpg'] 
    
    images
    
    • 1
    • 2
    • 3
    • 4
    • 5
    ['https://picx.zhimg.com/v2-2c501d26e3a3511ae71057008c4f3984_720w.jpg?source=7e7ef6e2',
     'https://pic1.zhimg.com/v2-e18dd0d67c5f6074b33628b456ced6a0_b.jpg',
     'https://pic3.zhimg.com/v2-4137087ac7fd7bb11c2ab15a438afbce_b.jpg']
    
    • 1
    • 2
    • 3
    if __name__ == '__main__':
        # 不使用协程
        for item in images:
            download_image(item)
        
        print('over')    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    over
    
    • 1

    使用协程的方式下载:

    • 这里使用 aiohttp,需要安装下 pip install aiohttp
    !pip install aiohttp
    
    • 1
    Collecting aiohttp
      Downloading aiohttp-3.8.1-cp37-cp37m-macosx_10_9_x86_64.whl (570 kB)
    [K     |████████████████████████████████| 570 kB 552 kB/s eta 0:00:01
    [?25hRequirement already satisfied: charset-normalizer<3.0,>=2.0 in /Users/huaqiang/anaconda3/lib/python3.7/site-packages (from aiohttp) (2.0.12)
    Requirement already satisfied: typing-extensions>=3.7.4 in /Users/huaqiang/anaconda3/lib/python3.7/site-packages (from aiohttp) (3.7.4.3)
    Requirement already satisfied: attrs>=17.3.0 in /Users/huaqiang/anaconda3/lib/python3.7/site-packages (from aiohttp) (18.2.0)
    Collecting asynctest==0.13.0
      Downloading asynctest-0.13.0-py3-none-any.whl (26 kB)
    Collecting aiosignal>=1.1.2
      Downloading aiosignal-1.2.0-py3-none-any.whl (8.2 kB)
    Collecting async-timeout<5.0,>=4.0.0a3
      Downloading async_timeout-4.0.2-py3-none-any.whl (5.8 kB)
    Collecting frozenlist>=1.1.1
      Downloading frozenlist-1.3.1-cp37-cp37m-macosx_10_9_x86_64.whl (36 kB)
    Collecting multidict<7.0,>=4.5
      Downloading multidict-6.0.2-cp37-cp37m-macosx_10_9_x86_64.whl (28 kB)
    Collecting yarl<2.0,>=1.0
      Downloading yarl-1.8.1-cp37-cp37m-macosx_10_9_x86_64.whl (60 kB)
    [K     |████████████████████████████████| 60 kB 4.7 MB/s eta 0:00:01
    [?25hRequirement already satisfied: idna>=2.0 in /Users/huaqiang/anaconda3/lib/python3.7/site-packages (from yarl<2.0,>=1.0->aiohttp) (2.10)
    Installing collected packages: multidict, frozenlist, yarl, asynctest, async-timeout, aiosignal, aiohttp
    Successfully installed aiohttp-3.8.1 aiosignal-1.2.0 async-timeout-4.0.2 asynctest-0.13.0 frozenlist-1.3.1 multidict-6.0.2 yarl-1.8.1
    [33mWARNING: You are using pip version 20.3.3; however, version 22.2.2 is available.
    You should consider upgrading via the '/Users/huaqiang/anaconda3/bin/python -m pip install --upgrade pip' command.[0m
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    # pip install aiohttp
    import nest_asyncio
    nest_asyncio.apply() # PyCharm中 不需要这两句
    
    import aiohttp
    import asyncio
    
    async def download_image(session, url, file_name=None):
        print('send')
        async with session.get(url, verify_ssl=False) as response:
    #         await response.text()
            content = await response.content.read()
            
            if file_name == None:        
                originstr = url.split('/')[-1]
                if '?' in url:
                    bstr = r'(((?!\?).)*)\?' 
                else:
                    bstr = r'(((?!\?).)*)$' 
                b = re.compile(bstr ,  re.DOTALL)
                str_list = b.findall(originstr)
                file_name = str_list[0][0]
        
            with open('./' + file_name, mode='wb') as file:
                file.write(content)
                print('download')
    
    async def main():
        async with aiohttp.ClientSession() as session:
            images = ['https://picx.zhimg.com/v2-2c501d26e3a3511ae71057008c4f3984_720w.jpg?source=7e7ef6e2',
                'https://pic1.zhimg.com/v2-e18dd0d67c5f6074b33628b456ced6a0_b.jpg',
                'https://pic3.zhimg.com/v2-4137087ac7fd7bb11c2ab15a438afbce_b.jpg'] 
    
            tasks = [asyncio.create_task(download_image(session, url)) for url in images]
            await asyncio.wait(tasks)
            
            
        
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    if __name__ == '__main__':
        # 使用协程:在遇到io时 就会切换继续发请求 所以 三个请求几乎是一起发的 然后才开始下载(异步)
        asyncio.run(main())
        print('over')
        
    
    • 1
    • 2
    • 3
    • 4
    • 5
    send
    send
    send
    download
    download
    download
    over
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    异步编程

    事件循环

    循环检测所有可执行、已完成任务;

    import asyncio
    
    # 获取一个事件循环
    loop = asyncio.get_event_loop()
    # 将任务放到循环中
    loop.run_until_complete(任务)
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 协程函数:使用 async def 声明的函数
    • 协程对象:执行 协程函数 所返回的对象
    async def func1():
        print('func1')
    
    result = func1() # 得到协程对象(即任务)时 协程函数中的代码并不会运行
    
    # 执行方式1
    # loop = asyncio.get_event_loop()
    # loop.run_until_complete(result) # 事件循环会帮助执行 具体的任务代码
    
    # 执行方式2(py3.7)
    asyncio.run(result)
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    func1
    
    • 1

    await

    await + 可等待对象:

    • 协程对象
    • Future
    • Task对象

    以协程函数为例:

    • await修饰的等待对象 会阻塞当前线程;
    • 并在其进入阻塞期间,由事件循环在下一个循环将线程资源提供给其他任务(等待对象);
    • 直到事件循环再次检测等待对象是否已经完成执行,则继续执行await后的代码;

    Task对象

    • 可以在事件循环中,并发的调度多个协程;
    • 通过 asyncio.create_task(协程对象)的方式 创建Task对象;(py3.7)
    • 也可以用更低级的loop.create_task()ensure_future()函数创建;
    # 示例
    import asyncio
    
    async def func():
        print(1)
        await asyncio.sleep(2)
        print(2)
        return '返回值'
    
    async def main():
        print("main 开始")
        
        task1 = asyncio.create_task(func()) # 创建Task对象,将协程任务加入事件循环
        task2 = asyncio.create_task(func())
        
        print('main 结束')
        
        ret1 = await task1
        ret2 = await task2
        print(ret1, ret2)
        
    asyncio.run( main() )
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    main 开始
    main 结束
    1
    1
    2
    2
    返回值 返回值
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    # 示例(更常用的方式)
    import asyncio
    
    async def func():
        print(1)
        await asyncio.sleep(2)
        print(2)
        return '返回值'
    
    async def main():
        print("main 开始")
        
        task1 = asyncio.create_task(func()) # 创建Task对象,将协程任务加入事件循环
        task2 = asyncio.create_task(func())
        
        task_list = [task1,
                     task2]
        
        print('main 结束')
        
        done, pending = await asyncio.wait(task_list)
    #     done:集合类型,完成的结果
    #     pending:集合类型,是没完成的结果
    #     done, pending = await asyncio.wait(task_list, timeout=None)
        print(done, pending)
        
    asyncio.run( main() )
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    main 开始
    main 结束
    1
    1
    2
    2
    {:4> result='返回值'>, :4> result='返回值'>} set()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    # 示例:特别的
    import nest_asyncio
    nest_asyncio.apply() # PyCharm中 不需要这两句
    
    import asyncio
    
    async def func():
        print(1)
        await asyncio.sleep(2)
        print(2)
        return '返回值'
    
    
    task_list = [func(),func()]
    
    # asyncio.wait 会自动完成 从 协程对象 到 Task对象的包装
    done, pending = asyncio.run(asyncio.wait(task_list))
    print(done, pending)
        
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    1
    1
    2
    2
    {:7> result='返回值'>, :7> result='返回值'>} set()
    
    • 1
    • 2
    • 3
    • 4
    • 5

    Future对象:(asyncio的future对象)

    • Task类的基类
    • 支持hold on 直到等待结果拿到
    
    import asyncio
    
    async def set_after(fut):
        await asyncio.sleep(2)
        fut.set_result("ok")
    
    async def main():
        loop = asyncio.get_running_loop()
        fut = loop.create_future() 
        await loop.create_task( set_after(fut) )
        
        # 由于这里创建的future对象 没有具体的任务 也不会终止 因此会一直处于阻塞状态
        data = await(fut)
        # 但一旦 fut被赋值了 具体的任务 那么就会得到最终的结果
        print(data)
    
    asyncio.run(main())
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    ok
    
    • 1

    Future对象:(concurrent.futures.Future对象)

    • 使用线程池或进程池实现异步操作时,所要用到的对象;
    import time
    from concurrent.futures import Future
    from concurrent.futures import ThreadPoolExecutor
    from concurrent.futures import ProcessPoolExecutor
    
    
    def func(value):
        time.sleep(1)
        print(value)
        
    pool = ThreadPoolExecutor(max_workers=5)    
    
    for i in range(10):
        fut = pool.submit(func, 1)
        print(fut)
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    
    
    
    
    
    
    
    
    
    
    1
    1111
    
    
    
    1
    1111
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    实际的使用场景可能会存在:两种Future对象交叉使用的场景

    • 比如链接MySQL 就不支持协程,那就会用到 进程、线程来做异步编程;
    # 示例
    import time
    import asyncio
    import concurrent.futures
    
    def func1():
        # 某个耗时操作
        time.sleep(2)
        return 'func1'
    
    async def main():
        loop = asyncio.get_running_loop()
        
        # 使用场景1:run_in_executor
        # 调用内部 ThreadPoolExecutor 去线程池中申请一个线程执行 func1 返回 concurrent.futures.Future对象
        # 然后 会调用 asyncio.wrap_future 将这个对象包装为 asyncio.Future对象,以支持wait语法
        fut = loop.run_in_executor(None, func1)
        result = await fut
        print(result)
        
        # 使用场景2:线程池
        with concurrent.futures.ThreadPoolExecutor() as pool:
            result = await loop.run_in_executor(pool, func1)
            print(result)
        
        # 使用场景3:进程池
        with concurrent.futures.ProcessPoolExecutor() as pool:
            result = await loop.run_in_executor(pool, func1)
            print(result)
    
    asyncio.run(main())
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    func1
    func1
    func1
    
    • 1
    • 2
    • 3

    使用asyncio + request(不支持asyncio的模块)进行图片下载

    import asyncio
    import requests
        
    async def download_image(url, file_name=None):
        print('开始下载')
        
        loop = asyncio.get_event_loop()
        # requests不支持异步 需要使用线程池来配合
        fut = loop.run_in_executor(None, requests.get, url)
        response = await fut
    
        if file_name == None:        
            originstr = url.split('/')[-1]
            if '?' in url:
                bstr = r'(((?!\?).)*)\?' 
            else:
                bstr = r'(((?!\?).)*)$' 
            b = re.compile(bstr ,  re.DOTALL)
            str_list = b.findall(originstr)
            file_name = str_list[0][0]
    
        with open('./' + file_name, mode='wb') as file:
            file.write(response.content)
            print('下载完成')
    
    images = ['https://picx.zhimg.com/v2-2c501d26e3a3511ae71057008c4f3984_720w.jpg?source=7e7ef6e2',
        'https://pic1.zhimg.com/v2-e18dd0d67c5f6074b33628b456ced6a0_b.jpg',
        'https://pic3.zhimg.com/v2-4137087ac7fd7bb11c2ab15a438afbce_b.jpg'] 
    
    tasks = [asyncio.create_task(download_image(url)) for url in images]
    
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))
    
    # 注意:这段代码需要耗费更多的资源,因为实际是起了三个线程        
        
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    开始下载
    开始下载
    开始下载
    
    
    
    
    
    ({:4> exception=NameError("name 're' is not defined")>,
      :4> exception=NameError("name 're' is not defined")>,
      :4> exception=NameError("name 're' is not defined")>},
     set())
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    异步迭代器

    迭代器:实现 __iter____next__方法的对象;

    异步迭代器:实现了__aiter____anext__方法的对象;

    • 其中__anext__必须返回一个awaitable对象;
    • async_for会处理异步迭代器的__anext__方法所返回的可等待对象(直到其引发一个StopAsyncIteration异常);

    异步可迭代对象:

    • 可在async_for 语句中被使用的,通过其__aiter__方法返回的一个asynchronous iterator;
    import asyncio
    
    class Reader(object):
        """
            自定义的异步可迭代器
        """
        def __init__(self):
            self.count = 0
            
        async def readline(self):
            self.count += 1
            if self.count == 100:
                return None
            return self.count
        
        def __aiter__(self):
            return self
        
        async def __anext__(self):
            val = await self.readline()
            if val == None:
                raise StopAsyncIteration
            return val
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    
    async def func_t():
        obj_t = Reader()
        # async for 只能在协程函数中执行
        async for item in obj_t:
            print(item)
            
    asyncio.run(func_t())
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    1
    2
    3
    4
    5
    ...
    96
    97
    98
    99
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    异步上下文管理器

    上下文管理器:通过__enter____exit__方法来对with语句中的环境进行控制;

    异步上下文管理器:通过__aenter____aexit__方法来对async_with语句中的环境进行控制;

    import asyncio
    
    class AsyncContextManager:
        def __init__(self):
            self.conn = None
        
        async def do_something(self):
            # 操作
            return 666
        
        async def __aenter__(self):
            # 连接
            self.conn = await asyncio.sleep(1)
            return self
        
        async def __aexit__(self, exc_type, exc, tb):
            # 关闭
            await asyncio.sleep(1)
                   
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    
    async def func_t():
        # async with 只能在协程函数中执行
        async with AsyncContextManager() as obj_t:
            result = await obj_t.do_something()
            print(result)
            
    asyncio.run(func_t())
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    666
    
    • 1

    uvloop

    是asyncio的事件循环的替代方案;效率更高;

    安装:

    • pip install uvloop

    使用:

    import asyncio
    import uvloop
    asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
    
    # 其他实现代码照旧
    
    # 此时事件循环会由uvloop接替
    asyncio.run(...)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    其他:

    • asgi(异步的服务网关接口) uvicon(fastapi中有使用)的效率很高,其内部也是通过uvloop提高的效率;

    事例

    异步链接MySQL

    pip intall aiomysql

    import asyncio
    import aiomysql
    
    async def execute():
        conn = await aiomysql.connect(host = 'ip', port='port', user='root', password='', db='mysql')
        
        cur = await conn.cursor()
        await cur.execute("SELECT Host,User FROM user")
        
        result = await cur.fetchall()
        
        await cur.close()
        conn.close()
        
    asyncio.run(execute())
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    异步链接Redis

    import asyncio
    import aioredis
    
    async def execute(address, password):
        redis = await aioredis.create_redis_pool(address, password=password)
        
        await = redis.hmset_dict("car", key1=1, key2=2, key3=3)
        
        result = await redis.hgetall('car', encoding="utf-8")
        
        redis.close()
        
        await redis.wait_closed()
        
    task_list = [
        execute('redis://...', 'password'),
        execute('redis://...', 'password')
    ]    
    
    asyncio.run(asyncio.wait(task_list))
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    FastAPI框架异步

    pip install fastapi框架

    pip install uviorn Asgi

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    
    import asyncio
    import uvicorn
    
    from fastapi import FastAPI
    
    app = FastAPI()
    
    # 同步支持并发请求
    @app.get("/")
    def index():
        return "hello world"
    
    if __name__ == "__main__":
        uvicorn.run("main:app", host="127.0.01", port=8080, log_level="info")
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    # 异步请求
    @app.get("/")
    async def index():
        # 异步支持并发请求
            
        # 举例:从连接池获取一个链接
        conn = await REDIS_POOL.acquire()
        redis = Redis(conn)
        
        await redis.hmset_dict("car", key1=1, key2=2, key3=3)
        await redis.hgetall('car', encoding='utf-8')
        
        # 归还连接池
        REDIS_POOL.release(conn)
        
        return result
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    小结:通过一个线程 利用IO等待时间去做一些其他的事情。

    
    
    • 1
  • 相关阅读:
    C#毕业设计——基于C#+asp.net+sqlserver二手交易系统设计与实现(毕业论文+程序源码)——二手交易系统
    技术分享 | App测试时常用的adb命令你都掌握了哪些呢?
    x264、x265、OpenH264 简要对比
    ROS2中Executors对比和优化
    加密磁盘密钥设置方案浅析 — TKS1
    剑指 Offer 45. 把数组排成最小的数
    SQL获取IP电脑名
    2003—2004 学年第二学期 重修考试试题
    Docker Compose及Docker 知识点整理
    typeScript--[接口属性interface]
  • 原文地址:https://blog.csdn.net/baby_hua/article/details/126934706