• 打造千万级流量秒杀第二十六课 限流器:如何实现熔断器和限流器防止宕机和雪崩?


    前面我介绍过,熔断器和限流器是高并发下保障服务稳定运行的有效手段。虽然各语言实现逻辑差不多,但由于特性不同,具体的实现也就有所不同。比如 Go 语言中的 Channel、 Goroutine、Atomic 很强大,你可以用它们来实现熔断器和限流器,但这并不意味着你用它们的时候会很顺利。相反,在没有充分了解它们的特点时,你可能会遇到各种坑。

    接下来,我给你详细介绍下 Go 语言的一些高级编程技巧,以及如何用它们来实现熔断器和限流器。

    熔断器

    实现熔断器的基本思路是:

    1. 划分一个时间窗口,如 100 毫秒,并设置判断条件,如失败率超过 5%、请求数超过 1000 等;

    2. 在请求进来的时候,判断条件是否满足熔断条件,如果满足就拒绝请求,如果不满足就继续处理请求;

    3. 请求处理完后,统计时间窗口内请求失败率、延迟不达标率、请求数等指标,以便作为后续请求的判断条件。

    熔断器中最关键的部分是计数器和判断条件,这是因为它们为熔断器提供了判断依据。所以,为了实现计数器,我基于 int64 定义了一个 Counter 类型,并用原子操作为它实现了用于自增的 Add 方法、用于获取当前计数的 Load 方法、用于重置的 Reset 方法。具体代码如下:

    type Counter int64
    func (c *Counter) Add() int64 {
       return atomic.AddInt64((*int64)(c), 1)
    }
    func (c *Counter) Load() int64 {
       return atomic.LoadInt64((*int64)(c))
    }
    func (c *Counter) Reset() {
       atomic.StoreInt64((*int64)(c), 0)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    并且我还定义了一个 CircuitBreaker 结构体用于实现熔断器,它主要有这些字段:请求计数器 totalCounter、失败请求计数器 failsCounter、时间窗口 duration、最大延迟限制 latencyLimit、最大请求数限制 totalLimit、最大失败率限制 failsRateLimit、恢复请求的最低失败率 recoverFailsRate、时间窗口开始时间 lastTime、当前是否允许请求执行 allow。代码如下:

    type CircuitBreaker struct {
       totalCounter Counter
       failsCounter Counter
       duration       int64
       latencyLimit   int64
       totalLimit     int64
       failsRateLimit int64
       recoverFailsRate int64
       lastTime         int64
       allow            int64
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    然后我实现了一个函数 NewCircuitBreaker 用来根据参数创建一个熔断器,并且定义了一个类型 CBOption 来实现可扩展的变参。具体代码如下:

    type CBOption func(cb *CircuitBreaker)
    const (
       minDuration  = 100
       minTotal     = 1000
       minFailsRate = 2
    )
    func WithDuration(duration int64) CBOption {
       return func(cb *CircuitBreaker) {
          cb.duration = duration
       }
    }
    func WithLatencyLimit(latencyLimit int64) CBOption {
       return func(cb *CircuitBreaker) {
          cb.latencyLimit = latencyLimit
       }
    }
    func WithFailsLimit(failsRateLimit int64) CBOption {
       return func(cb *CircuitBreaker) {
          cb.failsRateLimit = failsRateLimit
       }
    }
    func WithTotalLimit(totalLimit int64) CBOption {
       return func(cb *CircuitBreaker) {
          cb.totalLimit = totalLimit
       }
    }
    func NewCircuitBreaker(opts ...CBOption) *CircuitBreaker {
       cb := &CircuitBreaker{
          totalCounter:   0,
          failsCounter:   0,
          duration:       0,
          lastTime:       0,
          failsRateLimit: 0,
          latencyLimit:   0,
          totalLimit:     0,
          allow:          1,
       }
       for _, opt := range opts {
          opt(cb)
       }
       if cb.duration < minDuration {
          cb.duration = minDuration
       }
       if cb.totalLimit < minTotal {
          cb.totalLimit = minTotal
       }
       if cb.failsRateLimit < minFailsRate {
          cb.failsRateLimit = minFailsRate
       }
       cb.recoverFailsRate = cb.failsRateLimit / 2
       return cb
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52

    以上代码都在infrastructure/utils/circuit_breaker.go这个文件中。

    熔断器最核心的是 Allow 方法,它支持传入一个类型为函数的参数 f ,返回该请求是否允许执行。 Allow 方法先更新请求计数器和时间窗口,并判断当前是否满足执行条件。如果不满足条件则返回 false;如果满足条件,则更新当前执行状态,并根据参数 f 的执行结果来更新失败计数器。代码如下:

    func (cb *CircuitBreaker) Allow(f func() bool) bool {
       fails := cb.failsCounter.Load()
       total := cb.totalCounter.Load()
       start := time.Now().UnixNano() / int64(time.Millisecond)
       if start > cb.lastTime+cb.duration {
          atomic.StoreInt64(&cb.lastTime, start)
          cb.failsCounter.Reset()
          cb.totalCounter.Reset()
          atomic.StoreInt64(&cb.allow, 1)
       }
       cb.totalCounter.Add()
       allow := !(total > 0 && fails*100/cb.failsRateLimit >= total || total >= cb.totalLimit)
       if atomic.LoadInt64(&cb.allow) == 0 {
          if fails*100/cb.recoverFailsRate > total {
             allow = false
          } else if allow {
             atomic.StoreInt64(&cb.allow, 1)
          }
       } else if !allow {
          atomic.StoreInt64(&cb.allow, 0)
       }
       if !allow {
          logrus.Error("not allowed")
          return false
       }
       ok := f()
       end := time.Now().UnixNano() / int64(time.Millisecond)
       if (cb.latencyLimit > 0 && end-start >= cb.latencyLimit) || !ok {
          cb.failsCounter.Add()
       }
       return true
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    接下来,我在 interfaces/api/middlewares/circuit_break.go 中实现熔断器中间件生成函数 NewCircuitBreakMiddleware,它主要是根据传入的熔断器返回一个熔断器中间件,在该中间件中调用熔断器的 Allow 方法,以此来控制当前请求是否需要熔断。如果发生熔断,则将请求的返回状态设置为 http.StatusServiceUnavailable,立刻中断请求并返回结果。代码如下:

    func NewCircuitBreakMiddleware(cb *utils.CircuitBreaker) gin.HandlerFunc {
       return func(c *gin.Context) {
          ok := cb.Allow(func() bool {
             c.Next()
             if c.Writer.Status() >= http.StatusInternalServerError {
                return false
             }
             return true
          })
          if !ok {
             c.AbortWithStatus(http.StatusServiceUnavailable)
          }
       }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    最后,我们修改 interfaces/api/routers.go 的代码,将中间件注入框架中。我们可以根据业务特点,为不同的接口生成不同的熔断器,合理分配服务器资源。

    比如我们可以将活动信息接口 限制为 100 毫秒内最多处理 2000 次请求、最大延迟 100 毫秒、最大失败率 5%,而购物车接口则限制为 100 毫秒内最多处理 1000 次请求、最大延迟 200 毫秒、最大失败率 5%。需要注意的是,熔断器中间件需要放到所有中间件的最前面,这样确保在第一时间拦截承载能力外的请求。 代码如下:

    func initRouters(g *gin.Engine) {
       g.POST("/login", api.User{}.Login)
       eventCB := utils.NewCircuitBreaker(
          utils.WithDuration(100),
          utils.WithTotalLimit(2000),
          utils.WithLatencyLimit(100),
          utils.WithFailsLimit(5),
       )
       eventCBMdw := middlewares.NewCircuitBreakMiddleware(eventCB)
       event := g.Group("/event").Use(eventCBMdw, middlewares.NewAuthMiddleware(false))
       eventApp := api.Event{}
       event.GET("/list", eventApp.List)
       event.GET("/info", eventApp.Info)
       subscribe := g.Group("/event/subscribe").Use(middlewares.NewAuthMiddleware(true))
       subscribe.POST("/", eventApp.Subscribe)
       shopCB := utils.NewCircuitBreaker(
          utils.WithDuration(100),
          utils.WithTotalLimit(1000),
          utils.WithLatencyLimit(200),
          utils.WithFailsLimit(5),
       )
       shopCBMdw := middlewares.NewCircuitBreakMiddleware(shopCB)
       shop := g.Group("/shop").Use(shopCBMdw, middlewares.NewAuthMiddleware(true), middlewares.Blacklist)
       shopApp := api.Shop{}
       shop.PUT("/cart/add", shopApp.AddCart)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    限流器

    在 Go 语言中,如果不借助任何第三方库,有两种经典模式实现限流器:Fan-In 和 Fan-Out,也叫扇入和扇出。它们各有优缺点,适合不同的业务场景。

    Fan-In 模式

    Fan-In 模式是一种多对一的模式,也就是说它有多个生产者,而只有一个消费者。它相当于限流算法中的漏桶算法,处理速度取决于消费者的速度。

    这种模式最大的作用是将生产者的数据排队,在消费端以固定速度消费并处理,它也比较适合需要在消费端操作分布式锁的场景。比如:在秒杀系统中控制队列的消费速度,并通过分布式锁和原子操作扣减库存。

    前面我提到过,在 Go 语言中,Channel 可以当队列来用,而生产者和消费者就是 Goroutine 了。所以,对于Fan-In 模式来说,它可以看作是多个 Goroutine 生产数据到一个 Channel,由一个 Goroutine 来消费 Channel。

    Fan-Out 模式

    Fan-Out 模式又是怎样的呢?它是一种一对多的模式,也就是说它只有一个生产者,却有多个消费者。它类似于限流算法中的令牌桶算法,处理速度取决于生产者的速度。

    这种模式最大的作用是在源头通过限制生产者的速度,来控制下游系统的压力。由于下游系统偏重业务,涉及 Redis、mysql 等操作,通常每次请求会有一定的延迟。为了提升吞吐量,通常是采用多个 Goroutine 的来消费队列中的数据,配合连接池并发请求下游系统。

    以上是Fan-In 和 Fan-Out 的作用介绍,在高并发系统中,通常会将两者搭配使用,这种方式也叫组合模式。 举个例子:秒杀系统中,用户的抢购请求在获得初步资格后,先通过 Fan-In 进入队列,由一个 Goroutine 消费队列,并通过 Fan-Out 将请求转发给多个 Goroutine 来并发操作多个商品的库存。

    代码实现

    这三种模式共同特点是什么呢?它们都需要有一个 Push 方法提供给生产者推送数据,一个 Pop 方法提供给消费者消费数据,一个 Close 方法用于关闭限流器。这里我把它们抽象成 RateLimiter 接口类,代码在 infrastructure/utils/rate_limiter.go 文件中。如下所示:

    type RateLimiter interface {
       Push(t pool.Task) bool
       Pop() (pool.Task, bool)
       Close() error
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    为了复用代码,我还定义了一个 fanInOut 结构体用于实现三种模式的限流器,它的核心字段主要是用于 Fan-In 模式的队列 queueIn,用于 Fan-Out 模式的队列 queueOut,以及一些控制流量速度的参数。然后,我实现了一个函数 NewRateLimiter,它接收 size、rate、mode 这三个参数,分别用于设置队列缓冲区、流量速度、模式等。其中,mode 参数用来控制三种模式的主要流程。该函数主要执行一些初始化的工作,比如初始化队列。相关代码如下所示:

    type fanInOut struct {
       sync.RWMutex
       queueIn  chan pool.Task
       queueOut chan pool.Task
       lastTime int64
       rate     int64
       duration time.Duration
       closed   int64
       mode     int
    }
    const (
       minRate = 1
       minSize = 10
       FanIn  = 1 << 0
       FanOut = 1 << 1
    )
    func NewRateLimiter(size int, rate int64, mode int) (RateLimiter, error) {
       modeMask := FanIn | FanOut
       if mode > modeMask || modeMask&mode == 0 {
          return nil, errors.New("wrong flag")
       }
       if rate < minRate {
          rate = minRate
       }
       if size < minSize {
          size = minSize
       }
       f := &fanInOut{
          lastTime: 0,
          rate:     rate,
          duration: time.Second / time.Duration(rate),
          closed:   0,
          mode:     mode,
       }
       if FanIn&mode != 0 {
          f.queueIn = make(chan pool.Task, size)
       }
       if FanOut&mode != 0 {
          f.queueOut = make(chan pool.Task, size)
       }
       if mode == modeMask {
          go f.exchange()
       }
       return f, nil
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45

    在限流器中,比较关键的是如何控制好速度。 也就是说,如何根据 rate 参数来控制每次操作数据的时间间隔。这里我实现了一个 sleep 方法,用于计算出需要等待的时间,如等待 500 毫秒后再返回。

    如果是 Fan-In 模式,在 Push 方法里将采用非阻塞模式将数据推送到队列,而 Fan-Out 模式下则需要等待一段时间后再以阻塞的模式推送数据,确保速度不会超过限制。在 Pop 方法里,如果是 Fan-Out 模式,则直接消费队列并返回,如果是 Fan-In 模式,则等待一段时间再去消费队列,同样是为了确保速度不超过限制。

    对于组合模式,我实现了一个 exchange 方法,用于以固定速度将数据从 Fan-In 的队列里搬到 Fan-Out 的队列里。代码如下所示:

    func (f *fanInOut) Push(t pool.Task) bool {
       if atomic.LoadInt64(&f.closed) == 1 {
          return false
       }
       f.RLock()
       defer f.RUnlock()
       if atomic.LoadInt64(&f.closed) == 1 {
          return false
       }
       if FanIn&f.mode != 0 {
          select {
          case f.queueIn <- t:
             return true
          default:
             return false
          }
       } else {
          f.sleep()
          f.queueIn <- t
          return true
       }
    }
    func (f *fanInOut) Pop() (pool.Task, bool) {
       if FanOut&f.mode != 0 {
          t, ok := <-f.queueOut
          return t, ok
       } else {
          f.sleep()
          t, ok := <-f.queueIn
          return t, ok
       }
    }
    func (f *fanInOut) sleep() {
       now := time.Now().UnixNano()
       delta := f.duration - time.Duration(now-atomic.LoadInt64(&f.lastTime))
       if delta > time.Millisecond {
          time.Sleep(delta)
       }
       atomic.StoreInt64(&f.lastTime, now)
    }
    func (f *fanInOut) exchange() {
       for t := range f.queueIn {
          f.sleep()
          f.queueOut <- t
       }
       close(f.queueOut)
    }
    func (f *fanInOut) Close() error {
       f.Lock()
       defer f.Unlock()
       if atomic.CompareAndSwapInt64(&f.closed, 0, 1) {
          if f.mode&FanIn != 0 {
             close(f.queueIn)
          } else if f.mode == FanOut {
             close(f.queueOut)
          }
       }
       return nil
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59

    小结

    这一讲,我主要给你介绍了 Go 语言中常用的熔断器和限流器实现方法。希望你已经掌握并能熟练运用到工作中。需要注意的是,熔断器和限流器实现方法有多种。我这里只是给你介绍了常用的几种实现方式,而开源社区还有一些更优秀的代码实现,你在项目中需要充分评估优缺点,做好性能对比分析。

    接下来,为你留一个思考题:熔断器和限流器的单元测试和性能测试该如何做呢?

    你可以把答案写在留言区。期待你的回答哦!

    好了,这一讲就到这里了。下一讲,我将给你介绍“如何使用队列和分布式锁防止库存超售”。到时见!

    源码地址:

    https://github.com/lagoueduCol/MiaoSha-Yiletian/tree/main/infrastructure/utils

    https://github.com/lagoueduCol/MiaoSha-Yiletian/tree/main/interfaces/api


    精选评论

  • 相关阅读:
    LabVIEW中EPICS客户端/服务端的测试
    Ubuntu Linux玩童年小霸王插卡游戏
    uniapp 短信监听(验证码)插件 Ba-Sms
    搭建自己的SSR
    二叉树路径模板
    【Dotnet 工具箱】推荐一个使用 C# 开发的轻量级压测工具
    不依赖第三方平台,用Dart语言实现 ios 消息推送
    一次偶然的钓鱼文件分析
    kubernetes client-go功能介绍
    vue 打包性能优化总结
  • 原文地址:https://blog.csdn.net/fegus/article/details/126377998