syncsync.WaitGroupsync.Mapsync.Pool

什么是原子操作?

原子操作是变量级别的互斥锁。

Mutexsync/atomic

我们可以用下图来表示:

 

vvv
sync/atomic

原子操作的使用场景是什么?

拿一个简单的例子来说明一下原子操作的使用场景:

func TestAtomic(t *testing.T) {
	var sum = 0
	var wg sync.WaitGroup
	wg.Add(1000)

	// 启动 1000 个协程,每个协程对 sum 做加法操作
	for i := 0; i < 1000; i++ {
		go func() {
			defer wg.Done()
			sum++
		}()
	}

	// 等待所有的协程都执行完毕
	wg.Wait()
	fmt.Println(sum) // 这里输出多少呢?
}

我们可以在自己的电脑上运行一下这段代码,看看输出的结果是多少。 不出意外的话,应该每次可能都不一样,而且应该也不是 1000,这是为什么呢?

sumsumsumsum
sumsum

在这种场景下,我们可以使用原子操作来实现并发安全的加法操作:

func TestAtomic1(t *testing.T) {
	// 将 sum 的类型改成 int32,因为原子操作只能针对 int32、int64、uint32、uint64、uintptr 这几种类型
	var sum int32 = 0
	var wg sync.WaitGroup
	wg.Add(1000)

    // 启动 1000 个协程,每个协程对 sum 做加法操作
	for i := 0; i < 1000; i++ {
		go func() {
			defer wg.Done()
			// 将 sum++ 改成下面这样
			atomic.AddInt32(&sum, 1)
		}()
	}

	wg.Wait()
	fmt.Println(sum) // 输出 1000
}

在上面这个例子中,我们每次执行都能得到 1000 这个结果。

因为使用原子操作的时候,同一时刻只能有一个 CPU 对变量进行读或写,所以就不会出现上面的问题了。

所以很多需要对变量做并发读写的地方,我们都可以考虑一下,是否可以使用原子操作来实现并发安全的操作(而不是使用互斥锁,互斥锁效率相比原子操作要低一些)。

原子操作的使用场景也是和互斥锁类似的,但是不一样的是,我们的锁粒度只是一个变量而已。也就是说,当我们不允许多个 CPU 同时对变量进行读写的时候(保证变量同一时刻只能一个 CPU 操作),就可以使用原子操作。

原子操作是怎么实现的?

看完上面原子操作的介绍,有没有觉得原子操作很神奇,居然有这么好用的东西。那它到底是怎么实现的呢?

一般情况下,原子操作的实现需要特殊的 CPU 指令或者系统调用。 这些指令或者系统调用可以保证在执行期间不会被其他操作或事件中断,从而保证操作的原子性。

LOCKLOCKLOCK

x86 LOCK 的时候发生了什么

LOCK
  1. CPU 会将当前处理器缓存中的数据写回到内存中。(因此我们总能读取到最新的数据)
  2. 然后锁定该内存地址,防止其他 CPU 修改该地址的数据。
  3. 一旦当前 CPU 对该地址的操作完成,CPU 会释放该内存地址的锁定,其他 CPU 才能继续对该地址进行访问。

其他架构的 CPU 可能会略有不同,但是原理是一样的。

原子操作有什么特征?

  1. 不会被中断:原子操作是一个不可分割的操作,要么全部执行,要么全部不执行,不会出现中间状态。这是保证原子性的基本前提。同时,原子操作过程中不会有上下文切换的过程。
  2. 操作对象是共享变量:原子操作通常是对共享变量进行的,也就是说,多个协程可以同时访问这个变量,因此需要采用原子操作来保证数据的一致性和正确性。
  3. 并发安全:原子操作是并发安全的,可以保证多个协程同时进行操作时不会出现数据竞争问题(虽然说是同时,但是实际上在操作那个变量的时候是互斥的)。
  4. 无需加锁:原子操作不需要使用互斥锁来保证数据的一致性和正确性,因此可以避免互斥锁的使用带来的性能损失。
  5. 适用场景比较局限:原子操作适用于操作单个变量,如果需要同时并发读写多个变量,可能需要考虑使用互斥锁。

go 里面有哪些原子操作?

AddCompareAndSwapLoadStoreSwap

增减(Add)

Addint32int64uint32uint64uintptr
func AddInt32(addr *int32, delta int32) (new int32)
func AddInt64(addr *int64, delta int64) (new int64)
func AddUint32(addr *uint32, delta uint32) (new uint32)
func AddUint64(addr *uint64, delta uint64) (new uint64)
func AddUintptr(addr *uintptr, delta uintptr) (new uintptr)
int32int64

