目录
  • 引言
  • Mutex结构
  • 饥饿模式和正常模式
    • 正常模式
    • 饥饿模式
    • 状态的切换
  • 加锁和解锁
    • 加锁
      • 自旋
      • 计算锁的新状态
      • 更新锁状态
    • 解锁
  • 可能遇到的问题
    • 锁拷贝
    • panic导致没有unlock

引言

Golang的并发编程令人着迷,使用轻量的协程、基于CSP的channel、简单的go func()就可以开始并发编程,在并发编程中,往往离不开锁的概念。

sync.Mutex

Mutex结构

sync/mutex.go
type Mutex struct {
   state int32
   sema  uint32
}
statesema

互斥锁的状态定义在常量中:

const (
   mutexLocked = 1 << iota // 1 ,处于锁定状态; 2^0
   mutexWoken // 2 ;从正常模式被从唤醒;  2^1
   mutexStarving // 4 ;处于饥饿状态;    2^2
   mutexWaiterShift = iota // 3 ;获得互斥锁上等待的Goroutine个数需要左移的位数: 1 << mutexWaiterShift
   starvationThresholdNs = 1e6 // 锁进入饥饿状态的等待时间
)

0即其他状态。

sema

一个mutex对象仅占用8个字节,让人不禁感叹其设计的巧妙

饥饿模式和正常模式

正常模式

在正常模式下,等待的协程会按照先进先出的顺序得到锁 在正常模式下,刚被唤醒的goroutine与新创建的goroutine竞争时,大概率无法获得锁。

饥饿模式

为了避免正常模式下,goroutine被“饿死”的情况,go在1.19版本引入了饥饿模式,保证了Mutex的公平性

在饥饿模式中,互斥锁会直接交给等待队列最前面的goroutine。新的goroutine 在该状态下不能获取锁、也不会进入自旋状态,它们只会在队列的末尾等待。

状态的切换

在正常模式下,一旦Goroutine超过1ms没有获取到锁,它就会将当前互斥锁切换饥饿模式

如果一个goroutine 获得了互斥锁并且它在队列的末尾或者它等待的时间少于 1ms,那么当前的互斥锁就会切换回正常模式。

加锁和解锁

加锁

func (m *Mutex) Lock() {
   // Fast path: grab unlocked mutex.
   if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
      return
   }
   // 原注释: Slow path (outlined so that the fast path can be inlined)
   // 将
   m.lockSlow()
}

可以看到,当前互斥锁的状态为0时,尝试将当前锁状态设置为更新锁定状态,且这些操作是原子的。

lockSlow
var waitStartTime int64
starving := false //
awoke := false
iter := 0
old := m.state

随后进入一个很大的for循环,让我们来逐步分析

自旋

for {
     // 1 && 2
   if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
      //  3.
      if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
         atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
         awoke = true
      }
      runtime_doSpin()
      iter++
      old = m.state
      continue
   }

old&(mutexLocked|mutexStarving) == mutexLocked

当且仅当当前锁状态为mutexLocked时,表达式为true

runtime_canSpin(iter)
  • 运行在拥有多个CPU的机器上;
  • 当前Goroutine为了获取该锁进入自旋的次数小于四次;
  • 当前机器上至少存在一个正在运行的处理器 P,并且处理的运行队列为空;
awokemutexWoken
runtime_doSpin()PAUSE
//go:linkname sync_runtime_doSpin sync.runtime_doSpin
//go:nosplit
func sync_runtime_doSpin() {
   procyield(active_spin_cnt)
}
TEXT runtime·procyield(SB),NOSPLIT,$0-0
    MOVL    cycles+0(FP), AX
again:
    PAUSE
    SUBL    $1, AX
    JNZ again
    RET

计算锁的新状态

停止了自旋后,

new := old
// 1.
if old&mutexStarving == 0 {
   new |= mutexLocked
}
// 2.
if old&(mutexLocked|mutexStarving) != 0 {
   new += 1 << mutexWaiterShift
}
// 3 && 4.
if starving && old&mutexLocked != 0 {
   new |= mutexStarving
}
// 5.
if awoke {
   if new&mutexWoken == 0 {
      throw("sync: inconsistent mutex state")
   }
   new &^= mutexWoken
}
old&mutexStarving == 0mutexLockedmutexStarvingmutexLocked被唤醒过且抢锁失败

更新锁状态

