目录

常见的限流算法

固定窗口计数器算法

固定窗口计数器算法将时间分为固定大小的窗口,例如1秒。在每个窗口中,服务会记录它接收到的请求数。如果在一个窗口中的请求数超过了预先设定的阈值,那么新的请求将被拒绝,直到进入下一个窗口。

这种算法简单易实现,但可能会导致窗口边界附近的请求突发。例如,如果窗口大小为1秒,阈值为100,那么在1秒的边界处,服务可能会在短时间内处理200个请求。

滑动窗口计数器算法

滑动窗口计数器算法试图解决固定窗口计数器算法中的请求突发问题。它将窗口分成更小的子窗口,例如将1秒分为10个100毫秒的子窗口。每次接收到请求时,服务会更新当前子窗口的计数器。服务会检查过去的N个子窗口的计数器之和,如果这个和超过阈值,那么新的请求将被拒绝。

这种算法可以更好地平滑请求流量,但实现起来相对复杂,因为需要跟踪多个子窗口的计数器。

令牌桶算法

令牌桶算法维护一个令牌桶,其中包含一定数量的令牌。令牌以恒定速率添加到桶中,直到达到桶的容量。每次接收到请求时,服务会尝试从桶中获取一个令牌。如果桶中有足够的令牌,请求被允许处理;如果没有足够的令牌,请求将被拒绝。

令牌桶算法允许短暂的请求突发,因为在低流量时期,令牌可以累积到桶的容量。这种算法在实践中表现良好,但实现起来相对复杂。

漏桶算法

漏桶算法使用一个队列模拟一个漏水的桶。请求作为水滴进入队列,以恒定速率从队列中移除并处理。如果队列已满,新的请求将被拒绝。

漏桶算法可以平滑请求流量,但它不能处理突发流量,因为请求处理速率是固定的。实现漏桶算法也相对复杂,因为需要在后台使用定时器或其他机制来以恒定速率处理队列中的请求。

time/rate

主要方法

NewLimiter(limit Limit, burst int) *Limiter(lim *Limiter) Allow() bool(lim *Limiter) AllowN(now time.Time, n int) boolAllow()(lim *Limiter) Wait(ctx context.Context) error(lim *Limiter) WaitN(ctx context.Context, n int) error(lim *Limiter) Reserve() *ReservationReservation(lim *Limiter) ReserveN(now time.Time, n int) *ReservationReserve()

各个方法的作用

NewLimiterAllowAllowNWaitWaitNReserveReserveN

time/rate 是如何实现限流的

time/ratelimitburst
AllowAllowNWaitWaitNReserveReserveN
time/rate

源码解析

令牌桶限流器的定义

rate.goLimiter
type Limiter struct {
    mu     sync.Mutex
    limit  Limit
    tokens float64
    // last 是上次令牌桶更新的时间
    last time.Time
    // 用于调整令牌桶更新时间的时钟
    clock Clock
    // 用于在 Wait 系列方法中进行休眠的定时器
    sleepFn func(time.Duration)
}
Limiterlimittokenslast

令牌桶更新

time/ratereserveN
func (lim *Limiter) reserveN(now time.Time, n int) *Reservation {
    lim.mu.Lock()
    defer lim.mu.Unlock()
    // 更新令牌桶
    now, tokens := lim.advance(now)
    // 计算需要的令牌数与当前可用令牌数之间的差值
    delta := float64(n) - tokens
    // 计算等待时间
    waitDuration := lim.limit.durationFromTokens(delta)
    // 更新令牌桶状态
    tokens -= float64(n)
    lim.last = now.Add(waitDuration)
    lim.tokens = tokens
    return &Reservation{
        ok:        true,
        lim:       lim,
        tokens:    n,
        timeToAct: now.Add(waitDuration),
    }
}
reserveNadvance
func (lim *Limiter) advance(now time.Time) (time.Time, float64) {
    last := lim.last
    // 计算上次更新以来经过的时间
    elapsed := now.Sub(last)
    // 根据经过的时间计算生成的令牌数
    delta := elapsed.Seconds() * float64(lim.limit)
    // 更新令牌桶中的令牌数,但不超过令牌桶容量
    tokens := math.Min(lim.tokens+delta, float64(lim.limit.Burst()))
    return now, tokens
}
advance

令牌预留和等待

reserveN

才能获得足够的令牌。最后,更新令牌桶状态,将所需令牌数从当前令牌数中减去。

reserveNReservationReservation
type Reservation struct {
    ok        bool
    lim       *Limiter
    tokens    int
    timeToAct time.Time
}
ReservationDelayCancel

公开 API

time/rateAllowAllowNWaitWaitNReserveReserveNreserveNAllow
func (lim *Limiter) Allow() bool {
    return lim.AllowN(time.Now(), 1)
}
func (lim *Limiter) AllowN(now time.Time, n int) bool {
    return lim.reserveN(now, n).Delay() == 0
}
WaitWaitN
func (lim *Limiter) Wait(ctx context.Context) error {
    return lim.WaitN(ctx, 1)
}
func (lim *Limiter) WaitN(ctx context.Context, n int) error {
    if n > lim.limit.Burst() {
        return fmt.Errorf("rate: Wait(n=%d) exceeds limiter's burst %d", n, lim.limit.Burst())
    }
    r := lim.ReserveN(time.Now(), n)
    delay := r.DelayFrom(time.Now())
    if delay == 0 {
        return nil
    }
    t := lim.clock.AfterFunc(delay, r.Cancel)
    defer t.Stop()
    select {
    case <-ctx.Done():
        r.Cancel()
        return ctx.Err()
    case <-t.C:
        return nil
    }
}
time/ratereserveN