在go语言中,大家都很熟悉如何使用channel来shutdown一个正在运行中的协程,本质就是向运行中的goroutine传递关闭的消息,无论是使用共享内存还是channel都可以传递,这块的区别更像csp和actor的区别,一个关注点在处理者上,另一个在消息上。
// 通过cancel传递消息来shutdown协程cancel := make(chan struct{})go func() {select {case <-cancel:break}return}()
02
—
控制权反转
oklog/run 使用来名为actor的结构体,actor包含主要业务逻辑的执行函数以及相对应的一个shutdown函数,多个actor为一个group,每个actor提供自己的运行和关闭接口(就是两个函数),并将控制的权力交给group,来实现多个协程的统一管理。
// oklog/run的使用interrupt := errors.New("interrupt")var g run.Groupg.Add(func() error { return interrupt }, func(error) {})cancel := make(chan struct{})g.Add(func() error { <-cancel; return nil }, func(error) { close(cancel) })res := make(chan error)go func() { res <- g.Run() }()select {case err := <-res:if want, have := interrupt, err; want != have {t.Errorf("want %v, have %v", want, have)}case <-time.After(100 * time.Millisecond):t.Errorf("timeout")}
03
—
源码及注释
package run// 一个actor的集合type Group struct {actors []actor}// 向集合中添加actorfunc (g *Group) Add(execute func() error, interrupt func(error)) {g.actors = append(g.actors, actor{execute, interrupt})}// 并发运行集合中所有的actorfunc (g *Group) Run() error {if len(g.actors) == 0 {return nil}errors := make(chan error, len(g.actors))for _, a := range g.actors {go func(a actor) {errors <- a.execute()}(a)}// 等待actor的报错err := <-errors// 一旦任意actor报错,使用每个actor的关闭接口,关闭所有的actorfor _, a := range g.actors {a.interrupt(err)}// 等待所有的函数返回再交出控制权for i := 1; i < cap(errors); i++ {<-errors}// 返回初识错误return err}// actor,为协程提供了关闭接口type actor struct {execute func() errorinterrupt func(error)}
04
—
扩展