// 1.
if atomic.CompareAndSwapInt32(&m.state, old, new) {
      if old&(mutexLocked|mutexStarving) == 0 {
         break // locked the mutex with CAS
      }
      // 2.
      queueLifo := waitStartTime != 0
      if waitStartTime == 0 {
         waitStartTime = runtime_nanotime()
      }
      // 3.
      runtime_SemacquireMutex(&m.sema, queueLifo, 1)
      // 4.
      starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
      old = m.state
      // 5.
      if old&mutexStarving != 0 {
         /
         if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
            throw("sync: inconsistent mutex state")
         }
         delta := int32(mutexLocked - 1<<mutexWaiterShift)
         if !starving || old>>mutexWaiterShift == 1 {
            delta -= mutexStarving
         }
         atomic.AddInt32(&m.state, delta)
         break
      }
      awoke = true
      iter = 0
   } else {
      old = m.state
   }
}
mutexLockedwaitStartTimeruntime_SemacquireMutexruntime_SemacquireMutexruntime_SemacquireMutexstarvingstarvingstarvationThresholdNsstarving

之后会按照饥饿模式与正常模式,走不同的逻辑

  • - 在正常模式下,这段代码会设置唤醒和饥饿标记、重置迭代次数并重新执行获取锁的循环;
  • - 在饥饿模式下,当前 Goroutine 会获得互斥锁,如果等待队列中只存在当前 Goroutine,互斥锁还会从饥饿模式中退出;

解锁

func (m *Mutex) Unlock() {
   // 1.
   new := atomic.AddInt32(&m.state, -mutexLocked)
   if new != 0 {
      // 2.
      m.unlockSlow(new)
   }
}
unlockSlow
func (m *Mutex) unlockSlow(new int32) {
    // 1.
   if (new+mutexLocked)&mutexLocked == 0 {
      throw("sync: unlock of unlocked mutex")
   }
   if new&mutexStarving == 0 {
      old := new
      for {
      // 2.
         if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
            return
         }
         // 2.1.
         new = (old - 1<<mutexWaiterShift) | mutexWoken
         if atomic.CompareAndSwapInt32(&m.state, old, new) {
         // 2.2.
            runtime_Semrelease(&m.sema, false, 1)
            return
         }
         old = m.state
      }
   } else {
   // 3.
      runtime_Semrelease(&m.sema, true, 1)
   }
}
new+mutexLocked

2. 如果等待者数量等于0,或者锁的状态已经变为mutexWoken、mutexStarving、mutexStarving,则直接返回

  • 将waiterCount数量-1,尝试选择一个goroutine唤醒
  • 尝试更新锁状态,如果更新锁状态成功,则唤醒队尾的一个gorountine
2

可能遇到的问题

锁拷贝

mu1 := &sync.Mutex{}
mu1.Lock()
mu2 := mu1
mu2.Unlock()
mu2mu1
mu1 := &sync.Mutex{}
mu1.Lock()
mu2 := mu1
mu2.Unlock()
mu1.Unlock()

可以看到发生了error

panic导致没有unlock

当lock()之后,可能由于代码问题导致程序发生了panic,那么mutex无法被及时unlock(),由于其他协程还在等待锁,此时可能触发死锁

func TestWithLock() {
   nums := 100
   wg := &sync.WaitGroup{}
   safeSlice := SafeSlice{
      s:    []int{},
      lock: new(sync.RWMutex),
   }
   i := 0
   for idx := 0; idx < nums; idx++ { // 并行nums个协程做append
      wg.Add(1)
      go func() {
         defer func() {
            if r := recover(); r != nil {
               log.Println("recover")
            }
            wg.Done()
         }()
         safeSlice.lock.Lock()
         safeSlice.s = append(safeSlice.s, i)
         if i == 98{
            panic("123")
         }
         i++
         safeSlice.lock.Unlock()
      }()
   }
   wg.Wait()
   log.Println(len(safeSlice.s))
}

修改:

func TestWithLock() {
   nums := 100
   wg := &sync.WaitGroup{}
   safeSlice := SafeSlice{
      s:    []int{},
      lock: new(sync.RWMutex),
   }
   i := 0
   for idx := 0; idx < nums; idx++ { // 并行nums个协程做append
      wg.Add(1)
      go func() {
         defer func() {
            if r := recover(); r != nil {
            }
            safeSlice.lock.Unlock()
            wg.Done()
         }()
         safeSlice.lock.Lock()
         safeSlice.s = append(safeSlice.s, i)
         if i == 98{
            panic("123")
         }
         i++
      }()
   }
   wg.Wait()
   log.Println(len(safeSlice.s))
}