简介

map 是 golang 中经常使用到的数据结构,只是使用的话,其实背一背八股就差不多了,但是要想在使用的时候能够对整个结构有一个清晰的认识,还是得去看一下源码是怎么实现的。

这篇文章就从结构和操作出发,一步步分析 golang 中有关 map 的实现原理。

go1.19.12 linux/amd64

数据结构

本文只讲解 golang 中 map 结构的原理,有关哈希表数据结构的内容,这里不会涉及。

先来看看 map 的结构图,会更好地理解 map 的底层结构。

hmap

用 make 关键字创建一个 map 时,创建的其实是 hmap 结构

type hmap struct {
    // Note: the format of the hmap is also encoded in cmd/compile/internal/reflectdata/reflect.go.
    // Make sure this stays in sync with the compiler's definition.
    count     int // # live cells == size of map.  Must be first (used by len() builtin)
    flags     uint8
    B         uint8  // log_2 of # of buckets (can hold up to loadFactor * 2^B items)
    noverflow uint16 // approximate number of overflow buckets; see incrnoverflow for details
    hash0     uint32 // hash seed

    buckets    unsafe.Pointer // array of 2^B Buckets. may be nil if count==0.
    oldbuckets unsafe.Pointer // previous bucket array of half the size, non-nil only when growing
    nevacuate  uintptr        // progress counter for evacuation (buckets less than this have been evacuated)

    extra *mapextra // optional fields
}

字段作用:

loadFactor * 2^BGC

bmap

其实就是上面说的桶,这里只有一个tophash 字段,类型是长度为 8 的数组,每个数组元素长度为 8bit。

tophash用于保存桶中八个key 对应 hash 值的高8位,在查找 key 时可以提高效率。

查找时先通过 key 的低位哈希定位到桶序号,然后依次将key 的高位哈希和 tophash[i]进行比较,如果不同则 key 一定不在该位置,如果相同才会去比较 key 是否相同。

const (
    bucketCnt     = 1 << bucketCntBits
)
// A bucket for a Go map.
type bmap struct {
    // tophash generally contains the top byte of the hash value
    // for each key in this bucket. If tophash[0] < minTopHash,
    // tophash[0] is a bucket evacuation state instead.
    tophash [bucketCnt]uint8
    // Followed by bucketCnt keys and then bucketCnt elems.
    // NOTE: packing all the keys together and then all the elems together makes the
    // code a bit more complicated than alternating key/elem/key/elem/... but it allows
    // us to eliminate padding which would be needed for, e.g., map[int64]int8.
    // Followed by an overflow pointer.
}

这里并没有看到有关保存key、value的结构,是因为哈希表中 key 和 value 类型并不固定,所以这里很难确定要用什么类型去定义字段。

cmd/compile/internal/reflectdata.reflect.goMapBucketType
func MapBucketType(t *types.Type) *types.Type {
  // 检查 t.MapType().Bucket 是否已经存在,如果存在则直接返回
    if t.MapType().Bucket != nil {
        return t.MapType().Bucket
    }

  // 获取键值对的类型
    keytype := t.Key()
    elemtype := t.Elem()

  // 计算键值对的大小
    types.CalcSize(keytype)
    types.CalcSize(elemtype)
    if keytype.Size() > MAXKEYSIZE {
        keytype = types.NewPtr(keytype)
    }
    if elemtype.Size() > MAXELEMSIZE {
        elemtype = types.NewPtr(elemtype)
    }

  // 用于存储存储桶结构的字段
    field := make([]*types.Field, 0, 5)

    // The first field is: uint8 topbits[BUCKETSIZE].
  // topbits字段,类型为[8]uint8
    arr := types.NewArray(types.Types[types.TUINT8], BUCKETSIZE)
    field = append(field, makefield("topbits", arr))

  // keys字段,类型为[8]keytype
    arr = types.NewArray(keytype, BUCKETSIZE)
    arr.SetNoalg(true)
    keys := makefield("keys", arr)
    field = append(field, keys)

  // elems字段,类型为[8]valuetype
    arr = types.NewArray(elemtype, BUCKETSIZE)
    arr.SetNoalg(true)
    elems := makefield("elems", arr)
    field = append(field, elems)

    // If keys and elems have no pointers, the map implementation
    // can keep a list of overflow pointers on the side so that
    // buckets can be marked as having no pointers.
    // Arrange for the bucket to have no pointers by changing
    // the type of the overflow field to uintptr in this case.
    // See comment on hmap.overflow in runtime/map.go.
  // overflow字段
    otyp := types.Types[types.TUNSAFEPTR]
    if !elemtype.HasPointers() && !keytype.HasPointers() {
        otyp = types.Types[types.TUINTPTR]
    }
    overflow := makefield("overflow", otyp)
    field = append(field, overflow)

    // link up fields
  // 计算桶的大小
    bucket := types.NewStruct(types.NoPkg, field[:])
    bucket.SetNoalg(true)
    types.CalcSize(bucket)

    // Check invariants that map code depends on.
    if !types.IsComparable(t.Key()) {
        base.Fatalf("unsupported map key type for %v", t)
    }
    if BUCKETSIZE < 8 {
        base.Fatalf("bucket size too small for proper alignment")
    }
    if uint8(keytype.Alignment()) > BUCKETSIZE {
        base.Fatalf("key align too big for %v", t)
    }
    if uint8(elemtype.Alignment()) > BUCKETSIZE {
        base.Fatalf("elem align too big for %v", t)
    }
    if keytype.Size() > MAXKEYSIZE {
        base.Fatalf("key size to large for %v", t)
    }
    if elemtype.Size() > MAXELEMSIZE {
        base.Fatalf("elem size to large for %v", t)
    }
    if t.Key().Size() > MAXKEYSIZE && !keytype.IsPtr() {
        base.Fatalf("key indirect incorrect for %v", t)
    }
    if t.Elem().Size() > MAXELEMSIZE && !elemtype.IsPtr() {
        base.Fatalf("elem indirect incorrect for %v", t)
    }
    if keytype.Size()%keytype.Alignment() != 0 {
        base.Fatalf("key size not a multiple of key align for %v", t)
    }
    if elemtype.Size()%elemtype.Alignment() != 0 {
        base.Fatalf("elem size not a multiple of elem align for %v", t)
    }
    if uint8(bucket.Alignment())%uint8(keytype.Alignment()) != 0 {
        base.Fatalf("bucket align not multiple of key align %v", t)
    }
    if uint8(bucket.Alignment())%uint8(elemtype.Alignment()) != 0 {
        base.Fatalf("bucket align not multiple of elem align %v", t)
    }
    if keys.Offset%keytype.Alignment() != 0 {
        base.Fatalf("bad alignment of keys in bmap for %v", t)
    }
    if elems.Offset%elemtype.Alignment() != 0 {
        base.Fatalf("bad alignment of elems in bmap for %v", t)
    }

    // Double-check that overflow field is final memory in struct,
    // with no padding at end.
    if overflow.Offset != bucket.Size()-int64(types.PtrSize) {
        base.Fatalf("bad offset of overflow in bmap for %v", t)
    }
  // 将 bucket 和 map 结构关联
    t.MapType().Bucket = bucket
    bucket.StructType().Map = t
    return bucket
}

