在语言层面上,Go针对CSP模型提供了四种并发模式,分别是创建模式、退出模式、管道模式和超时与取消模式

一、创建模式

  先看一下简单的代码示例:

package main

import "fmt"

type T struct{}

func spawn(f func(v T)) chan T {
	c := make(chan T)
	go func() {
		// 可以在这个携程里面使用c这个channel与main现成进行通信
		v := <-c
		f(v)
	}()

	return c
}

func main() {
	// 拿到了从spawn函数返回的channel,就可以利用其与spawn函数内的携程进行通信
	c := spawn(func(v T) {
		fmt.Println(v)
	})
	
	t := T{}
	c <- t
}

  通过在函数内部创建goroutine,并让其对函数创建的channel变量做处理,然后再通过函数将channel变量返回给其他goroutine,这样两个goroutine之间就通过这个channel建立起了联系,这就是go语言中最常见的goroutine创建模式。

二、退出模式

  退出模式又细分为以下几种:

1. 分离模式

  对于分离模式的goroutine,其在创建启动之后便与创建它的goroutine彻底分离,其生命周期与其执行的主函数相关,函数返回即goroutine退出。这种goroutine主要有以下两个常见的用途:

  • 一次性任务:
    创建一个新的goroutine用来执行一个简单的任务,执行后即退出。
func DialContext(ctx context.Context, network, address string) error {
	// ...
	oldCancel := make(chan struct{})
	subCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	go func() {
		select {
		case <-oldCancel:
			cancel()
		case <-subCtx.Done():
		}
	}()

	ctx = subCtx
	// ...

	return nil
}
  • 常驻后台的任务:
    其实现通常使用for或者for select代码形式,是goroutine常驻后台不退出,并多以定时器或者事件进行驱动执行。

2. join模式

  在线程模型中,父线程可以通过pthread_join来等待子线程结束并获取子线程的结束状态。携程通过channel也可以实现。

  • 等待一个goroutine退出
package main

import (
	"fmt"
	"time"
)

func worker(args ...interface{}) {
	if len(args) == 0 {
		return
	}

	interval, ok := args[0].(int)
	if !ok {
		return
	}

	time.Sleep(time.Second * (time.Duration(interval)))
}

func spwan(f func(args ...interface{}), args ...interface{}) chan struct{} {
	c := make(chan struct{})
	go func() {
		f(args...)
		c <- struct{}{}
	}()

	return c
}

func main() {
	done := spwan(worker, 5)
	fmt.Println("spwan a worker goroutine")
	// 这里读取channel会把主线程阻塞,等待goroutine执行完worker函数即可
	<-done
	fmt.Println("worker done")
}
  • 获取goroutine的退出状态
    如果新goroutine的创建者不仅要等待goroutine的退出,还要精准获取其结束状态,同样可以通过自定义类型的channel来实现这一场景需求。
package main

import (
	"errors"
	"fmt"
	"time"
)

var OK = errors.New("ok")

func worker(args ...interface{}) error {
	if len(args) == 0 {
		return errors.New("invalid args")
	}

	interval, ok := args[0].(int)
	if !ok {
		return errors.New("invalid internal arg")
	}

	time.Sleep(time.Second * (time.Duration(interval)))
	return nil
}

func spwan(f func(args ...interface{}) error, args ...interface{}) chan error {
	// 将channel由struct{}{}改成error类型就可以在不同线程之间传递工作函数最终的返回状态
	c := make(chan error)
	go func() {
		c <- f(args...)
	}()

	return c
}

func main() {
	done := spwan(worker, 5)
	fmt.Println("spwan worker1")
	// 阻塞主协程,直到工作协程向channel中写入数据
	err := <-done
	fmt.Println("worker1 done:", err)
	done = spwan(worker)
	fmt.Println("spwan worker2")
	// 阻塞主协程,直到工作协程向channel中写入数据
	err = <-done
	fmt.Println("worker2 done:", err)
}
  • 等待多个goroutine退出
    在有些场景中,goroutine的创建者可能会创建不止一个goroutine,并且需要等待全部新goroutine退出。可以通过Go语言提供的sync.WaitGroup实现等待多个goroutine退出的模式:
package main

import (
	"fmt"
	"sync"
	"time"
)

func worker(args ...interface{}) {
	if len(args) == 0 {
		return
	}

	interval, ok := args[0].(int)
	if !ok {
		return
	}

	time.Sleep(time.Second * (time.Duration(interval)))
}

func spawnGroup(n int, f func(args ...interface{}), args ...interface{}) chan struct{} {
	c := make(chan struct{})
	var wg sync.WaitGroup

	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			name := fmt.Sprintf("worker-%d", i)
			f(args...)
			fmt.Println(name, "done")
			wg.Done()
		}(i)
	}

	go func() {
		wg.Wait()
		c <- struct{}{}
	}()

	return c
}

