Channel
-
从根本上来说,channel只是一个数据结构,可以被写入数据,也可以被读取数据
- 所谓的发送数据到channel,或者从channel读取数据,说白了就是对一个数据结构的操作
- 因为协程原则上不会出现多线程编程中经常遇到的资源竞争问题,所以这个channel的数据结
构甚至在访问的时候都不用加锁- 因为Go语言支持多CPU核心并发执行多个goroutine,会造成资
源竞争,所以在必要的位置还是需要加锁的
- 因为Go语言支持多CPU核心并发执行多个goroutine,会造成资
-
通道可以传输 int, string, 结构体,甚至是函数
-
通道传递是拷贝值
- 对于大数据类型,可以传递指针以避免大量拷贝
- 注意此时的并发安全,即多个goroutine通过指针对原始值的并发操作
- 此时需要额外的同步操作(例如锁)来避免竞争
- 对于大数据类型,可以传递指针以避免大量拷贝
-
channel可以用来无锁编程,但是channel本身底层还是通过加锁实现的
-
channel是面向同一个进程的
- 多个进程之间内存一般不会共享,所以没法用channel
- 进程间通讯可以考虑IPC方法,比如有名管道(
os.Pipe 函数),还有强大和灵活的的socket - 如果通道要给外部使用,或者通过通道对外提供功能,那就不要传指针值了,容易造成安全漏洞
-
ch := make(chan bool) - 无缓存的channel是同步的,阻塞式的
- 必须等待两边都准备好才开始传递数据,否则堵塞
- 使用不当容易引发死锁
- 无缓存的channel是同步的,阻塞式的
-
ch := make(chan int, 10) - 有缓存的channel是异步的,只有当缓冲区写满时才堵塞
-
通道的关闭
- channel 使用完后不关闭也没有关系
- 因为channel 没有被任何协程用到后会被自动回收
- 显示关闭 channel 一般是用来通知其他协程某个任务已经完成了
- 关闭一个已经关闭的channel,或者往一个已经关闭的channel写入,都会panic
- 应该是生产者关闭通道,而不是消费者
- 否则生产者可能在channel关闭后写入,导致panic
- 消费者在超时后应该通过channel向生产者发送完成消息,让生产者关闭channel并返回
- 已经被关闭的通道不能往里面写入,但可以接受数据
- 读取一个已经关闭且没有数据的通道会立刻返回一个零值
- 读取时通过判断第二个返回值也可以判读收到的值是否有效
- channel 使用完后不关闭也没有关系
-
单向通道
- 多用于函数的参数, 提高安全性
- 只能写入:
var send chan<- int - 只能读取:
var recv <-chan int
-
使用for-range循环读取channel
- 从指定通道中读取数据直到通道关闭(close)
- 如果生产者忘记关闭通道,则消费者会一直堵塞在for-range循环中
- 如果
ch 值为nil ,则会那么这条for语句就会被永远地阻塞在有for关键字的那一行
例子
- 用channel进行同步
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 | ype Stream struct { // some fields cc chan struct{} } func (s *Stream) Wait() error { <-s.cc // some code } func (s *Stream) Close() { // some code close(s.cc) } func (s *Stream) IsClosed() bool { select { case <-s.cc: return true default: return false } } |
- 用channel限制速度
1 2 3 4 5 6 | limiter := time.Tick(time.Millisecond * 200) // 每 200ms 执行一次请求 for req := range requests { <-limiter ... } |
- 临时速度限制
- 就是说只限制前几次请求,而不限制后续请求速度
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | const N = 3 // 限制前三次请求 limiter := make(chan struct{}, N) // 先把通道堵塞 for i := 0; i < N; i++ { limiter <- struct{} } // 前三次请求限制为200毫秒一次 go func() { for t := range time.Tick(time.Millisecond * 200) { limiter <- struct{} } }() for req := range { <-limiter // 业务逻辑 } |
- channel作为函数返回值
- 一般作为生产者,另起一个goroutine并发生产,返回channel用于消费
- 注意点在于使用defer把goroutine的生命周期封装在生产函数中
- 目的在于避免写入nil或者多次关闭channel
- 消费者只需要处理阻塞和零值
- 生产者负责在生产完毕后关闭channel
1 2 3 4 5 6 7 8 9 10 11 | func producer(num int) <-chan { ch := make(chan int, num) go func() { defer close(ch) // 重要! for i:=0; i< num; i++ { ch <- i } }() return ch } |
- 实现信号量
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 | var wg sync.WaitGroup sem := make(chan struct{}, 5) // 最多并发5个 for i := 0; i < 100; i++ { wag.Add(1) go func(id int) { return wg.Done() sem <- struct{} // 获取信号量 defer func(){ <-sem // 释放信号量 }() // 业务逻辑 ... }(i) } wg.Wait() |
Select
-
select语句是专为通道而设计的,所以每个case表达式中都只能包含操作通道的表达式
-
多个channel准备好时,随机选一个执行
-
select语句包含的候选分支中的case表达式都会在该语句执行开始时先被求值
- 求值的顺序是依从代码编写的顺序从上到下
- 如果表达式在被求值时,相应的操作正处于阻塞状态,那么对该case表达式的求值就是不成功的,即这个case表达式所在的候选分支是不满足选择条件的
-
select 默认阻塞,只有监听的channel中有发送或者接受数据时才运行
- 设置default则不阻塞,通道内没有待接受的数据则执行default
- 如果不加default,则会有死锁风险
-
select语句只能对其中的每一个case表达式各求值一次,如果我们想连续或定时地操作其中的通道的话,就需要通过在for语句中嵌入select语句的方式实现
- 注意简单地在select语句的分支中使用break语句,只能结束当前的select语句的执行,而并不会对外层的for语句产生作用
- 这种错误的用法可能会让这个for语句无休止地运行下去
-
利用channel+select来广播退出信息
- 每个子goroutine利用select监听done通道
- 当主程序想要关闭子goroutine时,可以关闭done通道
- 此时select会立刻监听到nil消息,子goroutine可以以此退出
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 | func Generate(done chan bool) chan int { ch := make(chan int) go func() { defer close(ch) for { select{ case ch <- rand.Int(): ... case <- done: // 接受到通知并退出 return } } }() return ch } done := make(chan bool) ch := Generate(done) fmt.Println(<-ch) // 消费 close(done) //通过关闭通道来发送通知 |
- 如果在select语句中发现某个通道已关闭,那么应该怎样屏蔽掉它所在的分支?
- 发现某个channel被关闭后,为了防止再次进入这个分支,可以把这个channel重新赋值为nil,这样这个case就一直被阻塞了
1 2 3 4 5 6 7 8 9 10 11 12 | for { select { case _, ok := <-ch1: if !ok { ch1 = nil } case ... : ... default: ... } } |
- 单个case的化简写法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 | // bad select { case <-ch: } // good <-ch // bad for { select { case x := <-ch: _ = x } } //good for x := range ch { ... } |