推导后的 bmap结构如下,这里其实就是一个完整的桶结构,包含了 tophash,key,value 和溢出桶指针。

type bmap struct {
    topbits  [8]uint8
    keys     [8]keytype
    elems    [8]valuetype
    overflow uintptr
}

mapextra

上述推导后的 bmap结构中,overflow 字段并不总是 uintptr类型。在MapBucketType函数有这样一段判断:

otyp := types.Types[types.TUNSAFEPTR]
if !elemtype.HasPointers() && !keytype.HasPointers() {
    otyp = types.Types[types.TUINTPTR]
}
overflow := makefield("overflow", otyp)
field = append(field, overflow)

可以看出当 key 和 value中不包含指针时,overflow是uintptr类型,而包含指针时,是 unsafe.Pointer类型。

区别在于: uintptr 类型不会被 GC 认为是引用,而 unsafe.Pointer类型会认为是引用。

这样的目的是: 如果 key 和 value 中不包含指针,那么就将桶标记为不含指针(uintptr类型),从而避免扫描整个 map(uintptr不被认为是引用,不会扫描)。

但是这样有个问题: 如果桶中有溢出桶,overflow 应该指向对应的溢出桶地址,但是由于 uintptr 类型不被认为是引用,这部分内存可能就会被 GC 回收掉。

为了避免这个问题,需要有个地方能够直接引用这些溢出桶,也就是hmap 的extra字段,该字段为*mapextra类型。

// mapextra holds fields that are not present on all maps.
type mapextra struct {
    // If both key and elem do not contain pointers and are inline, then we mark bucket
    // type as containing no pointers. This avoids scanning such maps.
    // However, bmap.overflow is a pointer. In order to keep overflow buckets
    // alive, we store pointers to all overflow buckets in hmap.extra.overflow and hmap.extra.oldoverflow.
    // overflow and oldoverflow are only used if key and elem do not contain pointers.
    // overflow contains overflow buckets for hmap.buckets.
    // oldoverflow contains overflow buckets for hmap.oldbuckets.
    // The indirection allows to store a pointer to the slice in hiter.
    overflow    *[]*bmap
    oldoverflow *[]*bmap

    // nextOverflow holds a pointer to a free overflow bucket.
    nextOverflow *bmap
}

字段含义如下:

  • overflow: 指向所有的溢出桶,只用于 key 和 value 中不含指针的场景
  • oldoverflow: 指向所有旧桶的溢出桶,只用于 key 和 value 中不含指针的场景
  • nextOverflow: 指向所有的预分配的溢出桶,预分配的用完了值就变成nil

创建

通常我们会使用三种方式进行 map 的创建:

m := map[int]int{1:1}m := make(map[int]int)m := make(map[int]int, 3)

通过汇编代码可定位到创建 map 的几个函数。

  • makemap_small
  • makemap64
  • makemap

makemap_small

在以下几种情况下创建 map 时,会调用此函数:

  • make(map[k]v)
  • make(map[k]v, hint),且 hint不超过 8 时
func makemap_small() *hmap {
    h := new(hmap)
    h.hash0 = fastrand()
    return h
}

这里只是简单地分配了一段hmap大小的内存空间,并给随机种子赋值,并没有分配哈希桶的逻辑。

当 hint 不超过 8 时,只需一个哈希桶即可,可以在写入的时候进行bucket的创建。

makemap64

func makemap64(t *maptype, hint int64, h *hmap) *hmap {
    if int64(int(hint)) != hint {
        hint = 0
    }
    return makemap(t, int(hint), h)
}

主要调用 makemap 来完成内存的分配。

makemap

函数签名如下:

func makemap(t *maptype, hint int, h *hmap) *hmap {...}

当通过make(map[k]v, hint)创建 map 且提供的 hint 大于 8 时,会调用此函数进行实际的创建工作。

注意到这里的第二个参数名称是 hint,而不是 size 或 count,表明这里的 hint 只是个参考值,实际分配内存时使用的不一定是这个值。在后续的源码中可以看出这一点。

理想情况下,一个桶中只放一个 key 是效率最高的,因为定位到该桶后,只需要比较一次 key就能获取到对应的值。因此先假定需要创建hint个桶,但是需要判断一下这样会不会导致内存溢出,如果会导致溢出,就会把 hint 置为 0。

func makemap(t *maptype, hint int, h *hmap) *hmap {
    mem, overflow := math.MulUintptr(uintptr(hint), t.bucket.size)
    if overflow || mem > maxAlloc {
        hint = 0
    }
  ...
}

接下来的逻辑是: 创建一个 hmap 结构并初始化随机种子,确定需要分配的桶的个数,分配桶空间等。

loadFactor = 哈希表中元素个数 / 桶的个数loadFactor = 哈希表中元素个数 / (2^B)

经过统计,最优的负载因子是 6.5,该数值下命中率和内存利用率都能达到一个不错的效果。

// Picking loadFactor: too large and we have lots of overflow
// buckets, too small and we waste a lot of space. I wrote
// a simple program to check some stats for different loads:
// (64-bit, 8 byte keys and elems)
//  loadFactor    %overflow  bytes/entry     hitprobe    missprobe
//        4.00         2.13        20.77         3.00         4.00
//        4.50         4.05        17.30         3.25         4.50
//        5.00         6.85        14.77         3.50         5.00
//        5.50        10.55        12.94         3.75         5.50
//        6.00        15.27        11.67         4.00         6.00
//        6.50        20.90        10.79         4.25         6.50
//        7.00        27.14        10.15         4.50         7.00
//        7.50        34.03         9.73         4.75         7.50
//        8.00        41.10         9.40         5.00         8.00
//
// %overflow   = percentage of buckets which have an overflow bucket
// bytes/entry = overhead bytes used per key/elem pair
// hitprobe    = # of entries to check when looking up a present key
// missprobe   = # of entries to check when looking up an absent key

回到makemap函数中,该函数先将 B 初始化为0,然后逐步自增,直到桶个数为2^B时,负载因子小于 6.5,这样就找到了合适的 B。

func makemap(t *maptype, hint int, h *hmap) *hmap {
  ...
    B := uint8(0)
    for overLoadFactor(hint, B) {
        B++
    }
    h.B = B
  ...
}

func overLoadFactor(count int, B uint8) bool {
    return count > bucketCnt && uintptr(count) > loadFactorNum*(bucketShift(B)/loadFactorDen)
}

const (
    loadFactorNum = 13
    loadFactorDen = 2
)

确定了 B 之后,就可以为 map 分配桶了。如果 B = 0,暂时不会分配 bucket,延迟到赋值时由mapassign来完成分配。

func makemap(t *maptype, hint int, h *hmap) *hmap {
  ...
    // allocate initial hash table
    // if B == 0, the buckets field is allocated lazily later (in mapassign)
    // If hint is large zeroing this memory could take a while.
    if h.B != 0 {
        var nextOverflow *bmap
        h.buckets, nextOverflow = makeBucketArray(t, h.B, nil)
        if nextOverflow != nil {
            h.extra = new(mapextra)
            h.extra.nextOverflow = nextOverflow
        }
    }
    return h
}

makeBucketArray定义如下,该函数接受三个参数:

  • t: 待分配桶所属的 map 指针
  • b: 桶数量的对数,值和B相等
  • dirtyalloc: 指向一个已经分配的桶数组的指针

