• Go 限流器使用


    限流器是后台服务中的非常重要的组件,可以用来限制请求速率,保护服务,以免服务过载。
    限流器的实现方法有很多种,常见的有计数器算法,滑动窗口法、Token Bucket(令牌桶算法)、Leaky Bucket(漏桶算法) 等。

    限流算法选择

    限流算法的简单对比,更加详细的可以看服务端接口限流设计

    算法描述特点go实现备注
    计数器算法在一定的时间间隔里,记录请求次数,当请求次数超过该时间限制时,就把计数器清零,然后重新计算实现简单,存在突发流量-
    滑动窗口算法
    滑动窗口算法是将时间周期分为N个小周期,分别记录每个小周期内访问次数,并且根据时间滑动删除过期的小周期计数器算法的优化,减少突发流量-https://juejin.cn/post/7043640933199380487
    令牌桶算法系统匀速的产生令牌存放到令牌桶中;令牌桶的容量固定,当令牌桶填满后,再放入其中的令牌会被丢弃;每个请求从令牌桶中获取令牌,如果获取成功则处理请求,如果失败则丢弃请求允许突发流量https://pkg.go.dev/golang.org/x/time/rate
    漏桶算法请求先进入到漏桶里,漏桶以一定的速度流出请求缓存请求,匀速处理https://github.com/uber-go/ratelimit

    本文主要着重于限流算法的实现或者已有实现的分析

    单机限流实现

    Golang 计数器算法

    在Go语言中,可以使用计数器算法实现计数器限流。计数器限流是根据单位时间内的请求数量来控制请求的通过与拒绝。

    package main
     
    import (
        "fmt"
        "sync"
        "time"
    )
     
    type CounterLimiter struct {
        rate       int          // 每秒最大请求数量
        interval   time.Duration // 时间窗口大小
        counter    int          // 当前时间窗口内的请求数量
        lastUpdate time.Time    // 上次更新时间
        mu         sync.Mutex   // 互斥锁
    }
     
    func NewCounterLimiter(rate int, interval time.Duration) *CounterLimiter {
        return &CounterLimiter{
            rate:       rate,
            interval:   interval,
            counter:    0,
            lastUpdate: time.Now(),
        }
    }
     
    func (cl *CounterLimiter) Allow() bool {
        cl.mu.Lock()
        defer cl.mu.Unlock()
     
        now := time.Now()
        elapsed := now.Sub(cl.lastUpdate)
     
        // 如果超过时间窗口,重置计数器
        if elapsed >= cl.interval {
            cl.counter = 0
            cl.lastUpdate = now
        }
     
        // 判断当前时间窗口内的请求数量是否超过限制
        if cl.counter < cl.rate {
            cl.counter++
            return true
        }
     
        return false
    }
     
    func main() {
        limiter := NewCounterLimiter(10, time.Second) // 每秒最多处理10个请求
     
        // 模拟请求
        for i := 1; i <= 15; i++ {
            if limiter.Allow() {
                fmt.Println("Request", i, "allowed")
            } else {
                fmt.Println("Request", i, "rejected")
            }
            time.Sleep(200 * time.Millisecond)
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60

    Golang 滑动窗口算法

    在Go语言中,可以使用滑动窗口算法实现限流。滑动窗口算法基于一个固定的时间窗口和一个滑动的窗口,用于控制在该时间窗口内的请求数量。可以根据当前时间和请求的到达时间来判断是否允许通过请求。

    package main
     
    import (
        "fmt"
        "sync"
        "time"
    )
     
    func main() {
        newWindow := func() Window {
            return NewLocalWindow()
        }
        limit := NewLimiter(time.Second, 1, newWindow)
        // 模拟请求
        for i := 1; i <= 15; i++ {
            if limit.Allow() {
                fmt.Println("Request", i, "allowed")
            } else {
                fmt.Println("Request", i, "rejected")
            }
            time.Sleep(1 * time.Millisecond)
        }
    }
     
    // Window ...
    type Window interface {
        Start() time.Time
        Count() int64
        AddCount(n int64)
        Reset(s time.Time, c int64)
    }
     
    type LocalWindow struct {
        // 窗口的开始边界(以纳秒为单位的时间戳)。
        // [start, start + size)
        start int64
     
        // 窗口中发生的事件总数。
        count int64
    }
     
    func NewLocalWindow() *LocalWindow {
        return &LocalWindow{}
    }
     
    func (w *LocalWindow) Start() time.Time {
        return time.Unix(0, w.start)
    }
     
    func (w *LocalWindow) Count() int64 {
        return w.count
    }
     
    func (w *LocalWindow) AddCount(n int64) {
        w.count += n
    }
     
    func (w *LocalWindow) Reset(s time.Time, c int64) {
        w.start = s.UnixNano()
        w.count = c
    }
     
    type Limiter struct {
        size  time.Duration // window size
        limit int64         // maximum events
     
        mu sync.Mutex
     
        curr Window
        prev Window
    }
     
    type NewWindow func() Window
     
    // NewLimiter returns a new limiter.
    func NewLimiter(size time.Duration, limit int64, newWindow NewWindow) *Limiter {
        currWin := newWindow()
        prevWin := NewLocalWindow()
        lim := &Limiter{
            size:  size,
            limit: limit,
            curr:  currWin,
            prev:  prevWin,
        }
        return lim
    }
     
    // Size 返回一个窗口大小的持续时间
    func (lim *Limiter) Size() time.Duration {
        return lim.size
    }
     
    // Limit 返回在一个窗口大小期间允许发生的最大事件数。
    func (lim *Limiter) Limit() int64 {
        lim.mu.Lock()
        defer lim.mu.Unlock()
        return lim.limit
    }
     
    // SetLimit sets a new Limit for the limiter.
    func (lim *Limiter) SetLimit(newLimit int64) {
        lim.mu.Lock()
        defer lim.mu.Unlock()
        lim.limit = newLimit
    }
     
    // Allow is shorthand for AllowN(time.Now(), 1).
    func (lim *Limiter) Allow() bool {
        return lim.AllowN(time.Now(), 1)
    }
     
    // AllowN reports whether n events may happen at time now.
    func (lim *Limiter) AllowN(now time.Time, n int64) bool {
        lim.mu.Lock()
        defer lim.mu.Unlock()
     
        lim.advance(now)
     
        // 计算跟当前window的时间差
        elapsed := now.Sub(lim.curr.Start())
        // 计算涵盖了多少个 window
        weight := float64(lim.size-elapsed) / float64(lim.size)
        count := int64(weight*float64(lim.prev.Count())) + lim.curr.Count()
        if count+n > lim.limit {
            return false
        }
     
        lim.curr.AddCount(n)
        return true
    }
     
    //  advance 计算是否要将 当前的 window 移动到前一个 window
    func (lim *Limiter) advance(now time.Time) {
        newCurrStart := now.Truncate(lim.size)
        diffWindowSize := newCurrStart.Sub(lim.curr.Start()) / lim.size
        // 已经超过一个 window 大小了,这个时候就要将当前的 window 移动到前一个 window
        if diffWindowSize >= 1 {
            newPrevCount := int64(0)
            // 刚好是一个window的大小
            if diffWindowSize == 1 {
                newPrevCount = lim.curr.Count()
            }
            // 将当前的 window 移动到前一个 window
            lim.prev.Reset(newCurrStart.Add(-lim.size), newPrevCount)
            lim.curr.Reset(newCurrStart, 0)
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147

    Golang time/rate 使用介绍

    该限流器是基于 Token Bucket(令牌桶) 实现的。简单来说,令牌桶就是想象有一个固定大小的桶,系统会以恒定速率向桶中放 Token,桶满则暂时不放。
    而用户则从桶中取 Token,如果有剩余 Token 就可以一直取。如果没有剩余 Token,则需要等到系统中被放置了 Token 才行。

    package main
     
    import (
        "context"
        "fmt"
        "time"
     
        "golang.org/x/time/rate"
    )
     
    func main() {
        // 创建一个每秒最多处理 2 个事件的限流器
        limiter := rate.NewLimiter(2, 1)
     
        // 模拟处理一系列事件
        events := []string{"event1", "event2", "event3", "event4", "event5"}
     
        for _, event := range events {
            // 等待直到获取到处理事件的令牌
            if err := limiter.Wait(context.Background()); err != nil {
                fmt.Println("Rate limit exceeded. Waiting...")
            }
     
            // 处理事件
            processEvent(event)
        }
    }
     
    func processEvent(event string) {
        fmt.Printf("Processing event: %s\n", event)
        time.Sleep(1 * time.Second)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    核心代码分析

    WaitN

    Wait 方法消费 Token 时,如果此时桶内 Token 数组不足 ( 小于 n ),那么 Wait 方法将会阻塞一段时间,直至 Token 满足条件。如果充足则直接返回。

    // WaitN blocks until lim permits n events to happen.
    // It returns an error if n exceeds the Limiter's burst size, the Context is
    // canceled, or the expected wait time exceeds the Context's Deadline.
    // The burst limit is ignored if the rate limit is Inf.
    func (lim *Limiter) WaitN(ctx context.Context, n int) (err error) {
        lim.mu.Lock()
        burst := lim.burst
        limit := lim.limit
        lim.mu.Unlock()
     
        if n > burst && limit != Inf {
            return fmt.Errorf("rate: Wait(n=%d) exceeds limiter's burst %d", n, lim.burst)
        }
        // Check if ctx is already cancelled
        select {
        case <-ctx.Done():
            return ctx.Err()
        default:
        }
        // Determine wait limit
        now := time.Now()
        waitLimit := InfDuration
       // 计算能等待多长时间
        if deadline, ok := ctx.Deadline(); ok {
            waitLimit = deadline.Sub(now)
        }
        // reserveN 计算n 个token 需要多长时间拿到
        r := lim.reserveN(now, n, waitLimit)
        if !r.ok {
            return fmt.Errorf("rate: Wait(n=%d) would exceed context deadline", n)
        }
        // Wait if necessary
        delay := r.DelayFrom(now)
        if delay == 0 {
            return nil
        }
        t := time.NewTimer(delay)
        defer t.Stop()
        select {
        case <-t.C:
            // We can proceed.
            return nil
        case <-ctx.Done():
            // Context was canceled before we could proceed.  Cancel the
            // reservation, which may permit other events to proceed sooner.
            r.Cancel()
            return ctx.Err()
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    reserveN
    // reserveN is a helper method for AllowN, ReserveN, and WaitN.
    // maxFutureReserve specifies the maximum reservation wait duration allowed.
    // reserveN returns Reservation, not *Reservation, to avoid allocation in AllowN and WaitN.
    func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation {
        lim.mu.Lock()
     
        if lim.limit == Inf {
            lim.mu.Unlock()
            return Reservation{
                ok:        true,
                lim:       lim,
                tokens:    n,
                timeToAct: now,
            }
        }
     
         // 拿到截至 now 时间时
        // 可以获取的令牌 tokens 数量及上一次拿走令牌的时间 last  
         now, last, tokens := lim.advance(now)
     
        // Calculate the remaining number of tokens resulting from the request.
        tokens -= float64(n)
     
        // Calculate the wait duration
        var waitDuration time.Duration
        if tokens < 0 {
            waitDuration = lim.limit.durationFromTokens(-tokens)
        }
     
        // Decide result
        ok := n <= lim.burst && waitDuration <= maxFutureReserve
     
        // Prepare reservation
        r := Reservation{
            ok:    ok,
            lim:   lim,
            limit: lim.limit,
        }
        if ok {
            r.tokens = n
            r.timeToAct = now.Add(waitDuration)
        }
     
        // Update state
        if ok {
            lim.last = now
            lim.tokens = tokens
            lim.lastEvent = r.timeToAct
        } else {
            lim.last = last
        }
     
        lim.mu.Unlock()
        return r
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55

    Golang uber-go/ratelimit 使用介绍

    uber-go/ratelimit 是一个漏桶限流器的实现,

    package main
     
    import (
        "fmt"
        "time"
     
        "go.uber.org/ratelimit"
    )
     
    func main() {
        rl := ratelimit.New(100) // per second
     
        prev := time.Now()
        for i := 0; i < 10; i++ {
            now := rl.Take()
            fmt.Println(i, now.Sub(prev))
            prev = now
        }
    }
     
     
    // 执行结果
    0 0s
    1 10ms
    2 10ms
    3 10ms
    4 10ms
    5 10ms
    6 10ms
    7 10ms
    8 10ms
    9 10ms
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    在这个例子中,我们给定限流器每秒可以通过 100 个请求,也就是平均每个请求间隔 10ms。

    要实现以上每秒固定速率的目的,其实还是比较简单的。

    在 ratelimit 的 New 函数中,传入的参数是每秒允许请求量 (RPS)。
    我们可以很轻易的换算出每个请求之间的间隔:

    perRequest := config.per / time.Duration(rate)
    config.per 默认是 time.Second

    限流实现

    限流的实现主要是通过 Take() 方法来做的。

    // Take blocks to ensure that the time spent between multiple
    // Take calls is on average time.Second/rate.
    func (t *atomicInt64Limiter) Take() time.Time {
        var (
            newTimeOfNextPermissionIssue int64
            now                          int64
        )
        for {
            // 获取到当前的时间
            now = t.clock.Now().UnixNano()
            // 拿到上一次通过限流器的时间
            timeOfNextPermissionIssue := atomic.LoadInt64(&t.state)
     
            switch {
             // 第一次 或者 当前的时间和上一次的时间间隔 > t.perRequest
            case timeOfNextPermissionIssue == 0 || (t.maxSlack == 0 && now-timeOfNextPermissionIssue > int64(t.perRequest)):
                // if this is our first call or t.maxSlack == 0 we need to shrink issue time to now
                newTimeOfNextPermissionIssue = now
            // t.maxSlack:最大松弛量
            case t.maxSlack > 0 && now-timeOfNextPermissionIssue > int64(t.maxSlack):
                // a lot of nanoseconds passed since the last Take call
                // we will limit max accumulated time to maxSlack
                // 这里把下一次可以通过的时间提前计算
                newTimeOfNextPermissionIssue = now - int64(t.maxSlack)
            default:
                // calculate the time at which our permission was issued
                newTimeOfNextPermissionIssue = timeOfNextPermissionIssue + int64(t.perRequest)
            }
            // 通过 cas 来计算是否拿到当前的时间可以通过了
            if atomic.CompareAndSwapInt64(&t.state, timeOfNextPermissionIssue, newTimeOfNextPermissionIssue) {
                break
            }
        }
        // 计算等待的时间
        sleepDuration := time.Duration(newTimeOfNextPermissionIssue - now)
        if sleepDuration > 0 {
            //等待时间
            t.clock.Sleep(sleepDuration)
            return time.Unix(0, newTimeOfNextPermissionIssue)
        }
        // return now if we don't sleep as atomicLimiter does
        return time.Unix(0, now)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43

    最大松弛量

    Leaky Bucket 每个请求的间隔是固定的,然而在实际上的互联网应用中,流量经常是突发性的。对于这种情况,uber-go 对 Leaky Bucket 做了一些改良,引入了最大松弛量 (maxSlack) 的概念。这个最大松弛量可以把两个请求间隔多等待出来的时间挪到给下一个请求使用。保证每秒请求数 (QPS) 即可。

    分布式限流实现

    分布式限流我们一般使用Redis + lua 来实现

    基于Redis+Lua: 令牌桶算法 限流方案

    这段代码是Lua脚本,用于实现令牌桶算法的限流功能

    local tokens_key = KEYS[1]
    local timestamp_key = KEYS[2]
     
    local rate = tonumber(ARGV[1])
    local capacity = tonumber(ARGV[2])
    local now = tonumber(ARGV[3])
    local requested = tonumber(ARGV[4])
     
    // 计算填充桶所需的时间,即桶的容量除以速率。
    local fill_time = capacity/rate
    // 计算桶的生存时间,即填充时间的两倍取整
    local ttl = math.floor(fill_time*2)
    // 从Redis中获取上一次剩余的令牌数量,如果没有则默认为桶的容量。
    local last_tokens = tonumber(redis.call("get", tokens_key))
    if last_tokens == nil then
      last_tokens = capacity
    end
    // 从Redis中获取上一次刷新令牌的时间戳,如果没有则默认为0。
    local last_refreshed = tonumber(redis.call("get", timestamp_key))
    if last_refreshed == nil then
      last_refreshed = 0
    end
     
    // 计算两次刷新令牌之间的时间间隔。
    local delta = math.max(0, now-last_refreshed)
    // 计算当前时刻桶中的令牌数量,不能超过桶的容量。
    local filled_tokens = math.min(capacity, last_tokens+(delta*rate))
    // 判断请求的令牌数量是否小于等于桶中现有的令牌数量,如果是则允许通过。
    local allowed = filled_tokens >= requested
    // 将当前时刻桶中的令牌数量赋值给新的令牌数量变量。
    local new_tokens = filled_tokens
    // 如果允许通过,则将新的令牌数量减去请求的令牌数量。
    if allowed then
      new_tokens = filled_tokens - requested
    end
     
    // 将新的令牌数量存储到Redis中,并设置存储的过期时间为桶的生存时间。
    redis.call("setex", tokens_key, ttl, new_tokens)
    // 将当前时刻的时间戳存储到Redis中,并设置存储的过期时间为桶的生存时间。
    redis.call("setex", timestamp_key, ttl, now)
     
    // 返回一个包含允许通过的标志和新的令牌数量的数组。
    return { allowed, new_tokens }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
  • 相关阅读:
    OpenGLES:绘制一个混色旋转的3D圆柱
    Kubernetes禁止调度
    乙炔黑-离子液体复合修饰玻碳电极(AB-ILs/GCE)|离子液体修饰改性的多壁碳纳米管(MWNTs)
    唐人街徒步:在异国情调的纽约感受浓厚的中式气息
    Swift-30-高级编程-类型扩展和协议扩展
    军队状态出现的六种结果,是将帅的过失
    算法工程师面经汇总
    Kotlin委托Delegate托管by
    八、Vue3基础之八
    线程与线程锁
  • 原文地址:https://blog.csdn.net/baidu_32452525/article/details/133647411