golang实现并发执行多个goroutine,并拿到每个goroutine执行结果,如果其中一个goroutine报错,则结束未执行的goroutine, 还可以设置超时,话不多说上代码

type Concurrent interface {
   AddFunc(key string, f func(fRs chan interface{}, mErr chan error))
   Run() (map[string]interface{}, error)
}
type ConcurrentImp struct {
   Err         chan error             // 接收错误信息
   Rs          map[string]interface{} // 接收执行结果
   taskNum     uint32                 // 执行任务数
   finishedNum uint32                 // 已经完成的goroutine
   cancel      context.CancelFunc
   Ctx         context.Context           // 顶级context
   task        map[string]func(k string) // 任务列表
   Finished    chan int                  // 是否执行完成
   Timer       time.Duration             // 设置超时
}

func NewConcurrentImp() *ConcurrentImp {

   ctx, cancel := context.WithCancel(context.Background())
   return &ConcurrentImp{
      Ctx:         ctx,
      Err:         make(chan error),
      taskNum:     0,
      finishedNum: 0,
      Finished:    make(chan int),
      Timer:       30 * time.Second,
      cancel:      cancel,
      task:        map[string]func(k string){},
      Rs:          map[string]interface{}{},
   }
}

func (c *ConcurrentImp) Run() (map[string]interface{}, error) {

   //执行任务
   for k, task := range c.task {
      go task(k)
   }
   // 执行监听
   go c.listenTask(c.Ctx)
   // 计时器
   timer := time.After(c.Timer)
   for {
      select {
      case err, ok := <-c.Err: // 子协程有错误
         if ok {
            c.cancel()
            return nil, err
         }
      case <-c.Finished: // 所有子程序都正常执行完成
         return c.Rs, nil

      case <-timer:
         c.cancel()
         return nil, errors.New("超时")
      default:

      }
   }
}

// 任务执行完成一个,数字减1
func (c *ConcurrentImp) listenTask(ctx context.Context) {
   for {
      select {
      case <-ctx.Done():
         fmt.Println("主协程退出,goroutine listen 退出")
         return
      default:
         if c.finishedNum == c.taskNum && c.finishedNum != 0 {
            c.Finished <- 1
            return
         }
      }
   }
}

func (c *ConcurrentImp) AddFunc(key string, f func(fRs chan interface{}, mErr chan error)) {
   c.task[key] = c.taskFunc(c.Ctx, f)
   atomic.AddUint32(&c.taskNum, 1)
}

// 讲方法放入任务列表
func (c *ConcurrentImp) taskFunc(ctx context.Context, f func(fRs chan interface{}, mErr chan error)) func(key string) {
   return func(key string) {
      rs := make(chan interface{})
      go f(rs, c.Err)
      for {
         select {
         case <-ctx.Done():
            fmt.Println("主协程退出,goroutine " + key + "退出")
            return
         case r := <-rs:
            c.Rs[key] = r
            atomic.AddUint32(&c.finishedNum, 1)
            return
         default:

         }
      }
   }
}

func main() {
   t := time.Now()
   var ct Concurrent
   ct = NewConcurrentImp()
   ct.AddFunc("task1", func(fRs chan interface{}, mErr chan error) {
      fRs <- 3
   })
   ct.AddFunc("task2", func(fRs chan interface{}, mErr chan error) {
      //time.Sleep(35 * time.Second)
      fRs <- 1
   })
   ct.AddFunc("task3", func(fRs chan interface{}, mErr chan error) {
      fRs <- 2
   })
   ct.AddFunc("task4", func(fRs chan interface{}, mErr chan error) {
      fRs <- 5
      //mErr <- errors.New("test err")
   })
   if rs, err := ct.Run(); err != nil {
      log.Fatalln(err)
   } else {
      fmt.Println(rs)
   }
   fmt.Println("program cost time:", time.Now().Sub(t))
}

测试结果:

 map[task1:3 task2:1 task3:2 task4:5]
program cost time: 535.35µs

有错误返回:

主协程退出,goroutine task3退出
主协程退出,goroutine task1退出
主协程退出,goroutine task4退出
主协程退出,goroutine listen 退出
主协程退出,goroutine task2退出
2022/07/28 14:08:36 test err
 

望大神指正。。。