golang实现并发执行多个goroutine,并拿到每个goroutine执行结果,如果其中一个goroutine报错,则结束未执行的goroutine, 还可以设置超时,话不多说上代码
type Concurrent interface {
AddFunc(key string, f func(fRs chan interface{}, mErr chan error))
Run() (map[string]interface{}, error)
}
type ConcurrentImp struct {
Err chan error // 接收错误信息
Rs map[string]interface{} // 接收执行结果
taskNum uint32 // 执行任务数
finishedNum uint32 // 已经完成的goroutine
cancel context.CancelFunc
Ctx context.Context // 顶级context
task map[string]func(k string) // 任务列表
Finished chan int // 是否执行完成
Timer time.Duration // 设置超时
}
func NewConcurrentImp() *ConcurrentImp {
ctx, cancel := context.WithCancel(context.Background())
return &ConcurrentImp{
Ctx: ctx,
Err: make(chan error),
taskNum: 0,
finishedNum: 0,
Finished: make(chan int),
Timer: 30 * time.Second,
cancel: cancel,
task: map[string]func(k string){},
Rs: map[string]interface{}{},
}
}
func (c *ConcurrentImp) Run() (map[string]interface{}, error) {
//执行任务
for k, task := range c.task {
go task(k)
}
// 执行监听
go c.listenTask(c.Ctx)
// 计时器
timer := time.After(c.Timer)
for {
select {
case err, ok := <-c.Err: // 子协程有错误
if ok {
c.cancel()
return nil, err
}
case <-c.Finished: // 所有子程序都正常执行完成
return c.Rs, nil
case <-timer:
c.cancel()
return nil, errors.New("超时")
default:
}
}
}
// 任务执行完成一个,数字减1
func (c *ConcurrentImp) listenTask(ctx context.Context) {
for {
select {
case <-ctx.Done():
fmt.Println("主协程退出,goroutine listen 退出")
return
default:
if c.finishedNum == c.taskNum && c.finishedNum != 0 {
c.Finished <- 1
return
}
}
}
}
func (c *ConcurrentImp) AddFunc(key string, f func(fRs chan interface{}, mErr chan error)) {
c.task[key] = c.taskFunc(c.Ctx, f)
atomic.AddUint32(&c.taskNum, 1)
}
// 讲方法放入任务列表
func (c *ConcurrentImp) taskFunc(ctx context.Context, f func(fRs chan interface{}, mErr chan error)) func(key string) {
return func(key string) {
rs := make(chan interface{})
go f(rs, c.Err)
for {
select {
case <-ctx.Done():
fmt.Println("主协程退出,goroutine " + key + "退出")
return
case r := <-rs:
c.Rs[key] = r
atomic.AddUint32(&c.finishedNum, 1)
return
default:
}
}
}
}
func main() {
t := time.Now()
var ct Concurrent
ct = NewConcurrentImp()
ct.AddFunc("task1", func(fRs chan interface{}, mErr chan error) {
fRs <- 3
})
ct.AddFunc("task2", func(fRs chan interface{}, mErr chan error) {
//time.Sleep(35 * time.Second)
fRs <- 1
})
ct.AddFunc("task3", func(fRs chan interface{}, mErr chan error) {
fRs <- 2
})
ct.AddFunc("task4", func(fRs chan interface{}, mErr chan error) {
fRs <- 5
//mErr <- errors.New("test err")
})
if rs, err := ct.Run(); err != nil {
log.Fatalln(err)
} else {
fmt.Println(rs)
}
fmt.Println("program cost time:", time.Now().Sub(t))
}
测试结果:
map[task1:3 task2:1 task3:2 task4:5]
program cost time: 535.35µs
有错误返回:
主协程退出,goroutine task3退出
主协程退出,goroutine task1退出
主协程退出,goroutine task4退出
主协程退出,goroutine listen 退出
主协程退出,goroutine task2退出
2022/07/28 14:08:36 test err
望大神指正。。。