Golang的sync包中的Cond实现了一种条件变量,可以使用在多个Reader等待共享资源ready的场景(如果只有一读一写,一个锁或者channel就搞定了)。
Cond的汇合点:多个goroutines等待、1个goroutine通知事件发生。
每个Cond都会关联一个Lock(*sync.Mutex or *sync.RWMutex),当修改条件或者调用Wait方法时,必须加锁,保护condition。
type Cond struct {
// L is held while observing or changing the condition
L Locker
// contains filtered or unexported fields
}
NewCond
func NewCond(l Locker) *Cond
新建一个Cond条件变量。
Broadcast
func (c *Cond) Broadcast()
Broadcast会唤醒所有等待c的goroutine。
调用Broadcast的时候,可以加锁,也可以不加锁。
Signal
func (c *Cond) Signal()
Signal只唤醒1个等待c的goroutine。
调用Signal的时候,可以加锁,也可以不加锁。
Wait
func (c *Cond) Wait()
Wait()c.LWait()c.L
Wait()
Wait()C.L
取而代之的是, 调用者应该在循环中调用Wait。(简单来说,只要想使用condition,就必须加锁。)
c.L.Lock()
for !condition() {
c.Wait()
}
... make use of condition ...
c.L.Unlock()
举个例子
下面这个例子,可以比较好的说明Cond的使用方法。
package main
import (
"fmt"
"sync"
"time"
)
var sharedRsc = false
func main() {
var wg sync.WaitGroup
wg.Add(2)
m := sync.Mutex{}
c := sync.NewCond(&m)
go func() {
// this go routine wait for changes to the sharedRsc
c.L.Lock()
for sharedRsc == false {
fmt.Println("goroutine1 wait")
c.Wait()
}
fmt.Println("goroutine1", sharedRsc)
c.L.Unlock()
wg.Done()
}()
go func() {
// this go routine wait for changes to the sharedRsc
c.L.Lock()
for sharedRsc == false {
fmt.Println("goroutine2 wait")
c.Wait()
}
fmt.Println("goroutine2", sharedRsc)
c.L.Unlock()
wg.Done()
}()
// this one writes changes to sharedRsc
time.Sleep(2 * time.Second)
c.L.Lock()
fmt.Println("main goroutine ready")
sharedRsc = true
c.Broadcast()
fmt.Println("main goroutine broadcast")
c.L.Unlock()
wg.Wait()
}
执行结果如下。
goroutine1 wait
goroutine2 wait
main goroutine ready
main goroutine broadcast
goroutine2 true
goroutine1 true
wg.Done()
修改1
我们做个修改,删除main goroutine中的2s延时。
代码就不贴了。
执行结果如下。
main goroutine ready
main goroutine broadcast
goroutine2 true
goroutine1 true
很有意思,两个goroutine都没有进入Wait状态。
原因是,main goroutine执行的更快,在goroutine1/goroutine2加锁之前就已经获得了锁,并完成了修改sharedRsc、发出Broadcast信号。
当子goroutine调用Wait之前检验condition时,条件已经满足,因此就没有必要再去调用Wait了。
修改2
如果我们在子goroutine中不做校验呢?
我们会得到1个死锁。
main goroutine ready
main goroutine broadcast
goroutine2 wait
goroutine1 true
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [semacquire]:
sync.runtime_Semacquire(0x414028, 0x19)
/usr/local/go/src/runtime/sema.go:56 +0x40
sync.(*WaitGroup).Wait(0x414020, 0x40c108)
/usr/local/go/src/sync/waitgroup.go:130 +0x60
main.main()
/tmp/sandbox947808816/prog.go:44 +0x2c0
goroutine 6 [sync.Cond.Wait]:
runtime.goparkunlock(...)
/usr/local/go/src/runtime/proc.go:307
sync.runtime_notifyListWait(0x43e268, 0x0)
/usr/local/go/src/runtime/sema.go:510 +0x120
sync.(*Cond).Wait(0x43e260, 0x40c108)
/usr/local/go/src/sync/cond.go:56 +0xe0
main.main.func2(0x43e260, 0x414020)
/tmp/sandbox947808816/prog.go:31 +0xc0
created by main.main
/tmp/sandbox947808816/prog.go:27 +0x140
为什么呢?
main goroutine(goroutine 1)先执行,并停留在 wg.Wait()中,等待子goroutine的wg.Done();而子goroutine(goroutine 6)没有判断条件直接调用了cond.Wait。
我们知道cond.Wait会释放锁并等待其他goroutine调用Broadcast或者Signal来通知其恢复执行,除此之外没有其他的恢复途径。但此时main goroutine已经调用了Broadcast并进入了等待状态,没有任何goroutine会去拯救还在cond.Wait中的子goroutine了,而该子goroutine也没有机会调用wg.Done()去恢复main goroutine,造成了死锁。
因此,一定要注意,Broadcast必须要在所有的Wait之后(当然了,可以通过条件判断来决定要不要进Wait)。
一个真实的例子
我们来看看k8s中使用Cond实现的FIFO,它是如何处理条件的消费的。
func (f *FIFO) Pop(process PopProcessFunc) (interface{}, error) {
f.lock.Lock()
defer f.lock.Unlock()
for {
for len(f.queue) == 0 {
// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
// When Close() is called, the f.closed is set and the condition is broadcasted.
// Which causes this loop to continue and return from the Pop().
if f.IsClosed() {
return nil, FIFOClosedError
}
f.cond.Wait()
}
id := f.queue[0]
f.queue = f.queue[1:]
...
}
}
func NewFIFO(keyFunc KeyFunc) *FIFO {
f := &FIFO{
items: map[string]interface{}{},
queue: []string{},
keyFunc: keyFunc,
}
f.cond.L = &f.lock
return f
}
f.lock.Lock()f.cond.Wait()len(f.queue)
f.lockf.queuef.queue[0]
Ref: