目录
/lib/bloomfilter

概述

vmstorage
maxHourlySeriesmaxDailySeries

唯一序列指表示唯一的时间序列,如metrics{label1="value1",label2="value2"}属于一个时间序列,但多条不同值的metrics{label1="value1",label2="value2"}属于同一条时间序列。victoriaMetrics使用如下方式来获取时序的唯一标识:

func getLabelsHash(labels []prompbmarshal.Label) uint64 {
	bb := labelsHashBufPool.Get()
	b := bb.B[:0]
	for _, label := range labels {
		b = append(b, label.Name...)
		b = append(b, label.Value...)
	}
	h := xxhash.Sum64(b)
	bb.B = b
	labelsHashBufPool.Put(bb)
	return h
}

限速器的初始化

victoriaMetrics使用了一个类似限速器的概念,限制每小时/每天新增的唯一序列,但与普通的限速器不同的是,它需要在序列级别进行限制,即判断某个序列是否是新的唯一序列,如果是,则需要进一步判断一段时间内缓存中新的时序数目是否超过限制,而不是简单地在请求层面进行限制。

hourlySeriesLimiter = bloomfilter.NewLimiter(*maxHourlySeries, time.Hour)
dailySeriesLimiter = bloomfilter.NewLimiter(*maxDailySeries, 24*time.Hour)

下面是新建限速器的函数,传入一个最大(序列)值,以及一个刷新时间。该函数中会:

maxItemsrefreshInterval
func NewLimiter(maxItems int, refreshInterval time.Duration) *Limiter {
	l := &Limiter{
		maxItems: maxItems,
		stopCh:   make(chan struct{}),
	}
	l.v.Store(newLimiter(maxItems)) //1
	l.wg.Add(1)
	go func() {
		defer l.wg.Done()
		t := time.NewTicker(refreshInterval)
		defer t.Stop()
		for {
			select {
			case <-t.C:
				l.v.Store(newLimiter(maxItems))//2
			case <-l.stopCh:
				return
			}
		}
	}()
	return l
}
AddgetLabelsHashAdd

如果当前存储的元素个数大于等于允许的最大元素,则通过过滤器判断缓存中是否已经存在该元素;否则将该元素直接加入过滤器中,后续允许将该元素加入到缓存中。

func (l *Limiter) Add(h uint64) bool {
	lm := l.v.Load().(*limiter)
	return lm.Add(h)
}
func (l *limiter) Add(h uint64) bool {
	currentItems := atomic.LoadUint64(&l.currentItems)
	if currentItems >= uint64(l.f.maxItems) {
		return l.f.Has(h)
	}
	if l.f.Add(h) {
		atomic.AddUint64(&l.currentItems, 1)
	}
	return true
}
HasAdd
bitsPerItembitsCountbitsbits63
func newFilter(maxItems int) *filter {
	bitsCount := maxItems * bitsPerItem
	bits := make([]uint64, (bitsCount+63)/64)
	return &filter{
		maxItems: maxItems,
		bits:     bits,
	}
}
bitsPerItemhashesCountfilter_test.go
	if p > 0.003 {
		t.Fatalf("too big false hits share for maxItems=%d: %.5f, falseHits: %d", maxItems, p, falseHits)
	}

AddAddhashesCount
hhw & maskwAddbits[i]bits[i]
func (f *filter) Add(h uint64) bool {
	bits := f.bits
	maxBits := uint64(len(bits)) * 64 //1
	bp := (*[8]byte)(unsafe.Pointer(&h))//2
	b := bp[:]
	isNew := false
	for i := 0; i < hashesCount; i++ {//3
		hi := xxhash.Sum64(b)//4
		h++ //5
		idx := hi % maxBits //6
		i := idx / 64 //7
		j := idx % 64 //8
		mask := uint64(1) << j //9
		w := atomic.LoadUint64(&bits[i])//10
		for (w & mask) == 0 {//11
			wNew := w | mask //12
			if atomic.CompareAndSwapUint64(&bits[i], w, wNew) {//13
				isNew = true//14
				break
			}
			w = atomic.LoadUint64(&bits[i])//14
		}
	}
	return isNew
}
AddHasAddbits[i]
func (f *filter) Has(h uint64) bool {
	bits := f.bits
	maxBits := uint64(len(bits)) * 64
	bp := (*[8]byte)(unsafe.Pointer(&h))
	b := bp[:]
	for i := 0; i < hashesCount; i++ {
		hi := xxhash.Sum64(b)
		h++
		idx := hi % maxBits
		i := idx / 64
		j := idx % 64
		mask := uint64(1) << j
		w := atomic.LoadUint64(&bits[i])
		if (w & mask) == 0 {
			return false
		}
	}
	return true
}

总结

由于victoriaMetrics的过滤器采用的是布隆过滤器,因此它的限速并不精准,在源码条件下, 大约有3%的偏差。但同样地,由于采用了布隆过滤器,降低了所需的内存以及相关计算资源。此外victoriaMetrics的过滤器实现了并发访问。

在大流量场景中,如果需要对请求进行相对精准的过滤,可以考虑使用布隆过滤器,降低所需要的资源,但前提是过滤的结果能够忍受一定程度的漏失率。

您可能感兴趣的文章: