尽管Goroutine(协程)非常清轻量,但是本身也是占用资源的,过多协程切换也会带来开销,总之物极必反,这里无限制的开协程的结果只会是Game Over。生产实践中必须考虑控制协程数量
1.简单的可以直接起固定个数协程处理
// runBoundedTask 起maxTaskNum个协程共同处理任务
func runBoundedTask(dataChan <-chan int, maxTaskNum int) {
var wg sync.WaitGroup
wg.Add(maxTaskNum)
for i := 0; i < maxTaskNum; i++ {
go func() {
defer wg.Done()
for data := range dataChan {
func(data int) {
// do something
time.Sleep(3 * time.Second)
}(data)
}
}()
}
wg.Wait()
}
2.复杂点的可以基于Semaphore动态调整处理任务的协程数
// runSemaphoreTask
// 最大同时运行maxTaskNum个任务处理数据
// 使用信号量维持maxTaskNum个信号
func runSemaphoreTask(dataChan <-chan int, maxTaskNum int64) {
w := semaphore.NewWeighted(maxTaskNum)
var wg sync.WaitGroup
for data := range dataChan {
// 先获取信号量,如果被消费完则阻塞等待信号量返还
_ = w.Acquire(context.TODO(), 1)
wg.Add(1)
go func(data int) {
defer wg.Done()
// 运行完成返还信号量
defer w.Release(1)
// do something
time.Sleep(3 * time.Second)
}(data)
}
wg.Wait()
}
3.终极方案直接上协程池
常用协程池https://github.com/panjf2000/ants,如下实现。
// runGoroutinePoolTask 使用协程池动态管理协程数量
func runGoroutinePoolTask(dataChan <-chan int, maxTaskNum int) {
p, _ := ants.NewPool(maxTaskNum)
defer p.Release()
var wg sync.WaitGroup
for _ = range dataChan {
wg.Add(1)
// 提交任务,协程池动态管理数量,可以做更多的分配优化策略
_ = p.Submit(func() {
defer wg.Done()
// do something
time.Sleep(3 * time.Second)
})
}
wg.Wait()
}
更多细节参考