返回两个指针:

  • buckets: 指向普通的桶数组的指针
  • nextOverflow: 指向下一个可用的溢出桶数组的指针
func makeBucketArray(t *maptype, b uint8, dirtyalloc unsafe.Pointer) (buckets unsafe.Pointer, nextOverflow *bmap) {
  // 基本桶个数,即 2^B
    base := bucketShift(b)
  // 所有桶的个数,包括基本桶和溢出桶
    nbuckets := base
  // b >= 4 时可能会需要溢出桶,因此这里需要计算溢出桶的数量,b < 4时不可能有溢出桶
    if b >= 4 {
        nbuckets += bucketShift(b - 4)
        sz := t.bucket.size * nbuckets
        up := roundupsize(sz)
        if up != sz {
            nbuckets = up / t.bucket.size
        }
    }

  // dirtyalloc == nil,表示没有预先分配的内存,因此需要调用newarray()函数来创建一个新的桶数组。
  // 注意到这里是分配nbuckets大小的内存,所以基本桶和溢出桶的内存空间是连续的
    if dirtyalloc == nil {
        buckets = newarray(t.bucket, int(nbuckets))
    } else {
    // dirtyalloc != nil,表示有预分配的内存,这里不需要再重新创建,只需要将该部分数据清空即可重用
        buckets = dirtyalloc
        size := t.bucket.size * nbuckets
        if t.bucket.ptrdata != 0 {
            memclrHasPointers(buckets, size)
        } else {
            memclrNoHeapPointers(buckets, size)
        }
    }

  // 基本桶个数 != 所有桶个数,表明需要溢出桶
    if base != nbuckets {
    // buckets的内存空间是连续的,前base个桶是基本桶,后面的是溢出桶部分,nextOverflow就是溢出桶的开始位置
        nextOverflow = (*bmap)(add(buckets, base*uintptr(t.bucketsize)))
    // 最后一个溢出桶的地址
        last := (*bmap)(add(buckets, (nbuckets-1)*uintptr(t.bucketsize)))
        last.setoverflow(t, (*bmap)(buckets))
    }
    return buckets, nextOverflow
}
2 ^ 3 * 6.5 = 52

哈希

哈希表利用哈希函数进行查找优化,能够极大地提高查找效率。

在正式地讲解查找和写入操作之前,先来看看 golang 的 map 中是怎么利用哈希的。

我们知道,哈希本质是对输入的数据进行一系列的数学运算,得到一个定长的值,由于在大多数情况,不同的输入往往会有不同的哈希值,因此如果数据不一样,通过比较哈希值就能知道,从而避免比较原本的数据(可能很长)。

golang 将key的哈希值分成低位和高位两部分,低位哈希值用于确定 key 所在的桶序号,高位哈希值用于初步确定key是否在桶中。

map 中已存在的 key 的高位哈希会被保存在bmap 的tophash中,除此以外,tophash 还可以用于保存一些标记。

const (
    // Possible tophash values. We reserve a few possibilities for special marks.
    // Each bucket (including its overflow buckets, if any) will have either all or none of its
    // entries in the evacuated* states (except during the evacuate() method, which only happens
    // during map writes and thus no one else can observe the map during that time).
    emptyRest      = 0 // this cell is empty, and there are no more non-empty cells at higher indexes or overflows.
    emptyOne       = 1 // this cell is empty
    evacuatedX     = 2 // key/elem is valid.  Entry has been evacuated to first half of larger table.
    evacuatedY     = 3 // same as above, but evacuated to second half of larger table.
    evacuatedEmpty = 4 // cell is empty, bucket is evacuated.
    minTopHash     = 5 // minimum tophash for a normal filled cell.
)

emptyOne

tophash[i] == emptyOne 表明桶的当前位置是空的,可能从来没存过数据,也可能是被 delete 了。

查找时,说明当前位置是空的,可以跳过当前位置去判断桶的 i+1位置;写入时,说明当前位置没有别的数据,是可写的。

emptyRest

tophash[i] == emptyRest 说明桶的当前位置和后续位置,包括溢出桶都是空的。

查找时,说明 key 不存在,因为在当前桶和溢出桶中都找不到;写入时,说明当前位置是可写的。

从上述定义中可知,emptyRest 值就是 0,所以刚初始化的 map 中,tophash 默认就是 emptyRest。

evacuatedX和evacuatedY

这两个状态和扩容有关系,建议结合扩容这一章一起看。

在增量扩容模式下,桶数变成原来的 2 倍,前部分称为 X 区,后部分称为 Y 区,桶中的数据搬迁时可能会搬到 X 区,也可能会搬到 Y 区。搬迁后旧桶中的 tophash[i]就被置为evacuatedX或evacuatedY。

在遍历过程中,如果由于写操作导致了扩容,就能根据这个标记知道数据是否搬迁,如果没搬迁直接从旧桶中查找,否则需要根据evacuatedX或evacuatedY 确定元素的搬迁位置,在新桶中查找。

evacuatedEmpty

和evacuatedX、evacuatedY 类似,区别在于: 如果搬迁前该位置本来就没有数据,那么搬迁后 tophash[i]就会被置为evacuatedEmpty。

minTopHash

该值规定了所有 key 高位哈希值的最小值,这样是为了避免某一个 key 的高位哈希值和上面 5 个值一样,导致无法区分是真实的 key 的高位哈希还是标记。

实际情况中,高位哈希出现任何值都是可能的,因此如果某个 key 的高位哈希小于 minTopHash,则会在此基础上加上 minTopHash作为其高位哈希,以保证不会小于minTopHash。

func tophash(hash uintptr) uint8 {
    top := uint8(hash >> (goarch.PtrSize*8 - 8))
    if top < minTopHash {
        top += minTopHash
    }
    return top
}

比如某个 key 的高位哈希算出来是 00000001,该值刚好是 emptyOne,则加上minTopHash(二进制为 00000101),将 00000110作为其高位哈希。

写入

m[k] = v
mapassignreflect.mapassign_faststrmapassign

下面分段来进行解析:

func mapassign(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
    if h == nil {
        panic(plainError("assignment to entry in nil map"))
    }
  ...
    if h.flags&hashWriting != 0 {
        fatal("concurrent map writes")
    }
  // 计算当前 key 的哈希值
    hash := t.hasher(key, uintptr(h.hash0))

    // Set hashWriting after calling t.hasher, since t.hasher may panic,
    // in which case we have not actually done a write.
  // 将flag 置为hashWriting,表明当前正在进行写操作
    h.flags ^= hashWriting
  ...
}

我们都知道,map 在使用前,必须通过 make 关键字进行初始化,不能像切片一样,申明后直接使用。

是因为这里进行了判断,如果 hmap 为 nil,直接就 panic 了。

hashWriting
makemap_small
func mapassign(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
  ...
    if h.buckets == nil {
        h.buckets = newobject(t.bucket) // newarray(t.bucket, 1)
    }
  ...
}

上面我们知道了 key 的哈希值,接下来需要定位这个 key 在哪个桶中,以及桶中是否已经存在了。这里还涉及扩容的操作,放在单独的章节里解释。

func mapassign(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
  ...
again:
  // 利用位运算确定桶的位置,等价于: hash % (2^B)
    bucket := hash & bucketMask(h.B)
  // 如果正在扩容,需要先完成扩容
    if h.growing() {
        growWork(t, h, bucket)
    }
  // 该桶的地址
    b := (*bmap)(add(h.buckets, bucket*uintptr(t.bucketsize)))
  // 计算高位哈希
    top := tophash(hash)

  // 目标 key 在桶中的索引
    var inserti *uint8
  // key-value 的地址
    var insertk unsafe.Pointer
    var elem unsafe.Pointer
  ...
}

现在知道了目标 key 所在的桶和高位哈希,接下来需要通过一系列的操作得到目标 k-v应该在桶中的位置。

  • 遍历桶的tophash,针对每一个索引 i,将 tophash[i]与高位哈希进行比较,如果不相等,说明key 不在该位置。如果相等,说明 key 可能在该位置,需要继续去比较keys[i]是否与目标 key 相等。
  • 如果key 不在当前桶,那就继续遍历当前桶的溢出桶。
  • 溢出桶可能也找不到合适的位置插入当前的 k-v,说明可能没有足够的空间了,此时可能需要扩容。扩容条件是: 负载因子过高或溢出桶数量过多。
  • 如果不需要扩容,那就创建新的溢出桶
func mapassign(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
  ...
bucketloop:
    for {
    // 逐个遍历桶中的每个位置(一个桶有八个位置,对应 8 个k-v)
        for i := uintptr(0); i < bucketCnt; i++ {
      // 位置 i 保存的高位哈希与当前 key 的高位哈希不等,说明keys[i]与 key 不同
            if b.tophash[i] != top {
        // 虽然哈希值不等,但是tophash[i]保存的是状态哈希且该状态表明当前位置是空的,此时可以将 k-v 插入当前位置
                if isEmpty(b.tophash[i]) && inserti == nil {
                    inserti = &b.tophash[i]
                    insertk = add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
                    elem = add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
                }
                if b.tophash[i] == emptyRest {
                    break bucketloop
                }
                continue
            }
      // 位置 i 保存的高位哈希与当前 key 的高位哈希相等,但是 key 不一定相等(可能有哈希冲突),还需要进一步比较 key
            k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
            if t.indirectkey() {
                k = *((*unsafe.Pointer)(k))
            }
      // 虽然高位哈希相等,但是 key 不相等,但是由于此处已经有别的 key 存在,所以不能直接插在此处
            if !t.key.equal(key, k) {
                continue
            }
            // 到这里说明找到了 key 在桶中的位置
            if t.needkeyupdate() {
                typedmemmove(t.key, k, key)
            }
      // 定位到目标 value 应该存放的地址
            elem = add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
            goto done
        }
    // 遍历完都没找到合适的位置,那就继续遍历当前桶的溢出桶(通过当前桶的overflow字段)
        ovf := b.overflow(t)
        if ovf == nil {
            break
        }
        b = ovf
    }
    // If we hit the max load factor or we have too many overflow buckets,
    // and we're not already in the middle of growing, start growing.
  // 负载因子过高或有太多的溢出桶,且当前没有进行扩容,那么就执行扩容,然后重试前面的操作
    if !h.growing() && (overLoadFactor(h.count+1, h.B) || tooManyOverflowBuckets(h.noverflow, h.B)) {
        hashGrow(t, h)
        goto again // Growing the table invalidates everything, so try again
    }
  // 不需要扩容,创建新的溢出桶
    if inserti == nil {
        // The current bucket and all the overflow buckets connected to it are full, allocate a new one.
        newb := h.newoverflow(t, b)
        inserti = &newb.tophash[0]
        insertk = add(unsafe.Pointer(newb), dataOffset)
        elem = add(insertk, bucketCnt*uintptr(t.keysize))
    }

    // store new key/elem at insert position
    if t.indirectkey() {
        kmem := newobject(t.key)
        *(*unsafe.Pointer)(insertk) = kmem
        insertk = kmem
    }
    if t.indirectelem() {
        vmem := newobject(t.elem)
        *(*unsafe.Pointer)(elem) = vmem
    }
    typedmemmove(t.key, insertk, key)
  // 存放在新的溢出桶的第一个位置
    *inserti = top
  // 元素个数增加
    h.count++
  ...
}

经过这一系列步骤之后,我们拿到了待写入数据应该写入的位置(即elem),mapassign 函数返回后,由其他函数进行数据的写入。

func mapassign(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
...
done:
    if h.flags&hashWriting == 0 {
        fatal("concurrent map writes")
    }
  // 还原标记,表示写入完成
    h.flags &^= hashWriting
    if t.indirectelem() {
        elem = *((*unsafe.Pointer)(elem))
    }
    return elem
}

读取

针对 map 的读取操作,通常有以下两种形式:

  • val := m[key]
  • val, ok := m[key]

分别对应源码中的两个函数:

