前言
最近在读《Go程序员面试笔试宝典》,看到第12章《调度机制》,铺天盖地的源码和汇编代码,读起来还是挺吃力的。花了两天时间啃完这一章,对于Go的抢占机制,我有了新的收获,故写下这篇文章,与各位Gopher共同学习交流一下。若有哪里理解不到位,还望大佬们及时指正。如果本文对你有所帮助的话,帮忙点个赞哦~
正文
sysmon函数
Go 程序启动时,运行时会创建一个不需要 P 的监控线程来执行 sysmon 函数。sysmon 周期性地做一些检查工作,例如 netpool、retake、forcegc、scavenge 等,其中 retake 就是负责抢占的函数:
// src/runtime/proc.go
// sysmon runs on a dedicated OS thread, without a P, and periodically
// performs scheduler housekeeping. Only the preemption-related excerpt
// is shown here; the elided parts handle netpoll, forcegc, scavenging, etc.
func sysmon() {
	//...
	// retake P's blocked in syscalls
	// and preempt long running G's
	if retake(now) != 0 {
		// A P was retaken: reset the idle counter so sysmon keeps
		// polling at a short interval (see the delay logic below).
		idle = 0
	} else {
		idle++
	}
	//...
}
下面来看 retake 函数的实现:
// src/runtime/proc.go
// forcePreemptNS is the time slice given to a G before it is
// preempted.
const forcePreemptNS = 10 * 1000 * 1000 // 10ms

