SimPy: Simulating Real-World Processes With Python
仿真环境:电影院仿真
目标:减少顾客的平均等待时间,少于10分钟
在开始仿真前,先思考这个仿真过程,顾客在坐下来看电影前需要经过哪些步骤
这些步骤中又一些是可以控制的,比如有多少雇员在卖票或者卖小零食,有一些步骤需要依赖之前的数据进行预测,比如有多少顾客到达,接下来开始仿真过程,首先导入需要的库
import random
import simpy
import statistics
记录优化目标:找到雇员的最佳数量,使所有顾客的平均等待时间小于10分钟,使用列表存储顾客等待时间
wait_times = []
构建仿真第一步是分析系统蓝图,也就是你的整个环境中会发生的事,会从某个地方移动到另一个地方的人或物,环境可以是任何类型的系统,如银行,洗车所,安全检查等,在本例中,环境是一个电影院,因此定义类名:Theater
class Theater(object):
def __init__(self):
pass
现在开始思考电影院的组成部分,首先肯定有theater本身,也就是你的environment,之后,你会用simpy的一些函数补充theater使其更像真实的环境,现在只需要简单将其添加到类定义中
class Theater(object):
def __init__(self, env):
self.env = env
接下来继续思考电影院还会有什么,通过之前分析的步骤可以知道,当顾客到达的时候,他们需要在指定位置排队,然后有收银员帮助顾客购票,也就是说,环境里有以下两件事:
cashiers可以看成电影院提供给顾客的资源resource,他们帮助顾客完成买票这个进程process,但是目前你不知道在仿真环境里有多少cashiers,实际上,这就是你需要解决的问题,顾客等待时间也取决于cashier数量,你可以把这个未知数量称为num_cashiers,实际取值可以之后筛选,现在你只知道cashier是theater环境必不可少的部分,将其添加到类定义中
class Theater(object):
def __init__(self, env, num_cashiers):
self.env = env
self.cashier = simpy.Resource(env, num_cashiers)
这样就增加了一个新参数num_cashiers,创建了一个资源self.cashier,使用simpy.Resource()表示在某时刻有多少资源。还需要考虑的是cashier帮顾客买票是需要时间的,可以通过历史数据得出买一张票大概需要1-3分钟,那么如何在simpy中表示这个过程?只需要使用timeout这个事件
yield self.env.timeout(random.randint(1, 3))
env.timeout()告诉simpy在经过指定时间后去触发某个事件,在本例中这个事件就是顾客买到票。现在把这个事件封装到函数里
class Theater(object):
def __init__(self, env, num_cashiers):
self.env = env
self.cashier = simpy.Resource(env, num_cashiers)
def purchase_ticket(self, moviegoer):
yield self.env.timeout(random.randint(1, 3))
触发purchase_ticket()事件的事顾客,因此需要传入moviegoer。目前为止就定义了一个有时间限制的资源,以及与它相关的进程,除了cashier,还有两类资源需要定义
假设经过历史数据得知,一位Usher检查一次票只需要3秒,一位Server交易一次需要1-5分钟,这俩类资源的处理过程如同上面的cashier,代码类似
class Theater(object):
def __init__(self, env, num_cashiers, num_ushers, num_servers):
self.env = env
self.cashier = simpy.Resource(env, num_cashiers)
self.usher = simpy.Resource(env, num_ushers)
self.server = simpy.Resource(env, num_servers)
def purchase_ticket(self, moviegoer):
yield self.env.timeout(random.randint(1, 3))
def check_ticket(self, moviegoer):
yield self.env.timeout(3 / 60)
def sell_food(self, moviegoer):
yield self.env.timeout(random.randint(1, 5))
目前为止已经通过定义类组建完了environment,有资源和进程,现在开始模拟一位顾客使用它们,当一位moviegoer到达影院时,就会开始申请各种资源,直到顾客要做的事件全部完成。
def go_to_movies(env, moviegoer, theater):
# Moviegoer到达theater
arrival_time = env.now
需要传入三个参数:
使用env.now可以获取每位moviegoer到达时间。moviegoer在theater中的每个process都有对应的requests来申请资源,例如第一个process是purchase_ticket(),需要使用一个cashier资源,moviegoer需要申请获取cashier才能执行process
cashier是一类共享资源,这意味着许多moviegoer使用相同的cashier,但是同一时刻,一个cashier只能帮助一位moviegoer,因此需要一些等待时间。
def go_to_movies(env, moviegoer, theater):
# Moviegoer到达theater
arrival_time = env.now
with theater.cashier.request() as request:
yield request
yield env.process(theater.purchase_ticket(moviegoer))
资源使用完后必须被释放release(),此处使用with语句,资源就会自动释放。当一个cashier空闲,moviegoer就会花一些时间买票,env.process()告诉仿真进入到Theater实例中运行purchase_ticket()。在检票这里也是一样的流程:request,use,release。但是顾客买零食是随机可选的,这种不确定性的事件可以使用随机数来表示。
def go_to_movies(env, moviegoer, theater):
# Moviegoer到达theater
arrival_time = env.now
with theater.cashier.request() as request:
yield request
yield env.process(theater.purchase_ticket(moviegoer))
with theater.usher.request() as request:
yield request
yield env.process(theater.check_ticket(moviegoer))
if random.choice([True, False]):
with theater.server.request() as request:
yield request
yield env.process(theater.sell_food(moviegoer))
仿真的目的是为了确定cashier,usher,server数量,减少顾客等待时间,因此关键在于记录顾客从到达到入座经过了多长时间
def go_to_movies(env, moviegoer, theater):
# Moviegoer到达theater
arrival_time = env.now
with theater.cashier.request() as request:
yield request
yield env.process(theater.purchase_ticket(moviegoer))
with theater.usher.request() as request:
yield request
yield env.process(theater.check_ticket(moviegoer))
if random.choice([True, False]):
with theater.server.request() as request:
yield request
yield env.process(theater.sell_food(moviegoer))
wait_times.append(env.now - arrival_time)
这里也可以用单独的列表departure_time存储离开时间,但是没有必要,这只是重复的代码,DRP(do not repeat yourself)
现在,你需要定义一个函数来运行仿真,run_theater()会实例化theater,不停的生成moviegoers直到仿真停止
def run_theater(env, num_cashiers, num_servers, num_ushers):
theater = Theater(env, num_cashiers, num_ushers, num_servers)
传入之前定义的三个参数:
这些参数决定了仿真环境的配置。假设在仿真开始的时候就有一些moviegoers在电影院等候,现实生活中也可能会有电影院还没开门的时候就有人等着。设置初始时刻有3位moviegoer在排队等待买票
def run_theater(env, num_cashiers, num_servers, num_ushers):
theater = Theater(env, num_cashiers, num_ushers, num_servers)
for moviegoer in range(3):
env.process(go_to_movies(env, moviegoer, theater))
使用range()生成3位moviegoer,然后使用env.process()告诉仿真准备按流程流动moviegoer,剩下的moviegoer会在某时刻到达theater,因此这个函数应该能不停的在仿真运行期间生成新的moviegoer。假设每12秒(0.2分钟)到达一位顾客,使用timeout表示时间间隔
def run_theater(env, num_cashiers, num_servers, num_ushers):
theater = Theater(env, num_cashiers, num_ushers, num_servers)
for moviegoer in range(3):
env.process(go_to_movies(env, moviegoer, theater))
while True:
yield env.timeout(0.2)
moviegoer += 1
env.process(go_to_movies(env, moviegoer, theater))
运行完wait_times列表记录了所有顾客的等待时间,对数据进行处理,计算均值,输出值用每分每秒表示
def calculate_wait_time(wait_times):
average_wait = statistics.mean(wait_times)
minutes, frac_minutes = divmod(average_wait, 1)
seconds = frac_minutes * 60
return round(minutes), round(seconds)
之前的环境取决于以下三个变量的值:
仿真的优势在于你可以随意测试这些参数改变仿真场景,可以单独设置函数来获取仿真配置参数
def get_user_input():
num_cashiers = input('cashier number: ')
num_ushers = input('ushers number: ')
num_servers = input('servers number: ')
params = [num_cashiers, num_ushers, num_servers]
# 检查输入是否错误
if all(str(i).isdigit() for i in params):
params = [int(x) for x in params]
else:
print('input wrong, simulation start with default value')
params = [1, 1, 1]
return params
最后一步就是创建主函数
def main():
# setup
random.seed(42)
num_cashiers, num_ushers, num_servers = get_user_input()
# run
env = simpy.Environment()
env.process(run_theater(env, num_cashiers, num_ushers, num_servers))
env.run(until=90)
# output
mins, secs = calculate_wait_time(wait_times)
print( "Running simulation...",
f"\nThe average wait time is {mins} minutes and {secs} seconds.")
最后回顾一下定义的类和函数
if __name__ == '__main__':
main()
cashier number: 2
ushers number: 2
servers number: 2
Running simulation...
The average wait time is 36 minutes and 44 seconds.
class Theater(object):
def __init__(self, env, num_cashiers, num_ushers, num_servers):
self.env = env
self.cashier = simpy.Resource(env, num_cashiers)
self.usher = simpy.Resource(env, num_ushers)
self.server = simpy.Resource(env, num_servers)
def purchase_ticket(self, moviegoer):
yield self.env.timeout(random.randint(1, 3))
def check_ticket(self, moviegoer):
yield self.env.timeout(3 / 60)
def sell_food(self, moviegoer):
yield self.env.timeout(random.randint(1, 5))
def go_to_movies(env, moviegoer, theater):
# Moviegoer到达theater
arrival_time = env.now
with theater.cashier.request() as request:
yield request
yield env.process(theater.purchase_ticket(moviegoer))
with theater.usher.request() as request:
yield request
yield env.process(theater.check_ticket(moviegoer))
if random.choice([True, False]):
with theater.server.request() as request:
yield request
yield env.process(theater.sell_food(moviegoer))
# Moviegoer离开theater
wait_times.append(env.now - arrival_time)
def run_theater(env, num_cashiers, num_servers, num_ushers):
theater = Theater(env, num_cashiers, num_ushers, num_servers)
for moviegoer in range(3):
env.process(go_to_movies(env, moviegoer, theater))
while True:
yield env.timeout(0.2)
moviegoer += 1
env.process(go_to_movies(env, moviegoer, theater))
def calculate_wait_time(wait_times):
average_wait = statistics.mean(wait_times)
minutes, frac_minutes = divmod(average_wait, 1)
seconds = frac_minutes * 60
return round(minutes), round(seconds)
def get_user_input():
num_cashiers = input('cashier number: ')
num_ushers = input('ushers number: ')
num_servers = input('servers number: ')
params = [num_cashiers, num_ushers, num_servers]
# 检查输入是否错误
if all(str(i).isdigit() for i in params):
params = [int(x) for x in params]
else:
print('input wrong, simulation start with default value')
params = [1, 1, 1]
return params
def main():
# setup
random.seed(42)
num_cashiers, num_ushers, num_servers = get_user_input()
# run
env = simpy.Environment()
env.process(run_theater(env, num_cashiers, num_ushers, num_servers))
env.run(until=90)
# output
mins, secs = calculate_wait_time(wait_times)
print( "Running simulation...",
f"\nThe average wait time is {mins} minutes and {secs} seconds.")
if __name__ == '__main__':
main()
cashier number: 2
ushers number: 3
servers number: 1
Running simulation...
The average wait time is 36 minutes and 43 seconds.