func mapaccess1(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {...}
func mapaccess2(t *maptype, h *hmap, key unsafe.Pointer) (unsafe.Pointer, bool) {...}

原理上都差不多,下面以第一个函数为例来看看map 是如何访问的。

我们知道,针对一个未通过 make 初始化或元素个数为 0的 map进行读操作,能够获取到元素类型的零值,首先就是这段逻辑:

func mapaccess1(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
    ...
    if h == nil || h.count == 0 {
        if t.hashMightPanic() {
            t.hasher(key, 0) // see issue 23734
        }
        return unsafe.Pointer(&zeroVal[0])
    }
  ...
}

然后进行并发写检测,和写操作类型,都是利用 flag 进行判断的。写操作时,会把 flag 置为 hashWriting,这时候读取可能会有问题,因此这里需要判断。

func mapaccess1(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
    ...
    if h.flags&hashWriting != 0 {
        fatal("concurrent map read and map write")
    }
  ...
}

剩下的逻辑和写操作基本是一致的,包括: 利用哈希的低位确定 key 所在的桶、利用哈希高位逐个对比桶的tophash[i]、对 key 进行精确对比、必要时需要查找溢出桶。

func mapaccess1(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
    ...
  // 计算 hash 值
    hash := t.hasher(key, uintptr(h.hash0))
    m := bucketMask(h.B)
  // 定位 key 所在的桶
    b := (*bmap)(add(h.buckets, (hash&m)*uintptr(t.bucketsize)))
    if c := h.oldbuckets; c != nil {
        if !h.sameSizeGrow() {
            // There used to be half as many buckets; mask down one more power of two.
            m >>= 1
        }
        oldb := (*bmap)(add(c, (hash&m)*uintptr(t.bucketsize)))
        if !evacuated(oldb) {
            b = oldb
        }
    }
  // 计算 key 的高位 hash
    top := tophash(hash)
bucketloop:
  // 如果当前桶未找到,则去查询下一个溢出桶
    for ; b != nil; b = b.overflow(t) {
    // 遍历当前桶的所有位置,一共 8 个
        for i := uintptr(0); i < bucketCnt; i++ {
      // 高位 hash 不等,可能有两种情况:
      // 1. tophash[i]为emptyRest,表明桶的当前位置及后续位置都是空的,key 必然不存在,因此需要查找下一个桶
      // 2. tophash[i]不等于 key 的高位哈希,说明当前位置存的不是目标 key,需要查找下一个位置,即 i+1
            if b.tophash[i] != top {
                if b.tophash[i] == emptyRest {
                    break bucketloop
                }
                continue
            }
      // 高位哈希相等,但是 key 不一定相同,因此还需要针对 key 进行精确比较
            k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
            if t.indirectkey() {
                k = *((*unsafe.Pointer)(k))
            }
            if t.key.equal(key, k) {
                e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
                if t.indirectelem() {
                    e = *((*unsafe.Pointer)(e))
                }
        // 找到目标 key
                return e
            }
        }
    }
  // map中不存在该 key,返回数据类型的零值
    return unsafe.Pointer(&zeroVal[0])  
}

删除

mapdelete

总体逻辑和读写类似,都是利用低位哈希定位到 key 所在的桶,用高位哈希定位在桶中位置,以及对 key进行精确匹配,可能还涉及溢出桶的查找。

func mapdelete(t *maptype, h *hmap, key unsafe.Pointer) {
  ...
    if h == nil || h.count == 0 {
        if t.hashMightPanic() {
            t.hasher(key, 0) // see issue 23734
        }
        return
    }
    if h.flags&hashWriting != 0 {
        fatal("concurrent map writes")
    }

    hash := t.hasher(key, uintptr(h.hash0))

    // Set hashWriting after calling t.hasher, since t.hasher may panic,
    // in which case we have not actually done a write (delete).
    h.flags ^= hashWriting

    bucket := hash & bucketMask(h.B)
    if h.growing() {
        growWork(t, h, bucket)
    }
    b := (*bmap)(add(h.buckets, bucket*uintptr(t.bucketsize)))
    bOrig := b
    top := tophash(hash)
search:
    for ; b != nil; b = b.overflow(t) {
        for i := uintptr(0); i < bucketCnt; i++ {
            if b.tophash[i] != top {
                if b.tophash[i] == emptyRest {
                    break search
                }
                continue
            }
            k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
            k2 := k
            if t.indirectkey() {
                k2 = *((*unsafe.Pointer)(k2))
            }
            if !t.key.equal(key, k2) {
                continue
            }
      ...
    }
    ...
  }
  ...
}

不同之处在于,使用 delete 删除 key时,并没有真的将保存 key-value的内存清空,而是直接将 tophash[i]置为特定的状态标记(emptyOne 或 emptyRest),以表明该位置是空的。当然,如果 key 或 value 中含有指针,还需要将指针置为 nil,以保证删除的key 和 value 可以被 GC 回收掉。

func mapdelete(t *maptype, h *hmap, key unsafe.Pointer) {
    ...
search:
    for ; b != nil; b = b.overflow(t) {
        for i := uintptr(0); i < bucketCnt; i++ {
      ...
      // 如果 key 中含有指针,将指针置空,以保证可以被 GC 清理掉
            if t.indirectkey() {
                *(*unsafe.Pointer)(k) = nil
            } else if t.key.ptrdata != 0 {
                memclrHasPointers(k, t.key.size)
            }
      // 如果 value 中含有指针,将指针置空,以保证可以被 GC 清理掉
            e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
            if t.indirectelem() {
                *(*unsafe.Pointer)(e) = nil
            } else if t.elem.ptrdata != 0 {
                memclrHasPointers(e, t.elem.size)
            } else {
                memclrNoHeapPointers(e, t.elem.size)
            }
      ...
}

接下来设置标记。

首先可以肯定的是,删除当前位置的 key 后,当前位置的 tophash[i]一定可以置为 emptyOne,但这还不够。

还需要进一步判断当前位置的 tophash[i]是否可以置为emptyRest,因为查找或写入时如果知道 tophash 是 emptyRest,那么就能直接知道当前位置及之后的位置是空的,这样可以避免非必要的遍历了。

以下情况下,当前位置的 tophash 不能设置为emptyRest:

  • 当前位置处于桶的最后,且当前桶存在溢出桶,且该溢出桶的 tophash[0]不是 emptyRest
  • 当前桶不位于桶的最后,且桶中下一个位置的 tophash 不是 emptyRest

这两种情况下说明当前桶的后面有非空的位置,可能在同一个桶的后面,也可能在溢出桶中。

func mapdelete(t *maptype, h *hmap, key unsafe.Pointer) {
...
            b.tophash[i] = emptyOne
            // If the bucket now ends in a bunch of emptyOne states,
            // change those to emptyRest states.
            // It would be nice to make this a separate function, but
            // for loops are not currently inlineable.
            if i == bucketCnt-1 {
                if b.overflow(t) != nil && b.overflow(t).tophash[0] != emptyRest {
                    goto notLast
                }
            } else {
                if b.tophash[i+1] != emptyRest {
                    goto notLast
                }
            }
            ...
        notLast:
            h.count--
            // Reset the hash seed to make it more difficult for attackers to
            // repeatedly trigger hash collisions. See issue 25237.
            if h.count == 0 {
                h.hash0 = fastrand()
            }
            break search
        }
    }
...
}

以上两种情况之外,就可以直接将当前位置的 tophash置为 emptyRest 。

Golang 的设计更进一步,不仅会设置当前位置,还试图修改更前面的位置的标记。

  • 在同一个桶中,当前位置置为 emptyRest后,继续访问前一个位置,如果该位置标记是 emptyOne,则将该位置也置为emptyRest,直到遇到第一个非 emptyOne的位置
  • 如果当前桶的第一个位置也置为 emptyRest 了,则定位到上一个桶,执行上述操作
func mapdelete(t *maptype, h *hmap, key unsafe.Pointer) {
...
            for {
        // 将当前位置的 tophash 置为 emptyRest
                b.tophash[i] = emptyRest
        // 当前位置位于所在桶的头部
                if i == 0 {
          // bOrig 指向第一个桶,即通过低位哈希获取到的位置,如果当前桶已经是第一个桶,则无需操作
                    if b == bOrig {
                        break // beginning of initial bucket, we're done.
                    }
          // 寻找当前桶的前一个桶
                    c := b
                    for b = bOrig; b.overflow(t) != c; b = b.overflow(t) {
                    }
          // i = 7,相当于是把当前桶的前一个桶的 tophash[7]置为 emptyRest
                    i = bucketCnt - 1
                } else {
          // 从后往前遍历桶中的每个位置
                    i--
                }
                if b.tophash[i] != emptyOne {
                    break
                }
            }
        notLast:
            h.count--
            // Reset the hash seed to make it more difficult for attackers to
            // repeatedly trigger hash collisions. See issue 25237.
            if h.count == 0 {
                h.hash0 = fastrand()
            }
            break search
        }
    }

    if h.flags&hashWriting == 0 {
        fatal("concurrent map writes")
    }
    h.flags &^= hashWriting
}

通过上述操作,在查找时如果tophash[i] == emptyRest,就能知道该位置及以后的位置都是空的,就不需要再查找后续位置和溢出桶了。

清空

golang 中的并没有提供 clear 关键字将 map 一次性清空,如果想要清空 map,有两种方式:

  • 创建新的 map 并赋值给原本的 map 变量,原本的 map 会被 GC 掉
  • 通过循环 + delete 删除

方式一会创建新的map,不够干净,且上一个 map 分配的空间无法重用;直观上,方式二迭代+删除会比较慢。