// retake is called by sysmon. It does two things:
//  1. requests preemption of any G that has been running on a P for
//     more than forcePreemptNS (10ms), and
//  2. retakes Ps stuck in a syscall for at least one sysmon tick,
//     handing them off so their runnable Gs can make progress.
// It returns the number of Ps retaken from syscalls.
func retake(now int64) uint32 {
	n := 0
	// Prevent allp slice changes. This lock will be completely
	// uncontended unless we're already stopping the world.
	lock(&allpLock)
	// We can't use a range loop over allp because we may
	// temporarily drop the allpLock. Hence, we need to re-fetch
	// allp each time around the loop.
	for i := 0; i < len(allp); i++ {
		_p_ := allp[i]
		if _p_ == nil {
			// This can happen if procresize has grown
			// allp but not yet created new Ps.
			continue
		}
		pd := &_p_.sysmontick
		s := _p_.status
		sysretake := false
		if s == _Prunning || s == _Psyscall {
			// Preempt G if it's running for too long.
			t := int64(_p_.schedtick)
			if int64(pd.schedtick) != t {
				// schedtick advanced since the last observation: a new
				// G has been scheduled on this P, so restart the clock.
				pd.schedtick = uint32(t)
				pd.schedwhen = now
			} else if pd.schedwhen+forcePreemptNS <= now {
				// Same G for at least 10ms: request preemption.
				preemptone(_p_)
				// In case of syscall, preemptone() doesn't
				// work, because there is no M wired to P.
				sysretake = true
			}
		}
		if s == _Psyscall {
			// Retake P from syscall if it's there for more than 1 sysmon tick (at least 20us).
			t := int64(_p_.syscalltick)
			if !sysretake && int64(pd.syscalltick) != t {
				// syscalltick advanced: a new syscall started since the
				// last observation; record it and check again next tick.
				pd.syscalltick = uint32(t)
				pd.syscallwhen = now
				continue
			}
			// On the one hand we don't want to retake Ps if there is no other work to do,
			// but on the other hand we want to retake them eventually
			// because they can prevent the sysmon thread from deep sleep.
			if runqempty(_p_) && atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 && pd.syscallwhen+10*1000*1000 > now {
				continue
			}
			// Drop allpLock so we can take sched.lock.
			unlock(&allpLock)
			// Need to decrement number of idle locked M's
			// (pretending that one more is running) before the CAS.
			// Otherwise the M from which we retake can exit the syscall,
			// increment nmidle and report deadlock.
			incidlelocked(-1)
			if atomic.Cas(&_p_.status, s, _Pidle) {
				if trace.enabled {
					traceGoSysBlock(_p_)
					traceProcStop(_p_)
				}
				n++
				_p_.syscalltick++
				// Hand the P off to another M so its work can run.
				handoffp(_p_)
			}
			incidlelocked(1)
			lock(&allpLock)
		}
	}
	unlock(&allpLock)
	return uint32(n)
}
retake 函数主要做两件事:抢占陷入系统调用的 P,以及抢占运行时间过长的 G。先来看第一件事:抢占进行系统调用的 P。
如果一个处于_Psyscall状态的P不想被抢占,必须同时满足以下条件:
runqempty(_p_) && atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 && pd.syscallwhen+10*1000*1000 > now
翻译成中文就是:P 的本地运行队列为空(没有其他工作要做)、存在处于自旋状态的 M 或者空闲的 P(系统并不繁忙),并且距系统调用开始还不足 10ms。
retake 代码中的注释是这么说的:
// On the one hand we don't want to retake Ps if there is no other work to do,
// but on the other hand we want to retake them eventually
// because they can prevent the sysmon thread from deep sleep.
为什么要避免 sysmon 线程深度睡眠呢?因为 sysmon 的睡眠时长 delay 与 idle 计数相关:retake 一旦成功就会把 idle 清零,否则 idle 递增,delay 也随之翻倍增长:
if idle == 0 { // start with 20us sleep...
delay = 20
} else if idle > 50 { // start doubling the sleep after 1ms...
delay *= 2
}
if delay > 10*1000 { // up to 10ms
delay = 10 * 1000
}
usleep(delay)
retake 成功后,sysmon 会通过 CAS 将 P 的状态改为 _Pidle,并调用 handoffp(_p_) 把 P 交给其他 M 继续运行。接下来看第二件事:抢占运行时间过长的 P。
我想首先介绍一下Go的抢占式调度发展历程,分为三个阶段:
- Go1.1:不支持抢占式调度,程序只能依靠goroutine主动让出CPU资源才能触发调度;
- Go1.2:引入了基于协作的抢占式调度;
- Go1.14:引入了基于信号的抢占式调度。
Go1.1遇到的一个问题在于,某些goroutine可以长时间地占用线程,造成其他goroutine饥饿。
Go1.2通过引入基于协作的抢占式调度,缓解了这个问题,来看看他们是怎么做的:
对于处于 _Prunning 或 _Psyscall 状态的 P,如果其上的 G 连续运行超过 10ms,retake 就会调用 preemptone(_p_) 发起抢占:
// src/runtime/proc.go
// preemptone requests preemption of the G currently running on _p_.
// Cooperative part: set gp.preempt and poison gp.stackguard0 with
// stackPreempt so the goroutine's next stack-growth check diverts into
// the scheduler. Asynchronous part: additionally send a preemption
// signal to the M (if supported and not disabled).
// It reports whether a preemption request was issued.
func preemptone(_p_ *p) bool {
	mp := _p_.m.ptr()
	if mp == nil || mp == getg().m {
		// No M is wired to this P, or it is our own M: nothing to preempt.
		return false
	}
	gp := mp.curg
	if gp == nil || gp == mp.g0 {
		// No user G running (or it's the scheduler's g0 stack).
		return false
	}
	gp.preempt = true
	// Every call in a goroutine checks for stack overflow by
	// comparing the current stack pointer to gp->stackguard0.
	// Setting gp->stackguard0 to StackPreempt folds
	// preemption into the normal stack overflow check.
	gp.stackguard0 = stackPreempt
	// Request an async preemption of this P.
	if preemptMSupported && debug.asyncpreemptoff == 0 {
		_p_.preempt = true
		preemptM(mp)
	}
	return true
}
preemptone(_p_) 做了两件事:一是把 gp.preempt 置为 true,并把 gp.stackguard0 设置为 stackPreempt(基于协作的抢占);二是向目标线程发送 SIGURG 信号(基于信号的抢占,后文详述)。先看协作式抢占:goroutine 在每次函数调用时都会检查栈是否需要扩张,由于 stackguard0 被设置成了 stackPreempt,这个检查必然"失败",进而执行 morestack_noctxt:
// src/runtime/asm_amd64.s
// morestack but not preserving ctxt.
TEXT runtime·morestack_noctxt(SB),NOSPLIT,$0
	// Zero DX (the context register) and fall through to morestack.
	MOVL $0, DX
	JMP runtime·morestack(SB)
