• 流畅的Python(十六)-协程


    一、核心要义

    1. 生成器作为协程使用时的行为状态

    2.使用装饰器自动预激协程

    3.调用方使用close和throw方法控制协程

    4.协程终止时如何返回值

    5.yield from新句法的用途和语义

    6.案例:出租车队运营仿真

    二、代码示例

    1、协程入门案例

    1. #!/usr/bin/env python
    2. # -*- coding: utf-8 -*-
    3. # @Time : 2024/3/2 9:07
    4. # @Author : Maple
    5. # @File : 01-协程入门案例.py
    6. # @Software: PyCharm
    7. import inspect
    8. """
    9. 协程有4种状态
    10. 1.'GEN_CREATED': 等待开始执行
    11. 2.'GEN_RUNNINF': 解释器正在执行
    12. 3.'GEN_SUSPENDED': 在yield表达式处暂停
    13. 4.'GEN_CLOSED': 执行结束
    14. 注意:只有当协程处于状态3的时候,才可以send值.而刚创建的协程,没有预激活(即没有执行next方法)
    15. ,此时处于状态1,如果给协程send值,会报错
    16. """
    17. def simple_coro(a):
    18. print('-->start: a = ',a)
    19. b = yield a
    20. print('-->Received:b = ',b)
    21. c = yield a + b
    22. print('-->Received:c = ', c)
    23. if __name__ == '__main__':
    24. # 1.实例化生成器函数
    25. s = simple_coro(2)
    26. ## 查看此时协程的状态: 创建状态
    27. print(inspect.getgeneratorstate(s)) # GEN_CREATED
    28. # 2.调用next,预激活协程,此时协程会停止于第一个yield a处(等待外部调用函数发送send,往下推进执行),并且将a值返回给外部
    29. a = next(s)
    30. print(a)
    31. ## 查看此时协程的状态: 阻塞状态(等待外部通过send发送值)
    32. print(inspect.getgeneratorstate(s)) # GEN_SUSPENDED
    33. # 3.通过send给s发送值,重新启动协程,并且外部的值会赋给b,最终再次阻塞在下一个yield处(yield a + b)
    34. #并将外部 a + b的值返回给调用方【此时协程又会处于阻塞状态,等待外部发送send值】
    35. c = s.send(10)
    36. print(c) # 12(备注:2 + 10)
    37. ## ## 查看此时协程的状态: 阻塞状态(等待外部通过send发送值)
    38. print(inspect.getgeneratorstate(s)) # GEN_SUSPENDED
    39. # 4.再次通过send给s发送值,重新启动协程,并且外部的值会赋值给c,输出print(-->Received:c = xx)
    40. # 由于协程后面没有yield了,所以最后协程终止,导致[生成器]抛出StopIteration异常
    41. """
    42. Traceback (most recent call last):
    43. File "D:/01-study/python/fluent_python/16-协程/01-协程入门案例.py", line 39, in
    44. s.send(100)
    45. StopIteration
    46. """
    47. s.send(100)

    2、使用协程计算平均值

    1. #!/usr/bin/env python
    2. # -*- coding: utf-8 -*-
    3. # @Time : 2024/3/2 9:28
    4. # @Author : Maple
    5. # @File : 02-使用协程计算平均值.py
    6. # @Software: PyCharm
    7. def avg():
    8. sum = 0
    9. count = 0
    10. average = None
    11. while True:
    12. a = yield average
    13. sum += a
    14. count +=1
    15. average = sum/count
    16. if __name__ == '__main__':
    17. a = avg()
    18. # 预激活协程
    19. next(a)
    20. # 计算初始平均值
    21. avg1 = a.send(10)
    22. print(avg1) # 10.0
    23. # 发送新数据,计算前两个数据的平均值
    24. avg2 = a.send(20)
    25. print(avg2) # 15.0
    26. # 再次发送新数据,计算前三个数据的平均值
    27. avg3 = a.send(30)
    28. print(avg3) # 20.0

    3、预激协程的装饰器

    1. #!/usr/bin/env python
    2. # -*- coding: utf-8 -*-
    3. # @Time : 2024/3/2 9:36
    4. # @Author : Maple
    5. # @File : 03-预激协程的装饰器.py
    6. # @Software: PyCharm
    7. from inspect import getgeneratorstate
    8. def coroutine(func):
    9. """装饰器:让被装饰的协程向前执行一步,阻带到第一个yield处"""
    10. def primer(*args,**kwargs):
    11. gen = func(*args,**kwargs)
    12. next(gen)
    13. return gen
    14. return primer
    15. # 使用装饰器,装饰协程
    16. @coroutine
    17. def avg():
    18. sum = 0
    19. count = 0
    20. average = None
    21. while True:
    22. a = yield average
    23. sum += a
    24. count +=1
    25. average = sum/count
    26. if __name__ == '__main__':
    27. a = avg()
    28. # 1.初始状态的协程就已经处于阻塞状态了(而不是刚创建)
    29. print(getgeneratorstate(a)) # GEN_SUSPENDED,即无需再通过next去预激活协程
    30. # 计算初始平均值
    31. avg1 = a.send(10)
    32. print(avg1) # 10.0
    33. # 发送新数据,计算前两个数据的平均值
    34. avg2 = a.send(20)
    35. print(avg2) # 15.0
    36. # 再次发送新数据,计算前三个数据的平均值
    37. avg3 = a.send(30)
    38. print(avg3) # 20.0

    4、终止协程和异常处理

    1. #!/usr/bin/env python
    2. # -*- coding: utf-8 -*-
    3. # @Time : 2024/3/2 9:54
    4. # @Author : Maple
    5. # @File : 04-终止协程和异常处理.py
    6. # @Software: PyCharm
    7. # 自定义异常
    8. class DemoException(Exception):
    9. """演示用异常"""
    10. def demo_exc_handing():
    11. print('-->coroutine started')
    12. try:
    13. while True:
    14. try:
    15. x = yield
    16. except DemoException:
    17. print('**DemoException handling,Continuning****')
    18. else:
    19. print('-->coroutine received: {!r}'.format(x))
    20. raise RuntimeError('This line should never run')
    21. # 该条语句永远也不会执行:如果while循环中遇到DemoException,协程处理后,会继续运行
    22. # 如果遇到其它异常,生成器无法运行,则协程立刻终止(此时RuntimeError仍然没有机会执行)
    23. finally:
    24. """如果不管协程是否结束,都有一些清理工作需要协程处理,可以使用finally"""
    25. print('-->coroutine ending')
    26. if __name__ == '__main__':
    27. # 1.协程测试1
    28. print('*****1.协程测试1***********')
    29. d = demo_exc_handing()
    30. # 1-1.预激活协程
    31. next(d) # ->coroutine started
    32. # 1-2.给协程发送数据
    33. d.send(10) # -->coroutine received: 10
    34. d.send(20) # -->coroutine received: 20
    35. # 1-3.通过throw给协程发送异常
    36. # 异常被处理,协程可以继续运行
    37. d.throw(DemoException) # **DemoException handling,Continuning****
    38. d.send(30) # -->coroutine received: 30
    39. # 1-4.通过close方法,关闭协程
    40. d.close()
    41. from inspect import getgeneratorstate
    42. print(getgeneratorstate(d)) # GEN_CLOSED
    43. # 2.协程测试2
    44. print('*****2.协程测试2***********')
    45. d2 = demo_exc_handing()
    46. next(d2) #-->coroutine started
    47. d2.send(10) # -->coroutine received: 10
    48. # 通过throw给协程发送一个其无法处理的异常,协程会停止,状态变成'GEN_CLOSED'
    49. d2.throw(ZeroDivisionError)
    50. print(getgeneratorstate(d2)) # GEN_CLOSED

    5、协程返回值

    1. #!/usr/bin/env python
    2. # -*- coding: utf-8 -*-
    3. # @Time : 2024/3/2 10:22
    4. # @Author : Maple
    5. # @File : 05-协程返回值.py
    6. # @Software: PyCharm
    7. from collections import namedtuple
    8. Result = namedtuple('Result','count average')
    9. """
    10. 接收协程返回值
    11. """
    12. def avg():
    13. sum = 0
    14. count = 0
    15. average = None
    16. while True:
    17. a = yield
    18. # 由于协程必须正常终止,外部才能接收到返回值,所以要提供一种机制,保证协程可以正常终止
    19. if a is None:
    20. break
    21. sum += a
    22. count +=1
    23. average = sum/count
    24. # 协程正常终止,抛出StopIteration异常,此返回值会绑定在异常的一个属性中
    25. return Result(count,average)
    26. if __name__ == '__main__':
    27. a = avg()
    28. next(a)
    29. a.send(10)
    30. a.send(20)
    31. a.send(30)
    32. try:
    33. a.send(None)
    34. except StopIteration as exc:
    35. # 协程返回值绑定在异常中的属性中
    36. result = exc.value
    37. print(result) # Result(count=3, average=20.0)

    6、委派生成器

    1. #!/usr/bin/env python
    2. # -*- coding: utf-8 -*-
    3. # @Time : 2024/3/2 10:31
    4. # @Author : Maple
    5. # @File : 06-委派生成器.py
    6. # @Software: PyCharm
    7. from collections import namedtuple
    8. Result = namedtuple('Result','count average')
    9. # 子生成器
    10. def avg():
    11. sum = 0
    12. count = 0
    13. average = None
    14. while True:
    15. a = yield
    16. # 由于协程必须正常终止,外部才能接收到返回值,所以要提供一种机制,保证协程可以正常终止
    17. if a is None:
    18. break
    19. sum += a
    20. count +=1
    21. average = sum/count
    22. # 协程正常终止,抛出StopIteration异常,此返回值会绑定在异常的一个属性中
    23. return Result(count,average)
    24. # 委派生成器
    25. def grouper(results,key):
    26. while True:
    27. # results:接收子生成器的返回值
    28. results[key] = yield from avg()
    29. # 调用方
    30. def main(data):
    31. result = {}
    32. for key,value in data.items():
    33. # 每次遍历都生成一个新的委派生成器
    34. group = grouper(result,key)
    35. # 预激活委派生成器
    36. next(group)
    37. # 利用委派生成器,给子生成器avg发送数据,
    38. for data in value:
    39. group.send(data)
    40. # 给子生成器发送一个None,终止子生成器(抛出StopIteration异常,yield from能够处理该异常)
    41. # 最终result会接收到 子生成器返回的值(注意:子生成器产出的值 是直接反馈给调用方,而不是委派生成器)
    42. group.send(None)
    43. report(result)
    44. def report(results):
    45. for key,result in sorted(results.items()):
    46. group,unit = key.split(';')
    47. print('{:2} {:5} averaging {:.2f}{}'.format(result.count,group,result.average,unit))
    48. if __name__ == '__main__':
    49. data = {
    50. 'girls;kg' : [40.5,50.2,51.2,40.8],
    51. 'girls;m': [1.53, 1.61, 1.46, 1.8],
    52. 'boys;kg': [50.5, 62.4, 71.4, 60.4],
    53. 'boys;m': [1.63, 1.71, 1.76, 1.8]
    54. }
    55. """
    56. 4 boys averaging 61.18kg
    57. 4 boys averaging 1.72m
    58. 4 girls averaging 45.67kg
    59. 4 girls averaging 1.60m
    60. """
    61. main(data)

    7、出租车运营仿真案例

    1. #!/usr/bin/env python
    2. # -*- coding: utf-8 -*-
    3. # @Time : 2024/3/2 12:58
    4. # @Author : Maple
    5. # @File : 07-出租车运营仿真案例.py
    6. # @Software: PyCharm
    7. import queue
    8. import random
    9. from collections import namedtuple
    10. Event = namedtuple('Event','time ident action')
    11. DEFAULT_NUMBER_OF_TAXIS = 3
    12. DEFAULT_END_TIME = 180
    13. SEARCH_DURATION = 5
    14. TRIP_DURATION = 20
    15. DEPARTURE_INTERVAL = 5
    16. def taxi_process(ident,trips,start_time =0):
    17. time = yield Event(start_time,ident,'leave garage')
    18. for i in range(trips):
    19. time = yield Event(time,ident,'pick up passenger')
    20. time = yield Event(time,ident,'drop off passenger')
    21. yield Event(time,ident,'go home')
    22. class Simulator:
    23. def __init__(self,procs_map):
    24. self.events = queue.PriorityQueue()
    25. self.procs = dict(procs_map)
    26. def run(self,end_time):
    27. for _,proc in self.procs.items():
    28. # 放入每位司机的初始事件(从停车场出发)
    29. self.events.put(next(proc))
    30. # 仿真的主循环
    31. sim_time = 0
    32. while sim_time < end_time:
    33. if self.events.empty():
    34. print('end simulate')
    35. break
    36. # 取出start_time最早的事件
    37. current_event = self.events.get()
    38. sim_time, ident, previous_action = current_event
    39. # 输出当前事件
    40. print('taxi:',ident,ident *' ',current_event)
    41. # 当前事件,是归属于哪一位司机
    42. active_proc = self.procs[ident]
    43. # 获取该司机下一个事件,并传递下一个事件开始时间
    44. next_time = sim_time + Caculate_nexttime(previous_action)
    45. try:
    46. next_event = active_proc.send(next_time)
    47. except StopIteration:
    48. # 如果某位司机的事件已经遍历完毕,就删掉其对应记录
    49. del self.procs[ident]
    50. else:
    51. # 将下一次事件放入事件库
    52. self.events.put(next_event)
    53. else:
    54. msg = '** end stimulation time: {} events pending ***'
    55. msg.format(self.events.qsize())
    56. def Caculate_nexttime(previous_action):
    57. if previous_action in ['leave garage','drop off passenger']:
    58. # 新状态是四处徘徊
    59. interval = SEARCH_DURATION
    60. elif previous_action == 'pick up passenger':
    61. # 新状态是行程开始
    62. interval = TRIP_DURATION
    63. elif previous_action == 'go home':
    64. interval = 1
    65. else:
    66. raise ValueError('Unknown previous_action:%s' % previous_action)
    67. return int(random.expovariate(1/interval)) + 1
    68. def main(end_time = DEFAULT_END_TIME,num_taxis = DEFAULT_NUMBER_OF_TAXIS,seed = None):
    69. if seed is not None:
    70. random.seed(seed)
    71. # 默认创建的司机事件集合
    72. """
    73. proc_map = {
    74. '0':taxi_process(0,2,0),
    75. '1':taxi_process(1,4,5),
    76. '2':taxi_process(2,6,10)
    77. }
    78. """
    79. proc_map = {i:taxi_process(i,(i+1) *2 ,i * DEPARTURE_INTERVAL ) for i in range(num_taxis)}
    80. sim = Simulator(proc_map)
    81. sim.run(end_time)
    82. if __name__ == '__main__':
    83. main(seed=3)

  • 相关阅读:
    认识时间复杂度和简单排序算法
    华玉通软发布“云雀”执行管理中间件,为智能驾驶关键应用提供确定性执行保障
    Java EE——线程(3)
    编写自己的 WordPress 模板
    word2vec的算法原理(不用开源包,python实现)
    基于 JSch 实现服务的自定义监控解决方案
    【vue2】前端如何播放rtsp 视频流,拿到rtsp视频流地址如何处理,海康视频rtsp h264 如何播放
    PostgreSQL 16 关于vacuum freezing 性能提升的来由 与 之前PostgreSQL 的缺陷讨论
    Chapter 12 贝叶斯网络
    vue目录树的封装
  • 原文地址:https://blog.csdn.net/weixin_37901366/article/details/136415050