• 自定义注解实现分布式限流


    1、为什么要限流?

    在高并发场景下,接口并不能无限制的接受外部请求,当请求数超过一定的数量之后,服务器的内存和CPU将面临巨大的压力,最后会崩溃直到服务器宕机。因此必须对某些高并发接口进行限流。

    2、限流的方案

    常见的限流算法有:

    • 计数器(固定窗口)算法
    • 滑动窗口算法
    • 漏桶算法
    • 令牌桶算法

    这几种算法的介绍和比较请见:主流限流算法比较

    限流具体的实现分为单机和分布式限流

    2.1、单机限流

    单机部署的系统一般可以使用Guava RateLimiter,他实现了令牌桶算法,目前使用比较广泛

    2.2、分布式限流

    现在的微服务一般都是采用多实例部署的方式,即:一个微服务多个部署实例。一般这种情况限流需要采用Redis中间件来进行限流,常用的技术路线是:Redis+Lua限流。分布式限流中又分为网管层限流和微服务API限流。

    2.2.1 网关限流

    Spring Cloud Gateway 自带的RedisRateLimiter可以实现分布式限流实现。它的实现原理是基于令牌桶算法的,具体的实现逻辑放在src/main/resources/META-INF/scripts目录下的request_rate_limiter.lua脚本中:脚本的内容如下:

    local tokens_key = KEYS[1]
    local timestamp_key = KEYS[2]
    --redis.log(redis.LOG_WARNING, "tokens_key " .. tokens_key)
    
    local rate = tonumber(ARGV[1])
    local capacity = tonumber(ARGV[2])
    local now = tonumber(ARGV[3])
    local requested = tonumber(ARGV[4])
    
    local fill_time = capacity/rate
    local ttl = math.floor(fill_time*2)
    
    --redis.log(redis.LOG_WARNING, "rate " .. ARGV[1])
    --redis.log(redis.LOG_WARNING, "capacity " .. ARGV[2])
    --redis.log(redis.LOG_WARNING, "now " .. ARGV[3])
    --redis.log(redis.LOG_WARNING, "requested " .. ARGV[4])
    --redis.log(redis.LOG_WARNING, "filltime " .. fill_time)
    --redis.log(redis.LOG_WARNING, "ttl " .. ttl)
    
    local last_tokens = tonumber(redis.call("get", tokens_key))
    if last_tokens == nil then
      last_tokens = capacity
    end
    --redis.log(redis.LOG_WARNING, "last_tokens " .. last_tokens)
    
    local last_refreshed = tonumber(redis.call("get", timestamp_key))
    if last_refreshed == nil then
      last_refreshed = 0
    end
    --redis.log(redis.LOG_WARNING, "last_refreshed " .. last_refreshed)
    
    local delta = math.max(0, now-last_refreshed)
    local filled_tokens = math.min(capacity, last_tokens+(delta*rate))
    local allowed = filled_tokens >= requested
    local new_tokens = filled_tokens
    local allowed_num = 0
    if allowed then
      new_tokens = filled_tokens - requested
      allowed_num = 1
    end
    
    --redis.log(redis.LOG_WARNING, "delta " .. delta)
    --redis.log(redis.LOG_WARNING, "filled_tokens " .. filled_tokens)
    --redis.log(redis.LOG_WARNING, "allowed_num " .. allowed_num)
    --redis.log(redis.LOG_WARNING, "new_tokens " .. new_tokens)
    
    if ttl > 0 then
      redis.call("setex", tokens_key, ttl, new_tokens)
      redis.call("setex", timestamp_key, ttl, now)
    end
    
    -- return { allowed_num, new_tokens, capacity, filled_tokens, requested, new_tokens }
    return { allowed_num, new_tokens }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54

    参考:Spring Cloud Gateway限流实战

    2.2.2 微服务API限流

    微服务API限流可以在接口上进行灵活的配置,相比于网关限流更加灵活和个性化,常见的方案一般也是Redis+Lua脚本;那么为什么一定要使用lua脚本,因为redis运行lua脚本可以保证一批redis指令的执行不会被其他线程打断,可以实现操作的原子性(虽然redis的事务原子性不符合ACID原则)。可以参考Redisson提供的RRateLimiter,基于LUA脚本实现了分布式限流,它的核心限流方法是RedissonRateLimiter#tryAcquireAsync,代码如下:

     private <T> RFuture<T> tryAcquireAsync(RedisCommand<T> command, Long value) {
            return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                    "local rate = redis.call('hget', KEYS[1], 'rate');"
                  + "local interval = redis.call('hget', KEYS[1], 'interval');"
                  + "local type = redis.call('hget', KEYS[1], 'type');"
                  + "assert(rate ~= false and interval ~= false and type ~= false, 'RateLimiter is not initialized')"
                  
                  + "local valueName = KEYS[2];"
                  + "local permitsName = KEYS[4];"
                  + "if type == '1' then "
                      + "valueName = KEYS[3];"
                      + "permitsName = KEYS[5];"
                  + "end;"
    
                  + "assert(tonumber(rate) >= tonumber(ARGV[1]), 'Requested permits amount could not exceed defined rate'); "
    
                  + "local currentValue = redis.call('get', valueName); "
                  + "if currentValue ~= false then "
                         + "local expiredValues = redis.call('zrangebyscore', permitsName, 0, tonumber(ARGV[2]) - interval); "
                         + "local released = 0; "
                         + "for i, v in ipairs(expiredValues) do "
                              + "local random, permits = struct.unpack('fI', v);"
                              + "released = released + permits;"
                         + "end; "
    
                         + "if released > 0 then "
                              + "redis.call('zrem', permitsName, unpack(expiredValues)); "
                              + "currentValue = tonumber(currentValue) + released; "
                              + "redis.call('set', valueName, currentValue);"
                         + "end;"
    
                         + "if tonumber(currentValue) < tonumber(ARGV[1]) then "
                             + "local nearest = redis.call('zrangebyscore', permitsName, '(' .. (tonumber(ARGV[2]) - interval), tonumber(ARGV[2]), 'withscores', 'limit', 0, 1); "
                             + "local random, permits = struct.unpack('fI', nearest[1]);"
                             + "return tonumber(nearest[2]) - (tonumber(ARGV[2]) - interval);"
                         + "else "
                             + "redis.call('zadd', permitsName, ARGV[2], struct.pack('fI', ARGV[3], ARGV[1])); "
                             + "redis.call('decrby', valueName, ARGV[1]); "
                             + "return nil; "
                         + "end; "
                  + "else "
                         + "redis.call('set', valueName, rate); "
                         + "redis.call('zadd', permitsName, ARGV[2], struct.pack('fI', ARGV[3], ARGV[1])); "
                         + "redis.call('decrby', valueName, ARGV[1]); "
                         + "return nil; "
                  + "end;",
                    Arrays.asList(getName(), getValueName(), getClientValueName(), getPermitsName(), getClientPermitsName()),
                    value, System.currentTimeMillis(), ThreadLocalRandom.current().nextLong());
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49

    3、自定义注解实现分布式限流

    该方案的实现需要引入Redisson,因为Redisson中提供了一个RRateLimiter可以细实线分布式限流,本方案主要是对其进行简单的包装,方便使用。
    首先定义一个注解@RequestRateLimiter

    @Target({ElementType.METHOD,ElementType.TYPE})
    @Retention(RetentionPolicy.RUNTIME)
    public @interface RequestRateLimiter {
        String limiterName() default "";
        RateType mode() default RateType.PER_CLIENT;
        long rate() default 10;
        long rateInterval() default 1;
        RateIntervalUnit rateIntervalUnit() default RateIntervalUnit.SECONDS;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    接着要定义拦截器对该注解进行拦截

    @Component
    @Slf4j
    public class RateLimitInterceptor implements HandlerInterceptor {
        @Autowired
        RedissonClient redissonClient;
    
        @Override
        public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
            // 如果不是映射到方法直接通过
            if (!(handler instanceof HandlerMethod)) {
                return true;
            }
    
            HandlerMethod handlerMethod = (HandlerMethod) handler;
            Method method = handlerMethod.getMethod();
            //检查有没有限流注解
            if (method.isAnnotationPresent(RequestRateLimiter.class)) {
                RequestRateLimiter rateLimiter = method.getAnnotation(RequestRateLimiter.class);
                if (Objects.nonNull(rateLimiter)) {  //需要限流
                    String limiterName = rateLimiter.limiterName();
                    long rate = rateLimiter.rate();
                    long rateInterval = rateLimiter.rateInterval();
                    RateIntervalUnit rateIntervalUnit = rateLimiter.rateIntervalUnit();
                    RateType mode = rateLimiter.mode();
                    AssertUtil.isStringEmpty(limiterName,"limiterName can't be empty");
                    RRateLimiter rRateLimiter = redissonClient.getRateLimiter(limiterName);
                    rRateLimiter.setRate(mode,rate,rateInterval,rateIntervalUnit);
                    if(rRateLimiter.tryAcquire()) { // 调用了 RedissonRateLimiter#tryAcquireAsync
                        return true;
                    } else {
                        log.info("request limited by redis");
                        return false;
                    }
                }
            }
            return true;
        }
    
        @Override
        public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView) throws Exception {
            HandlerInterceptor.super.postHandle(request, response, handler, modelAndView);
        }
    
        @Override
        public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
            HandlerInterceptor.super.afterCompletion(request, response, handler, ex);
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49

    将拦截器配置到WebMVC中

    @Configuration
    @Slf4j
    public class InterceptorConfigurer implements WebMvcConfigurer {
        @Autowired
        private RateLimitInterceptor rateLimitInterceptor;
    
        @Override
        public void addInterceptors(InterceptorRegistry registry) {
            registry.addInterceptor(rateLimitInterceptor).addPathPatterns("/**");
            log.info("initialize addInterceptors successfully");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    使用注解进行限流

    @RestController
    @RequestMapping
    public class OrderController {
    
        @Autowired
        private IOrderService orderService;
    
        @RequestMapping(value = "/create", method = RequestMethod.POST, produces = "application/json")
        @RequestRateLimiter(limiterName = "createOrder", rate = 20) // 限制每秒20个请求
        DataResponse<String> createOrder(@RequestBody OrderVo orderVo) {
            return orderService.createOrder(orderVo);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
  • 相关阅读:
    提高CV模型训练性能的 9 个技巧
    微服务架构中实体类模块化设计与MyBatis-Plus注解浅析
    Cholesterol-PEG-Acid,胆固醇-聚乙二醇-羧基保持在干燥、低温环境下
    Day_17> 动态内存管理
    VLOOKUP函数的使用方法
    Handsontable JavaScript 12.2 Crack
    【C++】类和对象(中)
    智能多价格-面向电商的全栈开发利器
    浅析基于EasyCVR视频技术构建工业园区视频安防大数据监管平台的方案
    vue3 组件v-model绑定props里的值,修改组件的值要触发回调
  • 原文地址:https://blog.csdn.net/shuoyueqishilove/article/details/126555926