• Go 限流控制《滑动窗口&令牌桶》:time/rate、TokenLimit、PeriodLimit


    一、前言

    流量控制基本是《微服务》和高并发系统设计的入门课,即便是在早期各种负载均衡和网络组件(如nginx、iptable、TC)都有提供基础的QPS限制能力,如今演进到微服务框架、Sentinel、Service Mesh和Serverless都已经具备完备的配置化的限流的能力已经能够满足大多数场景了,但如果我们在一些不以服务作为颗粒的方式可能就不太适用了,比如以下几个场景:

    • 调用外部三方服务存在频率限制
    • 队列消费控速
      • 设置最大消费线程数、每次拉取消息条数拉取间隔时间
      • 这样能够能精准控制队列的处理频率吗?(有些主流MQ中间件SDK有实现匀速器,如RocketMQ,但是单点限流不能多消费者分布式共享)
    • 当被超过频率限制时执行一段兜底逻辑

    常用限流主要就是滑动窗口和令牌桶,本文基于golang官方包的time/rate和go-zero中的TokenLimit、PeriodLimit来介绍如何使用

    二、time/rate、TokenLimit、PeriodLimit差异

    限流器存储介质执行效率突发流量wait支持
    time/rate内存支持支持
    PeriodLimitredis一般支持不支持
    TokenLimitredis一般不支持不支持

    突发流量指的是:当流量有一个小高峰,因为令牌桶(TokenLimit)当前桶中存在一定的token每秒有在生成token所以如果是一个10容量10并发的桶,第一秒能够支持20QPS,而滑动窗口(PeriodLimit)可以理解成一个没有容量的桶只能现用现生产。

    wait支持指的是:阻塞程序执行等待token的生成而不是立马返回失败,比如队列限流使用场景下就比较合适,异步时间不敏感类型。

    PS:go-zero提供的令牌桶当redis故障是会切换到内存令牌桶time/rate来降低影响,但是会影响限流的精准性

    令牌桶基本原理:

    • 单位时间按照一定速率匀速的生产 token 放入桶内,直到达到桶容量上限。
    • 处理请求,每次尝试获取一个或多个令牌,如果拿到则处理请求,失败则拒绝请求。
      在这里插入图片描述

    为了保证原子性基于redis实现分布式令牌桶,lua脚本详解:

    -- 每秒生成token数量即token生成速度
    local rate = tonumber(ARGV[1])
    -- 桶容量
    local capacity = tonumber(ARGV[2])
    -- 当前时间戳
    local now = tonumber(ARGV[3])
    -- 当前请求token数量
    local requested = tonumber(ARGV[4])
    -- 需要多少秒才能填满桶
    local fill_time = capacity/rate
    -- 向下取整,ttl为填满时间的2倍
    local ttl = math.floor(fill_time*2)
    -- 当前时间桶容量
    local last_tokens = tonumber(redis.call("get", KEYS[1]))
    -- 如果当前桶容量为0,说明是第一次进入,则默认容量为桶的最大容量
    if last_tokens == nil then
    last_tokens = capacity
    end
    -- 上一次刷新的时间
    local last_refreshed = tonumber(redis.call("get", KEYS[2]))
    -- 第一次进入则设置刷新时间为0
    if last_refreshed == nil then
    last_refreshed = 0
    end
    -- 距离上次请求的时间跨度
    local delta = math.max(0, now-last_refreshed)
    -- 距离上次请求的时间跨度,总共能生产token的数量,如果超多最大容量则丢弃多余的token
    local filled_tokens = math.min(capacity, last_tokens+(delta*rate))
    -- 本次请求token数量是否足够
    local allowed = filled_tokens >= requested
    -- 桶剩余数量
    local new_tokens = filled_tokens
    -- 允许本次token申请,计算剩余数量
    if allowed then
    new_tokens = filled_tokens - requested
    end
    -- 设置剩余token数量
    redis.call("setex", KEYS[1], ttl, new_tokens)
    -- 设置刷新时间
    redis.call("setex", KEYS[2], ttl, now)
    
    return allowed
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42

    四、例子🌰

    go-zero的限流相关组件在 core/limit 包下面

    time/rate

    func Test_TimeRate(t *testing.T) {
    
    	// New tokenLimiter
    	limiter := rate.NewLimiter(10, 100)
    	timer := time.NewTimer(time.Second * 10)
    	quit := make(chan struct{})
    	defer timer.Stop()
    	go func() {
    		<-timer.C
    		close(quit)
    	}()
    
    	var allowed, denied int32
    	var wait sync.WaitGroup
    	for i := 0; i < runtime.NumCPU(); i++ {
    		wait.Add(1)
    		go func() {
    			for {
    				select {
    				case <-quit:
    					wait.Done()
    					return
    				default:
    					ctx, cancel := context.WithTimeout(context.TODO(), time.Second)
    					err := limiter.Wait(ctx)
    					if err == nil {
    						atomic.AddInt32(&allowed, 1)
    					} else {
    						fmt.Println(err)
    						atomic.AddInt32(&denied, 1)
    					}
    					cancel()
    				}
    			}
    		}()
    	}
    
    	wait.Wait()
    	fmt.Printf("allowed: %d, denied: %d, qps: %d\n", allowed, denied, (allowed+denied)/10)
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41

    TokenLimit

    func Test_TokenLimit(t *testing.T) {
    	s, err := miniredis.Run()
    	assert.Nil(t, err)
    
    	//store := redis.New("localhost:6379")
    	store := redis.New(s.Addr())
    
    	const (
    		burst   = 100
    		rate    = 10
    		seconds = 10
    	)
    
    	fmt.Println(store.Ping())
    	// New tokenLimiter
    	limiter := limit.NewTokenLimiter(rate, burst, store, "rate-test")
    	timer := time.NewTimer(time.Second * seconds)
    	quit := make(chan struct{})
    	defer timer.Stop()
    	go func() {
    		<-timer.C
    		close(quit)
    	}()
    
    	var allowed, denied int32
    	var wait sync.WaitGroup
    	for i := 0; i < runtime.NumCPU(); i++ {
    		wait.Add(1)
    		go func() {
    			for {
    				select {
    				case <-quit:
    					wait.Done()
    					return
    				default:
    					if limiter.Allow() {
    						atomic.AddInt32(&allowed, 1)
    					} else {
    						atomic.AddInt32(&denied, 1)
    					}
    				}
    			}
    		}()
    	}
    
    	wait.Wait()
    	fmt.Printf("allowed: %d, denied: %d, qps: %d\n", allowed, denied, (allowed+denied)/seconds)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48

    PeriodLimit

    func Test_PeriodLimit(t *testing.T) {
    
    	s, err := miniredis.Run()
    	assert.Nil(t, err)
    
    	//store := redis.New("localhost:6379")
    	store := redis.New(s.Addr())
    
    	const (
    		seconds = 1
    		total   = 100
    		quota   = 5
    	)
    	l := limit.NewPeriodLimit(seconds, quota, store, "periodlimit")
    	var allowed, hitQuota, overQuota int
    	for i := 0; i < total; i++ {
    		val, err := l.Take("first")
    		if err != nil {
    			t.Error(err)
    		}
    		switch val {
    		case limit.Allowed:
    			allowed++
    		case limit.HitQuota:
    			hitQuota++
    		case limit.OverQuota:
    			overQuota++
    		default:
    			t.Error("unknown status")
    		}
    	}
    
    	assert.Equal(t, quota-1, allowed)
    	assert.Equal(t, 1, hitQuota)
    	assert.Equal(t, total-quota, overQuota)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
  • 相关阅读:
    【大数据技术】Spark-SQL如何连接MySQL并读写数据
    上海站活动回顾 | 聚焦私募视野,助力量化投研交易
    在C#中使用NModbus4通信库执行【读】操作
    驱动开发:内核枚举IoTimer定时器
    最新ACR15.0新功能如何使用?ps插件camera raw15.0mac版新功能教程
    物流职业怎么运用自动化员工保证清算效率
    JS的装箱和拆箱
    苏宁使用API接口范例讲解
    实践一个Vue 3 + TypeScript + Vite + Pinia项目
    Python入门学习15(面向对象)
  • 原文地址:https://blog.csdn.net/u011142688/article/details/126683615