前言

asongGochannelGosyncchannel
Golang

Go语言互斥锁设计实现

mutex介绍

syncmutexLock()Unlock()Go1.18TryLock()
Lock()LockgoroutinepanicUnlock()UnLockpanictryLock()TryLock
mutex
type Mutex struct {
    state int32
    sema  uint32
}
statesemagoroutine
statemutexedmutexWokenmutexStarvinggoroutine
const (
   mutexLocked = 1 << iota // 示意互斥锁的锁定状态
   mutexWoken // 示意从失常模式被从唤醒
   mutexStarving // 以后的互斥锁进入饥饿状态
   mutexWaiterShift = iota // 以后互斥锁上期待者的数量
)
mutexgouroutinegoroutinegoroutineGo1.9goroutine1msgoroutine
mutexmutex

Lock加锁

Lock
func (m *Mutex) Lock() {
    // 判断以后锁的状态,如果锁是齐全闲暇的,即m.state为0,则对其加锁,将m.state的值赋为1
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        if race.Enabled {
            race.Acquire(unsafe.Pointer(m))
        }
        return
    }
    // Slow path (outlined so that the fast path can be inlined)
    m.lockSlow()
}

下面的代码次要两局部逻辑:

CASstategoroutinelockSlowgoroutinelockSlow
lockSlowfor
CAS

初始化状态

locakSlow
func (m *Mutex) lockSlow() {
    var waitStartTime int64 
    starving := false
    awoke := false
    iter := 0
    old := m.state
    ........
}
waitStartTimewaiterstarvingawokegoroutineiterold

自旋

自旋的判断条件十分刻薄:

for {
    // 判断是否容许进入自旋 两个条件,条件1是以后锁不能处于饥饿状态
    // 条件2是在runtime_canSpin内实现,其逻辑是在多核CPU运行,自旋的次数小于4
        if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
      // !awoke 判断以后goroutine不是在唤醒状态
      // old&mutexWoken == 0 示意没有其余正在唤醒的goroutine
      // old>>mutexWaiterShift != 0 示意期待队列中有正在期待的goroutine
      // atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) 尝试将以后锁的低2位的Woken状态位设置为1,示意已被唤醒, 这是为了告诉在解锁Unlock()中不要再唤醒其余的waiter了
            if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
                atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                    // 设置以后goroutine唤醒胜利
          awoke = true
            }
      // 进行自旋
            runtime_doSpin()
      // 自旋次数
            iter++
      // 记录以后锁的状态
            old = m.state
            continue
        }
}
goroutinemutex
old&(mutexLocked|mutexStarving) == mutexLocked
mutexLocked
mutexStarving
mutexLocked|mutexStarving&
runtime_canSpin()
// / go/go1.18/src/runtime/proc.go
const active_spin     = 4
func sync_runtime_canSpin(i int) bool {
    if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 {
        return false
    }
    if p := getg().m.p.ptr(); !runqempty(p) {
        return false
    }
    return true
}

自旋条件如下:

CPUGOMAXPROCS>1
goroutineruntime_doSpin
const active_spin_cnt = 30
func sync_runtime_doSpin() {
    procyield(active_spin_cnt)
}
// asm_amd64.s
TEXT runtime·procyield(SB),NOSPLIT,$0-0
    MOVL    cycles+0(FP), AX
again:
    PAUSE
    SUBL    $1, AX
    JNZ    again
    RET
30PAUSECPUCPU

这就是整个自旋操作的逻辑,这个就是为了优化 期待阻塞->唤醒->参加抢占锁这个过程不高效,所以应用自旋进行优化,在冀望在这个过程中锁被开释。

抢锁筹备冀望状态

mutexLockedmutexStarvingmutexWokenmutexWaiterShift
mutexLocked
    // 基于old状态申明到一个新状态
        new := old
        // 新状态处于非饥饿的条件下才能够加锁
        if old&mutexStarving == 0 {
            new |= mutexLocked
        }
mutexWaiterShift
//如果old曾经处于加锁或者饥饿状态,则期待者依照FIFO的程序排队
if old&(mutexLocked|mutexStarving) != 0 {
            new += 1 << mutexWaiterShift
        }
mutexStarving
// 如果以后锁处于饥饿模式,并且已被加锁,则将低3位的Starving状态位设置为1,示意饥饿
if starving && old&mutexLocked != 0 {
            new |= mutexStarving
        }
