Channel

  • 从根本上来说,channel只是一个数据结构,可以被写入数据,也可以被读取数据

    • 所谓的发送数据到channel,或者从channel读取数据,说白了就是对一个数据结构的操作
    • 因为协程原则上不会出现多线程编程中经常遇到的资源竞争问题,所以这个channel的数据结
      构甚至在访问的时候都不用加锁
      • 因为Go语言支持多CPU核心并发执行多个goroutine,会造成资
        源竞争,所以在必要的位置还是需要加锁的
  • 通道可以传输 int, string, 结构体,甚至是函数

  • 通道传递是拷贝值

    • 对于大数据类型,可以传递指针以避免大量拷贝
      • 注意此时的并发安全,即多个goroutine通过指针对原始值的并发操作
      • 此时需要额外的同步操作(例如锁)来避免竞争
  • channel可以用来无锁编程,但是channel本身底层还是通过加锁实现的

  • channel是面向同一个进程的

    • 多个进程之间内存一般不会共享,所以没法用channel
    • 进程间通讯可以考虑IPC方法,比如有名管道(os.Pipe函数),还有强大和灵活的的socket
    • 如果通道要给外部使用,或者通过通道对外提供功能,那就不要传指针值了,容易造成安全漏洞
  • ch := make(chan bool)

    • 无缓存的channel是同步的,阻塞式的
      • 必须等待两边都准备好才开始传递数据,否则堵塞
    • 使用不当容易引发死锁
  • ch := make(chan int, 10)

    • 有缓存的channel是异步的,只有当缓冲区写满时才堵塞
  • 通道的关闭

    • channel 使用完后不关闭也没有关系
      • 因为channel 没有被任何协程用到后会被自动回收
      • 显示关闭 channel 一般是用来通知其他协程某个任务已经完成了
    • 关闭一个已经关闭的channel,或者往一个已经关闭的channel写入,都会panic
    • 应该是生产者关闭通道,而不是消费者
      • 否则生产者可能在channel关闭后写入,导致panic
      • 消费者在超时后应该通过channel向生产者发送完成消息,让生产者关闭channel并返回
    • 已经被关闭的通道不能往里面写入,但可以接受数据
      • 读取一个已经关闭且没有数据的通道会立刻返回一个零值
      • 读取时通过判断第二个返回值也可以判读收到的值是否有效
  • 单向通道

    • 多用于函数的参数, 提高安全性
    • 只能写入:var send chan<- int
    • 只能读取:var recv <-chan int
  • 使用for-range循环读取channel

    • 从指定通道中读取数据直到通道关闭(close)
    • 如果生产者忘记关闭通道,则消费者会一直堵塞在for-range循环中
    • 如果ch值为nil,则会那么这条for语句就会被永远地阻塞在有for关键字的那一行

例子

  • 用channel进行同步
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
ype Stream struct {
    // some fields
    cc chan struct{}
}

func (s *Stream) Wait() error {
    <-s.cc
    // some code
}
func (s *Stream) Close() {
    // some code
    close(s.cc)
}
func (s *Stream) IsClosed() bool {
    select {
    case <-s.cc:
        return true
    default:
        return false
    }
}
  • 用channel限制速度
1
2
3
4
5
6
limiter := time.Tick(time.Millisecond * 200)
// 每 200ms 执行一次请求
for req := range requests {
    <-limiter
    ...
}
  • 临时速度限制
    • 就是说只限制前几次请求,而不限制后续请求速度
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
const N = 3 // 限制前三次请求
limiter := make(chan struct{}, N)
// 先把通道堵塞
for i := 0; i < N; i++ {
    limiter <- struct{}
}
// 前三次请求限制为200毫秒一次
go func() {
    for t := range time.Tick(time.Millisecond * 200) {
        limiter <- struct{}
    }
}()

for req := range {
    <-limiter
    // 业务逻辑
}
  • channel作为函数返回值
    • 一般作为生产者,另起一个goroutine并发生产,返回channel用于消费
    • 注意点在于使用defer把goroutine的生命周期封装在生产函数中
      • 目的在于避免写入nil或者多次关闭channel
      • 消费者只需要处理阻塞和零值
      • 生产者负责在生产完毕后关闭channel
1
2
3
4
5
6
7
8
9
10
11
func producer(num int) <-chan {
    ch := make(chan int, num)
    go func() {
        defer close(ch) // 重要!
        for i:=0; i< num; i++ {
            ch <- i
        }
    }()

    return ch
}
  • 实现信号量
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
var wg sync.WaitGroup

sem := make(chan struct{}, 5) // 最多并发5个
for i := 0; i < 100; i++ {
    wag.Add(1)
    go func(id int) {
        return wg.Done()
        sem <- struct{} // 获取信号量
        defer func(){
            <-sem // 释放信号量
        }()
       
        // 业务逻辑
        ...
    }(i)
}

wg.Wait()

Select

  • select语句是专为通道而设计的,所以每个case表达式中都只能包含操作通道的表达式

  • 多个channel准备好时,随机选一个执行

  • select语句包含的候选分支中的case表达式都会在该语句执行开始时先被求值

    • 求值的顺序是依从代码编写的顺序从上到下
    • 如果表达式在被求值时,相应的操作正处于阻塞状态,那么对该case表达式的求值就是不成功的,即这个case表达式所在的候选分支是不满足选择条件的
  • select 默认阻塞,只有监听的channel中有发送或者接受数据时才运行

    • 设置default则不阻塞,通道内没有待接受的数据则执行default
    • 如果不加default,则会有死锁风险
  • select语句只能对其中的每一个case表达式各求值一次,如果我们想连续或定时地操作其中的通道的话,就需要通过在for语句中嵌入select语句的方式实现

    • 注意简单地在select语句的分支中使用break语句,只能结束当前的select语句的执行,而并不会对外层的for语句产生作用
    • 这种错误的用法可能会让这个for语句无休止地运行下去
  • 利用channel+select来广播退出信息

    • 每个子goroutine利用select监听done通道
    • 当主程序想要关闭子goroutine时,可以关闭done通道
    • 此时select会立刻监听到nil消息,子goroutine可以以此退出
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func Generate(done chan bool) chan int {
    ch := make(chan int)
    go func() {
        defer close(ch)
        for {
            select{
            case ch <- rand.Int():
                ...
            case <- done: // 接受到通知并退出
                return
            }
        }  
    }()
   
    return ch
}

done := make(chan bool)
ch := Generate(done)
fmt.Println(<-ch) // 消费
close(done) //通过关闭通道来发送通知
  • 如果在select语句中发现某个通道已关闭,那么应该怎样屏蔽掉它所在的分支?
    • 发现某个channel被关闭后,为了防止再次进入这个分支,可以把这个channel重新赋值为nil,这样这个case就一直被阻塞了
1
2
3
4
5
6
7
8
9
10
11
12
for {
select {
    case _, ok := <-ch1:
        if !ok {
            ch1 = nil
        }
    case ... :
    ...
    default:
    ...
    }
}
  • 单个case的化简写法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// bad
select {
    case <-ch:
}
// good
<-ch

// bad
for {
    select {
    case x := <-ch:
        _ = x
    }
}

//good
for x := range ch {
   ...
}