The Purpose of Rate Limiting
Circuit breaking, degradation, and rate limiting are the three key tools for keeping a service stable. This article looks at several ways to implement rate-limiting algorithms. The rate limiting discussed here is not the kind done at the Nginx or gateway layer, but logical rate limiting inside business code.
Rate-Limiting Implementations
Common rate-limiting algorithms: fixed window, sliding window, leaky bucket, and token bucket.
Fixed Window
Idea
Count the requests that arrive within a fixed time window. Once the count reaches the threshold, further requests are rejected; when the window's time boundary is reached, the counter is reset.
Implementation
import (
	"sync"
	"time"
)

// CountLimitRate is a fixed-window counter: at most rate requests are
// allowed within each window of length duration.
type CountLimitRate struct {
	duration    time.Duration // window length
	rate        int32         // max requests allowed per window
	mu          sync.Mutex
	count       int32     // requests counted in the current window
	lastGetTime time.Time // start time of the current window
}

func NewCountLimitRate(duration time.Duration, rate int32) *CountLimitRate {
	return &CountLimitRate{
		duration:    duration,
		rate:        rate,
		lastGetTime: time.Now(),
	}
}

// Acquire reports whether the current request is allowed.
func (r *CountLimitRate) Acquire() bool {
	r.mu.Lock()
	defer r.mu.Unlock()

	now := time.Now()
	// The current window has expired: start a new one and clear the counter.
	if now.Sub(r.lastGetTime) > r.duration {
		r.Reset(now)
	}
	if r.count >= r.rate {
		return false
	}
	r.count++
	return true
}

// Reset starts a new window at getTime and clears the counter.
func (r *CountLimitRate) Reset(getTime time.Time) {
	r.lastGetTime = getTime
	r.count = 0
}
Problem
The fixed window suffers from a boundary spike (burst) problem.
Suppose the window is 1s long and the limit is 100 requests per window.
If 100 requests arrive in the last 200ms of one window and another 100 arrive in the first 200ms of the next window, both bursts are legal from the fixed window's point of view. Yet across that boundary 200 requests arrive within a single second, which may overwhelm the backend service.
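A rough sketch of that scenario against the CountLimitRate above; the Sleep-based timing only simulates traffic and is not part of the original implementation:

import (
	"fmt"
	"time"
)

func main() {
	limiter := NewCountLimitRate(time.Second, 100)

	// Wait until the last 200ms of the first window, then fire 100 requests.
	time.Sleep(800 * time.Millisecond)
	passedFirst := 0
	for i := 0; i < 100; i++ {
		if limiter.Acquire() {
			passedFirst++
		}
	}

	// Cross the window boundary; the counter is reset, so 100 more pass.
	time.Sleep(250 * time.Millisecond)
	passedSecond := 0
	for i := 0; i < 100; i++ {
		if limiter.Acquire() {
			passedSecond++
		}
	}

	// All 200 requests are allowed even though they arrived within ~450ms.
	fmt.Println(passedFirst, passedSecond) // 100 100
}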
Sliding Window
Idea
The sliding-window algorithm splits one large time window into several small windows (timeSlots). The large window slides forward one small window at a time, and the traffic inside the large window, which is the sum of all its small windows' counts, must never exceed the limit.
For example, a 1s sliding window can be divided into 10 small windows, i.e. 10 timeSlots, each counting the requests of one 100ms interval. Every 100ms a new timeSlot enters the window and timeSlots older than 1s drop out, so the window holds at most 10 timeSlots.
Implementation
import (
	"sync"
	"time"
)

// timeSlot counts the requests that arrived during one small window.
type timeSlot struct {
	startTime time.Time
	count     int32
}

// SlideWindowLimitRate allows at most rate requests within the last
// windowDuration, tracked as a list of slotDuration-sized slots.
type SlideWindowLimitRate struct {
	rate           int32
	windowDuration time.Duration
	slotDuration   time.Duration
	mu             sync.Mutex
	slotList       []*timeSlot
}

type SlideWindowDuration struct {
	windowDuration time.Duration
	slotDuration   time.Duration
}

func NewSlideWindowLimitRate(rate int32, slideWindowDuration *SlideWindowDuration) *SlideWindowLimitRate {
	return &SlideWindowLimitRate{
		rate:           rate,
		windowDuration: slideWindowDuration.windowDuration,
		slotDuration:   slideWindowDuration.slotDuration,
		slotList:       make([]*timeSlot, 0, slideWindowDuration.windowDuration/slideWindowDuration.slotDuration),
	}
}

func (r *SlideWindowLimitRate) Acquire() bool {
	r.mu.Lock()
	defer r.mu.Unlock()

	now := time.Now()

	// Drop the slots that have slid out of the window, i.e. slots that
	// started more than windowDuration ago.
	discardSlotIdx := -1
	for i := range r.slotList {
		slot := r.slotList[i]
		if slot.startTime.Add(r.windowDuration).After(now) {
			break
		}
		discardSlotIdx = i
	}
	if discardSlotIdx > -1 {
		r.slotList = r.slotList[discardSlotIdx+1:]
	}

	// Sum the counts of the slots still inside the window.
	var reqCount int32 = 0
	for i := range r.slotList {
		reqCount += r.slotList[i].count
	}
	if reqCount >= r.rate {
		return false
	}

	// Record the request: reuse the latest slot if it still covers the
	// current time, otherwise open a new slot.
	if n := len(r.slotList); n > 0 && r.slotList[n-1].startTime.Add(r.slotDuration).After(now) {
		r.slotList[n-1].count++
	} else {
		r.slotList = append(r.slotList, r.newTimeSlot(now))
	}
	return true
}

func (r *SlideWindowLimitRate) newTimeSlot(startTime time.Time) *timeSlot {
	return &timeSlot{startTime: startTime, count: 1}
}
Improvement over the Fixed Window
Revisiting the boundary example above:
When 100 requests arrive in the last 200ms of one second (800–1000ms), the sliding window is already full with those 100 requests.
When another 100 requests arrive in the first 200ms of the next second (1000–1200ms), they still fall inside the same sliding window, so they are rejected.
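A sketch of the same traffic pattern against the SlideWindowLimitRate above (same illustrative timings as before; it assumes the snippet lives in the same package, since the SlideWindowDuration fields are unexported):

import (
	"fmt"
	"time"
)

func main() {
	limiter := NewSlideWindowLimitRate(100, &SlideWindowDuration{
		windowDuration: time.Second,
		slotDuration:   100 * time.Millisecond,
	})

	// 100 requests in the last 200ms of the first second: all pass.
	time.Sleep(800 * time.Millisecond)
	passedFirst := 0
	for i := 0; i < 100; i++ {
		if limiter.Acquire() {
			passedFirst++
		}
	}

	// 100 more right after the boundary: the earlier slots are still inside
	// the sliding window, so these are rejected.
	time.Sleep(250 * time.Millisecond)
	passedSecond := 0
	for i := 0; i < 100; i++ {
		if limiter.Acquire() {
			passedSecond++
		}
	}

	fmt.Println(passedFirst, passedSecond) // roughly 100 0
}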
Leaky Bucket
Idea
Imagine a bucket with a fixed capacity. Incoming requests are first put into the bucket, and a worker takes requests out of the bucket at a fixed rate to process and respond to them. If the bucket is already full, a rate-limit error is returned immediately.
Implementation
import (
	"math"
	"sync"
	"time"
)

// LeakBucketLimitRate models a bucket that leaks at a fixed rate: each
// request adds one unit of water, and a request is rejected when the
// bucket is full.
type LeakBucketLimitRate struct {
	rate          float64 // leak rate, in requests per second
	capacity      float64 // bucket capacity
	mu            sync.Mutex
	lastLeakTime  time.Time // last time the water level was updated
	waterCapacity float64   // current water level
}

func NewLeakBucketLimitRate(rate float64, capacity float64) *LeakBucketLimitRate {
	return &LeakBucketLimitRate{
		rate:          rate,
		capacity:      capacity,
		lastLeakTime:  time.Now(),
		waterCapacity: 0,
	}
}

func (r *LeakBucketLimitRate) Acquire() bool {
	r.mu.Lock()
	defer r.mu.Unlock()

	now := time.Now()
	// Drain the water that has leaked out since the last call.
	r.waterCapacity = math.Max(0, r.waterCapacity-now.Sub(r.lastLeakTime).Seconds()*r.rate)
	r.lastLeakTime = now
	// If there is still room in the bucket, add this request's unit of
	// water and allow it.
	if r.waterCapacity < r.capacity {
		r.waterCapacity++
		return true
	}
	return false
}
Applicable Scenarios
If you need to process requests at a fixed rate and do not want to absorb bursts of concurrent traffic, the leaky bucket algorithm is a good fit.
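A small sketch of that behavior with the LeakBucketLimitRate above; the 1ms Sleep only simulates a steady stream of incoming requests, and the rate/capacity values are illustrative:

import (
	"fmt"
	"time"
)

func main() {
	// Leak rate 10 requests per second, capacity 10.
	limiter := NewLeakBucketLimitRate(10, 10)

	accepted := 0
	deadline := time.Now().Add(time.Second)
	for time.Now().Before(deadline) {
		if limiter.Acquire() {
			accepted++
		}
		time.Sleep(time.Millisecond)
	}

	// The empty bucket absorbs roughly capacity requests up front; after
	// that only about rate requests per second get through.
	fmt.Println(accepted) // on the order of 20
}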
Token Bucket
Idea
Imagine a bucket with a fixed capacity. Tokens are added to the bucket at a fixed rate. A request is allowed only if it can take a token from the bucket; otherwise a rate-limit error is returned.
Implementation
import (
	"math"
	"sync"
	"time"
)

// TokenBucketLimitRate refills tokens at a fixed rate; a request is
// allowed only if it can take one token out of the bucket.
type TokenBucketLimitRate struct {
	rate             float64 // tokens added per second
	capacity         float64 // maximum number of tokens the bucket can hold
	mu               sync.Mutex
	tokenCount       float64   // tokens currently in the bucket
	lastAddTokenTime time.Time // last time tokens were refilled
}

func NewTokenBucketLimitRate(rate float64, capacity float64) *TokenBucketLimitRate {
	return &TokenBucketLimitRate{
		rate:             rate,
		capacity:         capacity,
		tokenCount:       0,
		lastAddTokenTime: time.Now(),
	}
}

func (r *TokenBucketLimitRate) Acquire() bool {
	r.mu.Lock()
	defer r.mu.Unlock()

	now := time.Now()
	// Refill the tokens accumulated since the last call, capped at capacity.
	addTokenNum := now.Sub(r.lastAddTokenTime).Seconds() * r.rate
	r.tokenCount = math.Min(r.capacity, r.tokenCount+addTokenNum)
	r.lastAddTokenTime = now
	// A request consumes one whole token.
	if r.tokenCount >= 1 {
		r.tokenCount--
		return true
	}
	return false
}
Applicable Scenarios
Because the token bucket stores unused tokens, it can absorb sudden bursts of traffic. For example, if the bucket can hold up to 500 tokens, it can serve a burst of up to 500 concurrent requests at once.
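A sketch of that burst behavior with the TokenBucketLimitRate above (the rate, capacity, and idle period are illustrative values):

import (
	"fmt"
	"time"
)

func main() {
	// 100 tokens per second, bucket holds at most 500 tokens.
	limiter := NewTokenBucketLimitRate(100, 500)

	// Stay idle long enough for the bucket to fill up completely.
	time.Sleep(5 * time.Second)

	// A sudden burst of 600 requests.
	passed := 0
	for i := 0; i < 600; i++ {
		if limiter.Acquire() {
			passed++
		}
	}

	// Roughly 500 are served from the stored tokens; the rest are rejected.
	fmt.Println(passed)
}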