比较并交换(CompareAndSwap)

CASCASoldnewaddrboolswap

也就是说,这个操作可能是不成功的。这很正常,在并发环境下,多个协程对同一个变量进行操作,肯定会存在竞争的情况。 在这种情况下,偶尔的失败是正常的,我们只需要在失败的时候,重新尝试即可。 因为原子操作需要的时间往往是比较短的,因此在失败的时候,我们可以通过自旋的方式来再次进行尝试。

在这种情况下,如果不自旋,那就需要将这个协程挂起,等待其他协程完成操作,然后再次尝试。这个过程相比自旋可能会更加耗时。 因为很有可能这次原子操作不成功,下一次就成功了。如果我们每次都将协程挂起,那么效率就会大大降低。

forsyncsync.Mapsync.Pool
CompareAndSwap
CompareAndSwapint32int64uint32uint64uintptrunsafe.Pointer
func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)
func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool)
func CompareAndSwapUint32(addr *uint32, old, new uint32) (swapped bool)
func CompareAndSwapUint64(addr *uint64, old, new uint64) (swapped bool)
func CompareAndSwapUintptr(addr *uintptr, old, new uintptr) (swapped bool)
func CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool)

载入(Load)

原子性的读取操作接受一个对应类型的指针值,返回该指针指向的值。原子性读取意味着读取值的同时,当前计算机的任何 CPU 都不会进行针对值的读写操作。

Loadv := valuevvalue

Load 操作有下面这些:

func LoadInt32(addr *int32) (val int32)
func LoadInt64(addr *int64) (val int64)
func LoadUint32(addr *uint32) (val uint32)
func LoadUint64(addr *uint64) (val uint64)
func LoadUintptr(addr *uintptr) (val uintptr)
func LoadPointer(addr *unsafe.Pointer) (val unsafe.Pointer)

存储(Store)

Storeval*addrStoreStore*addr
val*addrsync/atomicLoadStoreStore
Store
func StoreInt32(addr *int32, val int32)
func StoreInt64(addr *int64, val int64)
func StoreUint32(addr *uint32, val uint32)
func StoreUint64(addr *uint64, val uint64)
func StoreUintptr(addr *uintpre, val uintptr)
func StorePointer(addr *unsafe.Pointer, val unsafe.Pointer)

交换(Swap)

SwapStore*addr
func SwapInt32(addr *int32, new int32) (old int32)
func SwapInt64(addr *int64, new int64) (old int64)
func SwapUint32(addr *uint32, new uint32) (old uint32)
func SwapUint64(addr *uint64, new uint64) (old uint64)
func SwapUintptr(addr *uintptr, new uintptr) (old uintptr)
func SwapPointer(addr *unsafe.Pointer, new unsafe.Pointer) (old unsafe.Pointer)

原子操作任意类型的值 - atomic.Value

int32int64uint32uint64uintptrunsafe.Pointerstringstructatomic.Value
atomic.Valueany
atomic.Value
LoadValueStoreValueSwapValueCompareAndSwapValueoldnewValuetruefalse
atomic.Valueatomic.Valueatomic.Value

atomic.Value 源码分析

atomic.Value
// Value 提供一致类型值的原子加载和存储。
type Value struct {
	v any
}

Load - 读取

LoadStoreStorenil
// Load 返回由最近的 Store 设置的值。
func (v *Value) Load() (val any) {
	// atomic.Value 转换为 efaceWords
	vp := (*efaceWords)(unsafe.Pointer(v))

	// 判断 atomic.Value 的类型
	typ := LoadPointer(&vp.typ)
	// 第一次 Store 还没有完成,直接返回 nil
	if typ == nil || typ == unsafe.Pointer(&firstStoreInProgress) {
		// firstStoreInProgress 是一个特殊的变量,存储到 typ 中用来表示第一次 Store 还没有完成
		return nil
	}

	// 获取 atomic.Value 的值
	data := LoadPointer(&vp.data)
	// 将 val 转换为 efaceWords 类型
	vlp := (*efaceWords)(unsafe.Pointer(&val))
	// 分别赋值给 val 的 typ 和 data
	vlp.typ = typ
	vlp.data = data
	return
}
atomic.ValueefaceWordsinterface{}/any
// 表示一个 interface{}/any 类型
type efaceWords struct {
	typ  unsafe.Pointer
	data unsafe.Pointer
}
valvalefaceWords
int32int64uint32uint64uintptrunsafe.Pointerinterface{}interface{}interface{}typedatatypeinterface{}datainterface{}unsafe.Pointerinterface{}typedatainterface{}

Store - 存储

StoreValuevalStore(nil)panic
// Store 将 Value 的值设置为 val。
func (v *Value) Store(val any) {
	// 不能存储 nil 值
	if val == nil {
		panic("sync/atomic: store of nil value into Value")
	}
	// atomic.Value 转换为 efaceWords
	vp := (*efaceWords)(unsafe.Pointer(v))
	// val 转换为 efaceWords
	vlp := (*efaceWords)(unsafe.Pointer(&val))
	
	// 自旋进行原子操作,这个过程不会很久,开销相比互斥锁小
	for {
		// LoadPointer 可以保证获取到的是最新的
		typ := LoadPointer(&vp.typ)
		// 第一次 store 的时候 typ 还是 nil,说明是第一次 store
		if typ == nil {
			// 尝试开始第一次 Store。
			// 禁用抢占,以便其他 goroutines 可以自旋等待完成。
			// (如果允许抢占,那么其他 goroutine 自旋等待的时间可能会比较长,因为可能会需要进行协程调度。)
			runtime_procPin()
			// 抢占失败,意味着有其他 goroutine 成功 store 了,允许抢占,再次尝试 Store
			// 这也是一个原子操作。
			if !CompareAndSwapPointer(&vp.typ, nil, unsafe.Pointer(&firstStoreInProgress)) {
				runtime_procUnpin()
				continue
			}
			// 完成第一次 store
			// 因为有 firstStoreInProgress 标识的保护,所以下面的两个原子操作是安全的。
			StorePointer(&vp.data, vlp.data) // 存储值(原子操作)
			StorePointer(&vp.typ, vlp.typ)   // 存储类型(原子操作)
			runtime_procUnpin()              // 允许抢占
			return
		}

		// 另外一个 goroutine 正在进行第一次 Store。自旋等待。
		if typ == unsafe.Pointer(&firstStoreInProgress) {
			continue
		}

		// 第一次 Store 已经完成了,下面不是第一次 Store 了。
		// 需要检查当前 Store 的类型跟第一次 Store 的类型是否一致,不一致就 panic。
		if typ != vlp.typ {
			panic("sync/atomic: store of inconsistently typed value into Value")
		}

		// 后续的 Store 只需要 Store 值部分就可以了。
		// 因为 atomic.Value 只能保存一种类型的值。
		StorePointer(&vp.data, vlp.data)
		return
	}
}
Store
firstStoreInProgressStoregoroutineStoregoroutinetypdatafirstStoreInProgressStorePfirstStoreInProgressStoreStorefirstStoreInProgressinterface{}firstStoreInProgressStoreStoreatomic.Value

Swap - 交换

SwapValuenewSwap(nil)panic
// Swap 将 Value 的值设置为 new 并返回旧值。
func (v *Value) Swap(new any) (old any) {
	// 不能存储 nil 值
	if new == nil {
		panic("sync/atomic: swap of nil value into Value")
	}

	// atomic.Value 转换为 efaceWords
	vp := (*efaceWords)(unsafe.Pointer(v))
	// new 转换为 efaceWords
	np := (*efaceWords)(unsafe.Pointer(&new))
	
	// 自旋进行原子操作,这个过程不会很久,开销相比互斥锁小
	for {
		// 下面这部分代码跟 Store 一样,不细说了。
		// 这部分代码是进行第一次存储的代码。
		typ := LoadPointer(&vp.typ)
		if typ == nil {
			runtime_procPin()
			if !CompareAndSwapPointer(&vp.typ, nil, unsafe.Pointer(&firstStoreInProgress)) {
				runtime_procUnpin()
				continue
			}
			StorePointer(&vp.data, np.data)
			StorePointer(&vp.typ, np.typ)
			runtime_procUnpin()
			return nil
		}
		if typ == unsafe.Pointer(&firstStoreInProgress) {
			continue
		}
		if typ != np.typ {
			panic("sync/atomic: swap of inconsistently typed value into Value")
		}

		// ---- 下面是 Swap 的特有逻辑 ----
		// op 是返回值
		op := (*efaceWords)(unsafe.Pointer(&old))
		// 返回旧的值
		op.typ, op.data = np.typ, SwapPointer(&vp.data, np.data)
		return old
	}
}

