readerCount
type Mutex struct {
	mu chan struct{}
}

func NewMutex() *Mutex {
	return &Mutex{mu: make(chan struct{}, 1)}
}

func (m *Mutex) Lock() {
	m.mu <- struct{}{}
}

func (m *Mutex) Unlock() {
	<-m.mu
}

func main() {
	mu := NewMutex()
	inx := 0
	wg := new(sync.WaitGroup)
	wg.Add(100)
	for i := 0; i < 100; i++ {
		go func() {
			defer wg.Done()
			mu.Lock()
			inx++
			mu.Unlock()
		}()
	}
	wg.Wait()
	fmt.Println(inx)
}
❯ go run -race main.go
100

源码解读

type Mutex struct {
	state int32  //互斥锁的状态 waiter(29)阻塞协程数 starving(是否饥饿) woken(是否有协程被唤醒) locked(是否被锁定)
	sema  uint32 //信号量
}

加锁:

woken

自旋的好处:
更充分的利用CPU,尽量避免协程切换
自旋条件;
1. 多CPU
2. 当前协程为了获取此锁进入自旋数量<4
3. 当前机器上至少存在一个正在运行的处理器P且其运行队列为空
总结:当CPU闲着的时候可以让它忙一下。
饥饿模式:
当自旋的协程每次都抢到锁,为了防止正常阻塞的等待锁的协程不被饿死,当协程等待时间超过1ms时就会启动饥饿模式,处于饥饿模式下,不会启动自旋过程,也即一旦有协程释放了锁,那么一定会唤醒协程,被唤醒的协程将会成功获取锁,同时也会把等待计数减1。
如果当前协程是最后一个协程或者等待时间小于1ms就恢复为普通模式

读写锁

读写锁很多都是直接参考于源码。


const rwMutexMaxReaders = 1 << 30 //一个无法达到的最大读数量

type RWMutex struct {
	mu          chan struct{} //互斥锁
	readerCount int32 // 读数量
	readerWait  int32 // 写等待的读的数量
	wchan       chan struct{} //用于唤醒等待读的写
	rchan       chan struct{} //用于唤醒等待写的读
}

func NewRWMutex() *RWMutex {
	return &RWMutex{mu: make(chan struct{}, 1), wchan: make(chan struct{}), rchan: make(chan struct{})}
}

func (rw *RWMutex) Lock() {
	rw.mu <- struct{}{} //获取锁
	//阻止之后的读操作,等待现有的读操作
	if r := atomic.AddInt32(&rw.readerCount, -rwMutexMaxReaders) + rwMutexMaxReaders; r > 0 {
		atomic.AddInt32(&rw.readerWait, r) //增加写阻塞时读数量
		<-rw.wchan
	}
}

func (rw *RWMutex) Unlock() {
	//唤醒等待的读
	if r := atomic.AddInt32(&rw.readerCount, rwMutexMaxReaders); r > 0 {
		for i := 0; i < int(r); i++ {
			rw.rchan <- struct{}{}
		}
	}
	//解锁
	<-rw.mu
}

func (rw *RWMutex) RLock() {
	//增加读数量,如果有写就等待写
	if r := atomic.AddInt32(&rw.readerCount, 1); r < 0 {
		<-rw.rchan
	}
}

func (rw *RWMutex) RUnlock() {
	//减少读数量,有写等待就进一步判断如果自己是最后一个读就唤醒写
	if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
		if rwait := atomic.AddInt32(&rw.readerWait, -1); rwait == 0 {
			rw.wchan <- struct{}{}
		}
	}
}

func main() {
	rw := NewRWMutex()
	num := 0
	wg := new(sync.WaitGroup)
	wg.Add(100)
	for i := 0; i < 100; i++ {
		go func(i int) {
			defer wg.Done()
			switch i % 2 {
			case 0:
				rw.Lock()
				defer rw.Unlock()
				num++
			case 1:
				time.Sleep(time.Duration(rand.Intn(2)) * time.Millisecond)
				rw.RLock()
				defer rw.RUnlock()
				fmt.Println(num)
			}
		}(i)
	}
	wg.Wait()
}
❯ go run -race main.go
...

源码分析

type RWMutex struct {
	w           Mutex  // held if there are pending writers
	writerSem   uint32 // semaphore for writers to wait for completing readers 写阻塞等待的信号量,最后一个读协程释放锁后会释放
	readerSem   uint32 // semaphore for readers to wait for completing writers 读阻塞等待的信号量,持有写锁的协程释放锁后会释放
	readerCount int32  // number of pending readers 正在读协程的个数
	readerWait  int32  // number of departing readers 写阻塞时读协程的个数
}

const rwmutexMaxReaders = 1 << 30
readerCountreaderWaitreaderCountreaderCountreaderCountreaderWait