mutexWoken
// 以后goroutine的waiter被唤醒,则重置flag
if awoke {
            // 唤醒状态不统一,间接抛出异样
            if new&mutexWoken == 0 {
                throw("sync: inconsistent mutex state")
            }
     // 新状态革除唤醒标记,因为前面的goroutine只会阻塞或者抢锁胜利
     // 如果是挂起状态,那就须要期待其余开释锁的goroutine来唤醒。
     // 如果其余goroutine在unlock的时候发现Woken的地位不是0,则就不会去唤醒,那该goroutine就无奈在被唤醒后加锁
            new &^= mutexWoken
}
CAS
CAS
// 这里尝试将锁的状态更新为冀望状态
if atomic.CompareAndSwapInt32(&m.state, old, new) {
  // 如果原来锁的状态是没有加锁的并且不处于饥饿状态,则示意以后goroutine曾经获取到锁了,间接推出即可
            if old&(mutexLocked|mutexStarving) == 0 {
                break // locked the mutex with CAS
            }
            // 到这里就示意goroutine还没有获取到锁,waitStartTime是goroutine开始期待的工夫,waitStartTime != 0就示意以后goroutine曾经期待过了,则须要将其搁置在期待队列队头,否则就排到队列队尾
            queueLifo := waitStartTime != 0
            if waitStartTime == 0 {
                waitStartTime = runtime_nanotime()
            }
      // 阻塞期待
            runtime_SemacquireMutex(&m.sema, queueLifo, 1)
      // 被信号量唤醒后查看以后goroutine是否应该示意为饥饿
     // 1. 以后goroutine曾经饥饿
     // 2. goroutine曾经期待了1ms以上
            starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
  // 再次获取以后锁的状态
            old = m.state
   // 如果以后处于饥饿模式,
            if old&mutexStarving != 0 {
        // 如果以后锁既不是被获取也不是被唤醒状态,或者期待队列为空 这代表锁状态产生了不统一的问题
                if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
                    throw("sync: inconsistent mutex state")
                }
        // 以后goroutine曾经获取了锁,期待队列-1
                delta := int32(mutexLocked - 1<<mutexWaiterShift
         // 以后goroutine非饥饿状态 或者 期待队列只剩下一个waiter,则退出饥饿模式(革除饥饿标识位)              
                if !starving || old>>mutexWaiterShift == 1 {
                    delta -= mutexStarving
                }
        // 更新状态值并停止for循环,拿到锁退出
                atomic.AddInt32(&m.state, delta)
                break
            }
      // 设置以后goroutine为唤醒状态,且重置自璇次数
            awoke = true
            iter = 0
        } else {
      // 锁被其余goroutine占用了,还原状态持续for循环
            old = m.state
        }
CASruntime.sync_runtime_SemacquireMutexgoroutineruntime.sync_runtime_SemacquireMutexgoroutinegoroutinegoroutine

解锁

UnLock
func (m *Mutex) Unlock() {
    // Fast path: drop lock bit.
    new := atomic.AddInt32(&m.state, -mutexLocked)
    if new != 0 {
        // Outlined slow path to allow inlining the fast path.
        // To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
        m.unlockSlow(new)
    }
}
AddInt320goroutineunlockSlow
func (m *Mutex) unlockSlow(new int32) {
  // 这里示意解锁了一个没有上锁的锁,则间接产生panic
    if (new+mutexLocked)&mutexLocked == 0 {
        throw("sync: unlock of unlocked mutex")
    }
  // 失常模式的开释锁逻辑
    if new&mutexStarving == 0 {
        old := new
        for {
      // 如果没有期待者则间接返回即可
      // 如果锁处于加锁的状态,示意曾经有goroutine获取到了锁,能够返回
      // 如果锁处于唤醒状态,这表明有期待的goroutine被唤醒了,不必尝试获取其余goroutine了
      // 如果锁处于饥饿模式,锁之后会间接给期待队头goroutine
            if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
                return
            }
            // 抢占唤醒标记位,这里是想要把锁的状态设置为被唤醒,而后waiter队列-1
            new = (old - 1<<mutexWaiterShift) | mutexWoken
            if atomic.CompareAndSwapInt32(&m.state, old, new) {
        // 抢占胜利唤醒一个goroutine
                runtime_Semrelease(&m.sema, false, 1)
                return
            }
      // 执行抢占不胜利时从新更新一下状态信息,下次for循环持续解决
            old = m.state
        }
    } else {
    // 饥饿模式开释锁逻辑,间接唤醒期待队列goroutine
        runtime_Semrelease(&m.sema, true, 1)
    }
}
goroutinefunc runtime_Semrelease(s *uint32, handoff bool, skipframes int)handoff is true, pass count directly to the first waiter.

非阻塞加锁

Go1.18TryLock()
func (m *Mutex) TryLock() bool {
  // 记录以后状态
    old := m.state
  //  处于加锁状态/饥饿状态间接获取锁失败
    if old&(mutexLocked|mutexStarving) != 0 {
        return false
    }
    // 尝试获取锁,获取失败间接获取失败
    if !atomic.CompareAndSwapInt32(&m.state, old, old|mutexLocked) {
        return false
    }


    return true
}
TryLock
  • 判断以后锁的状态,如果锁处于加锁状态或饥饿状态间接获取锁失败
  • 尝试获取锁,获取失败间接获取锁失败
TryLock

总结

通读源码后你会发现互斥锁的逻辑真的十分复杂,代码量尽管不多,然而很难以了解,一些细节点还须要大家多看看几遍能力了解其为什么这样做,文末咱们再总结一下互斥锁的知识点:

goroutinegoroutineGo1.9goroutine1msMutexgoroutine1msMutexgoroutinegoroutineunlockedslowmutexLockedgoroutineMutexMutexLock

本文之后你对互斥锁有什么不了解的吗?欢送评论区批评指正~;

好啦,本文到这里就完结了,我是asong,咱们下期见。

欢送关注公众号:Golang梦工厂