前言

最近在读《Go程序员面试笔试宝典》,看到第12章《调度机制》,铺天盖地的源码和汇编代码,读起来还是挺吃力的。花了两天时间啃完这一章,对于Go的抢占机制,我有了新的收获,故写下这篇文章,与各位Gopher共同学习交流一下。若有哪里理解不到位,还望大佬们及时指正。如果本文对你有所帮助的话,帮忙点个赞哦~

正文

sysmon函数

sysmon
sysmonnetpoolretakeforcegcscavengeretake
// src/runtime/proc.go
func sysmon() {
    //...
        // retake P's blocked in syscalls
        // and preempt long running G's
        if retake(now) != 0 {
            idle = 0
        } else {
            idle++
        }
    //...
}
retake
// src/runtime/proc.go

// forcePreemptNS is the time slice given to a G before it is
// preempted.
const forcePreemptNS = 10 * 1000 * 1000 // 10ms

func retake(now int64) uint32 {
    n := 0
    // Prevent allp slice changes. This lock will be completely
    // uncontended unless we're already stopping the world.
    lock(&allpLock)
    // We can't use a range loop over allp because we may
    // temporarily drop the allpLock. Hence, we need to re-fetch
    // allp each time around the loop.
    for i := 0; i < len(allp); i++ {
        _p_ := allp[i]
        if _p_ == nil {
            // This can happen if procresize has grown
            // allp but not yet created new Ps.
            continue
        }
        pd := &_p_.sysmontick
        s := _p_.status
        sysretake := false
        if s == _Prunning || s == _Psyscall {
            // Preempt G if it's running for too long.
            t := int64(_p_.schedtick)
            if int64(pd.schedtick) != t {
                pd.schedtick = uint32(t)
                pd.schedwhen = now
            } else if pd.schedwhen+forcePreemptNS <= now {
                preemptone(_p_)
                // In case of syscall, preemptone() doesn't
                // work, because there is no M wired to P.
                sysretake = true
            }
        }
        if s == _Psyscall {
            // Retake P from syscall if it's there for more than 1 sysmon tick (at least 20us).
            t := int64(_p_.syscalltick)
            if !sysretake && int64(pd.syscalltick) != t {
                pd.syscalltick = uint32(t)
                pd.syscallwhen = now
                continue
            }
            // On the one hand we don't want to retake Ps if there is no other work to do,
            // but on the other hand we want to retake them eventually
            // because they can prevent the sysmon thread from deep sleep.
            if runqempty(_p_) && atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 && pd.syscallwhen+10*1000*1000 > now {
                continue
            }
            // Drop allpLock so we can take sched.lock.
            unlock(&allpLock)
            // Need to decrement number of idle locked M's
            // (pretending that one more is running) before the CAS.
            // Otherwise the M from which we retake can exit the syscall,
            // increment nmidle and report deadlock.
            incidlelocked(-1)
            if atomic.Cas(&_p_.status, s, _Pidle) {
                if trace.enabled {
                    traceGoSysBlock(_p_)
                    traceProcStop(_p_)
                }
                n++
                _p_.syscalltick++
                handoffp(_p_)
            }
            incidlelocked(1)
            lock(&allpLock)
        }
    }
    unlock(&allpLock)
    return uint32(n)
}
retake

抢占进行系统调用的P

如果一个处于_Psyscall状态的P不想被抢占,必须同时满足以下条件:

runqempty(_p_) && atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 && pd.syscallwhen+10*1000*1000 > now

翻译成中文就是:

sysmon

代码中的注释是这么说的:

// On the one hand we don't want to retake Ps if there is no other work to do,
// but on the other hand we want to retake them eventually
// because they can prevent the sysmon thread from deep sleep.
sysmonsysmonidleretakeidleidledelay
        if idle == 0 { // start with 20us sleep...
            delay = 20
        } else if idle > 50 { // start doubling the sleep after 1ms...
            delay *= 2
        }
        if delay > 10*1000 { // up to 10ms
            delay = 10 * 1000
        }
        usleep(delay)
retakesysmon
sysmon_Pidlehandoffp(_p_)

抢占运行时间过长的P

我想首先介绍一下Go的抢占式调度发展历程,分为三个阶段:

  • Go1.1:不支持抢占式调度,程序只能依靠goroutine主动让出CPU资源才能触发调度;
  • Go1.2:引入了基于协作的抢占式调度;
  • Go1.14:引入了基于信号的抢占式调度。

Go1.1遇到的一个问题在于,某些goroutine可以长时间地占用线程,造成其他goroutine饥饿。

Go1.2通过引入基于协作的抢占式调度,缓解了这个问题,来看看他们是怎么做的:

_Prunning_Psyscallpreemptone(_p_)
// src/runtime/proc.go
func preemptone(_p_ *p) bool {
    mp := _p_.m.ptr()
    if mp == nil || mp == getg().m {
        return false
    }
    gp := mp.curg
    if gp == nil || gp == mp.g0 {
        return false
    }

    gp.preempt = true

    // Every call in a goroutine checks for stack overflow by
    // comparing the current stack pointer to gp->stackguard0.
    // Setting gp->stackguard0 to StackPreempt folds
    // preemption into the normal stack overflow check.
    gp.stackguard0 = stackPreempt

    // Request an async preemption of this P.
    if preemptMSupported && debug.asyncpreemptoff == 0 {
        _p_.preempt = true
        preemptM(mp)
    }

    return true
}
preemptone(_p_)
gp.preemptgp.stackguard0stackPreemptSIGURG
stackPreemptstackguard0stackPreemptmorestack_noctxt
// src/runtime/asm_amd64.s
// morestack but not preserving ctxt.
TEXT runtime·morestack_noctxt(SB),NOSPLIT,$0
    MOVL    $0, DX
    JMP runtime·morestack(SB)
morestack_noctxtmorestack
TEXT runtime·morestack(SB),NOSPLIT,$0-0
    // ...
    MOVQ    (g_sched+gobuf_sp)(BX), SP
    CALL    runtime·newstack(SB)
    CALL    runtime·abort(SB)   // crash if newstack returns
    RET
MOVQ   (g_sched+gobuf_sp)(BX), SPnewstack
newstack
// src/runtime/stack.go
    preempt := atomic.Loaduintptr(&gp.stackguard0) == stackPreempt
    if preempt {
        if !canPreemptM(thisg.m) {
            // Let the goroutine keep running for now.
            // gp->preempt is set, so it will be preempted next time.
            gp.stackguard0 = gp.stack.lo + _StackGuard
            gogo(&gp.sched) // never return
        }
    }
gp.stackguard0stackPreemptcanPreemptM
// src/runtime/preempt.go
func canPreemptM(mp *m) bool {
    return mp.locks == 0 && mp.mallocing == 0 && mp.preemptoff == "" && mp.p.ptr().status == _Prunning
}
gogo(&gp.sched)
gopreempt_m(gp)
// src/runtime/proc.go
func gopreempt_m(gp *g) {
    if trace.enabled {
        traceGoPreempt()
    }
    goschedImpl(gp)
}

func goschedImpl(gp *g) {
    status := readgstatus(gp)
    if status&^_Gscan != _Grunning {
        dumpgstatus(gp)
        throw("bad g status")
    }
    // 修改gp的状态为_Grunnable
    casgstatus(gp, _Grunning, _Grunnable)
    // 解除m和g的关系
    dropg()
    lock(&sched.lock)
    // 将gp放入全局可运行队列
    globrunqput(gp)
    unlock(&sched.lock)

    schedule()
}
preemptone(_p_)

来看这样一段代码:

package main

import (
        "fmt"
        "runtime"
        "time"
)

func main() {
        runtime.GOMAXPROCS(1)
        go func() {
                for  {

                }
        }()
        time.Sleep(time.Millisecond)
        fmt.Println("finished.")
}
finished.
preemptone(_p_)SIGURG
if preemptMSupported && debug.asyncpreemptoff == 0 {
        _p_.preempt = true
        preemptM(mp)
    }
preemptMSupporteddebug.asyncpreemptoff
GODEBUG=asyncpreemptoff=1
preemptM(mp)
// src/runtime/proc.go
func preemptM(mp *m) {
    // ...
        signalM(mp, sigPreempt) //const sigPreempt = _SIGURG
    // ...
}
signalM()mp
// src/runtime/os_darwin.go
func signalM(mp *m, sig int) {
    pthread_kill(pthread(mp.procid), uint32(sig))
}
_SIGURG
_SIGURG
// src/runtime/signal_unix.go
func sighandler(sig uint32, info *siginfo, ctxt unsafe.Pointer, gp *g) {
    _g_ := getg()
    c := &sigctxt{info, ctxt}

    // ...

    if sig == sigPreempt && debug.asyncpreemptoff == 0 {
        // Might be a preemption signal.
        doSigPreempt(gp, c)
        // Even if this was definitely a preemption signal, it
        // may have been coalesced with another signal, so we
        // still let it through to the application.
    }

    // ...
}

func doSigPreempt(gp *g, ctxt *sigctxt) {
    // Check if this G wants to be preempted and is safe to
    // preempt.
    if wantAsyncPreempt(gp) {
        if ok, newpc := isAsyncSafePoint(gp, ctxt.sigpc(), ctxt.sigsp(), ctxt.siglr()); ok {
            // Adjust the PC and inject a call to asyncPreempt.
            ctxt.pushCall(funcPC(asyncPreempt), newpc)
        }
    }

    // Acknowledge the preemption.
    atomic.Xadd(&gp.m.preemptGen, 1)
    atomic.Store(&gp.m.signalPending, 0)

    if GOOS == "darwin" || GOOS == "ios" {
        atomic.Xadd(&pendingPreemptSignals, -1)
    }
}
_SIGURGdoSigPreempt(gp, c)ctxt.pushCallasyncPreemptasyncPreemptasyncPreemptasyncPreemptasyncPreempt2()gp
func asyncPreempt2() {
    gp := getg()
    gp.asyncSafePoint = true
    if gp.preemptStop {
        mcall(preemptPark)
    } else {
        mcall(gopreempt_m)
    }
    gp.asyncSafePoint = false
}

小结

sysmon_SIGURGsighandler

参考文章