协程设计-GMP模型
GMP

工作线程M

工作线程是最终运行协程的实体。操作系统中的线程与在运行时代表线程的m结构体进行了绑定:

// go/src/runtime/runtime2.go

type m struct {
    g0      *g     // goroutine with scheduling stack
    tls           [tlsSlots]uintptr // thread-local storage (for x86 extern register)
    curg          *g       // current running goroutine
    p             puintptr // attached p for executing go code (nil if not executing go code)
    nextp         puintptr
    oldp          puintptr // the p that was attached before executing a syscall
    park          note
  ...
}
curg
g0ggg->g0->g
thread-local storagetlsm.tls

逻辑处理器p

runtime.GOMAXPROCS()

逻辑处理器p通过结构体p进行定义:

type p struct {
    id          int32
    status      uint32 // one of pidle/prunning/...
  schedtick   uint32     // incremented on every scheduler call
    syscalltick uint32     // incremented on every system call
    m           muintptr   // back-link to associated m (nil if idle)
    // Queue of runnable goroutines. Accessed without lock.
    runqhead uint32
    runqtail uint32
    runq     [256]guintptr
    runnext guintptr
  ... 
}
idstatusmm==nil
runqp.runqheadp.runqtailschedt.runq
runnextp.runnextrunnextp.runq

协程g

协程通常分为特殊的调度协程g0以及执行用户代码的普通协程g。无论g0还是g,都通过结构体g进行定义:

// go/src/runtime/runtime2.go

type g struct {
    stack       stack   // offset known to runtime/cgo
    m         *m      // current m; offset known to arm liblink
    sched     gobuf
  ...
}

// Stack describes a Go execution stack.
type stack struct {
    lo uintptr
    hi uintptr
}

type gobuf struct {
    sp   uintptr
    pc   uintptr
    g    guintptr
    ctxt unsafe.Pointer
    ret  uintptr
    lr   uintptr
    bp   uintptr // for framepointer-enabled architectures
}
stackmschedgobuf
schedt
schedt
// go/src/runtime/runtime2.go

type schedt struct {
    lock mutex

    midle        muintptr // idle m's waiting for work
    nmidle       int32    // number of idle m's waiting for work
    nmidlelocked int32    // number of locked m's waiting for work
    mnext        int64    // number of m's that have been created and next M ID
    maxmcount    int32    // maximum number of m's allowed (or die)
    nmsys        int32    // number of system m's not counted for deadlock
    nmfreed      int64    // cumulative number of freed m's

    ngsys uint32 // number of system goroutines; updated atomically

    pidle      puintptr // idle p's
    npidle     uint32
    nmspinning uint32 // See "Worker thread parking/unparking" comment in proc.go.

    // Global runnable queue.
    runq     gQueue
    runqsize int32
  
  // Global cache of dead G's.
    gFree struct {
        lock    mutex
        stack   gList // Gs with stacks
        noStack gList // Gs without stacks
        n       int32
    }
  
    // freem is the list of m's waiting to be freed when their
    // m.exited is set. Linked through m.freelink.
    freem *m
    ...
}

schedtmidlenmidlefreemngsyspidlenpidlerunqrunqsizegFree
schedtrunqschedtlock

GMP详细示图

通过上述说明,我们可以进一步细化GMP模型示图为:

协程调度
g0g->g0->g->g0-g

调度策略

工作线程m需要通过协程调度获得具体可运行的某一协程g。获取协程g的一般策略主要包含三大步:

graph LR
id(1. 查找p本地的局部运行队列) --> id2(2. 查找schedt中的全局运行队列) --> id3(3. 窃取其他p中的局部运行队列)
findRunnable()
// go/src/runtime/proc.go

// Finds a runnable goroutine to execute.
func findRunnable() (gp *g, inheritTime, tryWakeP bool) {
  ...
  // Check the global runnable queue once in a while to ensure fairness.
    // Otherwise two goroutines can completely occupy the local runqueue
    // by constantly respawning each other.
    if _p_.schedtick%61 == 0 && sched.runqsize > 0 {
        lock(&sched.lock)
        gp = globrunqget(_p_, 1)
        unlock(&sched.lock)
        if gp != nil {
            return gp, false, false
        }
    }
  ...
  // local runq
    if gp, inheritTime := runqget(_p_); gp != nil {
        return gp, inheritTime, false
    }

    // global runq
    if sched.runqsize != 0 {
        lock(&sched.lock)
        gp := globrunqget(_p_, 0)
        unlock(&sched.lock)
        if gp != nil {
            return gp, false, false
        }
    }
  ...
    // Spinning Ms: steal work from other Ps.
    //
    // Limit the number of spinning Ms to half the number of busy Ps.
    // This is necessary to prevent excessive CPU consumption when
    // GOMAXPROCS>>1 but the program parallelism is low.
    procs := uint32(gomaxprocs)
    if _g_.m.spinning || 2*atomic.Load(&sched.nmspinning) < procs-atomic.Load(&sched.npidle) {
        if !_g_.m.spinning {
            _g_.m.spinning = true
            atomic.Xadd(&sched.nmspinning, 1)
        }

        gp, inheritTime, tnow, w, newWork := stealWork(now)
        now = tnow
        if gp != nil {
            // Successfully stole.
            return gp, inheritTime, false
        }
    ...
    }
}

