• 6.S081-10 阻塞和唤醒+进程退出 - {Sleep & Week up}+{exit,wait,kill}


    6.S081-10 阻塞和唤醒+进程退出 - {Sleep & Week up}+{exit,wait,kill}

    文章目录

    0. 简单总结

    0.1 sleep & wakeup总结

    简单的sleep(缺少锁)——lost wakeup问题

    • 修改状态为SLEEPING;
    • 保存channel;
    • 线程切换,出让CPU;
    broken_sleep(uint64_t chan) {
      // need acquire(&p->lk) here.
    	p -> state = SLEEPING;
    	p -> chan = chan;
    	swtch();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    wakeup的实现⬇️

    wakeup函数中会查询进程表单中的所有进程。

    • 如果是SLEEPING并且对应的channer是当前wakeup的参数:将状态置为RUNNABLE。
    wakeup(uint64_t chan) {
    	for each p in procs[] 
    		if (p -> state == SLEEP && p ->chan == chan) {
    			p -> chan == chan
    			p -> state == RUNNABLE
    		}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    真正的sleep函数⬇️

    • 释放传入参数的锁release(lk);——这样中断才能获取到锁。
    • 因为我们不能让wakeup在release(lk)之后被执行,因此在release(lk)之前还要对进程加锁。 ——wakeup在唤醒一个进程前,需要先获取进程的锁。所以在整个时间uartwrite检查条件之前到sleep函数中调用sched函数之间,这个线程一直持有了保护sleep条件的锁或者p->lock。
    // Atomically release lock and sleep on chan.
    // Reacquires lock when awakened.
    void
    sleep(void *chan, struct spinlock *lk)
    {
      struct proc *p = myproc();
      
      // Must acquire p->lock in order to
      // change p->state and then call sched.
      // Once we hold p->lock, we can be
      // guaranteed that we won't miss any wakeup
      // (wakeup locks p->lock),
      // so it's okay to release lk.
    
      acquire(&p->lock);  //DOC: sleeplock1
      release(lk);
    
      // Go to sleep.
      p->chan = chan;
      p->state = SLEEPING;
    
      sched();
    
      // Tidy up.
      p->chan = 0;
    
      // Reacquire original lock.
      release(&p->lock);
      acquire(lk);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    0.2 进程退出的总结

    • 0.2.1 exit仅仅做了4件事 (关闭文件——ZOMBIE——wakeup parent——sched)

      1. 释放了本进程打开的文件和目录;

      2. 将状态设置为ZOMBIE;

      3. 将父进程的wait函数(sleep),唤醒(wakeup);

      4. 调用sched() 函数,释放CPU的占用权利,并永不返回。

    直到子进程exit的最后,它都没有释放所有的资源,因为它还在运行的过程中,所以不能释放这些资源。相应的其他的进程,也就是父进程,释放了运行子进程代码所需要的资源。这样的设计可以让我们极大的精简exit的实现。

    • 0.2.2 wait才是真正的释放资源的地方

      1. 等待子进程退出;(sleep,等待子进程的exit将它wakeup)

      2. 在进程列表中找到父进程是本进程,并且状态是ZOMBIE的进程,释放它的资源,包括⬇️;
        (1) trapframe;
        (2) pagetable;
        (3) 各种进程状态清零:sz,pid,parent,name,chan,killed,state,xstate;

        其中:sz = size,页表大小; chan = channel,wakeup和sleep的信号标记(用来唯一标识wakeup应该唤醒哪个进程的);

        (4) 当父进程完成了清理进程的所有资源,子进程的状态会被设置成UNUSED。之后,fork系统调用才能重用进程在进程表单的位置。

    wait不仅是为了父进程方便的知道子进程退出,wait实际上也是进程退出的一个重要组成部分。——完成了很多资源释放的过程,并且真正将state,sz,name,parent等清零

    • 0.2.3 kill几乎什么都不做

      严格来说kill只有两件事: p->killed = 1; SLEEPING -> RUNNABLE && ret from sleep.

      1. 设置p->killed = 1;
      2. 如果进程是SLEEPING,将会被设置为RUNNABLE,并且从sleep中返回;
      3. 进程在一些地方自动检查if p->killed == 1, exit;
      4. 备注:如果需要保持某些操作的原子性,可以在操作过程中不仅行检查;常见的检查地点:一些sleep调用返回的地方,比如piperead的sleep返回之后,比如执行系统调用之前,比如时钟中断之后,比如系统调用从OS返回时…

    0.3 备注:以下⬇️三篇博客,可能对理解本节课有帮助。

    1. 场景介绍:线程切换中锁的限制

    总共两条:

    • 线程切换的过程中需要一直持有p->lock;——即yield函数中首先加锁,然后改变状态,然后调用swtch等(上节课的内容 6.S081 附加Lab3 线程切换——源代码实现(trap,yeild,context,Scheduler))——主要是怕切换成runnable状态后被其他CPU拿去运行。——但是其实这时候context等还没保存呢。
    • XV6中,不允许进程在执行switch函数的过程中,持有任何其他的锁。所以,进程在调用switch函数的过程中,必须要持有p->lock(注,也就是进程对应的proc结构体中的锁),但是同时又不能持有任何其他的锁。

    关于第二点的原因:一个进程切换出去的时候,如果仍然持有锁,那么将可能引起换入进程无法获取资源,导致死锁。

    场景如下👇

    我们有进程P1,P1的内核线程持有了p->lock以外的其他锁,这些锁可能是在使用磁盘,UART,console过程中持有的。之后内核线程在持有锁的时候,通过调用switch/yield/sched函数出让CPU,这会导致进程P1持有了锁,但是进程P1又不在运行。

    假设我们在一个只有一个CPU核的机器上,进程P1调用了switch函数将CPU控制转给了调度器线程,调度器线程发现还有一个进程P2的内核线程正在等待被运行,所以调度器线程会切换到运行进程P2。假设P2也想使用磁盘,UART或者console,它会对P1持有的锁调用acquire,这是对于同一个锁的第二个acquire调用。当然这个锁现在已经被P1持有了,所以这里的acquire并不能获取锁。假设这里是spinlock,那么进程P2会在一个循环里不停的“旋转”并等待锁被释放。但是很明显进程P2的acquire不会返回,所以即使进程P2稍后愿意出让CPU,P2也没机会这么做。之所以没机会是因为P2对于锁的acquire调用在直到锁释放之前都不会返回,而唯一锁能被释放的方式就是进程P1恢复执行并在稍后release锁,但是这一步又还没有发生,因为进程P1通过调用switch函数切换到了P2,而P2又在不停的“旋转”并等待锁被释放。这是一种死锁,它会导致系统停止运行。

    ——注意,acquire,swith,release等都是发生在内核中,并且acquire等待锁的过程中,关闭了中断acquire函数中第一件事情就是关闭中断,之后再“自旋”等待锁释放。你或许会想,为什么不能先“自旋”等待锁释放,再关闭中断?因为这样会有一个短暂的时间段锁被持有了但是中断没有关闭,在这个时间段内的设备的中断处理程序可能会引起死锁。),因此无法通过计时器中断来让线程出让CPU了。

    2. Sleep&Wakeup 接口函数接口

    当你在写一个线程的代码时,有些场景需要等待一些特定的事件,或者不同的线程之间需要交互。

    • 假设我们有一个Pipe,并且我正在从Pipe中读数据。但是Pipe当前又没有数据,所以我需要等待一个Pipe非空的事件。

    • 假设我在读取磁盘,我会告诉磁盘控制器请读取磁盘上的特定块。这或许要花费较长的时间,尤其当磁碟需要旋转时
      (通常是毫秒级别),磁盘才能完成读取。而执行读磁盘的进程需要等待读磁盘结束的事件。

    • 类似的,一个Unix进程可以调用wait函数。这个会使得调用进程等待任何一个子进程退出。所以这里父进程有意的在等待另一个进程产生的事件。

    2.1 如何实现线程/进程等待某一特定事件发生的代码?

    2.1.1 一种最简单的思想(类似于spinlock)⬇️

    while (pipe_buf_is_empty);
    
    • 1

    这种写法是有它的可取之处的——当我们要等待的事件极有可能在0.1微秒之内发生,这样的循环等待或许是最好的实现方式。

    但是当运行时间是一个很长的时间(比如几毫秒,甚至几分钟时间),那我们不想一直循环浪费被来可以用来完成其他任务的CPU时间。——这个时候通过类似于swtch函数的机制,出让CPU,直到等待的事件发生。

    2.1.2 Coordination:出让CPU,直到等待事件发生了,才去恢复执行。Unix中使用的是Sleep & Wakeup这种方式。

    这个在4. 如何避免lost wakeup中细讲。

    2.2 Sleep & Wakeup代码使用实例⬇️

    以下代码是本节课专用,xv6系统中的代码与之有所不同。

    // transmit buf[].
    void
    uartwrite(char buf[], int n) 
    {
    	acquire(&uart_tx_lock);
    	
    	int i = 0;
    	while (i < n) {
    		while (tx_done == 0) {
    			// UART is busy sending a character.
    			// wait for it to interrupt.
    			sleep(&tx_chan, &uart_tx_lock);
    		}
    		
    		WriteReg(THR, buf[i]);
    		i += 1;
    		tx_done = 0;
    	}
    	
    	release(&uart_tx_lock);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    当shell需要输出时会调用write系统调用最终走到uartwrite函数中,这个函数会在循环中将buf中的字符一个一个的向UART硬件写入。这是一种经典的设备驱动实现风格。UART硬件一次只能接受一个字符的传输,而通常来说会有很多字符需要写到UART硬件。你可以向UART硬件写入一个字符,并等待UART硬件说:好的我完成了传输上一个字符并且准备好了传输下一个字符,之后驱动程序才可以写入下一个字符。因为这里的硬件可能会非常慢,或许每秒只能传输1000个字符,所以我们在两个字符之间的等待时间可能会很长。而1毫秒在现在计算机上是一个非常非常长的时间,它可能包含了数百万条指令时间,所以我们不想通过循环来等待UART完成字符传输,我们想通过一个更好的方式来等待。如大多数操作系统一样,XV6也的确存在更好的等待方式。

    UART硬件会在完成传输一个字符后,触发一个中断。所以UART驱动中除了uartwrite函数外,还有名为uartintr的中断处理程序。这个中断处理程序会在UART硬件触发中断时由trap.c代码调用。

    // handle a uart interrupt, raised because input has
    // arrived, or the uart is ready for more output, or
    // both. called from devintr().
    void
    uartintr(void)
    {
      acquire(&uart_tx_lock);
      if((ReadReg(LSR) & LSR_TX_IDLE) == 0){
        // UART finished transmitting; wakeup any sending thread.
        tx_done = 1;
        wakeup(&tx_chan);
      }
      release(&uart_tx_lock);
      
      // read and process incoming characters.
      while(1){
        int c = uartgetc();
        if(c == -1)
          break;
        consoleintr(c);
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    中断处理程序会在最开始读取UART对应的memory mapped register,并检查其中表明传输完成的相应的标志位,也就是LSR_TX_IDLE标志位。如果这个标志位为1,代码会将tx_done设置为1,并调用wakeup函数。这个函数会使得uartwrite中的sleep函数恢复执行,并尝试发送一个新的字符。所以这里的机制是,如果一个线程需要等待某些事件,比如说等待UART硬件愿意接收一个新的字符,线程调用sleep函数并等待一个特定的条件。当特定的条件满足时,代码会调用wakeup函数。这里的sleep函数和wakeup函数是成对出现的。我们之后会看sleep函数的具体实现,它会做很多事情最后再调用switch函数来出让CPU。

    这里有件事情需要注意,sleep和wakeup函数需要通过某种方式链接到一起。也就是说,如果我们调用wakeup函数,我们只想唤醒正在等待刚刚发生的特定事件的线程。所以,sleep函数和wakeup函数都带有一个叫做sleep channel的参数。我们在调用wakeup的时候,需要传入与调用sleep函数相同的sleep channel。不过sleep和wakeup函数只是接收表示了sleep channel的64bit数值,它们并不关心这个数值代表什么。当我们调用sleep函数时,我们通过一个sleep channel表明我们等待的特定事件,当调用wakeup时我们希望能传入相同的数值来表明想唤醒哪个线程。

    学生提问:进程会在写入每个字符时候都被唤醒一次吗?
    Robert教授:在这个我出于演示目的而特别改过的UART驱动中,传输每个字符都会有一个中断,所以你是对的,对于buffer中的每个字符,我们都会等待UART可以接收下一个字符,之后写入一个字符,将tx_done设置为0,回到循环的最开始并再次调用sleep函数进行睡眠状态,直到tx_done为1。当UART传输完了这个字符,uartintr函数会将tx_done设置为1,并唤醒uartwrite所在的线程。所以对于每个字符都有调用一次sleep和wakeup,并占用一次循环。

    UART实际上支持一次传输4或者16个字符,所以一个更有效的驱动会在每一次循环都传输16个字符给UART,并且中断也是每16个字符触发一次。更高速的设备,例如以太网卡通常会更多个字节触发一次中断。

    对于sleep函数,有一个有趣的参数,我们需要将一个锁作为第二个参数传入,这背后是一个大的故事,我后面会介绍背后的原因。总的来说,不太可能设计一个sleep函数并完全忽略需要等待的事件。所以很难写一个通用的sleep函数,只是睡眠并等待一些特定的事件,并且这也很危险,因为可能会导致lost wakeup,而几乎所有的Coordination机制都需要处理lost wakeup的问题。在sleep接口中,我们需要传入一个锁是一种稍微丑陋的实现,我在稍后会再介绍。

    3. Lost wakeup —— 不给sleep传入锁

    在解释sleep函数为什么需要一个锁使用作为参数传入之前,我们先来看看假设我们有了一个更简单的不带锁作为参数的sleep函数,会有什么样的结果。这里的结果就是lost wakeup。

    3.1 简单sleep的实现(不传入锁)

    假设我们设计的sleep函数只有一个chan作为传入参数,没有传入锁的设计,那么它将不能正常工作,我们称之为broken_wakeup。

    现在我们可以设计sleep函数如下⬇️(需要修改进程状态,保留chan,并进行线程切换出让CPU

    broken_sleep(uint64_t chan) {
      // need acquire(&p->lk) here.
    	p -> state = SLEEPING;
    	p -> chan = chan;
    	swtch();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 修改状态为SLEEPING;
    • 保存channel;
    • 线程切换,出让CPU;

    上面的代码有啥问题?——很显然,进程状态修改和swtch并不是一个原子的操作,因此要对p进行加锁操作。(这里在进程切换的博客中已经反复强调了)。

    3.2 对应wakeup的实现

    wakeup(uint64_t chan) {
    	for each p in procs[] 
    		if (p -> state == SLEEP && p ->chan == chan) {
    			p -> chan == chan
    			p -> state == RUNNABLE
    		}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • wakeup函数中会查询进程表单中的所有进程。
      • 如果是SLEEPING并且对应的channer是当前wakeup的参数:将状态置为RUNNABLE。

    3.3 现实演示,回到UART驱动函数

    下面介绍UART驱动函数如何使用我们刚写出来的sleep和wakeup,并展示问题所在。

    3.3.1 uartwrite()

    注意,我们的uartwrite是演示函数,在这里面,每次只传输一个字符。

    int done;		// 标志位,标识上一次传输是否完成
    
    void uartwrite(char buf[]) {
    	for each c in buf:
      	while (!done):
          sleep(&tx_chan);
      	send(c);
      	done = 0;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    ⬆️对每个字符,检查是否done,如果没有,那就sleep,如果已经done,就开始传输本字符,并将done置为0.

    3.3.2 uartintr()

    void uartintr(void) {
    	done = 1;
      wakeup(&tx_chan);
    }
    
    • 1
    • 2
    • 3
    • 4

    ⬆️这里就很简单,就是将done置为1,并且wakeup。

    3.4 broken_sleep的问题 + 使用实例:——缺少锁。

    • done是共享数据,需要为done加锁;
    • 两个函数都要访问UART硬件,通常来说让两个线程并发的访问memory mapped register是错误的行为。

    3.4.1 在哪儿加锁?

    uartintr函数很简单,在开始加锁,结尾解锁。

    void uartintr(void) {
      lock;
    	done = 1;
      wakeup(&tx_chan);
      unlock;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    但是uartwrite?

    方案1 对每个字符的传递过程加锁——一定会死锁
    int done;		// 标志位,标识上一次传输是否完成
    
    void uartwrite(char buf[]) {
    	for each c in buf:
      	lock;
      	while (!done):
          sleep(&tx_chan);
      	send(c);
      	done = 0;
      	unlock;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    我们能从while not done的循环退出的唯一可能是中断处理程序将done设置为1。但是如果我们为整个代码段都加锁的话,中断处理程序就不能获取锁了,中断程序会不停“自旋”并等待锁释放。而锁被uartwrite持有,在done设置为1之前不会释放。而done只有在中断处理程序获取锁之后才可能设置为1。所以我们不能在发送每个字符的整个处理流程都加锁。

    方案2 sleep之前释放锁,wakeup之后再加一次锁 —— 仍然不可以(看现实演示)
    int done;		// 标志位,标识上一次传输是否完成
    
    void uartwrite(char buf[]) {
    	for each c in buf:
      	lock;
      	while (!done):
      		unlock;
          sleep(&tx_chan);
          lock;
      	send(c);
      	done = 0;
      	unlock;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    在传输字符的最开始获取锁,因为我们需要保护共享变量done,但是在调用sleep函数之前释放锁。这样中断处理程序就有可能运行并且设置done标志位为1。之后在sleep函数返回时,再次获取锁。

    3.4.2 breken_sleep现实演示(按照上述方案2)

    uartwrite⬇️

    // transmit buf[].
    void
    uartwrite(char buf[], int n) 
    {
    	acquire(&uart_tx_lock);
    	
    	int i = 0;
    	while (i < n) {
    		while (tx_done == 0) {
    			// UART is busy sending a character.
    			// wait for it to interrupt.
    			
    			// sleep(&tx_chan, &uart_tx_lock);
    			release(&uart_tx_lock);
    			broken_sleep(&tx_chan);
    			acquire(&uart_tx_lock);
    		}
    		
    		WriteReg(THR, buf[i]);
    		i += 1;
    		tx_done = 0;
    	}
    	
    	release(&uart_tx_lock);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    uartintr⬇️

    // handle a uart interrupt, raised because input has
    // arrived, or the uart is ready for more output, or
    // both. called from devintr().
    void
    uartintr(void)
    {
      acquire(&uart_tx_lock);
      if((ReadReg(LSR) & LSR_TX_IDLE) == 0){
        // UART finished transmitting; wakeup any sending thread.
        tx_done = 1;
        wakeup(&tx_chan);
      }
      release(&uart_tx_lock);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    结果:启动xv6的过程输出如下⬇️

    ➜  xv6-riscv git:(riscv) make qemu
    qemu-system-riscv64 -machine virt -bios none -kernel kernel/kernel -m 128M -smp 3 -nographic -global virtio-mmio.force-legacy=false -drive file=fs.img,if=none,format=raw,id=x0 -device virtio-blk-device,drive=x0,bus=virtio-mmio-bus.0
    
    xv6 kernel is booting
    
    hart 1 starting
    hart 2 starting
    init: sta
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    原本应该输出⬇️

    ➜  xv6-riscv git:(riscv) make qemu
    qemu-system-riscv64 -machine virt -bios none -kernel kernel/kernel -m 128M -smp 3 -nographic -global virtio-mmio.force-legacy=false -drive file=fs.img,if=none,format=raw,id=x0 -device virtio-blk-device,drive=x0,bus=virtio-mmio-bus.0
    
    xv6 kernel is booting
    
    hart 1 starting
    hart 2 starting
    init: starting sh
    $
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    如果我输入任意字符,剩下的字符就能输出。这里发生了什么?

    这里的问题必然与之前修改的代码相关。在前面的代码中,sleep之前释放了锁,但是在释放锁和broken_sleep之间可能会发生中断。

    			release(&uart_tx_lock);
    			
    			// INTERRUPT 
    			
    			broken_sleep(&tx_chan);
    			acquire(&uart_tx_lock);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    一旦释放了锁,当前CPU的中断会被重新打开。因为这是一个多核机器,所以中断可能发生在任意一个CPU核。在上面代码标记的位置,其他CPU核上正在执行UART的中断处理程序,并且正在acquire函数中等待当前锁释放。所以一旦锁被释放了,另一个CPU核就会获取锁,并发现UART硬件完成了发送上一个字符,之后会设置tx_done为1,最后再调用wakeup函数,并传入tx_chan。目前为止一切都还好,除了一点:现在写线程还在执行并位于release和broken_sleep之间,也就是写线程还没有进入SLEEPING状态,所以中断处理程序中的wakeup并没有唤醒任何进程,因为还没有任何进程在tx_chan上睡眠。之后写线程会继续运行,调用broken_sleep,将进程状态设置为SLEEPING,保存sleep channel。但是中断已经发生了,wakeup也已经被调用了。所以这次的broken_sleep,没有人会唤醒它,因为wakeup已经发生过了。这就是lost wakeup问题。

    继续解释为什么输入一个字符又可以输出了?——因为输入的时候触发了uart的输入中断,恰好可以发送wakeup。

    4. 如何避免lost wakeup

    首先总结一下需求:

    release(&uart_tx_lock);
    // INTERRUPT 
    broken_sleep(&tx_chan);
    acquire(&uart_tx_lock);
    
    • 1
    • 2
    • 3
    • 4
    • 不能在sleep的时候还持有锁,不然会产生死锁;
    • 不能在release之后和sleep之间存在间隔,不然会产生lost wakeup问题;

    因此,我们的sleep可以设计如下⬇️

    即使sleep不知道你在等待什么事件,但是他还是需要知道你在等待什么数据,并且传图一个用来保护你正在等待数据的锁。sleep函数需要特定的条件才能执行,而sleep自己又不需要知道这个条件是什么。

    4.1 真正的sleep函数⬇️

    • 释放传入参数的锁release(lk);——这样中断才能获取到锁。
    • 因为我们不能让wakeup在release(lk)之后被执行,因此在release(lk)之前还要对进程加锁。 ——wakeup在唤醒一个进程前,需要先获取进程的锁。所以在整个时间uartwrite检查条件之前到sleep函数中调用sched函数之间,这个线程一直持有了保护sleep条件的锁或者p->lock。
    // Atomically release lock and sleep on chan.
    // Reacquires lock when awakened.
    void
    sleep(void *chan, struct spinlock *lk)
    {
      struct proc *p = myproc();
      
      // Must acquire p->lock in order to
      // change p->state and then call sched.
      // Once we hold p->lock, we can be
      // guaranteed that we won't miss any wakeup
      // (wakeup locks p->lock),
      // so it's okay to release lk.
    
      acquire(&p->lock);  //DOC: sleeplock1
      release(lk);
    
      // Go to sleep.
      p->chan = chan;
      p->state = SLEEPING;
    
      sched();
    
      // Tidy up.
      p->chan = 0;
    
      // Reacquire original lock.
      release(&p->lock);
      acquire(lk);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    4.2 真正的wakeup函数⬇️

    和我们的简单wakeup没有什么不同,其实就是先lock,然后如果匹配了channel并且是SLEEPING就设置成RUNNABLE,最后unlock

    // Wake up all processes sleeping on chan.
    // Must be called without any p->lock.
    void
    wakeup(void *chan)
    {
      struct proc *p;
    
      for(p = proc; p < &proc[NPROC]; p++) {
        if(p != myproc()){
          acquire(&p->lock);
          if(p->state == SLEEPING && p->chan == chan) {
            p->state = RUNNABLE;
          }
          release(&p->lock);
        }
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    4.3 对应的uart驱动实现

    // transmit buf[].
    void
    uartwrite(char buf[], int n) 
    {
    	acquire(&uart_tx_lock);
    	
    	int i = 0;
    	while (i < n) {
    		while (tx_done == 0) {
    			// UART is busy sending a character.
    			// wait for it to interrupt.
    			sleep(&tx_chan, &uart_tx_lock);
    		}
    		
    		WriteReg(THR, buf[i]);
    		i += 1;
    		tx_done = 0;
    	}
    	
    	release(&uart_tx_lock);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    uartwrite在最开始获取了sleep的condition lock,并且一直持有condition lock直到调用sleep函数。所以它首先获取了condition lock,之后检查condition(注,也就是tx_done等于0),之后在持有condition lock的前提下调用了sleep函数。此时wakeup不能做任何事情,wakeup现在甚至都不能被调用直到调用者能持有condition lock。所以现在wakeup必然还没有执行。

    sleep函数在释放condition lock之前,先获取了进程的锁。在释放了condition lock之后,wakeup就可以被调用了,但是除非wakeup获取了进程的锁,否则wakeup不能查看进程的状态。所以,在sleep函数中释放了condition lock之后,wakeup也还没有执行。

    在持有进程锁的时候,将进程的状态设置为SLEEPING并记录sleep channel,之后再调用sched函数,这个函数中会再调用switch函数,此时sleep函数中仍然持有了进程的锁,wakeup仍然不能做任何事情。

    如果你还记得的话,当我们从当前线程切换走时,调度器线程中会释放前一个进程的锁。所以在调度器线程释放进程锁之后,wakeup才能终于获取进程的锁,发现它正在SLEEPING状态,并唤醒它。

    这里的效果是由之前定义的一些规则确保的,这些规则包括了:

    • 调用sleep时需要持有condition lock,这样sleep函数才能知道相应的锁。
    • sleep函数只有在获取到进程的锁p->lock之后,才能释放condition lock。
    • wakeup需要同时持有两个锁才能查看进程。

    这样的话,我们就不会再丢失任何一个wakeup,也就是说我们修复了lost wakeup的问题。

    5. pipe中的sleep和wakeup

    前面主要是uart的传输字符的场景,接下来看一下pipe场景下的sleep & wakeup实现。

    int
    piperead(struct pipe *pi, uint64 addr, int n)
    {
      int i;
      struct proc *pr = myproc();
      char ch;
    
      acquire(&pi->lock);
      while(pi->nread == pi->nwrite && pi->writeopen){  //DOC: pipe-empty
        if(killed(pr)){
          release(&pi->lock);
          return -1;
        }
        sleep(&pi->nread, &pi->lock); //DOC: piperead-sleep
      }
      for(i = 0; i < n; i++){  //DOC: piperead-copy
        if(pi->nread == pi->nwrite)
          break;
        ch = pi->data[pi->nread++ % PIPESIZE];
        if(copyout(pr->pagetable, addr + i, &ch, 1) == -1)
          break;
      }
      wakeup(&pi->nwrite);  //DOC: piperead-wakeup
      release(&pi->lock);
      return i;
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    简单来说,piperead就是

    • 首先获取pi -> lock; —— 保护进程(同时也是sleep传入的condition)
    • piperead需要等待的condition是pipe中有数据,而这个condition就是pi->nwrite大于pi->nread,也就是写入pipe的字节数大于被读取的字节数。
    • 如果这个condition不满足,那么piperead会调用sleep函数,并等待condition发生。同时piperead会将condition lock也就是pi->lock作为参数传递给sleep函数,以确保不会发生lost wakeup。
    int
    pipewrite(struct pipe *pi, uint64 addr, int n)
    {
      int i = 0;
      struct proc *pr = myproc();
    
      acquire(&pi->lock);
      while(i < n){
        if(pi->readopen == 0 || killed(pr)){
          release(&pi->lock);
          return -1;
        }
        if(pi->nwrite == pi->nread + PIPESIZE){ //DOC: pipewrite-full
          wakeup(&pi->nread);
          sleep(&pi->nwrite, &pi->lock);
        } else {
          char ch;
          if(copyin(pr->pagetable, &ch, addr + i, 1) == -1)
            break;
          pi->data[pi->nwrite++ % PIPESIZE] = ch;
          i++;
        }
      }
      wakeup(&pi->nread);
      release(&pi->lock);
    
      return i;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    pipewrite会向pipe的缓存写数据,并最后在piperead所等待的sleep channel上调用wakeup。而我们想要避免这样的风险:在piperead函数检查发现没有字节可以读取,到piperead函数调用sleep函数之间,另一个CPU调用了pipewrite函数。因为这样的话,另一个CPU会向pipe写入数据并在piperead进程进入SLEEPING之前调用wakeup,进而产生一次lost wakeup。

    在pipe的代码中,pipewrite和piperead都将sleep包装在一个while循环中。piperead中的循环等待pipe的缓存为非空(pipewrite中的循环等待的是pipe的缓存不为full)。之所以要将sleep包装在一个循环中,是因为可能有多个进程在读取同一个pipe。如果一个进程向pipe中写入了一个字节,这个进程会调用wakeup进而同时唤醒所有在读取同一个pipe的进程。但是因为pipe中只有一个字节并且总是有一个进程能够先被唤醒,哦,这正好提醒了我有关sleep我忘记了一些非常关键的事情。sleep函数中最后一件事情就是重新获取condition lock。所以调用sleep函数的时候,需要对condition lock上锁(注,在sleep函数内部会对condition lock解锁),在sleep函数返回时会重新对condition lock上锁。这样第一个被唤醒的线程会持有condition lock,而其他的线程在重新对condition lock上锁的时候会在锁的acquire函数中等待。

    那个幸运的进程(注,这里线程和进程描述的有些乱,但是基本意思是一样的,当说到线程时是指进程唯一的内核线程)会从sleep函数中返回,之后通过检查可以发现pi->nwrite比pi->nread大1,所以进程可以从piperead的循环中退出,并读取一个字节,之后pipe缓存中就没有数据了。之后piperead函数释放锁并返回。接下来,第二个被唤醒的线程,它的sleep函数可以获取condition lock并返回,但是通过检查发现pi->nwrite等于pi->nread(注,因为唯一的字节已经被前一个进程读走了),所以这个线程以及其他所有的等待线程都会重新进入sleep函数。所以这里也可以看出,几乎所有对于sleep的调用都需要包装在一个循环中,这样从sleep中返回的时候才能够重新检查condition是否还符合。

    sleep和wakeup的规则稍微有点复杂。因为你需要向sleep展示你正在等待什么数据,你需要传入锁并遵循一些规则,某些时候这些规则还挺烦人的。另一方面sleep和wakeup又足够灵活,因为它们并不需要理解对应的condition,只是需要有个condition和保护这个condition的锁。

    除了sleep&wakeup之外,还有一些其他的更高级的Coordination实现方式。例如今天课程的阅读材料中的semaphore,它的接口就没有那么复杂,你不用告诉semaphore有关锁的信息。而semaphore的调用者也不需要担心lost wakeup的问题,在semaphore的内部实现中考虑了lost wakeup问题。因为定制了up-down计数器,所以semaphore可以在不向接口泄露数据的同时(注,也就是不需要向接口传递condition lock),处理lost wakeup问题。semaphore某种程度来说更简单,尽管它也没那么通用,如果你不是在等待一个计数器,semaphore也就没有那么有用了。这也就是为什么我说sleep和wakeup更通用的原因。

    6. exit

    exit是关于进程关闭(另一个可以关闭进程的函数是kill),exit代码如下⬇️,它主要做了:

    • 关闭所有已经打开的文件;
    • 关闭Current directory(cwd);
    • reparent将该进程的子进程的父进程全都设置成init进程;
    • wakeup(p->parent); —— 进程可能在wait子进程;
    • 如果该进程父进程不存在,那么将为该进程重新制定父进程为init进程;
    • 将进程设置为ZOMBIE —— 现在进程还没有完全释放它的资源,所以它还不能被重用。重用:我们期望在最后,进程的所有状态都可以被一些其他无关的fork系统调用复用,但是目前我们还没有到那一步。
    • 调用sched函数进入到调度器线程,并且永不返回;
    • 进程的状态是ZOMBIE,并且进程不会再运行,因为调度器只会运行RUNNABLE进程。同时进程资源也并没有完全释放,如果释放了进程的状态应该是UNUSED。但是可以肯定的是进程不会再运行了,因为它的状态是ZOMBIE。所以调度器线程会决定运行其他的进程。

    值得注意的一点是,执行exit并没有将自己的进程包含的 内存等资源释放,仅仅是释放了文件描述符和文件目录,并且将状态设置为ZOMBIE,并且wakeup了父进程的wait。

    // Exit the current process.  Does not return.
    // An exited process remains in the zombie state
    // until its parent calls wait().
    void
    exit(int status)
    {
      struct proc *p = myproc();
    
      if(p == initproc)
        panic("init exiting");
    
      // Close all open files.
      for(int fd = 0; fd < NOFILE; fd++){
        if(p->ofile[fd]){
          struct file *f = p->ofile[fd];
          fileclose(f);
          p->ofile[fd] = 0;
        }
      }
    
      begin_op();
      iput(p->cwd);
      end_op();
      p->cwd = 0;
    
      acquire(&wait_lock);
    
      // Give any children to init.
      reparent(p);
    
      // Parent might be sleeping in wait().
      wakeup(p->parent);
      
      acquire(&p->lock);
    
      p->xstate = status;
      p->state = ZOMBIE;
    
      release(&wait_lock);
    
      // Jump into the scheduler, never to return.
      sched();
      panic("zombie exit");
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44

    7. wait

    通过Unix的exit和wait系统调用的说明,我们可以知道如果一个进程exit了,并且它的父进程调用了wait系统调用,父进程的wait会返回。wait函数的返回表明当前进程的一个子进程退出了。所以接下来我们看一下wait系统调用的实现。

    整个wait几乎都是建立在一个for循环上,主要操作是,找到一个进程的父进程是自己并且状态是ZOMBIE的进程,调用freeproc函数。

    // Wait for a child process to exit and return its pid.
    // Return -1 if this process has no children.
    int
    wait(uint64 addr)
    {
      struct proc *pp;
      int havekids, pid;
      struct proc *p = myproc();
    
      acquire(&wait_lock);
    
      for(;;){
        // Scan through table looking for exited children.
        havekids = 0;
        for(pp = proc; pp < &proc[NPROC]; pp++){
          if(pp->parent == p){
            // make sure the child isn't still in exit() or swtch().
            acquire(&pp->lock);
    
            havekids = 1;
            if(pp->state == ZOMBIE){
              // Found one.
              pid = pp->pid;
              if(addr != 0 && copyout(p->pagetable, addr, (char *)&pp->xstate,
                                      sizeof(pp->xstate)) < 0) {
                release(&pp->lock);
                release(&wait_lock);
                return -1;
              }
              freeproc(pp);
              release(&pp->lock);
              release(&wait_lock);
              return pid;
            }
            release(&pp->lock);
          }
        }
    
        // No point waiting if we don't have any children.
        if(!havekids || killed(p)){
          release(&wait_lock);
          return -1;
        }
        
        // Wait for a child to exit.
        sleep(p, &wait_lock);  //DOC: wait-sleep
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48

    再来看看freeproc函数⬇️

    • 释放trapframe;

    • 释放pagetable;

    • 清零state,chan,killed,pid,sz,parent,name等;

    如果我们需要释放进程内核栈,那么也应该在这里释放。但是因为内核栈的guard page,我们没有必要再释放一次内核栈。不管怎样,当进程还在exit函数中运行时,任何这些资源在exit函数中释放都会很难受,所以这些资源都是由父进程释放的。

    // free a proc structure and the data hanging from it,
    // including user pages.
    // p->lock must be held.
    static void
    freeproc(struct proc *p)
    {
      if(p->trapframe)
        kfree((void*)p->trapframe);
      p->trapframe = 0;
      if(p->pagetable)
        proc_freepagetable(p->pagetable, p->sz);
      p->pagetable = 0;
      p->sz = 0;
      p->pid = 0;
      p->parent = 0;
      p->name[0] = 0;
      p->chan = 0;
      p->killed = 0;
      p->xstate = 0;
      p->state = UNUSED;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    wait不仅是为了父进程方便的知道子进程退出,wait实际上也是进程退出的一个重要组成部分。在Unix中,对于每一个退出的进程,都需要有一个对应的wait系统调用,这就是为什么当一个进程退出时,它的子进程需要变成init进程的子进程。init进程的工作就是在一个循环中不停调用wait,因为每个进程都需要对应一个wait,这样它的父进程才能调用freeproc函数,并清理进程的资源。

    8. kill

    除了exit以外,kill也可以让一个进程退出。实际上kill基本上不做任何事情。

    • 从进程表中找到需要killed的进程,将他的p->killed置为1;
    • 目标进程会自动执行exit的系统调用;
    // Kill the process with the given pid.
    // The victim won't exit until it tries to return
    // to user space (see usertrap() in trap.c).
    int
    kill(int pid)
    {
      struct proc *p;
    
      for(p = proc; p < &proc[NPROC]; p++){
        acquire(&p->lock);
        if(p->pid == pid){
          p->killed = 1;
          if(p->state == SLEEPING){
            // Wake process from sleep().
            p->state = RUNNABLE;
          }
          release(&p->lock);
          return 0;
        }
        release(&p->lock);
      }
      return -1;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    p->killed == 1自动exit的时机有哪些(什么时候会检查p->killed == 1)?

    • 执行系统调用之前,会检查,如果p->killed == 1就自动exit;
    • 执行系统调用,从内核返回后,也会检查…;
    • 中断的时候也会检查;
      为什么执行系统调用期间不行?——在内核执行的时候,如果killed,可能会导致一些操作只做了一部分,导致一些操作不再是原子操作。

    8.1 kill的特点——只是设置killed,会在其他地方检查p->killed == 1,自动exit

    kill系统调用并不是真正的立即停止进程的运行,它更像是这样:如果进程在用户空间,那么下一次它执行系统调用它就会退出,又或者目标进程正在执行用户代码,当时下一次定时器中断或者其他中断触发了,进程才会退出。所以从一个进程调用kill,到另一个进程真正退出,中间可能有很明显的延时。

    8.2 SLEEPING状态的程序被killed了——会被设置为RUNNABLE,并且从sleep中返回

    但是问题来了,如果一个进程不在运行怎们办?——比如一个进程SLEEPING状态,但是可能几天后才会被唤醒咋整?

    ——其实从代码中就可以看到,其实是,如果进程状态为SLEEPING,killed将会把它的状态设置为RUNNABLE,并且让进程从sleep中返回。

    所以对于SLEEPING状态的进程,如果它被kill了,它会被直接唤醒,包装了sleep的循环会检查进程的killed标志位,最后再调用exit。

    8.3 一些磁盘操作等原子操作怎么保证?

    值得注意的是,还有一些操作,逻辑上可能是原子的,这些操作需要做的就是,在操作的过程中,不去检查p->killed == 1,因此这些过程中进程不会退出。比如一些文件操作virtio_disk.c:磁盘驱动中的sleep循环,这个循环中就没有检查进程的killed标志位。

    // Wait for virtio_disk_intr() to say request has finished.
    while(b->disk == 1) {
      sleep(b, &disk.vdisk_lock);
    }
    
    • 1
    • 2
    • 3
    • 4
  • 相关阅读:
    品牌百度百科应该怎样创建?编辑品牌百度百科词条的秘籍!
    ajax请求
    c++模板
    SWAT-MODFLOW耦合
    【图像处理-计算机视觉学习路线】个人记录
    物联网通信协议-MQTT及使用python实现
    Linux C语言编译报错:undefined reference to `sem_init‘(编译时加 -lpthread)
    「Verilog学习笔记」求两个数的差值
    【router-view】切换组件 深刻理解用法 vue的设计思想
    『Spring Boot Actuator & Spring Boot Admin』 实现应用监控管理
  • 原文地址:https://blog.csdn.net/ahundredmile/article/details/126822062