Problem

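Goroutine support is one of Golang's most convenient features. With a simple go func(){}() statement a developer can start a lightweight thread, which makes highly concurrent programming straightforward.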

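The goroutine is Golang's basic unit of execution. Every Go program contains at least one goroutine, the main goroutine, which is created automatically when the program starts and destroyed when the program ends.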

One day a colleague showed me a monitoring graph from the test environment (metric container_go_goroutines):

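  1. At T1: the graph shows roughly 150 goroutines in the system
  2. At T2: a simple load test is run against the system
  3. At T3: the graph shows roughly 10k goroutines
  4. At T3 + 10 hours: the load test has stopped, yet the graph still shows roughly 10k goroutines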

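These monitoring results raise several questions:

  • Is it normal to have 10k goroutines under a simple load test?
  • Why does the goroutine count not drop after the load test stops?
  • Is there an upper limit on the number of goroutines in a Go program?
  • When there are too many goroutines, does the runtime reclaim them?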

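The first two questions depend largely on the business code, while the last two concern the Go runtime. Setting the business aside, we can start by investigating the last two.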

The Birth of a Goroutine

Example

Let's use a very simple Go program to observe how Golang starts a goroutine. The sample code is below (running it prints the string 1 to standard output):

package main

import (
	"fmt"
	"time"
)

func main() {
	go func() {
		fmt.Println("1")
	}()

	time.Sleep(time.Second)
}

First we need the corresponding assembly, to see which function the go statement is compiled into.

go tool compile

Use go tool compile -N -l -S main.go to view the assembly (-N disables optimizations, -l disables inlining):

# go tool compile -N -l -S main.go

"".main STEXT size=86 args=0x0 locals=0x18 funcid=0x0
        ...
        0x0024 00036 (main.go:16)       LEAQ    "".main.func1·f(SB), AX
        0x002b 00043 (main.go:16)       MOVQ    AX, 8(SP)
        0x0030 00048 (main.go:16)       PCDATA  $1, $0
        0x0030 00048 (main.go:16)       CALL    runtime.newproc(SB)
        0x0035 00053 (main.go:20)       MOVQ    $1000000000, (SP)
        0x003d 00061 (main.go:20)       NOP
        0x0040 00064 (main.go:20)       CALL    time.Sleep(SB)
        ...
"".main.func1 STEXT size=147 args=0x0 locals=0x68 funcid=0x0
        ...
        0x0020 00032 (main.go:17)       MOVUPS  X0, ""..autotmp_0+56(SP)
        0x0025 00037 (main.go:17)       LEAQ    ""..autotmp_0+56(SP), AX
        0x002a 00042 (main.go:17)       MOVQ    AX, ""..autotmp_2+48(SP)
        0x002f 00047 (main.go:17)       TESTB   AL, (AX)
        0x0031 00049 (main.go:17)       LEAQ    type.string(SB), CX
        0x0038 00056 (main.go:17)       MOVQ    CX, ""..autotmp_0+56(SP)
        0x003d 00061 (main.go:17)       LEAQ    ""..stmp_0(SB), CX
        0x0044 00068 (main.go:17)       MOVQ    CX, ""..autotmp_0+64(SP)
        0x0049 00073 (main.go:17)       TESTB   AL, (AX)
        0x004b 00075 (main.go:17)       JMP     77
        0x004d 00077 (main.go:17)       MOVQ    AX, ""..autotmp_1+72(SP)
        0x0052 00082 (main.go:17)       MOVQ    $1, ""..autotmp_1+80(SP)
        0x005b 00091 (main.go:17)       MOVQ    $1, ""..autotmp_1+88(SP)
        0x0064 00100 (main.go:17)       MOVQ    AX, (SP)
        0x0068 00104 (main.go:17)       MOVQ    $1, 8(SP)
        0x0071 00113 (main.go:17)       MOVQ    $1, 16(SP)
        0x007a 00122 (main.go:17)       PCDATA  $1, $0
        0x007a 00122 (main.go:17)       CALL    fmt.Println(SB)
        ...
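Note: with this method, if the code imports third-party packages, the compiler reports can't find import. You can work around this by first compiling that package manually (go install) to obtain its .a file, and then passing the directory that contains it with -I.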

go tool objdump

With this method, you first compile the code into an object file (main.o) and then disassemble it to get the assembly:

# go tool compile -N -l main.go   
# go tool objdump -s main main.o

TEXT "".main(SB) gofile../Users/bytedance/workspace/code.byted.org/mq/main.go
  ...
  main.go:16            0xf61                   488d0500000000          LEAQ 0(IP), AX          [3:7]R_PCREL:"".main.func1·f
  main.go:16            0xf68                   4889442408              MOVQ AX, 0x8(SP)
  main.go:16            0xf6d                   e800000000              CALL 0xf72              [1:5]R_CALL:runtime.newproc<1>
  main.go:20            0xf72                   48c7042400ca9a3b        MOVQ $0x3b9aca00, 0(SP)
  main.go:20            0xf7a                   0f1f00                  NOPL 0(AX)
  main.go:20            0xf7d                   e800000000              CALL 0xf82              [1:5]R_CALL:time.Sleep
  main.go:21            0xf82                   488b6c2410              MOVQ 0x10(SP), BP
  main.go:21            0xf87                   4883c418                ADDQ $0x18, SP
  main.go:21            0xf8b                   c3                      RET
  main.go:15            0xf8c                   e800000000              CALL 0xf91              [1:5]R_CALL:runtime.morestack_noctxt
  main.go:15            0xf91                   ebaa                    JMP "".main(SB)

TEXT "".main.func1(SB) gofile../Users/bytedance/workspace/code.byted.org/mq/main.go
  ...
  main.go:17            0x100d                  e800000000              CALL 0x1012             [1:5]R_CALL:fmt.Println
  main.go:18            0x1012                  488b6c2460              MOVQ 0x60(SP), BP
  main.go:18            0x1017                  4883c468                ADDQ $0x68, SP
  main.go:18            0x101b                  c3                      RET
  main.go:16            0x101c                  e800000000              CALL 0x1021             [1:5]R_CALL:runtime.morestack_noctxt
  main.go:16            0x1021                  e96dffffff              JMP "".main.func1(SB)

go build

# go build -gcflags -S main.go  

"".main STEXT size=86 args=0x0 locals=0x18 funcid=0x0
        ...
        LEAQ    "".main.func1·f(SB), AX
        MOVQ    AX, 8(SP)
        PCDATA  $1, $0
        CALL    runtime.newproc(SB)
        MOVQ    $1000000000, (SP)
        NOP
        CALL    time.Sleep(SB)
        ...
"".main.func1 STEXT size=138 args=0x0 locals=0x58 funcid=0x0
        ...

Source Code

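The assembly above shows that the go statement is compiled into a call to runtime.newproc, which in turn calls runtime.newproc1. (The runtime code quoted in this article comes from an older Go release; newer releases differ in detail, for example newproc no longer takes a siz argument.)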

// Create a new g running fn with siz bytes of arguments.
// Put it on the queue of g's waiting to run.
// The compiler turns a go statement into a call to this.
//
// The stack layout of this call is unusual: it assumes that the
// arguments to pass to fn are on the stack sequentially immediately
// after &fn. Hence, they are logically part of newproc's argument
// frame, even though they don't appear in its signature (and can't
// because their types differ between call sites).
//
// This must be nosplit because this stack layout means there are
// untyped arguments in newproc's argument frame. Stack copies won't
// be able to adjust them and stack splits won't be able to copy them.
//
//go:nosplit
func newproc(siz int32, fn *funcval) {
	argp := add(unsafe.Pointer(&fn), sys.PtrSize)
	gp := getg()
	pc := getcallerpc()
	systemstack(func() {
		newg := newproc1(fn, argp, siz, gp, pc)

		_p_ := getg().m.p.ptr()
		runqput(_p_, newg, true)

		if mainStarted {
			wakep()
		}
	})
}

// Create a new g in state _Grunnable, starting at fn, with narg bytes
// of arguments starting at argp. callerpc is the address of the go
// statement that created this. The caller is responsible for adding
// the new g to the scheduler.
//
// This must run on the system stack because it's the continuation of
// newproc, which cannot split the stack.
//
//go:systemstack
func newproc1(fn *funcval, argp unsafe.Pointer, narg int32, callergp *g, callerpc uintptr) *g {
	_g_ := getg()

	if fn == nil {
		_g_.m.throwing = -1 // do not dump full stacks
		throw("go of nil func value")
	}
	acquirem() // disable preemption because it can be holding p in a local var
	siz := narg
	siz = (siz + 7) &^ 7

	// We could allocate a larger initial stack if necessary.
	// Not worth it: this is almost always an error.
	// 4*sizeof(uintreg): extra space added below
	// sizeof(uintreg): caller's LR (arm) or return address (x86, in gostartcall).
	if siz >= _StackMin-4*sys.RegSize-sys.RegSize {
		throw("newproc: function arguments too large for new goroutine")
	}

	_p_ := _g_.m.p.ptr()
	newg := gfget(_p_)
	if newg == nil {
		newg = malg(_StackMin)
		casgstatus(newg, _Gidle, _Gdead)
		allgadd(newg) // publishes with a g->status of Gdead so GC scanner doesn't look at uninitialized stack.
	}
	if newg.stack.hi == 0 {
		throw("newproc1: newg missing stack")
	}

	if readgstatus(newg) != _Gdead {
		throw("newproc1: new g is not Gdead")
	}

	// some code omitted

	releasem(_g_.m)

	return newg
}

func allgadd(gp *g) {
	if readgstatus(gp) == _Gidle {
		throw("allgadd: bad status Gidle")
	}

	lock(&allglock)
	allgs = append(allgs, gp)
	if &allgs[0] != allgptr {
		atomicstorep(unsafe.Pointer(&allgptr), unsafe.Pointer(&allgs[0]))
	}
	atomic.Storeuintptr(&allglen, uintptr(len(allgs)))
	unlock(&allglock)
}

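The main flow is:

  1. Switch to the system stack via systemstack
  2. Try to obtain a G via gfget; if none is available, create a new one via malg and register it in the global allgs slice via allgadd
  3. Put the G on the current P's run queue via runqput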
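The three steps can be modeled in a few lines of ordinary Go. The sketch below is a toy, not runtime code: the types g and scheduler and the methods newproc and exit are hypothetical, there is a single free list instead of per-P and global lists, and stacks, Ps and locking are left out. It only illustrates the property that matters for the questions above: a dead G is reused, and allgs only ever grows.

```go
package main

import "fmt"

// g is a hypothetical, heavily simplified stand-in for runtime.g.
type g struct {
	id     int
	status string // "dead" or "runnable"; the real runtime has many more states
}

// scheduler is a hypothetical stand-in for the runtime's global state.
type scheduler struct {
	allgs []*g // every G ever created; this model never removes entries
	gFree []*g // dead Gs available for reuse (one list instead of per-P + global)
	runq  []*g // Gs waiting to run
}

// newproc mimics the shape of runtime.newproc/newproc1: reuse a dead G if
// there is one (like gfget), otherwise allocate one (like malg) and record it
// (like allgadd), then queue it as runnable (like runqput).
func (s *scheduler) newproc() *g {
	var newg *g
	if n := len(s.gFree); n > 0 {
		newg = s.gFree[n-1]
		s.gFree = s.gFree[:n-1]
	} else {
		newg = &g{id: len(s.allgs), status: "dead"}
		s.allgs = append(s.allgs, newg)
	}
	newg.status = "runnable"
	s.runq = append(s.runq, newg)
	return newg
}

// exit models a finished G being marked dead and put on the free list.
func (s *scheduler) exit(gp *g) {
	gp.status = "dead"
	s.gFree = append(s.gFree, gp)
}

func main() {
	s := &scheduler{}
	a := s.newproc()
	s.runq = s.runq[1:] // pretend a was scheduled...
	s.exit(a)           // ...and ran to completion
	b := s.newproc()
	fmt.Println(a == b, len(s.allgs)) // prints "true 1": the dead G was reused
}
```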

The main function for obtaining a G is gfget:

// Get from gfree list.
// If local list is empty, grab a batch from global list.
func gfget(_p_ *p) *g {
retry:
	if _p_.gFree.empty() && (!sched.gFree.stack.empty() || !sched.gFree.noStack.empty()) {
		lock(&sched.gFree.lock)
		// Move a batch of free Gs to the P.
		for _p_.gFree.n < 32 {
			// Prefer Gs with stacks.
			gp := sched.gFree.stack.pop()
			if gp == nil {
				gp = sched.gFree.noStack.pop()
				if gp == nil {
					break
				}
			}
			sched.gFree.n--
			_p_.gFree.push(gp)
			_p_.gFree.n++
		}
		unlock(&sched.gFree.lock)
		goto retry
	}
	gp := _p_.gFree.pop()
	if gp == nil {
		return nil
	}
	_p_.gFree.n--
	if gp.stack.lo == 0 {
		// Stack was deallocated in gfput. Allocate a new one.
		systemstack(func() {
			gp.stack = stackalloc(_FixedStack)
		})
		gp.stackguard0 = gp.stack.lo + _StackGuard
	} else {
		if raceenabled {
			racemalloc(unsafe.Pointer(gp.stack.lo), gp.stack.hi-gp.stack.lo)
		}
		if msanenabled {
			msanmalloc(unsafe.Pointer(gp.stack.lo), gp.stack.hi-gp.stack.lo)
		}
	}
	return gp
}

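The code is fairly simple. Its main flow is:

  1. If the P's local gFree list is empty and the scheduler's gFree list is not, move Gs from the scheduler's list to the P's local list, at most 32 of them (Gs that still have a stack are preferred)
  2. Take a G from the P's local gFree list; if that list is empty, return nil
  3. If the G has no stack, allocate one of the fixed size _FixedStack (2KB on Linux)

In other words, each P and the scheduler each maintain a gFree list of released Gs (in the Gdead state). Whenever a new G is needed, the runtime first tries the P's gFree list, then the scheduler's, and only creates a brand-new G if both are empty.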
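A toy version of gfget makes the batching concrete. In this sketch the types g, gList, p and schedFree are hypothetical, there is no locking, and a boolean hasStack stands in for real stack bounds. The local list is refilled with up to 32 Gs, Gs with stacks are preferred, and a G without a stack gets one before being returned:

```go
package main

import "fmt"

// g is a hypothetical stand-in for runtime.g; hasStack models stack.lo != 0.
type g struct{ hasStack bool }

// gList is a simple LIFO list, standing in for the runtime's gList.
type gList []*g

func (l *gList) push(gp *g) { *l = append(*l, gp) }

func (l *gList) pop() *g {
	n := len(*l)
	if n == 0 {
		return nil
	}
	gp := (*l)[n-1]
	*l = (*l)[:n-1]
	return gp
}

// schedFree models sched.gFree: Gs with and without stacks are kept apart.
type schedFree struct{ stack, noStack gList }

// p models a P's local free list.
type p struct{ gFree gList }

// gfget refills the local list from the global one (up to 32 Gs, preferring
// Gs with stacks), pops one G, and gives it a stack if it has none.
func gfget(pp *p, sched *schedFree) *g {
	if len(pp.gFree) == 0 && (len(sched.stack) > 0 || len(sched.noStack) > 0) {
		for len(pp.gFree) < 32 {
			gp := sched.stack.pop()
			if gp == nil {
				gp = sched.noStack.pop()
				if gp == nil {
					break
				}
			}
			pp.gFree.push(gp)
		}
	}
	gp := pp.gFree.pop()
	if gp == nil {
		return nil // the caller falls back to allocating a new G (malg)
	}
	if !gp.hasStack {
		gp.hasStack = true // stands in for stackalloc(_FixedStack)
	}
	return gp
}

func main() {
	sched := &schedFree{}
	for i := 0; i < 40; i++ {
		sched.noStack.push(&g{})
	}
	pp := &p{}
	gp := gfget(pp, sched)
	fmt.Println(gp.hasStack, len(pp.gFree), len(sched.noStack)) // prints "true 31 8"
}
```

With 40 stackless Gs on the global list, one call moves 32 of them to the P, returns one of those, and leaves 31 locally and 8 globally.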

The Death of a Goroutine

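When the user's code finishes, the goroutine's execution reaches the goexit function. When a goroutine is started, the runtime builds its call stack as if goexit had called the user's function, so when the user's code returns, it returns into goexit: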

// goexit is the return stub at the top of every goroutine call stack.
// Each goroutine stack is constructed as if goexit called the
// goroutine's entry point function, so that when the entry point
// function returns, it will return to goexit, which will call goexit1
// to perform the actual exit.
//
// This function must never be called directly. Call goexit1 instead.
// gentraceback assumes that goexit terminates the stack. A direct
// call on the stack will cause gentraceback to stop walking the stack
// prematurely and if there is leftover state it may panic.
func goexit(neverCallThisFunction)

// The top-most function running on a goroutine
// returns to goexit+PCQuantum. Defined as ABIInternal
// so as to make it identifiable to traceback (this
// function it used as a sentinel; traceback wants to
// see the func PC, not a wrapper PC).
TEXT runtime·goexit<ABIInternal>(SB),NOSPLIT,$0-0
	BYTE	$0x90	// NOP
	CALL	runtime·goexit1(SB)	// does not return
	// traceback from goexit1 must hit code range of goexit
	BYTE	$0x90	// NOP

As we can see, it ultimately calls runtime.goexit1:

// Finishes execution of the current goroutine.
func goexit1() {
	// some code omitted
	mcall(goexit0)
}

// goexit continuation on g0.
func goexit0(gp *g) {
	_g_ := getg()

	casgstatus(gp, _Grunning, _Gdead)
	if isSystemGoroutine(gp, false) {
		atomic.Xadd(&sched.ngsys, -1)
	}
	gp.m = nil

	// some code omitted
	dropg()

	if GOARCH == "wasm" { // no threads yet on wasm
		gfput(_g_.m.p.ptr(), gp)
		schedule() // never returns
	}

	if _g_.m.lockedInt != 0 {
		print("invalid m->lockedInt = ", _g_.m.lockedInt, "\n")
		throw("internal lockOSThread error")
	}
	gfput(_g_.m.p.ptr(), gp)
	if locked {
		// The goroutine may have locked this thread because
		// it put it in an unusual kernel state. Kill it
		// rather than returning it to the thread pool.

		// Return to mstart, which will release the P and exit
		// the thread.
		if GOOS != "plan9" { // See golang.org/issue/22227.
			gogo(&_g_.m.g0.sched)
		} else {
			// Clear lockedExt on plan9 since we may end up re-using
			// this thread.
			_g_.m.lockedExt = 0
		}
	}
	schedule()
}

The main flow is:

  1. Set the G's status to Gdead
  2. Call gfput to put the G on a free list
  3. Call schedule to start the next scheduling round, returning to the scheduler loop
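The same exit path is also reachable from user code: runtime.Goexit runs the goroutine's deferred calls and then calls goexit1, so the goroutine ends just as if its function had returned. A small demonstration (the helper goexitDemo is hypothetical):

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
)

// goexitDemo starts a goroutine that calls runtime.Goexit and reports whether
// its deferred call ran and whether code after Goexit was reached.
func goexitDemo() (deferRan, reachedEnd bool) {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		defer func() { deferRan = true }() // deferred calls still run
		runtime.Goexit()                    // terminates this goroutine
		reachedEnd = true                   // never executed
	}()
	wg.Wait() // wg.Done runs as the last deferred call, so the writes above are visible
	return deferRan, reachedEnd
}

func main() {
	deferRan, reachedEnd := goexitDemo()
	fmt.Println(deferRan, reachedEnd) // prints "true false"
}
```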

The code of gfput is:

// Put on gfree list.
// If local list is too long, transfer a batch to the global list.
func gfput(_p_ *p, gp *g) {
	if readgstatus(gp) != _Gdead {
		throw("gfput: bad status (not Gdead)")
	}

	stksize := gp.stack.hi - gp.stack.lo

	if stksize != _FixedStack {
		// non-standard stack size - free it.
		stackfree(gp.stack)
		gp.stack.lo = 0
		gp.stack.hi = 0
		gp.stackguard0 = 0
	}

	_p_.gFree.push(gp)
	_p_.gFree.n++
	if _p_.gFree.n >= 64 {
		lock(&sched.gFree.lock)
		for _p_.gFree.n >= 32 {
			_p_.gFree.n--
			gp = _p_.gFree.pop()
			if gp.stack.lo == 0 {
				sched.gFree.noStack.push(gp)
			} else {
				sched.gFree.stack.push(gp)
			}
			sched.gFree.n++
		}
		unlock(&sched.gFree.lock)
	}
}

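The main flow is:

  1. If the G's stack is not the standard fixed size (_FixedStack, 2KB on Linux), free the stack; a new one is allocated when the G is reused
  2. Push the G onto the P's gFree list
  3. If the P's list reaches 64 entries, move Gs to the scheduler's global gFree lists until only 31 remain locally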
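A toy version of gfput, under simplifying assumptions: the types g, p and schedFree are hypothetical, fixedStack is assumed to be 2048 bytes, a stackSize of 0 stands for a freed stack, and there is no locking.

```go
package main

import "fmt"

const fixedStack = 2048 // assumed value of _FixedStack (2KB on Linux)

// g is a hypothetical stand-in for runtime.g; stackSize 0 means "no stack".
type g struct{ stackSize int }

// p models a P's local free list.
type p struct{ gFree []*g }

// schedFree models the scheduler's global free lists.
type schedFree struct{ stack, noStack []*g }

// gfput frees non-standard stacks, pushes the G locally, and moves a batch
// to the global lists once the local list reaches 64 entries.
func gfput(pp *p, sched *schedFree, gp *g) {
	if gp.stackSize != fixedStack {
		gp.stackSize = 0 // stands in for stackfree: the stack is released
	}
	pp.gFree = append(pp.gFree, gp)
	if len(pp.gFree) >= 64 {
		for len(pp.gFree) >= 32 {
			n := len(pp.gFree)
			gp = pp.gFree[n-1]
			pp.gFree = pp.gFree[:n-1]
			if gp.stackSize == 0 {
				sched.noStack = append(sched.noStack, gp)
			} else {
				sched.stack = append(sched.stack, gp)
			}
		}
	}
}

func main() {
	pp, sched := &p{}, &schedFree{}
	for i := 0; i < 64; i++ {
		size := fixedStack
		if i%2 == 0 {
			size = 8192 // a stack that grew; gfput frees it
		}
		gfput(pp, sched, &g{stackSize: size})
	}
	fmt.Println(len(pp.gFree), len(sched.stack)+len(sched.noStack)) // prints "31 33"
}
```

Putting 64 Gs onto an empty P triggers a single transfer on the 64th call: 33 Gs move to the global lists and 31 stay local.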

How to Monitor the Number of Goroutines

The runtime package provides NumGoroutine to get the current number of goroutines. Its implementation is:

// NumGoroutine returns the number of goroutines that currently exist.
func NumGoroutine() int {
	return int(gcount())
}

func gcount() int32 {
	n := int32(atomic.Loaduintptr(&allglen)) - sched.gFree.n - int32(atomic.Load(&sched.ngsys))
	for _, _p_ := range allp {
		n -= _p_.gFree.n
	}

	// All these variables can be changed concurrently, so the result can be inconsistent.
	// But at least the current goroutine is running.
	if n < 1 {
		n = 1
	}
	return n
}

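As we can see, the count is computed as follows:

  • Take the total number of Gs from allglen
  • Subtract the number of Gs on the scheduler's gFree list and the number of system goroutines
  • Subtract the gFree count of every P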
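The arithmetic can be checked with a stand-alone function. Here the runtime's counters are passed in as plain arguments; the parameter names and the snapshot values in main are hypothetical. It also shows why NumGoroutine can be small even when allglen is large: the dead Gs sitting on free lists are subtracted.

```go
package main

import "fmt"

// gcount mirrors the arithmetic of runtime.gcount, with the runtime's
// counters passed in as hypothetical parameters.
func gcount(allglen, schedFree, ngsys int32, perPFree []int32) int32 {
	n := allglen - schedFree - ngsys
	for _, f := range perPFree {
		n -= f
	}
	// The real counters are read without a lock and may be inconsistent;
	// at least the calling goroutine is running, so never report less than 1.
	if n < 1 {
		n = 1
	}
	return n
}

func main() {
	// Hypothetical snapshot: 10,150 Gs ever created, 9,600 dead Gs on the
	// global free list, 4 system goroutines, and 3 Ps holding 40, 50, 60 dead Gs.
	fmt.Println(gcount(10150, 9600, 4, []int32{40, 50, 60})) // prints 396
}
```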

Summary

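From the analysis above, we can summarize:

  1. When a G is started, the runtime first takes one from the P's local gFree list, then from the scheduler's gFree list, and only then creates a new G
  2. Every G is kept in the global allgs slice, which only grows (allgadd appends and nothing removes entries); the runtime imposes no explicit limit on the number of Gs, so in practice the limit is available memory
  3. When a G exits, it is first put on the P's local gFree list; if the local list grows too long, a batch is moved to the scheduler's gFree list
  4. When counting goroutines, the runtime subtracts the dead Gs held on free lists and the system goroutines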
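Because a G only returns to a gFree list when it exits, a goroutine that stays parked (for example blocked on a channel receive) keeps NumGoroutine up. The sketch below demonstrates this with a hypothetical helper blockedDelta. Whether this is what kept the count at 10k after the load test at the beginning cannot be told from the monitoring graph alone; that depends on the business code.

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
	"time"
)

// blockedDelta starts n goroutines that block on a channel, measures how much
// runtime.NumGoroutine rose, releases them, and measures again after they exit.
func blockedDelta(n int) (during, after int) {
	base := runtime.NumGoroutine()
	release := make(chan struct{})
	var started sync.WaitGroup
	started.Add(n)
	for i := 0; i < n; i++ {
		go func() {
			started.Done()
			<-release // parked here; the G is alive, not Gdead, so it is counted
		}()
	}
	started.Wait()
	during = runtime.NumGoroutine() - base

	close(release) // unblock every goroutine so it can return and reach goexit
	// Released goroutines leave the count only once they have actually exited,
	// so poll for a while instead of reading the count once.
	deadline := time.Now().Add(5 * time.Second)
	for runtime.NumGoroutine() > base && time.Now().Before(deadline) {
		time.Sleep(10 * time.Millisecond)
	}
	after = runtime.NumGoroutine() - base
	return during, after
}

func main() {
	during, after := blockedDelta(1000)
	fmt.Println("while blocked:", during, "after release:", after)
}
```

In an otherwise idle program, during should equal n and after should return to 0.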

An Example

Finally, here is an example that prints the number of goroutines in the process.

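package main

import (
	"fmt"
	"os"
	"runtime"
	"runtime/pprof"
	"sync"
	"sync/atomic"
	"time"
)

// rung starts n goroutines, prints the goroutine count once all of them are
// running, and then lets them all exit together.
func rung(n int) {
	v := int32(0)
	var wg sync.WaitGroup
	wg.Add(n + 1) // n workers plus one count that rung releases itself

	for i := 0; i < n; i++ {
		go func() {
			// Deferred calls run in reverse order: Done first, then Wait,
			// so every worker stays alive until rung calls wg.Done below.
			defer wg.Wait()
			defer wg.Done()
			atomic.AddInt32(&v, 1)
		}()
	}

	// Busy-wait until every worker has started.
	for {
		if atomic.LoadInt32(&v) == int32(n) {
			fmt.Println("Goroutine Number: ", runtime.NumGoroutine())
			break
		}
	}
	wg.Done() // the counter drops to zero and all workers return
	wg.Wait()
}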

func main() {
	f, err := os.Create("pprof.out")
	if err != nil {
		panic(err)
	}
	err = pprof.StartCPUProfile(f)
	if err != nil {
		panic(err)
	}
	defer pprof.StopCPUProfile()

	fmt.Println("Goroutine Number: ", runtime.NumGoroutine())

	fmt.Println("Rung 1000")
	rung(1000)
	time.Sleep(1 * time.Second)
	fmt.Println("After Rung 1000 Goroutine Number: ", runtime.NumGoroutine())

	fmt.Println("Rung 100")
	rung(100)
	time.Sleep(1 * time.Second)
	fmt.Println("After Rung 100 Goroutine Number: ", runtime.NumGoroutine())

	rung(10000000)
}

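The output is below. The baseline is 2 rather than 1 because pprof.StartCPUProfile starts a background goroutine that writes the profile: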

Goroutine Number:  2
Rung 1000
Goroutine Number:  1002
After Rung 1000 Goroutine Number:  2
Rung 100
Goroutine Number:  102
After Rung 100 Goroutine Number:  2
Goroutine Number:  10000002
