原子操作就是不可中断的操作,这些操作对外表现成一个不可分割的整体,他们要么都执行,要么都不执行,外界是看不到原子操作的中间状态。
简介
sync/atomicXXTypeint32int64uint32uint64uintptr
AddXXTypeLoadXXTypeStoreXXTypeCompareAndSwapXXTypeSwapXXType
原子操作与互斥锁区别
sync包中的同步的原语Mutex常用于保证并发安全,其与atomic有什么区别呢:
lock-freeCPU
对于一个变量更新,原子操作通常会更有效率,并且更能利用计算机多核的优势。
原子操作
所有原子操作方法的被操作数形参必须是指针类型,通过指针变量可以获取被操作数在内存中的地址,从而施加特殊的CPU指令,确保同一时间只有一个goroutine能够进行操作。
增加操作
所有增减操作以Add为前缀:
func AddInt32(addr *int32, delta int32) (new int32)func AddInt64(addr *int64, delta int64) (new int64)func AddUint32(addr *uint32, delta uint32) (new uint32)func AddUint64(addr *uint64, delta uint64) (new uint64)func AddUintptr(addr *uintptr, delta uintptr) (new uintptr)
原子减操作:
var opts int64 = 100
for i := 0; i < 50; i++ {
atomic.AddInt64(&opts, -1) // 减操作
time.Sleep(time.Millisecond)
}
Uint64减操作
对于有符号整数,直接Add对应的负数即可实现减操作;但是对于无符号整数,则会报错。
AddUint64(&x, ^uint64(c-1))AddUint64(&x, ^uint64(0))
比较并交换操作
比较并交换操作以CompareAndSwap为前缀:
func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool)func CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool)func CompareAndSwapUint32(addr *uint32, old, new uint32) (swapped bool)func CompareAndSwapUint64(addr *uint64, old, new uint64) (swapped bool)func CompareAndSwapUintptr(addr *uintptr, old, new uintptr) (swapped bool)
old
func atomicAddOp(tmp int64) {
for {
oldValue := value
if atomic.CompareAndSwapInt64(&value, oldValue, oldValue+tmp) {
return
}
}
}
载入操作
载入操作都以Load为前缀(可避免读取到写入一半的数据):
func LoadInt32(addr *int32) (val int32)func LoadInt64(addr *int64) (val int64)func LoadPointer(addr *unsafe.Pointer) (val unsafe.Pointer)func LoadUint32(addr *uint32) (val uint32)func LoadUint64(addr *uint64) (val uint64)func LoadUintptr(addr *uintptr) (val uintptr)
atomic.Value
sync/atomicValue
func(v *Value) Load() (x interface{})func(v *Value) Store(x interface{})
value存储的类型要求:
- 不能存储nil(存nil会抛出panic);
- value中存储的第一个值,决定了其后续的值类型(以后只能存储此类型的值);
- 尝试存储不同的类型,会抛出panic;
import (
"sync/atomic"
"time"
)
func loadConfig() map[string]string {
// 从数据库或者文件系统中读取配置信息,然后以map的形式存放在内存里
return make(map[string]string)
}
func requests() chan int {
// 将从外界中接收到的请求放入到channel里
return make(chan int)
}
func main() {
// config变量用来存放该服务的配置信息
var config atomic.Value
// 初始化时从别的地方加载配置文件,并存到config变量里
config.Store(loadConfig())
go func() {
// 每10秒钟定时的拉取最新的配置信息,并且更新到config变量里
for {
time.Sleep(10 * time.Second)
// 对应于赋值操作 config = loadConfig()
config.Store(loadConfig())
}
}()
// 创建工作线程,每个工作线程都会根据它所读取到的最新的配置信息来处理请求
for i := 0; i < 10; i++ {
go func() {
for r := range requests() {
// 对应于取值操作 c := config
// 由于Load()返回的是一个interface{}类型,所以我们要先强制转换一下
c := config.Load().(map[string]string)
// 这里是根据配置信息处理请求的逻辑...
_, _ = r, c
}
}()
}
}
value读取
nil^uintptr(0)
func (v *Value) Load() (x interface{}) {
// 将*Value指针类型转换为*ifaceWords指针类型
vp := (*ifaceWords)(unsafe.Pointer(v))
// 原子性的获取到v的类型typ的指针
typ := LoadPointer(&vp.typ)
// 如果没有写入或者正在写入,先返回,^uintptr(0)代表过渡状态
if typ == nil || uintptr(typ) == ^uintptr(0) {
return nil
}
// 原子性的获取到v的真正的值data的指针,然后返回
data := LoadPointer(&vp.data)
xp := (*ifaceWords)(unsafe.Pointer(&x))
xp.typ = typ
xp.data = data
return
}
ifaceWords
- typ代表原始类型
- data代表真正的值
// ifaceWords is interface{} internal representation.
type ifaceWords struct {
typ unsafe.Pointer
data unsafe.Pointer
}
value存储
runtime_procPinruntime_procUnpin
func (v *Value) Store(x interface{}) {
if x == nil {
panic("sync/atomic: store of nil value into Value")
}
// 将现有的值和要写入的值转换为ifaceWords类型(以便获取到它们的原始类型和真正的值)
vp := (*ifaceWords)(unsafe.Pointer(v))
xp := (*ifaceWords)(unsafe.Pointer(&x))
for {
// 获取现有的值的type
typ := LoadPointer(&vp.typ)
// 如果typ为nil说明这是第一次Store
if typ == nil {
// 第一次,就锁定当前的processor,禁止其他goroutine抢占
runtime_procPin()
// 使用CAS操作,先尝试将typ设置为^uintptr(0)这个中间状态
// 如果失败,则证明已经有别的线程抢先完成了赋值操作
if !CompareAndSwapPointer(&vp.typ, nil, unsafe.Pointer(^uintptr(0))) {
runtime_procUnpin()
continue
}
// 如果设置成功,就原子性的更新对应的指针,最后解除抢占锁
StorePointer(&vp.data, xp.data)
StorePointer(&vp.typ, xp.typ)
runtime_procUnpin()
return
}
// typ为^uintptr(0)说明第一次写入还没有完成,继续循环等待
if uintptr(typ) == ^uintptr(0) {
continue
}
// 写入的类型和现有的类型不一致,则panic
if typ != xp.typ {
panic("sync/atomic: store of inconsistently typed value into Value")
}
// 更新data
StorePointer(&vp.data, xp.data)
return
}
}