关注我的公众号YueTan进行交流探讨
欢迎关注我的APS仓库:https://github.com/yuetan1988/python-lekin
APS系列入门
本次解读的代码来自:
https://github.com/samy-barrech/Flexible-Job-Shop-Scheduling-Problem/tree/master/app
关于生产排程的问题。
6 6 1
6 1 3 1 1 1 3 1 2 6 1 4 7 1 6 3 1 5 6
6 1 2 8 1 3 5 1 5 10 1 6 10 1 1 10 1 4 4
6 1 3 5 1 4 4 1 6 8 1 1 9 1 2 1 1 5 7
6 1 2 5 1 1 5 1 3 5 1 4 3 1 5 8 1 6 9
6 1 3 9 1 2 3 1 5 5 1 6 4 1 1 3 1 4 1
6 1 2 3 1 4 3 1 6 9 1 1 10 1 5 4 1 3 1
第一行:代表6个job,6个机器,每个工序平均需要1台机器
之后的6行代表其中的6job,第二行是第一个job,需要6个工序来完成,第一个工序可以1台机器完成,机器3,需要1个单位时间。
注意从scheduler里记录已完成job, 待完成job; job里记录已完成actitity, 未完成activity; activity里记录已完成op, 待完成op
1 parse => jobs_list, machines_list
2 heuristic = Heuristic().select
3 Scheduler(machine_list, jobs_list)
4 Scheduler.run(heuristic)
def run(self, heuristic, verbose=True):
# Disable print if verbose is False
if not verbose:
sys.stdout = None
current_step = 0
while len(self.__jobs_to_be_done) > 0: # 只要有未完成工作
current_step += 1
best_candidates = heuristic(self.__jobs_to_be_done, self.__max_operations, current_step) # 不同规则返回最佳candidate,一次完成一个工作
for id_machine, candidates in best_candidates.items(): # 最佳候选里,是关于多个机器,其能承受最大工序的
machine = self.__machines[id_machine - 1] # id转化为对象
for activity, operation in candidates:
if not (machine.is_working_at_max_capacity() or activity.is_pending): # 符合条件的分配给机器
machine.add_operation(activity, operation)
for machine in self.__machines: # 机器时间开始流动
machine.work()
for job in self.__jobs_to_be_done: # 检查更新job是否完整状态,更新未完成的job
if job.is_done:
self.__jobs_to_be_done = list(
filter(lambda element: element.id_job != job.id_job, self.__jobs_to_be_done))
self.__jobs_done.append(job)
print(colored("[SCHEDULER]", "green"), "Done in " + str(current_step) + " units of time")
# Reenable stdout
if not verbose:
sys.stdout = self.__original_stdout
return current_step
对于每个job的排产位于heuristic函数中进行
class Heuristics:
# When a choice between multiple operations is available, always pick the first one
@staticmethod
def select_first_operation(jobs_to_be_done, max_operations, _):
best_candidates = {}
for job in jobs_to_be_done: # 从待完成的job里找到best_candidate
current_activity = job.current_activity # 向下选择activity/ operation,由于activity要按顺序完成,选择每个job当前
best_operation = current_activity.shortest_operation # 当前里最短的工序
if best_candidates.get(best_operation.id_machine) is None: # 如果最佳候选里暂时没有这台机器的安排
best_candidates.update({best_operation.id_machine: [(current_activity, best_operation)]})
elif len(best_candidates.get(best_operation.id_machine)) < max_operations: # 如果最佳候选还能继续安排工作,加上去
best_candidates.get(best_operation.id_machine).append((current_activity, best_operation))
else: # 机器已经满载了,那么就从里面选
list_operations = best_candidates.get(best_operation.id_machine)
for key, (_, operation) in enumerate(list_operations):
if operation.duration < best_operation.duration: # 原本候选集里比最新这个时间更短的,pop一个出去
list_operations.pop(key)
break
if len(list_operations) < max_operations: # 如果有pop,就把最新的加上去。否则其实还是原本的候选
list_operations.append((current_activity, best_operation))
return best_candidates