1 Runtime简介
Go语言是互联网时代的C,因为其语法简洁易学,对高并发拥有语言级别的亲和性。而且不同于虚拟机的方案。Go通过在编译时嵌入平台相关的系统指令可直接编译为对应平台的机器码,同时嵌入Go Runtime,在运行时实现自身的调度算法和各种并发控制方案,避免进入操作系统级别的进程/线程上下文切换,以及通过原子操作、自旋、信号量、全局哈希表、等待队列多种技术避免进入操作系统级别锁,以此来提升整体性能。
Go的runtime是与用户代码一起打包在一个可执行文件中,是程序的一部分,而不是向Java需要单独安装,与程序独立。所以用户代码与runtime代码在执行时没有界限都是函数调用。在Go语言中的关键字编译时会变成runtime中的函数调用。
Go Runtime核心主要涉及三大部分:内存分配、调度算法、垃圾回收;本篇文章我们主要介绍GMP调度原理。关于具体应该叫GPM还是GMP,我更倾向于成为GMP,因为在runtime代码中经常看到如下调用:
1 buf := &getg().m.p.ptr().wbBuf
其中getg代表获取当前正在运行的g即goroutine,m代表对应的逻辑处理器,p是逻辑调度器;所以我们还是称为GMP。
2 GMP概览
下面这个图虽然有些抽象(不如花花绿绿的图片),确是目前我看到对整个调度算法设计的重要概念覆盖最全的。
1 +-------------------- sysmon ---------------//------+
2 | |
3 | |
4 +---+ +---+-------+ +--------+ +---+---+
5 go func() ---> | G | ---> | P | local | <=== balance ===> | global | <--//--- | P | M |
6 +---+ +---+-------+ +--------+ +---+---+
7 | | |
8 | +---+ | |
9 +----> | M | <--- findrunnable ---+--- steal <--//--+
10 +---+
11 |
12 mstart
13 |
14 +--- execute <----- schedule
15 | |
16 | |
17 +--> G.fn --> goexit --+
我们来看下其中的三大主要概念:
- G:Groutine协程,拥有运行函数的指针、栈、上下文(指的是sp、bp、pc等寄存器上下文以及垃圾回收的标记上下文),在整个程序运行过程中可以有无数个,代表一个用户级代码执行流(用户轻量级线程);
- P:Processor,调度逻辑处理器,同样也是Go中代表资源的分配主体(内存资源、协程队列等),默认为机器核数,可以通过GOMAXPROCS环境变量调整
- M:Machine,代表实际工作的执行者,对应到操作系统级别的线程;M的数量会比P多,但不会太多,最大为1w个。
其中G分为三类:
- 主协程,用来执行用户main函数的协程
- 主协程创建的协程,也是P调度的主要成员
- G0,每个M都有一个G0协程,他是runtime的一部分,G0是跟M绑定的,主要用来执行调度逻辑的代码,所以不能被抢占也不会被调度(普通G也可以执行runtime_procPin禁止抢占),G0的栈是系统分配的,比普通的G栈(2KB)要大,不能扩容也不能缩容
- sysmon协程,sysmon协程也是runtime的一部分,sysmon协程直接运行在M不需要P,主要做一些检查工作如:检查死锁、检查计时器获取下一个要被触发的计时任务、检查是否有ready的网络调用以恢复用户G的工作、检查一个G是否运行时间太长进行抢占式调度。
M分为两类:
- 普通M,用来与P绑定执行G中任务
- m0:Go程序是一个进程,进程都有一个主线程,m0就是Go程序的主线程,通过一个与其绑定的G0来执行runtime启动加载代码;一个Go程序只有一个m0
- 运行sysmon的M,主要用来运行sysmon协程。
刚才说道P是用来调度G的执行,所以每个P都有自己的一个G的队列,当G队列都执行完毕后,会从global队列中获取一批G放到自己的本地队列中,如果全局队列也没有待运行的G,则P会再从其他P中窃取一部分G放到自己的队列中。而调度的时机一般有三种:
runtime.Goshed
Go的协程调度与操作系统线程调度区别主要存在四个方面:
- 调度发生地点:Go中协程的调度发生在runtime,属于用户态,不涉及与内核态的切换;一个协程可以被切换到多个线程执行
- 上下文切换速度:协程的切换速度远快于线程,不需要经过内核与用户态切换,同时需要保存的状态和寄存器非常少;线程切换速度为1-2微秒,协程切换速度为0.2微秒左右
- 调度策略:线程调度大部分都是抢占式调度,操作系统通过发出中断信号强制线程切换上下文;Go的协程基本是主动和被动式调度,调度时机可预期
- 栈大小:线程栈一般是2MB,而且运行时不能更改大小;Go的协程栈只有2kb,而且可以动态扩容(64位机最大为1G)
以上基本是整个调度器的概括,不想看原理的同学可以不用往下看了,下面会进行源码级介绍;
3 GMP的源码结构
源码部分主要涉及三个文件:
1 runtime/amd_64.s 涉及到进程启动以及对CPU执行指令进行控制的汇编代码,进程的初始化部分也在这里面
2 runtime/runtime2.go 这里主要是运行时中一些重要数据结构的定义,比如g、m、p以及涉及到接口、defer、panic、map、slice等核心类型
3 runtime/proc.go 一些核心方法的实现,涉及gmp调度等核心代码在这里
这里我们主要关心gmp中与调度相关的代码;
3.1 G源码部分
3.1.1 G的结构
先来看下g的结构定义:
1 // runtime/runtime2.go
2 type g struct {
3 // 记录协程栈的栈顶和栈底位置
4 stack stack // offset known to runtime/cgo
5 // 主要作用是参与一些比较计算,当发现容量要超过栈分配空间后,可以进行扩容或者收缩
6 stackguard0 uintptr // offset known to liblink
7 stackguard1 uintptr // offset known to liblink
8
9 // 当前与g绑定的m
10 m *m // current m; offset known to arm liblink
11 // 这是一个比较重要的字段,里面保存的一些与goroutine运行位置相关的寄存器和指针,如rsp、rbp、rpc等寄存器
12 sched gobuf
13 syscallsp uintptr // if status==Gsyscall, syscallsp = sched.sp to use during gc
14 syscallpc uintptr // if status==Gsyscall, syscallpc = sched.pc to use during gc
15 stktopsp uintptr // expected sp at top of stack, to check in traceback
16
17 // 用于做参数传递,睡眠时其他goroutine可以设置param,唤醒时该g可以读取这些param
18 param unsafe.Pointer
19 // 记录当前goroutine的状态
20 atomicstatus uint32
21 stackLock uint32 // sigprof/scang lock; TODO: fold in to atomicstatus
22 // goroutine的唯一id
23 goid int64
24 schedlink guintptr
25
26 // 标记是否可以被抢占
27 preempt bool // preemption signal, duplicates stackguard0 = stackpreempt
28 preemptStop bool // transition to _Gpreempted on preemption; otherwise, just deschedule
29 preemptShrink bool // shrink stack at synchronous safe point
30
31 // 如果调用了LockOsThread方法,则g会绑定到某个m上,只在这个m上运行
32 lockedm muintptr
33 sig uint32
34 writebuf []byte
35 sigcode0 uintptr
36 sigcode1 uintptr
37 sigpc uintptr
38 // 创建该goroutine的语句的指令地址
39 gopc uintptr // pc of go statement that created this goroutine
40 ancestors *[]ancestorInfo // ancestor information goroutine(s) that created this goroutine (only used if debug.tracebackancestors)
41 // goroutine函数的指令地址
42 startpc uintptr // pc of goroutine function
43 racectx uintptr
44 waiting *sudog // sudog structures this g is waiting on (that have a valid elem ptr); in lock order
45 cgoCtxt []uintptr // cgo traceback context
46 labels unsafe.Pointer // profiler labels
47 timer *timer // cached timer for time.Sleep
48 selectDone uint32 // are we participating in a select and did someone win the race?
49 }
跟g相关的还有两个数据结构比较重要:
stack是协程栈的地址信息,需要注意的是m0绑定的g0是在进程被分配的系统栈上分配协程栈的,而其他协程栈都是在堆上进行分配的。
gobuf中保存了协程执行的上下文信息,这里也可以看到协程切换的上下文信息极少;sp代表cpu的rsp寄存器的值,pc代表CPU的rip寄存器值、bp代表CPU的rbp寄存器值;ret用来保存系统调用的返回值,ctxt在gc的时候使用。
其中几个寄存器作用如下:
- SP:永远指向栈顶位置
- BP:某一时刻的栈顶位置,当新函数调用时,把当前SP地址赋值给BP、SP指向新的栈顶位置
- PC:代表代码经过编译为机器码后,当前执行的机器指令(可以理解为当前语句)
1 // Stack describes a Go execution stack.
2 // The bounds of the stack are exactly [lo, hi),
3 // with no implicit data structures on either side.
4 // goroutine协程栈的栈顶和栈底
5 type stack struct {
6 lo uintptr // 栈顶,低地址
7 hi uintptr // 栈底,高地址
8 }
9
10 // gobuf中保存了非常重要的上下文执行信息,
11 type gobuf struct {
12 // 代表cpu的rsp寄存器的值,永远指向栈顶位置
13 sp uintptr
14 // 代表代码经过编译为机器码后,当前执行的机器指令(可以理解为当前语句)
15 pc uintptr
16 // 指向所保存执行上下文的goroutine
17 g guintptr
18 // gc时候使用
19 ctxt unsafe.Pointer
20 // 用来保存系统调用的返回值
21 ret uintptr
22 lr uintptr
23 // 某一时刻的栈顶位置,当新函数调用时,把当前SP地址赋值给BP、SP指向新的栈顶位置
24 bp uintptr // for framepointer-enabled architectures
25 }
3.1.2 G的状态
就像线程有自己的状态一样,goroutine也有自己的状态,主要记录在atomicstatus字段上:
1 // defined constants
2 const (
3 // 代表协程刚开始创建时的状态,当新创建的协程初始化后,为变为_Gdead状态,_Gdread也是协程被销毁时的状态;
4 // 刚创建时也被会置为_Gdead主要是考虑GC可以去用去扫描dead状态下的协程栈
5 _Gidle = iota // 0
6 // 代表协程正在运行队列中,等待被运行
7 _Grunnable // 1
8 // 代表当前协程正在被运行,已经被分配了逻辑处理的线程,即p和m
9 _Grunning // 2
10 // 代表当前协程正在执行系统调用
11 _Gsyscall // 3
12 // 表示当前协程在运行时被锁定,陷入阻塞,不能执行用户代码
13 _Gwaiting // 4
14
15 _Gmoribund_unused // 5
16 // 新创建的协程初始化后,或者协程被销毁后的状态
17 _Gdead // 6
18
19 // _Genqueue_unused is currently unused.
20 _Genqueue_unused // 7
21 // 代表在进行协程栈扫描时发现需要扩容或者缩容,将协程中的栈转移到新栈时的状态;这个时候不执行用户代码,也不在p的runq中
22 _Gcopystack // 8
23
24 // 代表g被抢占后的状态
25 _Gpreempted // 9
26
27 // 这几个状态是垃圾回收时涉及,后续文章进行介绍
28 _Gscan = 0x1000
29 _Gscanrunnable = _Gscan + _Grunnable // 0x1001
30 _Gscanrunning = _Gscan + _Grunning // 0x1002
31 _Gscansyscall = _Gscan + _Gsyscall // 0x1003
32 _Gscanwaiting = _Gscan + _Gwaiting // 0x1004
33 _Gscanpreempted = _Gscan + _Gpreempted // 0x1009
34 )
这里是利用常量定义的枚举。
Go的状态变更可以看下图:
3.1.3 G的创建
当我们使用go关键字新建一个goroutine时,编译器会编译为runtime中对应的函数调用(newproc,而go 关键字后面的函数成为协程的任务函数),进行创建,整体步骤如下:
用 systemstack 切换到系统堆栈,调用 newproc1 ,newproc1 实现g的获取。
尝试从p的本地g空闲链表和全局g空闲链表找到一个g的实例。
如果上面未找到,则调用 malg 生成新的g的实例,且分配好g的栈和设置好栈的边界,接着添加到 allgs 数组里面,allgs保存了所有的g。
保存g切换的上下文,这里很关键,g的切换依赖 sched 字段。
生成唯一的goid,赋值给该g。
调用 runqput 将g插入队列中,如果本地队列还有剩余的位置,将G插入本地队列的尾部,若本地队列已满,插入全局队列。
如果有空闲的p 且 m没有处于自旋状态 且 main goroutine已经启动,那么唤醒或新建某个m来执行任务。
这里对应的是newproc函数:
1 func newproc(siz int32, fn *funcval) {
2 argp := add(unsafe.Pointer(&fn), sys.PtrSize)
3 gp := getg()
4 // 获取调用者的指令地址,也就是调用newproc时又call指令压栈的函数返回地址
5 pc := getcallerpc()
6 // systemstack的作用是切换到m0对应的g0所属的系统栈
7 // 使用g0所在的系统栈创建goroutine对象
8 // 传递参数包括goroutine的任务函数、argp参数起始地址、siz是参数长度、调用方的pc指针
9 systemstack(func() {
10 newg := newproc1(fn, argp, siz, gp, pc)
11 // 创建完成后将g放到创建者(某个g,如果是进程初始化启动阶段则为g0)所在的p的队列中
12 _p_ := getg().m.p.ptr()
13 runqput(_p_, newg, true)
14
15 if mainStarted {
16 wakep()
17 }
18 })
19 }
其中systemstack是一段汇编代码,位于asm_amd64.s文件中,主要是寄存器指令的操作,笔者不懂汇编这里先不做介绍。
newproc1是获取newg的函数,主要步骤:
1、首先防止当前g被抢占,绑定m
2、对传入的参数占用的内存空间进行对齐处理
3、从p的空闲队列中获取一个空闲的g,如果么有就创建一个g,并在堆上创建协程栈,并设置状态为_Gdead添加到全局allgs中
4、计算整体协程任务函数的参数空间大小,并设置sp指针
5、执行参数从getg的堆栈到newg堆栈的复制
6、设置newg的sched和startpc、gopc等跟上下文相关的字段值
7、设置newg状态为runable并设置goid
8、接触getg与m的防抢占状态
代码注释如下:
1 func newproc1(fn *funcval, argp unsafe.Pointer, narg int32, callergp *g, callerpc uintptr) *g {
2 .....
3 // 如果是初始化时候这个代表g0
4 _g_ := getg()
5
6 if fn == nil {
7 _g_.m.throwing = -1 // do not dump full stacks
8 throw("go of nil func value")
9 }
10 // 使_g_.m.locks++,来防止这个时候g对应的m被抢占
11 acquirem() // disable preemption because it can be holding p in a local var
12 // 参数的地址,下面一句目的是为了做到内存对齐
13 siz := narg
14 siz = (siz + 7) &^ 7
15
16 // We could allocate a larger initial stack if necessary.
17 // Not worth it: this is almost always an error.
18 // 4*PtrSize: extra space added below
19 // PtrSize: caller's LR (arm) or return address (x86, in gostartcall).
20 if siz >= _StackMin-4*sys.PtrSize-sys.PtrSize {
21 throw("newproc: function arguments too large for new goroutine")
22 }
23
24 _p_ := _g_.m.p.ptr()
25 newg := gfget(_p_) // 首先从p的gfree队列中看看有没有空闲的g,有则使用
26 if newg == nil {
27 // 如果没找到就使用new关键字来创建一个g并在堆上分配栈空间
28 newg = malg(_StackMin)
29 // 将newg的状态设置为_Gdead,因为这样GC就不会去扫描一个没有初始化的协程栈
30 casgstatus(newg, _Gidle, _Gdead)
31 // 添加到全局的allg切片中(需要加锁访问)
32 allgadd(newg) // publishes with a g->status of Gdead so GC scanner doesn't look at uninitialized stack.
33 }
34 // 下面是检查协程栈的创建情况和状态
35 if newg.stack.hi == 0 {
36 throw("newproc1: newg missing stack")
37 }
38
39 if readgstatus(newg) != _Gdead {
40 throw("newproc1: new g is not Gdead")
41 }
42 // 计算运行空间大小并进行内存对齐
43 totalSize := 4*sys.PtrSize + uintptr(siz) + sys.MinFrameSize // extra space in case of reads slightly beyond frame
44 totalSize += -totalSize & (sys.StackAlign - 1) // align to StackAlign
45 // 计算sp寄存器指针的位置
46 sp := newg.stack.hi - totalSize
47 // 确定参数入栈位置
48 spArg := sp
49 .........
50 if narg > 0 {
51 // 将参数从newproc函数的栈复制到新的协程的栈中,memove是一段汇编代码
52 // 从argp位置挪动narg大小的内存到sparg位置
53 memmove(unsafe.Pointer(spArg), argp, uintptr(narg))
54 // 因为涉及到从栈到堆栈上的复制,go在垃圾回收中使用了三色标记和写入屏障等手段,所以这里要考虑屏障复制
55 // 目标栈可能会有垃圾存在,所以设置屏障并且标记为灰色
56 if writeBarrier.needed && !_g_.m.curg.gcscandone { // 如果启用了写入屏障并且源堆栈为灰色(目标始终为黑色),则执行屏障复制。
57 f := findfunc(fn.fn)
58 stkmap := (*stackmap)(funcdata(f, _FUNCDATA_ArgsPointerMaps))
59 if stkmap.nbit > 0 {
60 // We're in the prologue, so it's always stack map index 0.
61 bv := stackmapdata(stkmap, 0)
62 bulkBarrierBitmap(spArg, spArg, uintptr(bv.n)*sys.PtrSize, 0, bv.bytedata)
63 }
64 }
65 }
66 // 把newg的sched结构体成员的所有字段都设置为0,其实就是初始化
67 memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched))
68 newg.sched.sp = sp
69 newg.stktopsp = sp
70 // pc指针表示当newg被调度起来时从这个位置开始执行
71 // 这里是先设置为goexit,在gostartcallfn中会进行处理,更改sp为这里的pc,将pc改为真正的协程任务函数fn的指令位置
72 // 这样使得任务函数执行完毕后,会继续执行goexit中相关的清理工作
73 newg.sched.pc = abi.FuncPCABI0(goexit) + sys.PCQuantum // +PCQuantum so that previous instruction is in same function
74 newg.sched.g = guintptr(unsafe.Pointer(newg)) // 保存当前的g
75 gostartcallfn(&newg.sched, fn) // 在这里完成g启动时所有相关上下文指针的设置,主要为sp、pc和ctxt,ctxt被设置为fn
76 newg.gopc = callerpc // 保存newproc的pc,即调用者创建时的指令位置
77 newg.ancestors = saveAncestors(callergp)
78 // 设置startpc为任务函数,主要用于函数调用栈的trackback和栈收缩工作
79 // newg的执行开始位置并不依赖这个字段,而是通过sched.pc确定
80 newg.startpc = fn.fn
81 if _g_.m.curg != nil {
82 newg.labels = _g_.m.curg.labels
83 }
84 // 判断newg的任务函数是不是runtime系统的任务函数,是则sched.ngsys+1;
85 // 主协程则代表runtime.main函数,在这里就为判断为真
86 if isSystemGoroutine(newg, false) {
87 atomic.Xadd(&sched.ngsys, +1)
88 }
89 // Track initial transition?
90 newg.trackingSeq = uint8(fastrand())
91 if newg.trackingSeq%gTrackingPeriod == 0 {
92 newg.tracking = true
93 }
94 // 更改当前g的状态为_Grunnable
95 casgstatus(newg, _Gdead, _Grunnable)
96 // 设置g的goid,因为p会每次批量生成16个id,每次newproc如果新建一个g,id就会加1
97 // 所以这里m0的g0的id为0,而主协程的goid为1,其他的依次递增
98 if _p_.goidcache == _p_.goidcacheend {
99 // Sched.goidgen is the last allocated id,
100 // this batch must be [sched.goidgen+1, sched.goidgen+GoidCacheBatch].
101 // At startup sched.goidgen=0, so main goroutine receives goid=1.
102 // 使用原子操作修改全局变量,这里的sched是在runtime2.go中的一个全局变量类型为schedt
103 // 原子操作具有多线程可见性,同时比加锁性能更高
104 _p_.goidcache = atomic.Xadd64(&sched.goidgen, _GoidCacheBatch)
105 _p_.goidcache -= _GoidCacheBatch - 1
106 _p_.goidcacheend = _p_.goidcache + _GoidCacheBatch
107 }
108 newg.goid = int64(_p_.goidcache)
109 _p_.goidcache++
110 if raceenabled {
111 newg.racectx = racegostart(callerpc)
112 }
113 if trace.enabled {
114 traceGoCreate(newg, newg.startpc)
115 }
116 // 释放getg与m的绑定
117 releasem(_g_.m)
118
119 return newg
120 }
其中有几个关键地方需要强调
3.1.4 协程栈在堆空间的分配
malg函数,用来创建一个新g和对应的栈空间分配,这个函数主要强调的是栈空间分配部分,通过切换到系统栈上进行空间分配,分配完后设置栈底和栈顶的两个位置的保护字段,当栈上进行分配变量空间发现超过stackguard1时,会进行扩容,同时在某些条件下也会进行缩容
1 // Allocate a new g, with a stack big enough for stacksize bytes.
2 func malg(stacksize int32) *g {
3 newg := new(g)
4 if stacksize >= 0 {
5 stacksize = round2(_StackSystem + stacksize)
6 systemstack(func() {
7 newg.stack = stackalloc(uint32(stacksize))
8 })
9 newg.stackguard0 = newg.stack.lo + _StackGuard
10 newg.stackguard1 = ^uintptr(0)
11 // Clear the bottom word of the stack. We record g
12 // there on gsignal stack during VDSO on ARM and ARM64.
13 *(*uintptr)(unsafe.Pointer(newg.stack.lo)) = 0
14 }
15 return newg
16 }
stackalloc代码位于runtime/stack.go文件中;
协程栈首先在进程初始化时会创建栈的管理结构:
1、栈池stackpool,这个栈池主要用来对大小为2、4、8kb的小栈做缓存使用,使用的同样是mspan这种结构来存储;
2、为大栈分配的stackLarge
1 OS | FixedStack | NumStackOrders
2 -----------------+------------+---------------
3 linux/darwin/bsd | 2KB | 4
4 windows/32 | 4KB | 3
5 windows/64 | 8KB | 2
6 plan9 | 4KB | 3
7
8 // Global pool of spans that have free stacks.
9 // Stacks are assigned an order according to size.
10 // order = log_2(size/FixedStack)
11 // There is a free list for each order.
12 var stackpool [_NumStackOrders]struct {
13 item stackpoolItem
14 _ [cpu.CacheLinePadSize - unsafe.Sizeof(stackpoolItem{})%cpu.CacheLinePadSize]byte
15 }
16
17 //go:notinheap
18 type stackpoolItem struct {
19 mu mutex
20 span mSpanList
21 }
22
23 // Global pool of large stack spans.
24 var stackLarge struct {
25 lock mutex
26 free [heapAddrBits - pageShift]mSpanList // free lists by log_2(s.npages)
27 }
28
29 func stackinit() {
30 if _StackCacheSize&_PageMask != 0 {
31 throw("cache size must be a multiple of page size")
32 }
33 for i := range stackpool {
34 stackpool[i].item.span.init()
35 lockInit(&stackpool[i].item.mu, lockRankStackpool)
36 }
37 for i := range stackLarge.free {
38 stackLarge.free[i].init()
39 lockInit(&stackLarge.lock, lockRankStackLarge)
40 }
41 }
stackalloc会首先判断栈空间大小,是大栈还是固定空间的小栈,
1、对于小栈,如果是还没有分配栈缓存空间,则进入stackpoolalloc函数进行分配空间(需要加锁),这里最终是从全局的mheap也就是堆空间中获取内存空间;如果有栈缓存空间,则从g对应的mcache中的stackcache上获取内存空间(无锁),如果stackcache上没有足够空间则调用stackcacherefill方法为stackpool进行扩容(也是从mheap中拿取,加锁)然后分配给协程
2、对于大栈,先从stackLarge中获取,如果没有则从mheap中获取,两个步骤都需要加载访问;
3、最后创建stack结构返回给newg
1 func stackalloc(n uint32) stack {
2 // Stackalloc must be called on scheduler stack, so that we
3 // never try to grow the stack during the code that stackalloc runs.
4 // Doing so would cause a deadlock (issue 1547).
5 thisg := getg()
6 .........
7
8 // Small stacks are allocated with a fixed-size free-list allocator.
9 // If we need a stack of a bigger size, we fall back on allocating
10 // a dedicated span.
11 var v unsafe.Pointer
12 if n < _FixedStack<<_NumStackOrders && n < _StackCacheSize {
13 order := uint8(0)
14 n2 := n
15 for n2 > _FixedStack {
16 order++
17 n2 >>= 1
18 }
19 var x gclinkptr
20 if stackNoCache != 0 || thisg.m.p == 0 || thisg.m.preemptoff != "" {
21 // thisg.m.p == 0 can happen in the guts of exitsyscall
22 // or procresize. Just get a stack from the global pool.
23 // Also don't touch stackcache during gc
24 // as it's flushed concurrently.
25 lock(&stackpool[order].item.mu)
26 x = stackpoolalloc(order)
27 unlock(&stackpool[order].item.mu)
28 } else {
29 c := thisg.m.p.ptr().mcache
30 x = c.stackcache[order].list
31 if x.ptr() == nil {
32 stackcacherefill(c, order)
33 x = c.stackcache[order].list
34 }
35 c.stackcache[order].list = x.ptr().next
36 c.stackcache[order].size -= uintptr(n)
37 }
38 v = unsafe.Pointer(x)
39 } else {
40 var s *mspan
41 npage := uintptr(n) >> _PageShift
42 log2npage := stacklog2(npage)
43
44 // Try to get a stack from the large stack cache.
45 lock(&stackLarge.lock)
46 if !stackLarge.free[log2npage].isEmpty() {
47 s = stackLarge.free[log2npage].first
48 stackLarge.free[log2npage].remove(s)
49 }
50 unlock(&stackLarge.lock)
51
52 lockWithRankMayAcquire(&mheap_.lock, lockRankMheap)
53
54 if s == nil {
55 // Allocate a new stack from the heap.
56 s = mheap_.allocManual(npage, spanAllocStack)
57 if s == nil {
58 throw("out of memory")
59 }
60 osStackAlloc(s)
61 s.elemsize = uintptr(n)
62 }
63 v = unsafe.Pointer(s.base())
64 }
65
66 if raceenabled {
67 racemalloc(v, uintptr(n))
68 }
69 if msanenabled {
70 msanmalloc(v, uintptr(n))
71 }
72 if stackDebug >= 1 {
73 print(" allocated ", v, "\n")
74 }
75 return stack{uintptr(v), uintptr(v) + uintptr(n)}
76 }
非g0的g为什么要在堆上分配空间?
虽然堆不如栈快,但是goroutine是go模拟的线程,具有动态扩容和缩容的能力,而系统栈是线性空间,在系统栈上发生缩容和扩容会存在空间不足或者栈空间碎片等问题;所以go这里在堆上分配协程栈;因为是在堆空间也就意味着这部分空间也需要进行垃圾回收和释放;所以Go的GC是多线程并发标记时,内存屏障是对整个协程栈标记灰色,来让回收器进行扫描。
3.1.5 G的上下文设置和切换
协程栈的切换主要是在两个地方,由执行调度逻辑的g0切换到执行用户逻辑的g的过程,以及执行用户逻辑的g退出或者被抢占切换为g0执行调度的过程,抢占在下文中介绍
上面代码中当newg被初始化时,会初始化sched中的pc和sp指针,其中会把pc先设置为goexit函数的第二条指令。
1 // 把newg的sched结构体成员的所有字段都设置为0,其实就是初始化
2 memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched))
3 newg.sched.sp = sp
4 newg.stktopsp = sp
5 // pc指针表示当newg被调度起来时从这个位置开始执行
6 // 这里是先设置为goexit,在gostartcallfn中会进行处理,更改sp为这里的pc,将pc改为真正的协程任务函数fn的指令位置
7 // 这样使得任务函数执行完毕后,会继续执行goexit中相关的清理工作
8 newg.sched.pc = abi.FuncPCABI0(goexit) + sys.PCQuantum // +PCQuantum so that previous instruction is in same function
9 newg.sched.g = guintptr(unsafe.Pointer(newg)) // 保存当前的g
10 gostartcallfn(&newg.sched, fn) // 在这里完成g启动时所有相关上下文指针的设置,主要为sp、pc和ctxt,ctxt被设置为fn
11 newg.gopc = callerpc // 保存newproc的pc,即调用者创建时的指令位置
然后进入gostartcallfn函数,最终是在gostartcall函数中进行处理
1 // gostartcallfn 位于runtime/stack.go中
2
3 func gostartcallfn(gobuf *gobuf, fv *funcval) {
4 var fn unsafe.Pointer
5 if fv != nil {
6 fn = unsafe.Pointer(fv.fn)
7 } else {
8 fn = unsafe.Pointer(funcPC(nilfunc))
9 }
10 gostartcall(gobuf, fn, unsafe.Pointer(fv))
11 }
12
13 // gostartcall 位于runtime/sys_x86.go中
14
15 func gostartcall(buf *gobuf, fn, ctxt unsafe.Pointer) {
16 // newg的栈顶,目前newg栈上只有fn函数的参数,sp指向的是fn的第一个参数
17 sp := buf.sp
18 // 为返回地址预留空间
19 sp -= sys.PtrSize
20 // buf.pc中设置的是goexit函数中的第二条指令
21 // 因为栈是自顶向下,先进后出,所以这里伪装fn是被goexit函数调用的,goexit在前fn在后
22 // 使得fn返回后到goexit继续执行,以完成一些清理工作。
23 *(*uintptr)(unsafe.Pointer(sp)) = buf.pc
24 buf.sp = sp // 重新设置栈顶
25 // 将pc指向goroutine的任务函数fn,这样当goroutine获得执行权时,从任务函数入口开始执行
26 // 如果是主协程,那么fn就是runtime.main,从这里开始执行
27 buf.pc = uintptr(fn)
28 buf.ctxt = ctxt
29 }
可以看到在newg初始化时进行的一系列设置工作,将goexit先压入栈顶,然后伪造sp位置,让cpu看起来是从goexit中调用的协程任务函数,然后将pc指针指向任务函数,当协程被执行时,从pc处开始执行,任务函数执行完毕后执行goexit;
这里是设置工作,具体的切换工作,需要经由schedule调度函数选中一个g,进入execute函数设置g的相关状态和栈保护字段等信息,然后进入gogo函数,通过汇编语言,将CPU寄存器以及函数调用栈切换为g的sched中相关指针和协程栈。gogo函数源码如下:
1 // gogo的具体汇编代码位于asm_amd64.s中
2
3 // func gogo(buf *gobuf)
4 // restore state from Gobuf; longjmp
5 TEXT runtime·gogo(SB), NOSPLIT, $0-8
6 // 0(FP)表示第一个参数,即buf=&gp.sched
7 MOVQ buf+0(FP), BX // gobuf
8 // DX = gp.sched.g,DX代表数据寄存器
9 MOVQ gobuf_g(BX), DX
10 MOVQ 0(DX), CX // make sure g != nil
11 JMP gogo<>(SB)
12
13 TEXT gogo<>(SB), NOSPLIT, $0
14 // 将tls保存到CX寄存器
15 get_tls(CX)
16 // 下面这条指令把当前要运行的g(上面第9行中已经把go_buf中的g放入到了DX中),
17 // 放入CX寄存器的g位置即tls[0]这个位置,也就是线程的本地存储中,
18 // 这样下次runtime中调用getg时获取的就是这个g
19 MOVQ DX, g(CX)
20 MOVQ DX, R14 // set the g register
21 // 把CPU的SP寄存器设置为g.sched.sp这样就完成了栈的切换,从g0切换为g
22 MOVQ gobuf_sp(BX), SP // restore SP
23 // 将ret、ctxt、bp分别存入对应的寄存器,完成了CPU上下文的切换
24 MOVQ gobuf_ret(BX), AX
25 MOVQ gobuf_ctxt(BX), DX
26 MOVQ gobuf_bp(BX), BP
27 // 清空sched的值,相关值已经存入到寄存器中,这里清空后可以减少GC的工作量
28 MOVQ $0, gobuf_sp(BX) // clear to help garbage collector
29 MOVQ $0, gobuf_ret(BX)
30 MOVQ $0, gobuf_ctxt(BX)
31 MOVQ $0, gobuf_bp(BX)
32 // 把sched.pc放入BX寄存器
33 MOVQ gobuf_pc(BX), BX
34 // JMP把BX的值放入CPU的IP寄存器,所以这时候CPU从该地址开始继续执行指令
35 JMP BX
AX、BX、CX、DX是8086处理器的4个数据寄存器,可以简单认为相当于4个硬件的变量;
上文总体来说,将g存入到tls中(线程的本地存储),设置SP和相关寄存器为g.sched中的字段(SP、ret、ctxt、bp),然后跳转到pc指针位置执行指令
3.1.6 G的退出处理
协程栈的退出需要分为两种情况,即运行main函数的主协程和普通的用户协程;
主协程的fn任务函数位于proc.go中的main函数中,对于主协程g.shched.pc指向的也是这个位置,这里会调用用户的mian函数(main_main),main_main运行完毕后,会调用exit(0)直接退出,而不会跑到goexit函数中。
1 // runtime/proc.go 中的main函数
2 // The main goroutine.
3 func main() {
4 g := getg()
5
6 .................
7 fn := main_main // make an indirect call, as the linker doesn't know the address of the main package when laying down the runtime
8 fn()
9 ..................
10 ..................
11 exit(0)
12 ..................
13 }
用户协程因为将goexit作为协程栈栈底,所以当执行完协程任务函数时,会执行goexit函数,goexit是一段汇编指令:
1 // The top-most function running on a goroutine
2 // returns to goexit+PCQuantum.
3 TEXT runtime·goexit(SB),NOSPLIT|TOPFRAME,$0-0
4 BYTE $0x90 // NOP
5 CALL runtime·goexit1(SB) // does not return
6 // traceback from goexit1 must hit code range of goexit
7 BYTE $0x90 // NOP
这里直接调用goexit1,goexit1位于runtime/proc.go中
1 // Finishes execution of the current goroutine.
2 func goexit1() {
3 if raceenabled {
4 racegoend()
5 }
6 if trace.enabled {
7 traceGoEnd()
8 }
9 mcall(goexit0)
10 }
通过mcall调用goexit0,mcall是一段汇编代码它的作用是把执行的栈切换到g0的栈
1 TEXT runtime·mcall<ABIInternal>(SB), NOSPLIT, $0-8
2 MOVQ AX, DX // DX = fn
3
4 // save state in g->sched
5 // mcall返回地址放入BX中
6 MOVQ 0(SP), BX // caller's PC
7 // 下面部分是保存g的执行上下文,pc、sp、bp
8 // g.shced.pc = BX
9 MOVQ BX, (g_sched+gobuf_pc)(R14)
10 LEAQ fn+0(FP), BX // caller's SP
11 MOVQ BX, (g_sched+gobuf_sp)(R14)
12 MOVQ BP, (g_sched+gobuf_bp)(R14)
13
14 // switch to m->g0 & its stack, call fn
15 // 将g.m保存到BX寄存器中
16 MOVQ g_m(R14), BX
17 // 这段代码意思是从m结构体中获取g0字段保存到SI中
18 MOVQ m_g0(BX), SI // SI = g.m.g0
19 CMPQ SI, R14 // if g == m->g0 call badmcall
20 // goodm中完成了从g的栈切换到g0的栈
21 JNE goodm
22 JMP runtime·badmcall(SB)
23 goodm:
24 MOVQ R14, AX // AX (and arg 0) = g
25 MOVQ SI, R14 // g = g.m.g0
26 get_tls(CX) // Set G in TLS
27 MOVQ R14, g(CX)
28 MOVQ (g_sched+gobuf_sp)(R14), SP // sp = g0.sched.sp
29 PUSHQ AX // open up space for fn's arg spill slot
30 MOVQ 0(DX), R12
31 // 这里意思是调用goexit0(g)
32 CALL R12 // fn(g)
33 POPQ AX
34 JMP runtime·badmcall2(SB)
35 RET
goexit0代码位于runtime/proc.go中,他主要完成最后的清理工作:
1、把g的状态从——Gruning变为Gdead
2、清空g的一些字段
3、接触g与m的绑定关系,即g.m = nil;m.currg = nil
4、把g放入p的freeg队列中,下次创建g可以直接获取,而不用从内存分配
5、调用schedule进入下一次调度循环
1 // 这段代码执行在g0的栈上,gp是我们要处理退出的g的结构体指针
2 // goexit continuation on g0.
3 func goexit0(gp *g) {
4 _g_ := getg() // 获取g0
5 // 更改g的状态为_Gdead
6 casgstatus(gp, _Grunning, _Gdead)
7 if isSystemGoroutine(gp, false) {
8 atomic.Xadd(&sched.ngsys, -1)
9 }
10 // 清空g的一些字段
11 gp.m = nil
12 locked := gp.lockedm != 0
13 gp.lockedm = 0
14 _g_.m.lockedg = 0
15 gp.preemptStop = false
16 gp.paniconfault = false
17 gp._defer = nil // should be true already but just in case.
18 gp._panic = nil // non-nil for Goexit during panic. points at stack-allocated data.
19 gp.writebuf = nil
20 gp.waitreason = 0
21 gp.param = nil
22 gp.labels = nil
23 gp.timer = nil
24
25 if gcBlackenEnabled != 0 && gp.gcAssistBytes > 0 {
26 // Flush assist credit to the global pool. This gives
27 // better information to pacing if the application is
28 // rapidly creating an exiting goroutines.
29 assistWorkPerByte := float64frombits(atomic.Load64(&gcController.assistWorkPerByte))
30 scanCredit := int64(assistWorkPerByte * float64(gp.gcAssistBytes))
31 atomic.Xaddint64(&gcController.bgScanCredit, scanCredit)
32 gp.gcAssistBytes = 0
33 }
34 // 接触g与m的绑定关系
35 dropg()
36
37 if GOARCH == "wasm" { // no threads yet on wasm
38 gfput(_g_.m.p.ptr(), gp)
39 schedule() // never returns
40 }
41
42 if _g_.m.lockedInt != 0 {
43 print("invalid m->lockedInt = ", _g_.m.lockedInt, "\n")
44 throw("internal lockOSThread error")
45 }
46 // 将g加入p的空闲队列
47 gfput(_g_.m.p.ptr(), gp)
48 if locked {
49 // The goroutine may have locked this thread because
50 // it put it in an unusual kernel state. Kill it
51 // rather than returning it to the thread pool.
52
53 // Return to mstart, which will release the P and exit
54 // the thread.
55 if GOOS != "plan9" { // See golang.org/issue/22227.
56 gogo(&_g_.m.g0.sched)
57 } else {
58 // Clear lockedExt on plan9 since we may end up re-using
59 // this thread.
60 _g_.m.lockedExt = 0
61 }
62 }
63 // 执行下一轮调度
64 schedule()
65 }
3.2 P源码部分
3.2.1 P的结构
1 // runtime/runtime2.go
2
3 type p struct {
4 // 全局变量allp中的索引位置
5 id int32
6 // p的状态标识
7 status uint32 // one of pidle/prunning/...
8 link puintptr
9 // 调用schedule的次数,每次调用schedule这个值会加1
10 schedtick uint32 // incremented on every scheduler call
11 // 系统调用的次数,每次进行系统调用加1
12 syscalltick uint32 // incremented on every system call
13 // 用于sysmon协程记录被监控的p的系统调用时间和运行时间
14 sysmontick sysmontick // last tick observed by sysmon
15 // 指向绑定的m,p如果是idle状态这个值为nil
16 m muintptr // back-link to associated m (nil if idle)
17 // 用于分配微小对象和小对象的一个块的缓存空间,里面有各种不同等级的span
18 mcache *mcache
19 // 一个chunk大小(512kb)的内存空间,用来对堆上内存分配的缓存优化达到无锁访问的目的
20 pcache pageCache
21 raceprocctx uintptr
22
23 deferpool [5][]*_defer // pool of available defer structs of different sizes (see panic.go)
24 deferpoolbuf [5][32]*_defer
25
26 // Cache of goroutine ids, amortizes accesses to runtime·sched.goidgen.
27 // 可以分配给g的id的缓存,每次会一次性申请16个
28 goidcache uint64
29 goidcacheend uint64
30
31 // Queue of runnable goroutines. Accessed without lock.
32 // 本地可运行的G队列的头部和尾部,达到无锁访问
33 runqhead uint32
34 runqtail uint32
35 // 本地可运行的g队列,是一个使用数组实现的循环队列
36 runq [256]guintptr
37 // 下一个待运行的g,这个g的优先级最高
38 // 如果当前g运行完后还有剩余可用时间,那么就应该运行这个runnext的g
39 runnext guintptr
40
41 // Available G's (status == Gdead)
42 // p上的空闲队列列表
43 gFree struct {
44 gList
45 n int32
46 }
47
48 ............
49 // 用于内存对齐
50 _ uint32 // Alignment for atomic fields below
51 .......................
52 // 是否被抢占
53 preempt bool
54
55 // Padding is no longer needed. False sharing is now not a worry because p is large enough
56 // that its size class is an integer multiple of the cache line size (for any of our architectures).
57 }
通过这里的结构可以看出,虽然P叫做逻辑处理器Processor,实际上它更多是资源的管理者,其中包含了可运行的g队列资源、内存分配的资源、以及对调度循环、系统调用、sysmon协程的相关记录。通过P的资源管理来尽量实现无锁访问,提升应用性能。
3.2.2 P的状态
当程序刚开始运行进行初始化时,所有的P都处于_Pgcstop状态,随着的P的初始化(runtime.procresize),会被设置为_Pidle状态。
当M需要运行时会调用runtime.acquirep来使P变为_Prunning状态,并通过runtime.releasep来释放,重新变为_Pidele。
当G执行时需要进入系统调用,P会被设置为_Psyscall,如果这个时候被系统监控抢夺(runtime.retake),则P会被重新修改为_Pidle。
如果在程序中发生GC,则P会被设置为_Pgcstop,并在runtime.startTheWorld时重新调整为_Prunning。
(这部分文字来自《Go程序员面试宝典》,图片来自这里)
3.2.3 P的创建
P的初始化是在schedinit函数中调用的,schedinit函数是在runtime的汇编启动代码里调用的。
1 ...........................
2 CALL runtime·args(SB)
3 CALL runtime·osinit(SB)
4 CALL runtime·schedinit(SB)
5 ...........................
shcedinit中通过调用procresize进行P的分配。P的个数默认等于CPU核数,如果设置了GOMAXPROCS环境变量,则会采用设置的值来确定P的个数。所以runtime.GOMAXPROCS是限制的并行线程数量,而不是系统线程即M的总数,M是按需创建。
1 func schedinit() {
2 .................
3 lock(&sched.lock)
4 sched.lastpoll = uint64(nanotime())
5 procs := ncpu
6 if n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 {
7 procs = n
8 }
9 if procresize(procs) != nil {
10 throw("unknown runnable goroutine during bootstrap")
11 }
12 unlock(&sched.lock)
13
14 // World is effectively started now, as P's can run.
15 worldStarted()
16 .....................
17 }
上面获取ncpu的个数,然后传递给procresize函数。
无论是初始化时的分配,还是后期调整,都是通过procresize来创建p以及初始化
```
1 func procresize(nprocs int32) p {
2 .............................
3 old := gomaxprocs
4 ......................
5 if nprocs > int32(len(allp)) {
6 // Synchronize with retake, which could be running
7 // concurrently since it doesn't run on a P.
8 lock(&allpLock)
9 if nprocs <= int32(cap(allp)) {
10 // 如果需要的p小于allp这个全局变量(切片)的cap能力,取其中的一部分
11 allp = allp[:nprocs]
12 } else {
13 // 否则创建nprocs数量的p,并把allp的中复制给nallp
14 nallp := make([]p, nprocs)
15 // Copy everything up to allp's cap so we
16 // never lose old allocated Ps.
17 copy(nallp, allp[:cap(allp)])
18 allp = nallp
19 }
20 ....................................
21 unlock(&allpLock)
22 }
23
24 // 进行p的初始化
25 for i := old; i < nprocs; i++ {
26 pp := allp[i]
27 if pp == nil {