协程的同步:关闭通道-测试阻塞的通道
通道可以被显式的关闭;尽管它们和文件不同:不必每次都关闭。只有在当需要告诉接收者不会再提供新的值的时候,才需要关闭通道。只有发送者需要关闭通道,接收者永远不会需要。
继续看示例 goroutine2.go:我们如何在通道的 sendData() 完成的时候发送一个信号,getData() 又如何检测到通道是否关闭或阻塞?
第一个可以通过函数 close(ch) 来完成:这个将通道标记为无法通过发送操作 <- 接受更多的值;给已经关闭的通道发送或者再次关闭都会导致运行时的 panic。在创建一个通道后使用 defer 语句是个不错的办法(类似这种情况):
ch := make(chan float64)
defer close(ch)
第二个问题可以使用逗号,ok 操作符:用来检测通道是否被关闭。
如何来检测可以收到没有被阻塞(或者通道没有被关闭)?
v, ok := <-ch // ok is true if v received value
通常和 if 语句一起使用:
if v, ok := <-ch; ok {
process(v)
}
或者在 for 循环中接收的时候,当关闭或者阻塞的时候使用 break:
v, ok := <-ch
if !ok {
break
}
process(v)
在示例程序中使用这些可以改进为版本 goroutine3.go,输出相同。
实现非阻塞通道的读取,需要使用 select。
package main
import "fmt"
func main() {
ch := make(chan string)
go sendData(ch)
getData(ch)
}
func sendData(ch chan string) {
ch <- "Washington"
ch <- "Tripoli"
ch <- "London"
ch <- "Beijing"
ch <- "Tokio"
close(ch)
}
func getData(ch chan string) {
for {
input, open := <-ch
if !open {
break
}
fmt.Printf("%s ", input)
}
}
改变了以下代码:
现在只有 sendData() 是协程,getData() 和 main() 在同一个线程中:
go sendData(ch)
getData(ch)
在 sendData() 函数的最后,关闭了通道:
func sendData(ch chan string) {
ch <- "Washington"
ch <- "Tripoli"
ch <- "London"
ch <- "Beijing"
ch <- "Tokio"
close(ch)
}
在 for 循环的 getData() 中,在每次接收通道的数据之前都使用 if !open 来检测:
for {
input, open := <-ch
if !open {
break
}
fmt.Printf("%s ", input)
}
使用 for-range 语句来读取通道是更好的办法,因为这会自动检测通道是否关闭:
for input := range ch {
process(input)
}
关于通道的关闭的小结
- 只有接收方goroutine所有的数据都发送完毕后才会关闭
- 通道是种类型,是可以被垃圾回收机制回收的;通道的关闭不是必须的
- 对一个关闭的通道再发送值就会导致panic
- 对一个关闭的通道进行接收会一直获取值直到通道为空。
- 对一个关闭的并且没有值的通道执行接收操作,会得到对应类型的零值。
- 关闭一个已经关闭的通道会导致panic。
select {
case u:= <- ch1:
...
case v:= <- ch2:
...
...
default: // no value ready to be received
...
}
package main
import (
"fmt"
"time"
)
func main() {
ch1 := make(chan int)
ch2 := make(chan int)
go pump1(ch1)
go pump2(ch2)
go suck(ch1, ch2)
time.Sleep(1e9)
}
func pump1(ch chan int) {
for i := 0; ; i++ {
ch <- i * 2
}
}
func pump2(ch chan int) {
for i := 0; ; i++ {
ch <- i + 5
}
}
func suck(ch1, ch2 chan int) {
for {
select {
case v := <-ch1:
fmt.Printf("Received on channel 1: %d\n", v)
case v := <-ch2:
fmt.Printf("Received on channel 2: %d\n", v)
}
}
}
Received on channel 2: 5
Received on channel 2: 6
Received on channel 1: 0
Received on channel 2: 7
Received on channel 2: 8
Received on channel 2: 9
Received on channel 2: 10
Received on channel 1: 2
Received on channel 2: 11
...
Received on channel 2: 47404
Received on channel 1: 94346
Received on channel 1: 94348
type Ticker struct {
C <-chan Time // the channel on which the ticks are delivered.
// contains filtered or unexported fields
...
}
ticker := time.NewTicker(updateInterval)
defer ticker.Stop()
...
select {
case u:= <-ch1:
...
case v:= <-ch2:
...
case <-ticker.C:
logState(status) // call some logging function logState
default: // no value ready to be received
...
}
import "time"
rate_per_sec := 10
var dur Duration = 1e9 / rate_per_sec
chRate := time.Tick(dur) // a tick every 1/10th of a second
for req := range requests {
<- chRate // rate limit our Service.Method RPC calls
go client.Call("Service.Method", req, ...)
}
timeout := make(chan bool, 1)
go func() {
time.Sleep(1e9) // one second
timeout <- true
}()
select {
case <-ch:
// a read from ch has occured
case <-timeout:
// the read from ch has timed out
break
}
func server(workChan <-chan *Work) {
for work := range workChan {
go safelyDo(work) // start the goroutine for that work
}
}
func safelyDo(work *Work) {
defer func() {
if err := recover(); err != nil {
log.Printf("Work failed with %s in %v", err, work)
}
}()
do(work)
}