先上代码,有兴趣的可以跑一下
可以实现在有限的线程里执行多个任务,控制内存使用,防止内存飙高
package main
import (
"context"
"fmt"
"strconv"
"sync"
"time"
)
// 定义静态变量 用于外部访问内部方法
var pool *_Pool
// 定义空结构体,相当于类,和下面组合New方法起来就是pool类的静态方法
type _Pool struct {
}
// 定义接口 可传任意参数
type TaskFunc func(args ...interface{})
// 定义任务实体,里面有方法和参数
type Task struct {
f TaskFunc
args interface{}
}
// 定义线程池对象
type WorkPool struct {
Pool chan *Task //定义任务池
WorkCount int //工作线程数量,决定初始化几个goroutine
StopCtx context.Context //上下文
StopCancel context.CancelFunc
WG sync.WaitGroup //阻塞计数器
}
//任务执行
func (t *Task) Execute(args ...interface{}) {
t.f(args...)
}
// 实例化一个新线程池
func (*_Pool) New(workerCount int, len int) *WorkPool {
return &WorkPool{
WorkCount: workerCount,
Pool: make(chan *Task, len),
}
}
// 任务入队
func (w *WorkPool) PushTask(task *Task) {
w.Pool <- task
}
// 任务调度 go协程从channel里取任务执行Execute方法
func (w *WorkPool) Work(wid int) {
for {
select {
case <-w.StopCtx.Done():
w.WG.Done()
fmt.Printf("线程%d 退出执行了
", wid)
return
case t := <-w.Pool:
if t != nil {
t.Execute()
fmt.Printf("f被线程%d执行了,参数为%v
", wid, t.args)
}
}
}
}
//启动线程池,触发任务调度
func (w *WorkPool) Start() *WorkPool {
//定义好worker数量
w.WG.Add(w.WorkCount)
w.StopCtx, w.StopCancel = context.WithCancel(context.Background())
for i := 0; i < w.WorkCount; i++ {
//定义多少个协程来工作
go w.Work(i)
}
return w
}
// 停止执行任务,回收正在执行任务的协程 协程计数器减1 直到变成0退出,否则阻塞
func (w *WorkPool) Stop() {
w.StopCancel()
w.WG.Wait()
}
func main() {
// 任务计数器
taskWg := sync.WaitGroup{}
workerCount := 2
taskCount := 10
// 启动线程池 len=channel通道容量,超过容量生产者阻塞,容量变成0 消费者阻塞
pool := pool.New(workerCount, 5).Start()
taskWg.Add(taskCount)
//构建任务 放入线程池
for i := 0; i < taskCount; i++ {
task := &Task{
args: "zhangSan" + strconv.FormatInt(int64(i), 10),
f: func(args ...interface{}) {
time.Sleep(time.Second)
taskWg.Done() // 任务完成计数器减一
},
}
pool.PushTask(task)
}
fmt.Println("任务入队完成")
//等待任务执行完成
taskWg.Wait()
// 回收资源
close(pool.Pool)
fmt.Println("任务全部执行完成")
}
运行效果
f被线程0执行了,参数为zhangSan0 f被线程1执行了,参数为zhangSan1 f被线程1执行了,参数为zhangSan3 f被线程0执行了,参数为zhangSan2 任务入队完成 f被线程0执行了,参数为zhangSan5 f被线程1执行了,参数为zhangSan4 f被线程1执行了,参数为zhangSan7 f被线程0执行了,参数为zhangSan6 f被线程1执行了,参数为zhangSan8 任务全部执行完成
里面代码注释的很详细了,就不赘述了