The Problem
Goroutine support is one of Golang's most convenient features: a developer can start a lightweight thread with a single go func(){}() statement, which makes highly concurrent programming easy.
The goroutine is the basic execution unit in Golang, and every Go program contains at least one: the main goroutine, which is created automatically when the program starts and destroyed automatically when it exits.
One day a colleague showed me a system-monitoring chart (container_go_goroutines) from our test environment:
- At T1: the monitor showed roughly 150 goroutines in the system
- At T2: a simple load test was run against the system
- At T3: the monitor showed roughly 10k goroutines
- At T3 + 10 hours: the load test had long stopped, yet the monitor still showed roughly 10k goroutines
From this chart, several questions arise:
- Is it normal for a simple load test to push the system to 10k goroutines?
- Why doesn't the goroutine count drop after the load test stops?
- Is there an upper limit on the number of goroutines in a Go program?
- Will the system reclaim goroutines when there are too many of them?
The first two questions depend heavily on the specific business logic; the last two concern the Go runtime itself, so setting the business aside, we can investigate the answers to those two first.
The Birth of a Goroutine
Example
Let's use a very simple Go program to observe how Golang starts a goroutine. The sample code is shown below (running it prints the string 1 to standard output):
package main

import (
    "fmt"
    "time"
)

func main() {
    go func() {
        fmt.Println("1")
    }()
    time.Sleep(time.Second)
}
First we need the corresponding assembly, to see what the go statement is compiled into.
go tool compile
Use go tool compile -N -l -S main.go to view the assembly:
# go tool compile -N -l -S main.go
"".main STEXT size=86 args=0x0 locals=0x18 funcid=0x0
...
0x0024 00036 (main.go:16) LEAQ "".main.func1·f(SB), AX
0x002b 00043 (main.go:16) MOVQ AX, 8(SP)
0x0030 00048 (main.go:16) PCDATA $1, $0
0x0030 00048 (main.go:16) CALL runtime.newproc(SB)
0x0035 00053 (main.go:20) MOVQ $1000000000, (SP)
0x003d 00061 (main.go:20) NOP
0x0040 00064 (main.go:20) CALL time.Sleep(SB)
...
"".main.func1 STEXT size=147 args=0x0 locals=0x68 funcid=0x0
...
0x0020 00032 (main.go:17) MOVUPS X0, ""..autotmp_0+56(SP)
0x0025 00037 (main.go:17) LEAQ ""..autotmp_0+56(SP), AX
0x002a 00042 (main.go:17) MOVQ AX, ""..autotmp_2+48(SP)
0x002f 00047 (main.go:17) TESTB AL, (AX)
0x0031 00049 (main.go:17) LEAQ type.string(SB), CX
0x0038 00056 (main.go:17) MOVQ CX, ""..autotmp_0+56(SP)
0x003d 00061 (main.go:17) LEAQ ""..stmp_0(SB), CX
0x0044 00068 (main.go:17) MOVQ CX, ""..autotmp_0+64(SP)
0x0049 00073 (main.go:17) TESTB AL, (AX)
0x004b 00075 (main.go:17) JMP 77
0x004d 00077 (main.go:17) MOVQ AX, ""..autotmp_1+72(SP)
0x0052 00082 (main.go:17) MOVQ $1, ""..autotmp_1+80(SP)
0x005b 00091 (main.go:17) MOVQ $1, ""..autotmp_1+88(SP)
0x0064 00100 (main.go:17) MOVQ AX, (SP)
0x0068 00104 (main.go:17) MOVQ $1, 8(SP)
0x0071 00113 (main.go:17) MOVQ $1, 16(SP)
0x007a 00122 (main.go:17) PCDATA $1, $0
0x007a 00122 (main.go:17) CALL fmt.Println(SB)
...
Note: with this method, if the code imports third-party packages you will get a can't find import error. You can work around it by compiling the package manually first (go install) to produce an .a file, then including it with -I.
go tool objdump
With this method you first compile the code into an executable object file, then disassemble it to get the assembly:
# go tool compile -N -l main.go
# go tool objdump -s main main.o
TEXT "".main(SB) gofile../Users/bytedance/workspace/code.byted.org/mq/main.go
...
main.go:16 0xf61 488d0500000000 LEAQ 0(IP), AX [3:7]R_PCREL:"".main.func1·f
main.go:16 0xf68 4889442408 MOVQ AX, 0x8(SP)
main.go:16 0xf6d e800000000 CALL 0xf72 [1:5]R_CALL:runtime.newproc<1>
main.go:20 0xf72 48c7042400ca9a3b MOVQ $0x3b9aca00, 0(SP)
main.go:20 0xf7a 0f1f00 NOPL 0(AX)
main.go:20 0xf7d e800000000 CALL 0xf82 [1:5]R_CALL:time.Sleep
main.go:21 0xf82 488b6c2410 MOVQ 0x10(SP), BP
main.go:21 0xf87 4883c418 ADDQ $0x18, SP
main.go:21 0xf8b c3 RET
main.go:15 0xf8c e800000000 CALL 0xf91 [1:5]R_CALL:runtime.morestack_noctxt
main.go:15 0xf91 ebaa JMP "".main(SB)
TEXT "".main.func1(SB) gofile../Users/bytedance/workspace/code.byted.org/mq/main.go
...
main.go:17 0x100d e800000000 CALL 0x1012 [1:5]R_CALL:fmt.Println
main.go:18 0x1012 488b6c2460 MOVQ 0x60(SP), BP
main.go:18 0x1017 4883c468 ADDQ $0x68, SP
main.go:18 0x101b c3 RET
main.go:16 0x101c e800000000 CALL 0x1021 [1:5]R_CALL:runtime.morestack_noctxt
main.go:16 0x1021 e96dffffff JMP "".main.func1(SB)
go build
# go build -gcflags -S main.go
"".main STEXT size=86 args=0x0 locals=0x18 funcid=0x0
...
LEAQ "".main.func1·f(SB), AX
MOVQ AX, 8(SP)
PCDATA $1, $0
CALL runtime.newproc(SB)
MOVQ $1000000000, (SP)
NOP
CALL time.Sleep(SB)
...
"".main.func1 STEXT size=138 args=0x0 locals=0x58 funcid=0x0
...
Source Code
As the assembly above shows, the go statement is ultimately compiled into a call to runtime.newproc, and newproc in turn calls runtime.newproc1:
// Create a new g running fn with siz bytes of arguments.
// Put it on the queue of g's waiting to run.
// The compiler turns a go statement into a call to this.
//
// The stack layout of this call is unusual: it assumes that the
// arguments to pass to fn are on the stack sequentially immediately
// after &fn. Hence, they are logically part of newproc's argument
// frame, even though they don't appear in its signature (and can't
// because their types differ between call sites).
//
// This must be nosplit because this stack layout means there are
// untyped arguments in newproc's argument frame. Stack copies won't
// be able to adjust them and stack splits won't be able to copy them.
//
//go:nosplit
func newproc(siz int32, fn *funcval) {
    argp := add(unsafe.Pointer(&fn), sys.PtrSize)
    gp := getg()
    pc := getcallerpc()
    systemstack(func() {
        newg := newproc1(fn, argp, siz, gp, pc)

        _p_ := getg().m.p.ptr()
        runqput(_p_, newg, true)

        if mainStarted {
            wakep()
        }
    })
}
// Create a new g in state _Grunnable, starting at fn, with narg bytes
// of arguments starting at argp. callerpc is the address of the go
// statement that created this. The caller is responsible for adding
// the new g to the scheduler.
//
// This must run on the system stack because it's the continuation of
// newproc, which cannot split the stack.
//
//go:systemstack
func newproc1(fn *funcval, argp unsafe.Pointer, narg int32, callergp *g, callerpc uintptr) *g {
    _g_ := getg()

    if fn == nil {
        _g_.m.throwing = -1 // do not dump full stacks
        throw("go of nil func value")
    }
    acquirem() // disable preemption because it can be holding p in a local var
    siz := narg
    siz = (siz + 7) &^ 7

    // We could allocate a larger initial stack if necessary.
    // Not worth it: this is almost always an error.
    // 4*sizeof(uintreg): extra space added below
    // sizeof(uintreg): caller's LR (arm) or return address (x86, in gostartcall).
    if siz >= _StackMin-4*sys.RegSize-sys.RegSize {
        throw("newproc: function arguments too large for new goroutine")
    }

    _p_ := _g_.m.p.ptr()
    newg := gfget(_p_)
    if newg == nil {
        newg = malg(_StackMin)
        casgstatus(newg, _Gidle, _Gdead)
        allgadd(newg) // publishes with a g->status of Gdead so GC scanner doesn't look at uninitialized stack.
    }
    if newg.stack.hi == 0 {
        throw("newproc1: newg missing stack")
    }

    if readgstatus(newg) != _Gdead {
        throw("newproc1: new g is not Gdead")
    }

    // ... (some code omitted)

    releasem(_g_.m)

    return newg
}
func allgadd(gp *g) {
    if readgstatus(gp) == _Gidle {
        throw("allgadd: bad status Gidle")
    }

    lock(&allglock)
    allgs = append(allgs, gp)
    if &allgs[0] != allgptr {
        atomicstorep(unsafe.Pointer(&allgptr), unsafe.Pointer(&allgs[0]))
    }
    atomic.Storeuintptr(&allglen, uintptr(len(allgs)))
    unlock(&allglock)
}
The main flow is:
- Switch to the system stack via systemstack
- Try to obtain a G via gfget; if none is available, create a new one via malg
- Put the G on the run queue via runqput
The main function for obtaining a G is gfget, shown below:
// Get from gfree list.
// If local list is empty, grab a batch from global list.
func gfget(_p_ *p) *g {
retry:
    if _p_.gFree.empty() && (!sched.gFree.stack.empty() || !sched.gFree.noStack.empty()) {
        lock(&sched.gFree.lock)
        // Move a batch of free Gs to the P.
        for _p_.gFree.n < 32 {
            // Prefer Gs with stacks.
            gp := sched.gFree.stack.pop()
            if gp == nil {
                gp = sched.gFree.noStack.pop()
                if gp == nil {
                    break
                }
            }
            sched.gFree.n--
            _p_.gFree.push(gp)
            _p_.gFree.n++
        }
        unlock(&sched.gFree.lock)
        goto retry
    }
    gp := _p_.gFree.pop()
    if gp == nil {
        return nil
    }
    _p_.gFree.n--
    if gp.stack.lo == 0 {
        // Stack was deallocated in gfput. Allocate a new one.
        systemstack(func() {
            gp.stack = stackalloc(_FixedStack)
        })
        gp.stackguard0 = gp.stack.lo + _StackGuard
    } else {
        if raceenabled {
            racemalloc(unsafe.Pointer(gp.stack.lo), gp.stack.hi-gp.stack.lo)
        }
        if msanenabled {
            msanmalloc(unsafe.Pointer(gp.stack.lo), gp.stack.hi-gp.stack.lo)
        }
    }
    return gp
}
The code is fairly straightforward:
- If the P's local gFree list is empty and the scheduler's gFree list is not, move free Gs from the scheduler's list to the P's local list, at most 32 of them
- Pop a G from the P's local gFree list; if there is none, return nil
- If the G has no stack, allocate one of the fixed size (2KB)
So every P, as well as the scheduler, maintains a gFree list holding released Gs (in the Gdead state); whenever a new G is needed, the runtime first tries to reuse one from the P's or the scheduler's gFree list, and only creates a fresh G when none is found.
The Death of a Goroutine
When the user's code finishes, execution reaches the goexit function. When a goroutine is started, the runtime constructs the goroutine's call stack as if goexit had called the user's function, so that when the user code returns, control lands in goexit:
// goexit is the return stub at the top of every goroutine call stack.
// Each goroutine stack is constructed as if goexit called the
// goroutine's entry point function, so that when the entry point
// function returns, it will return to goexit, which will call goexit1
// to perform the actual exit.
//
// This function must never be called directly. Call goexit1 instead.
// gentraceback assumes that goexit terminates the stack. A direct
// call on the stack will cause gentraceback to stop walking the stack
// prematurely and if there is leftover state it may panic.
func goexit(neverCallThisFunction)
// The top-most function running on a goroutine
// returns to goexit+PCQuantum. Defined as ABIInternal
// so as to make it identifiable to traceback (this
// function it used as a sentinel; traceback wants to
// see the func PC, not a wrapper PC).
TEXT runtime·goexit<ABIInternal>(SB),NOSPLIT,$0-0
    BYTE $0x90 // NOP
    CALL runtime·goexit1(SB) // does not return
    // traceback from goexit1 must hit code range of goexit
    BYTE $0x90 // NOP
