计数器算法
计数器是一种最简单限流算法,其原理就是:在一段时间间隔内,对请求进行计数,与阀值进行比较判断是否需要限流,一旦到了时间临界点,将计数器清零
image.png

package main

import (
	"log"
	"sync"
	"time"
)

type Counter struct {
	rate int
	begin time.Time
	cycle time.Duration
	count int
	lock sync.Mutex
}

func (c *Counter) Set(r int, cycle time.Duration) {
	c.rate = r
	c.begin = time.Now()
	c.cycle = cycle
	c.count = 0
}

func (c *Counter) Reset(t time.Time) {
	c.begin = t
	c.count = 0
}

func (c *Counter) Allow() bool {
	c.lock.Lock()
	defer c.lock.Unlock()
	
	if c.count == c.rate-1 {
		now := time.Now()
		if now.Sub(c.begin) >= c.cycle {
			c.Reset(now)
			return true
		} else {
			return false
		}
	} else {
		c.count++
		return true
	}
}

func main() {
	var wg sync.WaitGroup
	var cr Counter
	cr.Set(3, time.Second)
	for i := 0; i < 10; i++ {
		wg.Add(1)
		log.Println("创建请求:", i)
		go func(i int) {
			if cr.Allow() {
				log.Println("响应请求:", i)
			}
			wg.Done()
		}(i)
		time.Sleep(200 * time.Millisecond)
	}
	wg.Wait()
}

滑动窗口算法
滑动窗口是针对计数器存在的临界点缺陷,所谓 滑动窗口(Sliding window) 是一种流量控制技术,这个词出现在 TCP 协议中。滑动窗口把固定时间片进行划分,并且随着时间的流逝,进行移动,固定数量的可以移动的格子,进行计数并判断阀值

其实计数器就是滑动窗口啊,只不过只有一个格子而已,所以想让限流做的更精确只需要划分更多的格子就可以了,为了更精确我们也不知道到底该设置多少个格子,格子的数量影响着滑动窗口算法的精度,依然有时间片的概念,无法根本解决临界点问题

package main

import (
	"container/ring"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

var (
	limitCount int = 6
	limitBucket int = 6
	curCount int32 = 0
	head *ring.Ring
	wg sync.WaitGroup
)

func main() {
	head = ring.New(limitBucket)
	for i := 0; i < limitBucket; i++ {
		head.Value = 0
		head = head.Next()
	}
	
	// 滑动窗口
	go func() {
		//每秒滑动一个窗口
		timer := time.NewTimer(1 *  time.Second)
		for range timer.C {
			subCount := int32(0 - head.Value.(int))
			atomic.AddInt32(&curCount, subCount)
			
			head.Value = 0
			head = head.Next()
		}
	}()
	
	for i := 0; i < 10; i++ {
		fmt.Println("创建请求:", i)
		wg.Add(1)
		go func(i int) {
			n := atomic.AddInt32(&curCount, 1)
			if n > int32(limitCount) {
				atomic.AddInt32(&curCount, -1)
			} else {
				mu := sync.Mutex{}
				mu.Lock()
				pos := head.Prev()
				val := pos.Value.(int)
				val++
				pos.Value = val
				mu.Unlock()
				fmt.Println("响应请求:", i)
			}
			wg.Done()
		}(i)
		time.Sleep(200 * time.Millisecond)
	}
	wg.Wait()
}

漏桶算法
image.png

漏桶算法有以下特点:

  • 漏桶具有固定容量,出水速率是固定常量(流出请求)
  • 如果桶是空的,则不需流出水滴
  • 可以以任意速率流入水滴到漏桶(流入请求)
  • 如果流入水滴超出了桶的容量,则流入的水滴溢出(新请求被拒绝)

漏桶限制的是常量流出速率(即流出速率是一个固定常量值),所以最大的速率就是出水的速率,不能出现突发流量。

package main

import (
	"log"
	"math"
	"sync"
	"time"
)

type LeakyBucket struct {
	cap float64 //固定每秒出水速率
	rate float64 //桶的容量
	water float64 //桶中当前水量
	lastLeakMs int64 //桶上次漏水时间戳 ms
	
	lock sync.Mutex
}

func (l *LeakyBucket) Allow() bool {
	l.lock.Lock()
	defer l.lock.Unlock()
	
	now := time.Now().UnixNano() / 1e6
	eclipse := float64(now - l.lastLeakMs) * l.rate / 1000
	l.water = l.water - eclipse
	l.water = math.Max(0, l.water)
	l.lastLeakMs = now
	
	if (l.water + 1) < l.cap {
		l.water++
		return true
	} else {
		return false
	}
}

func (l *LeakyBucket) Set(r, c float64) {
	l.rate = r
	l.cap = c
	l.water = 0
	l.lastLeakMs = time.Now().UnixNano() / 1e6
}

func main() {
	var wg sync.WaitGroup
	var lb LeakyBucket
	lb.Set(2, 3)
	for i := 0; i < 10; i++ {
		wg.Add(1)
		log.Println("创建请求:", i)
		go func(i int) {
			if lb.Allow() {
				log.Println("响应请求:", i)
			}
			wg.Done()
		}(i)
		time.Sleep(100 * time.Millisecond)
	}
	wg.Wait()
}

令牌桶算法
image.png

令牌桶有以下特点:

  • 令牌按固定的速率被放入令牌桶中
  • 桶中最多存放 B 个令牌,当桶满时,新添加的令牌被丢弃或拒绝
  • 如果桶中的令牌不足 N 个,则不会删除令牌,且请求将被限流(丢弃或阻塞等待)

令牌桶限制的是平均流入速率(允许突发请求,只要有令牌就可以处理,支持一次拿3个令牌,4个令牌...),并允许一定程度突发流量

package main

import (
	"log"
	"sync"
	"time"
)

type TokenBucket struct {
	rate int64 //固定的token放入速率, r/s
	cap int64 //桶的容量
	tokens int64 //桶中当前token数量
	lastTokenSec int64 //桶上次放token的时间戳 s
	
	lock sync.Mutex
}

func (t *TokenBucket) Set(r, c int64) {
	t.rate = r
	t.cap = c
	t.lastTokenSec = time.Now().Unix()
}

func (t *TokenBucket) Allow() bool {
	t.lock.Lock()
	defer t.lock.Unlock()
	
	now := time.Now().Unix()
	t.tokens = t.tokens + (now - t.lastTokenSec) * t.rate
	if t.tokens > t.cap {
		t.tokens = t.cap
	}
	
	t.lastTokenSec = now
	if t.tokens > 0 {
		t.tokens--
		return true
	} else {
		return false
	}
}


func main() {
	var wg sync.WaitGroup
	var tb TokenBucket
	tb.Set(3, 3)
	for i := 0; i < 10; i++ {
		wg.Add(1)
		log.Println("创建请求:", i)
		go func(i int) {
			if tb.Allow() {
				log.Println("响应请求:", i)
			}
			wg.Done()
		}(i)
		time.Sleep(100 * time.Millisecond)
	}
	wg.Wait()
}