限流算法

常见的限流算法有漏桶算法(Leaky Bucket),令牌桶算法(Token Bucket)

  • 漏桶算法(Leaky Bucket)
    原理类似于漏桶的进水和出水,流入漏桶的水流大于流出漏桶的水流时,漏桶就会慢慢变满,水满后就会溢出。请求访问服务,当突发请求大于服务承受数量时,服务器拒绝访问,服务器以一定的速度响应请求。

  • 令牌桶算法(Token Bucket)
    和漏桶算法不同的是,令牌桶算法以一定的速率向桶里放入token, 当桶满时就暂停加入token或者丢弃token。每个请求获得一个token才可以进行下一步,没有token就会阻塞或被拒绝。令牌桶允许突发流量,如突然将桶内令牌消耗完。

golang实现令牌桶算法


type Bucket struct {
	startTime time.Time
	// token桶的容量
	capacity int64
	// 时间间隔,
	fillInterval time.Duration
	// 每个时间间隔加入quantum个token
	quantum int64
	// 当前可用的token
	avail int64
	// 当前的时间间隔
	availTick int64

	mu sync.Mutex
}

func (tb *Bucket) adjust(now time.Time) (currentTick int64) {
	currentTick = int64(now.Sub(tb.startTime) / tb.fillInterval)

	// token已经满,直接返回
	if tb.avail >= tb.capacity {
		return
	}

	// 更新token,
	tb.avail += (currentTick - tb.availTick) * tb.quantum
	if tb.avail > tb.capacity {
		// token大于容量,溢出处理
		tb.avail = tb.capacity
	}

	// 保存当前时间间隔
	tb.availTick = currentTick
	return
}

func (tb *Bucket) takeAvailable(now time.Time, count int64) int64 {
	if count <= 0 {
		return 0
	}
	tb.mu.Lock()
	defer tb.mu.Unlock()

    // 调整当前时间间隔的token数
	tb.adjust(now)
	if tb.avail <= 0 {
		return 0
	}
	if count > tb.avail {
		count = tb.avail
	}

	// 消耗count个token
	tb.avail -= count
	return count
}

func (tb *Bucket) TakeAvailable(count int64) int64 {
	return tb.takeAvailable(time.Now(), count)
}


分布式限流

同一个微服务通常会部署多个实例,在多个实例之间做限流实际是分布式限流问题。从redis 2.6版本开始,内嵌了Lua解析器。可以利用Redis原子性执行脚本的特点来实现分布式限流。

package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/garyburd/redigo/redis"
)

// INCR同时设置过期时间
var redisINCRAndExpireCmd = redis.NewScript(1, `
local current
current = redis.call("incr",KEYS[1])
if tonumber(current) == 1 then
    redis.call("expire",KEYS[1],ARGV[1])
end
return current
`)

type RedisPool struct {
	pool *redis.Pool
}

func (r *RedisPool) INCR(key string, seconds int) (int, error) {
	if key == "" {
		return 0, fmt.Errorf("INCR key参数不能为空")
	}

	c := r.pool.Get()
	defer c.Close()

	return redis.Int(redisINCRAndExpireCmd.Do(c, key, seconds))
}

func formatKey(keyPrefix string, count int, seconds int64) (key string, err error) {
	if seconds < 1 {
		err = errors.New("时间间隔小于1秒")
		return
	}
	key = fmt.Sprintf("%s:ratelimiter:%d:%d", keyPrefix, count, seconds)
	return key, nil
}

// 限制Duration时间内的访问量为count次, QPS
func IsRateLimitExceed(p *RedisPool, keyPrefix string, count int, duration time.Duration) (exceed bool, current int, err error) {
	seconds := int64(duration / time.Second)
	key, err := formatKey(keyPrefix, count, seconds)
	if err != nil {
		return false, 0, err
	}
	current, err = p.INCR(key, int(seconds))
	if err != nil {
		return false, 0, err
	}
	return current > count, current, nil
}

参考