• 有关channel的问题
  • 保证channel关闭一次的方式
  • 关闭channel的优雅方式

一、有关channel的问题

Golang面试有关channel的问题肯可能会问到:
Q:如何关闭一个channel
A:close()
Q:怎么判断一个channel已经关闭了
A:(1) value, ok := <- ch // ok为false,则通道已经关闭; (2) <-ch // 如果channel关闭,这里就不会阻塞,而这句代码可以用在select中或直接在执行代码中作为阻塞机制。

而在实际工作中,我也曾遇到过一个项目的Bug,根因就是关闭了已经关闭的channel,而出现了panic。
于是应该考虑使用一种机制,当多个goroutine使用一个channel进行发送或接收工作的时候,保证在goroutine中只执行一次关闭channel的操作。

二、保证channel关闭一次的方式

当以多个goroutine的方式执行下面的两个函数,显然在第一个SafeClose()的goroutine执行便会关闭通道ch,而第二个SafeClose()的goroutine执行便会出现panic。于是这里通过recover()的方式来避免panic。
简单来说,就是忽视关闭已经关闭的channel而产生的panic。显然,这不是一种好的方法。

func SafeSend(ch chan int, value int) (closed bool) {
	defer func() {
		if recover() != nil {
			closed = true
		}
	}()

	ch <- value
	return false
}

func SafeClose(ch chan int) (justClosed bool) {
	defer func() {
		if recover() != nil {
			justClosed = false
		}
	}()

	close(ch)
	return true
}

下面的方法是通过使用到sync.Once,来保证关闭channel的操作只执行一次。使用的方式没什么毛病,在诸多Golang开源项目中,大多使用sync.Once来关闭channel。

type MyChannel struct {
	C    chan int
	once sync.Once
}

func NewMyChannel() *MyChannel {
	return &MyChannel{C: make(chan int)}
}

func (mc *MyChannel) SafeClose() {
	mc.once.Do(func(){
		close(mc.C)
	})
}

下面的例子是通过加锁和设置关闭标志,在锁中执行通道关闭和对关闭标志赋值,从而保证channel在初始关闭标志状态下只被关闭一次。

type MyChannel1 struct {
	C      chan int
	closed bool
	mutex  sync.Mutex
}

func NewMyChannel1() *MyChannel {
	return &MyChannel{C: make(chan int)}
}

func (mc *MyChannel1) SafeClose() {
	mc.mutex.Lock()
	if !mc.closed {
		close(mc.C)
		mc.closed = true
	}
	mc.mutex.Unlock()
}

func (mc *MyChannel1) IsClosed() bool {
	mc.mutex.Lock()
	defer mc.mutex.Unlock()
	return mc.closed
}

三、关闭channel的优雅方式

The Channel Closing Principle(通道关闭原则)

  • 不要在接收端关闭channel
  • 当有多个并发执行的发送端时,不要在发送端关闭channel
  • 仅可以在唯一或最后一个发送端中关闭channel
1、一个发送端,多个接收端

当只有一个发送端,那么在需要结束的时候,在发送端直接关闭channel即可,示例如下:

import (
	"sync"
	"fmt"
	"math/rand"
	"time"
)

// sender: 1, receivers: M
const (
	MaxRandomNumber = 100000
	NumReceivers1 = 100
)

var wg sync.WaitGroup

func sender(ch chan int) {
	var value int
	for {
		value = rand.Intn(MaxRandomNumber)
		if 0 == value { // 当随机生成的数为0时,关闭通道
			fmt.Printf("Closed for number: %d\n", value)
			close(ch)
			return
		} else {
			ch <- value
		}
	}
}

func receiver(ch chan int) {
	defer wg.Done()

	for value := range ch {
		fmt.Println(value)
	}
}

func main() {
	rand.Seed(time.Now().Unix())

	ch := make(chan int, 100)

	go sender(ch)

	wg.Add(NumReceivers1)
	for i:=0;i<NumReceivers1;i++ {
		go receiver(ch)
	}
	wg.Wait()
}
2、多个发送端,一个接收端