morestack_noctxt 直接跳转到 morestack:
TEXT runtime·morestack(SB),NOSPLIT,$0-0
// ...
MOVQ (g_sched+gobuf_sp)(BX), SP
CALL runtime·newstack(SB)
CALL runtime·abort(SB) // crash if newstack returns
RET
其中 MOVQ (g_sched+gobuf_sp)(BX), SP 把栈切换到 g0 栈,随后调用 newstack。newstack 中与抢占相关的代码如下:
// src/runtime/stack.go
preempt := atomic.Loaduintptr(&gp.stackguard0) == stackPreempt
if preempt {
if !canPreemptM(thisg.m) {
// Let the goroutine keep running for now.
// gp->preempt is set, so it will be preempted next time.
gp.stackguard0 = gp.stack.lo + _StackGuard
gogo(&gp.sched) // never return
}
}
如果 gp.stackguard0 等于 stackPreempt,说明这是一次抢占请求而非真正的栈扩张,接着通过 canPreemptM 判断当前能否安全地执行抢占:
// src/runtime/preempt.go
// canPreemptM reports whether mp is in a state where its current G can
// be safely preempted: it holds no runtime locks, is not in the middle
// of a memory allocation, has not explicitly disabled preemption, and
// its P is actually in _Prunning.
func canPreemptM(mp *m) bool {
	return mp.locks == 0 && mp.mallocing == 0 && mp.preemptoff == "" && mp.p.ptr().status == _Prunning
}
若暂时不能抢占,则通过 gogo(&gp.sched) 让 goroutine 继续运行,等待下一次检查;若可以抢占,最终会调用 gopreempt_m(gp) 完成调度:
// src/runtime/proc.go
// gopreempt_m completes a cooperative preemption of gp: it emits a
// trace event (if tracing) and reschedules gp via goschedImpl.
func gopreempt_m(gp *g) {
	if trace.enabled {
		traceGoPreempt()
	}
	goschedImpl(gp)
}
// goschedImpl takes gp off its M, puts it on the global run queue, and
// calls into the scheduler to pick the next G to run.
func goschedImpl(gp *g) {
	status := readgstatus(gp)
	if status&^_Gscan != _Grunning {
		// Only a running G can yield; anything else is a runtime bug.
		dumpgstatus(gp)
		throw("bad g status")
	}
	// Switch gp's status from _Grunning to _Grunnable.
	casgstatus(gp, _Grunning, _Grunnable)
	// Detach gp from the current M.
	dropg()
	lock(&sched.lock)
	// Put gp on the global runnable queue.
	globrunqput(gp)
	unlock(&sched.lock)
	schedule()
}
协作式抢占依赖函数调用时的栈检查,如果 goroutine 根本不发生函数调用,preemptone(_p_) 设置的标记就永远不会被检查到。来看这样一段代码:
// Demo: with GOMAXPROCS(1), a goroutine spinning in an empty loop
// contains no function calls, hence no cooperative preemption points
// (the stackguard0 check only fires on calls). Before Go 1.14 this
// program never prints "finished."; with signal-based preemption
// (Go 1.14+) the spinning goroutine is interrupted and it does.
package main

import (
	"fmt"
	"runtime"
	"time"
)

