1,为什么要控制goroutine的数量?
goroutine固然好,但是数量太多了,往往会带来很多麻烦,比如耗尽系统资源导致程序崩溃,或者CPU使用率过高导致系统忙不过来。比如:
1 for i:=0; i < 10000; i++ {
2 go work()
3 }
2,用什么方法控制goroutine的数量?
要在每一次执行go之前判断goroutine的数量,如果数量超了,就要阻塞go的执行。第一时间想到的就是使用通道。每次执行的go之前向通道写入值,直到通道满的时候就阻塞了,如下:
1 var ch chan int
2
3 func work() {
4 //do something
5 <-ch
6 }
7
8 func main() {
9 ch = make(chan int, 10)
10 for i:=0; i < 10000; i++ {
11 ch <- 1
12 go work()
13 }
14 }
这样每次同时运行的goroutine就被限制为10个了。但是新的问题出现了,因为并不是所有的goroutine都执行完了,在main函数退出之后,还有一些goroutine没有执行完就被强制结束了。这个时候我们就需要用到sync.WaitGroup。使用WaitGroup等待所有的goroutine退出。如下:
1 var wg *sync.WaitGroup
2
3 func work() {
4 defer wg.Done()
5 //do something
6 }
7
8 func main() {
9 wg = &sync.WaitGroup{}
10 for i:=0; i < 10000; i++ {
11 wg.Add(1)
12 go work()
13 }
14 wg.Wait()//等待所有goroutine退出
15 }
3,优雅的使用并控制goroutine的数量
综上所述,我们封装一下,代码如下:
1 package gpool
2
3 import (
4 "sync"
5 )
6
7 type pool struct {
8 queue chan int
9 wg *sync.WaitGroup
10 }
11
12 func New(size int) *pool {
13 if size <= 0 {
14 size = 1
15 }
16 return &pool{
17 queue: make(chan int, size),
18 wg: &sync.WaitGroup{},
19 }
20 }
21
22 func (p *pool) Add(delta int) {
23 for i := 0; i < delta; i++ {
24 p.queue <- 1
25 }
26 for i := 0; i > delta; i-- {
27 <-p.queue
28 }
29 p.wg.Add(delta)
30 }
31
32 func (p *pool) Done() {
33 <-p.queue
34 p.wg.Done()
35 }
36
37 func (p *pool) Wait() {
38 p.wg.Wait()
39 }
来段测试代码:
1 package gpool_test
2
3 import (
4 "runtime"
5 "testing"
6 "time"
7 "gpool"
8 )
9
10 func Test_Example(t *testing.T) {
11 pool := gpool.New(100)
12 println(runtime.NumGoroutine())
13 for i := 0; i < 1000; i++ {
14 pool.Add(1)
15 go func() {
16 time.Sleep(time.Second)
17 println(runtime.NumGoroutine())
18 pool.Done()
19 }()
20 }
21 pool.Wait()
22 println(runtime.NumGoroutine())
23 }
good job,Over~