计数器算法
计数器是一种最简单限流算法,其原理就是:在一段时间间隔内,对请求进行计数,与阀值进行比较判断是否需要限流,一旦到了时间临界点,将计数器清零
package main
import (
"log"
"sync"
"time"
)
type Counter struct {
rate int
begin time.Time
cycle time.Duration
count int
lock sync.Mutex
}
func (c *Counter) Set(r int, cycle time.Duration) {
c.rate = r
c.begin = time.Now()
c.cycle = cycle
c.count = 0
}
func (c *Counter) Reset(t time.Time) {
c.begin = t
c.count = 0
}
func (c *Counter) Allow() bool {
c.lock.Lock()
defer c.lock.Unlock()
if c.count == c.rate-1 {
now := time.Now()
if now.Sub(c.begin) >= c.cycle {
c.Reset(now)
return true
} else {
return false
}
} else {
c.count++
return true
}
}
func main() {
var wg sync.WaitGroup
var cr Counter
cr.Set(3, time.Second)
for i := 0; i < 10; i++ {
wg.Add(1)
log.Println("创建请求:", i)
go func(i int) {
if cr.Allow() {
log.Println("响应请求:", i)
}
wg.Done()
}(i)
time.Sleep(200 * time.Millisecond)
}
wg.Wait()
}
滑动窗口算法
滑动窗口是针对计数器存在的临界点缺陷,所谓 滑动窗口(Sliding window) 是一种流量控制技术,这个词出现在 TCP 协议中。滑动窗口把固定时间片进行划分,并且随着时间的流逝,进行移动,固定数量的可以移动的格子,进行计数并判断阀值
其实计数器就是滑动窗口啊,只不过只有一个格子而已,所以想让限流做的更精确只需要划分更多的格子就可以了,为了更精确我们也不知道到底该设置多少个格子,格子的数量影响着滑动窗口算法的精度,依然有时间片的概念,无法根本解决临界点问题
package main
import (
"container/ring"
"fmt"
"sync"
"sync/atomic"
"time"
)
var (
limitCount int = 6
limitBucket int = 6
curCount int32 = 0
head *ring.Ring
wg sync.WaitGroup
)
func main() {
head = ring.New(limitBucket)
for i := 0; i < limitBucket; i++ {
head.Value = 0
head = head.Next()
}
// 滑动窗口
go func() {
//每秒滑动一个窗口
timer := time.NewTimer(1 * time.Second)
for range timer.C {
subCount := int32(0 - head.Value.(int))
atomic.AddInt32(&curCount, subCount)
head.Value = 0
head = head.Next()
}
}()
for i := 0; i < 10; i++ {
fmt.Println("创建请求:", i)
wg.Add(1)
go func(i int) {
n := atomic.AddInt32(&curCount, 1)
if n > int32(limitCount) {
atomic.AddInt32(&curCount, -1)
} else {
mu := sync.Mutex{}
mu.Lock()
pos := head.Prev()
val := pos.Value.(int)
val++
pos.Value = val
mu.Unlock()
fmt.Println("响应请求:", i)
}
wg.Done()
}(i)
time.Sleep(200 * time.Millisecond)
}
wg.Wait()
}
漏桶算法
漏桶算法有以下特点:
- 漏桶具有固定容量,出水速率是固定常量(流出请求)
- 如果桶是空的,则不需流出水滴
- 可以以任意速率流入水滴到漏桶(流入请求)
- 如果流入水滴超出了桶的容量,则流入的水滴溢出(新请求被拒绝)
漏桶限制的是常量流出速率(即流出速率是一个固定常量值),所以最大的速率就是出水的速率,不能出现突发流量。
package main
import (
"log"
"math"
"sync"
"time"
)
type LeakyBucket struct {
cap float64 //固定每秒出水速率
rate float64 //桶的容量
water float64 //桶中当前水量
lastLeakMs int64 //桶上次漏水时间戳 ms
lock sync.Mutex
}
func (l *LeakyBucket) Allow() bool {
l.lock.Lock()
defer l.lock.Unlock()
now := time.Now().UnixNano() / 1e6
eclipse := float64(now - l.lastLeakMs) * l.rate / 1000
l.water = l.water - eclipse
l.water = math.Max(0, l.water)
l.lastLeakMs = now
if (l.water + 1) < l.cap {
l.water++
return true
} else {
return false
}
}
func (l *LeakyBucket) Set(r, c float64) {
l.rate = r
l.cap = c
l.water = 0
l.lastLeakMs = time.Now().UnixNano() / 1e6
}
func main() {
var wg sync.WaitGroup
var lb LeakyBucket
lb.Set(2, 3)
for i := 0; i < 10; i++ {
wg.Add(1)
log.Println("创建请求:", i)
go func(i int) {
if lb.Allow() {
log.Println("响应请求:", i)
}
wg.Done()
}(i)
time.Sleep(100 * time.Millisecond)
}
wg.Wait()
}
令牌桶算法
令牌桶有以下特点:
- 令牌按固定的速率被放入令牌桶中
- 桶中最多存放 B 个令牌,当桶满时,新添加的令牌被丢弃或拒绝
- 如果桶中的令牌不足 N 个,则不会删除令牌,且请求将被限流(丢弃或阻塞等待)
令牌桶限制的是平均流入速率(允许突发请求,只要有令牌就可以处理,支持一次拿3个令牌,4个令牌...),并允许一定程度突发流量
package main
import (
"log"
"sync"
"time"
)
type TokenBucket struct {
rate int64 //固定的token放入速率, r/s
cap int64 //桶的容量
tokens int64 //桶中当前token数量
lastTokenSec int64 //桶上次放token的时间戳 s
lock sync.Mutex
}
func (t *TokenBucket) Set(r, c int64) {
t.rate = r
t.cap = c
t.lastTokenSec = time.Now().Unix()
}
func (t *TokenBucket) Allow() bool {
t.lock.Lock()
defer t.lock.Unlock()
now := time.Now().Unix()
t.tokens = t.tokens + (now - t.lastTokenSec) * t.rate
if t.tokens > t.cap {
t.tokens = t.cap
}
t.lastTokenSec = now
if t.tokens > 0 {
t.tokens--
return true
} else {
return false
}
}
func main() {
var wg sync.WaitGroup
var tb TokenBucket
tb.Set(3, 3)
for i := 0; i < 10; i++ {
wg.Add(1)
log.Println("创建请求:", i)
go func(i int) {
if tb.Allow() {
log.Println("响应请求:", i)
}
wg.Done()
}(i)
time.Sleep(100 * time.Millisecond)
}
wg.Wait()
}