流量控制基本是《微服务》和高并发系统设计的入门课,即便是在早期各种负载均衡和网络组件(如nginx、iptable、TC)都有提供基础的QPS限制能力,如今演进到微服务框架、Sentinel、Service Mesh和Serverless都已经具备完备的配置化的限流的能力已经能够满足大多数场景了,但如果我们在一些不以服务作为颗粒的方式可能就不太适用了,比如以下几个场景:
常用限流主要就是滑动窗口和令牌桶,本文基于golang官方包的time/rate和go-zero中的TokenLimit、PeriodLimit来介绍如何使用
限流器 | 存储介质 | 执行效率 | 突发流量 | wait支持 |
---|---|---|---|---|
time/rate | 内存 | 高 | 支持 | 支持 |
PeriodLimit | redis | 一般 | 支持 | 不支持 |
TokenLimit | redis | 一般 | 不支持 | 不支持 |
突发流量指的是:当流量有一个小高峰,因为令牌桶(TokenLimit)当前桶中存在一定的token每秒有在生成token所以如果是一个10容量10并发的桶,第一秒能够支持20QPS,而滑动窗口(PeriodLimit)可以理解成一个没有容量的桶只能现用现生产。
wait支持指的是:阻塞程序执行等待token的生成而不是立马返回失败,比如队列限流使用场景下就比较合适,异步时间不敏感类型。
PS:go-zero提供的令牌桶当redis故障是会切换到内存令牌桶time/rate来降低影响,但是会影响限流的精准性
令牌桶基本原理:
为了保证原子性基于redis实现分布式令牌桶,lua脚本详解:
-- 每秒生成token数量即token生成速度
local rate = tonumber(ARGV[1])
-- 桶容量
local capacity = tonumber(ARGV[2])
-- 当前时间戳
local now = tonumber(ARGV[3])
-- 当前请求token数量
local requested = tonumber(ARGV[4])
-- 需要多少秒才能填满桶
local fill_time = capacity/rate
-- 向下取整,ttl为填满时间的2倍
local ttl = math.floor(fill_time*2)
-- 当前时间桶容量
local last_tokens = tonumber(redis.call("get", KEYS[1]))
-- 如果当前桶容量为0,说明是第一次进入,则默认容量为桶的最大容量
if last_tokens == nil then
last_tokens = capacity
end
-- 上一次刷新的时间
local last_refreshed = tonumber(redis.call("get", KEYS[2]))
-- 第一次进入则设置刷新时间为0
if last_refreshed == nil then
last_refreshed = 0
end
-- 距离上次请求的时间跨度
local delta = math.max(0, now-last_refreshed)
-- 距离上次请求的时间跨度,总共能生产token的数量,如果超多最大容量则丢弃多余的token
local filled_tokens = math.min(capacity, last_tokens+(delta*rate))
-- 本次请求token数量是否足够
local allowed = filled_tokens >= requested
-- 桶剩余数量
local new_tokens = filled_tokens
-- 允许本次token申请,计算剩余数量
if allowed then
new_tokens = filled_tokens - requested
end
-- 设置剩余token数量
redis.call("setex", KEYS[1], ttl, new_tokens)
-- 设置刷新时间
redis.call("setex", KEYS[2], ttl, now)
return allowed
go-zero的限流相关组件在 core/limit 包下面
time/rate
func Test_TimeRate(t *testing.T) {
// New tokenLimiter
limiter := rate.NewLimiter(10, 100)
timer := time.NewTimer(time.Second * 10)
quit := make(chan struct{})
defer timer.Stop()
go func() {
<-timer.C
close(quit)
}()
var allowed, denied int32
var wait sync.WaitGroup
for i := 0; i < runtime.NumCPU(); i++ {
wait.Add(1)
go func() {
for {
select {
case <-quit:
wait.Done()
return
default:
ctx, cancel := context.WithTimeout(context.TODO(), time.Second)
err := limiter.Wait(ctx)
if err == nil {
atomic.AddInt32(&allowed, 1)
} else {
fmt.Println(err)
atomic.AddInt32(&denied, 1)
}
cancel()
}
}
}()
}
wait.Wait()
fmt.Printf("allowed: %d, denied: %d, qps: %d\n", allowed, denied, (allowed+denied)/10)
}
TokenLimit
func Test_TokenLimit(t *testing.T) {
s, err := miniredis.Run()
assert.Nil(t, err)
//store := redis.New("localhost:6379")
store := redis.New(s.Addr())
const (
burst = 100
rate = 10
seconds = 10
)
fmt.Println(store.Ping())
// New tokenLimiter
limiter := limit.NewTokenLimiter(rate, burst, store, "rate-test")
timer := time.NewTimer(time.Second * seconds)
quit := make(chan struct{})
defer timer.Stop()
go func() {
<-timer.C
close(quit)
}()
var allowed, denied int32
var wait sync.WaitGroup
for i := 0; i < runtime.NumCPU(); i++ {
wait.Add(1)
go func() {
for {
select {
case <-quit:
wait.Done()
return
default:
if limiter.Allow() {
atomic.AddInt32(&allowed, 1)
} else {
atomic.AddInt32(&denied, 1)
}
}
}
}()
}
wait.Wait()
fmt.Printf("allowed: %d, denied: %d, qps: %d\n", allowed, denied, (allowed+denied)/seconds)
}
PeriodLimit
func Test_PeriodLimit(t *testing.T) {
s, err := miniredis.Run()
assert.Nil(t, err)
//store := redis.New("localhost:6379")
store := redis.New(s.Addr())
const (
seconds = 1
total = 100
quota = 5
)
l := limit.NewPeriodLimit(seconds, quota, store, "periodlimit")
var allowed, hitQuota, overQuota int
for i := 0; i < total; i++ {
val, err := l.Take("first")
if err != nil {
t.Error(err)
}
switch val {
case limit.Allowed:
allowed++
case limit.HitQuota:
hitQuota++
case limit.OverQuota:
overQuota++
default:
t.Error("unknown status")
}
}
assert.Equal(t, quota-1, allowed)
assert.Equal(t, 1, hitQuota)
assert.Equal(t, total-quota, overQuota)
}