readerCount
锁
type Mutex struct {
mu chan struct{}
}
func NewMutex() *Mutex {
return &Mutex{mu: make(chan struct{}, 1)}
}
func (m *Mutex) Lock() {
m.mu <- struct{}{}
}
func (m *Mutex) Unlock() {
<-m.mu
}
func main() {
mu := NewMutex()
inx := 0
wg := new(sync.WaitGroup)
wg.Add(100)
for i := 0; i < 100; i++ {
go func() {
defer wg.Done()
mu.Lock()
inx++
mu.Unlock()
}()
}
wg.Wait()
fmt.Println(inx)
}
❯ go run -race main.go
100
源码解读
type Mutex struct {
state int32 //互斥锁的状态 waiter(29)阻塞协程数 starving(是否饥饿) woken(是否有协程被唤醒) locked(是否被锁定)
sema uint32 //信号量
}
加锁:
woken
自旋的好处:
更充分的利用CPU,尽量避免协程切换
自旋条件;
1. 多CPU
2. 当前协程为了获取此锁进入自旋数量<4
3. 当前机器上至少存在一个正在运行的处理器P且其运行队列为空
总结:当CPU闲着的时候可以让它忙一下。
饥饿模式:
当自旋的协程每次都抢到锁,为了防止正常阻塞的等待锁的协程不被饿死,当协程等待时间超过1ms时就会启动饥饿模式,处于饥饿模式下,不会启动自旋过程,也即一旦有协程释放了锁,那么一定会唤醒协程,被唤醒的协程将会成功获取锁,同时也会把等待计数减1。
如果当前协程是最后一个协程或者等待时间小于1ms就恢复为普通模式
读写锁很多都是直接参考于源码。
const rwMutexMaxReaders = 1 << 30 //一个无法达到的最大读数量
type RWMutex struct {
mu chan struct{} //互斥锁
readerCount int32 // 读数量
readerWait int32 // 写等待的读的数量
wchan chan struct{} //用于唤醒等待读的写
rchan chan struct{} //用于唤醒等待写的读
}
func NewRWMutex() *RWMutex {
return &RWMutex{mu: make(chan struct{}, 1), wchan: make(chan struct{}), rchan: make(chan struct{})}
}
func (rw *RWMutex) Lock() {
rw.mu <- struct{}{} //获取锁
//阻止之后的读操作,等待现有的读操作
if r := atomic.AddInt32(&rw.readerCount, -rwMutexMaxReaders) + rwMutexMaxReaders; r > 0 {
atomic.AddInt32(&rw.readerWait, r) //增加写阻塞时读数量
<-rw.wchan
}
}
func (rw *RWMutex) Unlock() {
//唤醒等待的读
if r := atomic.AddInt32(&rw.readerCount, rwMutexMaxReaders); r > 0 {
for i := 0; i < int(r); i++ {
rw.rchan <- struct{}{}
}
}
//解锁
<-rw.mu
}
func (rw *RWMutex) RLock() {
//增加读数量,如果有写就等待写
if r := atomic.AddInt32(&rw.readerCount, 1); r < 0 {
<-rw.rchan
}
}
func (rw *RWMutex) RUnlock() {
//减少读数量,有写等待就进一步判断如果自己是最后一个读就唤醒写
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
if rwait := atomic.AddInt32(&rw.readerWait, -1); rwait == 0 {
rw.wchan <- struct{}{}
}
}
}
func main() {
rw := NewRWMutex()
num := 0
wg := new(sync.WaitGroup)
wg.Add(100)
for i := 0; i < 100; i++ {
go func(i int) {
defer wg.Done()
switch i % 2 {
case 0:
rw.Lock()
defer rw.Unlock()
num++
case 1:
time.Sleep(time.Duration(rand.Intn(2)) * time.Millisecond)
rw.RLock()
defer rw.RUnlock()
fmt.Println(num)
}
}(i)
}
wg.Wait()
}
❯ go run -race main.go
...
源码分析
type RWMutex struct {
w Mutex // held if there are pending writers
writerSem uint32 // semaphore for writers to wait for completing readers 写阻塞等待的信号量,最后一个读协程释放锁后会释放
readerSem uint32 // semaphore for readers to wait for completing writers 读阻塞等待的信号量,持有写锁的协程释放锁后会释放
readerCount int32 // number of pending readers 正在读协程的个数
readerWait int32 // number of departing readers 写阻塞时读协程的个数
}
const rwmutexMaxReaders = 1 << 30
readerCountreaderWaitreaderCountreaderCountreaderCountreaderWait