在go语言中,大家都很熟悉如何使用channel来shutdown一个正在运行中的协程,本质就是向运行中的goroutine传递关闭的消息,无论是使用共享内存还是channel都可以传递,这块的区别更像csp和actor的区别,一个关注点在处理者上,另一个在消息上。


  // 通过cancel传递消息来shutdown协程
  cancel := make(chan struct{})
go func() {
select {
case <-cancel:
break
}
return
}()



02


控制权反转


oklog/run 使用来名为actor的结构体,actor包含主要业务逻辑的执行函数以及相对应的一个shutdown函数,多个actor为一个group,每个actor提供自己的运行和关闭接口(就是两个函数),并将控制的权力交给group,来实现多个协程的统一管理。

  // oklog/run的使用
interrupt := errors.New("interrupt")
  var g run.Group
g.Add(func() error { return interrupt }, func(error) {})
cancel := make(chan struct{})
g.Add(func() error { <-cancel; return nil }, func(error) { close(cancel) })
res := make(chan error)
go func() { res <- g.Run() }()
select {
case err := <-res:
if want, have := interrupt, err; want != have {
t.Errorf("want %v, have %v", want, have)
}
case <-time.After(100 * time.Millisecond):
t.Errorf("timeout")
}



03


源码及注释


package run


// 一个actor的集合
type Group struct {
actors []actor
}


// 向集合中添加actor
func (g *Group) Add(execute func() error, interrupt func(error)) {
g.actors = append(g.actors, actor{execute, interrupt})
}


// 并发运行集合中所有的actor
func (g *Group) Run() error {
if len(g.actors) == 0 {
return nil
}


errors := make(chan error, len(g.actors))
for _, a := range g.actors {
go func(a actor) {
errors <- a.execute()
}(a)
}


// 等待actor的报错
err := <-errors


  // 一旦任意actor报错,使用每个actor的关闭接口,关闭所有的actor
for _, a := range g.actors {
a.interrupt(err)
}


// 等待所有的函数返回再交出控制权
for i := 1; i < cap(errors); i++ {
<-errors
}


// 返回初识错误
return err
}




// actor,为协程提供了关闭接口
type actor struct {
execute func() error
interrupt func(error)
}




04


扩展