As shown, this eventually calls runtime.goexit1:
// Finishes execution of the current goroutine.
func goexit1() {
    // ... (some code omitted)
    mcall(goexit0)
}
// goexit continuation on g0.
func goexit0(gp *g) {
    _g_ := getg()

    casgstatus(gp, _Grunning, _Gdead)
    if isSystemGoroutine(gp, false) {
        atomic.Xadd(&sched.ngsys, -1)
    }
    gp.m = nil
    locked := gp.lockedm != 0
    // ... (some code omitted)
    dropg()

    if GOARCH == "wasm" { // no threads yet on wasm
        gfput(_g_.m.p.ptr(), gp)
        schedule() // never returns
    }

    if _g_.m.lockedInt != 0 {
        print("invalid m->lockedInt = ", _g_.m.lockedInt, "\n")
        throw("internal lockOSThread error")
    }
    gfput(_g_.m.p.ptr(), gp)
    if locked {
        // The goroutine may have locked this thread because
        // it put it in an unusual kernel state. Kill it
        // rather than returning it to the thread pool.

        // Return to mstart, which will release the P and exit
        // the thread.
        if GOOS != "plan9" { // See golang.org/issue/22227.
            gogo(&_g_.m.g0.sched)
        } else {
            // Clear lockedExt on plan9 since we may end up re-using
            // this thread.
            _g_.m.lockedExt = 0
        }
    }
    schedule()
}
The main flow is:
- Set the G's status to Gdead
- Call gfput to put the G on the free list
- Call schedule to run the next round of scheduling, returning to the scheduling loop
The code of gfput is as follows:
// Put on gfree list.
// If local list is too long, transfer a batch to the global list.
func gfput(_p_ *p, gp *g) {
    if readgstatus(gp) != _Gdead {
        throw("gfput: bad status (not Gdead)")
    }

    stksize := gp.stack.hi - gp.stack.lo

    if stksize != _FixedStack {
        // non-standard stack size - free it.
        stackfree(gp.stack)
        gp.stack.lo = 0
        gp.stack.hi = 0
        gp.stackguard0 = 0
    }

    _p_.gFree.push(gp)
    _p_.gFree.n++
    if _p_.gFree.n >= 64 {
        lock(&sched.gFree.lock)
        for _p_.gFree.n >= 32 {
            _p_.gFree.n--
            gp = _p_.gFree.pop()
            if gp.stack.lo == 0 {
                sched.gFree.noStack.push(gp)
            } else {
                sched.gFree.stack.push(gp)
            }
            sched.gFree.n++
        }
        unlock(&sched.gFree.lock)
    }
}
The main flow is:
- If the G's stack is not the fixed 2KB size, free it; a new stack will be allocated when the G is reused
- Push the current G onto the P's gFree list
- If the P's list has reached 64 entries, move Gs to the scheduler's list until fewer than 32 remain locally
How to Monitor the Number of Goroutines
The runtime package provides NumGoroutine to get the current number of goroutines in the system:
// NumGoroutine returns the number of goroutines that currently exist.
func NumGoroutine() int {
    return int(gcount())
}

