简单的sleep(缺少锁)——lost wakeup问题
broken_sleep(uint64_t chan) {
// need acquire(&p->lk) here.
p -> state = SLEEPING;
p -> chan = chan;
swtch();
}
wakeup的实现⬇️
wakeup函数中会查询进程表单中的所有进程。
wakeup(uint64_t chan) {
for each p in procs[]
if (p -> state == SLEEP && p ->chan == chan) {
p -> chan == chan
p -> state == RUNNABLE
}
}
真正的sleep函数⬇️
// Atomically release lock and sleep on chan.
// Reacquires lock when awakened.
void
sleep(void *chan, struct spinlock *lk)
{
struct proc *p = myproc();
// Must acquire p->lock in order to
// change p->state and then call sched.
// Once we hold p->lock, we can be
// guaranteed that we won't miss any wakeup
// (wakeup locks p->lock),
// so it's okay to release lk.
acquire(&p->lock); //DOC: sleeplock1
release(lk);
// Go to sleep.
p->chan = chan;
p->state = SLEEPING;
sched();
// Tidy up.
p->chan = 0;
// Reacquire original lock.
release(&p->lock);
acquire(lk);
}
释放了本进程打开的文件和目录;
将状态设置为ZOMBIE;
将父进程的wait函数(sleep),唤醒(wakeup);
调用sched() 函数,释放CPU的占用权利,并永不返回。
直到子进程exit的最后,它都没有释放所有的资源,因为它还在运行的过程中,所以不能释放这些资源。相应的其他的进程,也就是父进程,释放了运行子进程代码所需要的资源。这样的设计可以让我们极大的精简exit的实现。
等待子进程退出;(sleep,等待子进程的exit将它wakeup)
在进程列表中找到父进程是本进程,并且状态是ZOMBIE的进程,释放它的资源,包括⬇️;
(1) trapframe;
(2) pagetable;
(3) 各种进程状态清零:sz,pid,parent,name,chan,killed,state,xstate;
其中:sz = size,页表大小; chan = channel,wakeup和sleep的信号标记(用来唯一标识wakeup应该唤醒哪个进程的);
(4) 当父进程完成了清理进程的所有资源,子进程的状态会被设置成UNUSED。之后,fork系统调用才能重用进程在进程表单的位置。
wait不仅是为了父进程方便的知道子进程退出,wait实际上也是进程退出的一个重要组成部分。——完成了很多资源释放的过程,并且真正将state,sz,name,parent等清零
严格来说kill只有两件事: p->killed = 1; SLEEPING -> RUNNABLE && ret from sleep.
总共两条:
关于第二点的原因:一个进程切换出去的时候,如果仍然持有锁,那么将可能引起换入进程无法获取资源,导致死锁。
场景如下👇
我们有进程P1,P1的内核线程持有了p->lock以外的其他锁,这些锁可能是在使用磁盘,UART,console过程中持有的。之后内核线程在持有锁的时候,通过调用switch/yield/sched函数出让CPU,这会导致进程P1持有了锁,但是进程P1又不在运行。
假设我们在一个只有一个CPU核的机器上,进程P1调用了switch函数将CPU控制转给了调度器线程,调度器线程发现还有一个进程P2的内核线程正在等待被运行,所以调度器线程会切换到运行进程P2。假设P2也想使用磁盘,UART或者console,它会对P1持有的锁调用acquire,这是对于同一个锁的第二个acquire调用。当然这个锁现在已经被P1持有了,所以这里的acquire并不能获取锁。假设这里是spinlock,那么进程P2会在一个循环里不停的“旋转”并等待锁被释放。但是很明显进程P2的acquire不会返回,所以即使进程P2稍后愿意出让CPU,P2也没机会这么做。之所以没机会是因为P2对于锁的acquire调用在直到锁释放之前都不会返回,而唯一锁能被释放的方式就是进程P1恢复执行并在稍后release锁,但是这一步又还没有发生,因为进程P1通过调用switch函数切换到了P2,而P2又在不停的“旋转”并等待锁被释放。这是一种死锁,它会导致系统停止运行。
——注意,acquire,swith,release等都是发生在内核中,并且acquire等待锁的过程中,关闭了中断(acquire函数中第一件事情就是关闭中断,之后再“自旋”等待锁释放。你或许会想,为什么不能先“自旋”等待锁释放,再关闭中断?因为这样会有一个短暂的时间段锁被持有了但是中断没有关闭,在这个时间段内的设备的中断处理程序可能会引起死锁。),因此无法通过计时器中断来让线程出让CPU了。
当你在写一个线程的代码时,有些场景需要等待一些特定的事件,或者不同的线程之间需要交互。
假设我们有一个Pipe,并且我正在从Pipe中读数据。但是Pipe当前又没有数据,所以我需要等待一个Pipe非空的事件。
假设我在读取磁盘,我会告诉磁盘控制器请读取磁盘上的特定块。这或许要花费较长的时间,尤其当磁碟需要旋转时
(通常是毫秒级别),磁盘才能完成读取。而执行读磁盘的进程需要等待读磁盘结束的事件。类似的,一个Unix进程可以调用wait函数。这个会使得调用进程等待任何一个子进程退出。所以这里父进程有意的在等待另一个进程产生的事件。
while (pipe_buf_is_empty);
这种写法是有它的可取之处的——当我们要等待的事件极有可能在0.1微秒之内发生,这样的循环等待或许是最好的实现方式。
但是当运行时间是一个很长的时间(比如几毫秒,甚至几分钟时间),那我们不想一直循环浪费被来可以用来完成其他任务的CPU时间。——这个时候通过类似于swtch函数的机制,出让CPU,直到等待的事件发生。
这个在4. 如何避免lost wakeup中细讲。
以下代码是本节课专用,xv6系统中的代码与之有所不同。
// transmit buf[].
void
uartwrite(char buf[], int n)
{
acquire(&uart_tx_lock);
int i = 0;
while (i < n) {
while (tx_done == 0) {
// UART is busy sending a character.
// wait for it to interrupt.
sleep(&tx_chan, &uart_tx_lock);
}
WriteReg(THR, buf[i]);
i += 1;
tx_done = 0;
}
release(&uart_tx_lock);
}
当shell需要输出时会调用write系统调用最终走到uartwrite函数中,这个函数会在循环中将buf中的字符一个一个的向UART硬件写入。这是一种经典的设备驱动实现风格。UART硬件一次只能接受一个字符的传输,而通常来说会有很多字符需要写到UART硬件。你可以向UART硬件写入一个字符,并等待UART硬件说:好的我完成了传输上一个字符并且准备好了传输下一个字符,之后驱动程序才可以写入下一个字符。因为这里的硬件可能会非常慢,或许每秒只能传输1000个字符,所以我们在两个字符之间的等待时间可能会很长。而1毫秒在现在计算机上是一个非常非常长的时间,它可能包含了数百万条指令时间,所以我们不想通过循环来等待UART完成字符传输,我们想通过一个更好的方式来等待。如大多数操作系统一样,XV6也的确存在更好的等待方式。
UART硬件会在完成传输一个字符后,触发一个中断。所以UART驱动中除了uartwrite函数外,还有名为uartintr的中断处理程序。这个中断处理程序会在UART硬件触发中断时由trap.c代码调用。
// handle a uart interrupt, raised because input has
// arrived, or the uart is ready for more output, or
// both. called from devintr().
void
uartintr(void)
{
acquire(&uart_tx_lock);
if((ReadReg(LSR) & LSR_TX_IDLE) == 0){
// UART finished transmitting; wakeup any sending thread.
tx_done = 1;
wakeup(&tx_chan);
}
release(&uart_tx_lock);
// read and process incoming characters.
while(1){
int c = uartgetc();
if(c == -1)
break;
consoleintr(c);
}
}
中断处理程序会在最开始读取UART对应的memory mapped register,并检查其中表明传输完成的相应的标志位,也就是LSR_TX_IDLE标志位。如果这个标志位为1,代码会将tx_done设置为1,并调用wakeup函数。这个函数会使得uartwrite中的sleep函数恢复执行,并尝试发送一个新的字符。所以这里的机制是,如果一个线程需要等待某些事件,比如说等待UART硬件愿意接收一个新的字符,线程调用sleep函数并等待一个特定的条件。当特定的条件满足时,代码会调用wakeup函数。这里的sleep函数和wakeup函数是成对出现的。我们之后会看sleep函数的具体实现,它会做很多事情最后再调用switch函数来出让CPU。
这里有件事情需要注意,sleep和wakeup函数需要通过某种方式链接到一起。也就是说,如果我们调用wakeup函数,我们只想唤醒正在等待刚刚发生的特定事件的线程。所以,sleep函数和wakeup函数都带有一个叫做sleep channel的参数。我们在调用wakeup的时候,需要传入与调用sleep函数相同的sleep channel。不过sleep和wakeup函数只是接收表示了sleep channel的64bit数值,它们并不关心这个数值代表什么。当我们调用sleep函数时,我们通过一个sleep channel表明我们等待的特定事件,当调用wakeup时我们希望能传入相同的数值来表明想唤醒哪个线程。
学生提问:进程会在写入每个字符时候都被唤醒一次吗?
Robert教授:在这个我出于演示目的而特别改过的UART驱动中,传输每个字符都会有一个中断,所以你是对的,对于buffer中的每个字符,我们都会等待UART可以接收下一个字符,之后写入一个字符,将tx_done设置为0,回到循环的最开始并再次调用sleep函数进行睡眠状态,直到tx_done为1。当UART传输完了这个字符,uartintr函数会将tx_done设置为1,并唤醒uartwrite所在的线程。所以对于每个字符都有调用一次sleep和wakeup,并占用一次循环。
UART实际上支持一次传输4或者16个字符,所以一个更有效的驱动会在每一次循环都传输16个字符给UART,并且中断也是每16个字符触发一次。更高速的设备,例如以太网卡通常会更多个字节触发一次中断。
对于sleep函数,有一个有趣的参数,我们需要将一个锁作为第二个参数传入,这背后是一个大的故事,我后面会介绍背后的原因。总的来说,不太可能设计一个sleep函数并完全忽略需要等待的事件。所以很难写一个通用的sleep函数,只是睡眠并等待一些特定的事件,并且这也很危险,因为可能会导致lost wakeup,而几乎所有的Coordination机制都需要处理lost wakeup的问题。在sleep接口中,我们需要传入一个锁是一种稍微丑陋的实现,我在稍后会再介绍。
在解释sleep函数为什么需要一个锁使用作为参数传入之前,我们先来看看假设我们有了一个更简单的不带锁作为参数的sleep函数,会有什么样的结果。这里的结果就是lost wakeup。
假设我们设计的sleep函数只有一个chan作为传入参数,没有传入锁的设计,那么它将不能正常工作,我们称之为broken_wakeup。
现在我们可以设计sleep函数如下⬇️(需要修改进程状态,保留chan,并进行线程切换出让CPU)
broken_sleep(uint64_t chan) {
// need acquire(&p->lk) here.
p -> state = SLEEPING;
p -> chan = chan;
swtch();
}
上面的代码有啥问题?——很显然,进程状态修改和swtch并不是一个原子的操作,因此要对p进行加锁操作。(这里在进程切换的博客中已经反复强调了)。
wakeup(uint64_t chan) {
for each p in procs[]
if (p -> state == SLEEP && p ->chan == chan) {
p -> chan == chan
p -> state == RUNNABLE
}
}
下面介绍UART驱动函数如何使用我们刚写出来的sleep和wakeup,并展示问题所在。
注意,我们的uartwrite是演示函数,在这里面,每次只传输一个字符。
int done; // 标志位,标识上一次传输是否完成
void uartwrite(char buf[]) {
for each c in buf:
while (!done):
sleep(&tx_chan);
send(c);
done = 0;
}
⬆️对每个字符,检查是否done,如果没有,那就sleep,如果已经done,就开始传输本字符,并将done置为0.
void uartintr(void) {
done = 1;
wakeup(&tx_chan);
}
⬆️这里就很简单,就是将done置为1,并且wakeup。
uartintr函数很简单,在开始加锁,结尾解锁。
void uartintr(void) {
lock;
done = 1;
wakeup(&tx_chan);
unlock;
}
但是uartwrite?
int done; // 标志位,标识上一次传输是否完成
void uartwrite(char buf[]) {
for each c in buf:
lock;
while (!done):
sleep(&tx_chan);
send(c);
done = 0;
unlock;
}
我们能从while not done的循环退出的唯一可能是中断处理程序将done设置为1。但是如果我们为整个代码段都加锁的话,中断处理程序就不能获取锁了,中断程序会不停“自旋”并等待锁释放。而锁被uartwrite持有,在done设置为1之前不会释放。而done只有在中断处理程序获取锁之后才可能设置为1。所以我们不能在发送每个字符的整个处理流程都加锁。
int done; // 标志位,标识上一次传输是否完成
void uartwrite(char buf[]) {
for each c in buf:
lock;
while (!done):
unlock;
sleep(&tx_chan);
lock;
send(c);
done = 0;
unlock;
}
在传输字符的最开始获取锁,因为我们需要保护共享变量done,但是在调用sleep函数之前释放锁。这样中断处理程序就有可能运行并且设置done标志位为1。之后在sleep函数返回时,再次获取锁。
uartwrite⬇️
// transmit buf[].
void
uartwrite(char buf[], int n)
{
acquire(&uart_tx_lock);
int i = 0;
while (i < n) {
while (tx_done == 0) {
// UART is busy sending a character.
// wait for it to interrupt.
// sleep(&tx_chan, &uart_tx_lock);
release(&uart_tx_lock);
broken_sleep(&tx_chan);
acquire(&uart_tx_lock);
}
WriteReg(THR, buf[i]);
i += 1;
tx_done = 0;
}
release(&uart_tx_lock);
}
uartintr⬇️
// handle a uart interrupt, raised because input has
// arrived, or the uart is ready for more output, or
// both. called from devintr().
void
uartintr(void)
{
acquire(&uart_tx_lock);
if((ReadReg(LSR) & LSR_TX_IDLE) == 0){
// UART finished transmitting; wakeup any sending thread.
tx_done = 1;
wakeup(&tx_chan);
}
release(&uart_tx_lock);
}
结果:启动xv6的过程输出如下⬇️
➜ xv6-riscv git:(riscv) make qemu
qemu-system-riscv64 -machine virt -bios none -kernel kernel/kernel -m 128M -smp 3 -nographic -global virtio-mmio.force-legacy=false -drive file=fs.img,if=none,format=raw,id=x0 -device virtio-blk-device,drive=x0,bus=virtio-mmio-bus.0
xv6 kernel is booting
hart 1 starting
hart 2 starting
init: sta
原本应该输出⬇️
➜ xv6-riscv git:(riscv) make qemu
qemu-system-riscv64 -machine virt -bios none -kernel kernel/kernel -m 128M -smp 3 -nographic -global virtio-mmio.force-legacy=false -drive file=fs.img,if=none,format=raw,id=x0 -device virtio-blk-device,drive=x0,bus=virtio-mmio-bus.0
xv6 kernel is booting
hart 1 starting
hart 2 starting
init: starting sh
$
如果我输入任意字符,剩下的字符就能输出。这里发生了什么?
这里的问题必然与之前修改的代码相关。在前面的代码中,sleep之前释放了锁,但是在释放锁和broken_sleep之间可能会发生中断。
release(&uart_tx_lock);
// INTERRUPT
broken_sleep(&tx_chan);
acquire(&uart_tx_lock);
一旦释放了锁,当前CPU的中断会被重新打开。因为这是一个多核机器,所以中断可能发生在任意一个CPU核。在上面代码标记的位置,其他CPU核上正在执行UART的中断处理程序,并且正在acquire函数中等待当前锁释放。所以一旦锁被释放了,另一个CPU核就会获取锁,并发现UART硬件完成了发送上一个字符,之后会设置tx_done为1,最后再调用wakeup函数,并传入tx_chan。目前为止一切都还好,除了一点:现在写线程还在执行并位于release和broken_sleep之间,也就是写线程还没有进入SLEEPING状态,所以中断处理程序中的wakeup并没有唤醒任何进程,因为还没有任何进程在tx_chan上睡眠。之后写线程会继续运行,调用broken_sleep,将进程状态设置为SLEEPING,保存sleep channel。但是中断已经发生了,wakeup也已经被调用了。所以这次的broken_sleep,没有人会唤醒它,因为wakeup已经发生过了。这就是lost wakeup问题。
继续解释为什么输入一个字符又可以输出了?——因为输入的时候触发了uart的输入中断,恰好可以发送wakeup。
首先总结一下需求:
release(&uart_tx_lock);
// INTERRUPT
broken_sleep(&tx_chan);
acquire(&uart_tx_lock);
因此,我们的sleep可以设计如下⬇️
即使sleep不知道你在等待什么事件,但是他还是需要知道你在等待什么数据,并且传图一个用来保护你正在等待数据的锁。sleep函数需要特定的条件才能执行,而sleep自己又不需要知道这个条件是什么。
// Atomically release lock and sleep on chan.
// Reacquires lock when awakened.
void
sleep(void *chan, struct spinlock *lk)
{
struct proc *p = myproc();
// Must acquire p->lock in order to
// change p->state and then call sched.
// Once we hold p->lock, we can be
// guaranteed that we won't miss any wakeup
// (wakeup locks p->lock),
// so it's okay to release lk.
acquire(&p->lock); //DOC: sleeplock1
release(lk);
// Go to sleep.
p->chan = chan;
p->state = SLEEPING;
sched();
// Tidy up.
p->chan = 0;
// Reacquire original lock.
release(&p->lock);
acquire(lk);
}
和我们的简单wakeup没有什么不同,其实就是先lock,然后如果匹配了channel并且是SLEEPING就设置成RUNNABLE,最后unlock
// Wake up all processes sleeping on chan.
// Must be called without any p->lock.
void
wakeup(void *chan)
{
struct proc *p;
for(p = proc; p < &proc[NPROC]; p++) {
if(p != myproc()){
acquire(&p->lock);
if(p->state == SLEEPING && p->chan == chan) {
p->state = RUNNABLE;
}
release(&p->lock);
}
}
}
// transmit buf[].
void
uartwrite(char buf[], int n)
{
acquire(&uart_tx_lock);
int i = 0;
while (i < n) {
while (tx_done == 0) {
// UART is busy sending a character.
// wait for it to interrupt.
sleep(&tx_chan, &uart_tx_lock);
}
WriteReg(THR, buf[i]);
i += 1;
tx_done = 0;
}
release(&uart_tx_lock);
}
uartwrite在最开始获取了sleep的condition lock,并且一直持有condition lock直到调用sleep函数。所以它首先获取了condition lock,之后检查condition(注,也就是tx_done等于0),之后在持有condition lock的前提下调用了sleep函数。此时wakeup不能做任何事情,wakeup现在甚至都不能被调用直到调用者能持有condition lock。所以现在wakeup必然还没有执行。
sleep函数在释放condition lock之前,先获取了进程的锁。在释放了condition lock之后,wakeup就可以被调用了,但是除非wakeup获取了进程的锁,否则wakeup不能查看进程的状态。所以,在sleep函数中释放了condition lock之后,wakeup也还没有执行。
在持有进程锁的时候,将进程的状态设置为SLEEPING并记录sleep channel,之后再调用sched函数,这个函数中会再调用switch函数,此时sleep函数中仍然持有了进程的锁,wakeup仍然不能做任何事情。
如果你还记得的话,当我们从当前线程切换走时,调度器线程中会释放前一个进程的锁。所以在调度器线程释放进程锁之后,wakeup才能终于获取进程的锁,发现它正在SLEEPING状态,并唤醒它。
这里的效果是由之前定义的一些规则确保的,这些规则包括了:
这样的话,我们就不会再丢失任何一个wakeup,也就是说我们修复了lost wakeup的问题。
前面主要是uart的传输字符的场景,接下来看一下pipe场景下的sleep & wakeup实现。
int
piperead(struct pipe *pi, uint64 addr, int n)
{
int i;
struct proc *pr = myproc();
char ch;
acquire(&pi->lock);
while(pi->nread == pi->nwrite && pi->writeopen){ //DOC: pipe-empty
if(killed(pr)){
release(&pi->lock);
return -1;
}
sleep(&pi->nread, &pi->lock); //DOC: piperead-sleep
}
for(i = 0; i < n; i++){ //DOC: piperead-copy
if(pi->nread == pi->nwrite)
break;
ch = pi->data[pi->nread++ % PIPESIZE];
if(copyout(pr->pagetable, addr + i, &ch, 1) == -1)
break;
}
wakeup(&pi->nwrite); //DOC: piperead-wakeup
release(&pi->lock);
return i;
}
简单来说,piperead就是
int
pipewrite(struct pipe *pi, uint64 addr, int n)
{
int i = 0;
struct proc *pr = myproc();
acquire(&pi->lock);
while(i < n){
if(pi->readopen == 0 || killed(pr)){
release(&pi->lock);
return -1;
}
if(pi->nwrite == pi->nread + PIPESIZE){ //DOC: pipewrite-full
wakeup(&pi->nread);
sleep(&pi->nwrite, &pi->lock);
} else {
char ch;
if(copyin(pr->pagetable, &ch, addr + i, 1) == -1)
break;
pi->data[pi->nwrite++ % PIPESIZE] = ch;
i++;
}
}
wakeup(&pi->nread);
release(&pi->lock);
return i;
}
pipewrite会向pipe的缓存写数据,并最后在piperead所等待的sleep channel上调用wakeup。而我们想要避免这样的风险:在piperead函数检查发现没有字节可以读取,到piperead函数调用sleep函数之间,另一个CPU调用了pipewrite函数。因为这样的话,另一个CPU会向pipe写入数据并在piperead进程进入SLEEPING之前调用wakeup,进而产生一次lost wakeup。
在pipe的代码中,pipewrite和piperead都将sleep包装在一个while循环中。piperead中的循环等待pipe的缓存为非空(pipewrite中的循环等待的是pipe的缓存不为full)。之所以要将sleep包装在一个循环中,是因为可能有多个进程在读取同一个pipe。如果一个进程向pipe中写入了一个字节,这个进程会调用wakeup进而同时唤醒所有在读取同一个pipe的进程。但是因为pipe中只有一个字节并且总是有一个进程能够先被唤醒,哦,这正好提醒了我有关sleep我忘记了一些非常关键的事情。sleep函数中最后一件事情就是重新获取condition lock。所以调用sleep函数的时候,需要对condition lock上锁(注,在sleep函数内部会对condition lock解锁),在sleep函数返回时会重新对condition lock上锁。这样第一个被唤醒的线程会持有condition lock,而其他的线程在重新对condition lock上锁的时候会在锁的acquire函数中等待。
那个幸运的进程(注,这里线程和进程描述的有些乱,但是基本意思是一样的,当说到线程时是指进程唯一的内核线程)会从sleep函数中返回,之后通过检查可以发现pi->nwrite比pi->nread大1,所以进程可以从piperead的循环中退出,并读取一个字节,之后pipe缓存中就没有数据了。之后piperead函数释放锁并返回。接下来,第二个被唤醒的线程,它的sleep函数可以获取condition lock并返回,但是通过检查发现pi->nwrite等于pi->nread(注,因为唯一的字节已经被前一个进程读走了),所以这个线程以及其他所有的等待线程都会重新进入sleep函数。所以这里也可以看出,几乎所有对于sleep的调用都需要包装在一个循环中,这样从sleep中返回的时候才能够重新检查condition是否还符合。
sleep和wakeup的规则稍微有点复杂。因为你需要向sleep展示你正在等待什么数据,你需要传入锁并遵循一些规则,某些时候这些规则还挺烦人的。另一方面sleep和wakeup又足够灵活,因为它们并不需要理解对应的condition,只是需要有个condition和保护这个condition的锁。
除了sleep&wakeup之外,还有一些其他的更高级的Coordination实现方式。例如今天课程的阅读材料中的semaphore,它的接口就没有那么复杂,你不用告诉semaphore有关锁的信息。而semaphore的调用者也不需要担心lost wakeup的问题,在semaphore的内部实现中考虑了lost wakeup问题。因为定制了up-down计数器,所以semaphore可以在不向接口泄露数据的同时(注,也就是不需要向接口传递condition lock),处理lost wakeup问题。semaphore某种程度来说更简单,尽管它也没那么通用,如果你不是在等待一个计数器,semaphore也就没有那么有用了。这也就是为什么我说sleep和wakeup更通用的原因。
exit是关于进程关闭(另一个可以关闭进程的函数是kill),exit代码如下⬇️,它主要做了:
值得注意的一点是,执行exit并没有将自己的进程包含的 内存
,栈
等资源释放,仅仅是释放了文件描述符和文件目录,并且将状态设置为ZOMBIE,并且wakeup了父进程的wait。
// Exit the current process. Does not return.
// An exited process remains in the zombie state
// until its parent calls wait().
void
exit(int status)
{
struct proc *p = myproc();
if(p == initproc)
panic("init exiting");
// Close all open files.
for(int fd = 0; fd < NOFILE; fd++){
if(p->ofile[fd]){
struct file *f = p->ofile[fd];
fileclose(f);
p->ofile[fd] = 0;
}
}
begin_op();
iput(p->cwd);
end_op();
p->cwd = 0;
acquire(&wait_lock);
// Give any children to init.
reparent(p);
// Parent might be sleeping in wait().
wakeup(p->parent);
acquire(&p->lock);
p->xstate = status;
p->state = ZOMBIE;
release(&wait_lock);
// Jump into the scheduler, never to return.
sched();
panic("zombie exit");
}
通过Unix的exit和wait系统调用的说明,我们可以知道如果一个进程exit了,并且它的父进程调用了wait系统调用,父进程的wait会返回。wait函数的返回表明当前进程的一个子进程退出了。所以接下来我们看一下wait系统调用的实现。
整个wait几乎都是建立在一个for循环上,主要操作是,找到一个进程的父进程是自己并且状态是ZOMBIE的进程,调用freeproc函数。
// Wait for a child process to exit and return its pid.
// Return -1 if this process has no children.
int
wait(uint64 addr)
{
struct proc *pp;
int havekids, pid;
struct proc *p = myproc();
acquire(&wait_lock);
for(;;){
// Scan through table looking for exited children.
havekids = 0;
for(pp = proc; pp < &proc[NPROC]; pp++){
if(pp->parent == p){
// make sure the child isn't still in exit() or swtch().
acquire(&pp->lock);
havekids = 1;
if(pp->state == ZOMBIE){
// Found one.
pid = pp->pid;
if(addr != 0 && copyout(p->pagetable, addr, (char *)&pp->xstate,
sizeof(pp->xstate)) < 0) {
release(&pp->lock);
release(&wait_lock);
return -1;
}
freeproc(pp);
release(&pp->lock);
release(&wait_lock);
return pid;
}
release(&pp->lock);
}
}
// No point waiting if we don't have any children.
if(!havekids || killed(p)){
release(&wait_lock);
return -1;
}
// Wait for a child to exit.
sleep(p, &wait_lock); //DOC: wait-sleep
}
}
再来看看freeproc函数⬇️
释放trapframe;
释放pagetable;
清零state,chan,killed,pid,sz,parent,name等;
如果我们需要释放进程内核栈,那么也应该在这里释放。但是因为内核栈的guard page,我们没有必要再释放一次内核栈。不管怎样,当进程还在exit函数中运行时,任何这些资源在exit函数中释放都会很难受,所以这些资源都是由父进程释放的。
// free a proc structure and the data hanging from it,
// including user pages.
// p->lock must be held.
static void
freeproc(struct proc *p)
{
if(p->trapframe)
kfree((void*)p->trapframe);
p->trapframe = 0;
if(p->pagetable)
proc_freepagetable(p->pagetable, p->sz);
p->pagetable = 0;
p->sz = 0;
p->pid = 0;
p->parent = 0;
p->name[0] = 0;
p->chan = 0;
p->killed = 0;
p->xstate = 0;
p->state = UNUSED;
}
wait不仅是为了父进程方便的知道子进程退出,wait实际上也是进程退出的一个重要组成部分。在Unix中,对于每一个退出的进程,都需要有一个对应的wait系统调用,这就是为什么当一个进程退出时,它的子进程需要变成init进程的子进程。init进程的工作就是在一个循环中不停调用wait,因为每个进程都需要对应一个wait,这样它的父进程才能调用freeproc函数,并清理进程的资源。
除了exit以外,kill也可以让一个进程退出。实际上kill基本上不做任何事情。
// Kill the process with the given pid.
// The victim won't exit until it tries to return
// to user space (see usertrap() in trap.c).
int
kill(int pid)
{
struct proc *p;
for(p = proc; p < &proc[NPROC]; p++){
acquire(&p->lock);
if(p->pid == pid){
p->killed = 1;
if(p->state == SLEEPING){
// Wake process from sleep().
p->state = RUNNABLE;
}
release(&p->lock);
return 0;
}
release(&p->lock);
}
return -1;
}
p->killed == 1自动exit的时机有哪些(什么时候会检查p->killed == 1)?
kill系统调用并不是真正的立即停止进程的运行,它更像是这样:如果进程在用户空间,那么下一次它执行系统调用它就会退出,又或者目标进程正在执行用户代码,当时下一次定时器中断或者其他中断触发了,进程才会退出。所以从一个进程调用kill,到另一个进程真正退出,中间可能有很明显的延时。
但是问题来了,如果一个进程不在运行怎们办?——比如一个进程SLEEPING状态,但是可能几天后才会被唤醒咋整?
——其实从代码中就可以看到,其实是,如果进程状态为SLEEPING,killed将会把它的状态设置为RUNNABLE,并且让进程从sleep中返回。
所以对于SLEEPING状态的进程,如果它被kill了,它会被直接唤醒,包装了sleep的循环会检查进程的killed标志位,最后再调用exit。
值得注意的是,还有一些操作,逻辑上可能是原子的,这些操作需要做的就是,在操作的过程中,不去检查p->killed == 1,因此这些过程中进程不会退出。比如一些文件操作virtio_disk.c
:磁盘驱动中的sleep循环,这个循环中就没有检查进程的killed标志位。
// Wait for virtio_disk_intr() to say request has finished.
while(b->disk == 1) {
sleep(b, &disk.vdisk_lock);
}