golang 中用sync.RWMutex实现了读写锁的协程间同步。

主要的数据结构如下。w是互斥锁,readerCount记录正在读取操作的协程数量同时也实现读锁。互斥锁w和readerCount共同实现了写锁。readerWait用来读锁都释放后唤醒写锁的记录数。writerSem,readerSem分别是等待读完成和写完成的信号量。

type RWMutex struct {
	w           Mutex  // held if there are pending writers
	writerSem   uint32 // semaphore for writers to wait for completing readers
	readerSem   uint32 // semaphore for readers to wait for completing writers
	readerCount int32  // number of pending readers
	readerWait  int32  // number of departing readers
}

读加锁:

    if atomic.AddInt32(&rw.readerCount, 1) < 0 {
	    // A writer is pending, wait for it.
	    runtime_SemacquireMutex(&rw.readerSem, false, 0)
    }

这是读加锁的主要代码,通过atomic.AddInt32 的原子操作给rw.readerCount加一,如果rw.readerCount结果不为零,获得读锁。如果为零,此时已经有协程加上了写锁。协程调取函数runtime_SemacquireMutex睡眠。

读解锁:

    if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
	    // Outlined slow-path to allow the fast-path to be inlined
	    rw.rUnlockSlow(r)
    }

读解锁也是通过atomic.AddInt32 的原子操作解锁。解锁后判断r,如果小于零则可能有写锁在等待。

func (rw *RWMutex) rUnlockSlow(r int32) {
	if r+1 == 0 || r+1 == -rwmutexMaxReaders {
		race.Enable()
		throw("sync: RUnlock of unlocked RWMutex")
	}
	// A writer is pending.
	if atomic.AddInt32(&rw.readerWait, -1) == 0 {
		// The last reader unblocks the writer.
		runtime_Semrelease(&rw.writerSem, false, 1)
	}
}
rw.readerWait记录了在获取写锁时,存在多少读锁。归零后就可以唤醒等待的写锁。

写加锁:

	rw.w.Lock()
	// Announce to readers there is a pending writer.
	r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
	// Wait for active readers.
	if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
		runtime_SemacquireMutex(&rw.writerSem, false, 0)
	}

写加锁分两步,首先是加互斥锁。其他加写锁的需要等待。然后给rw.readerCount 减去 rwmutexMaxReaders。rwmutexMaxReaders的值为1 << 30,一个很大的值。可以理解为readerCount永远不可能大于rwmutexMaxReaders,所以readerCount小于零。所以加读锁的时候,判断readerCount是否小于零来检查是否有写锁。

写锁加上之后还需要等待正在读的协程读完。r的值就是正在读的协程数量,加到readerWait上,等待readerWait为零后,唤醒写锁。

写解锁:

	// Announce to readers there is no active writer.
	r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
	if r >= rwmutexMaxReaders {
		race.Enable()
		throw("sync: Unlock of unlocked RWMutex")
	}
	// Unblock blocked readers, if any.
	for i := 0; i < int(r); i++ {
		runtime_Semrelease(&rw.readerSem, false, 0)
	}
	// Allow other writers to proceed.
	rw.w.Unlock()

首先,写解锁把readerCount加上rwmutexMaxReaders,释放读锁。然后唤醒所有的正在睡眠的读协程。然后释放互斥锁。

 

讨论:会不会出现读饥饿和写饥饿的情况。

会不会出现不停的有读获取到锁,从而导致写永远获取不到锁的状况。或者反之。

不会出现读饥饿,写饥饿情况。

写锁的获取,主要是等待rw.readerWait归零,rw.readerWait在有写锁在等待的时候不会再增加。此时新的读锁,由于写锁已经对readerCount进行了减rwmutexMaxReaders操作,会进入睡眠等待写锁唤醒,不会和写有竞争关系。所以写锁不会出现饥饿。

写锁的释放,首先释放读锁。然后唤醒所有读等待,测试读也可以获取到读锁,也不会出现读饥饿。

 

runtime_Semrelease 和 runtime_SemacquireMutex:

这两个函数是同步中关键的函数,网上也没找到相关介绍。根据他们的注释和操作有一个简单地猜测

runtime_SemacquireMutex (func runtime_SemacquireMutex(s *uint32, lifo bool, skipframes int) )函数,作用是阻塞当前协程进入睡眠状态,然后加入到睡眠队列。注释上说,这个函数和func runtime_Semacquire(s *uint32)函数差不多,会对s减一,每次调用s的值都会改变,可能s的值就是对协程的映射,有可能就是个计数。lifo为true的时候,函数会把协程放到等待队列的头部。

runtime_Semrelease (func runtime_Semrelease(s *uint32, handoff bool, skipframes int))函数,是唤醒等待队列中第一个协程。有可能是根据s的值,唤醒映射的协程,也可能是就唤醒头部协程。