前言

asong

Golang版本:1.118

读写锁简介

goroutinegoroutinegoroutine

为什么要有读锁

有些朋友可能会有疑惑,为什么要有读锁,读操作又不会修改数据,多线程同时读取相同的资源就是安全的,为什么还要加一个读锁呢?

Golangintcount++count++countcount1countcount++
package main

import "sync"

const maxValue = 3

type test struct {
 rw sync.RWMutex
 index int
}

func (t *test) Get() int {
 return t.index
}

func (t *test)Set() {
 t.rw.Lock()
 t.index++
 if t.index >= maxValue{
  t.index =0
 }
 t.rw.Unlock()
}

func main()  {
 t := test{}
 sw := sync.WaitGroup{}
 for i:=0; i < 100000; i++{
  sw.Add(2)
  go func() {
   t.Set()
   sw.Done()
  }()
  go func() {
   val := t.Get()
   if val >= maxValue{
    print("get value error| value=", val, "\n")
   }
   sw.Done()
  }()
 }
 sw.Wait()
}

运行结果:

get value error| value=3
get value error| value=3
get value error| value=3
get value error| value=3
get value error| value=3
.....

每次运行结果都是不固定的,因为我们没有加读锁,如果允许同时读和写,读取到的数据有可能就是中间状态,所以我们可以总结出来读锁是很有必要的,读锁可以防止读到写中间的值。

读写锁的插队策略

Go
Go
goroutineG1G2G3G4G5G1G2G3goroutine
G4goroutineG4G5G3G5
G1G2G3G3
G3goroutineG4G5

读写锁的实现

RWMutex
type RWMutex struct {
 w           Mutex  // held if there are pending writers
 writerSem   uint32 // semaphore for writers to wait for completing readers
 readerSem   uint32 // semaphore for readers to wait for completing writers
 readerCount int32  // number of pending readers
 readerWait  int32  // number of departing readers
}
wwriterSemgoroutinegoroutinegoroutinereaderSemgoroutinegoroutinegoroutineredaerCountgoroutinereaderWaitgoroutine

读锁

读锁的对应方法如下:

func (rw *RWMutex) RLock() {
  // 原子操作readerCount 只要值不是负数就表示获取读锁成功
 if atomic.AddInt32(&rw.readerCount, 1) < 0 {
  // 有一个正在等待的写锁,为了避免饥饿后面进来的读锁进行阻塞等待
  runtime_SemacquireMutex(&rw.readerSem, false, 0)
 }
}

精简了竞态检测的方法,读锁方法就只有两行代码了,逻辑如下:

readerCountreadercount1goroutineruntime_SemacquireMutex

非阻塞加读锁

Go1.18
func (rw *RWMutex) TryRLock() bool {
 for {
    // 读取readerCount值能知道当前是否有写锁在阻塞等待,如果值为负数,那么后面的读锁就会被阻塞住
  c := atomic.LoadInt32(&rw.readerCount)
  if c < 0 {
   if race.Enabled {
    race.Enable()
   }
   return false
  }
    // 尝试获取读锁,for循环不断尝试
  if atomic.CompareAndSwapInt32(&rw.readerCount, c, c+1) {
   if race.Enabled {
    race.Enable()
    race.Acquire(unsafe.Pointer(&rw.readerSem))
   }
   return true
  }
 }
}
for

释放读锁

释放读锁代码主要分为两部分,第一部分:

func (rw *RWMutex) RUnlock() {
  // 将readerCount的值减1,如果值等于等于0直接退出即可;否则进入rUnlockSlow处理
 if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
  // Outlined slow-path to allow the fast-path to be inlined
  rw.rUnlockSlow(r)
 }
}
goroutine0
rUnlockSlow
func (rw *RWMutex) rUnlockSlow(r int32) {
  // r+1等于0表示没有加读锁就释放读锁,异常场景要抛出异常
  // r+1 == -rwmutexMaxReaders 也表示没有加读锁就是释放读锁
  // 因为写锁加锁成功后会将readerCout的值减去rwmutexMaxReaders
 if r+1 == 0 || r+1 == -rwmutexMaxReaders {
  race.Enable()
  throw("sync: RUnlock of unlocked RWMutex")
 }
 // 如果有写锁正在等待读锁时会更新readerWait的值,所以一步递减rw.readerWait值
  // 如果readerWait在原子操作后的值等于0了说明当前阻塞写锁的读锁都已经释放了,需要唤醒等待的写锁
 if atomic.AddInt32(&rw.readerWait, -1) == 0 {
  // The last reader unblocks the writer.
  runtime_Semrelease(&rw.writerSem, false, 1)
 }
}

解读一下这段代码:

r+10goroutiner+1 == -rwmutexMaxReadersreaderCountrwmutexMaxReadersreaderWaitgoroutinereaderWaitreaderWait0

写锁

写锁对应的方法如下:

const rwmutexMaxReaders = 1 << 30
func (rw *RWMutex) Lock() {
 // First, resolve competition with other writers.
  // 写锁也就是互斥锁,复用互斥锁的能力来解决与其他写锁的竞争
  // 如果写锁已经被获取了,其他goroutine在获取写锁时会进入自旋或者休眠
 rw.w.Lock()
 // 将readerCount设置为负值,告诉读锁现在有一个正在等待运行的写锁(获取互斥锁成功)
 r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
 // 获取互斥锁成功并不代表goroutine获取写锁成功,我们默认最大有2^30的读操作数目,减去这个最大数目
  // 后仍然不为0则表示前面还有读锁,需要等待读锁释放并更新写操作被阻塞时等待的读操作goroutine个数;
 if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
  runtime_SemacquireMutex(&rw.writerSem, false, 0)
 }
}

代码量不是很大,但是理解起来还有一点复杂,我尝试用文字来解析一下,主要分为两部分:

mutexgoroutinerwmutexMaxReaders = 1 << 302^302^30readerCount2^30r0readerWaitgoroutinereaderWaitreaderWait0

非阻塞加写锁

Go语言1.18
func (rw *RWMutex) TryLock() bool {
  // 先判断获取互斥锁是否成功,没有成功则直接返回false
 if !rw.w.TryLock() {
  if race.Enabled {
   race.Enable()
  }
  return false
 }
  // 互斥锁获取成功了,接下来就判断是否是否有读锁正在阻塞该写锁,如果没有直接更新readerCount为
  // 负数获取写锁成功;
 if !atomic.CompareAndSwapInt32(&rw.readerCount, 0, -rwmutexMaxReaders) {
  rw.w.Unlock()
  if race.Enabled {
   race.Enable()
  }
  return false
 }
 return true
}

释放写锁

func (rw *RWMutex) Unlock() {
 // Announce to readers there is no active writer.
  // 将readerCount的恢复为正数,也就是解除对读锁的互斥
 r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
 if r >= rwmutexMaxReaders {
  race.Enable()
  throw("sync: Unlock of unlocked RWMutex")
 }
 // 如果后面还有读操作的goroutine则需要唤醒他们
 for i := 0; i < int(r); i++ {
  runtime_Semrelease(&rw.readerSem, false, 0)
 }
 // 释放互斥锁,写操作的goroutine和读操作的goroutine同时竞争
 rw.w.Unlock()
}
goroutine

总结

因为我们上文已经分享了互斥锁的实现方式,再来看读写锁就轻松许多了,文末我们再来总结一下读写锁:

readerCountreaderWaitgoroutinegoroutinegoroutinegoroutinegoroutinereaderWaitreaderWaitreaderWait0goroutine
mutexgoroutinereaderCount

读写锁的代码量不多,因为其复用了互斥锁的设计,针对读写锁的功能多做了一些工作,理解起来比互斥锁要容易很多,你学会了吗?宝贝~。

好啦,本文到这里就结束了,我是asong,我们下期见。

欢迎关注公众号:Golang梦工厂