原子操作就是不可中断的操作,这些操作对外表现成一个不可分割的整体,他们要么都执行,要么都不执行,外界是看不到原子操作的中间状态。

简介

sync/atomicXXTypeint32int64uint32uint64uintptr
AddXXTypeLoadXXTypeStoreXXTypeCompareAndSwapXXTypeSwapXXType

原子操作与互斥锁区别

sync包中的同步的原语Mutex常用于保证并发安全,其与atomic有什么区别呢:

lock-freeCPU

对于一个变量更新,原子操作通常会更有效率,并且更能利用计算机多核的优势。

原子操作

所有原子操作方法的被操作数形参必须是指针类型,通过指针变量可以获取被操作数在内存中的地址,从而施加特殊的CPU指令,确保同一时间只有一个goroutine能够进行操作。

增加操作

所有增减操作以Add为前缀:

func AddInt32(addr *int32, delta int32) (new int32)func AddInt64(addr *int64, delta int64) (new int64)func AddUint32(addr *uint32, delta uint32) (new uint32)func AddUint64(addr *uint64, delta uint64) (new uint64)func AddUintptr(addr *uintptr, delta uintptr) (new uintptr)

原子减操作:

   var opts int64 = 100

   for i := 0; i < 50; i++ { 
       atomic.AddInt64(&opts, -1) // 减操作
       time.Sleep(time.Millisecond)
   }

Uint64减操作

对于有符号整数,直接Add对应的负数即可实现减操作;但是对于无符号整数,则会报错。

AddUint64(&x, ^uint64(c-1))AddUint64(&x, ^uint64(0))

比较并交换操作

比较并交换操作以CompareAndSwap为前缀:

func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool)func CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool)func CompareAndSwapUint32(addr *uint32, old, new uint32) (swapped bool)func CompareAndSwapUint64(addr *uint64, old, new uint64) (swapped bool)func CompareAndSwapUintptr(addr *uintptr, old, new uintptr) (swapped bool)
old
func atomicAddOp(tmp int64) {
for {
       oldValue := value
       if atomic.CompareAndSwapInt64(&value, oldValue, oldValue+tmp) {
           return
       }
   }
}

载入操作

载入操作都以Load为前缀(可避免读取到写入一半的数据):

func LoadInt32(addr *int32) (val int32)func LoadInt64(addr *int64) (val int64)func LoadPointer(addr *unsafe.Pointer) (val unsafe.Pointer)func LoadUint32(addr *uint32) (val uint32)func LoadUint64(addr *uint64) (val uint64)func LoadUintptr(addr *uintptr) (val uintptr)

atomic.Value

sync/atomicValue
func(v *Value) Load() (x interface{})func(v *Value) Store(x interface{})

value存储的类型要求:

  • 不能存储nil(存nil会抛出panic);
  • value中存储的第一个值,决定了其后续的值类型(以后只能存储此类型的值);
  • 尝试存储不同的类型,会抛出panic;
import (
  "sync/atomic"
  "time"
)

func loadConfig() map[string]string {
  // 从数据库或者文件系统中读取配置信息,然后以map的形式存放在内存里
  return make(map[string]string)
}

func requests() chan int {
  // 将从外界中接收到的请求放入到channel里
  return make(chan int)
}

func main() {
  // config变量用来存放该服务的配置信息
  var config atomic.Value
  // 初始化时从别的地方加载配置文件,并存到config变量里
  config.Store(loadConfig())
  go func() {
    // 每10秒钟定时的拉取最新的配置信息,并且更新到config变量里
    for {
      time.Sleep(10 * time.Second)
      // 对应于赋值操作 config = loadConfig()
      config.Store(loadConfig())
    }
  }()
  // 创建工作线程,每个工作线程都会根据它所读取到的最新的配置信息来处理请求
  for i := 0; i < 10; i++ {
    go func() {
      for r := range requests() {
        // 对应于取值操作 c := config
        // 由于Load()返回的是一个interface{}类型,所以我们要先强制转换一下
        c := config.Load().(map[string]string)
        // 这里是根据配置信息处理请求的逻辑...
        _, _ = r, c
      }
    }()
  }
}

value读取

nil^uintptr(0)
func (v *Value) Load() (x interface{}) {
    // 将*Value指针类型转换为*ifaceWords指针类型
	vp := (*ifaceWords)(unsafe.Pointer(v))
	// 原子性的获取到v的类型typ的指针
	typ := LoadPointer(&vp.typ)
	// 如果没有写入或者正在写入,先返回,^uintptr(0)代表过渡状态
	if typ == nil || uintptr(typ) == ^uintptr(0) {
		return nil
	}
	// 原子性的获取到v的真正的值data的指针,然后返回
	data := LoadPointer(&vp.data)
	xp := (*ifaceWords)(unsafe.Pointer(&x))
	xp.typ = typ
	xp.data = data
	return
}
ifaceWords
  • typ代表原始类型
  • data代表真正的值
// ifaceWords is interface{} internal representation.
type ifaceWords struct {
	typ  unsafe.Pointer
	data unsafe.Pointer
}

value存储

runtime_procPinruntime_procUnpin
func (v *Value) Store(x interface{}) {
	if x == nil {
		panic("sync/atomic: store of nil value into Value")
	}
	// 将现有的值和要写入的值转换为ifaceWords类型(以便获取到它们的原始类型和真正的值)
	vp := (*ifaceWords)(unsafe.Pointer(v))
	xp := (*ifaceWords)(unsafe.Pointer(&x))
	for {
		// 获取现有的值的type
		typ := LoadPointer(&vp.typ)
		// 如果typ为nil说明这是第一次Store
		if typ == nil {
			// 第一次,就锁定当前的processor,禁止其他goroutine抢占
			runtime_procPin()
			// 使用CAS操作,先尝试将typ设置为^uintptr(0)这个中间状态
			// 如果失败,则证明已经有别的线程抢先完成了赋值操作
			if !CompareAndSwapPointer(&vp.typ, nil, unsafe.Pointer(^uintptr(0))) {
				runtime_procUnpin()
				continue
			}
			// 如果设置成功,就原子性的更新对应的指针,最后解除抢占锁
			StorePointer(&vp.data, xp.data)
			StorePointer(&vp.typ, xp.typ)
			runtime_procUnpin()
			return
		}
		// typ为^uintptr(0)说明第一次写入还没有完成,继续循环等待
		if uintptr(typ) == ^uintptr(0) {
			continue
		}
		// 写入的类型和现有的类型不一致,则panic
		if typ != xp.typ {
			panic("sync/atomic: store of inconsistently typed value into Value")
		}
		// 更新data
		StorePointer(&vp.data, xp.data)
		return
	}
}