协程的同步:关闭通道-测试阻塞的通道

通道可以被显式的关闭;尽管它们和文件不同:不必每次都关闭。只有在当需要告诉接收者不会再提供新的值的时候,才需要关闭通道。只有发送者需要关闭通道,接收者永远不会需要。

继续看示例 goroutine2.go:我们如何在通道的 sendData() 完成的时候发送一个信号,getData() 又如何检测到通道是否关闭或阻塞?

第一个可以通过函数 close(ch) 来完成:这个将通道标记为无法通过发送操作 <- 接受更多的值;给已经关闭的通道发送或者再次关闭都会导致运行时的 panic。在创建一个通道后使用 defer 语句是个不错的办法(类似这种情况):

ch := make(chan float64)
defer close(ch)

第二个问题可以使用逗号,ok 操作符:用来检测通道是否被关闭。

如何来检测可以收到没有被阻塞(或者通道没有被关闭)?

v, ok := <-ch   // ok is true if v received value

通常和 if 语句一起使用:

if v, ok := <-ch; ok {
  process(v)
}

或者在 for 循环中接收的时候,当关闭或者阻塞的时候使用 break:

v, ok := <-ch
if !ok {
  break
}
process(v)

在示例程序中使用这些可以改进为版本 goroutine3.go,输出相同。

实现非阻塞通道的读取,需要使用 select。

package main
import "fmt"
func main() {
    ch := make(chan string)
    go sendData(ch)
    getData(ch)
}
func sendData(ch chan string) {
    ch <- "Washington"
    ch <- "Tripoli"
    ch <- "London"
    ch <- "Beijing"
    ch <- "Tokio"
    close(ch)
}
func getData(ch chan string) {
    for {
        input, open := <-ch
        if !open {
            break
        }
        fmt.Printf("%s ", input)
    }
}

改变了以下代码:

现在只有 sendData() 是协程,getData() 和 main() 在同一个线程中:

go sendData(ch)
getData(ch)

在 sendData() 函数的最后,关闭了通道:

func sendData(ch chan string) {
    ch <- "Washington"
    ch <- "Tripoli"
    ch <- "London"
    ch <- "Beijing"
    ch <- "Tokio"
    close(ch)
}

在 for 循环的 getData() 中,在每次接收通道的数据之前都使用 if !open 来检测:

for {
        input, open := <-ch
        if !open {
            break
        }
        fmt.Printf("%s ", input)
    }

使用 for-range 语句来读取通道是更好的办法,因为这会自动检测通道是否关闭:

for input := range ch {
      process(input)
}
关于通道的关闭的小结
  • 只有接收方goroutine所有的数据都发送完毕后才会关闭
  • 通道是种类型,是可以被垃圾回收机制回收的;通道的关闭不是必须的
  • 对一个关闭的通道再发送值就会导致panic
  • 对一个关闭的通道进行接收会一直获取值直到通道为空。
  • 对一个关闭的并且没有值的通道执行接收操作,会得到对应类型的零值。
  • 关闭一个已经关闭的通道会导致panic。
select {
case u:= <- ch1:
        ...
case v:= <- ch2:
        ...
        ...
default: // no value ready to be received
        ...
}
package main
import (
    "fmt"
    "time"
)
func main() {
    ch1 := make(chan int)
    ch2 := make(chan int)
    go pump1(ch1)
    go pump2(ch2)
    go suck(ch1, ch2)
    time.Sleep(1e9)
}
func pump1(ch chan int) {
    for i := 0; ; i++ {
        ch <- i * 2
    }
}
func pump2(ch chan int) {
    for i := 0; ; i++ {
        ch <- i + 5
    }
}
func suck(ch1, ch2 chan int) {
    for {
        select {
        case v := <-ch1:
            fmt.Printf("Received on channel 1: %d\n", v)
        case v := <-ch2:
            fmt.Printf("Received on channel 2: %d\n", v)
        }
    }
}
Received on channel 2: 5
Received on channel 2: 6
Received on channel 1: 0
Received on channel 2: 7
Received on channel 2: 8
Received on channel 2: 9
Received on channel 2: 10
Received on channel 1: 2
Received on channel 2: 11
...
Received on channel 2: 47404
Received on channel 1: 94346
Received on channel 1: 94348
type Ticker struct {
    C <-chan Time // the channel on which the ticks are delivered.
    // contains filtered or unexported fields
    ...
}
ticker := time.NewTicker(updateInterval)
defer ticker.Stop()
...
select {
case u:= <-ch1:
    ...
case v:= <-ch2:
    ...
case <-ticker.C:
    logState(status) // call some logging function logState
default: // no value ready to be received
    ...
}
import "time"
rate_per_sec := 10
var dur Duration = 1e9 / rate_per_sec
chRate := time.Tick(dur) // a tick every 1/10th of a second
for req := range requests {
    <- chRate // rate limit our Service.Method RPC calls
    go client.Call("Service.Method", req, ...)
}
timeout := make(chan bool, 1)
go func() {
        time.Sleep(1e9) // one second
        timeout <- true
}()
select {
    case <-ch:
        // a read from ch has occured
    case <-timeout:
        // the read from ch has timed out
        break
}
func server(workChan <-chan *Work) {
    for work := range workChan {
        go safelyDo(work)   // start the goroutine for that work
    }
}
func safelyDo(work *Work) {
    defer func() {
        if err := recover(); err != nil {
            log.Printf("Work failed with %s in %v", err, work)
        }
    }()
    do(work)
}