简介
map 是 golang 中经常使用到的数据结构,只是使用的话,其实背一背八股就差不多了,但是要想在使用的时候能够对整个结构有一个清晰的认识,还是得去看一下源码是怎么实现的。
这篇文章就从结构和操作出发,一步步分析 golang 中有关 map 的实现原理。
go1.19.12 linux/amd64
数据结构
本文只讲解 golang 中 map 结构的原理,有关哈希表数据结构的内容,这里不会涉及。
先来看看 map 的结构图,会更好地理解 map 的底层结构。
hmap
用 make 关键字创建一个 map 时,创建的其实是 hmap 结构
type hmap struct {
// Note: the format of the hmap is also encoded in cmd/compile/internal/reflectdata/reflect.go.
// Make sure this stays in sync with the compiler's definition.
count int // # live cells == size of map. Must be first (used by len() builtin)
flags uint8
B uint8 // log_2 of # of buckets (can hold up to loadFactor * 2^B items)
noverflow uint16 // approximate number of overflow buckets; see incrnoverflow for details
hash0 uint32 // hash seed
buckets unsafe.Pointer // array of 2^B Buckets. may be nil if count==0.
oldbuckets unsafe.Pointer // previous bucket array of half the size, non-nil only when growing
nevacuate uintptr // progress counter for evacuation (buckets less than this have been evacuated)
extra *mapextra // optional fields
}
字段作用:
loadFactor * 2^BGC
bmap
其实就是上面说的桶,这里只有一个tophash 字段,类型是长度为 8 的数组,每个数组元素长度为 8bit。
tophash用于保存桶中八个key 对应 hash 值的高8位,在查找 key 时可以提高效率。
查找时先通过 key 的低位哈希定位到桶序号,然后依次将key 的高位哈希和 tophash[i]进行比较,如果不同则 key 一定不在该位置,如果相同才会去比较 key 是否相同。
const (
bucketCnt = 1 << bucketCntBits
)
// A bucket for a Go map.
type bmap struct {
// tophash generally contains the top byte of the hash value
// for each key in this bucket. If tophash[0] < minTopHash,
// tophash[0] is a bucket evacuation state instead.
tophash [bucketCnt]uint8
// Followed by bucketCnt keys and then bucketCnt elems.
// NOTE: packing all the keys together and then all the elems together makes the
// code a bit more complicated than alternating key/elem/key/elem/... but it allows
// us to eliminate padding which would be needed for, e.g., map[int64]int8.
// Followed by an overflow pointer.
}
这里并没有看到有关保存key、value的结构,是因为哈希表中 key 和 value 类型并不固定,所以这里很难确定要用什么类型去定义字段。
cmd/compile/internal/reflectdata.reflect.goMapBucketType
func MapBucketType(t *types.Type) *types.Type {
// 检查 t.MapType().Bucket 是否已经存在,如果存在则直接返回
if t.MapType().Bucket != nil {
return t.MapType().Bucket
}
// 获取键值对的类型
keytype := t.Key()
elemtype := t.Elem()
// 计算键值对的大小
types.CalcSize(keytype)
types.CalcSize(elemtype)
if keytype.Size() > MAXKEYSIZE {
keytype = types.NewPtr(keytype)
}
if elemtype.Size() > MAXELEMSIZE {
elemtype = types.NewPtr(elemtype)
}
// 用于存储存储桶结构的字段
field := make([]*types.Field, 0, 5)
// The first field is: uint8 topbits[BUCKETSIZE].
// topbits字段,类型为[8]uint8
arr := types.NewArray(types.Types[types.TUINT8], BUCKETSIZE)
field = append(field, makefield("topbits", arr))
// keys字段,类型为[8]keytype
arr = types.NewArray(keytype, BUCKETSIZE)
arr.SetNoalg(true)
keys := makefield("keys", arr)
field = append(field, keys)
// elems字段,类型为[8]valuetype
arr = types.NewArray(elemtype, BUCKETSIZE)
arr.SetNoalg(true)
elems := makefield("elems", arr)
field = append(field, elems)
// If keys and elems have no pointers, the map implementation
// can keep a list of overflow pointers on the side so that
// buckets can be marked as having no pointers.
// Arrange for the bucket to have no pointers by changing
// the type of the overflow field to uintptr in this case.
// See comment on hmap.overflow in runtime/map.go.
// overflow字段
otyp := types.Types[types.TUNSAFEPTR]
if !elemtype.HasPointers() && !keytype.HasPointers() {
otyp = types.Types[types.TUINTPTR]
}
overflow := makefield("overflow", otyp)
field = append(field, overflow)
// link up fields
// 计算桶的大小
bucket := types.NewStruct(types.NoPkg, field[:])
bucket.SetNoalg(true)
types.CalcSize(bucket)
// Check invariants that map code depends on.
if !types.IsComparable(t.Key()) {
base.Fatalf("unsupported map key type for %v", t)
}
if BUCKETSIZE < 8 {
base.Fatalf("bucket size too small for proper alignment")
}
if uint8(keytype.Alignment()) > BUCKETSIZE {
base.Fatalf("key align too big for %v", t)
}
if uint8(elemtype.Alignment()) > BUCKETSIZE {
base.Fatalf("elem align too big for %v", t)
}
if keytype.Size() > MAXKEYSIZE {
base.Fatalf("key size to large for %v", t)
}
if elemtype.Size() > MAXELEMSIZE {
base.Fatalf("elem size to large for %v", t)
}
if t.Key().Size() > MAXKEYSIZE && !keytype.IsPtr() {
base.Fatalf("key indirect incorrect for %v", t)
}
if t.Elem().Size() > MAXELEMSIZE && !elemtype.IsPtr() {
base.Fatalf("elem indirect incorrect for %v", t)
}
if keytype.Size()%keytype.Alignment() != 0 {
base.Fatalf("key size not a multiple of key align for %v", t)
}
if elemtype.Size()%elemtype.Alignment() != 0 {
base.Fatalf("elem size not a multiple of elem align for %v", t)
}
if uint8(bucket.Alignment())%uint8(keytype.Alignment()) != 0 {
base.Fatalf("bucket align not multiple of key align %v", t)
}
if uint8(bucket.Alignment())%uint8(elemtype.Alignment()) != 0 {
base.Fatalf("bucket align not multiple of elem align %v", t)
}
if keys.Offset%keytype.Alignment() != 0 {
base.Fatalf("bad alignment of keys in bmap for %v", t)
}
if elems.Offset%elemtype.Alignment() != 0 {
base.Fatalf("bad alignment of elems in bmap for %v", t)
}
// Double-check that overflow field is final memory in struct,
// with no padding at end.
if overflow.Offset != bucket.Size()-int64(types.PtrSize) {
base.Fatalf("bad offset of overflow in bmap for %v", t)
}
// 将 bucket 和 map 结构关联
t.MapType().Bucket = bucket
bucket.StructType().Map = t
return bucket
}
推导后的 bmap结构如下,这里其实就是一个完整的桶结构,包含了 tophash,key,value 和溢出桶指针。
type bmap struct {
topbits [8]uint8
keys [8]keytype
elems [8]valuetype
overflow uintptr
}
mapextra
上述推导后的 bmap结构中,overflow 字段并不总是 uintptr类型。在MapBucketType函数有这样一段判断:
otyp := types.Types[types.TUNSAFEPTR]
if !elemtype.HasPointers() && !keytype.HasPointers() {
otyp = types.Types[types.TUINTPTR]
}
overflow := makefield("overflow", otyp)
field = append(field, overflow)
可以看出当 key 和 value中不包含指针时,overflow是uintptr类型,而包含指针时,是 unsafe.Pointer类型。
区别在于: uintptr 类型不会被 GC 认为是引用,而 unsafe.Pointer类型会认为是引用。
这样的目的是: 如果 key 和 value 中不包含指针,那么就将桶标记为不含指针(uintptr类型),从而避免扫描整个 map(uintptr不被认为是引用,不会扫描)。
但是这样有个问题: 如果桶中有溢出桶,overflow 应该指向对应的溢出桶地址,但是由于 uintptr 类型不被认为是引用,这部分内存可能就会被 GC 回收掉。
为了避免这个问题,需要有个地方能够直接引用这些溢出桶,也就是hmap 的extra字段,该字段为*mapextra类型。
// mapextra holds fields that are not present on all maps.
type mapextra struct {
// If both key and elem do not contain pointers and are inline, then we mark bucket
// type as containing no pointers. This avoids scanning such maps.
// However, bmap.overflow is a pointer. In order to keep overflow buckets
// alive, we store pointers to all overflow buckets in hmap.extra.overflow and hmap.extra.oldoverflow.
// overflow and oldoverflow are only used if key and elem do not contain pointers.
// overflow contains overflow buckets for hmap.buckets.
// oldoverflow contains overflow buckets for hmap.oldbuckets.
// The indirection allows to store a pointer to the slice in hiter.
overflow *[]*bmap
oldoverflow *[]*bmap
// nextOverflow holds a pointer to a free overflow bucket.
nextOverflow *bmap
}
字段含义如下:
- overflow: 指向所有的溢出桶,只用于 key 和 value 中不含指针的场景
- oldoverflow: 指向所有旧桶的溢出桶,只用于 key 和 value 中不含指针的场景
- nextOverflow: 指向所有的预分配的溢出桶,预分配的用完了值就变成nil
创建
通常我们会使用三种方式进行 map 的创建:
m := map[int]int{1:1}m := make(map[int]int)m := make(map[int]int, 3)
通过汇编代码可定位到创建 map 的几个函数。
- makemap_small
- makemap64
- makemap
makemap_small
在以下几种情况下创建 map 时,会调用此函数:
- make(map[k]v)
- make(map[k]v, hint),且 hint不超过 8 时
func makemap_small() *hmap {
h := new(hmap)
h.hash0 = fastrand()
return h
}
这里只是简单地分配了一段hmap大小的内存空间,并给随机种子赋值,并没有分配哈希桶的逻辑。
当 hint 不超过 8 时,只需一个哈希桶即可,可以在写入的时候进行bucket的创建。
makemap64
func makemap64(t *maptype, hint int64, h *hmap) *hmap {
if int64(int(hint)) != hint {
hint = 0
}
return makemap(t, int(hint), h)
}
主要调用 makemap 来完成内存的分配。
makemap
函数签名如下:
func makemap(t *maptype, hint int, h *hmap) *hmap {...}
当通过make(map[k]v, hint)创建 map 且提供的 hint 大于 8 时,会调用此函数进行实际的创建工作。
注意到这里的第二个参数名称是 hint,而不是 size 或 count,表明这里的 hint 只是个参考值,实际分配内存时使用的不一定是这个值。在后续的源码中可以看出这一点。
理想情况下,一个桶中只放一个 key 是效率最高的,因为定位到该桶后,只需要比较一次 key就能获取到对应的值。因此先假定需要创建hint个桶,但是需要判断一下这样会不会导致内存溢出,如果会导致溢出,就会把 hint 置为 0。
func makemap(t *maptype, hint int, h *hmap) *hmap {
mem, overflow := math.MulUintptr(uintptr(hint), t.bucket.size)
if overflow || mem > maxAlloc {
hint = 0
}
...
}
接下来的逻辑是: 创建一个 hmap 结构并初始化随机种子,确定需要分配的桶的个数,分配桶空间等。
loadFactor = 哈希表中元素个数 / 桶的个数loadFactor = 哈希表中元素个数 / (2^B)
经过统计,最优的负载因子是 6.5,该数值下命中率和内存利用率都能达到一个不错的效果。
// Picking loadFactor: too large and we have lots of overflow
// buckets, too small and we waste a lot of space. I wrote
// a simple program to check some stats for different loads:
// (64-bit, 8 byte keys and elems)
// loadFactor %overflow bytes/entry hitprobe missprobe
// 4.00 2.13 20.77 3.00 4.00
// 4.50 4.05 17.30 3.25 4.50
// 5.00 6.85 14.77 3.50 5.00
// 5.50 10.55 12.94 3.75 5.50
// 6.00 15.27 11.67 4.00 6.00
// 6.50 20.90 10.79 4.25 6.50
// 7.00 27.14 10.15 4.50 7.00
// 7.50 34.03 9.73 4.75 7.50
// 8.00 41.10 9.40 5.00 8.00
//
// %overflow = percentage of buckets which have an overflow bucket
// bytes/entry = overhead bytes used per key/elem pair
// hitprobe = # of entries to check when looking up a present key
// missprobe = # of entries to check when looking up an absent key
回到makemap函数中,该函数先将 B 初始化为0,然后逐步自增,直到桶个数为2^B时,负载因子小于 6.5,这样就找到了合适的 B。
func makemap(t *maptype, hint int, h *hmap) *hmap {
...
B := uint8(0)
for overLoadFactor(hint, B) {
B++
}
h.B = B
...
}
func overLoadFactor(count int, B uint8) bool {
return count > bucketCnt && uintptr(count) > loadFactorNum*(bucketShift(B)/loadFactorDen)
}
const (
loadFactorNum = 13
loadFactorDen = 2
)
确定了 B 之后,就可以为 map 分配桶了。如果 B = 0,暂时不会分配 bucket,延迟到赋值时由mapassign来完成分配。
func makemap(t *maptype, hint int, h *hmap) *hmap {
...
// allocate initial hash table
// if B == 0, the buckets field is allocated lazily later (in mapassign)
// If hint is large zeroing this memory could take a while.
if h.B != 0 {
var nextOverflow *bmap
h.buckets, nextOverflow = makeBucketArray(t, h.B, nil)
if nextOverflow != nil {
h.extra = new(mapextra)
h.extra.nextOverflow = nextOverflow
}
}
return h
}
makeBucketArray定义如下,该函数接受三个参数:
- t: 待分配桶所属的 map 指针
- b: 桶数量的对数,值和B相等
- dirtyalloc: 指向一个已经分配的桶数组的指针
返回两个指针:
- buckets: 指向普通的桶数组的指针
- nextOverflow: 指向下一个可用的溢出桶数组的指针
func makeBucketArray(t *maptype, b uint8, dirtyalloc unsafe.Pointer) (buckets unsafe.Pointer, nextOverflow *bmap) {
// 基本桶个数,即 2^B
base := bucketShift(b)
// 所有桶的个数,包括基本桶和溢出桶
nbuckets := base
// b >= 4 时可能会需要溢出桶,因此这里需要计算溢出桶的数量,b < 4时不可能有溢出桶
if b >= 4 {
nbuckets += bucketShift(b - 4)
sz := t.bucket.size * nbuckets
up := roundupsize(sz)
if up != sz {
nbuckets = up / t.bucket.size
}
}
// dirtyalloc == nil,表示没有预先分配的内存,因此需要调用newarray()函数来创建一个新的桶数组。
// 注意到这里是分配nbuckets大小的内存,所以基本桶和溢出桶的内存空间是连续的
if dirtyalloc == nil {
buckets = newarray(t.bucket, int(nbuckets))
} else {
// dirtyalloc != nil,表示有预分配的内存,这里不需要再重新创建,只需要将该部分数据清空即可重用
buckets = dirtyalloc
size := t.bucket.size * nbuckets
if t.bucket.ptrdata != 0 {
memclrHasPointers(buckets, size)
} else {
memclrNoHeapPointers(buckets, size)
}
}
// 基本桶个数 != 所有桶个数,表明需要溢出桶
if base != nbuckets {
// buckets的内存空间是连续的,前base个桶是基本桶,后面的是溢出桶部分,nextOverflow就是溢出桶的开始位置
nextOverflow = (*bmap)(add(buckets, base*uintptr(t.bucketsize)))
// 最后一个溢出桶的地址
last := (*bmap)(add(buckets, (nbuckets-1)*uintptr(t.bucketsize)))
last.setoverflow(t, (*bmap)(buckets))
}
return buckets, nextOverflow
}
2 ^ 3 * 6.5 = 52
哈希
哈希表利用哈希函数进行查找优化,能够极大地提高查找效率。
在正式地讲解查找和写入操作之前,先来看看 golang 的 map 中是怎么利用哈希的。
我们知道,哈希本质是对输入的数据进行一系列的数学运算,得到一个定长的值,由于在大多数情况,不同的输入往往会有不同的哈希值,因此如果数据不一样,通过比较哈希值就能知道,从而避免比较原本的数据(可能很长)。
golang 将key的哈希值分成低位和高位两部分,低位哈希值用于确定 key 所在的桶序号,高位哈希值用于初步确定key是否在桶中。
map 中已存在的 key 的高位哈希会被保存在bmap 的tophash中,除此以外,tophash 还可以用于保存一些标记。
const (
// Possible tophash values. We reserve a few possibilities for special marks.
// Each bucket (including its overflow buckets, if any) will have either all or none of its
// entries in the evacuated* states (except during the evacuate() method, which only happens
// during map writes and thus no one else can observe the map during that time).
emptyRest = 0 // this cell is empty, and there are no more non-empty cells at higher indexes or overflows.
emptyOne = 1 // this cell is empty
evacuatedX = 2 // key/elem is valid. Entry has been evacuated to first half of larger table.
evacuatedY = 3 // same as above, but evacuated to second half of larger table.
evacuatedEmpty = 4 // cell is empty, bucket is evacuated.
minTopHash = 5 // minimum tophash for a normal filled cell.
)
emptyOne
tophash[i] == emptyOne 表明桶的当前位置是空的,可能从来没存过数据,也可能是被 delete 了。
查找时,说明当前位置是空的,可以跳过当前位置去判断桶的 i+1位置;写入时,说明当前位置没有别的数据,是可写的。
emptyRest
tophash[i] == emptyRest 说明桶的当前位置和后续位置,包括溢出桶都是空的。
查找时,说明 key 不存在,因为在当前桶和溢出桶中都找不到;写入时,说明当前位置是可写的。
从上述定义中可知,emptyRest 值就是 0,所以刚初始化的 map 中,tophash 默认就是 emptyRest。
evacuatedX和evacuatedY
这两个状态和扩容有关系,建议结合扩容这一章一起看。
在增量扩容模式下,桶数变成原来的 2 倍,前部分称为 X 区,后部分称为 Y 区,桶中的数据搬迁时可能会搬到 X 区,也可能会搬到 Y 区。搬迁后旧桶中的 tophash[i]就被置为evacuatedX或evacuatedY。
在遍历过程中,如果由于写操作导致了扩容,就能根据这个标记知道数据是否搬迁,如果没搬迁直接从旧桶中查找,否则需要根据evacuatedX或evacuatedY 确定元素的搬迁位置,在新桶中查找。
evacuatedEmpty
和evacuatedX、evacuatedY 类似,区别在于: 如果搬迁前该位置本来就没有数据,那么搬迁后 tophash[i]就会被置为evacuatedEmpty。
minTopHash
该值规定了所有 key 高位哈希值的最小值,这样是为了避免某一个 key 的高位哈希值和上面 5 个值一样,导致无法区分是真实的 key 的高位哈希还是标记。
实际情况中,高位哈希出现任何值都是可能的,因此如果某个 key 的高位哈希小于 minTopHash,则会在此基础上加上 minTopHash作为其高位哈希,以保证不会小于minTopHash。
func tophash(hash uintptr) uint8 {
top := uint8(hash >> (goarch.PtrSize*8 - 8))
if top < minTopHash {
top += minTopHash
}
return top
}
比如某个 key 的高位哈希算出来是 00000001,该值刚好是 emptyOne,则加上minTopHash(二进制为 00000101),将 00000110作为其高位哈希。
写入
m[k] = v
mapassignreflect.mapassign_faststrmapassign
下面分段来进行解析:
func mapassign(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
if h == nil {
panic(plainError("assignment to entry in nil map"))
}
...
if h.flags&hashWriting != 0 {
fatal("concurrent map writes")
}
// 计算当前 key 的哈希值
hash := t.hasher(key, uintptr(h.hash0))
// Set hashWriting after calling t.hasher, since t.hasher may panic,
// in which case we have not actually done a write.
// 将flag 置为hashWriting,表明当前正在进行写操作
h.flags ^= hashWriting
...
}
我们都知道,map 在使用前,必须通过 make 关键字进行初始化,不能像切片一样,申明后直接使用。
是因为这里进行了判断,如果 hmap 为 nil,直接就 panic 了。
hashWriting
makemap_small
func mapassign(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
...
if h.buckets == nil {
h.buckets = newobject(t.bucket) // newarray(t.bucket, 1)
}
...
}
上面我们知道了 key 的哈希值,接下来需要定位这个 key 在哪个桶中,以及桶中是否已经存在了。这里还涉及扩容的操作,放在单独的章节里解释。
func mapassign(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
...
again:
// 利用位运算确定桶的位置,等价于: hash % (2^B)
bucket := hash & bucketMask(h.B)
// 如果正在扩容,需要先完成扩容
if h.growing() {
growWork(t, h, bucket)
}
// 该桶的地址
b := (*bmap)(add(h.buckets, bucket*uintptr(t.bucketsize)))
// 计算高位哈希
top := tophash(hash)
// 目标 key 在桶中的索引
var inserti *uint8
// key-value 的地址
var insertk unsafe.Pointer
var elem unsafe.Pointer
...
}
现在知道了目标 key 所在的桶和高位哈希,接下来需要通过一系列的操作得到目标 k-v应该在桶中的位置。
- 遍历桶的tophash,针对每一个索引 i,将 tophash[i]与高位哈希进行比较,如果不相等,说明key 不在该位置。如果相等,说明 key 可能在该位置,需要继续去比较keys[i]是否与目标 key 相等。
- 如果key 不在当前桶,那就继续遍历当前桶的溢出桶。
- 溢出桶可能也找不到合适的位置插入当前的 k-v,说明可能没有足够的空间了,此时可能需要扩容。扩容条件是: 负载因子过高或溢出桶数量过多。
- 如果不需要扩容,那就创建新的溢出桶
func mapassign(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
...
bucketloop:
for {
// 逐个遍历桶中的每个位置(一个桶有八个位置,对应 8 个k-v)
for i := uintptr(0); i < bucketCnt; i++ {
// 位置 i 保存的高位哈希与当前 key 的高位哈希不等,说明keys[i]与 key 不同
if b.tophash[i] != top {
// 虽然哈希值不等,但是tophash[i]保存的是状态哈希且该状态表明当前位置是空的,此时可以将 k-v 插入当前位置
if isEmpty(b.tophash[i]) && inserti == nil {
inserti = &b.tophash[i]
insertk = add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
elem = add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
}
if b.tophash[i] == emptyRest {
break bucketloop
}
continue
}
// 位置 i 保存的高位哈希与当前 key 的高位哈希相等,但是 key 不一定相等(可能有哈希冲突),还需要进一步比较 key
k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
if t.indirectkey() {
k = *((*unsafe.Pointer)(k))
}
// 虽然高位哈希相等,但是 key 不相等,但是由于此处已经有别的 key 存在,所以不能直接插在此处
if !t.key.equal(key, k) {
continue
}
// 到这里说明找到了 key 在桶中的位置
if t.needkeyupdate() {
typedmemmove(t.key, k, key)
}
// 定位到目标 value 应该存放的地址
elem = add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
goto done
}
// 遍历完都没找到合适的位置,那就继续遍历当前桶的溢出桶(通过当前桶的overflow字段)
ovf := b.overflow(t)
if ovf == nil {
break
}
b = ovf
}
// If we hit the max load factor or we have too many overflow buckets,
// and we're not already in the middle of growing, start growing.
// 负载因子过高或有太多的溢出桶,且当前没有进行扩容,那么就执行扩容,然后重试前面的操作
if !h.growing() && (overLoadFactor(h.count+1, h.B) || tooManyOverflowBuckets(h.noverflow, h.B)) {
hashGrow(t, h)
goto again // Growing the table invalidates everything, so try again
}
// 不需要扩容,创建新的溢出桶
if inserti == nil {
// The current bucket and all the overflow buckets connected to it are full, allocate a new one.
newb := h.newoverflow(t, b)
inserti = &newb.tophash[0]
insertk = add(unsafe.Pointer(newb), dataOffset)
elem = add(insertk, bucketCnt*uintptr(t.keysize))
}
// store new key/elem at insert position
if t.indirectkey() {
kmem := newobject(t.key)
*(*unsafe.Pointer)(insertk) = kmem
insertk = kmem
}
if t.indirectelem() {
vmem := newobject(t.elem)
*(*unsafe.Pointer)(elem) = vmem
}
typedmemmove(t.key, insertk, key)
// 存放在新的溢出桶的第一个位置
*inserti = top
// 元素个数增加
h.count++
...
}
经过这一系列步骤之后,我们拿到了待写入数据应该写入的位置(即elem),mapassign 函数返回后,由其他函数进行数据的写入。
func mapassign(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
...
done:
if h.flags&hashWriting == 0 {
fatal("concurrent map writes")
}
// 还原标记,表示写入完成
h.flags &^= hashWriting
if t.indirectelem() {
elem = *((*unsafe.Pointer)(elem))
}
return elem
}
读取
针对 map 的读取操作,通常有以下两种形式:
- val := m[key]
- val, ok := m[key]
分别对应源码中的两个函数:
func mapaccess1(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {...}
func mapaccess2(t *maptype, h *hmap, key unsafe.Pointer) (unsafe.Pointer, bool) {...}
原理上都差不多,下面以第一个函数为例来看看map 是如何访问的。
我们知道,针对一个未通过 make 初始化或元素个数为 0的 map进行读操作,能够获取到元素类型的零值,首先就是这段逻辑:
func mapaccess1(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
...
if h == nil || h.count == 0 {
if t.hashMightPanic() {
t.hasher(key, 0) // see issue 23734
}
return unsafe.Pointer(&zeroVal[0])
}
...
}
然后进行并发写检测,和写操作类型,都是利用 flag 进行判断的。写操作时,会把 flag 置为 hashWriting,这时候读取可能会有问题,因此这里需要判断。
func mapaccess1(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
...
if h.flags&hashWriting != 0 {
fatal("concurrent map read and map write")
}
...
}
剩下的逻辑和写操作基本是一致的,包括: 利用哈希的低位确定 key 所在的桶、利用哈希高位逐个对比桶的tophash[i]、对 key 进行精确对比、必要时需要查找溢出桶。
func mapaccess1(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
...
// 计算 hash 值
hash := t.hasher(key, uintptr(h.hash0))
m := bucketMask(h.B)
// 定位 key 所在的桶
b := (*bmap)(add(h.buckets, (hash&m)*uintptr(t.bucketsize)))
if c := h.oldbuckets; c != nil {
if !h.sameSizeGrow() {
// There used to be half as many buckets; mask down one more power of two.
m >>= 1
}
oldb := (*bmap)(add(c, (hash&m)*uintptr(t.bucketsize)))
if !evacuated(oldb) {
b = oldb
}
}
// 计算 key 的高位 hash
top := tophash(hash)
bucketloop:
// 如果当前桶未找到,则去查询下一个溢出桶
for ; b != nil; b = b.overflow(t) {
// 遍历当前桶的所有位置,一共 8 个
for i := uintptr(0); i < bucketCnt; i++ {
// 高位 hash 不等,可能有两种情况:
// 1. tophash[i]为emptyRest,表明桶的当前位置及后续位置都是空的,key 必然不存在,因此需要查找下一个桶
// 2. tophash[i]不等于 key 的高位哈希,说明当前位置存的不是目标 key,需要查找下一个位置,即 i+1
if b.tophash[i] != top {
if b.tophash[i] == emptyRest {
break bucketloop
}
continue
}
// 高位哈希相等,但是 key 不一定相同,因此还需要针对 key 进行精确比较
k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
if t.indirectkey() {
k = *((*unsafe.Pointer)(k))
}
if t.key.equal(key, k) {
e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
if t.indirectelem() {
e = *((*unsafe.Pointer)(e))
}
// 找到目标 key
return e
}
}
}
// map中不存在该 key,返回数据类型的零值
return unsafe.Pointer(&zeroVal[0])
}
删除
mapdelete
总体逻辑和读写类似,都是利用低位哈希定位到 key 所在的桶,用高位哈希定位在桶中位置,以及对 key进行精确匹配,可能还涉及溢出桶的查找。
func mapdelete(t *maptype, h *hmap, key unsafe.Pointer) {
...
if h == nil || h.count == 0 {
if t.hashMightPanic() {
t.hasher(key, 0) // see issue 23734
}
return
}
if h.flags&hashWriting != 0 {
fatal("concurrent map writes")
}
hash := t.hasher(key, uintptr(h.hash0))
// Set hashWriting after calling t.hasher, since t.hasher may panic,
// in which case we have not actually done a write (delete).
h.flags ^= hashWriting
bucket := hash & bucketMask(h.B)
if h.growing() {
growWork(t, h, bucket)
}
b := (*bmap)(add(h.buckets, bucket*uintptr(t.bucketsize)))
bOrig := b
top := tophash(hash)
search:
for ; b != nil; b = b.overflow(t) {
for i := uintptr(0); i < bucketCnt; i++ {
if b.tophash[i] != top {
if b.tophash[i] == emptyRest {
break search
}
continue
}
k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
k2 := k
if t.indirectkey() {
k2 = *((*unsafe.Pointer)(k2))
}
if !t.key.equal(key, k2) {
continue
}
...
}
...
}
...
}
不同之处在于,使用 delete 删除 key时,并没有真的将保存 key-value的内存清空,而是直接将 tophash[i]置为特定的状态标记(emptyOne 或 emptyRest),以表明该位置是空的。当然,如果 key 或 value 中含有指针,还需要将指针置为 nil,以保证删除的key 和 value 可以被 GC 回收掉。
func mapdelete(t *maptype, h *hmap, key unsafe.Pointer) {
...
search:
for ; b != nil; b = b.overflow(t) {
for i := uintptr(0); i < bucketCnt; i++ {
...
// 如果 key 中含有指针,将指针置空,以保证可以被 GC 清理掉
if t.indirectkey() {
*(*unsafe.Pointer)(k) = nil
} else if t.key.ptrdata != 0 {
memclrHasPointers(k, t.key.size)
}
// 如果 value 中含有指针,将指针置空,以保证可以被 GC 清理掉
e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
if t.indirectelem() {
*(*unsafe.Pointer)(e) = nil
} else if t.elem.ptrdata != 0 {
memclrHasPointers(e, t.elem.size)
} else {
memclrNoHeapPointers(e, t.elem.size)
}
...
}
接下来设置标记。
首先可以肯定的是,删除当前位置的 key 后,当前位置的 tophash[i]一定可以置为 emptyOne,但这还不够。
还需要进一步判断当前位置的 tophash[i]是否可以置为emptyRest,因为查找或写入时如果知道 tophash 是 emptyRest,那么就能直接知道当前位置及之后的位置是空的,这样可以避免非必要的遍历了。
以下情况下,当前位置的 tophash 不能设置为emptyRest:
- 当前位置处于桶的最后,且当前桶存在溢出桶,且该溢出桶的 tophash[0]不是 emptyRest
- 当前桶不位于桶的最后,且桶中下一个位置的 tophash 不是 emptyRest
这两种情况下说明当前桶的后面有非空的位置,可能在同一个桶的后面,也可能在溢出桶中。
func mapdelete(t *maptype, h *hmap, key unsafe.Pointer) {
...
b.tophash[i] = emptyOne
// If the bucket now ends in a bunch of emptyOne states,
// change those to emptyRest states.
// It would be nice to make this a separate function, but
// for loops are not currently inlineable.
if i == bucketCnt-1 {
if b.overflow(t) != nil && b.overflow(t).tophash[0] != emptyRest {
goto notLast
}
} else {
if b.tophash[i+1] != emptyRest {
goto notLast
}
}
...
notLast:
h.count--
// Reset the hash seed to make it more difficult for attackers to
// repeatedly trigger hash collisions. See issue 25237.
if h.count == 0 {
h.hash0 = fastrand()
}
break search
}
}
...
}
以上两种情况之外,就可以直接将当前位置的 tophash置为 emptyRest 。
Golang 的设计更进一步,不仅会设置当前位置,还试图修改更前面的位置的标记。
- 在同一个桶中,当前位置置为 emptyRest后,继续访问前一个位置,如果该位置标记是 emptyOne,则将该位置也置为emptyRest,直到遇到第一个非 emptyOne的位置
- 如果当前桶的第一个位置也置为 emptyRest 了,则定位到上一个桶,执行上述操作
func mapdelete(t *maptype, h *hmap, key unsafe.Pointer) {
...
for {
// 将当前位置的 tophash 置为 emptyRest
b.tophash[i] = emptyRest
// 当前位置位于所在桶的头部
if i == 0 {
// bOrig 指向第一个桶,即通过低位哈希获取到的位置,如果当前桶已经是第一个桶,则无需操作
if b == bOrig {
break // beginning of initial bucket, we're done.
}
// 寻找当前桶的前一个桶
c := b
for b = bOrig; b.overflow(t) != c; b = b.overflow(t) {
}
// i = 7,相当于是把当前桶的前一个桶的 tophash[7]置为 emptyRest
i = bucketCnt - 1
} else {
// 从后往前遍历桶中的每个位置
i--
}
if b.tophash[i] != emptyOne {
break
}
}
notLast:
h.count--
// Reset the hash seed to make it more difficult for attackers to
// repeatedly trigger hash collisions. See issue 25237.
if h.count == 0 {
h.hash0 = fastrand()
}
break search
}
}
if h.flags&hashWriting == 0 {
fatal("concurrent map writes")
}
h.flags &^= hashWriting
}
通过上述操作,在查找时如果tophash[i] == emptyRest,就能知道该位置及以后的位置都是空的,就不需要再查找后续位置和溢出桶了。
清空
golang 中的并没有提供 clear 关键字将 map 一次性清空,如果想要清空 map,有两种方式:
- 创建新的 map 并赋值给原本的 map 变量,原本的 map 会被 GC 掉
- 通过循环 + delete 删除
方式一会创建新的map,不够干净,且上一个 map 分配的空间无法重用;直观上,方式二迭代+删除会比较慢。
而实际上,编译器会对迭代 + 删除这种方式进行优化,在源码层面对应下面的函数,其主要思路就是: 将map 中的各种标记清空、将桶数组和溢出桶的内存清零并复用。
// mapclear deletes all keys from a map.
func mapclear(t *maptype, h *hmap) {
...
if h == nil || h.count == 0 {
return
}
if h.flags&hashWriting != 0 {
fatal("concurrent map writes")
}
h.flags ^= hashWriting
h.flags &^= sameSizeGrow
h.oldbuckets = nil
h.nevacuate = 0
h.noverflow = 0
h.count = 0
// Reset the hash seed to make it more difficult for attackers to
// repeatedly trigger hash collisions. See issue 25237.
h.hash0 = fastrand()
// Keep the mapextra allocation but clear any extra information.
if h.extra != nil {
*h.extra = mapextra{}
}
// makeBucketArray clears the memory pointed to by h.buckets
// and recovers any overflow buckets by generating them
// as if h.buckets was newly alloced.
_, nextOverflow := makeBucketArray(t, h.B, h.buckets)
if nextOverflow != nil {
// If overflow buckets are created then h.extra
// will have been allocated during initial bucket creation.
h.extra.nextOverflow = nextOverflow
}
if h.flags&hashWriting == 0 {
fatal("concurrent map writes")
}
h.flags &^= hashWriting
}
扩容
在讲mapassign函数时说到,如果负载因子过高或者 map 中溢出桶过多,会触发扩容。
golang 将扩容分成两种:
- 等量扩容
- 增量扩容
负载因子的计算和桶的数量(即 B)有关,因此如果负载因子过大,则说明B过小,此时需要将 B 加 1,对应的桶数变成了原来的 2 倍,这就是增量扩容;
否则,可能是溢出桶太多了,实际数据并没有那么多,这样会影响查找效率。比如连续插入数据后删除,导致溢出桶很多。这种情况下只需要将松散的数据集中起来即可,桶数量保持不变,这就是等量扩容。
func hashGrow(t *maptype, h *hmap) {
// 默认进行增量扩容
bigger := uint8(1)
// 如果负载因子没有超过阈值,则使用等量扩容
if !overLoadFactor(h.count+1, h.B) {
bigger = 0
h.flags |= sameSizeGrow
}
// 扩容后原来的桶数据保存在oldbuckets中
oldbuckets := h.buckets
// 创建新桶和溢出桶。增量模式下,桶数量为原来的 2 倍;等量模式下,桶数量不变
newbuckets, nextOverflow := makeBucketArray(t, h.B+bigger, nil)
flags := h.flags &^ (iterator | oldIterator)
if h.flags&iterator != 0 {
flags |= oldIterator
}
// 更新 map 的桶数
h.B += bigger
h.flags = flags
h.oldbuckets = oldbuckets
// 设置新桶,此时buckets是空的,所有的数据都在oldbuckets上
h.buckets = newbuckets
h.nevacuate = 0
h.noverflow = 0
// 原本的溢出桶数据保存到oldoverflow中
if h.extra != nil && h.extra.overflow != nil {
// Promote current overflow buckets to the old generation.
if h.extra.oldoverflow != nil {
throw("oldoverflow is not nil")
}
h.extra.oldoverflow = h.extra.overflow
h.extra.overflow = nil
}
// 设置预分配的溢出桶地址
if nextOverflow != nil {
if h.extra == nil {
h.extra = new(mapextra)
}
h.extra.nextOverflow = nextOverflow
}
// the actual copying of the hash table data is done incrementally
// by growWork() and evacuate().
}
无论哪种模式,原本的普通桶和溢出桶数据分别保存到了oldbuckets和oldoverflow中,等待下一步的整理和搬迁。但是如果有非常多的 k-v 需要迁移,会非常影响性能。Golang 采用了渐进式迁移的方式,将数据迁移分散在写和删除操作中,即下面这段代码:
这里 growWork 会执行两次evacuate,第一次用于搬迁当前 key 所在的桶,而第二次用于更新搬迁进度,比如当前搬迁进度nevacuate为 0,那么growWork会先搬迁 key 所在的桶,然后搬迁 0 号桶。
bucket := hash & bucketMask(h.B)
if h.growing() {
growWork(t, h, bucket)
}
func (h *hmap) growing() bool {
return h.oldbuckets != nil
}
func growWork(t *maptype, h *hmap, bucket uintptr) {
// make sure we evacuate the oldbucket corresponding
// to the bucket we're about to use
// bucket&h.oldbucketmask() 用于确定当前正在操作的桶在扩容前的位置
evacuate(t, h, bucket&h.oldbucketmask())
// evacuate one more oldbucket to make progress on growing
if h.growing() {
evacuate(t, h, h.nevacuate)
}
}
结合定位桶的方式,不难得出以下结论:
- 等量扩容模式下,桶数量不变,掩码和扩容前是一致的,因此扩容前后 key 所在的桶序号不变
- 增量扩容模式下,桶数量变成原来的 2倍,掩码也比之前多了 1 位,key 所在的桶序号可能会改变。比如原本的 B 是 5,计算位置时只需取 key 的低位哈希后 5 位,而增量扩容后B是 6,计算位置时需取 key 的低位哈希后 6位。桶序号是否改变取决于 key 的低位哈希的倒数第 6 位是 0 还是 1。
假定扩容前的 B = 5,桶数量为32,扩容后 B = 6,桶数量为 64。扩容后前 32 个桶称为 X 区,后 32 个桶称为 Y 区。
对等量扩容来说,搬迁后只会到 X 区(只有 X 区),而对于增量扩容,搬迁后可能位于 X 区,也可能位于 Y 区,具体取决于低位哈希的最高位是 1 还是 0。这个概念对理解扩容很重要,下面回到源码。
func evacuate(t *maptype, h *hmap, oldbucket uintptr) {
// 定位到需要搬迁的旧桶
b := (*bmap)(add(h.oldbuckets, oldbucket*uintptr(t.bucketsize)))
// 扩容前的桶数
newbit := h.noldbuckets()
// 当前桶的 tophash[0] 不是 evacuatedX、evacuatedY和evacuatedEmpty 中任意一个,表明当前桶未搬迁过
if !evacuated(b) {
var xy [2]evacDst
// 假定是等量扩容,扩容前后 key 在桶中的序号不变,即保存到 X 区,这里 x.b 就是 X 区桶的地址
x := &xy[0]
x.b = (*bmap)(add(h.buckets, oldbucket*uintptr(t.bucketsize)))
x.k = add(unsafe.Pointer(x.b), dataOffset)
x.e = add(x.k, bucketCnt*uintptr(t.keysize))
// 如果是增量扩容,序号可能改变,可能保存到 X 区或 Y 区,这里 y.b 就是 Y 区桶的地址
if !h.sameSizeGrow() {
y := &xy[1]
y.b = (*bmap)(add(h.buckets, (oldbucket+newbit)*uintptr(t.bucketsize)))
y.k = add(unsafe.Pointer(y.b), dataOffset)
y.e = add(y.k, bucketCnt*uintptr(t.keysize))
}
// 遍历旧桶该位置下所有的桶,包括普通桶和溢出桶
for ; b != nil; b = b.overflow(t) {
// key 的地址
k := add(unsafe.Pointer(b), dataOffset)
// value 的地址
e := add(k, bucketCnt*uintptr(t.keysize))
// 遍历同一个桶中的 8 个位置
for i := 0; i < bucketCnt; i, k, e = i+1, add(k, uintptr(t.keysize)), add(e, uintptr(t.elemsize)) {
// 当前位置的高位哈希为 emptyRest 或 emptyOne,说明当前位置是空的
// 将 tophash 设置为evacuatedEmpty,表示已搬迁,继续处理下一个位置
top := b.tophash[i]
if isEmpty(top) {
b.tophash[i] = evacuatedEmpty
continue
}
if top < minTopHash {
throw("bad map state")
}
k2 := k
if t.indirectkey() {
k2 = *((*unsafe.Pointer)(k2))
}
var useY uint8
// 增量模式下
if !h.sameSizeGrow() {
// 计算哈希
hash := t.hasher(k2, uintptr(h.hash0))
if h.flags&iterator != 0 && !t.reflexivekey() && !t.key.equal(k2, k2) {
// If key != key (NaNs), then the hash could be (and probably
// will be) entirely different from the old hash. Moreover,
// it isn't reproducible. Reproducibility is required in the
// presence of iterators, as our evacuation decision must
// match whatever decision the iterator made.
// Fortunately, we have the freedom to send these keys either
// way. Also, tophash is meaningless for these kinds of keys.
// We let the low bit of tophash drive the evacuation decision.
// We recompute a new random tophash for the next level so
// these keys will get evenly distributed across all buckets
// after multiple grows.
useY = top & 1
top = tophash(hash)
} else {
// 判断在新的容量下,低位哈希的最高是否是 1,如果是 1,则 useY = 1,表示数据会被搬迁到 Y 区
if hash&newbit != 0 {
useY = 1
}
}
}
if evacuatedX+1 != evacuatedY || evacuatedX^1 != evacuatedY {
throw("bad evacuatedN")
}
// 如果useY=1,则evacuatedX + useY其实就是evacuatedY。这里tophash[i] = evacuatedX或evacuatedY 表示数据被搬迁到 X 区或 Y 区
b.tophash[i] = evacuatedX + useY // evacuatedX + 1 == evacuatedY
dst := &xy[useY] // evacuation destination
// i记录当前正在搬迁的数据在桶中的位置,如果是 8,表明这个桶中所有元素都已经搬迁了,接下来搬迁溢出桶
if dst.i == bucketCnt {
dst.b = h.newoverflow(t, dst.b)
dst.i = 0
dst.k = add(unsafe.Pointer(dst.b), dataOffset)
dst.e = add(dst.k, bucketCnt*uintptr(t.keysize))
}
// 设置tophash[i]
dst.b.tophash[dst.i&(bucketCnt-1)] = top // mask dst.i as an optimization, to avoid a bounds check
// 将旧桶的k-v复制到dst
if t.indirectkey() {
*(*unsafe.Pointer)(dst.k) = k2 // copy pointer
} else {
typedmemmove(t.key, dst.k, k) // copy elem
}
if t.indirectelem() {
*(*unsafe.Pointer)(dst.e) = *(*unsafe.Pointer)(e)
} else {
typedmemmove(t.elem, dst.e, e)
}
// 标记下一个数据的位置
dst.i++
// These updates might push these pointers past the end of the
// key or elem arrays. That's ok, as we have the overflow pointer
// at the end of the bucket to protect against pointing past the
// end of the bucket.
// dst指向下一个位置
dst.k = add(dst.k, uintptr(t.keysize))
dst.e = add(dst.e, uintptr(t.elemsize))
}
}
// Unlink the overflow buckets & clear key/elem to help GC.
// 如果没有协程使用旧桶的数据且存在溢出桶
if h.flags&oldIterator == 0 && t.bucket.ptrdata != 0 {
b := add(h.oldbuckets, oldbucket*uintptr(t.bucketsize))
// Preserve b.tophash because the evacuation
// state is maintained there.
ptr := add(b, dataOffset)
n := uintptr(t.bucketsize) - dataOffset
memclrHasPointers(ptr, n)
}
}
// 搬迁状态,nevacuate左边的桶都已经搬迁完了
if oldbucket == h.nevacuate {
advanceEvacuationMark(h, t, newbit)
}
}
func advanceEvacuationMark(h *hmap, t *maptype, newbit uintptr) {
// 搬迁进度加 1,表示当前位置的桶已经搬迁了
h.nevacuate++
// Experiments suggest that 1024 is overkill by at least an order of magnitude.
// Put it in there as a safeguard anyway, to ensure O(1) behavior.
stop := h.nevacuate + 1024
if stop > newbit {
stop = newbit
}
// 寻找一个未搬迁的位置
for h.nevacuate != stop && bucketEvacuated(t, h, h.nevacuate) {
h.nevacuate++
}
// 所有的桶都搬迁完了,做一些清理操作
if h.nevacuate == newbit { // newbit == # of oldbuckets
// Growing is all done. Free old main bucket array.
// 置空旧桶指针,旧桶所有数据已经搬迁到新桶,因此可以放心清理
h.oldbuckets = nil
// Can discard old overflow buckets as well.
// If they are still referenced by an iterator,
// then the iterator holds a pointers to the slice.
// 置空旧的溢出桶指针
if h.extra != nil {
h.extra.oldoverflow = nil
}
// 清除sameSizeGrow标记
h.flags &^= sameSizeGrow
}
}
遍历
在应用层面只需要一个简单的 for-range即可实现 map 的遍历,但是在源码层面,实现起来并不那么简单。
上面说到,map 的搬迁操作是分散在各个写操作和删除操作中的,所以遍历过程中,数据会分散在新桶和旧桶中。golang 采用迭代器模式实现数据的访问,依赖于下面的结构:
type hiter struct {
// 指向遍历的 key 的指针
key unsafe.Pointer // Must be in first position. Write nil to indicate iteration end (see cmd/compile/internal/walk/range.go).
// 指向遍历的 value 的指针
elem unsafe.Pointer // Must be in second position (see cmd/compile/internal/walk/range.go).
t *maptype
h *hmap
// map 的桶数组
buckets unsafe.Pointer // bucket ptr at hash_iter initialization time
// 当前遍历到的桶
bptr *bmap // current bucket
overflow *[]*bmap // keeps overflow buckets of hmap.buckets alive
oldoverflow *[]*bmap // keeps overflow buckets of hmap.oldbuckets alive
// 开始遍历的桶索引
startBucket uintptr // bucket iteration started at
offset uint8 // intra-bucket offset to start from during iteration (should be big enough to hold bucketCnt-1)
// 是否已经遍历到桶数组尾部
wrapped bool // already wrapped around from end of bucket array to beginning
B uint8
// 当前遍历的 k-v 在桶中的位置
i uint8
bucket uintptr
checkBucket uintptr
}
遍历流程开始时,会执行 mapinit 函数,创建并初始化hiter对象。为了防止开发者过于依赖 map 遍历的顺序,golang 采用随机的方式确定开始遍历的桶序号和数据的起始索引号(同一个桶内)。
func mapiterinit(t *maptype, h *hmap, it *hiter) {
...
// decide where to start
var r uintptr
// 生成随机数
if h.B > 31-bucketCntBits {
r = uintptr(fastrand64())
} else {
r = uintptr(fastrand())
}
// 随机确定开始遍历的桶序号
it.startBucket = r & bucketMask(h.B)
// 随机确定在桶中开始遍历的索引号
it.offset = uint8(r >> h.B & (bucketCnt - 1))
// iterator state
it.bucket = it.startBucket
// Remember we have an iterator.
// Can run concurrently with another mapiterinit().
// 设置标记
if old := h.flags; old&(iterator|oldIterator) != iterator|oldIterator {
atomic.Or8(&h.flags, iterator|oldIterator)
}
mapiternext(it)
}
接下来进入mapiternext进行遍历。
func mapiternext(it *hiter)
map 不是并发安全的结构,如果在遍历过程中发现有其他的协程在进行写操作,会抛出 fatal error
if h.flags&hashWriting != 0 {
fatal("concurrent map iteration and map write")
}
接下来开始遍历,会依次遍历桶数组中的每个桶及其后面的溢出桶。
func mapiternext(it *hiter) {
...
next:
if b == nil {
// 当前又回到了起始位置且已经遍历过桶数组的末尾,说明遍历完了
if bucket == it.startBucket && it.wrapped {
// end of iteration
it.key = nil
it.elem = nil
return
}
// 如果处于扩容阶段
if h.growing() && it.B == h.B {
// Iterator was started in the middle of a grow, and the grow isn't done yet.
// If the bucket we're looking at hasn't been filled in yet (i.e. the old
// bucket hasn't been evacuated) then we need to iterate through the old
// bucket and only return the ones that will be migrated to this bucket.
oldbucket := bucket & it.h.oldbucketmask()
b = (*bmap)(add(h.oldbuckets, oldbucket*uintptr(t.bucketsize)))
// 当前桶的数据还未搬迁,将checkBucket置为当前桶
if !evacuated(b) {
checkBucket = bucket
} else {
b = (*bmap)(add(it.buckets, bucket*uintptr(t.bucketsize)))
checkBucket = noCheck
}
} else {
b = (*bmap)(add(it.buckets, bucket*uintptr(t.bucketsize)))
checkBucket = noCheck
}
// 遍历的桶号加 1,如果到了桶数组末尾,将wrapped置为 true,桶号置0,因为可能还需要遍历前面的桶
bucket++
if bucket == bucketShift(it.B) {
bucket = 0
it.wrapped = true
}
i = 0
}
// 遍历桶中的每个位置
for ; i < bucketCnt; i++ {
offi := (i + it.offset) & (bucketCnt - 1)
if isEmpty(b.tophash[offi]) || b.tophash[offi] == evacuatedEmpty {
// TODO: emptyRest is hard to use here, as we start iterating
// in the middle of a bucket. It's feasible, just tricky.
continue
}
k := add(unsafe.Pointer(b), dataOffset+uintptr(offi)*uintptr(t.keysize))
if t.indirectkey() {
k = *((*unsafe.Pointer)(k))
}
e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+uintptr(offi)*uintptr(t.elemsize))
// 如果桶是旧桶数组中未迁移完成的桶,需要按照其在新桶的顺序进行遍历
if checkBucket != noCheck && !h.sameSizeGrow() {
// Special case: iterator was started during a grow to a larger size
// and the grow is not done yet. We're working on a bucket whose
// oldbucket has not been evacuated yet. Or at least, it wasn't
// evacuated when we started the bucket. So we're iterating
// through the oldbucket, skipping any keys that will go
// to the other new bucket (each oldbucket expands to two
// buckets during a grow).
if t.reflexivekey() || t.key.equal(k, k) {
// If the item in the oldbucket is not destined for
// the current new bucket in the iteration, skip it.
hash := t.hasher(k, uintptr(h.hash0))
if hash&bucketMask(it.B) != checkBucket {
continue
}
} else {
// Hash isn't repeatable if k != k (NaNs). We need a
// repeatable and randomish choice of which direction
// to send NaNs during evacuation. We'll use the low
// bit of tophash to decide which way NaNs go.
// NOTE: this case is why we need two evacuate tophash
// values, evacuatedX and evacuatedY, that differ in
// their low bit.
if checkBucket>>(it.B-1) != uintptr(b.tophash[offi]&1) {
continue
}
}
}
if (b.tophash[offi] != evacuatedX && b.tophash[offi] != evacuatedY) ||
!(t.reflexivekey() || t.key.equal(k, k)) {
// This is the golden data, we can return it.
// OR
// key!=key, so the entry can't be deleted or updated, so we can just return it.
// That's lucky for us because when key!=key we can't look it up successfully.
it.key = k
if t.indirectelem() {
e = *((*unsafe.Pointer)(e))
}
it.elem = e
} else {
// 执行 mapaccessK 函数进行查找,并将 k-v 保存在 iter 对象中
// The hash table has grown since the iterator was started.
// The golden data for this key is now somewhere else.
// Check the current hash table for the data.
// This code handles the case where the key
// has been deleted, updated, or deleted and reinserted.
// NOTE: we need to regrab the key as it has potentially been
// updated to an equal() but not identical key (e.g. +0.0 vs -0.0).
rk, re := mapaccessK(t, h, k)
if rk == nil {
continue // key has been deleted
}
it.key = rk
it.elem = re
}
it.bucket = bucket
if it.bptr != b { // avoid unnecessary write barrier; see issue 14921
it.bptr = b
}
it.i = i + 1
it.checkBucket = checkBucket
return
}
b = b.overflow(t)
i = 0
goto next
}
结论
通过上述分析,不难得出以下结论:
- 创建 map 时尽量指定 hint 的大小,避免后续带来的扩容问题
- 如果 k-v 能用不含指针的类型,就尽量不用指针,可以减轻 GC 压力
- 使用delete 删除map中的 key 时不会真的释放内存,只是做个标记
- map 不是并发安全的
- map 本质上是使用了拉链法去解决哈希冲突问题,区别在于链上的每个节点可以保存 8 个 k-v