golang 中用sync.RWMutex实现了读写锁的协程间同步。
主要的数据结构如下。w是互斥锁,readerCount记录正在读取操作的协程数量同时也实现读锁。互斥锁w和readerCount共同实现了写锁。readerWait用来读锁都释放后唤醒写锁的记录数。writerSem,readerSem分别是等待读完成和写完成的信号量。
type RWMutex struct {
w Mutex // held if there are pending writers
writerSem uint32 // semaphore for writers to wait for completing readers
readerSem uint32 // semaphore for readers to wait for completing writers
readerCount int32 // number of pending readers
readerWait int32 // number of departing readers
}
读加锁:
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
// A writer is pending, wait for it.
runtime_SemacquireMutex(&rw.readerSem, false, 0)
}
这是读加锁的主要代码,通过atomic.AddInt32 的原子操作给rw.readerCount加一,如果rw.readerCount结果不为零,获得读锁。如果为零,此时已经有协程加上了写锁。协程调取函数runtime_SemacquireMutex睡眠。
读解锁:
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
// Outlined slow-path to allow the fast-path to be inlined
rw.rUnlockSlow(r)
}
读解锁也是通过atomic.AddInt32 的原子操作解锁。解锁后判断r,如果小于零则可能有写锁在等待。
func (rw *RWMutex) rUnlockSlow(r int32) {
if r+1 == 0 || r+1 == -rwmutexMaxReaders {
race.Enable()
throw("sync: RUnlock of unlocked RWMutex")
}
// A writer is pending.
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
// The last reader unblocks the writer.
runtime_Semrelease(&rw.writerSem, false, 1)
}
}
rw.readerWait记录了在获取写锁时,存在多少读锁。归零后就可以唤醒等待的写锁。
写加锁:
rw.w.Lock()
// Announce to readers there is a pending writer.
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
// Wait for active readers.
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
runtime_SemacquireMutex(&rw.writerSem, false, 0)
}
写加锁分两步,首先是加互斥锁。其他加写锁的需要等待。然后给rw.readerCount 减去 rwmutexMaxReaders。rwmutexMaxReaders的值为1 << 30,一个很大的值。可以理解为readerCount永远不可能大于rwmutexMaxReaders,所以readerCount小于零。所以加读锁的时候,判断readerCount是否小于零来检查是否有写锁。
写锁加上之后还需要等待正在读的协程读完。r的值就是正在读的协程数量,加到readerWait上,等待readerWait为零后,唤醒写锁。
写解锁:
// Announce to readers there is no active writer.
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
if r >= rwmutexMaxReaders {
race.Enable()
throw("sync: Unlock of unlocked RWMutex")
}
// Unblock blocked readers, if any.
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false, 0)
}
// Allow other writers to proceed.
rw.w.Unlock()
首先,写解锁把readerCount加上rwmutexMaxReaders,释放读锁。然后唤醒所有的正在睡眠的读协程。然后释放互斥锁。
讨论:会不会出现读饥饿和写饥饿的情况。
会不会出现不停的有读获取到锁,从而导致写永远获取不到锁的状况。或者反之。
不会出现读饥饿,写饥饿情况。
写锁的获取,主要是等待rw.readerWait归零,rw.readerWait在有写锁在等待的时候不会再增加。此时新的读锁,由于写锁已经对readerCount进行了减rwmutexMaxReaders操作,会进入睡眠等待写锁唤醒,不会和写有竞争关系。所以写锁不会出现饥饿。
写锁的释放,首先释放读锁。然后唤醒所有读等待,测试读也可以获取到读锁,也不会出现读饥饿。
runtime_Semrelease 和 runtime_SemacquireMutex:
这两个函数是同步中关键的函数,网上也没找到相关介绍。根据他们的注释和操作有一个简单地猜测
runtime_SemacquireMutex (func runtime_SemacquireMutex(s *uint32, lifo bool, skipframes int) )函数,作用是阻塞当前协程进入睡眠状态,然后加入到睡眠队列。注释上说,这个函数和func runtime_Semacquire(s *uint32)函数差不多,会对s减一,每次调用s的值都会改变,可能s的值就是对协程的映射,有可能就是个计数。lifo为true的时候,函数会把协程放到等待队列的头部。
runtime_Semrelease (func runtime_Semrelease(s *uint32, handoff bool, skipframes int))函数,是唤醒等待队列中第一个协程。有可能是根据s的值,唤醒映射的协程,也可能是就唤醒头部协程。