在Go1.14前,goroutine是协作式调度(cooperative)的,和通常意义上的coroutine差不多,只是不需要用户手动yield,由Go编译器自动在函数调用前插入栈检测的命令。
具体做法是:`sysmon`监控线程把需要抢占的`g`的`stackguard0`字段设置为`stackPreempt`,这样该`g`下一次函数调用时的栈检测(比较`SP`与`g.stackguard0`)必然失败,从而进入`morestack`,而`morestack`会调用`newstack`,在其中完成真正的抢占调度:
// runtime/stack.go
//go:nowritebarrierrec
// newstack is reached via morestack when the stack-growth check in a
// function prologue fails. Because the preemption request is encoded as
// stackguard0 = stackPreempt, the same prologue check doubles as the
// cooperative preemption hook.
func newstack() {
thisg := getg()
// ...
gp := thisg.m.curg
// ...
// stackguard0 == stackPreempt means this entry is a preemption
// request, not a real stack overflow.
preempt := atomic.Loaduintptr(&gp.stackguard0) == stackPreempt
if preempt {
// cannot preempt
if thisg.m.locks != 0 || thisg.m.mallocing != 0 || thisg.m.preemptoff != "" || thisg.m.p.ptr().status != _Prunning {
// Let the goroutine keep running for now.
// gp->preempt is set, so it will be preempted next time.
// Restore the real stack guard and resume the goroutine as-is.
gp.stackguard0 = gp.stack.lo + _StackGuard
gogo(&gp.sched) // never return
}
}
if preempt {
// do something ...
// Act like goroutine called runtime.Gosched.
casgstatus(gp, _Gwaiting, _Grunning)
gopreempt_m(gp) // never return
}
// something about moving stack ...
}
但这种协作式抢占无法抢占没有函数调用的代码,比如空转的 `for { i++ }` 循环。从Go1.14起,goroutine是抢占式调度(preemptive)的,采用signal yielding的方法,不太像协程,毕竟不是协作式调度,而更像用户态线程。
具体做法是向运行该goroutine的线程发送信号(Go选用了`SIGURG`),在signal handler中完成抢占。这么做有两个问题:
- 我们最好不要直接在handler中做调度;
- 用户态线程依赖TLS
先说2吧,其实很多库函数依赖TLS来避免锁/提高性能,如malloc等。用户线程依赖OS线程的TLS,所以一个OS线程可以执行多个用户线程,但一个用户线程只能由最初执行它的OS线程来执行。这个很好解决,比如新创建的用户线程放入global queue中,被抢占的用户线程放到OS线程本地的queue中。
再说1:signal handler中不适合直接做调度,可以只在handler里保存现场、设置标志,然后切换回调度器,由调度器的`sched()`完成真正的调度;用户线程的栈可以用`mmap`分配,OS线程则用`pthread_create`创建。相关论文:
Go大致也是这么做的,M对应OS线程,G对应用户线程,看一下sysmon线程中对G进行抢占的部分代码:
// runtime/proc.go
// sysmon is the runtime's monitor thread. It runs in a loop, sleeping
// with exponential back-off, and calls retake to reclaim Ps blocked in
// syscalls and to preempt Gs that have run too long.
func sysmon() {
// ...
for {
// Back-off: sleep 20us while busy; after 50 idle rounds start
// doubling the delay, capped at 10ms.
if idle == 0 { // start with 20us sleep...
delay = 20
} else if idle > 50 { // start doubling the sleep after 1ms...
delay *= 2
}
if delay > 10*1000 { // up to 10ms
delay = 10 * 1000
}
usleep(delay)
// ...
now = nanotime()
// retake P's blocked in syscalls
// and preempt long running G's
if retake(now) != 0 {
idle = 0 // did useful work: reset back-off
} else {
idle++
}
// ...
}
}
`retake`会对运行时间过长的G调用`preemptone`发起抢占:
// runtime/proc.go
// retake scans every P and requests preemption (via preemptone) of any
// G that has been running on its P longer than forcePreemptNS; Ps stuck
// in syscalls are handled separately (elided here).
func retake(now int64) uint32 {
n := 0
// Prevent allp slice changes. This lock will be completely
// uncontended unless we're already stopping the world.
lock(&allpLock)
// We can't use a range loop over allp because we may
// temporarily drop the allpLock. Hence, we need to re-fetch
// allp each time around the loop.
for i := 0; i < len(allp); i++ {
_p_ := allp[i]
// ...
if s == _Prunning || s == _Psyscall {
// Preempt G if it's running for too long.
// schedtick advances on each scheduling event; if it is
// unchanged since the last observation, the same G has been
// running on this P the whole interval.
t := int64(_p_.schedtick)
if int64(pd.schedtick) != t {
pd.schedtick = uint32(t)
pd.schedwhen = now
} else if pd.schedwhen+forcePreemptNS <= now {
preemptone(_p_)
// In case of syscall, preemptone() doesn't
// work, because there is no M wired to P.
sysretake = true
}
}
// ...
}
unlock(&allpLock)
return uint32(n)
}
`preemptone`设置`g`上的抢占标志,并调用`preemptM`向运行该`g`的`m`发送抢占信号:
// runtime/proc.go
// preemptone requests preemption of the G currently running on _p_.
// It arms both mechanisms: the cooperative one (stackguard0 set to
// stackPreempt, caught at the next function prologue) and, when
// supported, the asynchronous one (a signal sent via preemptM).
func preemptone(_p_ *p) bool {
mp := _p_.m.ptr()
// Don't preempt our own M.
if mp == nil || mp == getg().m {
return false
}
gp := mp.curg
// g0 is the scheduler stack; it must never be preempted.
if gp == nil || gp == mp.g0 {
return false
}
gp.preempt = true
// Every call in a goroutine checks for stack overflow by
// comparing the current stack pointer to gp->stackguard0.
// Setting gp->stackguard0 to StackPreempt folds
// preemption into the normal stack overflow check.
gp.stackguard0 = stackPreempt
// Request an async preemption of this P.
if preemptMSupported && debug.asyncpreemptoff == 0 {
_p_.preempt = true
preemptM(mp)
}
return true
}
`preemptM`通过`signalM`向M发送`sigPreempt`(即`SIGURG`)信号:
// runtime/signal_unix.go
func preemptM(mp *m) {
// On Darwin, don't try to preempt threads during exec.
// Issue #41702.
if GOOS == "darwin" || GOOS == "ios" {
execLock.rlock()
}
// signalPending is a 0/1 latch: at most one preemption signal is in
// flight to this M at any time (cleared by the signal handler side).
if atomic.Cas(&mp.signalPending, 0, 1) {
if GOOS == "darwin" || GOOS == "ios" {
atomic.Xadd(&pendingPreemptSignals, 1)
}
// If multiple threads are preempting the same M, it may send many
// signals to the same M such that it hardly make progress, causing
// live-lock problem. Apparently this could happen on darwin. See
// issue #37741.
// Only send a signal if there isn't already one pending.
signalM(mp, sigPreempt)
}
if GOOS == "darwin" || GOOS == "ios" {
execLock.runlock()
}
}
不同平台下的signal handler不一样,我们看一下*nix下的:
// runtime/signal_unix.go
// sighandler is the runtime's Unix signal handler; only the part that
// dispatches preemption signals to doSigPreempt is shown here.
func sighandler(sig uint32, info *siginfo, ctxt unsafe.Pointer, gp *g) {
_g_ := getg()
c := &sigctxt{info, ctxt}
// ...
if sig == sigPreempt && debug.asyncpreemptoff == 0 {
// Might be a preemption signal.
doSigPreempt(gp, c)
// Even if this was definitely a preemption signal, it
// may have been coalesced with another signal, so we
// still let it through to the application.
}
// ...
}
其中参数`ctxt unsafe.Pointer`实际上是指向被中断上下文的指针(`ucontext_t *`),`doSigPreempt`会利用它修改被打断线程的上下文:
// runtime/signal_unix.go
// doSigPreempt handles a preemption signal on gp: if gp asked to be
// preempted and the interrupted PC is an async-safe point, it rewrites
// the signal context so that returning from the signal "calls"
// asyncPreempt at that point.
func doSigPreempt(gp *g, ctxt *sigctxt) {
// Check if this G wants to be preempted and is safe to
// preempt.
if wantAsyncPreempt(gp) {
if ok, newpc := isAsyncSafePoint(gp, ctxt.sigpc(), ctxt.sigsp(), ctxt.siglr()); ok {
// Adjust the PC and inject a call to asyncPreempt.
ctxt.pushCall(abi.FuncPCABI0(asyncPreempt), newpc)
}
}
}
`pushCall`(以amd64为例):
// runtime/signal_amd64.go
// pushCall edits the saved signal context so that, when the handler
// returns, the thread behaves as if targetPC had been CALLed from
// resumePC: resumePC is pushed as the return address and rip is
// redirected to targetPC (amd64).
func (c *sigctxt) pushCall(targetPC, resumePC uintptr) {
// Make it look like we called target at resumePC.
sp := uintptr(c.rsp())
sp -= goarch.PtrSize
// Simulate CALL: push the resume address onto the interrupted stack.
*(*uintptr)(unsafe.Pointer(sp)) = resumePC
c.set_rsp(uint64(sp))
c.set_rip(uint64(targetPC))
}
可以看到,`pushCall`先把`resumePC`压栈作为返回地址,再把`rip`设置为`targetPC`,效果等同于在被中断处调用了目标函数。对于`ctxt.pushCall(abi.FuncPCABI0(asyncPreempt), newpc)`而言,`newpc`是被中断处(或调整到安全点)的PC,因此信号返回后会先执行`asyncPreempt`,其返回后再回到`newpc`继续执行原来的代码:
// runtime/preempt_amd64.s
// asyncPreempt saves the complete user register state — the goroutine
// was interrupted at an arbitrary instruction, so every register may be
// live — calls asyncPreempt2, then restores the state and returns to
// the interrupted code.
TEXT ·asyncPreempt(SB),NOSPLIT|NOFRAME,$0-0
PUSHQ BP
MOVQ SP, BP
// Save flags before clobbering them
PUSHFQ
// obj doesn't understand ADD/SUB on SP, but does understand ADJSP
ADJSP $368
// But vet doesn't know ADJSP, so suppress vet stack checking
NOP SP
// ...
// save general purpose registers
// ...
#ifdef GOOS_darwin
// Skip VZEROUPPER when the CPU has no AVX support.
CMPB internal∕cpu·X86+const_offsetX86HasAVX(SB), $0
JE 2(PC)
VZEROUPPER
#endif
// ...
// save floating point registers
// ...
CALL ·asyncPreempt2(SB)
// ...
// restore floating point registers
// ...
// ...
// restore general purpose registers
// ...
ADJSP $-368
POPFQ
POPQ BP
RET

保存完现场后,`asyncPreempt`调用`asyncPreempt2`:
// runtime/preempt.go
// asyncPreempt2 runs on the goroutine's own stack after asyncPreempt
// has saved all registers. It switches to the scheduler via mcall:
// preemptPark when the G was asked to stop (preemptStop), otherwise
// gopreempt_m, which behaves like runtime.Gosched.
func asyncPreempt2() {
gp := getg()
gp.asyncSafePoint = true
if gp.preemptStop {
mcall(preemptPark)
} else {
mcall(gopreempt_m)
}
gp.asyncSafePoint = false
}
如果`gp.preemptStop`为真(GC要求这个G停下),就通过`mcall`切换到`g0`栈执行`preemptPark`;否则执行`gopreempt_m`。两者最终都会`dropg()`解除`m`与`g`的绑定,再调用`schedule()`去调度其他`g`。

REFERENCE
有错误欢迎指正。