在语言层面上,Go针对CSP模型提供了四种并发模式,分别是创建模式、退出模式、管道模式和超时与取消模式。
一、创建模式
先看一下简单的代码示例:
package main
import "fmt"
type T struct{}
func spawn(f func(v T)) chan T {
c := make(chan T)
go func() {
// 可以在这个携程里面使用c这个channel与main现成进行通信
v := <-c
f(v)
}()
return c
}
func main() {
// 拿到了从spawn函数返回的channel,就可以利用其与spawn函数内的携程进行通信
c := spawn(func(v T) {
fmt.Println(v)
})
t := T{}
c <- t
}
通过在函数内部创建goroutine,并让其对函数创建的channel变量做处理,然后再通过函数将channel变量返回给其他goroutine,这样两个goroutine之间就通过这个channel建立起了联系,这就是go语言中最常见的goroutine创建模式。
二、退出模式
退出模式又细分为以下几种:
1. 分离模式
对于分离模式的goroutine,其在创建启动之后便与创建它的goroutine彻底分离,其生命周期与其执行的主函数相关,函数返回即goroutine退出。这种goroutine主要有以下两个常见的用途:
- 一次性任务:
创建一个新的goroutine用来执行一个简单的任务,执行后即退出。
func DialContext(ctx context.Context, network, address string) error {
// ...
oldCancel := make(chan struct{})
subCtx, cancel := context.WithCancel(ctx)
defer cancel()
go func() {
select {
case <-oldCancel:
cancel()
case <-subCtx.Done():
}
}()
ctx = subCtx
// ...
return nil
}
- 常驻后台的任务:
其实现通常使用for或者for select代码形式,是goroutine常驻后台不退出,并多以定时器或者事件进行驱动执行。
2. join模式
在线程模型中,父线程可以通过pthread_join来等待子线程结束并获取子线程的结束状态。携程通过channel也可以实现。
- 等待一个goroutine退出
package main
import (
"fmt"
"time"
)
func worker(args ...interface{}) {
if len(args) == 0 {
return
}
interval, ok := args[0].(int)
if !ok {
return
}
time.Sleep(time.Second * (time.Duration(interval)))
}
func spwan(f func(args ...interface{}), args ...interface{}) chan struct{} {
c := make(chan struct{})
go func() {
f(args...)
c <- struct{}{}
}()
return c
}
func main() {
done := spwan(worker, 5)
fmt.Println("spwan a worker goroutine")
// 这里读取channel会把主线程阻塞,等待goroutine执行完worker函数即可
<-done
fmt.Println("worker done")
}
- 获取goroutine的退出状态
如果新goroutine的创建者不仅要等待goroutine的退出,还要精准获取其结束状态,同样可以通过自定义类型的channel来实现这一场景需求。
package main
import (
"errors"
"fmt"
"time"
)
var OK = errors.New("ok")
func worker(args ...interface{}) error {
if len(args) == 0 {
return errors.New("invalid args")
}
interval, ok := args[0].(int)
if !ok {
return errors.New("invalid internal arg")
}
time.Sleep(time.Second * (time.Duration(interval)))
return nil
}
func spwan(f func(args ...interface{}) error, args ...interface{}) chan error {
// 将channel由struct{}{}改成error类型就可以在不同线程之间传递工作函数最终的返回状态
c := make(chan error)
go func() {
c <- f(args...)
}()
return c
}
func main() {
done := spwan(worker, 5)
fmt.Println("spwan worker1")
// 阻塞主协程,直到工作协程向channel中写入数据
err := <-done
fmt.Println("worker1 done:", err)
done = spwan(worker)
fmt.Println("spwan worker2")
// 阻塞主协程,直到工作协程向channel中写入数据
err = <-done
fmt.Println("worker2 done:", err)
}
- 等待多个goroutine退出
在有些场景中,goroutine的创建者可能会创建不止一个goroutine,并且需要等待全部新goroutine退出。可以通过Go语言提供的sync.WaitGroup实现等待多个goroutine退出的模式:
package main
import (
"fmt"
"sync"
"time"
)
func worker(args ...interface{}) {
if len(args) == 0 {
return
}
interval, ok := args[0].(int)
if !ok {
return
}
time.Sleep(time.Second * (time.Duration(interval)))
}
func spawnGroup(n int, f func(args ...interface{}), args ...interface{}) chan struct{} {
c := make(chan struct{})
var wg sync.WaitGroup
for i := 0; i < n; i++ {
wg.Add(1)
go func(i int) {
name := fmt.Sprintf("worker-%d", i)
f(args...)
fmt.Println(name, "done")
wg.Done()
}(i)
}
go func() {
wg.Wait()
c <- struct{}{}
}()
return c
}
func main() {
done := spawnGroup(5, worker, 3)
fmt.Println("spwan a group workers")
<-done
fmt.Println("group workers done")
}
- 支持超时机制等待
有时候,我们不想无限阻塞等待所有新创建goroutine的退出,而是仅等待一段合理的时间。如果在这段时间内goroutine没有退出,则创建者会继续向下执行或主动退出。下面的示例代码在等待多个goroutine退出的例子之上增加了超时机制:
func main() {
done := spawnGroup(5, worker, 30)
println("spawn a group of workers")
timer := time.NewTimer(time.Second * 5)
defer timer.Stop()
select {
case <-timer.C:
println("wait group workers exit timeout!")
case <-done:
println("group workers done")
}
}
3. 通知并等待模式(notify-and-wait)
很多时候,goroutine创建者需要主动通知那些新的goroutine退出,尤其是当main goroutine作为创建者的时候。
- 通知并等待一个goroutine退出
我们从一个简单的“通知并等待一个goroutine退出”场景入手
package main
import (
"fmt"
"time"
)
func worker(j int) {
time.Sleep(time.Second * (time.Duration(j)))
}
func spwan(f func(int)) chan string {
quit := make(chan string)
go func() {
var job chan int
for {
select {
case j := <-job:
f(j)
case <-quit:
quit <- "ok"
}
}
}()
return quit
}
func main() {
quit := spwan(worker)
fmt.Println("spawn a worker goroutine")
time.Sleep(5 * time.Second)
fmt.Println("notify the worker to exit...")
quit <- "exit"
timer := time.NewTimer(time.Second * 10)
defer timer.Stop()
select {
case status := <-quit:
fmt.Println("worker done:", status)
case <-timer.C:
fmt.Println("wait worker exit timeout")
}
}
- 通知并等待多个goroutine退出
package main
import (
"fmt"
"sync"
"time"
)
func worker(j int) {
time.Sleep(time.Second * (time.Duration(j)))
}
func spawnGroup(n int, f func(int)) chan struct{} {
quit := make(chan struct{})
job := make(chan int)
var wg sync.WaitGroup
for i := 0; i < n; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
name := fmt.Sprintf("worker-%d:", i)
for {
j, ok := <-job
if !ok {
fmt.Println(name, "done")
return
}
worker(j)
}
}(i)
}
go func() {
<-quit
// 广播给所有的携程
close(job)
wg.Wait()
// 回复主线程已确认退出
quit <- struct{}{}
}()
return quit
}
func main() {
quit := spawnGroup(5, worker)
fmt.Println("spawn a group of workers")
time.Sleep(5 * time.Second)
fmt.Println("notify the worker group to exit...")
// 通知监听协程退出
quit <- struct{}{}
timer := time.NewTicker(time.Second * 5)
defer timer.Stop()
select {
case <-timer.C:
fmt.Println("wait group workers exit timeout!")
case <-quit:
fmt.Println("group workers done")
}
}
上面这段示例代码的关键是创建者直接利用了worker goroutine接收任务(job)的channel来广播退出通知,而实现这一广播的代码就是close(job)。此时各个worker goroutine监听job channel,当创建者关闭job channel时,通过“comma ok”模式获取的ok值为false,也就表明该channel已经被关闭,于是worker goroutine执行退出逻辑(退出前wg.Done()被执行)。
4. 退出模式的应用
package main
import (
"fmt"
"sync"
"time"
)
func worker(args ...interface{}) {
if len(args) == 0 {
return
}
interval, ok := args[0].(int)
if !ok {
return
}
time.Sleep(time.Second * (time.Duration(interval)))
}
func spawnGroup(n int, f func(args ...interface{}), args ...interface{}) chan struct{} {
c := make(chan struct{})
var wg sync.WaitGroup
for i := 0; i < n; i++ {
wg.Add(1)
go func(i int) {
name := fmt.Sprintf("worker-%d", i)
f(args...)
fmt.Println(name, "done")
wg.Done()
}(i)
}
go func() {
wg.Wait()
c <- struct{}{}
}()
return c
}
func main() {
done := spawnGroup(5, worker, 3)
fmt.Println("spwan a group workers")
<-done
fmt.Println("group workers done")
}
参考:《Go语言精进之路:从新手到高手的编程思想、方法和技巧1》