• 【golang】调度系列之sysmon


    调度系列
    调度系列之goroutine
    调度系列之m
    调度系列之p
    掉地系列之整体介绍

    在golang的调度体系中,除了GMP本身,还有另外一个比较重要的角色sysmon。实际上,除了GMP和sysmon,runtime中还有一个全局的调度器对象。但该对象只是维护一些全局的数据,而不承担实际的调度职责,并不值得单独介绍,感兴趣的同学可以自己了解一下。

    回到sysmon,sysmon是一个管理线程或者说守护线程,其是对GMP调度架构的补充和兜底。通过前面的几篇介绍,可以知道GMP的调度完全是主动协作式的调度。主动协作式的调度性能很高,但是在某些情况下会出现单个goroutine长期占据时间片甚至一直占据时间片的情况。
    比如:

    • 某个goroutine不执行主动调度、不调用系统调用、不做函数调用,就会一直运行直到goroutine退出;
    • 某个goroutine处于syscall状态时也无法触发主动调度,可能会造成该goroutine长时间占据时间片;

    sysmon的作用就是处理类似上面情况,其主要的工作内容有:

    • 定期查看netpoll有无就绪的任务,防止netpoll阻塞队列中的goroutine饥饿;
    • 定期查看是否有p长时间(10ms)处于syscall状态,如有则将p的持有权释放以执行其他g;
    • 定期查看是否有p长时间(10ms)没有调度,如有则对当前m发送信号,触发基于信号的异步抢占调度;

    在main函数启动时,会调用newm函数创建sysmon线程,sysmon作为mstartfn传入。

    // src/runtime/proc.go 145
    // The main goroutine.
    func main() {
        ...
        if GOARCH != "wasm" { // no threads on wasm yet, so no sysmon
           systemstack(func() {
              newm(sysmon, nil, -1)
           })
        }
        ...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    在介绍m的时候,我们提到过,mstart中会先调用mstartfn,然后再获取p并调用schedule函数。由于sysmon函数是循环不返回的,所以对应的m(也就是线程)永远运行sysmon,并且不需要获取p。所以并不是所有的m都需要p才可以运行的。
    接下来,我们看下sysmon的里面具体做了些什么。

    • 进入sysmon可以看到里面是一个死循环,这和我们上面提到的一样。该循环并非一直忙等,而是会根据系统的情况进行延时睡眠,初始的interval是20us,最大的interval是10ms。
    • 在某些特殊的情况,sysmon可以进入更长时间(超过10ms)的睡眠,条件包括:
      • 系统不需要schedtrace。看起来是和调度相关观测的内容,如果需要schedtrace,则sysmon需要及时输出相关数据;
      • 系统处于停滞状态。这个停滞是我自己描述的,不一定准确,包括两种情况:1. 所有的p都是空闲的,此时系统中没有任务执行;2. 系统在等待进入gc状态,马上要stop the world;
        满足上面两个条件,则可最大进行1min的睡眠。1min是最大强制gc时间(2min)的一半。
    • sysmon的活跃状态,首先会坚持netpoll是否超过10ms没有被检查过,这是为了防止netpoll挂载goroutine的饥饿;
    • 然后会进行retake操作,retake的内容就是对所有p进行检查,查看p是否处于syscall或者被一个goroutine占据时间过长(超过10ms),如果有则进行相应的处理;
    • 最后还会进行gc和schedtrace相关的操作;
    // src/runtime.go 5134
    func sysmon() {
        lock(&sched.lock)
        sched.nmsys++
        checkdead()
        unlock(&sched.lock)
    
        lasttrace := int64(0)
        idle := 0 // how many cycles in succession we had not wokeup somebody
        delay := uint32(0)
    
        for {
           if idle == 0 { // start with 20us sleep...
              delay = 20
           } else if idle > 50 { // start doubling the sleep after 1ms...
              delay *= 2
           }
           if delay > 10*1000 { // up to 10ms
              delay = 10 * 1000
           }
           usleep(delay)
    
           now := nanotime()
           if debug.schedtrace <= 0 && (sched.gcwaiting != 0 || atomic.Load(&sched.npidle) == uint32(gomaxprocs)) {
              lock(&sched.lock)
              if atomic.Load(&sched.gcwaiting) != 0 || atomic.Load(&sched.npidle) == uint32(gomaxprocs) {
                 syscallWake := false
                 next := timeSleepUntil()
                 if next > now {
                    atomic.Store(&sched.sysmonwait, 1)
                    unlock(&sched.lock)
                    // Make wake-up period small enough
                    // for the sampling to be correct.
                    sleep := forcegcperiod / 2
                    if next-now < sleep {
                       sleep = next - now
                    }
                    shouldRelax := sleep >= osRelaxMinNS
                    if shouldRelax {
                       osRelax(true)
                    }
                    syscallWake = notetsleep(&sched.sysmonnote, sleep)
                    if shouldRelax {
                       osRelax(false)
                    }
                    lock(&sched.lock)
                    atomic.Store(&sched.sysmonwait, 0)
                    noteclear(&sched.sysmonnote)
                 }
                 if syscallWake {
                    idle = 0
                    delay = 20
                 }
              }
              unlock(&sched.lock)
           }
    
           lock(&sched.sysmonlock)
           // Update now in case we blocked on sysmonnote or spent a long time
           // blocked on schedlock or sysmonlock above.
           now = nanotime()
    
           // trigger libc interceptors if needed
           if *cgo_yield != nil {...}
           // poll network if not polled for more than 10ms
           lastpoll := int64(atomic.Load64(&sched.lastpoll))
           if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now {
              atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))
              list := netpoll(0) // non-blocking - returns list of goroutines
              if !list.empty() {
                 incidlelocked(-1)
                 injectglist(&list)
                 incidlelocked(1)
              }
           }
           if GOOS == "netbsd" && needSysmonWorkaround {...}
           if scavenger.sysmonWake.Load() != 0 {
              // Kick the scavenger awake if someone requested it.
              scavenger.wake()
           }
           // retake P's blocked in syscalls
           // and preempt long running G's
           if retake(now) != 0 {
              idle = 0
           } else {
              idle++
           }
           // check if we need to force a GC
           if t := (gcTrigger{kind: gcTriggerTime, now: now}); t.test() && atomic.Load(&forcegc.idle) != 0 {
              lock(&forcegc.lock)
              forcegc.idle = 0
              var list gList
              list.push(forcegc.g)
              injectglist(&list)
              unlock(&forcegc.lock)
           }
           if debug.schedtrace > 0 && lasttrace+int64(debug.schedtrace)*1000000 <= now {
              lasttrace = now
              schedtrace(debug.scheddetail > 0)
           }
           unlock(&sched.sysmonlock)
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103

    retake的操作也相对比较好理解。在p的介绍中我们提到过schedtick、syscalltick、sysmontick三个字段,其作用正是为了sysmon的检查。

    sysmontick表示sysmon观测到的调度和系统调用情况,schedtick、syscalltick为实际的调度和系统调用情况。因为sysmon会经常睡眠,所以两者之间会有差异。

    • sysmon在检查所有p的过程中,如果发现sysmontick落后于实际情况,就会以实际情况为准更新sysmontick,同时也不会再做校验。因为sysmon睡眠最大时间为10ms,说明对应的p在10ms内做了调度。
    • 如果sysmontick和实际情况一只,则要看p是否运行一个goroutine超过10ms,如果是,则对m发送信号,触发异步抢占调度;如果p处于syscall状态超过10ms,则将p的持有权释放执行其他g。
    func retake(now int64) uint32 {
        n := 0
        // Prevent allp slice changes. This lock will be completely
        // uncontended unless we're already stopping the world.
        lock(&allpLock)
        // We can't use a range loop over allp because we may
        // temporarily drop the allpLock. Hence, we need to re-fetch
        // allp each time around the loop.
        for i := 0; i < len(allp); i++ {
           _p_ := allp[i]
           if _p_ == nil {
              // This can happen if procresize has grown
              // allp but not yet created new Ps.
              continue
           }
           pd := &_p_.sysmontick
           s := _p_.status
           sysretake := false
           if s == _Prunning || s == _Psyscall {
              // Preempt G if it's running for too long.
              t := int64(_p_.schedtick)
              if int64(pd.schedtick) != t {
                 pd.schedtick = uint32(t)
                 pd.schedwhen = now
              } else if pd.schedwhen+forcePreemptNS <= now {
                 preemptone(_p_)
                 // In case of syscall, preemptone() doesn't
                 // work, because there is no M wired to P.
                 sysretake = true
              }
           }
           if s == _Psyscall {
              // Retake P from syscall if it's there for more than 1 sysmon tick (at least 20us).
              t := int64(_p_.syscalltick)
              if !sysretake && int64(pd.syscalltick) != t {
                 pd.syscalltick = uint32(t)
                 pd.syscallwhen = now
                 continue
              }
              // On the one hand we don't want to retake Ps if there is no other work to do,
              // but on the other hand we want to retake them eventually
              // because they can prevent the sysmon thread from deep sleep.
              if runqempty(_p_) && atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 && pd.syscallwhen+10*1000*1000 > now {
                 continue
              }
              // Drop allpLock so we can take sched.lock.
              unlock(&allpLock)
              // Need to decrement number of idle locked M's
              // (pretending that one more is running) before the CAS.
              // Otherwise the M from which we retake can exit the syscall,
              // increment nmidle and report deadlock.
              incidlelocked(-1)
              if atomic.Cas(&_p_.status, s, _Pidle) {
                 if trace.enabled {
                    traceGoSysBlock(_p_)
                    traceProcStop(_p_)
                 }
                 n++
                 _p_.syscalltick++
                 handoffp(_p_)
              }
              incidlelocked(1)
              lock(&allpLock)
           }
        }
        unlock(&allpLock)
        return uint32(n)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68

    如果觉得本文对您有帮助,可以请博主喝杯咖啡~

  • 相关阅读:
    踩坑记:JSON.parse和JSON.stringify
    力扣:152. 乘积最大子数组(Python3)
    Nacos注册中心
    购买密封测试接头时双方应如何有效的快速沟通
    Eureka的使用
    593. 有效的正方形 : 简单几何运用题
    自动化测试08
    Linux安装zlib、libpng、freetype给交叉编译工具链使用
    Qt通过QMetaObject创建实例
    Nature Microbiology|溃疡性结肠炎的罪魁祸首:普通拟杆菌的蛋白酶
  • 原文地址:https://blog.csdn.net/shanxiaoshuai/article/details/133248396