在go语言中,大家都很熟悉如何使用channel来shutdown一个正在运行中的协程,本质就是向运行中的goroutine传递关闭的消息,无论是使用共享内存还是channel都可以传递,这块的区别更像csp和actor的区别,一个关注点在处理者上,另一个在消息上。
// 通过cancel传递消息来shutdown协程
cancel := make(chan struct{})
go func() {
select {
case <-cancel:
break
}
return
}()
02
—
控制权反转
oklog/run 使用来名为actor的结构体,actor包含主要业务逻辑的执行函数以及相对应的一个shutdown函数,多个actor为一个group,每个actor提供自己的运行和关闭接口(就是两个函数),并将控制的权力交给group,来实现多个协程的统一管理。
// oklog/run的使用
interrupt := errors.New("interrupt")
var g run.Group
g.Add(func() error { return interrupt }, func(error) {})
cancel := make(chan struct{})
g.Add(func() error { <-cancel; return nil }, func(error) { close(cancel) })
res := make(chan error)
go func() { res <- g.Run() }()
select {
case err := <-res:
if want, have := interrupt, err; want != have {
t.Errorf("want %v, have %v", want, have)
}
case <-time.After(100 * time.Millisecond):
t.Errorf("timeout")
}
03
—
源码及注释
package run
// 一个actor的集合
type Group struct {
actors []actor
}
// 向集合中添加actor
func (g *Group) Add(execute func() error, interrupt func(error)) {
g.actors = append(g.actors, actor{execute, interrupt})
}
// 并发运行集合中所有的actor
func (g *Group) Run() error {
if len(g.actors) == 0 {
return nil
}
errors := make(chan error, len(g.actors))
for _, a := range g.actors {
go func(a actor) {
errors <- a.execute()
}(a)
}
// 等待actor的报错
err := <-errors
// 一旦任意actor报错,使用每个actor的关闭接口,关闭所有的actor
for _, a := range g.actors {
a.interrupt(err)
}
// 等待所有的函数返回再交出控制权
for i := 1; i < cap(errors); i++ {
<-errors
}
// 返回初识错误
return err
}
// actor,为协程提供了关闭接口
type actor struct {
execute func() error
interrupt func(error)
}
04
—
扩展