而实际上,编译器会对迭代 + 删除这种方式进行优化,在源码层面对应下面的函数,其主要思路就是: 将map 中的各种标记清空、将桶数组和溢出桶的内存清零并复用。

// mapclear deletes all keys from a map.
func mapclear(t *maptype, h *hmap) {
    ...
    if h == nil || h.count == 0 {
        return
    }

    if h.flags&hashWriting != 0 {
        fatal("concurrent map writes")
    }

    h.flags ^= hashWriting

    h.flags &^= sameSizeGrow
    h.oldbuckets = nil
    h.nevacuate = 0
    h.noverflow = 0
    h.count = 0

    // Reset the hash seed to make it more difficult for attackers to
    // repeatedly trigger hash collisions. See issue 25237.
    h.hash0 = fastrand()

    // Keep the mapextra allocation but clear any extra information.
    if h.extra != nil {
        *h.extra = mapextra{}
    }

    // makeBucketArray clears the memory pointed to by h.buckets
    // and recovers any overflow buckets by generating them
    // as if h.buckets was newly alloced.
    _, nextOverflow := makeBucketArray(t, h.B, h.buckets)
    if nextOverflow != nil {
        // If overflow buckets are created then h.extra
        // will have been allocated during initial bucket creation.
        h.extra.nextOverflow = nextOverflow
    }

    if h.flags&hashWriting == 0 {
        fatal("concurrent map writes")
    }
    h.flags &^= hashWriting
}

扩容

在讲mapassign函数时说到,如果负载因子过高或者 map 中溢出桶过多,会触发扩容。

golang 将扩容分成两种:

  • 等量扩容
  • 增量扩容

负载因子的计算和桶的数量(即 B)有关,因此如果负载因子过大,则说明B过小,此时需要将 B 加 1,对应的桶数变成了原来的 2 倍,这就是增量扩容;

否则,可能是溢出桶太多了,实际数据并没有那么多,这样会影响查找效率。比如连续插入数据后删除,导致溢出桶很多。这种情况下只需要将松散的数据集中起来即可,桶数量保持不变,这就是等量扩容。

func hashGrow(t *maptype, h *hmap) {
  // 默认进行增量扩容
    bigger := uint8(1)
  // 如果负载因子没有超过阈值,则使用等量扩容
    if !overLoadFactor(h.count+1, h.B) {
        bigger = 0
        h.flags |= sameSizeGrow
    }
  // 扩容后原来的桶数据保存在oldbuckets中
    oldbuckets := h.buckets
  // 创建新桶和溢出桶。增量模式下,桶数量为原来的 2 倍;等量模式下,桶数量不变
    newbuckets, nextOverflow := makeBucketArray(t, h.B+bigger, nil)
    flags := h.flags &^ (iterator | oldIterator)
    if h.flags&iterator != 0 {
        flags |= oldIterator
    }
    // 更新 map 的桶数
    h.B += bigger
    h.flags = flags
    h.oldbuckets = oldbuckets
  // 设置新桶,此时buckets是空的,所有的数据都在oldbuckets上
    h.buckets = newbuckets
    h.nevacuate = 0
    h.noverflow = 0

  // 原本的溢出桶数据保存到oldoverflow中
    if h.extra != nil && h.extra.overflow != nil {
        // Promote current overflow buckets to the old generation.
        if h.extra.oldoverflow != nil {
            throw("oldoverflow is not nil")
        }
        h.extra.oldoverflow = h.extra.overflow
        h.extra.overflow = nil
    }
  // 设置预分配的溢出桶地址
    if nextOverflow != nil {
        if h.extra == nil {
            h.extra = new(mapextra)
        }
        h.extra.nextOverflow = nextOverflow
    }

    // the actual copying of the hash table data is done incrementally
    // by growWork() and evacuate().
}

无论哪种模式,原本的普通桶和溢出桶数据分别保存到了oldbuckets和oldoverflow中,等待下一步的整理和搬迁。但是如果有非常多的 k-v 需要迁移,会非常影响性能。Golang 采用了渐进式迁移的方式,将数据迁移分散在写和删除操作中,即下面这段代码:

这里 growWork 会执行两次evacuate,第一次用于搬迁当前 key 所在的桶,而第二次用于更新搬迁进度,比如当前搬迁进度nevacuate为 0,那么growWork会先搬迁 key 所在的桶,然后搬迁 0 号桶。

bucket := hash & bucketMask(h.B)
if h.growing() {
  growWork(t, h, bucket)
}

func (h *hmap) growing() bool {
    return h.oldbuckets != nil
}

func growWork(t *maptype, h *hmap, bucket uintptr) {
    // make sure we evacuate the oldbucket corresponding
    // to the bucket we're about to use
  // bucket&h.oldbucketmask() 用于确定当前正在操作的桶在扩容前的位置
    evacuate(t, h, bucket&h.oldbucketmask())

    // evacuate one more oldbucket to make progress on growing
    if h.growing() {
        evacuate(t, h, h.nevacuate)
    }
}

结合定位桶的方式,不难得出以下结论:

  • 等量扩容模式下,桶数量不变,掩码和扩容前是一致的,因此扩容前后 key 所在的桶序号不变
  • 增量扩容模式下,桶数量变成原来的 2倍,掩码也比之前多了 1 位,key 所在的桶序号可能会改变。比如原本的 B 是 5,计算位置时只需取 key 的低位哈希后 5 位,而增量扩容后B是 6,计算位置时需取 key 的低位哈希后 6位。桶序号是否改变取决于 key 的低位哈希的倒数第 6 位是 0 还是 1。

假定扩容前的 B = 5,桶数量为32,扩容后 B = 6,桶数量为 64。扩容后前 32 个桶称为 X 区,后 32 个桶称为 Y 区。

对等量扩容来说,搬迁后只会到 X 区(只有 X 区),而对于增量扩容,搬迁后可能位于 X 区,也可能位于 Y 区,具体取决于低位哈希的最高位是 1 还是 0。这个概念对理解扩容很重要,下面回到源码。