func main() {
	done := spawnGroup(5, worker, 3)
	fmt.Println("spwan a group workers")
	<-done
	fmt.Println("group workers done")
}
  • 支持超时机制等待
    有时候,我们不想无限阻塞等待所有新创建goroutine的退出,而是仅等待一段合理的时间。如果在这段时间内goroutine没有退出,则创建者会继续向下执行或主动退出。下面的示例代码在等待多个goroutine退出的例子之上增加了超时机制:
func main() {
    done := spawnGroup(5, worker, 30)
    println("spawn a group of workers")
    
    timer := time.NewTimer(time.Second * 5)
    defer timer.Stop()
    select {
    case <-timer.C:
        println("wait group workers exit timeout!")
    case <-done:
        println("group workers done")
    }
}

3. 通知并等待模式(notify-and-wait)

  很多时候,goroutine创建者需要主动通知那些新的goroutine退出,尤其是当main goroutine作为创建者的时候。

  • 通知并等待一个goroutine退出
    我们从一个简单的“通知并等待一个goroutine退出”场景入手
package main

import (
	"fmt"
	"time"
)

func worker(j int) {
	time.Sleep(time.Second * (time.Duration(j)))
}

func spwan(f func(int)) chan string {
	quit := make(chan string)
	go func() {
		var job chan int
		for {
			select {
			case j := <-job:
				f(j)
			case <-quit:
				quit <- "ok"
			}
		}
	}()

	return quit
}

func main() {
	quit := spwan(worker)
	fmt.Println("spawn a worker goroutine")
	time.Sleep(5 * time.Second)

	fmt.Println("notify the worker to exit...")
	quit <- "exit"

	timer := time.NewTimer(time.Second * 10)
	defer timer.Stop()
	select {
	case status := <-quit:
		fmt.Println("worker done:", status)
	case <-timer.C:
		fmt.Println("wait worker exit timeout")
	}
}
  • 通知并等待多个goroutine退出
package main

import (
	"fmt"
	"sync"
	"time"
)

func worker(j int) {
	time.Sleep(time.Second * (time.Duration(j)))
}

func spawnGroup(n int, f func(int)) chan struct{} {
	quit := make(chan struct{})
	job := make(chan int)
	var wg sync.WaitGroup

	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			name := fmt.Sprintf("worker-%d:", i)
			for {
				j, ok := <-job
				if !ok {
					fmt.Println(name, "done")
					return
				}

				worker(j)
			}
		}(i)
	}

	go func() {
		<-quit
		// 广播给所有的携程
		close(job)
		wg.Wait()
		// 回复主线程已确认退出
		quit <- struct{}{}
	}()

	return quit
}

func main() {
	quit := spawnGroup(5, worker)
	fmt.Println("spawn a group of workers")

	time.Sleep(5 * time.Second)
	fmt.Println("notify the worker group to exit...")
	// 通知监听协程退出
	quit <- struct{}{}

	timer := time.NewTicker(time.Second * 5)
	defer timer.Stop()
	select {
	case <-timer.C:
		fmt.Println("wait group workers exit timeout!")
	case <-quit:
		fmt.Println("group workers done")
	}
}

  上面这段示例代码的关键是创建者直接利用了worker goroutine接收任务(job)的channel来广播退出通知,而实现这一广播的代码就是close(job)。此时各个worker goroutine监听job channel,当创建者关闭job channel时,通过“comma ok”模式获取的ok值为false,也就表明该channel已经被关闭,于是worker goroutine执行退出逻辑(退出前wg.Done()被执行)。

4. 退出模式的应用

package main

import (
	"fmt"
	"sync"
	"time"
)

func worker(args ...interface{}) {
	if len(args) == 0 {
		return
	}

	interval, ok := args[0].(int)
	if !ok {
		return
	}

	time.Sleep(time.Second * (time.Duration(interval)))
}

func spawnGroup(n int, f func(args ...interface{}), args ...interface{}) chan struct{} {
	c := make(chan struct{})
	var wg sync.WaitGroup

	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			name := fmt.Sprintf("worker-%d", i)
			f(args...)
			fmt.Println(name, "done")
			wg.Done()
		}(i)
	}

	go func() {
		wg.Wait()
		c <- struct{}{}
	}()

	return c
}

func main() {
	done := spawnGroup(5, worker, 3)
	fmt.Println("spwan a group workers")
	<-done
	fmt.Println("group workers done")
}

参考:《Go语言精进之路:从新手到高手的编程思想、方法和技巧1》