Cond

sync.Cond 是基于互斥锁/读写锁实现的条件变量,用来协调想要访问共享资源的那些 Goroutine。当共享资源状态发生变化时,sync.Cond 可以用来通知等待条件发生而阻塞的 Goroutine。

假如有一个协程正在接收数据,其他协程必须等待这个协程接收完数据,才能读取到正确的数据。上述情形下,如果单纯的使用 channel 或者互斥锁,只能有一个协程可以等待,并读取到数据,没办法通知其他协程也读取数据。这个时候怎么办?

1)可以用一个全局变量标识第一个协程是否接收数据完毕,剩下的协程反复检查该变量的值,直到读取到数据。

2)也可创建多个 channel, 每个协程阻塞在一个 Channel 上,由接收数据的协程在数据接收完毕后,挨个通知,类似于msgbus实现。

然后 Go 中其实内置来一个 sync.Cond 来解决这个问题。

使用

下面的例子实现了通Cond实现通知协程的流程:

Cond的主要方法有:

1)NewCond(l Locker) *Cond

NewCond 创建实例需要关联一个锁,使用方式为:

cond := sync.NewCond(&sync.Mutex{})

2)Wait()

Wait阻塞当前的 goroutine,等待唤起,在调用Wait前需要Lock,执行完Wait后的逻辑之后需要调用Unlock。

3)Signal()

Signal唤醒一个阻塞的goroutine,执行前不需要调用Lock。

4)Broadcast()

Broadcast唤起所有阻塞的 goroutine,执行前不需要调用Lock。

实现原理

数据结构

我们来看下sync.Cond的结构体,它的代码在 /sr/sync/cond.go下:

每个Cond实例都会关联一个锁 L(互斥锁 Mutex,或读写锁 RWMutex),当调用Wait方法时,必须加锁。

Wait方法

Wait方法的实现为:

Wait方法首先调用runtime_notifyListAdd方法将wait索引+1,然后调用runtime_notifyListWait将自己加入到等待队列中,然后释放锁,等待其他协程的唤醒。需要注意的是,Wait的使用方式最好是:

强制调用Wait方法前需要先获取该锁。这里的原因在于调用Wait方法如果不加锁,有可能会出现竞态条件。这里假设多个协程都处于等待状态,然后一个协程调用了Broadcast唤醒了其中一个或多个协程,此时这些协程都会被唤醒。

如下,假设调用Wait方法前没有加锁的话,那么所有协程都会去调用condition方法去判断是否满足条件,然后都通过验证,执行后续操作:

此时会出现的情况为,本来是需要在满足condition方法的前提下,才能执行的操作。现在有可能的效果,为前面一部分协程执行时,还是满足condition条件的;但是后面的协程,尽管不满足condition条件,还是执行了后续操作,可能导致程序出错。

正常的用法应该是,在调用Wait方法前便加锁,只会有一个协程判断是否满足condition条件,然后执行后续操作。这样子就不会出现即使不满足条件,也会执行后续操作的情况出现。

Signal方法

Signal的实现为:

Signal方法notify一个wait的goroutine,获取当前的notify的index,然后遍历队列,根据index找到对应的sudog,唤醒这个sudog,wait等待的goroutine就可以继续执行了。

Broadcast方法

Broadcast方法的实现为:

broadcast方法是唤醒全部wait的goroutine,实现也比较简单,就是直接循环wait队列,全部执行goready,更新notify的index为wait的index,表示全部wait的goroutine都被唤醒了。