尽管Goroutine(协程)非常清轻量,但是本身也是占用资源的,过多协程切换也会带来开销,总之物极必反,这里无限制的开协程的结果只会是Game Over。生产实践中必须考虑控制协程数量

1.简单的可以直接起固定个数协程处理

// runBoundedTask 起maxTaskNum个协程共同处理任务
func runBoundedTask(dataChan <-chan int, maxTaskNum int) {
    var wg sync.WaitGroup
    wg.Add(maxTaskNum)

    for i := 0; i < maxTaskNum; i++ {
        go func() {
            defer wg.Done()

            for data := range dataChan {
                func(data int) {

                    // do something
                    time.Sleep(3 * time.Second)
                }(data)
            }
        }()
    }

    wg.Wait()
}

2.复杂点的可以基于Semaphore动态调整处理任务的协程数

// runSemaphoreTask 
// 最大同时运行maxTaskNum个任务处理数据
// 使用信号量维持maxTaskNum个信号
func runSemaphoreTask(dataChan <-chan int, maxTaskNum int64) {
    w := semaphore.NewWeighted(maxTaskNum)

    var wg sync.WaitGroup

    for data := range dataChan {
        // 先获取信号量,如果被消费完则阻塞等待信号量返还
        _ = w.Acquire(context.TODO(), 1)

        wg.Add(1)
        go func(data int) {
            defer wg.Done()

            // 运行完成返还信号量
            defer w.Release(1)

            // do something
            time.Sleep(3 * time.Second)
        }(data)
    }

    wg.Wait()
}

3.终极方案直接上协程池

常用协程池https://github.com/panjf2000/ants,如下实现。

// runGoroutinePoolTask 使用协程池动态管理协程数量
func runGoroutinePoolTask(dataChan <-chan int, maxTaskNum int) {
    p, _ := ants.NewPool(maxTaskNum)
    defer p.Release()

    var wg sync.WaitGroup

    for _ = range dataChan {
        wg.Add(1)

        // 提交任务,协程池动态管理数量,可以做更多的分配优化策略
        _ = p.Submit(func() {
            defer wg.Done()

            // do something
            time.Sleep(3 * time.Second)
        })

    }

    wg.Wait()
}


更多细节参考