func evacuate(t *maptype, h *hmap, oldbucket uintptr) {
  // 定位到需要搬迁的旧桶
    b := (*bmap)(add(h.oldbuckets, oldbucket*uintptr(t.bucketsize)))
  // 扩容前的桶数
    newbit := h.noldbuckets()
  // 当前桶的 tophash[0] 不是 evacuatedX、evacuatedY和evacuatedEmpty 中任意一个,表明当前桶未搬迁过
    if !evacuated(b) {
        var xy [2]evacDst
    // 假定是等量扩容,扩容前后 key 在桶中的序号不变,即保存到 X 区,这里 x.b 就是 X 区桶的地址
        x := &xy[0]
        x.b = (*bmap)(add(h.buckets, oldbucket*uintptr(t.bucketsize)))
        x.k = add(unsafe.Pointer(x.b), dataOffset)
        x.e = add(x.k, bucketCnt*uintptr(t.keysize))

    // 如果是增量扩容,序号可能改变,可能保存到 X 区或 Y 区,这里 y.b 就是 Y 区桶的地址
        if !h.sameSizeGrow() {
            y := &xy[1]
            y.b = (*bmap)(add(h.buckets, (oldbucket+newbit)*uintptr(t.bucketsize)))
            y.k = add(unsafe.Pointer(y.b), dataOffset)
            y.e = add(y.k, bucketCnt*uintptr(t.keysize))
        }

    // 遍历旧桶该位置下所有的桶,包括普通桶和溢出桶
        for ; b != nil; b = b.overflow(t) {
      // key 的地址
            k := add(unsafe.Pointer(b), dataOffset)
      // value 的地址
            e := add(k, bucketCnt*uintptr(t.keysize))
      // 遍历同一个桶中的 8 个位置
            for i := 0; i < bucketCnt; i, k, e = i+1, add(k, uintptr(t.keysize)), add(e, uintptr(t.elemsize)) {
        // 当前位置的高位哈希为 emptyRest 或 emptyOne,说明当前位置是空的
        // 将 tophash 设置为evacuatedEmpty,表示已搬迁,继续处理下一个位置
                top := b.tophash[i]
                if isEmpty(top) {
                    b.tophash[i] = evacuatedEmpty
                    continue
                }
                if top < minTopHash {
                    throw("bad map state")
                }
                k2 := k
                if t.indirectkey() {
                    k2 = *((*unsafe.Pointer)(k2))
                }
                var useY uint8
        // 增量模式下
                if !h.sameSizeGrow() {
                    // 计算哈希
                    hash := t.hasher(k2, uintptr(h.hash0))
                    if h.flags&iterator != 0 && !t.reflexivekey() && !t.key.equal(k2, k2) {
                        // If key != key (NaNs), then the hash could be (and probably
                        // will be) entirely different from the old hash. Moreover,
                        // it isn't reproducible. Reproducibility is required in the
                        // presence of iterators, as our evacuation decision must
                        // match whatever decision the iterator made.
                        // Fortunately, we have the freedom to send these keys either
                        // way. Also, tophash is meaningless for these kinds of keys.
                        // We let the low bit of tophash drive the evacuation decision.
                        // We recompute a new random tophash for the next level so
                        // these keys will get evenly distributed across all buckets
                        // after multiple grows.
                        useY = top & 1
                        top = tophash(hash)
                    } else {
            // 判断在新的容量下,低位哈希的最高是否是 1,如果是 1,则 useY = 1,表示数据会被搬迁到 Y 区
                        if hash&newbit != 0 {
                            useY = 1
                        }
                    }
                }

                if evacuatedX+1 != evacuatedY || evacuatedX^1 != evacuatedY {
                    throw("bad evacuatedN")
                }

        // 如果useY=1,则evacuatedX + useY其实就是evacuatedY。这里tophash[i] = evacuatedX或evacuatedY 表示数据被搬迁到 X 区或 Y 区
                b.tophash[i] = evacuatedX + useY // evacuatedX + 1 == evacuatedY
                dst := &xy[useY]                 // evacuation destination

        // i记录当前正在搬迁的数据在桶中的位置,如果是 8,表明这个桶中所有元素都已经搬迁了,接下来搬迁溢出桶
                if dst.i == bucketCnt {
                    dst.b = h.newoverflow(t, dst.b)
                    dst.i = 0
                    dst.k = add(unsafe.Pointer(dst.b), dataOffset)
                    dst.e = add(dst.k, bucketCnt*uintptr(t.keysize))
                }
        // 设置tophash[i]
                dst.b.tophash[dst.i&(bucketCnt-1)] = top // mask dst.i as an optimization, to avoid a bounds check
        // 将旧桶的k-v复制到dst
                if t.indirectkey() {
                    *(*unsafe.Pointer)(dst.k) = k2 // copy pointer
                } else {
                    typedmemmove(t.key, dst.k, k) // copy elem
                }
                if t.indirectelem() {
                    *(*unsafe.Pointer)(dst.e) = *(*unsafe.Pointer)(e)
                } else {
                    typedmemmove(t.elem, dst.e, e)
                }
        // 标记下一个数据的位置
                dst.i++
                // These updates might push these pointers past the end of the
                // key or elem arrays.  That's ok, as we have the overflow pointer
                // at the end of the bucket to protect against pointing past the
                // end of the bucket.
        // dst指向下一个位置
                dst.k = add(dst.k, uintptr(t.keysize))
                dst.e = add(dst.e, uintptr(t.elemsize))
            }
        }
        // Unlink the overflow buckets & clear key/elem to help GC.
    // 如果没有协程使用旧桶的数据且存在溢出桶
        if h.flags&oldIterator == 0 && t.bucket.ptrdata != 0 {
            b := add(h.oldbuckets, oldbucket*uintptr(t.bucketsize))
            // Preserve b.tophash because the evacuation
            // state is maintained there.
            ptr := add(b, dataOffset)
            n := uintptr(t.bucketsize) - dataOffset
            memclrHasPointers(ptr, n)
        }
    }

  // 搬迁状态,nevacuate左边的桶都已经搬迁完了
    if oldbucket == h.nevacuate {
        advanceEvacuationMark(h, t, newbit)
    }
}

func advanceEvacuationMark(h *hmap, t *maptype, newbit uintptr) {
  // 搬迁进度加 1,表示当前位置的桶已经搬迁了
    h.nevacuate++
    // Experiments suggest that 1024 is overkill by at least an order of magnitude.
    // Put it in there as a safeguard anyway, to ensure O(1) behavior.
    stop := h.nevacuate + 1024
    if stop > newbit {
        stop = newbit
    }
  // 寻找一个未搬迁的位置
    for h.nevacuate != stop && bucketEvacuated(t, h, h.nevacuate) {
        h.nevacuate++
    }
  // 所有的桶都搬迁完了,做一些清理操作
    if h.nevacuate == newbit { // newbit == # of oldbuckets
        // Growing is all done. Free old main bucket array.
    // 置空旧桶指针,旧桶所有数据已经搬迁到新桶,因此可以放心清理
        h.oldbuckets = nil
        // Can discard old overflow buckets as well.
        // If they are still referenced by an iterator,
        // then the iterator holds a pointers to the slice.
    // 置空旧的溢出桶指针
        if h.extra != nil {
            h.extra.oldoverflow = nil
        }
    // 清除sameSizeGrow标记
        h.flags &^= sameSizeGrow
    }
}

遍历

在应用层面只需要一个简单的 for-range即可实现 map 的遍历,但是在源码层面,实现起来并不那么简单。

上面说到,map 的搬迁操作是分散在各个写操作和删除操作中的,所以遍历过程中,数据会分散在新桶和旧桶中。golang 采用迭代器模式实现数据的访问,依赖于下面的结构:

type hiter struct {
  // 指向遍历的 key 的指针
    key         unsafe.Pointer // Must be in first position.  Write nil to indicate iteration end (see cmd/compile/internal/walk/range.go).
  // 指向遍历的 value 的指针
    elem        unsafe.Pointer // Must be in second position (see cmd/compile/internal/walk/range.go).
    t           *maptype
    h           *hmap
  // map 的桶数组
    buckets     unsafe.Pointer // bucket ptr at hash_iter initialization time
  // 当前遍历到的桶
    bptr        *bmap          // current bucket
    overflow    *[]*bmap       // keeps overflow buckets of hmap.buckets alive
    oldoverflow *[]*bmap       // keeps overflow buckets of hmap.oldbuckets alive
  // 开始遍历的桶索引
    startBucket uintptr        // bucket iteration started at
    offset      uint8          // intra-bucket offset to start from during iteration (should be big enough to hold bucketCnt-1)
  // 是否已经遍历到桶数组尾部
    wrapped     bool           // already wrapped around from end of bucket array to beginning
    B           uint8
  // 当前遍历的 k-v 在桶中的位置
    i           uint8
    bucket      uintptr
    checkBucket uintptr
}

