一、原生map
原生map在并发读写的时候,容易造成panic,这是因为原生map并不是线程安全的,对它进行并发读写操作的时候,会导致map里的数据发生错乱,因而导致panic。如下面例子
package main
import "time"
func main() {
m := make(map[int]int)
go func() {
for {
m[1] = 1
}
}()
go func() {
for {
_, _ = m[1]
}
}()
time.Sleep(2 * time.Second)
}
// 返回结果:fatal error: concurrent map read and map write
package main
import (
"fmt"
"sync"
"time"
)
func main() {
m := make(map[int]int)
s := &sync.RWMutex{}
go func() {
for {
s.Lock()
m[1] = 1
s.Unlock()
}
}()
go func() {
for {
s.RLock()
_, _ = m[1]
s.RUnlock()
}
}()
time.Sleep(2 * time.Second)
fmt.Println(m)
}
二、什么是sync.Map
sync.Map就像Go map[interface{}]interface{},但是它可以线程安全的对其进行读写操作,而无需其他锁定或者协调。
sync.Map结构体说明
字段 | 类型 | 说明 |
---|
mu | Mutex | 互斥锁,保护dirty字段 |
read | atomic.Value | 存读的数据。因为是atomic.Value类型,只读,所以并发是安全的。实际存的是readOnly的数据结构。 |
misses | int | 计数作用。每次从read中读失败,则计数+1。 |
dirty | map[interface{}]*entry | 包含最新写入的数据。当misses计数达到一定值,将其赋值给read。 |
m | map[interface{}]*entry | 单纯的map结构 |
amended | bool | Map.dirty的数据和这里的 m 中的数据不一样的时候,为true |
p | unsafe.Pointer | 指针类型 |
三、源码实现
Load
// 返回Map中对应的key的值,如果不存在则返回nil
func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
read, _ := m.read.Load().(readOnly) // 因read只读,线程安全,优先读取
e, ok := read.m[key] // 读取key对应的值
// 如果read没有,并且dirty有新数据,那么去dirty中查找
if !ok && read.amended {
m.mu.Lock() //加互斥锁
// 再次读取数据,避免加锁过程中m.dirty提升为m.read,这个时候m.read被替换了
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
// 如果read中还是不存在,并且dirty中有新数据
if !ok && read.amended {
e, ok = m.dirty[key] //读取数据
// 不管m.dirty中是否有数据,都将misses计数+1
// 然后在 m.misses >= len(m.dirty)下将dirty赋值给read
m.missLocked()
}
m.mu.Unlock() //解锁
}
// read中没有,dirty中也没有 返回nil
if !ok {
return nil, false
}
// 中read或者dirty中加载数据
return e.load()
}
// 返回key对应的值
func (e *entry) load() (value interface{}, ok bool) {
p := atomic.LoadPointer(&e.p)
if p == nil || p == expunged {
return nil, false
}
return *(*interface{})(p), true
}
func (m *Map) missLocked() {
m.misses++ //将misses计数+1
if m.misses < len(m.dirty) { // ?
return
}
// 将dirty置给read。
m.read.Store(readOnly{m: m.dirty})
m.dirty = nil
m.misses = 0
}
Store
// 设置或者更新数据
func (m *Map) Store(key, value interface{}) {
// 往只读的entry获得数据
read, _ := m.read.Load().(readOnly)
// 如果m.read存在这个键,并且没有被标记删除,则尝试更新
if e, ok := read.m[key]; ok && e.tryStore(&value) {
return
}
// 如果read不存在或者已经被标记删除
m.mu.Lock() //加锁
read, _ = m.read.Load().(readOnly) //再次读取read中的数据
if e, ok := read.m[key]; ok { // read中存在key对应的数据
// 确保key值已经删除
if e.unexpungeLocked() {
m.dirty[key] = e //加入dirty中,这儿是指针
}
// 更新value值
e.storeLocked(&value)
} else if e, ok := m.dirty[key]; ok { // dirty中存在key
// 更新value值
e.storeLocked(&value)
} else { // read中没有,dirty中也没有
// 如果read与dirty相同,则触发一次dirty刷新
// 因为当read重置的时候,dirty已置为nil了
if !read.amended { // amended=true时不一致,fasle则是一致
// 将read中未删除的数据加入到dirty中
m.dirtyLocked()
// amended标记为read与dirty不相同,因为后面即将加入新数据。
m.read.Store(readOnly{m: read.m, amended: true})
}
m.dirty[key] = newEntry(value) //将这个entry加入到m.dirty中
}
m.mu.Unlock() //解锁
}
// 初始化entry
func newEntry(i interface{}) *entry {
return &entry{p: unsafe.Pointer(&i)}
}
// 对entry尝试更新 (原子cas操作)
func (e *entry) tryStore(i *interface{}) bool {
for {
p := atomic.LoadPointer(&e.p)
if p == expunged {
return false
}
if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) {
return true
}
}
}
// unexpungeLocked确保该条目未标记为已删除
func (e *entry) unexpungeLocked() (wasExpunged bool) {
return atomic.CompareAndSwapPointer(&e.p, expunged, nil)
}
// 原子更新数据
func (e *entry) storeLocked(i *interface{}) {
atomic.StorePointer(&e.p, unsafe.Pointer(i))
}
// 将read中未删除的数据加入到dirty中
func (m *Map) dirtyLocked() {
if m.dirty != nil {
return
}
read, _ := m.read.Load().(readOnly)
m.dirty = make(map[interface{}]*entry, len(read.m))
// 遍历read
for k, e := range read.m {
// 通过此次操作,dirty中的元素都是未被删除的,
// 可见标记为expunged的元素不在dirty中!!!
if !e.tryExpungeLocked() {
m.dirty[k] = e
}
}
}
// 判断entry是否被标记删除,并且将标记为nil的entry更新标记为expunge
func (e *entry) tryExpungeLocked() (isExpunged bool) {
p := atomic.LoadPointer(&e.p)
for p == nil {
// 将已经删除标记为nil的数据标记为expunged
if atomic.CompareAndSwapPointer(&e.p, nil, expunged) {
return true
}
// 重新设置p的值,然后进入for循环
p = atomic.LoadPointer(&e.p)
}
return p == expunged
}
LoadOrStore
参考Load和Store函数
LoadAndDelete、Delete
// 删除数据
func (m *Map) LoadAndDelete(key interface{}) (value interface{}, loaded bool) {
read, _ := m.read.Load().(readOnly) //往只读read总获得数据
e, ok := read.m[key]
// 如果read没有,并且dirty有新数据,那么去dirty中查找
// read.amended=true说明dirty和read中的数据不一致,有新数据
if !ok && read.amended {
m.mu.Lock() //加锁
//二次读取,以防read在加锁过程中发生变化
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
// 如果read没有,并且dirty有新数据,那么去dirty中查找
if !ok && read.amended {
// 直接删除数据
e, ok = m.dirty[key]
delete(m.dirty, key)
m.missLocked() // 不管m.dirty中是否有数据,都将misses计数+1
}
m.mu.Unlock() //解锁
}
// 如果read中有数据
if ok {
return e.delete() //尝试删除数据
}
return nil, false
}
// 尝试删除数据
func (e *entry) delete() (value interface{}, ok bool) {
for {
p := atomic.LoadPointer(&e.p)
if p == nil || p == expunged {
return nil, false
}
if atomic.CompareAndSwapPointer(&e.p, p, nil) {
return *(*interface{})(p), true
}
}
}
// 调用了LoadAndDelete来删除
func (m *Map) Delete(key interface{}) {
m.LoadAndDelete(key)
}
Range
// 遍历调用时刻 map 中的所有 k-v 对,将它们传给 f 函数,如果 f 返回 false,将停止遍历
func (m *Map) Range(f func(key, value interface{}) bool) {
read, _ := m.read.Load().(readOnly)
// read和dirty数据不一致时为true
if read.amended {
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
if read.amended {
// 将dirty赋值给read
read = readOnly{m: m.dirty}
m.read.Store(read)
m.dirty = nil
m.misses = 0
}
m.mu.Unlock()
}
// 遍历, for range是安全的
for k, e := range read.m {
v, ok := e.load()
if !ok {
continue
}
if !f(k, v) { //按照传入的func退出循环
break
}
}
}
五、推荐阅读