sync.Cond 是基于互斥锁/读写锁实现的条件变量,用来协调想要访问共享资源的那些 Goroutine。当共享资源状态发生变化时,sync.Cond 可以用来通知等待条件发生而阻塞的 Goroutine。
假如有一个协程正在接收数据,其他协程必须等待这个协程接收完数据,才能读取到正确的数据。上述情形下,如果单纯的使用 channel 或者互斥锁,只能有一个协程可以等待,并读取到数据,没办法通知其他协程也读取数据。这个时候怎么办?
1)可以用一个全局变量标识第一个协程是否接收数据完毕,剩下的协程反复检查该变量的值,直到读取到数据。
2)也可创建多个 channel, 每个协程阻塞在一个 Channel 上,由接收数据的协程在数据接收完毕后,挨个通知,类似于msgbus实现。
然后 Go 中其实内置来一个 sync.Cond 来解决这个问题。
使用下面的例子实现了通Cond实现通知协程的流程:
Cond的主要方法有:
1)NewCond(l Locker) *Cond
NewCond 创建实例需要关联一个锁,使用方式为:
cond := sync.NewCond(&sync.Mutex{})
2)Wait()
Wait阻塞当前的 goroutine,等待唤起,在调用Wait前需要Lock,执行完Wait后的逻辑之后需要调用Unlock。
3)Signal()
Signal唤醒一个阻塞的goroutine,执行前不需要调用Lock。
4)Broadcast()
Broadcast唤起所有阻塞的 goroutine,执行前不需要调用Lock。
实现原理数据结构
我们来看下sync.Cond的结构体,它的代码在 /sr/sync/cond.go下:
每个Cond实例都会关联一个锁 L(互斥锁 Mutex,或读写锁 RWMutex),当调用Wait方法时,必须加锁。
Wait方法
Wait方法的实现为:
Wait方法首先调用runtime_notifyListAdd方法将wait索引+1,然后调用runtime_notifyListWait将自己加入到等待队列中,然后释放锁,等待其他协程的唤醒。需要注意的是,Wait的使用方式最好是:
强制调用Wait方法前需要先获取该锁。这里的原因在于调用Wait方法如果不加锁,有可能会出现竞态条件。这里假设多个协程都处于等待状态,然后一个协程调用了Broadcast唤醒了其中一个或多个协程,此时这些协程都会被唤醒。
如下,假设调用Wait方法前没有加锁的话,那么所有协程都会去调用condition方法去判断是否满足条件,然后都通过验证,执行后续操作:
此时会出现的情况为,本来是需要在满足condition方法的前提下,才能执行的操作。现在有可能的效果,为前面一部分协程执行时,还是满足condition条件的;但是后面的协程,尽管不满足condition条件,还是执行了后续操作,可能导致程序出错。
正常的用法应该是,在调用Wait方法前便加锁,只会有一个协程判断是否满足condition条件,然后执行后续操作。这样子就不会出现即使不满足条件,也会执行后续操作的情况出现。
Signal方法
Signal的实现为:
Signal方法notify一个wait的goroutine,获取当前的notify的index,然后遍历队列,根据index找到对应的sudog,唤醒这个sudog,wait等待的goroutine就可以继续执行了。
Broadcast方法
Broadcast方法的实现为:
broadcast方法是唤醒全部wait的goroutine,实现也比较简单,就是直接循环wait队列,全部执行goready,更新notify的index为wait的index,表示全部wait的goroutine都被唤醒了。