- 有关channel的问题
- 保证channel关闭一次的方式
- 关闭channel的优雅方式
一、有关channel的问题
Golang面试有关channel的问题肯可能会问到:
Q:如何关闭一个channel
A:close()
Q:怎么判断一个channel已经关闭了
A:(1) value, ok := <- ch // ok为false,则通道已经关闭; (2) <-ch // 如果channel关闭,这里就不会阻塞,而这句代码可以用在select中或直接在执行代码中作为阻塞机制。
而在实际工作中,我也曾遇到过一个项目的Bug,根因就是关闭了已经关闭的channel,而出现了panic。
于是应该考虑使用一种机制,当多个goroutine使用一个channel进行发送或接收工作的时候,保证在goroutine中只执行一次关闭channel的操作。
二、保证channel关闭一次的方式
当以多个goroutine的方式执行下面的两个函数,显然在第一个SafeClose()的goroutine执行便会关闭通道ch,而第二个SafeClose()的goroutine执行便会出现panic。于是这里通过recover()的方式来避免panic。
简单来说,就是忽视关闭已经关闭的channel而产生的panic。显然,这不是一种好的方法。
func SafeSend(ch chan int, value int) (closed bool) {
defer func() {
if recover() != nil {
closed = true
}
}()
ch <- value
return false
}
func SafeClose(ch chan int) (justClosed bool) {
defer func() {
if recover() != nil {
justClosed = false
}
}()
close(ch)
return true
}
下面的方法是通过使用到sync.Once,来保证关闭channel的操作只执行一次。使用的方式没什么毛病,在诸多Golang开源项目中,大多使用sync.Once来关闭channel。
type MyChannel struct {
C chan int
once sync.Once
}
func NewMyChannel() *MyChannel {
return &MyChannel{C: make(chan int)}
}
func (mc *MyChannel) SafeClose() {
mc.once.Do(func(){
close(mc.C)
})
}
下面的例子是通过加锁和设置关闭标志,在锁中执行通道关闭和对关闭标志赋值,从而保证channel在初始关闭标志状态下只被关闭一次。
type MyChannel1 struct {
C chan int
closed bool
mutex sync.Mutex
}
func NewMyChannel1() *MyChannel {
return &MyChannel{C: make(chan int)}
}
func (mc *MyChannel1) SafeClose() {
mc.mutex.Lock()
if !mc.closed {
close(mc.C)
mc.closed = true
}
mc.mutex.Unlock()
}
func (mc *MyChannel1) IsClosed() bool {
mc.mutex.Lock()
defer mc.mutex.Unlock()
return mc.closed
}
三、关闭channel的优雅方式
The Channel Closing Principle(通道关闭原则):
- 不要在接收端关闭channel
- 当有多个并发执行的发送端时,不要在发送端关闭channel
- 仅可以在唯一或最后一个发送端中关闭channel
1、一个发送端,多个接收端
当只有一个发送端,那么在需要结束的时候,在发送端直接关闭channel即可,示例如下:
import (
"sync"
"fmt"
"math/rand"
"time"
)
// sender: 1, receivers: M
const (
MaxRandomNumber = 100000
NumReceivers1 = 100
)
var wg sync.WaitGroup
func sender(ch chan int) {
var value int
for {
value = rand.Intn(MaxRandomNumber)
if 0 == value { // 当随机生成的数为0时,关闭通道
fmt.Printf("Closed for number: %d\n", value)
close(ch)
return
} else {
ch <- value
}
}
}
func receiver(ch chan int) {
defer wg.Done()
for value := range ch {
fmt.Println(value)
}
}
func main() {
rand.Seed(time.Now().Unix())
ch := make(chan int, 100)
go sender(ch)
wg.Add(NumReceivers1)
for i:=0;i<NumReceivers1;i++ {
go receiver(ch)
}
wg.Wait()
}
2、多个发送端,一个接收端
当发送端为多个的时候,为了避免多次关闭channel,可以考虑增加一个作为标志的channel,当需要关闭channel的时候,通过关闭标志channel,通知多个发送端结束工作从而停止了发送工作。
换一个角度去想,就标志channel而言,唯一的接收端是它的实际发送者,因此依然遵循通道关闭原则。所以实际上,传输数据的channel并没有显式关闭。示例代码如下:
import (
"math/rand"
"sync"
"fmt"
"time"
)
const (
MaxRandomNumber1 = 100000
NumSender1 = 100
)
var wg1 sync.WaitGroup
func sender1(ch chan int, stopCh chan struct{}) {
var value int
for {
value = rand.Intn(MaxRandomNumber1)
select {
case <-stopCh:
return
case ch <- value:
}
}
}
func receiver1(ch chan int, stopCh chan struct{}) {
defer wg1.Done()
for value := range ch {
if MaxRandomNumber1 - 1 == value {
close(stopCh)
fmt.Printf("From receiver1 closed by number: %d\n", value)
return
}
fmt.Println(value)
}
}
func main() {
rand.Seed(time.Now().Unix())
ch := make(chan int, 100)
stopCh := make(chan struct{})
for i:=0;i<NumSender1;i++ {
go sender1(ch, stopCh)
}
wg1.Add(1)
go receiver1(ch, stopCh)
wg1.Wait()
}
3、多个发送端,多个接收端
当有多个发送端和多个接收端时,无法再套用之前的方法。因为关闭channel的操作只能执行一次,之前当接收端是一个的时候,是把接收端创造成一个发送端,那么现在的情况,就可以考虑重新创造一个“发送端”,来做关闭通道的工作。
发送端和接收端都有终止channel通讯的条件,当随机值为0时发送端终止,当随机值为99999时接收端终止。在没有达到条件的时候,moderator的goroutine中,toStop通道被阻塞,当有发送端或接收端向toStop中传值之后,stopCh通道的关闭接着执行,然后所有发送端和接收端都直接返回。同样,数据通道ch没有被显式关闭。示例代码如下:
import (
"sync"
"math/rand"
"fmt"
"time"
)
const (
MaxRandomNumber2 = 100000
NumSenders = 1000
NumReceivers = 10
)
var (
wg2 sync.WaitGroup
stoppedBy string
)
func moderator(stopCh chan struct{}, toStop chan string) {
stoppedBy = <-toStop
close(stopCh)
}
func senders(id int, stopCh chan struct{}, toStop chan string, ch chan int) {
var value int
for {
value = rand.Intn(MaxRandomNumber2)
if 0 == value {
time.Sleep(2*time.Second) // 这里睡眠两秒是为了让接收端也有一定概率先于发送端向toStop传值,从而更加公平一点
select {
case toStop <- fmt.Sprintf("sender#%d : %d\n", id, value):
default:
}
return
}
select {
case <-stopCh:
return
case ch <- value:
default:
}
}
}
func receivers(id int, stopCh chan struct{}, toStop chan string, ch chan int) {
defer wg2.Done()
for {
select {
case <-stopCh:
return
case value := <-ch:
if MaxRandomNumber2-1 == value {
select {
case toStop <- fmt.Sprintf("receiver#%d : %d\n", id, value):
default:
}
return
}
fmt.Println(value)
default:
}
}
}
func main() {
rand.Seed(time.Now().Unix())
ch := make(chan int, 100)
stopCh := make(chan struct{})
toStop := make(chan string, 1)
go moderator(stopCh, toStop)
for i := 0; i < NumSenders; i++ {
go senders(i, stopCh, toStop, ch)
}
wg2.Add(NumReceivers)
for i := 0; i < NumReceivers; i++ {
go receivers(i, stopCh, toStop, ch)
}
wg2.Wait()
fmt.Printf("Stopped by %s\n", stoppedBy)
}