锁是编程开发中用于并发控制的一种同步机制,提供多线程(或协程)之间并发读写一个共享数据的方法。在go语言中使用锁也很简单:
var loc sync.Mutex
var rwLoc sync.RWMutex
var idx int
var writeRatio = 3
func Inc(){
loc.Lock()
defer loc.Unlock()
timer := time.NewTimer(100 * time.Millisecond)
select{
case <- timer.C:
idx ++
}
}
func Dec(){
loc.Lock()
defer loc.Unlock()
timer := time.NewTimer(100 * time.Millisecond)
select{
case <- timer.C:
idx --
}
}
func main(){
wg := sync.WaitGroup{}
wg.Add(6)
for i := 0; i < 3; i++ {
go func() {
defer wg.Done()
Inc()
}()
go func(){
defer wg.Done()
Dec()
}()
}
wg.Wait()
fmt.Printf("i: %vn", idx)
}idx读写锁简单地说,读写锁就是一种能保证:
- 并发读操作之间不互斥;
- 并发写操作之间互斥;
- 并发读操作和写操作互斥;
RWMutexRLockRUnlockvar rwLoc sync.RWMutex
var idx int
func ReadRW() {
rwLoc.RLock()
defer rwLoc.RUnlock()
_, _ = fmt.Fprint(ioutil.Discard, idx)
}
func WriteRW() {
rwLoc.Lock()
defer rwLoc.Unlock()
idx = 3
}那么go是怎么实现读写锁的呢,让我们通过源码分析一下它的实现原理。
源码分析在看源码之前我们不妨先思考一下,如果自己实现,需要怎么设计这个数据结构来满足上面那三个要求,然后再参看源码会有更多理解。
首先,为了满足第二点和第三点要求,肯定需要一个互斥锁:
type RWMutex struct{
w Mutex // held if there are pending writers
...
}这个互斥锁是在写操作时使用的:
func (rw *RWMutex) Lock(){
...
rw.w.Lock()
...
}
func (rw *RWMutex) Unlock(){
...
rw.w.Unlock()
...
}而读操作之间是不互斥的,因此读操作的RLock()过程并不获取这个互斥锁。但读写之间是互斥的,那么RLock()如果不获取互斥锁又怎么能阻塞住写操作呢?go语言的实现是这样的:
通过一个int32变量记录当前正在读的goroutine数:
type RWMutex struct{
w Mutex // held if there are pending writers
readerCount int32 // number of pending readers
...
}每次调用Rlock方法时将readerCount加1,对应地,每次调用RUnlock方法时将readerCount减1:
func (rw *RWMutex) RLock() {
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
// 如果readerCount小于0则通过同步原语阻塞住,否则将readerCount加1后即返回
runtime_SemacquireMutex(&rw.readerSem, false, 0)
}
}
func (rw *RWMutex) RUnlock() {
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
// 如果readerCount减1后小于0,则调用rUnlockSlow方法,将这个方法剥离出来是为了RUnlock可以内联,这样能进一步提升读操作时的取锁性能
rw.rUnlockSlow(r)
}
}既然每次RLock时都会将readerCount增加,那判断它是否小于0有什么意义呢?这就需要和写操作的取锁过程Lock()参看:
// 总结一下Lock的流程:1. 阻塞新来的写操作;2. 阻塞新来的读操作;3. 等待之前的读操作完成;
func (rw *RWMutex) Lock() {
// 通过rw.w.Lock阻塞其它写操作
rw.w.Lock()
// 将readerCount减去一个最大数(2的30次方,RWMutex能支持的最大同时读操作数),这样readerCount将变成一个小于0的很小的数,
// 后续再调RLock方法时将会因为readerCount<0而阻塞住,这样也就阻塞住了新来的读请求
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
// 等待之前的读操作完成
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
runtime_SemacquireMutex(&rw.writerSem, false, 0)
}
}写操作获取锁时通过将readerCount改成一个很小的数保证新来的读操作会因为readerCount<0而阻塞住;那之前未完成的读操作怎么处理呢?很简单,只要跟踪写操作Lock之前未完成的reader数就行了,这里通过一个int32变量readerWait来做这件事情:
type RWMutex struct{
w Mutex // held if there are pending writers
readerCount int32 // number of pending readers
readerWait int32 // number of departing readers
...
}每次写操作Lock时会将当前readerCount数量记在readerWait里。
回想一下,当写操作Lock后readerCount会小于0,这时reader unlock时会执行rUnlockSlow方法,现在可以来看它的实现过程了:
func (rw *RWMutex) rUnlockSlow(r int32) {
if r+1 == 0 || r+1 == -rwmutexMaxReaders {
throw("sync: RUnlock of unlocked RWMutex")
}
// 每个reader完成读操作后将readerWait减小1
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
// 当readerWait为0时代表writer等待的所有reader都已经完成了,可以唤醒writer了
runtime_Semrelease(&rw.writerSem, false, 1)
}
}最后再看写操作的释放锁过程:
func (rw *RWMutex) Unlock() {
// 将readerCount置回原来的值,这样reader又可以进入了
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
if r >= rwmutexMaxReaders {
throw("sync: Unlock of unlocked RWMutex")
}
// 唤醒那些等待的reader
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false, 0)
}
// 释放互斥锁,这样新的writer可以获得锁
rw.w.Unlock()
}将上面这些过程梳理一下:
- 如果没有writer请求进来,则每个reader开始后只是将readerCount增1,完成后将readerCount减1,整个过程不阻塞,这样就做到“并发读操作之间不互斥”;
- 当有writer请求进来时首先通过互斥锁阻塞住新来的writer,做到“并发写操作之间互斥”;
- 然后将readerCount改成一个很小的值,从而阻塞住新来的reader;
- 记录writer进来之前未完成的reader数量,等待它们都完成后再唤醒writer;这样就做到了“并发读操作和写操作互斥”;
- writer结束后将readerCount置回原来的值,保证新的reader不会被阻塞,然后唤醒之前等待的reader,再将互斥锁释放,使后续writer不会被阻塞。
这就是go语言中读写锁的核心源码(简洁起见,这里将竞态部分的代码省略,TODO:竞态分析原理分析),相信看到这你已经对读写锁的实现原理了然于胸了,如果你感兴趣,不妨一起继续思考这几个问题。
思考r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0回想一下Lock方法:
func (rw *RWMutex) Lock() {
rw.w.Lock()
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
runtime_SemacquireMutex(&rw.writerSem, false, 0)
}
}r!= 0atomic.AddInt32(&rw.readerWait, r)!=0r!=0readerWait+ratomic.AddInt32(&rw.readerWait, r)!=0r!=0r==0atomic.AddInt32Benchmark最后让我们通过Benchmark看看读写锁的性能提升有多少:
func Read() {
loc.Lock()
defer loc.Unlock()
_, _ = fmt.Fprint(ioutil.Discard, idx)
time.Sleep(1000 * time.Nanosecond)
}
func ReadRW() {
rwLoc.RLock()
defer rwLoc.RUnlock()
_, _ = fmt.Fprint(ioutil.Discard, idx)
time.Sleep(1000 * time.Nanosecond)
}
func Write() {
loc.Lock()
defer loc.Unlock()
idx = 3
time.Sleep(1000 * time.Nanosecond)
}
func WriteRW() {
rwLoc.Lock()
defer rwLoc.Unlock()
idx = 3
time.Sleep(1000 * time.Nanosecond)
}
func BenchmarkLock(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
foo := 0
for pb.Next() {
foo++
if foo % writeRatio == 0 {
Write()
} else {
Read()
}
}
})
}
func BenchmarkRWLock(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
foo := 0
for pb.Next() {
foo++
if foo % writeRatio == 0 {
WriteRW()
} else {
ReadRW()
}
}
})
}go test -bench='Benchmark.*Lock' -run=none mutex_test.go-run=nonecpu: Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz
BenchmarkLock
BenchmarkLock-12 235062 5025 ns/op
BenchmarkRWLock
BenchmarkRWLock-12 320209 3834 ns/op可以看出使用读写锁后耗时降低了24%左右。
上面writeRatio用于控制读、写的频率比例,即读:写=3,随着这个比例越高耗时降低的比例也越大,这里作个简单比较:
| writeRatio | 3 | 10 | 20 | 50 | 100 | 1000 |
|---|---|---|---|---|---|---|
| 耗时降低 | 24% | 71.3% | 83.7% | 90.9% | 93.5% | 95.7% |
可以看出当读的比例越高时,使用读写锁获得的性能提升比例越高。