CompareAndSwap - 比较并交换

CompareAndSwapValueoldnewtruefalseCompareAndSwap(nil, nil)panic
// CompareAndSwap 比较并交换。
func (v *Value) CompareAndSwap(old, new any) (swapped bool) {
	// 注意:old 是可以为 nil 的,new 不能为 nil。
	// old 是 nil 表示是第一次进行 Store 操作。
	if new == nil {
		panic("sync/atomic: compare and swap of nil value into Value")
	}

	// atomic.Value 转换为 efaceWords
	vp := (*efaceWords)(unsafe.Pointer(v))
	// new 转换为 efaceWords
	np := (*efaceWords)(unsafe.Pointer(&new))
	// old 转换为 efaceWords
	op := (*efaceWords)(unsafe.Pointer(&old))

	// old 和 new 类型必须一致,且不能为 nil
	if op.typ != nil && np.typ != op.typ {
		panic("sync/atomic: compare and swap of inconsistently typed values")
	}

	// 自旋进行原子操作,这个过程不会很久,开销相比互斥锁小
	for {
		// LoadPointer 可以保证获取到的 typ 是最新的
		typ := LoadPointer(&vp.typ)
		if typ == nil { // atomic.Value 是 nil,还没 Store 过
			// 准备进行第一次 Store,但是传递进来的 old 不是 nil,compare 这一步就失败了。直接返回 false
			if old != nil {
				return false
			}

			// 下面这部分代码跟 Store 一样,不细说了。 
			// 这部分代码是进行第一次存储的代码。
			runtime_procPin()
			if !CompareAndSwapPointer(&vp.typ, nil, unsafe.Pointer(&firstStoreInProgress)) {
				runtime_procUnpin()
				continue
			}
			StorePointer(&vp.data, np.data)
			StorePointer(&vp.typ, np.typ)
			runtime_procUnpin()
			return true
		}
		if typ == unsafe.Pointer(&firstStoreInProgress) {
			continue
		}
		if typ != np.typ {
			panic("sync/atomic: compare and swap of inconsistently typed value into Value")
		}

		// 通过运行时相等性检查比较旧版本和当前版本。
		// 这允许对值类型进行比较,这是包函数所没有的。
		// 下面的 CompareAndSwapPointer 仅确保 vp.data 自 LoadPointer 以来没有更改。
		data := LoadPointer(&vp.data)
		var i any
		(*efaceWords)(unsafe.Pointer(&i)).typ = typ
		(*efaceWords)(unsafe.Pointer(&i)).data = data
		if i != old { // atomic.Value 跟 old 不相等
			return false
		}
		// 只做 val 部分的 cas 操作
		return CompareAndSwapPointer(&vp.data, data, np.data)
	}
}
data := LoadPointer(&vp.data)atomic.ValueCompareAndSwapPointer(&vp.data, old.data, np.data)CASoldinterface{}/anyolddatavp.dataCASfalse
interface{}interface{}interface{}
typdataanyi != oldinterface{}

其他原子类型

atomic.Valueint32int64uint32uint64uintptrunsafe.Pointer

对应的类型如下:

atomic.Booluint32atomic.Booluint32atomic.Int32int32atomic.Int64int64atomic.Uint32uint32atomic.Uint64uint64atomic.Uintptruintptratomic.Pointerunsafe.Pointer
atomic.Int32
// An Int32 is an atomic int32. The zero value is zero.
type Int32 struct {
	_ noCopy
	v int32
}

// Load atomically loads and returns the value stored in x.
func (x *Int32) Load() int32 { return LoadInt32(&x.v) }

// Store atomically stores val into x.
func (x *Int32) Store(val int32) { StoreInt32(&x.v, val) }

// Swap atomically stores new into x and returns the previous value.
func (x *Int32) Swap(new int32) (old int32) { return SwapInt32(&x.v, new) }

// CompareAndSwap executes the compare-and-swap operation for x.
func (x *Int32) CompareAndSwap(old, new int32) (swapped bool) {
	return CompareAndSwapInt32(&x.v, old, new)
}
atomic.Int32atomicint32

原子操作与互斥锁比较

那我们有了互斥锁,为什么还要有原子操作呢?我们进行比较一下就知道了:

原子操作互斥锁
保护的范围变量代码块
保护的粒度
性能
如何实现的硬件指令软件层面实现,逻辑较多

