很多gopher懂GPM,更多gopher不懂GPM!相关项目地址在(转载自https://github.com/zboya/golang_runtime_reading,下文若有任何错误的地方欢迎指出。
- 深入golang runtime的调度
- 理解调度器启动
- 相关术语
- 主要源码文件
- 调度基本组件
- G(goroutine)
- P(processor)
- M(machine)
- GPM的关系示意图
- 整体流程概览
- go进程的启动
- runtime调度器的启动
- getg() 函数
- g0和m0
- mstartfn
- 真正的调度函数 schedule
- runtime.main 的执行
- 理解调度器启动
- 调度机制
- 调度架构概览图
- 基本思想
- 类比模型
- 关于G
- G的结构
- G的新建
- G的栈
- G的状态
- 关于P
- P的结构
- P的分配
- P的状态
- 关于M
- M的结构
- M的新建
- 辛勤工作的M
- M的状态
- M的管理
- 抢占的实现
- 调度跟踪信息可视化
- ref
理解调度器启动
package main
相关术语
runtimeschedulerTLSspinningsystemstackmcallasmcgocall
主要源码文件
runtime/asm_amd64.s 进程启动时调用的一些汇编函数,可以理解为进程的入口。
runtime/runtime2.go 主要是g、m、p、schedt的数据结构和g、p的状态定义,还定义了一些全局变量。
runtime/proc.go 主要是调度器逻辑代码的实现
调度基本组件
为了方便理解这里先介绍一下GPM的概念。
G(goroutine)
- 调度系统的最基本单位goroutine,存储了goroutine的执行stack信息、goroutine状态以及goroutine的任务函数等。
- 在G的眼中只有P,P就是运行G的“CPU”。
- 相当于两级线程
g的任务函数
每个g的实例都有任务函数,如下代码
userFunc := func() {
fmt.Println("hi boya!")
}
go userFunc()
goroutinegoroutineuserFunc
P(processor)
- P表示逻辑processor,代表线程M的执行的上下文。
- P的最大作用是其拥有的各种G对象队列、链表、cache和状态。
- P的数量也代表了golang的执行并发度,即有多少goroutine可以同时运行
正确理解P
这里的p虽然表示逻辑处理器,但P并不执行任何代码,对G来说,P相当于CPU核,G只有绑定到P才能被调度。 对M来说,P提供了相关的执行环境(Context),如内存分配状态(mcache),任务队列(G)等
M(machine)
- M代表着真正的执行计算资源,可以认为它就是os thread(系统线程)。
- M是真正调度系统的执行者,每个M就像一个勤劳的工作者,总是从各种队列中找到可运行的G,而且这样M的可以同时存在多个。
- M在绑定有效的P后,进入调度循环,而且M并不保留G状态,这是G可以跨M调度的基础。
简单理解linux的线程概念
linux线程本质是轻量级进程,线程的特点是资源共享,意思就是线程之间共享进程的内存空间,所以一个线程可以访问另一个线程的局部变量, 线程也有自己独立的堆栈,用户的代码逻辑可以在这些堆栈上执行。当用户启动一个线程时,需要指定一个让该线程执行的起始函数和该函数的参数, 那么线程要做的事情就是去执行用户指定的函数。而且在线程里可以创建另一个线程。
GPM的关系示意图
通过连线可以看出来,M和P有关联,G和P有关联,但M和G没有关联,这是为何G可以在不同的M上执行的本质原因。
整体流程概览
这里先整理一下整个流程,如下图:
go进程的启动
进程的本质是代码区的指令不断执行,驱使动态数据区和静态数据区产生数据变化。- 来自图解编译原理
main包asm_amd64.smain包runtime mainruntime·rt0_goruntime·rt0_go
runtime/asm_amd64.s
// runtime·rt0_go
// 程序刚启动的时候必定有一个线程启动(主线程)
// 将当前的栈和资源保存在g0
// 将该线程保存在m0
// tls: Thread Local Storage
// set the per-goroutine and per-mach "registers"
get_tls(BX)
LEAQ runtime·g0(SB), CX
MOVQ CX, g(BX)
LEAQ runtime·m0(SB), AX
// m0和g0互相绑定
// save m->g0 = g0
MOVQ CX, m_g0(AX)
// save m0 to g0->m
MOVQ AX, g_m(CX)
// 处理args
CALL runtime·args(SB)
// os初始化, os_linux.go
CALL runtime·osinit(SB)
// 调度系统初始化, proc.go
CALL runtime·schedinit(SB)
// 创建一个goroutine,然后开启执行程序
// create a new goroutine to start program
MOVQ $runtime·mainPC(SB), AX // entry
PUSHQ AX
PUSHQ $0 // arg size
CALL runtime·newproc(SB)
POPQ AX
POPQ AX
// start this M
// 启动线程,并且启动调度系统
CALL runtime·mstart(SB)
runtime/os_linux.go
// 获取cpu的数量
func osinit() {
ncpu = getproccount()
}
runtime/proc.go
// 调度系统的初始化
func schedinit() {
...
// 确认P的个数
// 默认等于cpu个数,可以通过GOMAXPROCS环境变量更改
procs := ncpu
if n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 {
procs = n
}
// 调整P的个数,这里是新分配procs个P
// 这个函数很重要,所有的P都是从这里分配的,以后也不用担心没有P了
if procresize(procs) != nil {
throw("unknown runnable goroutine during bootstrap")
}
...
}
proc.go
// The bootstrap sequence is:
//
// call osinit
// call schedinit
// make & queue new G
// call runtime·mstart
//
runtime·osinitruntime·schedinitruntime·newprocmain goroutineruntime.mainruntime·mstart
runtime·mstartmain goroutine
runtime调度器的启动
上面我们看到入口函数是调用 `runtime·mstart(SB) 来启动的,那么就分析一下 mstart 做了什么?
runtime/proc.go
func mstart() {
// 这里获取的g是g0,在系统堆栈
_g_ := getg()
...
mstart1(0)
...
}
// dummy一直为0,给getcallersp当参数
func mstart1(dummy int32) {
_g_ := getg()
// 确保g是系统栈上的g0
// 调度器只在g0上执行
if _g_ != _g_.m.g0 {
throw("bad runtime·mstart")
}
...
// 初始化m,主要是设置线程的备用信号堆栈和信号掩码
minit()
// Install signal handlers; after minit so that minit can
// prepare the thread to be able to handle the signals.
// 如果当前g的m是初始m0,执行mstartm0()
if _g_.m == &m0 {
// 对于初始m,需要一些特殊处理,主要是设置系统信号量的处理函数
mstartm0()
}
// 如果有m的起始任务函数,则执行,比如 sysmon 函数
// 对于m0来说,是没有 mstartfn 的
if fn := _g_.m.mstartfn; fn != nil {
fn()
}
if _g_.m.helpgc != 0 {
_g_.m.helpgc = 0
stopm()
} else if _g_.m != &m0 { // 如果不是m0,需要绑定p
// 绑定p
acquirep(_g_.m.nextp.ptr())
_g_.m.nextp = 0
}
// 进入调度,而且不会在返回
schedule()
}
mstartmstart1mstart1_g_ := getg()ggg0g0gmm0m
mstart1
1. getg() 做了什么?
2. 检查 g 是否正确,不是 g0 直接退出,那这个 g0 是什么呢? m0 又是什么?
3. mstartfn 对应的实际函数是啥?
schedule
getg() 函数
如果你看过runtime的源码,当你去查看 getg() 函数的定义时,你有可能会疑惑,为何这个函数没有body?
runtime/stubs.go
// getg returns the pointer to the current g.
// The compiler rewrites calls to this function into instructions
// that fetch the g directly (from TLS or from the dedicated register).
func getg() *g
这里只有函数的声明,却没有函数的实体,是因为这个函数的实体是编译器重写的,也就是编译器中才定义了该函数的实体。笔者也没有深究 getg 的实体具体是什么, 所以根据注释和自己的理解来解释这个函数做了什么。
getggetg().m.curggetg()getg() == getg().m.curg
g0和m0
g0 和 m0 在runtime中是比较重要的概念这里讲一下,它们到底什么?
var (
m0 m
g0 g
raceprocctx0 uintptr
)
到这里我们应该知道了g0和m0是什么了? m0代表主线程、g0代表了线程的堆栈。调度都是在系统堆栈上跑的,也就是一定要跑在 g0 上,所以 mstart1 函数才检查是不是在g0上, 因为接下来就要执行调度程序了。
mstartfn
mstartfnmstartfn
newm(sysmon, nil)
newm(mhelpgc, nil)
newm(templateThread, nil)
mspinning
mstartfnmstartfn
schedule
真正的调度函数 schedule
schedule
runtime/proc.go
func schedule() {
_g_ := getg()
...
top:
// 如果当前GC需要停止整个世界(STW), 则调用gcstopm休眠当前的M
if sched.gcwaiting != 0 {
// 为了STW,停止当前的M
gcstopm()
// STW结束后回到 top
goto top
}
...
var gp *g
var inheritTime bool
...
if gp == nil {
// Check the global runnable queue once in a while to ensure fairness.
// Otherwise two goroutines can completely occupy the local runqueue
// by constantly respawning each other.
// 每隔61次调度,尝试从全局队列种获取G
if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
lock(&sched.lock)
gp = globrunqget(_g_.m.p.ptr(), 1)
unlock(&sched.lock)
}
}
if gp == nil {
// 从p的本地队列中获取
gp, inheritTime = runqget(_g_.m.p.ptr())
if gp != nil && _g_.m.spinning {
throw("schedule: spinning with local work")
}
}
if gp == nil {
// 想尽办法找到可运行的G,找不到就不用返回了
gp, inheritTime = findrunnable() // blocks until work is available
}
...
// println("execute goroutine", gp.goid)
// 找到了g,那就执行g上的任务函数
execute(gp, inheritTime)
}
findrunnable
executeexecuterunqgetmain goroutineexecutemain goroutinemain goroutine
runtime.main 的执行
main goroutineruntimemainmain
// The main goroutine.
func main() {
// 获取 main goroutine
g := getg()
...
// 在系统栈上运行 sysmon
systemstack(func() {
// 分配一个新的m,运行sysmon系统后台监控
// (定期垃圾回收和调度抢占)
newm(sysmon, nil)
})
...
// 确保是主线程
if g.m != &m0 {
throw("runtime.main not on m0")
}
// runtime 内部 init 函数的执行,编译器动态生成的。
runtime_init() // must be before defer
...
// gc 启动一个goroutine进行gc清扫
gcenable()
...
// 执行init函数,编译器动态生成的,
// 包括用户定义的所有的init函数。
// make an indirect call,
// as the linker doesn't know the address of
// the main package when laying down the runtime
fn := main_init
fn()
...
// 真正的执行main func in package main
// make an indirect call,
// as the linker doesn't know the address of
// the main package when laying down the runtime
fn = main_main
fn()
...
// 退出程序
exit(0)
// 为何这里还需要for循环?
// 下面的for循环一定会导致程序崩掉,这样就确保了程序一定会退出
for {
var x *int32
*x = 0
}
}
mainsysmonruntime_initfunc init()main_init
接下来就是真的执行 main 包中的main函数了,这里 main 函数由用户定义其具体内容,如果该 main 函数结束了,就会执行 exit(0) 来退出进程。 若是 exit(0) 后,进程没退出,由接下来的代码确保进程一会退出,就是for循环一直访问非法地址,正常情况下,一但出现非法地址访问,系统就会把该进程杀死,用这样的方法来确保进程退出。
调度机制
上面讲述了runtime启动用户代码main函数的过程,但是如果用户在代码中新建了很多goroutine,runtime又是如何管理的呢?
调度架构概览图
golang源码剖析
+-------------------- sysmon ---------------//------+
| |
| |
+---+ +---+-------+ +--------+ +---+---+
go func() ---> | G | ---> | P | local | <=== balance ===> | global | <--//--- | P | M |
+---+ +---+-------+ +--------+ +---+---+
| | |
| +---+ | |
+----> | M | <--- findrunnable ---+--- steal <--//--+
+---+
|
mstart
|
+--- execute <----- schedule
| |
| |
+--> G.fn --> goexit --+
1. go func() 语气创建G。
2. 将G放入P的本地队列(或者平衡到全局队列)。
3. 唤醒或新建M来执行任务。
4. 进入调度循环
5. 尽力获取可执行的G,并执行
6. 清理现场并且重新进入调度循环
sysmon
接下来详细看一下golang的调度器在进程中是怎么起来的。
基本思想
复用线程
goroutines本身就是运行在一组线程之上,不需要频繁的创建、销毁线程,而是对线程的复用。在调度器中复用线程还有2个体现:
1. work stealing,当本线程无可运行的G时,尝试从其他线程绑定的P偷取G,而不是销毁线程。
2. hand off,当本线程因为G进行系统调用阻塞时,线程释放绑定的P,把P转移给其他空闲的线程执行。
利用并行
Go的一个强项就是天生支持并发,并发包含了并行的实现,并行度由GOMAXPROCS来控制,默认等于cpu数,当GOMAXPROCS大于1时,就最多有GOMAXPROCS个线程同时处于运行状态,这些线程可能分布在多个CPU核上同时运行,这样就可以充分利用多核。
调度器的两小策略
-
全局G队列 在调度器中有全局G队列,用来均衡每个P上面G的数量。
-
一定的抢占 当一个goroutine占用cpu超过10ms,会被抢占,防止其他goroutine饿死。
类比模型
为了更方便的理解调度器,我拿一个现实的例子来比喻,假设有一个物流公司,里面有司机、车、货物,类比M、P、G,当有用户需要运输货物(G)时,司机能得到货物,但是此时司机(M)是无法运输货物的,因为他还没有车(P),所以司机先从车库中找到一辆空闲的车,然后再将货物运到目的地,此时一次货物的运输完成,就像完成用户的模块逻辑代码。然后公司越开越大,现在有多个司机,也有多辆车,当然要运输的货物也很多。这里的每个司机都很勤劳,他们日夜的在运输货物,除非实在没有货物来运了,他们才会去休息(M休眠)。为了避免有些司机很轻松没事干,而有些司机忙不过来,当司机发现自己没活干了,他就会去向别的车里面拿一些货物过来,帮忙运输(M偷其他P里面的G)。
关于G
G是调度的对象。G存储了goroutine的执行stack信息、goroutine状态以及goroutine的任务函数等,但它并不执行用户代码。
G的结构
type g struct {
// Stack parameters.
// stack describes the actual stack memory: [stack.lo, stack.hi).
// stackguard0 is the stack pointer compared in the Go stack growth prologue.
// It is stack.lo+StackGuard normally, but can be StackPreempt to trigger a preemption.
// stackguard1 is the stack pointer compared in the C stack growth prologue.
// It is stack.lo+StackGuard on g0 and gsignal stacks.
// It is ~0 on other goroutine stacks, to trigger a call to morestackc (and crash).
// 简单数据结构,lo 和 hi 成员描述了栈的下界和上界内存地址
stack stack // offset known to runtime/cgo
stackguard0 uintptr // offset known to liblink
stackguard1 uintptr // offset known to liblink
_panic *_panic // innermost panic - offset known to liblink
_defer *_defer // innermost defer
// 当前的m
m *m // current m; offset known to arm liblink
// goroutine切换时,用于保存g的上下文
sched gobuf
syscallsp uintptr // if status==Gsyscall, syscallsp = sched.sp to use during gc
syscallpc uintptr // if status==Gsyscall, syscallpc = sched.pc to use during gc
stktopsp uintptr // expected sp at top of stack, to check in traceback
// 用于传递参数,睡眠时其他goroutine可以设置param,唤醒时该goroutine可以获取
param unsafe.Pointer // passed parameter on wakeup
atomicstatus uint32
stackLock uint32 // sigprof/scang lock; TODO: fold in to atomicstatus
// 唯一的goroutine的ID
goid int64
// g被阻塞的大体时间
waitsince int64 // approx time when the g become blocked
waitreason string // if status==Gwaiting
schedlink guintptr
// 标记是否可抢占
preempt bool // preemption signal, duplicates stackguard0 = stackpreempt
paniconfault bool // panic (instead of crash) on unexpected fault address
preemptscan bool // preempted g does scan for gc
gcscandone bool // g has scanned stack; protected by _Gscan bit in status
gcscanvalid bool // false at start of gc cycle, true if G has not run since last scan; TODO: remove?
throwsplit bool // must not split stack
raceignore int8 // ignore race detection events
sysblocktraced bool // StartTrace has emitted EvGoInSyscall about this goroutine
sysexitticks int64 // cputicks when syscall has returned (for tracing)
traceseq uint64 // trace event sequencer
tracelastp puintptr // last P emitted an event for this goroutine
// G被锁定只在这个m上运行
lockedm muintptr
sig uint32
writebuf []byte
sigcode0 uintptr
sigcode1 uintptr
sigpc uintptr
// 调用者的 PC/IP
gopc uintptr // pc of go statement that created this goroutine
// 任务函数
startpc uintptr // pc of goroutine function
racectx uintptr
waiting *sudog // sudog structures this g is waiting on (that have a valid elem ptr); in lock order
cgoCtxt []uintptr // cgo traceback context
labels unsafe.Pointer // profiler labels
timer *timer // cached timer for time.Sleep
selectDone uint32 // are we participating in a select and did someone win the race?
// Per-G GC state
// gcAssistBytes is this G's GC assist credit in terms of
// bytes allocated. If this is positive, then the G has credit
// to allocate gcAssistBytes bytes without assisting. If this
// is negative, then the G must correct this by performing
// scan work. We track this in bytes to make it fast to update
// and check for debt in the malloc hot path. The assist ratio
// determines how this corresponds to scan work debt.
gcAssistBytes int64
}
allgs
G的新建
go func() { ... }
runtimenewprocsystemstacknewproc1newproc1malgschedrunqputmain goroutine
// Create a new g running fn with siz bytes of arguments.
// Put it on the queue of g's waiting to run.
// The compiler turns a go statement into a call to this.
// Cannot split the stack because it assumes that the arguments
// are available sequentially after &fn; they would not be
// copied if a stack split occurred.
//go:nosplit
// 新建一个goroutine,
// 用fn + PtrSize 获取第一个参数的地址,也就是argp
// 用siz - 8 获取pc地址
func newproc(siz int32, fn *funcval) {
argp := add(unsafe.Pointer(&fn), sys.PtrSize)
pc := getcallerpc()
// 用g0的栈创建G对象
systemstack(func() {
newproc1(fn, (*uint8)(argp), siz, pc)
})
}
// Create a new g running fn with narg bytes of arguments starting
// at argp. callerpc is the address of the go statement that created
// this. The new g is put on the queue of g's waiting to run.
// 根据函数参数和函数地址,创建一个新的G,然后将这个G加入队列等待运行
// callerpc是newproc函数的pc
func newproc1(fn *funcval, argp *uint8, narg int32, callerpc uintptr) {
// print("fn=", fn.fn, " argp=", argp, " narg=", narg, " callerpc=", callerpc, "\n")
_g_ := getg() // g0
// 任务函数如果为nil,则抛出异常
if fn == nil {
_g_.m.throwing = -1 // do not dump full stacks
throw("go of nil func value")
}
// 禁用抢占,因为它可以在本地var中保存p
_g_.m.locks++ // disable preemption because it can be holding p in a local var
siz := narg
siz = (siz + 7) &^ 7
// We could allocate a larger initial stack if necessary.
// Not worth it: this is almost always an error.
// 4*sizeof(uintreg): extra space added below
// sizeof(uintreg): caller's LR (arm) or return address (x86, in gostartcall).
// 如果函数的参数大小比2048大的话,直接panic
if siz >= _StackMin-4*sys.RegSize-sys.RegSize {
throw("newproc: function arguments too large for new goroutine")
}
// 从m中获取p
_p_ := _g_.m.p.ptr()
// 尝试从gfree list获取g,包括本地和全局list
newg := gfget(_p_)
// 如果没获取到g,则新建一个
if newg == nil {
// 分配栈为 2k 大小的G对象
newg = malg(_StackMin)
casgstatus(newg, _Gidle, _Gdead) //将g的状态改为_Gdead
// 添加到allg数组,防止gc扫描清除掉
allgadd(newg) // publishes with a g->status of Gdead so GC scanner doesn't look at uninitialized stack.
}
if newg.stack.hi == 0 {
throw("newproc1: newg missing stack")
}
// 此时获取的g一定得是 _Gdead
if readgstatus(newg) != _Gdead {
throw("newproc1: new g is not Gdead")
}
// 参数大小+稍微一点空间
totalSize := 4*sys.RegSize + uintptr(siz) + sys.MinFrameSize // extra space in case of reads slightly beyond frame
totalSize += -totalSize & (sys.SpAlign - 1) // align to spAlign
// 新协程的栈顶计算,将栈顶减去参数占用的空间
sp := newg.stack.hi - totalSize
spArg := sp
if usesLR { // amd64平台下 usesLR 为 false
// caller's LR
*(*uintptr)(unsafe.Pointer(sp)) = 0
prepGoExitFrame(sp)
spArg += sys.MinFrameSize
}
// 如果有参数
if narg > 0 {
// copy参数到栈上
memmove(unsafe.Pointer(spArg), unsafe.Pointer(argp), uintptr(narg))
// 这是一个堆栈到堆栈的副本。如果启用了写入屏障并且源堆栈为灰色(目标始终为黑色),
// 则执行屏障复制。我们在* memmove之后执行此操作,因为目标堆栈上可能有垃圾。
if writeBarrier.needed && !_g_.m.curg.gcscandone {
f := findfunc(fn.fn)
stkmap := (*stackmap)(funcdata(f, _FUNCDATA_ArgsPointerMaps))
// We're in the prologue, so it's always stack map index 0.
bv := stackmapdata(stkmap, 0)
bulkBarrierBitmap(spArg, spArg, uintptr(narg), 0, bv.bytedata)
}
}
// 初始化G的gobuf,保存sp,pc,任务函数等
memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched))
newg.sched.sp = sp
newg.stktopsp = sp
// 保存goexit的地址到sched.pc,后面会调节 goexit 作为任务函数返回后执行的地址,所以goroutine结束后会调用goexit
newg.sched.pc = funcPC(goexit) + sys.PCQuantum // +PCQuantum so that previous instruction is in same function
// sched.g保存当前新的G
newg.sched.g = guintptr(unsafe.Pointer(newg))
// 将当前的pc压入栈,保存g的任务函数为pc
gostartcallfn(&newg.sched, fn)
// gopc保存newproc的pc
newg.gopc = callerpc
// 任务函数的地址
newg.startpc = fn.fn
if _g_.m.curg != nil {
newg.labels = _g_.m.curg.labels
}
// 判断g的任务函数是不是runtime系统的任务函数,是则sched.ngsys加1
if isSystemGoroutine(newg) {
atomic.Xadd(&sched.ngsys, +1)
}
newg.gcscanvalid = false
// 更改当前g的状态为_Grunnable
casgstatus(newg, _Gdead, _Grunnable)
if _p_.goidcache == _p_.goidcacheend {
// Sched.goidgen is the last allocated id,
// this batch must be [sched.goidgen+1, sched.goidgen+GoidCacheBatch].
// At startup sched.goidgen=0, so main goroutine receives goid=1.
_p_.goidcache = atomic.Xadd64(&sched.goidgen, _GoidCacheBatch)
_p_.goidcache -= _GoidCacheBatch - 1
_p_.goidcacheend = _p_.goidcache + _GoidCacheBatch
}
// 生成唯一的goid
newg.goid = int64(_p_.goidcache)
_p_.goidcache++
if raceenabled {
newg.racectx = racegostart(callerpc)
}
if trace.enabled {
// 如果启动了go trace,记录go create事件
traceGoCreate(newg, newg.startpc)
}
// 将当前新生成的g,放入队列
runqput(_p_, newg, true)
// 如果有空闲的p 且 m没有处于自旋状态 且 main goroutine已经启动,那么唤醒某个m来执行任务
if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 && mainStarted {
// 如果还有空闲P,那么新建M来运行G
wakep()
}
_g_.m.locks--
if _g_.m.locks == 0 && _g_.preempt { // restore the preemption request in case we've cleared it in newstack
_g_.stackguard0 = stackPreempt
}
}
G的栈
stacksizestackguard0stackguard1
// 新分配一个g对象,并申请好 stacksize 大小的栈,此时g的状态为_Gidle
func malg(stacksize int32) *g {
newg := new(g)
if stacksize >= 0 {
stacksize = round2(_StackSystem + stacksize)
systemstack(func() {
// 调用 stackalloc 分配栈
newg.stack = stackalloc(uint32(stacksize))
})
// 设置 stackguard
newg.stackguard0 = newg.stack.lo + _StackGuard
newg.stackguard1 = ^uintptr(0)
}
return newg
}
G的状态
_Grunnable
|
v
schedule <------------+ goexit
| ^
| |
| |
+--> execute --> G.fn -+
// goexit continuation on g0.
func goexit0(gp *g) {
_g_ := getg()
// gp的状态置为_Gdead
casgstatus(gp, _Grunning, _Gdead)
...
// 状态重置
gp.m = nil
// G和M是否锁定
locked := gp.lockedm != 0
// G和M解除锁定
gp.lockedm = 0
_g_.m.lockedg = 0
gp.paniconfault = false
gp._defer = nil // should be true already but just in case.
gp._panic = nil // non-nil for Goexit during panic. points at stack-allocated data.
gp.writebuf = nil
gp.waitreason = ""
gp.param = nil
gp.labels = nil
gp.timer = nil
...
// Note that gp's stack scan is now "valid" because it has no
// stack.
gp.gcscanvalid = true
// 处理G和M的清除工作
dropg()
...
// 将G放入P的G空闲链表
gfput(_g_.m.p.ptr(), gp)
...
// 再次计入调度
schedule()
}
关于P
P是逻辑CPU,它最主要的是本身有G的队列,保存着可运行的G。
P的结构
type p struct {
lock mutex
// id也是allp的数组下标
id int32
status uint32 // one of pidle/prunning/...
// 单向链表,指向下一个P的地址
link puintptr
// 每调度一次加1
schedtick uint32 // incremented on every scheduler call
// 每一次系统调用加1
syscalltick uint32 // incremented on every system call
sysmontick sysmontick // last tick observed by sysmon
// 回链到关联的m
m muintptr // back-link to associated m (nil if idle)
mcache *mcache
racectx uintptr
deferpool [5][]*_defer // pool of available defer structs of different sizes (see panic.go)
deferpoolbuf [5][32]*_defer
// Cache of goroutine ids, amortizes accesses to runtime·sched.goidgen.
// goroutine的ID的缓存
goidcache uint64
goidcacheend uint64
// Queue of runnable goroutines. Accessed without lock.
// 可运行的goroutine的队列
runqhead uint32
runqtail uint32
runq [256]guintptr
// runnext, if non-nil, is a runnable G that was ready'd by
// the current G and should be run next instead of what's in
// runq if there's time remaining in the running G's time
// slice. It will inherit the time left in the current time
// slice. If a set of goroutines is locked in a
// communicate-and-wait pattern, this schedules that set as a
// unit and eliminates the (potentially large) scheduling
// latency that otherwise arises from adding the ready'd
// goroutines to the end of the run queue.
// 下一个运行的g,优先级最高
runnext guintptr
// Available G's (status == Gdead)
gfree *g
gfreecnt int32
sudogcache []*sudog
sudogbuf [128]*sudog
...
}
P的分配
schedinitprocresizeGOMAXPROCSruntime.GOMAXPROCSruntime.GOMAXPROCS
func schedinit() {
...
// 让procs等于cpu个数
procs := ncpu
// 如果环境变量设置了 GOMAXPROCS 且大于0,那么将 procs 更改
if n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 {
procs = n
}
// 调整P的个数,这里是新分配procs个P
// 这个函数很重要,所有的P都是从这里分配的,以后也不用担心没有P了
if procresize(procs) != nil {
throw("unknown runnable goroutine during bootstrap")
}
...
}
// 所有的P都在这个函数分配,不管是最开始的初始化分配,还是后期调整
func procresize(nprocs int32) *p {
old := gomaxprocs
// 如果 gomaxprocs <=0 抛出异常
if old < 0 || nprocs <= 0 {
throw("procresize: invalid arg")
}
// 如果开启trace,记录事件
if trace.enabled {
traceGomaxprocs(nprocs)
}
// update statistics
// 更新全局状态统计
now := nanotime()
if sched.procresizetime != 0 {
sched.totaltime += int64(old) * (now - sched.procresizetime)
}
sched.procresizetime = now
// Grow allp if necessary.
if nprocs > int32(len(allp)) {
// Synchronize with retake, which could be running
// concurrently since it doesn't run on a P.
lock(&allpLock)
if nprocs <= int32(cap(allp)) {
allp = allp[:nprocs]
} else {
// 分配nprocs个*p
nallp := make([]*p, nprocs)
// Copy everything up to allp's cap so we
// never lose old allocated Ps.
copy(nallp, allp[:cap(allp)])
allp = nallp
}
unlock(&allpLock)
}
...
}
allpruntime.GOMAXPROCS
P的状态
P的状态换这里也不详细讲述,对于理解调度来说,知道M需要绑定P才能执行G即可,这个规则有一个例外,就是 sysmon , 它不需要P,它是直接运行在M上,相当于GM模型。sysmon 更像C语言的线程上的任务函数。
关于M
M是系统线程的抽象,是一个勤劳的工作者,它拥有真正的计算资源,执行代码。
M的结构
type m struct {
// 用来执行调度指令的 goroutine
g0 *g // goroutine with scheduling stack
morebuf gobuf // gobuf arg to morestack
divmod uint32 // div/mod denominator for arm - known to liblink
// Fields not known to debuggers.
procid uint64 // for debuggers, but offset not hard-coded
// 处理信号的goroutine
gsignal *g // signal-handling g
goSigStack gsignalStack // Go-allocated signal handling stack
sigmask sigset // storage for saved signal mask
// thread-local storage
tls [6]uintptr // thread-local storage (for x86 extern register)
mstartfn func()
// 当前运行的goroutine
curg *g // current running goroutine
caughtsig guintptr // goroutine running during fatal signal
// 关联p和执行的go代码
p puintptr // attached p for executing go code (nil if not executing go code)
nextp puintptr
id int64
// 状态
mallocing int32
throwing int32
preemptoff string // if != "", keep curg running on this m
// locks表示该M是否被锁的状态,M被锁的状态下该M无法执行gc
locks int32
softfloat int32
dying int32
profilehz int32
helpgc int32
// 是否自旋,自旋就表示M正在找G来运行
spinning bool // m is out of work and is actively looking for work
// m是否被阻塞
blocked bool // m is blocked on a note
// m是否在执行写屏蔽
inwb bool // m is executing a write barrier
newSigstack bool // minit on C thread called sigaltstack
printlock int8
// m在执行cgo吗
incgo bool // m is executing a cgo call
freeWait uint32 // if == 0, safe to free g0 and delete m (atomic)
fastrand [2]uint32
needextram bool
traceback uint8
// 当前cgo调用的数目
ncgocall uint64 // number of cgo calls in total
// cgo调用的总数
ncgo int32 // number of cgo calls currently in progress
cgoCallersUse uint32 // if non-zero, cgoCallers in use temporarily
cgoCallers *cgoCallers // cgo traceback if crashing in cgo call
park note
// 用于链接allm
alllink *m // on allm
schedlink muintptr
// 当前m的内存缓存
mcache *mcache
// 锁定g在当前m上执行,而不会切换到其他m,一般cgo调用或者手动调用LockOSThread()才会有值
lockedg guintptr
// thread创建的栈
createstack [32]uintptr // stack that created this thread.
freglo [16]uint32 // d[i] lsb and f[i]
freghi [16]uint32 // d[i] msb and f[i+16]
fflag uint32 // floating point compare flags
// 用户锁定M的标记
lockedExt uint32 // tracking for external LockOSThread
// runtime 内部锁定M的标记
lockedInt uint32 // tracking for internal lockOSThread
nextwaitm muintptr // next m waiting for lock
waitunlockf unsafe.Pointer // todo go func(*g, unsafe.pointer) bool
waitlock unsafe.Pointer
waittraceev byte
waittraceskip int
startingtrace bool
syscalltick uint32
thread uintptr // thread handle
freelink *m // on sched.freem
// these are here because they are too large to be on the stack
// of low-level NOSPLIT functions.
libcall libcall
libcallpc uintptr // for cpu profiler
libcallsp uintptr
libcallg guintptr
syscall libcall // stores syscall parameters on windows
mOS
}
M的新建
newmnewosprocnewosprocclonemstartmstartmstart
// Create a new m. It will start off with a call to fn, or else the scheduler.
// fn needs to be static and not a heap allocated closure.
// May run with m.p==nil, so write barriers are not allowed.
//go:nowritebarrierrec
// 创建一个新的m,它将从fn或者调度程序开始
// fn需要是静态的,而不是堆分配的闭包。
func newm(fn func(), _p_ *p) {
// 根据fn和p和绑定一个m对象
mp := allocm(_p_, fn)
// 设置当前m的下一个p为_p_
mp.nextp.set(_p_)
mp.sigmask = initSigmask
if gp := getg(); gp != nil && gp.m != nil && (gp.m.lockedExt != 0 || gp.m.incgo) && GOOS != "plan9" {
// 我们处于锁定的M或可能由C启动的线程。此线程的内核状态可能很奇怪(用户可能已将其锁定为此目的)。
// 我们不想将其克隆到另一个线程中。相反,请求一个已知良好的线程为我们创建线程。
lock(&newmHandoff.lock)
if newmHandoff.haveTemplateThread == 0 {
throw("on a locked thread with no template thread")
}
mp.schedlink = newmHandoff.newm
newmHandoff.newm.set(mp)
if newmHandoff.waiting {
newmHandoff.waiting = false
notewakeup(&newmHandoff.wake)
}
unlock(&newmHandoff.lock)
return
}
// 真正的分配os thread
newm1(mp)
}
func newm1(mp *m) {
// 对cgo的处理
...
execLock.rlock() // Prevent process clone.
// 创建一个系统线程,并且传入该 mp 绑定的 g0 的栈顶指针
// 让系统线程执行 mstart 函数,后面的逻辑都在 mstart 函数中
newosproc(mp, unsafe.Pointer(mp.g0.stack.hi))
execLock.runlock()
}
newosproc
// May run with m.p==nil, so write barriers are not allowed.
//go:nowritebarrier
// 分配一个系统线程,且完成 g0 和 g0上的栈分配
// 传入 mstart 函数,让线程执行 mstart
func newosproc(mp *m, stk unsafe.Pointer) {
...
// Disable signals during clone, so that the new thread starts
// with signals disabled. It will enable them in minit.
var oset sigset
sigprocmask(_SIG_SETMASK, &sigset_all, &oset)
// stk 是 g0.stack.hi,也就是说 g0 的堆栈是当前这个系统线程的堆栈,也被称为系统堆栈
ret := clone(cloneFlags, stk, unsafe.Pointer(mp), unsafe.Pointer(mp.g0), unsafe.Pointer(funcPC(mstart)))
sigprocmask(_SIG_SETMASK, &oset, nil)
...
}
辛勤工作的M
schedulescheduleschedulefindrunnable
func schedule() {
_g_ := getg()
...
top:
// 如果当前GC需要停止整个世界(STW), 则调用gcstopm休眠当前的M
if sched.gcwaiting != 0 {
// 为了STW,停止当前的M
gcstopm()
// STW结束后回到 top
goto top
}
...
var gp *g
var inheritTime bool
...
if gp == nil {
// Check the global runnable queue once in a while to ensure fairness.
// Otherwise two goroutines can completely occupy the local runqueue
// by constantly respawning each other.
// 每隔61次调度,尝试从全局队列种获取G
if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
lock(&sched.lock)
gp = globrunqget(_g_.m.p.ptr(), 1)
unlock(&sched.lock)
}
}
if gp == nil {
// 从p的本地队列中获取
gp, inheritTime = runqget(_g_.m.p.ptr())
if gp != nil && _g_.m.spinning {
throw("schedule: spinning with local work")
}
}
if gp == nil {
// 想尽办法找到可运行的G,找不到就不用返回了
gp, inheritTime = findrunnable() // blocks until work is available
}
...
// println("execute goroutine", gp.goid)
// 找到了g,那就执行g上的任务函数
execute(gp, inheritTime)
}
findrunnableGC markgcBgMarkWorkergcBgMarkWorkerstopm
// Finds a runnable goroutine to execute.
// Tries to steal from other P's, get g from global queue, poll network.
// 找到一个可以运行的G,不找到就让M休眠,然后等待唤醒,直到找到一个G返回
func findrunnable() (gp *g, inheritTime bool) {
_g_ := getg()
// 此处和handoffp中的条件必须一致:如果findrunnable将返回G运行,则handoffp必须启动M.
top:
// 当前m绑定的p
_p_ := _g_.m.p.ptr()
...
// local runq
// 再尝试从本地队列中获取G
if gp, inheritTime := runqget(_p_); gp != nil {
return gp, inheritTime
}
// global runq
// 尝试从全局队列中获取G
if sched.runqsize != 0 {
lock(&sched.lock)
gp := globrunqget(_p_, 0)
unlock(&sched.lock)
if gp != nil {
return gp, false
}
}
// 从网络IO轮询器中找到就绪的G,把这个G变为可运行的G
if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {
if gp := netpoll(false); gp != nil { // non-blocking
// netpoll returns list of goroutines linked by schedlink.
// 如果找到的可运行的网络IO的G列表,则把相关的G插入全局队列
injectglist(gp.schedlink.ptr())
// 更改G的状态为_Grunnable,以便下次M能找到这些G来执行
casgstatus(gp, _Gwaiting, _Grunnable)
// goroutine trace事件记录-unpark
if trace.enabled {
traceGoUnpark(gp, 0)
}
return gp, false
}
}
// Steal work from other P's.
procs := uint32(gomaxprocs)
// 如果其他P都是空闲的,就不从其他P哪里偷取G了
if atomic.Load(&sched.npidle) == procs-1 {
// Either GOMAXPROCS=1 or everybody, except for us, is idle already.
// New work can appear from returning syscall/cgocall, network or timers.
// Neither of that submits to local run queues, so no point in stealing.
goto stop
}
// 如果当前的M没在自旋 且 空闲P的数目小于正在自旋的M个数的2倍,那么让该M进入自旋状态
if !_g_.m.spinning && 2*atomic.Load(&sched.nmspinning) >= procs-atomic.Load(&sched.npidle) {
goto stop
}
// 如果M为非自旋,那么设置为自旋状态
if !_g_.m.spinning {
_g_.m.spinning = true
atomic.Xadd(&sched.nmspinning, 1)
}
// 随机选一个P,尝试从这P中偷取一些G
for i := 0; i < 4; i++ { // 尝试四次
for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
if sched.gcwaiting != 0 {
goto top
}
stealRunNextG := i > 2 // first look for ready queues with more than 1 g
// 从allp[enum.position()]偷去一半的G,并返回其中的一个
if gp := runqsteal(_p_, allp[enum.position()], stealRunNextG); gp != nil {
return gp, false
}
}
}
stop:
// 当前的M找不到G来运行。如果此时P处于 GC mark 阶段
// 那么此时可以安全的扫描和黑化对象,和返回 gcBgMarkWorker 来运行
if gcBlackenEnabled != 0 && _p_.gcBgMarkWorker != 0 && gcMarkWorkAvailable(_p_) {
// 设置gcMarkWorkerMode 为 gcMarkWorkerIdleMode
_p_.gcMarkWorkerMode = gcMarkWorkerIdleMode
// 获取gcBgMarkWorker goroutine
gp := _p_.gcBgMarkWorker.ptr()
casgstatus(gp, _Gwaiting, _Grunnable)
if trace.enabled {
traceGoUnpark(gp, 0)
}
return gp, false
}
// Before we drop our P, make a snapshot of the allp slice,
// which can change underfoot once we no longer block
// safe-points. We don't need to snapshot the contents because
// everything up to cap(allp) is immutable.
allpSnapshot := allp
// return P and block
lock(&sched.lock)
if sched.gcwaiting != 0 || _p_.runSafePointFn != 0 {
unlock(&sched.lock)
goto top
}
// 再次从全局队列中获取G
if sched.runqsize != 0 {
gp := globrunqget(_p_, 0)
unlock(&sched.lock)
return gp, false
}
// 将当前对M和P解绑
if releasep() != _p_ {
throw("findrunnable: wrong p")
}
// 将p放入p空闲链表
pidleput(_p_)
unlock(&sched.lock)
wasSpinning := _g_.m.spinning
// M取消自旋状态
if _g_.m.spinning {
_g_.m.spinning = false
if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
throw("findrunnable: negative nmspinning")
}
}
// check all runqueues once again
// 再次检查所有的P,有没有可以运行的G
for _, _p_ := range allpSnapshot {
// 如果p的本地队列有G
if !runqempty(_p_) {
lock(&sched.lock)
// 获取另外一个空闲P
_p_ = pidleget()
unlock(&sched.lock)
if _p_ != nil {
// 如果P不是nil,将M绑定P
acquirep(_p_)
// 如果是自旋,设置M为自旋
if wasSpinning {
_g_.m.spinning = true
atomic.Xadd(&sched.nmspinning, 1)
}
// 返回到函数开头,从本地p获取G
goto top
}
break
}
}
// Check for idle-priority GC work again.
if gcBlackenEnabled != 0 && gcMarkWorkAvailable(nil) {
lock(&sched.lock)
_p_ = pidleget()
if _p_ != nil && _p_.gcBgMarkWorker == 0 {
pidleput(_p_)
_p_ = nil
}
unlock(&sched.lock)
if _p_ != nil {
acquirep(_p_)
if wasSpinning {
_g_.m.spinning = true
atomic.Xadd(&sched.nmspinning, 1)
}
// Go back to idle GC check.
goto stop
}
}
// poll network
// 再次检查netpoll
if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Xchg64(&sched.lastpoll, 0) != 0 {
if _g_.m.p != 0 {
throw("findrunnable: netpoll with p")
}
if _g_.m.spinning {
throw("findrunnable: netpoll with spinning")
}
gp := netpoll(true) // block until new work is available
atomic.Store64(&sched.lastpoll, uint64(nanotime()))
if gp != nil {
lock(&sched.lock)
_p_ = pidleget()
unlock(&sched.lock)
if _p_ != nil {
acquirep(_p_)
injectglist(gp.schedlink.ptr())
casgstatus(gp, _Gwaiting, _Grunnable)
if trace.enabled {
traceGoUnpark(gp, 0)
}
return gp, false
}
injectglist(gp)
}
}
// 实在找不到G,那就休眠吧
// 且此时的M一定不是自旋状态
stopm()
goto top
}
M的状态
mstart
|
v
+------+ 找不到可执行任务 +-------+
|unspin| ----------------------------> |spining|
| | <---------------------------- | |
+------+ 找到可执行任务 +-------+
^ | stopm
| wakep v
notewakeup <------------------------- notesleep
详见下面的解释。
M的管理
调度器的思想是尽量复用线程,它需要在保持足够多的运行工作线程以利用可用的硬件并行性和运行过多的工作线程之间取得平衡。 所以调度器就需要管理什么时候启动M,什么时候停掉M(休眠),但是需要注意的是M一旦在runtime里创建,是不会销毁的,具体可以看issue_14592。
自旋findrunnablem.spinningsched.nmspinningm.spinningsched.nmspinningfindrunnable
// If number of spinning M's >= number of busy P's, block.
// This is necessary to prevent excessive CPU consumption
// when GOMAXPROCS>>1 but the program parallelism is low.
//
// 如果当前的M没在自旋 且 正在自旋的M个数的2倍>=正在忙的p的个数时,不让该M进入自旋状态
if !_g_.m.spinning && 2*atomic.Load(&sched.nmspinning) >= procs-atomic.Load(&sched.npidle) {
goto stop
}
// 如果M为非自旋,那么设置为自旋状态
if !_g_.m.spinning {
_g_.m.spinning = true
atomic.Xadd(&sched.nmspinning, 1)
}
...
wasSpinning := _g_.m.spinning
// M取消自旋状态
if _g_.m.spinning {
_g_.m.spinning = false
if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
throw("findrunnable: negative nmspinning")
}
}
// check all runqueues once again
// 再次检查所有的P,有没有可以运行的G
for _, _p_ := range allpSnapshot {
// 如果p的本地队列有G
if !runqempty(_p_) {
lock(&sched.lock)
// 获取另外一个空闲P
_p_ = pidleget()
unlock(&sched.lock)
if _p_ != nil {
// 如果P不是nil,将M绑定P
acquirep(_p_)
// 如果是自旋,设置M为自旋
if wasSpinning {
_g_.m.spinning = true
atomic.Xadd(&sched.nmspinning, 1)
}
// 返回到函数开头,从本地p获取G
goto top
}
break
}
}
...
// 实在找不到G,那就休眠吧
// 且此时的M一定不是自旋状态
stopm()
goto top
sched.nmspinning*2 >= (procs -sched.npidle)
stopmwakepwakep
wakep
// Tries to add one more P to execute G's.
// Called when a G is made runnable (newproc, ready).
// 尝试获取一个M来运行可运行的G
func wakep() {
// be conservative about spinning threads
// 如果有其他的M处于自旋状态,那么就不管了,直接返回
// 因为自旋的M会拼命找G来运行的,就不新找一个M(劳动者)来运行了。
if !atomic.Cas(&sched.nmspinning, 0, 1) {
return
}
startm(nil, true)
}
新建goroutine的相关代码:
// 如果有空闲的p 且 m没有处于自旋状态 且 main goroutine已经启动,那么唤醒某个m来执行任务
if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 && mainStarted {
// 唤醒M或者新建M
wakep()
}
goroutine准备好相关代码:
// Mark gp ready to run.
// 将gp的状态更改为_Grunnable,以便调度器调度执行
// 并且如果next==true,那么设置为优先级最高,并尝试wakep
func ready(gp *g, traceskip int, next bool) {
if trace.enabled {
traceGoUnpark(gp, traceskip)
}
status := readgstatus(gp)
// Mark runnable.
_g_ := getg()
_g_.m.locks++ // disable preemption because it can be holding p in a local var
if status&^_Gscan != _Gwaiting {
dumpgstatus(gp)
throw("bad g->status in ready")
}
// status is Gwaiting or Gscanwaiting, make Grunnable and put on runq
casgstatus(gp, _Gwaiting, _Grunnable)
runqput(_g_.m.p.ptr(), gp, next)
// 如果有空闲P且没有自旋的M。
if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 {
// 唤醒M或者新建M
wakep()
}
_g_.m.locks--
if _g_.m.locks == 0 && _g_.preempt { // restore the preemption request in Case we've cleared it in newstack
_g_.stackguard0 = stackPreempt
}
}
startm
// Schedules some M to run the p (creates an M if necessary).
// If p==nil, tries to get an idle P, if no idle P's does nothing.
// May run with m.p==nil, so write barriers are not allowed.
// If spinning is set, the caller has incremented nmspinning and startm will
// either decrement nmspinning or set m.spinning in the newly started M.
//go:nowritebarrierrec
// startm是启动一个M,先尝试获取一个空闲P,如果获取不到则返回
// 获取到P后,在尝试获取M,如果获取不到就新建一个M
func startm(_p_ *p, spinning bool) {
lock(&sched.lock)
// 如果P为nil,则尝试获取一个空闲P
if _p_ == nil {
_p_ = pidleget()
if _p_ == nil {
unlock(&sched.lock)
if spinning {
// The caller incremented nmspinning, but there are no idle Ps,
// so it's okay to just undo the increment and give up.
if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
throw("startm: negative nmspinning")
}
}
return
}
}
// 获取一个空闲的M
mp := mget()
unlock(&sched.lock)
if mp == nil {
var fn func()
if spinning {
// The caller incremented nmspinning, so set m.spinning in the new M.
fn = mspinning
}
// 如果获取不到,则新建一个,新建完成后就立即返回
newm(fn, _p_)
return
}
// 到这里表示获取到了一个空闲M
if mp.spinning { // 从midle中获取的mp,不应该是spinning状态,获取的都是经过stopm的,stopm之前都会推出spinning
throw("startm: m is spinning")
}
if mp.nextp != 0 { // 这个位置是要留给参数_p_的,stopm中如果被唤醒,则关联nextp和m
throw("startm: m has p")
}
if spinning && !runqempty(_p_) { // spinning状态的M是在本地和全局都获取不到工作的情况,不能与spinning语义矛盾
throw("startm: p has runnable gs")
}
// The caller incremented nmspinning, so set m.spinning in the new M.
mp.spinning = spinning //标记该M是否在自旋
mp.nextp.set(_p_) // 暂存P
notewakeup(&mp.park) // 唤醒M
}
startm_p_nil_p_nilhandoffp
抢占的实现
sysmon
_Psyscallsysmon
"all goroutines are asleep - deadlock!"
sysmonsysmonretake
// Always runs without a P, so write barriers are not allowed.
func sysmon() {
lock(&sched.lock)
sched.nmsys++
// 判断程序是否死锁
checkdead()
unlock(&sched.lock)
...
idle := 0 // how many cycles in succession we had not wokeup somebody
delay := uint32(0)
for {
if idle == 0 { // start with 20us sleep...
delay = 20
} else if idle > 50 { // start doubling the sleep after 1ms...
delay *= 2
}
if delay > 10*1000 { // up to 10ms
delay = 10 * 1000
}
// 休眠delay us
usleep(delay)
...
// poll network if not polled for more than 10ms
lastpoll := int64(atomic.Load64(&sched.lastpoll))
now := nanotime()
// 如果超过10ms都没进行 netpoll ,那么强制执行一次 netpoll,
// 并且如果获取到了可运行的G,那么插入全局列表。
if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now {
atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))
gp := netpoll(false) // non-blocking - returns list of goroutines
if gp != nil {
// Need to decrement number of idle locked M's
// (pretending that one more is running) before injectglist.
// Otherwise it can lead to the following situation:
// injectglist grabs all P's but before it starts M's to run the P's,
// another M returns from syscall, finishes running its G,
// observes that there is no work to do and no other running M's
// and reports deadlock.
incidlelocked(-1)
injectglist(gp)
incidlelocked(1)
}
}
// retake P's blocked in syscalls
// and preempt long running G's
// 抢夺阻塞时间长的syscall的G
if retake(now) != 0 {
idle = 0
} else {
idle++
}
...
}
}
retake
func retake(now int64) uint32 {
n := 0
// Prevent allp slice changes. This lock will be completely
// uncontended unless we're already stopping the world.
lock(&allpLock)
// We can't use a range loop over allp because we may
// temporarily drop the allpLock. Hence, we need to re-fetch
// allp each time around the loop.
for i := 0; i < len(allp); i++ {
_p_ := allp[i]
if _p_ == nil {
// This can happen if procresize has grown
// allp but not yet created new Ps.
continue
}
pd := &_p_.sysmontick
s := _p_.status
if s == _Psyscall {
// Retake P from syscall if it's there for more than 1 sysmon tick (at least 20us).
t := int64(_p_.syscalltick)
if int64(pd.syscalltick) != t {
pd.syscalltick = uint32(t)
pd.syscallwhen = now
continue
}
// On the one hand we don't want to retake Ps if there is no other work to do,
// but on the other hand we want to retake them eventually
// because they can prevent the sysmon thread from deep sleep.
if runqempty(_p_) && atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 && pd.syscallwhen+10*1000*1000 > now {
continue
}
// Drop allpLock so we can take sched.lock.
unlock(&allpLock)
// Need to decrement number of idle locked M's
// (pretending that one more is running) before the CAS.
// Otherwise the M from which we retake can exit the syscall,
// increment nmidle and report deadlock.
incidlelocked(-1)
if atomic.Cas(&_p_.status, s, _Pidle) {
if trace.enabled {
traceGoSysBlock(_p_)
traceProcStop(_p_)
}
n++
_p_.syscalltick++
handoffp(_p_)
}
incidlelocked(1)
lock(&allpLock)
} else if s == _Prunning {
// Preempt G if it's running for too long.
t := int64(_p_.schedtick)
if int64(pd.schedtick) != t {
pd.schedtick = uint32(t)
pd.schedwhen = now
continue
}
if pd.schedwhen+forcePreemptNS > now {
continue
}
preemptone(_p_)
}
}
unlock(&allpLock)
return uint32(n)
}
retakehandoffpstartmp.status
preemptonegp.preempt = truegoschedImpl_Grunnablemorestack() -> newstack() -> gopreempt_m() -> goschedImpl() -> schedule()
自此抢占原理已明了,第一种情况是为了防止系统调用的G,导致P里面的其他G无法调度,也就是饿死的状态。 第二种情况是为何防止一个G运行太长时间,而导致P里面的其他G饿死的状态。
调度跟踪信息可视化
开启 schedtrace
SCHED 0ms: gomaxprocs=4 idleprocs=2 threads=4 spinningthreads=1 idlethreads=0 runqueue=0 [0 0 0 0]
SCHED 1001ms: gomaxprocs=4 idleprocs=0 threads=8 spinningthreads=0 idlethreads=2 runqueue=0 [189 197 231 142]
SCHED 2004ms: gomaxprocs=4 idleprocs=0 threads=9 spinningthreads=0 idlethreads=1 runqueue=0 [54 45 38 86]
SCHED 3011ms: gomaxprocs=4 idleprocs=0 threads=9 spinningthreads=0 idlethreads=2 runqueue=2 [85 0 67 111]
SCHED 4018ms: gomaxprocs=4 idleprocs=3 threads=9 spinningthreads=0 idlethreads=4 runqueue=0 [0 0 0 0]
SCHED 0ms: gomaxprocs=4 idleprocs=3 threads=3 spinningthreads=0 idlethreads=0 runqueue=0 gcwaiting=0 nmidlelocked=0 stopwait=0 sysmonwait=0
P0: status=1 schedtick=0 syscalltick=0 m=0 runqsize=0 gfreecnt=0
P1: status=0 schedtick=0 syscalltick=0 m=-1 runqsize=0 gfreecnt=0
P2: status=0 schedtick=0 syscalltick=0 m=-1 runqsize=0 gfreecnt=0
P3: status=0 schedtick=0 syscalltick=0 m=-1 runqsize=0 gfreecnt=0
M2: p=-1 curg=-1 mallocing=0 throwing=0 preemptoff= locks=1 dying=0 helpgc=0 spinning=false blocked=false lockedg=-1
M1: p=-1 curg=17 mallocing=0 throwing=0 preemptoff= locks=0 dying=0 helpgc=0 spinning=false blocked=false lockedg=17
M0: p=0 curg=1 mallocing=0 throwing=0 preemptoff= locks=2 dying=0 helpgc=0 spinning=false blocked=false lockedg=1
G1: status=2(stack growth) m=0 lockedm=0
G17: status=3() m=1 lockedm=1
G2: status=1() m=-1 lockedm=-1