1 并发过高导致程序崩溃

我们首先看一个非常简单的例子:


func main() {
	var wg sync.WaitGroup
	for i := 0; i < math.MaxInt32; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			fmt.Println(i)
			time.Sleep(time.Second)
		}(i)
	}
	wg.Wait()
}

这个例子实现了 math.MaxInt32 个协程的并发,约 2^31 = 2 亿个,每个协程内部几乎没有做什么事情。正常的情况下呢,这个程序会乱序输出 1 -> 2^31 个数字。

那实际运行的结果是怎么样的呢?


$ go run main.go
...
150577
150578
panic: too many concurrent operations on a single file or socket (max 1048575)

goroutine 1199236 [running]:
internal/poll.(*fdMutex).rwlock(0xc0000620c0, 0x0, 0xc0000781b0)
        /usr/local/go/src/internal/poll/fd_mutex.go:147 +0x13f
internal/poll.(*FD).writeLock(...)
        /usr/local/go/src/internal/poll/fd_mutex.go:239
internal/poll.(*FD).Write(0xc0000620c0, 0xc125ccd6e0, 0x11, 0x20, 0x0, 0x0, 0x0)
        /usr/local/go/src/internal/poll/fd_unix.go:255 +0x5e
fmt.Fprintf(0x10ed3e0, 0xc00000e018, 0x10d3024, 0xc, 0xc0e69b87b0, 0x1, 0x1, 0x11, 0x0, 0x0)
        /usr/local/go/src/fmt/print.go:205 +0xa5
fmt.Printf(...)
        /usr/local/go/src/fmt/print.go:213
main.main.func1(0xc0000180b0, 0x124c31)
...

运行的结果是程序直接崩溃了,关键的报错信息是:

panic: too many concurrent operations on a single file or socket (max 1048575)

对单个 file/socket 的并发操作个数超过了系统上限,这个报错是 fmt.Printf 函数引起的,fmt.Printf 将格式化后的字符串打印到屏幕,即标准输出。在 linux 系统中,标准输出也可以视为文件,内核(kernel)利用文件描述符(file descriptor)来访问文件,标准输出的文件描述符为 1,错误输出文件描述符为 2,标准输入的文件描述符为 0。

简而言之,系统的资源被耗尽了。

那如果我们将 fmt.Printf 这行代码去掉呢?那程序很可能会因为内存不足而崩溃。这一点更好理解,每个协程至少需要消耗 2KB 的空间,那么假设计算机的内存是 2GB,那么至多允许 2GB/2KB = 1M 个协程同时存在。那如果协程中还存在着其他需要分配内存的操作,那么允许并发执行的协程将会数量级地减少。

2 如何解决

不同的应用程序,消耗的资源是不一样的。比较推荐的方式的是:应用程序来主动限制并发的协程数量。

2.1 利用 channel 的缓存区

可以利用信道 channel 的缓冲区大小来实现:

// main_chan.go
func main() {
	var wg sync.WaitGroup
	ch := make(chan struct{}, 3)
	for i := 0; i < 10; i++ {
		ch <- struct{}{}
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			log.Println(i)
			time.Sleep(time.Second)
			<-ch
		}(i)
	}
	wg.Wait()
}

make(chan struct{}, 3) 创建缓冲区大小为 3 的 channel,在没有被接收的情况下,至多发送 3 个消息则被阻塞。
开启协程前,调用 ch <- struct{}{},若缓存区满,则阻塞。
协程任务结束,调用 <-ch 释放缓冲区。
sync.WaitGroup 并不是必须的,例如 http 服务,每个请求天然是并发的,此时使用 channel 控制并发处理的任务数量,就不需要 sync.WaitGroup。
运行结果如下:

$ go run main_chan.go
2020/12/21 00:48:28 2
2020/12/21 00:48:28 0
2020/12/21 00:48:28 1
2020/12/21 00:48:29 3
2020/12/21 00:48:29 4
2020/12/21 00:48:29 5
2020/12/21 00:48:30 6
2020/12/21 00:48:30 7
2020/12/21 00:48:30 8
2020/12/21 00:48:31 9

从日志中可以很容易看到,每秒钟只并发执行了 3 个任务,达到了协程并发控制的目的。

2.2 利用第三方库

目前有很多第三方库实现了协程池,可以很方便地用来控制协程的并发数量,比较受欢迎的有:

  1. Jeffail/tunny
  2. panjf2000/ants
    以 tunny 举例:
package main

import (
	"log"
	"time"

	"github.com/Jeffail/tunny"
)

func main() {
	pool := tunny.NewFunc(3, func(i interface{}) interface{} {
		log.Println(i)
		time.Sleep(time.Second)
		return nil
	})
	defer pool.Close()

	for i := 0; i < 10; i++ {
		go pool.Process(i)
	}
	time.Sleep(time.Second * 4)
}

tunny.NewFunc(3, f) 第一个参数是协程池的大小(poolSize),第二个参数是协程运行的函数(worker)。
pool.Process(i) 将参数 i 传递给协程池定义好的 worker 处理。
pool.Close() 关闭协程池。
运行结果如下:

$ go run main_tunny.go
2020/12/21 01:00:21 6
2020/12/21 01:00:21 1
2020/12/21 01:00:21 3
2020/12/21 01:00:22 8
2020/12/21 01:00:22 4
2020/12/21 01:00:22 7
2020/12/21 01:00:23 5
2020/12/21 01:00:23 2
2020/12/21 01:00:23 0
2020/12/21 01:00:24 9