背景
使用go关键字就可以方便快捷的创建一个goroutine,受限于服务器硬件内存大小,如果不对goroutine数量进行限制,会出现Out of Memory错误。
通过协程池限制goroutine数一个有效避免泄漏的手段,但是自己手动实现一个协程池,总是会兼顾不到各种场景,比如释放,处理panic,动态扩容等。那么ants是公认的优秀实现协程池。
写 go 并发程序的时候如果程序会启动大量的 goroutine ,势必会消耗大量的系统资源(内存,CPU),通过使用 ants,可以实例化一个 goroutine 池,复用 goroutine ,节省资源,提升性能。
什么是ants
github:https://github.com/panjf2000/ants/blob/master/README_ZH.md
ants是一个高性能的 goroutine 池,实现了对大规模 goroutine 的调度管理、goroutine 复用,允许使用者在开发并发程序的时候限制 goroutine 数量,复用资源,达到更高效执行任务的效果。
ants的 Goroutine Pool 的容量是可以自定义的,也就是说使用者可以根据不同场景对这个参数进行调优直至达到最高性能。
🚀 功能:
- 自动调度海量的 goroutines,复用 goroutines
- 定期清理过期的 goroutines,进一步节省资源
- 提供了大量有用的接口:任务提交、获取运行中的 goroutine 数量、动态调整 Pool 大小、释放 Pool、重启 Pool
- 优雅处理 panic,防止程序崩溃
- 资源复用,极大节省内存使用量;在大规模批量并发任务场景下比原生 goroutine 并发具有更高的性能
- 非阻塞机制
🛠 使用
demo
官方demo
package main
import (
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/panjf2000/ants/v2"
)
var sum int32
func myFunc(i interface{}) {
n := i.(int32)
atomic.AddInt32(&sum, n)
fmt.Printf("run with %d\n", n)
}
func demoFunc() {
time.Sleep(10 * time.Millisecond)
fmt.Println("Hello World!")
}
func main() {
defer ants.Release()
runTimes := 1000
// Use the common pool.
var wg sync.WaitGroup
syncCalculateSum := func() {
demoFunc()
wg.Done()
}
for i := 0; i < runTimes; i++ {
wg.Add(1)
_ = ants.Submit(syncCalculateSum)
}
wg.Wait()
fmt.Printf("running goroutines: %d\n", ants.Running())
fmt.Printf("finish all tasks.\n")
// Use the pool with a function,
// set 10 to the capacity of goroutine pool and 1 second for expired duration.
p, _ := ants.NewPoolWithFunc(10, func(i interface{}) {
myFunc(i)
wg.Done()
})
defer p.Release()
// Submit tasks one by one.
for i := 0; i < runTimes; i++ {
wg.Add(1)
_ = p.Invoke(int32(i))
}
wg.Wait()
fmt.Printf("running goroutines: %d\n", p.Running())
fmt.Printf("finish all tasks, result is %d\n", sum)
}
ants.Submit
总结:和我们平时使用协程方法基本没有大的差异,基本无学习成本。
第二个用法,使用 ants.NewPoolWithFunc
因为PoolWithFunc这个 Pool 只绑定一个任务函数,也即所有任务都是运行同一个函数,所以相较于Pool对原生 goroutine 在执行速度和内存消耗的优势更大
查看 ants源码:
pool.go提供了ants.NewPool(创建协程池)、Submit(task func())提交任务
pool_func.go使用NewPoolWithFunc(创建pool对象需要带具体的函数),并且使用Invoke(args interface{})进行调用,arg就是传给池函数func(interface{})的参数