• 一个优美的时间片轮转调度算法模拟Python实现


    节选自本人博客:https://www.blog.zeeland.cn/archives/time-slice-rotation-scheduling-algorithm

    Introduction

    先放一下需求吧。

    设计一个有N个进程并发的进程调度程序。每个进程有一个进程控制块(PCB)表示(可以用PCB直接代表进程实体,略去每个进程的程序段和数据段的具体运行)。进程控制块可以包含如下信息:进程名、到达时间、需要运行时间、已用CPU时间、进程状态等等, 并初始化设置一批进程控制块实例,通过对进程控制块实例对象的控制,来模拟进程调度的控制过程。

    选取时间片轮转调度算法模拟实现,每进行一次调度,都打印一次运行进程、就绪队列、以及各个进程的PCB的相关信息,以便进行检查,具体要求如下:

    1. 每个进程的状态可以是就绪W(Wait)、运行R(Run)、阻塞B(Block)、完成F(Finish)四种状态之一。
    2. 进程的到达时间、运行时间为进程初始化时程序员输入的时间。
    3. 就绪进程获得CPU后只能运行一个时间片(时间片由程序员初始化输入),运行后更新已占用CPU时间。
    4. 若运行一个时间片后(或一个时间片内),进程的已占用CPU时间已达到所需要的运行时间,则撤消该进程;若运行一个时间片后进程的已占用CPU时间还未达所需要的运行时间,也就是进程还需要继续运行,则把它插入就绪队列队尾等待CPU。
    5. 重复以上过程,直到所要进程都完成为止。

    Realization

    如何写一个优美的时间片轮转调度算法模拟Python实现?加点面向对象就好了…

    from collections import deque
    
    PROCESS_STATE_ENUM = ['wait','run','block','finish']
    
    class PCB:
        """ 一个PCB(Process Control Block)代表一个Process"""
        def __init__(self, kwargs):
            self.process_name = kwargs['process_name']
            self.arrive_time = kwargs['arrive_time']
            self.need_time = kwargs['need_time']
            self.has_used_time = 0
            self.process_status = PROCESS_STATE_ENUM[0]
    
        def __str__(self):
            return '[PCB info] process_name: {0} arrive_time:{1} need_time:{2} has_used_time:{3} process_status:{4}'.format(self.process_name,\
                    self.arrive_time, self.need_time,self.has_used_time, self.process_status)
    
    class Dispatch:
        """ CPU调度器"""
        def __init__(self, pcb_list: list):
            self.pcb_list = pcb_list
            self.ready_queue = deque()
            self.time_slice = 1
            self.time_stamp = 0
    
        def run(self):
            for pcb in self.pcb_list:
                if pcb.arrive_time == self.time_stamp:
                    self.ready_queue.append(pcb)
                    self.pcb_list.remove(pcb)
    
            self.print_info(statue='begin')
    
            # 干活
            if len(self.ready_queue) != 0:
                # 开始工作,从就绪队列中取一个PCB运行
                pcb = self.ready_queue.popleft()
                pcb.process_status = PROCESS_STATE_ENUM[1]
    
                # 如果一个时间片内可以结束该进程
                if pcb.need_time - pcb.has_used_time < self.time_slice:
                    # 结束
                    pcb.process_status = PROCESS_STATE_ENUM[3]
                    self.time_stamp += pcb.need_time - pcb.has_used_time
                # 一个时间片内没有结束该进程
                else:
                    pcb.has_used_time += self.time_slice
                    pcb.process_status = PROCESS_STATE_ENUM[0]
                    self.ready_queue.append(pcb)
                    self.time_stamp += self.time_slice
            # 休息
            else:
                # 判断空闲的时间内有没有PCB进队列
                current_slice_work_time = 0
                for pcb in self.pcb_list:
                    if self.time_stamp < pcb.arrive_time < self.time_stamp + self.time_slice:
                        current_slice_work_time = pcb.arrive_time - self.time_stamp
                        self.ready_queue.append(pcb)
                        self.pcb_list.remove(pcb)
    
                # 如果有PCB进队列,则提前结束该时间片
                if current_slice_work_time != 0:
                    self.time_stamp += current_slice_work_time
                else:
                    self.time_stamp += self.time_slice
    
            self.print_info(statue='end')
    
            if not self.ready_queue and not self.pcb_list:
                return
            else:
                self.run()
    
        def print_info(self, statue='begin'):
            print('-------------------Dispatch Info {0}------------------'.format(statue))
            print("[time_slice] {0}".format(self.time_slice))
            print("[time_stamp] {0}".format(self.time_stamp))
            # for item in self.pcb_list:
            #     print(item)
            for item in self.ready_queue:
                print(item)
            print('------------------------------------------------------\n')
    
    class Application:
        def __init__(self):
            """" dispatch and all PCB init """
            pcb_info_list = [{'process_name': 'pcb1', 'arrive_time': 1, 'need_time': 0.5},
                             {'process_name': 'pcb2', 'arrive_time': 2, 'need_time': 3},
                             {'process_name': 'pcb3', 'arrive_time': 3, 'need_time': 2},
                             {'process_name': 'pcb4', 'arrive_time': 4, 'need_time': 1},
                             {'process_name': 'pcb5', 'arrive_time': 5, 'need_time': 2}]
            pcb_list = []
            # PCB init
            for item in pcb_info_list:
                pcb_list.append(PCB(item))
    
            self.dispatch = Dispatch(pcb_list)
    
        def run(self):
            self.dispatch.run()
    
    if __name__ == '__main__':
        app = Application()
        app.run()
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
  • 相关阅读:
    面试官:说一说如何优雅的关闭线程池,我:shutdownNow,面试官:粗鲁!
    通过Power Platform自定义D365 CE 业务需求 - 4. 使用Power Automate
    dy、ks最新版通用quic协议解决方案
    VUE前端判断是电脑端还是移动端
    js单行代码-----dom
    Android Jetpack系列(九):WorkManager(源码篇)
    appium+python自动化测试
    pycharm的基本使用教程(1)
    【软件基础】pycharm2021.3.2安装汉化和python3.10.1环境配置
    YOLOv8快速复现 训练 SCB-Dataset3-S 官网版本 ultralytics
  • 原文地址:https://blog.csdn.net/linZinan_/article/details/127992856