当发送端为多个的时候,为了避免多次关闭channel,可以考虑增加一个作为标志的channel,当需要关闭channel的时候,通过关闭标志channel,通知多个发送端结束工作从而停止了发送工作。
换一个角度去想,就标志channel而言,唯一的接收端是它的实际发送者,因此依然遵循通道关闭原则。所以实际上,传输数据的channel并没有显式关闭。示例代码如下:

import (
	"math/rand"
	"sync"
	"fmt"
	"time"
)

const (
	MaxRandomNumber1 = 100000
	NumSender1 = 100
)

var wg1 sync.WaitGroup

func sender1(ch chan int, stopCh chan struct{}) {
	var value int
	for {
		value = rand.Intn(MaxRandomNumber1)

		select {
		case <-stopCh:
			return
		case ch <- value:

		}
	}
}

func receiver1(ch chan int, stopCh chan struct{}) {
	defer wg1.Done()
	for value := range ch {
		if MaxRandomNumber1 - 1 == value {
			close(stopCh)
			fmt.Printf("From receiver1 closed by number: %d\n", value)
			return
		}

		fmt.Println(value)
	}
}

func main() {
	rand.Seed(time.Now().Unix())

	ch := make(chan int, 100)
	stopCh := make(chan struct{})

	for i:=0;i<NumSender1;i++ {
		go sender1(ch, stopCh)
	}

	wg1.Add(1)
	go receiver1(ch, stopCh)
	wg1.Wait()
}
3、多个发送端,多个接收端

当有多个发送端和多个接收端时,无法再套用之前的方法。因为关闭channel的操作只能执行一次,之前当接收端是一个的时候,是把接收端创造成一个发送端,那么现在的情况,就可以考虑重新创造一个“发送端”,来做关闭通道的工作。
发送端和接收端都有终止channel通讯的条件,当随机值为0时发送端终止,当随机值为99999时接收端终止。在没有达到条件的时候,moderator的goroutine中,toStop通道被阻塞,当有发送端或接收端向toStop中传值之后,stopCh通道的关闭接着执行,然后所有发送端和接收端都直接返回。同样,数据通道ch没有被显式关闭。示例代码如下:

import (
	"sync"
	"math/rand"
	"fmt"
	"time"
)

const (
	MaxRandomNumber2 = 100000
	NumSenders       = 1000
	NumReceivers     = 10
)

var (
	wg2       sync.WaitGroup
	stoppedBy string
)

func moderator(stopCh chan struct{}, toStop chan string) {
	stoppedBy = <-toStop
	close(stopCh)
}

func senders(id int, stopCh chan struct{}, toStop chan string, ch chan int) {
	var value int
	for {
		value = rand.Intn(MaxRandomNumber2)
		if 0 == value {
			time.Sleep(2*time.Second) // 这里睡眠两秒是为了让接收端也有一定概率先于发送端向toStop传值,从而更加公平一点
			select {
			case toStop <- fmt.Sprintf("sender#%d : %d\n", id, value):
			default:
			}
			return
		}

		select {
		case <-stopCh:
			return
		case ch <- value:
		default:
		}
	}
}

func receivers(id int, stopCh chan struct{}, toStop chan string, ch chan int) {
	defer wg2.Done()

	for {
		select {
		case <-stopCh:
			return

		case value := <-ch:
			if MaxRandomNumber2-1 == value {
				select {
				case toStop <- fmt.Sprintf("receiver#%d : %d\n", id, value):
				default:
				}
				return
			}
			fmt.Println(value)
		default:
		}
	}
}

func main() {
	rand.Seed(time.Now().Unix())

	ch := make(chan int, 100)
	stopCh := make(chan struct{})
	toStop := make(chan string, 1)

	go moderator(stopCh, toStop)

	for i := 0; i < NumSenders; i++ {
		go senders(i, stopCh, toStop, ch)
	}

	wg2.Add(NumReceivers)
	for i := 0; i < NumReceivers; i++ {
		go receivers(i, stopCh, toStop, ch)
	}
	wg2.Wait()

	fmt.Printf("Stopped by %s\n", stoppedBy)
}