gogogoroutinechannelgo
go
channelchannel
关闭 channel 的基本原则
channel
channelchannel
这个原则的来源是:
channelchannel
如何关闭
defer-recoverypanicrecoverychannelclosegosync.Once
senderreceiver
senderreceiversenderreceiversenderreceiversenderreceiver
只有一个 sender 的情况:由 sender 在发送完成后直接关闭 channel 即可。
// main demonstrates the single-sender case: the one sender owns dataCh and
// closes it after its last send, which is how the receivers learn to stop.
func main() {
	dataCh := make(chan int, 100)

	// The only sender: push 1..1000, log completion, then close the channel.
	go func() {
		for n := 1; n <= 1000; n++ {
			dataCh <- n
		}
		log.Println("send complete")
		close(dataCh)
	}()

	// Five receivers. Each one drains dataCh and returns when the range loop
	// ends, i.e. once the channel has been closed and emptied.
	for r := 0; r < 5; r++ {
		go func() {
			for range dataCh {
				// Received values are discarded on purpose.
			}
		}()
	}

	// Wait five seconds, then print how many goroutines currently exist.
	<-time.After(5 * time.Second)
	fmt.Println(runtime.NumGoroutine())
}
多个 sender、一个 receiver 的情况:增加一个 stopCh,由 receiver 关闭 stopCh 来通知所有 sender 停止发送;dataCh 不由任何 sender 关闭。
package main
import (
"time"
"math/rand"
"sync"
"log"
)
// main demonstrates many senders feeding a single receiver. The data channel
// is never closed; instead the receiver closes stopCh to tell every sender to
// quit.
func main() {
	rand.Seed(time.Now().UnixNano())
	log.SetFlags(0)

	const (
		Max        = 100000
		NumSenders = 1000
	)

	var wgReceivers sync.WaitGroup
	wgReceivers.Add(1)

	dataCh := make(chan int)
	stopCh := make(chan struct{})

	// sender keeps pushing random values into dataCh until stopCh is closed.
	sender := func() {
		for {
			// Non-blocking look at stopCh first: the select below chooses
			// among ready cases at random, so without this check a sender
			// could keep sending even though stopCh is already closed.
			select {
			case <-stopCh:
				return
			default:
			}

			// Either notice the stop signal or hand one value over.
			select {
			case <-stopCh:
				return
			case dataCh <- rand.Intn(Max):
			}
		}
	}
	for s := 0; s < NumSenders; s++ {
		go sender()
	}

	// receiver logs every value it gets; on Max-1 it closes stopCh and exits.
	receiver := func() {
		defer wgReceivers.Done()
		for value := range dataCh {
			if value != Max-1 {
				log.Println(value)
				continue
			}
			// The receiver is the only goroutine that closes stopCh, so it
			// is closed exactly once.
			close(stopCh)
			return
		}
	}
	go receiver()

	wgReceivers.Wait()
}
多个 sender、多个 receiver 的情况:任何一方都不能直接关闭 stopCh,否则重复关闭 channel 会 panic;因此增加一个中间人 toStop,由它负责关闭 stopCh。
package main
import (
"time"
"math/rand"
"sync"
"log"
"strconv"
)
// main demonstrates many senders and many receivers. Nobody closes dataCh and
// neither side closes stopCh directly: any sender or receiver may ask a
// moderator goroutine to stop the run, and only the moderator closes stopCh.
func main() {
	rand.Seed(time.Now().UnixNano())
	log.SetFlags(0)

	const (
		Max          = 100000
		NumReceivers = 10
		NumSenders   = 1000
	)

	var wgReceivers sync.WaitGroup
	wgReceivers.Add(NumReceivers)

	dataCh := make(chan int)
	stopCh := make(chan struct{})

	// toStop is the added go-between: stop requests arrive here and the
	// moderator closes stopCh once. It has a buffer of one because requests
	// are sent without blocking; on an unbuffered channel a request made
	// before the moderator goroutine reaches its receive would fall through
	// to the default case and be dropped.
	toStop := make(chan string, 1)

	var stoppedBy string

	// Moderator: take the first stop request, record who sent it, close stopCh.
	go func() {
		stoppedBy = <-toStop
		close(stopCh)
	}()

	// requestStop files a stop request without blocking; if toStop cannot
	// accept it right now the request is simply dropped.
	requestStop := func(who string) {
		select {
		case toStop <- who:
		default:
		}
	}

	// stopped reports, without blocking, whether stopCh has been closed.
	stopped := func() bool {
		select {
		case <-stopCh:
			return true
		default:
			return false
		}
	}

	// sender produces random values; drawing 0 makes it request a stop and exit.
	sender := func(id string) {
		for {
			value := rand.Intn(Max)
			if value == 0 {
				requestStop("sender#" + id)
				return
			}

			// select chooses among ready cases at random, so check for the
			// stop signal on its own before attempting the send.
			if stopped() {
				return
			}

			select {
			case <-stopCh:
				return
			case dataCh <- value:
			}
		}
	}
	for i := 0; i < NumSenders; i++ {
		go sender(strconv.Itoa(i))
	}

	// receiver logs incoming values; receiving Max-1 makes it request a stop
	// and exit.
	receiver := func(id string) {
		defer wgReceivers.Done()
		for {
			// Same early stop check as in the sender.
			if stopped() {
				return
			}

			select {
			case <-stopCh:
				return
			case value := <-dataCh:
				if value == Max-1 {
					requestStop("receiver#" + id)
					return
				}
				log.Println(value)
			}
		}
	}
	for i := 0; i < NumReceivers; i++ {
		go receiver(strconv.Itoa(i))
	}

	wgReceivers.Wait()
	log.Println("stopped by", stoppedBy)
}
senderreceivertoStopstopChtoStopchannel<-toStoptoStop<-xxsenderreceiverselectdefault
也可以把 toStop 的缓冲容量设为 sender 与 receiver 数量之和,这样发送停止请求时就不需要 select + default:
...
toStop := make(chan string, NumReceivers + NumSenders)
...
value := rand.Intn(Max)
if value == 0 {
toStop <- "sender#" + id
return
}
...
if value == Max-1 {
toStop <- "receiver#" + id
return
}
...
channel 的注意点
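channel 如果没有经过 make 初始化(例如 var c chan int),就是 nil channel;向 nil channel 发送或从中接收都会永久阻塞,关闭 nil channel 才会 panic
var c chan int
c <- 1 // blocks forever (not a panic); with no other goroutine running the runtime reports "fatal error: all goroutines are asleep - deadlock!"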
向已关闭的 channel 发送数据会 panic:
c := make(chan int)
close(c)
c <- 1 // panic
重复关闭 channel 会 panic:
c := make(chan int)
close(c)
close(c) // panic
channelclosechannelgc
总结
channel
sendersenderchannelsenderchannelchannel
channelchannel
channel
channel
参考