遍历流程开始时,会执行 mapinit 函数,创建并初始化hiter对象。为了防止开发者过于依赖 map 遍历的顺序,golang 采用随机的方式确定开始遍历的桶序号和数据的起始索引号(同一个桶内)。

func mapiterinit(t *maptype, h *hmap, it *hiter) {
  ...
    // decide where to start
    var r uintptr
  // 生成随机数
    if h.B > 31-bucketCntBits {
        r = uintptr(fastrand64())
    } else {
        r = uintptr(fastrand())
    }
  // 随机确定开始遍历的桶序号
    it.startBucket = r & bucketMask(h.B)
  // 随机确定在桶中开始遍历的索引号
    it.offset = uint8(r >> h.B & (bucketCnt - 1))

    // iterator state
    it.bucket = it.startBucket

    // Remember we have an iterator.
    // Can run concurrently with another mapiterinit().
  // 设置标记
    if old := h.flags; old&(iterator|oldIterator) != iterator|oldIterator {
        atomic.Or8(&h.flags, iterator|oldIterator)
    }

    mapiternext(it)
}

接下来进入mapiternext进行遍历。

func mapiternext(it *hiter)

map 不是并发安全的结构,如果在遍历过程中发现有其他的协程在进行写操作,会抛出 fatal error

if h.flags&hashWriting != 0 {
  fatal("concurrent map iteration and map write")
}

接下来开始遍历,会依次遍历桶数组中的每个桶及其后面的溢出桶。

func mapiternext(it *hiter) {
...
next:
    if b == nil {
    // 当前又回到了起始位置且已经遍历过桶数组的末尾,说明遍历完了
        if bucket == it.startBucket && it.wrapped {
            // end of iteration
            it.key = nil
            it.elem = nil
            return
        }
    // 如果处于扩容阶段
        if h.growing() && it.B == h.B {
            // Iterator was started in the middle of a grow, and the grow isn't done yet.
            // If the bucket we're looking at hasn't been filled in yet (i.e. the old
            // bucket hasn't been evacuated) then we need to iterate through the old
            // bucket and only return the ones that will be migrated to this bucket.
            oldbucket := bucket & it.h.oldbucketmask()
            b = (*bmap)(add(h.oldbuckets, oldbucket*uintptr(t.bucketsize)))
      // 当前桶的数据还未搬迁,将checkBucket置为当前桶
            if !evacuated(b) {
                checkBucket = bucket
            } else {
                b = (*bmap)(add(it.buckets, bucket*uintptr(t.bucketsize)))
                checkBucket = noCheck
            }
        } else {
            b = (*bmap)(add(it.buckets, bucket*uintptr(t.bucketsize)))
            checkBucket = noCheck
        }
    // 遍历的桶号加 1,如果到了桶数组末尾,将wrapped置为 true,桶号置0,因为可能还需要遍历前面的桶
        bucket++
        if bucket == bucketShift(it.B) {
            bucket = 0
            it.wrapped = true
        }
        i = 0
    }
  // 遍历桶中的每个位置
    for ; i < bucketCnt; i++ {
        offi := (i + it.offset) & (bucketCnt - 1)
        if isEmpty(b.tophash[offi]) || b.tophash[offi] == evacuatedEmpty {
            // TODO: emptyRest is hard to use here, as we start iterating
            // in the middle of a bucket. It's feasible, just tricky.
            continue
        }
        k := add(unsafe.Pointer(b), dataOffset+uintptr(offi)*uintptr(t.keysize))
        if t.indirectkey() {
            k = *((*unsafe.Pointer)(k))
        }
        e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+uintptr(offi)*uintptr(t.elemsize))
    // 如果桶是旧桶数组中未迁移完成的桶,需要按照其在新桶的顺序进行遍历
        if checkBucket != noCheck && !h.sameSizeGrow() {
            // Special case: iterator was started during a grow to a larger size
            // and the grow is not done yet. We're working on a bucket whose
            // oldbucket has not been evacuated yet. Or at least, it wasn't
            // evacuated when we started the bucket. So we're iterating
            // through the oldbucket, skipping any keys that will go
            // to the other new bucket (each oldbucket expands to two
            // buckets during a grow).
            if t.reflexivekey() || t.key.equal(k, k) {
                // If the item in the oldbucket is not destined for
                // the current new bucket in the iteration, skip it.
                hash := t.hasher(k, uintptr(h.hash0))
                if hash&bucketMask(it.B) != checkBucket {
                    continue
                }
            } else {
                // Hash isn't repeatable if k != k (NaNs).  We need a
                // repeatable and randomish choice of which direction
                // to send NaNs during evacuation. We'll use the low
                // bit of tophash to decide which way NaNs go.
                // NOTE: this case is why we need two evacuate tophash
                // values, evacuatedX and evacuatedY, that differ in
                // their low bit.
                if checkBucket>>(it.B-1) != uintptr(b.tophash[offi]&1) {
                    continue
                }
            }
        }
        if (b.tophash[offi] != evacuatedX && b.tophash[offi] != evacuatedY) ||
            !(t.reflexivekey() || t.key.equal(k, k)) {
            // This is the golden data, we can return it.
            // OR
            // key!=key, so the entry can't be deleted or updated, so we can just return it.
            // That's lucky for us because when key!=key we can't look it up successfully.
            it.key = k
            if t.indirectelem() {
                e = *((*unsafe.Pointer)(e))
            }
            it.elem = e
        } else {
        // 执行 mapaccessK 函数进行查找,并将 k-v 保存在 iter 对象中 
            // The hash table has grown since the iterator was started.
            // The golden data for this key is now somewhere else.
            // Check the current hash table for the data.
            // This code handles the case where the key
            // has been deleted, updated, or deleted and reinserted.
            // NOTE: we need to regrab the key as it has potentially been
            // updated to an equal() but not identical key (e.g. +0.0 vs -0.0).
            rk, re := mapaccessK(t, h, k)
            if rk == nil {
                continue // key has been deleted
            }
            it.key = rk
            it.elem = re
        }
        it.bucket = bucket
        if it.bptr != b { // avoid unnecessary write barrier; see issue 14921
            it.bptr = b
        }
        it.i = i + 1
        it.checkBucket = checkBucket
        return
    }
    b = b.overflow(t)
    i = 0
    goto next
}

结论

通过上述分析,不难得出以下结论:

  • 创建 map 时尽量指定 hint 的大小,避免后续带来的扩容问题
  • 如果 k-v 能用不含指针的类型,就尽量不用指针,可以减轻 GC 压力
  • 使用delete 删除map中的 key 时不会真的释放内存,只是做个标记
  • map 不是并发安全的
  • map 本质上是使用了拉链法去解决哈希冲突问题,区别在于链上的每个节点可以保存 8 个 k-v