func gcount() int32 {
    n := int32(atomic.Loaduintptr(&allglen)) - sched.gFree.n - int32(atomic.Load(&sched.ngsys))
    for _, _p_ := range allp {
        n -= _p_.gFree.n
    }

    // All these variables can be changed concurrently, so the result can be inconsistent.
    // But at least the current goroutine is running.
    if n < 1 {
        n = 1
    }
    return n
}
As shown, the calculation works as follows:
- Take the total number of Gs from allglen
- Subtract the Gs in the scheduler's gFree list and the number of system goroutines
- Subtract the Gs in each P's gFree list
Summary
To sum up the walkthrough above:
- When the user starts a G, the runtime first tries the P's local gFree list, then the scheduler's gFree list, and only then creates a new G
- All Gs are kept in the global allgs slice; there is no limit on the number of Gs in the system
- When a G exits, it is first put on the P's local gFree list; if the local list grows too long, part of it is moved to the scheduler's gFree list
- When counting goroutines, the Gs in the Gdead state and the system Gs are subtracted
An Example
Finally, here is an example that inspects the number of goroutines in the system.
package main

import (
    "fmt"
    "os"
    "runtime"
    "runtime/pprof"
    "sync"
    "sync/atomic"
    "time"
)

func rung(n int) {
    v := int32(0)
    var wg sync.WaitGroup
    wg.Add(n + 1) // the extra count keeps every goroutine blocked in wg.Wait below
    for i := 0; i < n; i++ {
        go func() {
            // Defers run LIFO: wg.Done fires first, then wg.Wait blocks
            // until main releases the extra count, keeping this goroutine
            // alive while the count is read.
            defer wg.Wait()
            defer wg.Done()
            atomic.AddInt32(&v, 1)
        }()
    }
    for {
        if atomic.LoadInt32(&v) == int32(n) {
            fmt.Println("Goroutine Number: ", runtime.NumGoroutine())
            break
        }
    }
    wg.Done()
    wg.Wait()
}
func main() {
    f, err := os.Create("pprof.out")
    if err != nil {
        panic(err)
    }
    err = pprof.StartCPUProfile(f)
    if err != nil {
        panic(err)
    }
    defer pprof.StopCPUProfile()

    fmt.Println("Goroutine Number: ", runtime.NumGoroutine())
    fmt.Println("Rung 1000")
    rung(1000)
    time.Sleep(1 * time.Second)
    fmt.Println("After Rung 1000 Goroutine Number: ", runtime.NumGoroutine())
    fmt.Println("Rung 100")
    rung(100)
    time.Sleep(1 * time.Second)
    fmt.Println("After Rung 100 Goroutine Number: ", runtime.NumGoroutine())
    rung(10000000)
}
The output:
Goroutine Number: 2
Rung 1000
Goroutine Number: 1002
After Rung 1000 Goroutine Number: 2
Rung 100
Goroutine Number: 102
After Rung 100 Goroutine Number: 2
Goroutine Number: 10000002