获取本地运行队列

runqget()
runnextrunnextrunqrunqheadrunqtail
// go/src/runtime/proc.go

func runqget(_p_ *p) (gp *g, inheritTime bool) {
    // If there's a runnext, it's the next G to run.
    next := _p_.runnext
    // If the runnext is non-0 and the CAS fails, it could only have been stolen by another P,
    // because other Ps can race to set runnext to 0, but only the current P can set it to non-0.
    // Hence, there's no need to retry this CAS if it falls.
    if next != 0 && _p_.runnext.cas(next, 0) {
        return next.ptr(), true
    }

    for {
        h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with other consumers
        t := _p_.runqtail
        if t == h {
            return nil, false
        }
        gp := _p_.runq[h%uint32(len(_p_.runq))].ptr()
        if atomic.CasRel(&_p_.runqhead, h, h+1) { // cas-release, commits consume
            return gp, false
        }
    }
}

协程调度时由于总是优先查找局部运行队列中的协程g,如果只是循环往复的地执行局部队列中的g,那么全局队列中的g可能一个都不会被调度到。因此,为了保证调度的公平性,p中每执行61次调度,就会优先从全局队列中获取一个g到当前p中执行:

// go/src/runtime/proc.go

func findRunnable() (gp *g, inheritTime, tryWakeP bool) {
  ...
    if _p_.schedtick%61 == 0 && sched.runqsize > 0 {
        lock(&sched.lock)
        gp = globrunqget(_p_, 1)
        unlock(&sched.lock)
        if gp != nil {
            return gp, false, false
        }
    }
  ...
}

获取全局运行队列

runqput
// go/src/runtime/proc.go

// Try get a batch of G's from the global runnable queue.
// sched.lock must be held.
func globrunqget(_p_ *p, max int32) *g {
    assertLockHeld(&sched.lock)

    if sched.runqsize == 0 {
        return nil
    }

    n := sched.runqsize/gomaxprocs + 1
    if n > sched.runqsize {
        n = sched.runqsize
    }
    if max > 0 && n > max {
        n = max
    }
    if n > int32(len(_p_.runq))/2 {
        n = int32(len(_p_.runq)) / 2
    }

    sched.runqsize -= n

    gp := sched.runq.pop()
    n--
    for ; n > 0; n-- {
        gp1 := sched.runq.pop()
        runqput(_p_, gp1, false)
    }
    return gp
}

协程窃取

allp []*p
// go/src/runtime/proc.go

// stealWork attempts to steal a runnable goroutine or timer from any P.
func stealWork(now int64) (gp *g, inheritTime bool, rnow, pollUntil int64, newWork bool) {
    pp := getg().m.p.ptr()

    ranTimer := false

    const stealTries = 4
    for i := 0; i < stealTries; i++ {
        stealTimersOrRunNextG := i == stealTries-1

        for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
            if sched.gcwaiting != 0 {
                // GC work may be available.
                return nil, false, now, pollUntil, true
            }
            p2 := allp[enum.position()]
            if pp == p2 {
                continue
            }
            ...
            // Don't bother to attempt to steal if p2 is idle.
            if !idlepMask.read(enum.position()) {
                if gp := runqsteal(pp, p2, stealTimersOrRunNextG); gp != nil {
                    return gp, false, now, pollUntil, ranTimer
                }
            }
        }
    }
  ...
}
runqstealrunqgrab
// Steal half of elements from local runnable queue of p2
// and put onto local runnable queue of p.
// Returns one of the stolen elements (or nil if failed).
func runqsteal(_p_, p2 *p, stealRunNextG bool) *g {
    t := _p_.runqtail
    n := runqgrab(p2, &_p_.runq, t, stealRunNextG)
    if n == 0 {
        return nil
    }
    n--
    gp := _p_.runq[(t+n)%uint32(len(_p_.runq))].ptr()
    if n == 0 {
        return gp
    }
    h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with consumers
    if t-h+n >= uint32(len(_p_.runq)) {
        throw("runqsteal: runq overflow")
    }
    atomic.StoreRel(&_p_.runqtail, t+n) // store-release, makes the item available for consumption
    return gp
}

