• Golang GMP解读


    概念梳理

    1. 1 线程

    通常语义中的线程,指的是内核级线程,核心点如下:

    1. 是操作系统最小调度单元;
    2. 创建、销毁、调度交由内核完成,cpu 需完成用户态与内核态间的切换;
    3. 可充分利用多核,实现并行.

    1.2 协程

    协程又称为用户级线程核心点如下:

    1. 与线程存在映射关系,为 M:1,即多个协程对应一个线程
    2. 创建、销毁、调度在用户态完成,对内核透明,所以更轻;
    3. 从属同一个内核级线程,无法并行;一个协程阻塞会导致从属同一线程的所有协程无法执行.

    1.3 Goroutine

    Goroutine,经 Golang 优化后的特殊“协程”,核心点如下:

    1. 与线程存在映射关系,为 M:N,即 goroutine 既有协程M对1的特性,也存在1对1的可能,甚至1对N
    2. 创建、销毁、调度在用户态完成,对内核透明,足够轻便;
    3. 可利用多个线程,实现并行;
    4. 通过调度器的斡旋,实现和线程间的动态绑定和灵活调度;
    5. 栈空间大小可动态扩缩,因地制宜.

    1.4 三种模型的能力对比

    模型依赖内核可并行可应对阻塞栈可动态扩缩
    线程X
    协程XXXX
    goroutineX

    goroutine更像是一个博采众长的存在。实际上,“灵活调度” 一词概括得实在过于简要,Golang 在调度 goroutine 时,针对“如何减少加锁行为”,“如何避免资源不均”等问题都给出了精彩的解决方案,这一切都得益于经典的 “gmp” 模型

    GMP模型

    gmp = goroutine + machine + processor (+ 一套有机组合的机制),下面先单独拆出每个组件进行介绍,最后再总览全局,对 gmp 进行总述

    2.1 g(goroutine)

    1. g 即goroutine,是 golang 中对协程的抽象;
    2. g 有自己的运行栈、状态、以及执行的任务函数(用户通过 go func 指定);
    3. g 需要绑定到 p 才能执行,在 g 的视角中,p 就是它的 cpu.

    2.2 p(processor)

    1. p 即 processor ,是golang中的调度器
    2. p 是 gmp 的中枢,借由 p 承上启下,实现 g 和 m 之间的动态有机结合
    3. 对于 g 而言,p 是其cpu,g 只有被 p 调度才得以执行
    4. 对于 m 而言,p 是其执行代理,为其提供必要信息的同时(可执行的 g,内存分配情况等),并隐藏了复杂的调度细节
    5. p 的数量决定了 g 最大的并行数量。可以由用户通过 GoMaxProcs 设置(但是超过了CPU的核心数则无意义了)

    2.3 m(machine)

    1. m 即 machine ,是golang中线程的抽象
    2. m 不直接执行 g,而是先和 p 绑定,由其代理实现
    3. 借由 p 的存在,m 无需和 g 绑死,也无需记录 g 的状态信息,因此 g 在全生命周期可以实现跨 m 执行

    2.4 GMP(线程-- 使用调度器 --> 使用协程 goroutine)

    GMP宏观模型

    1. M 是线程的抽象;G 是 goroutine;P 是承上启下的调度器;
    2. M调度G前,需要和P绑定;
    3. 全局有多个M和多个P,但同时并行的G的最大数量等于P的数量;
    4. G的存放队列有三类:P的本地队列;全局队列;和wait队列(图中未展示,为io阻塞就绪态goroutine队列);
    5. M调度G时,优先取P本地队列,其次取全局队列,最后取wait队列;这样的好处是,取本地队列时,可以接近于无锁化,减少全局锁竞争;
    6. 为防止不同P的闲忙差异过大,设立work-stealing机制,本地队列为空的P可以尝试从其他P本地队列偷取一半的G补充到自身队列.

    核心数据结构

    gmp 数据结构定义为 runtime/runtime2.go 文件中

    3.1 g

    type g struct {
        // ...
        // m:在 p 的代理,负责执行当前 g 的 m;
        m         *m      
        // ...
        sched     gobuf
        // ...
    }
    type gobuf struct {
        sp   uintptr
        pc   uintptr
        ret  uintptr
        bp   uintptr // for framepointer-enabled architectures
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    1. m:在 p 的代理,负责执行当前 g 的 m;
    2. sched.sp:保存 CPU 的 rsp 寄存器的值,指向函数调用栈栈顶;
    3. sched.pc:保存 CPU 的 rip 寄存器的值,指向程序下一条执行指令的地址;
    4. sched.ret:保存系统调用的返回值;
    5. sched.bp:保存 CPU 的 rbp 寄存器的值,存储函数栈帧的起始位置.
    g 的生命周期

    生命周期

    const(
      _Gidle = itoa // 0
      _Grunnable // 1
      _Grunning // 2
      _Gsyscall // 3
      _Gwaiting // 4
      _Gdead // 6
      _Gcopystack // 8
      _Gpreempted // 9
    )
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    1. _Gidle 值为 0,为协程开始创建时的状态,此时尚未初始化完成;
    2. _Grunnable 值 为 1,协程在待执行队列中,等待被执行;
    3. _Grunning 值为 2,协程正在执行,同一时刻一个 p 中只有一个 g 处于此状态;
    4. _Gsyscall 值为 3,协程正在执行系统调用;
    5. _Gwaiting 值为 4,协程处于挂起态,需要等待被唤醒. gc、channel 通信或者锁操作时经常会进入这种状态;
    6. _Gdead 值为 6,协程刚初始化完成或者已经被销毁,会处于此状态;
    7. _Gcopystack 值为 8,协程正在栈扩容流程中;
    8. _Greempted 值为 9,协程被抢占后的状态.

    3.2 m

    type m struct {
        g0      *g     // goroutine with scheduling stack
        // ...
        tls           [tlsSlots]uintptr // thread-local storage (for x86 extern register)
        // ...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    1. g0:一类特殊的调度协程,不用于执行用户函数,负责执行 g 之间的切换调度. 与 m 的关系为 1:1;
    2. tls:thread-local storage,线程本地存储,存储内容只对当前线程可见. 线程本地存储的是 m.tls 的地址,m.tls[0] 存储的是当前运行的 g,因此线程可以通过 g 找到当前的 m、p、g0 等信息.

    3.3 p

    type p struct {
        // ...
        runqhead uint32
        runqtail uint32
        runq     [256]guintptr
        
        runnext guintptr
        // ...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    1. runq:本地 goroutine 队列,最大长度伟大256
    2. runqhead:队列头部
    3. runqtail:队列尾部
    4. runnext:下一个可执行的 goroutine

    3.4 schedt

    sched 是全局队列的封装

    type schedt struct {
        // ...
        lock mutex
        // ...
        runq     gQueue
        runqsize int32
        // ...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    1. lock 操作全局对列的锁
    2. runq 全局 goroutine 队列
    3. runqsize 全局队列的长度

    调度流程解析

    4.1 两种 g 的转换

    即 普通任务 g 和调度查找任务 g0 之间的转换
    goroutine 的类型可以分为两类:

    1. 负责调度普通 g 的 g0,执行固定的调度流程,与 m 的关系为一对一;
    2. 负责执行用户函数的普通 g.
      m 通过 p 调度执行的 goroutine 永远在普通 g 和 g0 之间进行切换,当 g0 找到可执行的 g 时,会调用 gogo 方法,调度 g 执行用户定义的任务;当 g 需要主动让渡或被动调度时,会触发 mcall 方法,将执行权重新交还给 g0.
      gogo 和 mcall 可以理解为对偶关系,其定义位于 runtime/stubs.go 文件中.
    func gogo(buf *gobuf)
    // ...
    func mcall(fn func(*g))
    
    • 1
    • 2
    • 3

    4.2 调度类型

    通常,调度指的是由 g0 按照特定策略找到下一个可执行 g 的过程. 而本小节谈及的调度类型是广义上的“调度”,指的是调度器 p 实现从执行一个 g 切换到另一个 g 的过程.

    这种广义“调度”可分为几种类型:

    1. 主动调度
      一种用户主动执行让渡的方式,主要方式是,用户在执行代码中调用了 runtime.Gosched 方法,此时当前 g 会当让出执行权,主动进行队列等待下次被调度执行.
      代码位于 runtime/proc.go
    func Gosched() {
        checkTimeouts()
        mcall(gosched_m)
    }
    
    • 1
    • 2
    • 3
    • 4
    1. 被动调度
      因当前不满足某种执行条件,g 可能会陷入阻塞态无法被调度,直到关注的条件达成后,g才从阻塞中被唤醒,重新进入可执行队列等待被调度.
      常见的被动调度触发方式为因 channel 操作或互斥锁操作陷入阻塞等操作,底层会走进 gopark 方法(例如http的IO多路复用,epoll方式使用的就是gopark来进行挂起操作)
      代码位于 runtime/proc.go
    func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) {
        // ...
        mcall(park_m)
    }
    
    • 1
    • 2
    • 3
    • 4

    通常 goready 与 gopark 成对出现,能够将 g 从阻塞状态恢复过来的,重新进入等待执行的状态
    源码位于 runtime/proc.go

    func goready(gp *g, traceskip int) {
        systemstack(func() {
            ready(gp, traceskip, true)
        })
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    1. 正常调度
      g 中的任务执行完后,g0 会将当前 g 置于死亡状态,发起新一轮的调度

    2. 抢占调度:
      如果 g 执行系统调度时间过长,超过了指定的市场,且全局的 p 资源比较紧缺,此时将 p 和 g 解绑,抢占出来用于其他 g 调度。等 g 完成系统调用后,会重新进入可执行队列中等待被调度
      但是跟前三种调度方式不同的是,其余三个调度方式都是在 m 下的 g0 完成的,抢占调度则不同
      因为发起系统调度时需要打破用户态的边界进入内核,此时 m 也会因系统调用而陷入僵直,无法主动完成抢占调度的行为
      所以Golang进程会有一个全局监控协程 monitor g 的存在,这个 g 会越过 p 直接跟 m 进行绑定,不断轮询对所有的 p 的执行状况进行监控,倘若发现满足抢占调度的条件,则从第三方角度出手干预。主动发起抢占调度动作

    宏观调度流程串联

    调度流程

    1. 以 g0 -> g -> g0 的一轮循环为例进行串联
    2. g0 执行 schedule() 函数,寻找到用于执行的 g
    3. g0 执行 execute() 方法,更新当前 g、p 的状态信息,并调用 gogo 方法,将执行权交给 g
    4. g 因主动让渡(goshce_m())、被动调度( park_m() )、正常结束( goexit0() )等原因,调用 m_call 函数,执行权重新回到 g0手中
    5. g0 执行 schedule() 函数,开启新一轮的循环

    解析 schedule() 搜索可执行 g 的函数

    调度流程的主干方法是位于 runtime/proc.go 中的 schedule 函数,此时的执行权位于 g0 手中:

    func schedule() {
        // ...
        gp, inheritTime, tryWakeP := findRunnable() // blocks until work is available
        // ...
        execute(gp, inheritTime)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    findRunable()

    调度流程中,一个非常核心的步骤,就是为 m 寻找到下一个执行的 g,这部分内容位于 runtime/proc.go 的 findRunnable 方法中:

    func findRunnable() (gp *g, inheritTime, tryWakeP bool) {
        _g_ := getg()
    
    top:
        _p_ := _g_.m.p.ptr()
        // ...
        // 判断执行查找到 61 次没有
        if _p_.schedtick%61 == 0 && sched.runqsize > 0 {
        	// 加锁向全局队列进行查找
            lock(&sched.lock)
            gp = globrunqget(_p_, 1)
            // 释放锁
            unlock(&sched.lock)
            if gp != nil {
                // 返回可执行的 g
                return gp, false, false
            }
        }
        
        // ...
        // 尝试从 p 本地队列中进行查找
        if gp, inheritTime := runqget(_p_); gp != nil {
            return gp, inheritTime, false
        }
        
        // ...
        // 判断全局队列长度,尝试从全局队列中进行查找
        if sched.runqsize != 0 {
            lock(&sched.lock)
            gp := globrunqget(_p_, 0)
            unlock(&sched.lock)
            if gp != nil {
                return gp, false, false
            }
        }
    
    	// 尝试获取就绪的网络协议 --> 向 epoll 就绪队列中进行查找
        if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {
            if list := netpoll(0); !list.empty() { // non-blocking
                gp := list.pop()
                injectglist(&list)
                casgstatus(gp, _Gwaiting, _Grunnable)
                return gp, false, false
            }
        }
    
    
        // ...
        // 尝试从其余的 p 中偷取一半的 g
        procs := uint32(gomaxprocs)
        if _g_.m.spinning || 2*atomic.Load(&sched.nmspinning) < procs-atomic.Load(&sched.npidle) {
            if !_g_.m.spinning {
                _g_.m.spinning = true
                atomic.Xadd(&sched.nmspinning, 1)
            }
            gp, inheritTime, tnow, w, newWork := stealWork(now)
            now = tnow
            if gp != nil {
                // Successfully stole.
                return gp, inheritTime, false
            }
            if newWork {
                // There may be new timer or GC work; restart to
                // discover.
                goto top
            }
            if w != 0 && (pollUntil == 0 || w < pollUntil) {
                // Earlier timer to wait for.
                pollUntil = w
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71

    调度流程如图:
    g0执行流程

    1. p 每执行 61 次调度,会从全局队列中获取一个 goroutine 进行执行,并将一个全局队列中的 goroutine 填充到当前 p 的本地队列中.
     if _p_.schedtick%61 == 0 && sched.runqsize > 0 {
            lock(&sched.lock)
            gp = globrunqget(_p_, 1)
            unlock(&sched.lock)
            if gp != nil {
                return gp, false, false
            }
     }
    // 除了查找流程外还会将全局队列中的 g 转移到本地 p
    func globrunqget(_p_ *p, max int32) *g {
        if sched.runqsize == 0 {
            return nil
        }
        // 判断 全局队列长度/p 的数量 + 1 == 每个p可以分到的g的个数
        n := sched.runqsize/gomaxprocs + 1
        if n > sched.runqsize {
        	// 全局队列只有 1 个,则直接提取一个
            n = sched.runqsize
        }
        // 传参 max 最大获取个数,如果 n > max 则只获取 max 个
        if max > 0 && n > max {
            n = max
        }
        // 如果获取个数超过了本地队列的一半,需要考虑能不能存的下
        if n > int32(len(_p_.runq))/2 {
            n = int32(len(_p_.runq)) / 2
        }
    
    	// 将全局队列的长度减去获取到的 g 个数
        sched.runqsize -= n
    
    	// 全局队列循环弹出 g
        gp := sched.runq.pop()
        n--
        for ; n > 0; n-- {
            gp1 := sched.runq.pop()
            // 并将多余的 g 存储到 p 本地队列中
            runqput(_p_, gp1, false)
        }
        return gp
    
    // 本地队列存储全局队列 g 的方法
    func runqput(_p_ *p, gp *g, next bool) {
        // ...
    retry:
    	// 获取本地队列头节点,同时对本地队列加锁
        h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with consumers
        // 获取尾节点
        t := _p_.runqtail
        // 如果尾节点减去头节点 小于本地队列长度 == 本地队列未满
        if t-h < uint32(len(_p_.runq)) {
        	// 直接将 g 插入 队列中
            _p_.runq[t%uint32(len(_p_.runq))].set(gp)
            // 将尾节点索引 + 1,并释放队列
            atomic.StoreRel(&_p_.runqtail, t+1) // store-release, makes the item available for consumption
            return
        }
        // 本地队列满了
        if runqputslow(_p_, gp, h, t) {
            return
        }
        // the queue is not full, now the put above must succeed
        goto retry
    
    
    // 本地队列满了,将获取本地队列的一半放入到全局队列中,帮助本地队列减少压力
    func runqputslow(_p_ *p, gp *g, h, t uint32) bool {
    	// 创建本地队列一半 + 1 的数组
        var batch [len(_p_.runq)/2 + 1]*g
        // First, grab a batch from local queue.
        n := t - h
        // 本地队列现有长度的一半
        n = n / 2
        
        // ...
        // for 循环放置
        for i := uint32(0); i < n; i++ {
            batch[i] = _p_.runq[(h+i)%uint32(len(_p_.runq))].ptr()
        }
        // 释放 p 的存储
        if !atomic.CasRel(&_p_.runqhead, h, h+n) { // cas-release, commits consume
            return false
        }
        // 将新获取到的gp也存储全局队列中
        batch[n] = gp
    
    
        // Link the goroutines.
        for i := uint32(0); i < n; i++ {
        	// for循环将本地队列提取到的 g 转成链表
            batch[i].schedlink.set(batch[i+1])
        }
        var q gQueue
        // 设置头尾节点
        q.head.set(batch[0])
        q.tail.set(batch[n])
    
    
        // Now put the batch on global queue.
        lock(&sched.lock)
        globrunqputbatch(&q, int32(n+1))
        unlock(&sched.lock)
        return true
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    1. 尝试从 p 本地队列中获取一个可执行的 goroutine,核心逻辑位于 runqget 方法中:

    需要注意,虽然本地队列是属于 p 独有的,但是由于 work-stealing 机制的存在,其他 p 可能会前来执行窃取动作,因此操作仍需加锁.
    但是,由于窃取动作发生的频率不会太高,因此当前 p 取得锁的成功率是很高的,因此可以说p 的本地队列是接近于无锁化,但没有达到真正意义的无锁.

     if gp, inheritTime := runqget(_p_); gp != nil {
            return gp, inheritTime, false
        }
    
    func runqget(_p_ *p) (gp *g, inheritTime bool) {
    	// 如果当前 runnext 为非空 则直接返回下一个 runnext 即可
        if next != 0 && _p_.runnext.cas(next, 0) {
            return next.ptr(), true
        }
        
        for {
        	// 加锁并获取头尾节点 ==> 虽然本地队列是 p 独有的,但是存在偷 g 的机制,所以还是需要加锁
            h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with other consumers
            t := _p_.runqtail
            // 如果头节点等于尾节点,则表示 p 为空
            if t == h {
                return nil, false
            }
            // g 存在则取头节点并返回,将头节点设置为下一个 并释放锁
            gp := _p_.runq[h%uint32(len(_p_.runq))].ptr()
            if atomic.CasRel(&_p_.runqhead, h, h+1) { // cas-release, commits consume
                return gp, false
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    1. 倘若本地队列没有可执行的 g,会从全局队列中获取:
    if sched.runqsize != 0 {
    		// 加锁
            lock(&sched.lock)
            // 获取的首节点,不向 p 中存储节点
            gp := globrunqget(_p_, 0)
            // 释放锁
            unlock(&sched.lock)
            if gp != nil {
                return gp, false, false
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    1. 倘若本地队列和全局队列都没有 g,则会获取准备就绪的网络协程:

    需要注意的是,刚获取网络协程时,g 的状态是处于 waiting 的,因此需要先更新为 runnable 状态.

     if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {
            if list := netpoll(0); !list.empty() { // non-blocking
                gp := list.pop()
                injectglist(&list)
                // 状态更新
                casgstatus(gp, _Gwaiting, _Grunnable)
                return gp, false, false
            }
      }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    1. work-stealing: 从其他 p 中偷取 g.
    func stealWork(now int64) (gp *g, inheritTime bool, rnow, pollUntil int64, newWork bool) {
        pp := getg().m.p.ptr()
    
        ranTimer := false
    
    	// 偷取操作最多只遍历 4 次 p 队列
        const stealTries = 4
        for i := 0; i < stealTries; i++ {
            stealTimersOrRunNextG := i == stealTries-1
            // 为保证窃取行为的公平性,遍历的起点是随机的
            for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
                // ...
            }
        }
    
        return nil, false, now, pollUntil, ranTime
    
    // 偷取操作
    func runqgrab(_p_ *p, batch *[256]guintptr, batchHead uint32, stealRunNextG bool) uint32 {
        for {
        	// 因为存在 p 也获取头节点的可能,需要加锁
            h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with other consumers
            t := atomic.LoadAcq(&_p_.runqtail) // load-acquire, synchronize with the producer
            // 获取长度的一半
            n := t - h
            n = n - n/2
            // 如果长度为 0 
            if n == 0 {
            	// 是否是最后一次遍历
                if stealRunNextG {
                    // Try to steal from _p_.runnext.
                    // 查看是否有下一个要执行的 g
                    if next := _p_.runnext; next != 0 {
                    	// 查询 p 是否允许偷取
                        if _p_.status == _Prunning {
                            // 等待一段执行时间
                            if GOOS != "windows" && GOOS != "openbsd" && GOOS != "netbsd" {
                                usleep(3)
                            } else {
                                osyield()
                            }
                        }
                        // 等待期间已经完成执行则退出
                        if !_p_.runnext.cas(next, 0) {
                            continue
                        }
                        // 不然就偷取
                        batch[batchHead%uint32(len(batch))] = next
                        return 1
                    }
                }
                return 0
            }
            // 偷取一半长度大于自身的一半,退出
            if n > uint32(len(_p_.runq)/2) { // read inconsistent h and t
                continue
            }
            // for循环的获取
            for i := uint32(0); i < n; i++ {
                g := _p_.runq[(h+i)%uint32(len(_p_.runq))]
                batch[(batchHead+i)%uint32(len(batch))] = g
            }
            // 释放锁 并 改变头节点
            if atomic.CasRel(&_p_.runqhead, h, h+n) { // cas-release, commits consume
                return n
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68

    解析执行 g 函数execute

    当 g0 为 m 寻找到可执行的 g 之后,接下来就开始执行 g. 这部分内容位于 runtime/proc.go 的 execute 方法中:

    func execute(gp *g, inheritTime bool) {
    	// 获取 g
        _g_ := getg()
    
    	// 建立 g 和 m 之间的绑定关系
        _g_.m.curg = gp
        gp.m = _g_.m
        // 修改状态信息
        casgstatus(gp, _Grunnable, _Grunning)
        gp.waitsince = 0
        gp.preempt = false
        gp.stackguard0 = gp.stack.lo + _StackGuard
        // 更新 p 的调度次数,为后续61次调度做好准备
        if !inheritTime {
            _g_.m.p.ptr().schedtick++
        }
        // gogo 将 g0 切换为 g,执行任务
        gogo(&gp.sched)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    主动让渡方法解析

    g 执行主动让渡时,会调用 mcall 方法将执行权归还给 g0,并由 g0 调用 gosched_m 方法,位于 runtime/proc.go 文件中:
    主动让渡流程

    func Gosched() {
        // ...
        // 执行mcall让渡
        mcall(gosched_m)
    }
    
    // 压栈执行 goschedImpl
    func gosched_m(gp *g) {
        goschedImpl(gp)
    }
    
    // 实际让渡流程
    func goschedImpl(gp *g) {
    	// 
        status := readgstatus(gp)
        if status&^_Gscan != _Grunning {
            dumpgstatus(gp)
            throw("bad g status")
        }
        // 改变状态,从running 更改为 runable
        casgstatus(gp, _Grunning, _Grunnable)
        // 解绑 g 和 m
        dropg()
        // 加锁 --> 添加到全局队列中 --> 释放锁
        lock(&sched.lock)
        globrunqput(gp)
        unlock(&sched.lock)
        
        // 开启新的一轮调度
        schedule()
    
    // 解绑函数
    func dropg() {
    	// 获取 g
        _g_ := getg()
        // 解绑操作,g 和 m 分别置空
        setMNoWB(&_g_.m.curg.m, nil)
        setGNoWB(&_g_.m.curg, nil)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39

    gopark 和 goready

    g 需要被动调度时,会调用 mcall 方法切换至 g0,并调用 park_m 方法将 g 置为阻塞态,执行流程位于 runtime/proc.go 的 gopark 方法当中:

    func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) {
        // ...
        mcall(park_m)
    }
    
    func park_m(gp *g) {
        _g_ := getg()
    
    	// 修改状态 running 为 waiting
        casgstatus(gp, _Grunning, _Gwaiting)
        // 解绑
        dropg()
        
        // ...
        // 新的一轮查找
        schedule()
    
    // 当因被动调度陷入阻塞态的 g 需要被唤醒时,会由其他协程执行 goready 方法将 g 重新置为可执行的状态,
    // 方法位于 runtime/proc.go .
    func goready(gp *g, traceskip int) {
        systemstack(func() {
            ready(gp, traceskip, true)
        })
    }
    
    // 被动调度如果需要唤醒,则会其他 g 负责将 g 的状态由 waiting 改为 runnable,
    // 然后会将其添加到唤醒者的 p 的本地队列中:
    func ready(gp *g, traceskip int, next bool) {
        // ...
        _g_ := getg()
        // ...
        // 修改状态
        casgstatus(gp, _Gwaiting, _Grunnable)
        // 重新加入 p 队列中
        // 如果队列满了,会连带 g 一起将一半的元素转移到全局队列
        runqput(_g_.m.p.ptr(), gp, next)
        // ...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38

    goexit0 将 g 置于死亡状态

    当 g 执行完成时,会先执行 mcall 方法切换至 g0,然后调用 goexit0 方法,内容为 runtime/proc.go:

    // Finishes execution of the current goroutine.
    func goexit1() {
        // ...
        mcall(goexit0)
    }
    
    // 实际结束方法
    func goexit0(gp *g) {
    	// 获取 g
        _g_ := getg()
        _p_ := _g_.m.p.ptr()
    
    	// 更改状态为 dead
        casgstatus(gp, _Grunning, _Gdead)
        // ...
        // 解绑
        gp.m = nil
        // ...
        // 解绑
        dropg()
    
        // ...
        // 开启新一轮调度
        schedule()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    抢占调度 retake

    抢占调度的执行者不是 g0,而是一个全局的 monitor g,代码位于 runtime/proc.go 的 retake 方法中:

    func retake(now int64) uint32 {
        n := 0
        // 加锁
        lock(&allpLock)
        // 遍历全局的 p 搜索能抢占的目标
        for i := 0; i < len(allp); i++ {
            _p_ := allp[i]
            // p 还没创建
            if _p_ == nil {
                // This can happen if procresize has grown
                // allp but not yet created new Ps.
                continue
            }
            pd := &_p_.sysmontick
            // ...
            // 执行系统调用超过 10 ms
            // p 本地队列有等待执行的 g
            // 当前没有空闲的 p 和 m.
            if s == _Psyscall {            
                // ...
                if runqempty(_p_) && atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 && pd.syscallwhen+10*1000*1000 > now {
                    continue
                }
                unlock(&allpLock)
                // 抢占调度的步骤
                // 将当前 p 的状态更新为 idle
                if atomic.Cas(&_p_.status, s, _Pidle) {
                    n++
                    _p_.syscalltick++
                    // 然后步入 handoffp 方法中,判断是否需要为 p 寻找接管的 m(因为其原本绑定的 m 正在执行系统调用)
                    handoffp(_p_)
                }
                incidlelocked(1)
                // 抢占调度
                lock(&allpLock)
            }
        }
        unlock(&allpLock)
        return uint32(n)
    }
    
    
    // 判断是否需要 p 接管 m
    func handoffp(_p_ *p) {
    
        if !runqempty(_p_) || sched.runqsize != 0 {
            startm(_p_, false)
            return
        }
        
        if atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) == 0 && atomic.Cas(&sched.nmspinning, 0, 1) {
            startm(_p_, true)
            return
        }
        
        lock(&sched.lock)
        // ...
        if sched.runqsize != 0 {
            unlock(&sched.lock)
            startm(_p_, false)
            return
        }
        // If this is the last running P and nobody is polling network,
        // need to wakeup another M to poll network.
        if sched.npidle == uint32(gomaxprocs-1) && atomic.Load64(&sched.lastpoll) != 0 {
            unlock(&sched.lock)
            startm(_p_, false)
            return
        }
    
    
        // ...
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72

    当以下四个条件满足其一时,则需要为 p 获取新的 m:

    1. 当前 p 本地队列还有待执行的 g;
    2. 全局繁忙(没有空闲的 p 和 m,全局 g 队列为空)
    3. 需要处理网络 socket 读写请求

    获取 m 时,会先尝试获取已有的空闲的 m,若不存在,则会创建一个新的 m.

    func startm(_p_ *p, spinning bool) {
        
        mp := acquirem()
        lock(&sched.lock)
        // ...
        // 获取 m
        nmp := mget()
        if nmp == nil {
        	// 创建 m
            id := mReserveID()
            unlock(&sched.lock)
    
    
            var fn func()
            // ...
            // 绑定 p
            newm(fn, _p_, id)
            // ...
            return
        }
        unlock(&sched.lock)
        // ...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    reentersyscall 和 exitsyscall

    在 m 需要执行系统调用前,会先执行位于 runtime/proc.go 的 reentersyscall 的方法:

    func reentersyscall(pc, sp uintptr) {
    	// 此时执行权同样位于 m 的 g0 手中;
        _g_ := getg()
    
        // ...
        // 保存当前 g 的执行环境;
        save(pc, sp)
        _g_.syscallsp = sp
        _g_.syscallpc = pc
        // 将 g 和 p 的状态更新为 syscall;
        casgstatus(_g_, _Grunning, _Gsyscall)
        // ...
    
    	// 解除 p 和 当前 m 之间的绑定,因为 m 即将进入系统调用而导致短暂不可用;
        pp := _g_.m.p.ptr()
        pp.m = 0
        // 将 p 添加到 当前 m 的 oldP 容器当中,后续 m 恢复后,会优先寻找旧的 p 重新建立绑定关系.
        _g_.m.oldp.set(pp)
        _g_.m.p = 0
        // 将 g 和 p 的状态更新为 syscall;
        atomic.Store(&pp.status, _Psyscall)
        // ...
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    当 m 完成了内核态的系统调用之后,此时会步入位于 runtime/proc.go 的 exitsyscall 函数中,尝试寻找 p 重新开始运作:

    func exitsyscall() {
    	// 方法执行之初,此时的执行权是普通 g.
        _g_ := getg()
        
        // ...
        // 倘若此前设置的 oldp 仍然可用,则重新和 oldP 绑定
        if exitsyscallfast(oldp) {
            // ...
            // 将当前 g 重新置为 running 状态,然后开始执行后续的用户函数;
            casgstatus(_g_, _Gsyscall, _Grunning)
            // ...
            return
        }
    
        // ...
        // old 绑定失败,则调用 mcall 方法切换到 m 的 g0,并执行 exitsyscall0 方法:
        mcall(exitsyscall0)
        // ...
    }
    
    // 
    func exitsyscall0(gp *g) {
    	// 将 g 由系统调用状态切换为可运行态,并解绑 g 和 m 的关系
        casgstatus(gp, _Gsyscall, _Grunnable)
        dropg()
        // 加锁 --> 从全局 p 队列获取可用的 p
        lock(&sched.lock)
        var _p_ *p
        if schedEnabled(gp) {
            _p_, _ = pidleget(0)
        }
        
        var locked bool
        // 如果获取到了,则执行 g:
        if _p_ == nil {
            globrunqput(gp)
        } 
        // 释放锁
        unlock(&sched.lock)
        // 如若无 p 可用,则将 g 添加到全局队列,
        if _p_ != nil {
            acquirep(_p_)
            execute(gp, false) // Never returns.
        }
        
        // ...
        // 当前 m 陷入沉睡. 直到被唤醒后才会继续发起调度.
        stopm()
        schedule() // Never returns.
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
  • 相关阅读:
    python 根据url下载文件使用Python实现
    算法---找出第 N 个二进制字符串中的第 K 位(Kotlin)
    知识库指南4.0|AIGC & Web3 & 元宇宙发展趋势的学习与实践指引
    功能测试自动化测试流程
    【Redis学习笔记】第二章【2.3】Redis数据类型--list
    云原生优缺点分析
    C51--PWN-舵机控制
    富格林:警觉诱导黑幕避免亏损
    深圳信用贷款之路:申请了10次都被拒!这三步帮你逆袭银行贷款!
    [De1CTF 2019]SSRF Me
  • 原文地址:https://blog.csdn.net/HideonHacker/article/details/138157150