前面那篇文章说了协程,协程是并发编程的基础,而管道(channel)则是并发中协程之间沟通的桥梁,很多时候我们启动一个协程去执行完一个操作,执行操作之后我们需要返回结果,或者多个协程之间需要相互协作,先看一个例子:

package mainimport ("fmt""sync")var wg = sync.WaitGroup{}func main() {wg.Add(2)go say("Hello")go say("World")wg.Wait()}func say(s string) {fmt.Println("I say: "+ s)wg.Done()}

案例特别简单,我们开启2个协程,打印了一个Hello 和 World,假设我们现在不需要直接打印结果,我们需要say函数把结果返回给主进程,该怎么做呢?

直接return肯定是行不通的,这时候就需要使用管道来通信了:

package mainimport"fmt"var ch = make(chan string)func main() {go say("Hello")go say("World")fori := 0; i < 2; i++ {fmt.Println(<-ch)}close(ch)}func say(s string) {ch <-"I say: "+ s}

在这个例子里面,我首先定义了一个全局的管道,在say函数里面往管道里面送,然后在主进程使接受管道的结果并打印出来。

管道的初始化使用make关键字(make在Go里面只用于通道chan、映射map以及切片slice的内存创建),发送和接受消息都使用 "<-" 符号。


阻塞

管道在功能上有点像消息队列,我们可以一边推消息,另一边取消息,但是channel的使用还是有很大限制的,有一个很重要的特性就是阻塞。

阻塞的意思就是默认情况下,channel就像是一个容量为0的队列,如果要想往队列里面送数据,那么必须有人接受,不然就会报错,产生死锁!

package mainfunc main() {var ch = make(chanint)ch <- 1ch <- 2ch <- 3}//运行结果:fatal error: all goroutines are asleep - deadlock!正确写法:package mainimport"fmt"func main() {var ch = make(chanint)go func() {ch <- 1ch <- 2ch <- 3}()fori := 0; i < 3; i++ {fmt.Println(<-ch)}}

不过有例外,管理可以有一个缓存,比如说缓存为3个,意味着你可以先往管道里面放3个元素,然后再去接受,所以上面的例子也可以这些:

package mainimport"fmt"func main() {var ch = make(chanint, 3)ch <- 1ch <- 2ch <- 3//ch <- 4fori := 0; i < 3; i++ {fmt.Println(<-ch)}}

关闭

还有一点,正常情况管道需要关闭,不然会产生内存泄露,使用close()函数,正常来说,管道必须由发送者关闭,如果尝试从一个被关闭的管道接受值,则会返回该类型的零值,如果尝试向被关闭的管道输送值,则会报错!这些倒是很容易理解,看一些例子:

package mainimport"fmt"func main() {var ch = make(chanint)go func() {ch <- 1ch <- 2ch <- 3close(ch)}()//尝试从已关闭的管道接收数据,1 2 3 0 0fori := 0; i < 5; i++ {fmt.Println(<-ch)}//手动判断管道是否关闭for{ifv, ok := <-ch; ok {fmt.Println(v)}else{break}}//采用for range遍历,可以自动判断forv := range ch {fmt.Println(v)}}

为了避免这种错误,有2种解决方案,一种是在每次从管道获取值的时候判断管理是否关闭,另一种则是使用for range 遍历。

设计原则

在Go的设计哲学里面,有这么一句话:

Do not communicate by sharing memory; instead, share memory by communicating.(不要通过共享内存来通信,而应通过通信来共享内存。)

这句话怎么理解呢?它指的是我们在编程开发中经常会遇到多个线程操作同一块数据的情况,举个例子,我们需要在多个协程给一个值进行加减:

package mainimport ("fmt""sync")var num = 0func main() {var done = make(chanint)go func() {fori := 0; i < 20000; i++ {add(1)}done <- 0}()go func() {fori := 0; i < 10000; i++ {add(2)}done <- 0}()<-done<-donefmt.Printf("num = %d\n", num)}func add(nint) {num = num + n}
--race
jwang@jwang:~/Documents/Work/MyBlog/Other$ go run --race main.go ================== WARNING: DATA RACE Read at 0x0000005f1648 by goroutine 7: main.main.func2() /home/jwang/Documents/Work/MyBlog/Other/main.go:37 +0x48 Previous write at 0x0000005f1648 by goroutine 6: main.main.func1() /home/jwang/Documents/Work/MyBlog/Other/main.go:37 +0x69 Goroutine 7 (running) created at: main.main() /home/jwang/Documents/Work/MyBlog/Other/main.go:23 +0x8b Goroutine 6 (running) created at: main.main() /home/jwang/Documents/Work/MyBlog/Other/main.go:16 +0x69 ================== num = 26447 Found 1 data race(s) exit status 66

本质上这是通过共享内存来通信,当然也不是说一定不能使用共享内存,想要解决这个问题也很简单,我们只需要给say函数里面加锁即可,关键代码如下:

var locker = sync.Mutex{} func add() { locker.Lock() defer locker.Unlock() num++ }

通过一把排它锁,我们保证同一时刻只能有一个协程操作该变量,就不会出现“data race”了,运行结果如下:

在Go里面不推荐使用共享内存这种方式,推荐使用channel来解决这些问题,代码如下:

package mainimport ("fmt")var num = 0var (ch   = make(chanint)done = make(chanint)//控制流程)func main() {go func() {fori := 0; i < 200000; i++ {ch <- 1}done <- 0}()go func() {fori := 0; i < 100000; i++ {ch <- 2}done <- 0}()go add()<-done<-doneclose(ch)<-donefmt.Printf("num = %d\n", num)}func add() {forv := range ch {num = num + v}done <- 0}

这种写法里面,我使用了2个channel,一个是用来接受计算的值,在add方式里面获取channel的值进行计算,因为channel是先进先出,类似队列,所以不会出现数据竞争的情况。

另外一个channel是用来控制整个执行流程的,这个利用了channel的阻塞特性。

不过上面这段代码依然有个问题,我使用了3个全局的变量,实际上也算是共享内存,下面看一个不使用全局变量的版本:

package mainimport ("fmt")func main() {var (num  = 0ch   = make(chanint)done = make(chanint))go func() {fori := 0; i < 200000; i++ {ch <- 1}done <- 0}()go func() {fori := 0; i < 100000; i++ {ch <- 2}done <- 0}()go add(&num, ch, done)<-done<-doneclose(ch)<-donefmt.Printf("num = %d\n", num)}func add(num *int, ch chanint, done chanint) {forv := range ch {*num = *num + v}done <- 0}

我们可以channel当作参数传递给函数,效果也是一样的,避免使用了全局变量,也不是说不能用全局变量,在并发编程的时候使用全局变量一定要考虑数据竞争的情况。

生产者消费者模型

最后,说一下生产者和消费者的案例,生产者消费者这种模型在实际开发中经常用到,比如消息队列,看一下go怎么实现:

package mainimport ("fmt""strconv""time")func main() {var ch = make(chan string)go producer(ch)go consumer(ch)//阻塞,防止进程退出for{select {}}}func producer(ch chan string) {ticker :=time.NewTicker(time.Millisecond * 500)for{select {case<-ticker.C:ch <-"msg: "+ strconv.Itoa(int(time.Now().Unix()))}}}func consumer(ch chan string) {fori := 0; i < 5; i++ {go func(iint) {for{res := <-chfmt.Printf("消费者 %d: %s\n", i, res)}}(i)}}在这个案例里面,我创建了一个生产者,多个消费者的模型,代码也非常简单,容易理解。