func main() {
	runtime.GOMAXPROCS(1)
	go func() {
		for {
			// Busy loop: no call sites, so no cooperative preemption.
		}
	}()
	time.Sleep(time.Millisecond)
	fmt.Println("finished.")
}
在 Go1.14 之前,这段程序永远不会输出 finished.,因为空的 for 循环中没有函数调用,协作式抢占无从下手。Go1.14 引入基于信号的抢占式调度解决了这个问题。回到 preemptone(_p_) 中发送 SIGURG 信号的那段代码:
if preemptMSupported && debug.asyncpreemptoff == 0 {
_p_.preempt = true
preemptM(mp)
}
当平台支持异步抢占(preemptMSupported),且没有通过 GODEBUG=asyncpreemptoff=1 将其关闭(debug.asyncpreemptoff)时,就会调用 preemptM(mp):
// src/runtime/proc.go
// preemptM sends a preemption request to mp's OS thread by delivering
// the sigPreempt signal (excerpt; bookkeeping elided).
func preemptM(mp *m) {
	// ...
	signalM(mp, sigPreempt) //const sigPreempt = _SIGURG
	// ...
}
signalM() 向 mp 对应的线程发送信号:
// src/runtime/os_darwin.go
// signalM delivers sig to the OS thread backing mp
// (Darwin implementation, via pthread_kill).
func signalM(mp *m, sig int) {
	pthread_kill(pthread(mp.procid), uint32(sig))
}
目标线程收到 _SIGURG 信号后,会进入信号处理函数 sighandler,其中对 _SIGURG 的处理如下:
// src/runtime/signal_unix.go
// sighandler is the runtime's signal handler (excerpt). When the
// incoming signal is sigPreempt and async preemption is enabled,
// it forwards to doSigPreempt.
func sighandler(sig uint32, info *siginfo, ctxt unsafe.Pointer, gp *g) {
	_g_ := getg()
	c := &sigctxt{info, ctxt}
	// ...
	if sig == sigPreempt && debug.asyncpreemptoff == 0 {
		// Might be a preemption signal.
		doSigPreempt(gp, c)
		// Even if this was definitely a preemption signal, it
		// may have been coalesced with another signal, so we
		// still let it through to the application.
	}
	// ...
}
// doSigPreempt handles a preemption signal on gp: if gp wants to be
// preempted and the interrupted PC is an async-safe point, it rewrites
// the signal context so that gp resumes inside asyncPreempt.
func doSigPreempt(gp *g, ctxt *sigctxt) {
	// Check if this G wants to be preempted and is safe to
	// preempt.
	if wantAsyncPreempt(gp) {
		if ok, newpc := isAsyncSafePoint(gp, ctxt.sigpc(), ctxt.sigsp(), ctxt.siglr()); ok {
			// Adjust the PC and inject a call to asyncPreempt.
			ctxt.pushCall(funcPC(asyncPreempt), newpc)
		}
	}
	// Acknowledge the preemption.
	atomic.Xadd(&gp.m.preemptGen, 1)
	atomic.Store(&gp.m.signalPending, 0)
	if GOOS == "darwin" || GOOS == "ios" {
		atomic.Xadd(&pendingPreemptSignals, -1)
	}
}
收到 _SIGURG 后,doSigPreempt(gp, c) 通过 ctxt.pushCall 修改程序计数器,向被抢占的 goroutine 注入一次对 asyncPreempt 的调用。asyncPreempt 保存好现场后,会调用 asyncPreempt2(),由它真正让出 gp:
func asyncPreempt2() {
gp := getg()
gp.asyncSafePoint = true
if gp.preemptStop {
mcall(preemptPark)
} else {
mcall(gopreempt_m)
}
gp.asyncSafePoint = false
}
小结
总结一下:sysmon 监控线程周期性地调用 retake 发起抢占。协作式抢占通过把 stackguard0 设为 stackPreempt,借助函数调用时的栈检查完成;信号式抢占则向目标线程发送 _SIGURG 信号,由 sighandler 注入 asyncPreempt 调用完成,从而解决了无函数调用的 goroutine 无法被抢占的问题。