1. 生成器作为协程使用时的行为状态
2.使用装饰器自动预激协程
3.调用方使用close和throw方法控制协程
4.协程终止时如何返回值
5.yield from新句法的用途和语义
6.案例:出租车队运营仿真
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- # @Time : 2024/3/2 9:07
- # @Author : Maple
- # @File : 01-协程入门案例.py
- # @Software: PyCharm
- import inspect
-
- """
- 协程有4种状态
- 1.'GEN_CREATED': 等待开始执行
- 2.'GEN_RUNNINF': 解释器正在执行
- 3.'GEN_SUSPENDED': 在yield表达式处暂停
- 4.'GEN_CLOSED': 执行结束
- 注意:只有当协程处于状态3的时候,才可以send值.而刚创建的协程,没有预激活(即没有执行next方法)
- ,此时处于状态1,如果给协程send值,会报错
- """
-
-
- def simple_coro(a):
- print('-->start: a = ',a)
- b = yield a
- print('-->Received:b = ',b)
- c = yield a + b
- print('-->Received:c = ', c)
-
- if __name__ == '__main__':
-
- # 1.实例化生成器函数
- s = simple_coro(2)
- ## 查看此时协程的状态: 创建状态
- print(inspect.getgeneratorstate(s)) # GEN_CREATED
-
- # 2.调用next,预激活协程,此时协程会停止于第一个yield a处(等待外部调用函数发送send,往下推进执行),并且将a值返回给外部
- a = next(s)
- print(a)
- ## 查看此时协程的状态: 阻塞状态(等待外部通过send发送值)
- print(inspect.getgeneratorstate(s)) # GEN_SUSPENDED
-
- # 3.通过send给s发送值,重新启动协程,并且外部的值会赋给b,最终再次阻塞在下一个yield处(yield a + b)
- #并将外部 a + b的值返回给调用方【此时协程又会处于阻塞状态,等待外部发送send值】
- c = s.send(10)
- print(c) # 12(备注:2 + 10)
- ## ## 查看此时协程的状态: 阻塞状态(等待外部通过send发送值)
- print(inspect.getgeneratorstate(s)) # GEN_SUSPENDED
-
- # 4.再次通过send给s发送值,重新启动协程,并且外部的值会赋值给c,输出print(-->Received:c = xx)
- # 由于协程后面没有yield了,所以最后协程终止,导致[生成器]抛出StopIteration异常
- """
- Traceback (most recent call last):
- File "D:/01-study/python/fluent_python/16-协程/01-协程入门案例.py", line 39, in
- s.send(100)
- StopIteration
- """
- s.send(100)
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- # @Time : 2024/3/2 9:28
- # @Author : Maple
- # @File : 02-使用协程计算平均值.py
- # @Software: PyCharm
-
-
- def avg():
-
- sum = 0
- count = 0
- average = None
- while True:
- a = yield average
- sum += a
- count +=1
- average = sum/count
-
- if __name__ == '__main__':
-
- a = avg()
- # 预激活协程
- next(a)
- # 计算初始平均值
- avg1 = a.send(10)
- print(avg1) # 10.0
-
- # 发送新数据,计算前两个数据的平均值
- avg2 = a.send(20)
- print(avg2) # 15.0
-
- # 再次发送新数据,计算前三个数据的平均值
- avg3 = a.send(30)
- print(avg3) # 20.0
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- # @Time : 2024/3/2 9:36
- # @Author : Maple
- # @File : 03-预激协程的装饰器.py
- # @Software: PyCharm
-
-
- from inspect import getgeneratorstate
-
- def coroutine(func):
- """装饰器:让被装饰的协程向前执行一步,阻带到第一个yield处"""
- def primer(*args,**kwargs):
- gen = func(*args,**kwargs)
- next(gen)
- return gen
- return primer
-
- # 使用装饰器,装饰协程
- @coroutine
- def avg():
-
- sum = 0
- count = 0
- average = None
- while True:
- a = yield average
- sum += a
- count +=1
- average = sum/count
-
-
- if __name__ == '__main__':
-
- a = avg()
- # 1.初始状态的协程就已经处于阻塞状态了(而不是刚创建)
- print(getgeneratorstate(a)) # GEN_SUSPENDED,即无需再通过next去预激活协程
-
- # 计算初始平均值
- avg1 = a.send(10)
- print(avg1) # 10.0
-
- # 发送新数据,计算前两个数据的平均值
- avg2 = a.send(20)
- print(avg2) # 15.0
-
- # 再次发送新数据,计算前三个数据的平均值
- avg3 = a.send(30)
- print(avg3) # 20.0
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- # @Time : 2024/3/2 9:54
- # @Author : Maple
- # @File : 04-终止协程和异常处理.py
- # @Software: PyCharm
-
-
- # 自定义异常
- class DemoException(Exception):
- """演示用异常"""
-
-
- def demo_exc_handing():
- print('-->coroutine started')
- try:
- while True:
- try:
- x = yield
- except DemoException:
- print('**DemoException handling,Continuning****')
- else:
- print('-->coroutine received: {!r}'.format(x))
- raise RuntimeError('This line should never run')
-
- # 该条语句永远也不会执行:如果while循环中遇到DemoException,协程处理后,会继续运行
- # 如果遇到其它异常,生成器无法运行,则协程立刻终止(此时RuntimeError仍然没有机会执行)
- finally:
- """如果不管协程是否结束,都有一些清理工作需要协程处理,可以使用finally"""
- print('-->coroutine ending')
-
-
- if __name__ == '__main__':
-
- # 1.协程测试1
- print('*****1.协程测试1***********')
- d = demo_exc_handing()
- # 1-1.预激活协程
- next(d) # ->coroutine started
-
- # 1-2.给协程发送数据
- d.send(10) # -->coroutine received: 10
- d.send(20) # -->coroutine received: 20
-
- # 1-3.通过throw给协程发送异常
- # 异常被处理,协程可以继续运行
- d.throw(DemoException) # **DemoException handling,Continuning****
- d.send(30) # -->coroutine received: 30
-
- # 1-4.通过close方法,关闭协程
- d.close()
- from inspect import getgeneratorstate
- print(getgeneratorstate(d)) # GEN_CLOSED
-
- # 2.协程测试2
- print('*****2.协程测试2***********')
- d2 = demo_exc_handing()
-
- next(d2) #-->coroutine started
- d2.send(10) # -->coroutine received: 10
- # 通过throw给协程发送一个其无法处理的异常,协程会停止,状态变成'GEN_CLOSED'
- d2.throw(ZeroDivisionError)
- print(getgeneratorstate(d2)) # GEN_CLOSED
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- # @Time : 2024/3/2 10:22
- # @Author : Maple
- # @File : 05-协程返回值.py
- # @Software: PyCharm
- from collections import namedtuple
-
- Result = namedtuple('Result','count average')
-
- """
- 接收协程返回值
- """
-
- def avg():
- sum = 0
- count = 0
- average = None
- while True:
- a = yield
- # 由于协程必须正常终止,外部才能接收到返回值,所以要提供一种机制,保证协程可以正常终止
- if a is None:
- break
- sum += a
- count +=1
- average = sum/count
-
- # 协程正常终止,抛出StopIteration异常,此返回值会绑定在异常的一个属性中
- return Result(count,average)
-
-
- if __name__ == '__main__':
-
- a = avg()
- next(a)
- a.send(10)
- a.send(20)
- a.send(30)
-
- try:
- a.send(None)
- except StopIteration as exc:
- # 协程返回值绑定在异常中的属性中
- result = exc.value
-
- print(result) # Result(count=3, average=20.0)
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- # @Time : 2024/3/2 10:31
- # @Author : Maple
- # @File : 06-委派生成器.py
- # @Software: PyCharm
- from collections import namedtuple
-
- Result = namedtuple('Result','count average')
-
-
- # 子生成器
- def avg():
- sum = 0
- count = 0
- average = None
- while True:
- a = yield
- # 由于协程必须正常终止,外部才能接收到返回值,所以要提供一种机制,保证协程可以正常终止
- if a is None:
- break
- sum += a
- count +=1
- average = sum/count
-
- # 协程正常终止,抛出StopIteration异常,此返回值会绑定在异常的一个属性中
- return Result(count,average)
-
-
- # 委派生成器
- def grouper(results,key):
- while True:
- # results:接收子生成器的返回值
- results[key] = yield from avg()
-
- # 调用方
- def main(data):
- result = {}
- for key,value in data.items():
- # 每次遍历都生成一个新的委派生成器
- group = grouper(result,key)
- # 预激活委派生成器
- next(group)
-
- # 利用委派生成器,给子生成器avg发送数据,
- for data in value:
- group.send(data)
- # 给子生成器发送一个None,终止子生成器(抛出StopIteration异常,yield from能够处理该异常)
- # 最终result会接收到 子生成器返回的值(注意:子生成器产出的值 是直接反馈给调用方,而不是委派生成器)
- group.send(None)
- report(result)
-
- def report(results):
-
- for key,result in sorted(results.items()):
- group,unit = key.split(';')
- print('{:2} {:5} averaging {:.2f}{}'.format(result.count,group,result.average,unit))
-
- if __name__ == '__main__':
-
- data = {
- 'girls;kg' : [40.5,50.2,51.2,40.8],
- 'girls;m': [1.53, 1.61, 1.46, 1.8],
- 'boys;kg': [50.5, 62.4, 71.4, 60.4],
- 'boys;m': [1.63, 1.71, 1.76, 1.8]
- }
-
- """
- 4 boys averaging 61.18kg
- 4 boys averaging 1.72m
- 4 girls averaging 45.67kg
- 4 girls averaging 1.60m
- """
- main(data)
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- # @Time : 2024/3/2 12:58
- # @Author : Maple
- # @File : 07-出租车运营仿真案例.py
- # @Software: PyCharm
- import queue
- import random
- from collections import namedtuple
-
- Event = namedtuple('Event','time ident action')
-
- DEFAULT_NUMBER_OF_TAXIS = 3
- DEFAULT_END_TIME = 180
- SEARCH_DURATION = 5
- TRIP_DURATION = 20
- DEPARTURE_INTERVAL = 5
-
- def taxi_process(ident,trips,start_time =0):
-
- time = yield Event(start_time,ident,'leave garage')
- for i in range(trips):
- time = yield Event(time,ident,'pick up passenger')
- time = yield Event(time,ident,'drop off passenger')
- yield Event(time,ident,'go home')
-
-
-
- class Simulator:
-
- def __init__(self,procs_map):
- self.events = queue.PriorityQueue()
- self.procs = dict(procs_map)
-
- def run(self,end_time):
-
- for _,proc in self.procs.items():
- # 放入每位司机的初始事件(从停车场出发)
- self.events.put(next(proc))
-
- # 仿真的主循环
- sim_time = 0
-
- while sim_time < end_time:
- if self.events.empty():
- print('end simulate')
- break
-
- # 取出start_time最早的事件
- current_event = self.events.get()
- sim_time, ident, previous_action = current_event
- # 输出当前事件
- print('taxi:',ident,ident *' ',current_event)
-
-
- # 当前事件,是归属于哪一位司机
- active_proc = self.procs[ident]
- # 获取该司机下一个事件,并传递下一个事件开始时间
- next_time = sim_time + Caculate_nexttime(previous_action)
- try:
- next_event = active_proc.send(next_time)
- except StopIteration:
- # 如果某位司机的事件已经遍历完毕,就删掉其对应记录
- del self.procs[ident]
- else:
- # 将下一次事件放入事件库
- self.events.put(next_event)
- else:
- msg = '** end stimulation time: {} events pending ***'
- msg.format(self.events.qsize())
-
-
-
- def Caculate_nexttime(previous_action):
- if previous_action in ['leave garage','drop off passenger']:
- # 新状态是四处徘徊
- interval = SEARCH_DURATION
-
- elif previous_action == 'pick up passenger':
- # 新状态是行程开始
- interval = TRIP_DURATION
- elif previous_action == 'go home':
- interval = 1
- else:
- raise ValueError('Unknown previous_action:%s' % previous_action)
- return int(random.expovariate(1/interval)) + 1
-
-
-
-
-
- def main(end_time = DEFAULT_END_TIME,num_taxis = DEFAULT_NUMBER_OF_TAXIS,seed = None):
- if seed is not None:
- random.seed(seed)
-
- # 默认创建的司机事件集合
- """
- proc_map = {
- '0':taxi_process(0,2,0),
- '1':taxi_process(1,4,5),
- '2':taxi_process(2,6,10)
- }
-
- """
- proc_map = {i:taxi_process(i,(i+1) *2 ,i * DEPARTURE_INTERVAL ) for i in range(num_taxis)}
- sim = Simulator(proc_map)
- sim.run(end_time)
-
- if __name__ == '__main__':
-
- main(seed=3)
