前面我介绍过,熔断器和限流器是高并发下保障服务稳定运行的有效手段。虽然各语言实现逻辑差不多,但由于特性不同,具体的实现也就有所不同。比如 Go 语言中的 Channel、 Goroutine、Atomic 很强大,你可以用它们来实现熔断器和限流器,但这并不意味着你用它们的时候会很顺利。相反,在没有充分了解它们的特点时,你可能会遇到各种坑。
接下来,我给你详细介绍下 Go 语言的一些高级编程技巧,以及如何用它们来实现熔断器和限流器。
实现熔断器的基本思路是:
划分一个时间窗口,如 100 毫秒,并设置判断条件,如失败率超过 5%、请求数超过 1000 等;
在请求进来的时候,判断条件是否满足熔断条件,如果满足就拒绝请求,如果不满足就继续处理请求;
请求处理完后,统计时间窗口内请求失败率、延迟不达标率、请求数等指标,以便作为后续请求的判断条件。
熔断器中最关键的部分是计数器和判断条件,这是因为它们为熔断器提供了判断依据。所以,为了实现计数器,我基于 int64 定义了一个 Counter 类型,并用原子操作为它实现了用于自增的 Add 方法、用于获取当前计数的 Load 方法、用于重置的 Reset 方法。具体代码如下:
type Counter int64
func (c *Counter) Add() int64 {
return atomic.AddInt64((*int64)(c), 1)
}
func (c *Counter) Load() int64 {
return atomic.LoadInt64((*int64)(c))
}
func (c *Counter) Reset() {
atomic.StoreInt64((*int64)(c), 0)
}
并且我还定义了一个 CircuitBreaker 结构体用于实现熔断器,它主要有这些字段:请求计数器 totalCounter、失败请求计数器 failsCounter、时间窗口 duration、最大延迟限制 latencyLimit、最大请求数限制 totalLimit、最大失败率限制 failsRateLimit、恢复请求的最低失败率 recoverFailsRate、时间窗口开始时间 lastTime、当前是否允许请求执行 allow。代码如下:
type CircuitBreaker struct {
totalCounter Counter
failsCounter Counter
duration int64
latencyLimit int64
totalLimit int64
failsRateLimit int64
recoverFailsRate int64
lastTime int64
allow int64
}
然后我实现了一个函数 NewCircuitBreaker 用来根据参数创建一个熔断器,并且定义了一个类型 CBOption 来实现可扩展的变参。具体代码如下:
type CBOption func(cb *CircuitBreaker)
const (
minDuration = 100
minTotal = 1000
minFailsRate = 2
)
func WithDuration(duration int64) CBOption {
return func(cb *CircuitBreaker) {
cb.duration = duration
}
}
func WithLatencyLimit(latencyLimit int64) CBOption {
return func(cb *CircuitBreaker) {
cb.latencyLimit = latencyLimit
}
}
func WithFailsLimit(failsRateLimit int64) CBOption {
return func(cb *CircuitBreaker) {
cb.failsRateLimit = failsRateLimit
}
}
func WithTotalLimit(totalLimit int64) CBOption {
return func(cb *CircuitBreaker) {
cb.totalLimit = totalLimit
}
}
func NewCircuitBreaker(opts ...CBOption) *CircuitBreaker {
cb := &CircuitBreaker{
totalCounter: 0,
failsCounter: 0,
duration: 0,
lastTime: 0,
failsRateLimit: 0,
latencyLimit: 0,
totalLimit: 0,
allow: 1,
}
for _, opt := range opts {
opt(cb)
}
if cb.duration < minDuration {
cb.duration = minDuration
}
if cb.totalLimit < minTotal {
cb.totalLimit = minTotal
}
if cb.failsRateLimit < minFailsRate {
cb.failsRateLimit = minFailsRate
}
cb.recoverFailsRate = cb.failsRateLimit / 2
return cb
}
以上代码都在infrastructure/utils/circuit_breaker.go这个文件中。
熔断器最核心的是 Allow 方法,它支持传入一个类型为函数的参数 f ,返回该请求是否允许执行。 Allow 方法先更新请求计数器和时间窗口,并判断当前是否满足执行条件。如果不满足条件则返回 false;如果满足条件,则更新当前执行状态,并根据参数 f 的执行结果来更新失败计数器。代码如下:
func (cb *CircuitBreaker) Allow(f func() bool) bool {
fails := cb.failsCounter.Load()
total := cb.totalCounter.Load()
start := time.Now().UnixNano() / int64(time.Millisecond)
if start > cb.lastTime+cb.duration {
atomic.StoreInt64(&cb.lastTime, start)
cb.failsCounter.Reset()
cb.totalCounter.Reset()
atomic.StoreInt64(&cb.allow, 1)
}
cb.totalCounter.Add()
allow := !(total > 0 && fails*100/cb.failsRateLimit >= total || total >= cb.totalLimit)
if atomic.LoadInt64(&cb.allow) == 0 {
if fails*100/cb.recoverFailsRate > total {
allow = false
} else if allow {
atomic.StoreInt64(&cb.allow, 1)
}
} else if !allow {
atomic.StoreInt64(&cb.allow, 0)
}
if !allow {
logrus.Error("not allowed")
return false
}
ok := f()
end := time.Now().UnixNano() / int64(time.Millisecond)
if (cb.latencyLimit > 0 && end-start >= cb.latencyLimit) || !ok {
cb.failsCounter.Add()
}
return true
}
接下来,我在 interfaces/api/middlewares/circuit_break.go 中实现熔断器中间件生成函数 NewCircuitBreakMiddleware,它主要是根据传入的熔断器返回一个熔断器中间件,在该中间件中调用熔断器的 Allow 方法,以此来控制当前请求是否需要熔断。如果发生熔断,则将请求的返回状态设置为 http.StatusServiceUnavailable,立刻中断请求并返回结果。代码如下:
func NewCircuitBreakMiddleware(cb *utils.CircuitBreaker) gin.HandlerFunc {
return func(c *gin.Context) {
ok := cb.Allow(func() bool {
c.Next()
if c.Writer.Status() >= http.StatusInternalServerError {
return false
}
return true
})
if !ok {
c.AbortWithStatus(http.StatusServiceUnavailable)
}
}
}
最后,我们修改 interfaces/api/routers.go 的代码,将中间件注入框架中。我们可以根据业务特点,为不同的接口生成不同的熔断器,合理分配服务器资源。
比如我们可以将活动信息接口 限制为 100 毫秒内最多处理 2000 次请求、最大延迟 100 毫秒、最大失败率 5%,而购物车接口则限制为 100 毫秒内最多处理 1000 次请求、最大延迟 200 毫秒、最大失败率 5%。需要注意的是,熔断器中间件需要放到所有中间件的最前面,这样确保在第一时间拦截承载能力外的请求。 代码如下:
func initRouters(g *gin.Engine) {
g.POST("/login", api.User{}.Login)
eventCB := utils.NewCircuitBreaker(
utils.WithDuration(100),
utils.WithTotalLimit(2000),
utils.WithLatencyLimit(100),
utils.WithFailsLimit(5),
)
eventCBMdw := middlewares.NewCircuitBreakMiddleware(eventCB)
event := g.Group("/event").Use(eventCBMdw, middlewares.NewAuthMiddleware(false))
eventApp := api.Event{}
event.GET("/list", eventApp.List)
event.GET("/info", eventApp.Info)
subscribe := g.Group("/event/subscribe").Use(middlewares.NewAuthMiddleware(true))
subscribe.POST("/", eventApp.Subscribe)
shopCB := utils.NewCircuitBreaker(
utils.WithDuration(100),
utils.WithTotalLimit(1000),
utils.WithLatencyLimit(200),
utils.WithFailsLimit(5),
)
shopCBMdw := middlewares.NewCircuitBreakMiddleware(shopCB)
shop := g.Group("/shop").Use(shopCBMdw, middlewares.NewAuthMiddleware(true), middlewares.Blacklist)
shopApp := api.Shop{}
shop.PUT("/cart/add", shopApp.AddCart)
}
在 Go 语言中,如果不借助任何第三方库,有两种经典模式实现限流器:Fan-In 和 Fan-Out,也叫扇入和扇出。它们各有优缺点,适合不同的业务场景。
Fan-In 模式是一种多对一的模式,也就是说它有多个生产者,而只有一个消费者。它相当于限流算法中的漏桶算法,处理速度取决于消费者的速度。
这种模式最大的作用是将生产者的数据排队,在消费端以固定速度消费并处理,它也比较适合需要在消费端操作分布式锁的场景。比如:在秒杀系统中控制队列的消费速度,并通过分布式锁和原子操作扣减库存。
前面我提到过,在 Go 语言中,Channel 可以当队列来用,而生产者和消费者就是 Goroutine 了。所以,对于Fan-In 模式来说,它可以看作是多个 Goroutine 生产数据到一个 Channel,由一个 Goroutine 来消费 Channel。
Fan-Out 模式又是怎样的呢?它是一种一对多的模式,也就是说它只有一个生产者,却有多个消费者。它类似于限流算法中的令牌桶算法,处理速度取决于生产者的速度。
这种模式最大的作用是在源头通过限制生产者的速度,来控制下游系统的压力。由于下游系统偏重业务,涉及 Redis、mysql 等操作,通常每次请求会有一定的延迟。为了提升吞吐量,通常是采用多个 Goroutine 的来消费队列中的数据,配合连接池并发请求下游系统。
以上是Fan-In 和 Fan-Out 的作用介绍,在高并发系统中,通常会将两者搭配使用,这种方式也叫组合模式。 举个例子:秒杀系统中,用户的抢购请求在获得初步资格后,先通过 Fan-In 进入队列,由一个 Goroutine 消费队列,并通过 Fan-Out 将请求转发给多个 Goroutine 来并发操作多个商品的库存。
这三种模式共同特点是什么呢?它们都需要有一个 Push 方法提供给生产者推送数据,一个 Pop 方法提供给消费者消费数据,一个 Close 方法用于关闭限流器。这里我把它们抽象成 RateLimiter 接口类,代码在 infrastructure/utils/rate_limiter.go 文件中。如下所示:
type RateLimiter interface {
Push(t pool.Task) bool
Pop() (pool.Task, bool)
Close() error
}
为了复用代码,我还定义了一个 fanInOut 结构体用于实现三种模式的限流器,它的核心字段主要是用于 Fan-In 模式的队列 queueIn,用于 Fan-Out 模式的队列 queueOut,以及一些控制流量速度的参数。然后,我实现了一个函数 NewRateLimiter,它接收 size、rate、mode 这三个参数,分别用于设置队列缓冲区、流量速度、模式等。其中,mode 参数用来控制三种模式的主要流程。该函数主要执行一些初始化的工作,比如初始化队列。相关代码如下所示:
type fanInOut struct {
sync.RWMutex
queueIn chan pool.Task
queueOut chan pool.Task
lastTime int64
rate int64
duration time.Duration
closed int64
mode int
}
const (
minRate = 1
minSize = 10
FanIn = 1 << 0
FanOut = 1 << 1
)
func NewRateLimiter(size int, rate int64, mode int) (RateLimiter, error) {
modeMask := FanIn | FanOut
if mode > modeMask || modeMask&mode == 0 {
return nil, errors.New("wrong flag")
}
if rate < minRate {
rate = minRate
}
if size < minSize {
size = minSize
}
f := &fanInOut{
lastTime: 0,
rate: rate,
duration: time.Second / time.Duration(rate),
closed: 0,
mode: mode,
}
if FanIn&mode != 0 {
f.queueIn = make(chan pool.Task, size)
}
if FanOut&mode != 0 {
f.queueOut = make(chan pool.Task, size)
}
if mode == modeMask {
go f.exchange()
}
return f, nil
}
在限流器中,比较关键的是如何控制好速度。 也就是说,如何根据 rate 参数来控制每次操作数据的时间间隔。这里我实现了一个 sleep 方法,用于计算出需要等待的时间,如等待 500 毫秒后再返回。
如果是 Fan-In 模式,在 Push 方法里将采用非阻塞模式将数据推送到队列,而 Fan-Out 模式下则需要等待一段时间后再以阻塞的模式推送数据,确保速度不会超过限制。在 Pop 方法里,如果是 Fan-Out 模式,则直接消费队列并返回,如果是 Fan-In 模式,则等待一段时间再去消费队列,同样是为了确保速度不超过限制。
对于组合模式,我实现了一个 exchange 方法,用于以固定速度将数据从 Fan-In 的队列里搬到 Fan-Out 的队列里。代码如下所示:
func (f *fanInOut) Push(t pool.Task) bool {
if atomic.LoadInt64(&f.closed) == 1 {
return false
}
f.RLock()
defer f.RUnlock()
if atomic.LoadInt64(&f.closed) == 1 {
return false
}
if FanIn&f.mode != 0 {
select {
case f.queueIn <- t:
return true
default:
return false
}
} else {
f.sleep()
f.queueIn <- t
return true
}
}
func (f *fanInOut) Pop() (pool.Task, bool) {
if FanOut&f.mode != 0 {
t, ok := <-f.queueOut
return t, ok
} else {
f.sleep()
t, ok := <-f.queueIn
return t, ok
}
}
func (f *fanInOut) sleep() {
now := time.Now().UnixNano()
delta := f.duration - time.Duration(now-atomic.LoadInt64(&f.lastTime))
if delta > time.Millisecond {
time.Sleep(delta)
}
atomic.StoreInt64(&f.lastTime, now)
}
func (f *fanInOut) exchange() {
for t := range f.queueIn {
f.sleep()
f.queueOut <- t
}
close(f.queueOut)
}
func (f *fanInOut) Close() error {
f.Lock()
defer f.Unlock()
if atomic.CompareAndSwapInt64(&f.closed, 0, 1) {
if f.mode&FanIn != 0 {
close(f.queueIn)
} else if f.mode == FanOut {
close(f.queueOut)
}
}
return nil
}
这一讲,我主要给你介绍了 Go 语言中常用的熔断器和限流器实现方法。希望你已经掌握并能熟练运用到工作中。需要注意的是,熔断器和限流器实现方法有多种。我这里只是给你介绍了常用的几种实现方式,而开源社区还有一些更优秀的代码实现,你在项目中需要充分评估优缺点,做好性能对比分析。
接下来,为你留一个思考题:熔断器和限流器的单元测试和性能测试该如何做呢?
你可以把答案写在留言区。期待你的回答哦!
好了,这一讲就到这里了。下一讲,我将给你介绍“如何使用队列和分布式锁防止库存超售”。到时见!
源码地址:
https://github.com/lagoueduCol/MiaoSha-Yiletian/tree/main/infrastructure/utils
https://github.com/lagoueduCol/MiaoSha-Yiletian/tree/main/interfaces/api