尽管Goroutine(协程)非常清轻量,但是本身也是占用资源的,过多协程切换也会带来开销,总之物极必反,无限制的开协程的结果只会是Game Over。生产实践中必须考虑控制协程数量,本文带你看看针对不同场景和需求的协程数量控制方式,看看这些姿势你都会了吗?

要是本文对您有帮助的话,欢迎【关注】作者,【点赞】+【收藏】,保持交流!

场景

如下,go中一个典型场景是,接受数据然后开协程处理,代码如下

// runTaskDataGenerator 产生数据
func runTaskDataGenerator(dataChan chan int) {
    for i := 0; i < 100; i++ {
        dataChan <- i
    }

    close(dataChan)
}

// runInfiniteTask 每来一个数据起协程处理任务
func runInfiniteTask(dataChan <-chan int) {
    var wg sync.WaitGroup

    for data := range dataChan {
        wg.Add(1)
        go func(data int) {
            defer wg.Done()

            // do something
            time.Sleep(3 * time.Second)
        }(data)
    }

    wg.Wait()
}

func TestRunInfiniteTask(t *testing.T) {
    dataChan := make(chan int)

    go runTaskDataGenerator(dataChan)
    go runNumGoroutineMonitor()

    runInfiniteTask(dataChan)
}

runNumGoroutineMonitor监控协程数量

// runNumGoroutineMonitor 协程数量监控
func runNumGoroutineMonitor() {
    log.Printf("协程数量->%d\n", runtime.NumGoroutine())

    for {
        select {
        case <-time.After(time.Second):
            log.Printf("协程数量->%d\n", runtime.NumGoroutine())
        }
    }
}

运行结果如下,可以看到有多少数据就会起多少协程,如果任务处理时间较长,短时间可能出现大量协程,耗尽资源,需要控制协程数量。

=== RUN TestRunInfiniteTask
2023/01/05 12:38:42 协程数量->6
2023/01/05 12:38:43 协程数量->105
2023/01/05 12:38:44 协程数量->105
2023/01/05 12:38:45 协程数量->23
--- PASS: TestRunInfiniteTask (3.00s)

固定个数协程并发处理任务

一般叫做Bounded/Fixed并发控制。

  • 优点是简单,不复杂的并发任务这样简单处理即可。
  • 缺点在于dataChan可能流量不不均衡,需要同时处理的任务多少在变动,但是对应的协程数量保持不变,要不就是任务处理堵塞要不就是存在多余的协程空闲
// runBoundedTask 起maxTaskNum个协程共同处理任务
func runBoundedTask(dataChan <-chan int, maxTaskNum int) {
    var wg sync.WaitGroup
    wg.Add(maxTaskNum)

    for i := 0; i < maxTaskNum; i++ {
        go func() {
            defer wg.Done()

            for data := range dataChan {
                func(data int) {

                    // do something
                    time.Sleep(3 * time.Second)
                }(data)
            }
        }()
    }

    wg.Wait()
}

动态个数协程并发处理任务

针对固定个数协程的缺点,一个思路是协程数量最好能够根据来的处理任务的多少,动态变更,指定一个并发上限,任务多时增加协程数量,任务少时减少协程数量。这里提供两种实现思路

自定义令牌池实现

令牌池维持最大允许并发任务数个令牌,每个任务启动时请求令牌,运行完成返回令牌。

// runDynamicTask 
// 最大同时运行maxTaskNum个任务处理数据
// 自定义令牌池维持maxTaskNum个令牌供竞争
func runDynamicTask(dataChan <-chan int, maxTaskNum int) {
    // 初始化令牌池
    tokenPool := make(chan struct{}, maxTaskNum)
    for i := 0; i < maxTaskNum; i++ {
        tokenPool <- struct{}{}
    }

    var wg sync.WaitGroup

    for data := range dataChan {
        // 先获取令牌,如果被消费完则阻塞等待其它任务返还令牌
        <-tokenPool

        wg.Add(1)
        go func(data int) {
            defer wg.Done()

            // 任务运行完成,返还令牌
            defer func() {
                tokenPool <- struct{}{}
            }()

            // do something
            time.Sleep(3 * time.Second)
        }(data)
    }

    wg.Wait()
}

信号量Semaphore实现

同上,令牌池换成信号量。

// runSemaphoreTask 
// 最大同时运行maxTaskNum个任务处理数据
// 使用信号量维持maxTaskNum个信号
func runSemaphoreTask(dataChan <-chan int, maxTaskNum int64) {
    w := semaphore.NewWeighted(maxTaskNum)

    var wg sync.WaitGroup

    for data := range dataChan {
        // 先获取信号量,如果被消费完则阻塞等待信号量返还
        _ = w.Acquire(context.TODO(), 1)

        wg.Add(1)
        go func(data int) {
            defer wg.Done()

            // 运行完成返还信号量
            defer w.Release(1)

            // do something
            time.Sleep(3 * time.Second)
        }(data)
    }

    wg.Wait()
}

指定处理速度并发处理任务

针对固定个数协程的缺点,另一个思路是借鉴限流器的实现,控制每个时刻最大允许协程数量也达到控制协程数量的目的。这里也提供两种实现思路

自定义令牌池实现

相当于一个简单限流器,指定速度生产令牌,每个任务启动时必须请求到令牌。

// runRateLimitTask 限制每秒允许的最大协程数量,限流器的思路
func runRateLimitTask(dataChan <-chan int) {
    // 初始化令牌池
    tokenPool := make(chan struct{})
    go func() {
        for {
            select {
            // 动态控制令牌生成速度
            case <-time.After(time.Second):
                tokenPool <- struct{}{}
            }
        }
    }()

    var wg sync.WaitGroup

    for data := range dataChan {
        // 先获取令牌,如果被消费完则阻塞等待新令牌产生
        <-tokenPool

        wg.Add(1)
        go func(data int) {
            defer wg.Done()

            // do something
            time.Sleep(3 * time.Second)
        }(data)
    }

    wg.Wait()
}

官方限流器实现

逻辑同上,每个任务启动必须先获取令牌。

// runRateLimitTask2 限制每秒允许的最大协程数量,使用官方限流器
func runRateLimitTask2(dataChan <-chan int) {
    // 初始化令牌池
    limit := rate.Every(time.Second) // 每秒一个
    limiter := rate.NewLimiter(limit, 10)

    var wg sync.WaitGroup

    for data := range dataChan {
        // 先获取令牌,如果被消费完则阻塞等待新令牌产生
        _ = limiter.Wait(context.TODO())

        wg.Add(1)
        go func(data int) {
            defer wg.Done()

            // do something
            time.Sleep(3 * time.Second)
        }(data)
    }

    wg.Wait()
}

协程池并发处理任务

生产业务中,针对复杂业务或者不想那么麻烦,可以直接上协程池。

常用协程池https://github.com/panjf2000/ants,如下实现。

代码上看起来简洁很多,根本原理和动态个数协程控制思路差不多,后续单开一篇文章讲讲协程池的实现。

// runGoroutinePoolTask 使用协程池动态管理协程数量
func runGoroutinePoolTask(dataChan <-chan int, maxTaskNum int) {
    p, _ := ants.NewPool(maxTaskNum)
    defer p.Release()

    var wg sync.WaitGroup

    for _ = range dataChan {
        wg.Add(1)

        // 提交任务,协程池动态管理数量,可以做更多的分配优化策略
        _ = p.Submit(func() {
            defer wg.Done()

            // do something
            time.Sleep(3 * time.Second)
        })

    }

    wg.Wait()
}

参考