// Grabs a batch of goroutines from _p_'s runnable queue into batch.
func runqgrab(_p_ *p, batch *[256]guintptr, batchHead uint32, stealRunNextG bool) uint32 {
    for {
        h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with other consumers
        t := atomic.LoadAcq(&_p_.runqtail) // load-acquire, synchronize with the producer
        n := t - h
        n = n - n/2
        ...
        for i := uint32(0); i < n; i++ {
            g := _p_.runq[(h+i)%uint32(len(_p_.runq))]
            batch[(batchHead+i)%uint32(len(batch))] = g
        }
        if atomic.CasRel(&_p_.runqhead, h, h+n) { // cas-release, commits consume
            return n
        }
    }
}

调度时机

调度策略让我们知道了协程是如何调度的,下面继续说明什么时候会发生协程调度。

主动调度

runtime.Gosched()
_Grunning_Grunnabledropg()globrunqput()schedule()
// go/src/runtime/proc.go

// Gosched yields the processor, allowing other goroutines to run. It does not
// suspend the current goroutine, so execution resumes automatically.
func Gosched() {
    checkTimeouts()
    mcall(gosched_m) //
}

// Gosched continuation on g0.
func gosched_m(gp *g) {
    ...
    goschedImpl(gp) //
}

func goschedImpl(gp *g) {
    ...
    casgstatus(gp, _Grunning, _Grunnable)
    dropg() //
    lock(&sched.lock)
    globrunqput(gp)
    unlock(&sched.lock)

    schedule()
}

// dropg removes the association between m and the current goroutine m->curg (gp for short).
func dropg() {
    _g_ := getg()

    setMNoWB(&_g_.m.curg.m, nil)
    setGNoWB(&_g_.m.curg, nil)
}

被动调度

gopark()gopark()park_m()
_Grunning_Gwaitingdropg()waitunlockffalseschedule()
// go/src/runtime/proc.go

// Puts the current goroutine into a waiting state and calls unlockf on the
// system stack.
func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) {
    ...
    mcall(park_m)
}

// park continuation on g0.
func park_m(gp *g) {
    ...
    casgstatus(gp, _Grunning, _Gwaiting)
    dropg()

    if fn := _g_.m.waitunlockf; fn != nil {
        ok := fn(gp, _g_.m.waitlock)
        _g_.m.waitunlockf = nil
        _g_.m.waitlock = nil
        if !ok {
            ...
            casgstatus(gp, _Gwaiting, _Grunnable)
            execute(gp, true) // Schedule it back, never returns.
        }
    }
    schedule()
}
_Gwaiting_Gwaiting_Grunnablegoready()ready()
// go/src/runtime/proc.go

func goready(gp *g, traceskip int) {
    systemstack(func() {
        ready(gp, traceskip, true)
    })
}

// Mark gp ready to run.
func ready(gp *g, traceskip int, next bool) {
    ...
    // status is Gwaiting or Gscanwaiting, make Grunnable and put on runq
    casgstatus(gp, _Gwaiting, _Grunnable)
    runqput(_g_.m.p.ptr(), gp, next)
    wakep()
    ...
}

抢占调度

go应用程序在启动时会开启一个特殊的线程来执行系统监控任务,系统监控运行在一个独立的工作线程m上,该线程不用绑定逻辑处理器p。系统监控每隔10ms会检测是否有准备就绪的网络协程,并放置到全局队列中。

大于10ms大于20微秒retake()
// go/src/runtime/proc.go

// forcePreemptNS is the time slice given to a G before it is
// preempted.
const forcePreemptNS = 10 * 1000 * 1000 // 10ms

func retake(now int64) uint32 {
    n := 0
    lock(&allpLock)
    for i := 0; i < len(allp); i++ {
        _p_ := allp[i]
        if _p_ == nil {
            continue
        }
        pd := &_p_.sysmontick
        s := _p_.status
        sysretake := false
        if s == _Prunning || s == _Psyscall {
            // Preempt G if it's running for too long.
            t := int64(_p_.schedtick)
            if int64(pd.schedtick) != t {
                pd.schedtick = uint32(t)
                pd.schedwhen = now
            } else if pd.schedwhen+forcePreemptNS <= now {
                preemptone(_p_)
                // In case of syscall, preemptone() doesn't
                // work, because there is no M wired to P.
                sysretake = true
            }
        }
        if s == _Psyscall {
            // Retake P from syscall if it's there for more than 1 sysmon tick (at least 20us).
      t := int64(_p_.syscalltick)
            if !sysretake && int64(pd.syscalltick) != t {
                pd.syscalltick = uint32(t)
                pd.syscallwhen = now
                continue
            }
            if runqempty(_p_) && atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 && pd.syscallwhen+10*1000*1000 > now {
                continue
            }
      ...
    }
    unlock(&allpLock)
    return uint32(n)
}