如果我们只需要对某一个变量做并发读写,那么使用原子操作就可以了,因为原子操作的性能比互斥锁高很多。 但是如果我们需要对多个变量做并发读写,那么就需要用到互斥锁了,这种场景往往是在一段代码中对不同变量做读写。

性能比较

我们前面这个表格提到了原子操作与互斥锁性能上有差异,我们写几行代码来进行比较一下:

// 系统信息 cpu: Intel(R) Core(TM) i7-8700 CPU @ 3.20GHz
// 10.13 ns/op
func BenchmarkMutex(b *testing.B) {
   var mu sync.Mutex

   for i := 0; i < b.N; i++ {
      mu.Lock()
      mu.Unlock()
   }
}

// 5.849 ns/op
func BenchmarkAtomic(b *testing.B) {
   var sum atomic.Uint64

   for i := 0; i < b.N; i++ {
      sum.Add(uint64(1))
   }
}
MutexLock()UnLock()MutexAtomicsumAtomicMutex40%
MutexMutex40%Mutex

go 的 sync 包中的原子操作

sync.Mapsync.Pool

sync.Map 中的原子操作

sync.Mapentry
// 删除 entry
func (e *entry) delete() (value any, ok bool) {
	for {
		p := e.p.Load()
		// 已经被删除了,不需要再删除
		if p == nil || p == expunged {
			return nil, false
		}
		// 删除成功
		if e.p.CompareAndSwap(p, nil) {
			return *p, true
		}
	}
}

// 如果条目尚未删除,trySwap 将交换一个值。
func (e *entry) trySwap(i *any) (*any, bool) {
	for {
		p := e.p.Load()
		// 已经被删除了
		if p == expunged {
			return nil, false
		}
		// swap 成功
		if e.p.CompareAndSwap(p, i) {
			return p, true
		}
	}
}
forCompareAndSwapentry

如果我们也需要对变量做并发读写,也可以尝试一下这种 for + CompareAndSwap 的组合。

sync.WaitGroup 中的原子操作

sync.WaitGroupatomic.Uint64stateWaitGroupWaitGroupWaitGroupWaiter

如果不使用一个变量来记录这两个值,那么我们就需要使用两个变量来记录,这样就会导致我们需要对两个变量做并发读写, 在这种情况下,我们就需要使用互斥锁来保护这两个变量,这样就会导致性能的下降。

而使用一个变量来记录这两个值,我们就可以使用原子操作来保护这个变量,这样就可以保证并发读写的安全性,同时也能得到更好的性能:

// WaitGroup 的 Add 函数:高 32 位加上 delta
state := wg.state.Add(uint64(delta) << 32)

// WaitGroup 的 Wait 函数:低 32 位加 1
// 等待者的数量加 1
wg.state.CompareAndSwap(state, state+1)

CAS 操作有失败必然有成功

CASCASsumCAS
func TestCas(t *testing.T) {
	var sum int32 = 0
	var wg sync.WaitGroup
	wg.Add(1000)

	for i := 0; i < 1000; i++ {
		go func() {
			defer wg.Done()
			// 这一行是有可能会失败的
			atomic.CompareAndSwapInt32(&sum, sum, sum+1)
		}()
	}

	wg.Wait()
	fmt.Println(sum) // 不是 1000
}
atomic.AddInt32(&sum, 1)atomic.CompareAndSwapInt32(&sum, sum, sum+1)atomic.CompareAndSwapInt32(&sum, sum, sum+1)sumCASsumsum + 1
sumsumsumCASsumCAS
func TestCas(t *testing.T) {
	var sum int32 = 0
	var wg sync.WaitGroup
	wg.Add(1000)

	for i := 0; i < 1000; i++ {
		go func() {
			defer wg.Done()
			
			// cas 失败的时候,重新获取 sum 的值进行计算。
			// cas 成功则返回。
			for {
				if atomic.CompareAndSwapInt32(&sum, sum, sum+1) {
					return
				}
			}
		}()
	}

	wg.Wait()
	fmt.Println(sum)
}

总结

原子操作是并发编程中非常重要的一个概念,它可以保证并发读写的安全性,同时也能得到更好的性能。

最后,总结一下本文讲到的内容:

AddCompareAndSwapLoadStoreSwapatomic.Valuesync.WaitGroupsync.MapentryCASCASCAS

总的来说,原子操作本身其实没有太复杂的逻辑,我们理解了它的原理之后,就可以很容易的使用它了。