• Redis实现滑动窗口限流


    常见限流算法

    1. 固定窗口算法

      在固定的时间窗口下进行计数,达到阈值就拒绝请求。固定窗口如果在窗口开始就打满阈值,窗口后半部分进入的请求都会拒绝。

    2. 滑动窗口算法

      在固定窗口的基础上,窗口会随着时间向前推移,可以在时间内平滑控制流量,解决固定窗口出现的突发流量问题。

    3. 漏斗算法

      请求来了先进入漏斗,漏斗以恒定的速率放行请求。

    4. 令牌桶算法

      在令牌桶中,以恒定的速率放入令牌,令牌桶也有一定的容量,如果满了令牌就无法放进去了。拿到令牌的请求通过,并消耗令牌,如果令牌桶中令牌为空,则会丢弃该请求。

    redis实现滑动窗口算法

    当有请求来的时候记录时间戳,统计窗口内请求的数量时只需要统计redis中记录的数量。可以使用redis中的zset结构来存储。key可以设置为请求的资源名,同时根据限流的对象,往key中加入限流对象信息。比如根据ip限制访问某个资源的流量,可以使用方法名+ip作为key。score设置为时间戳。value则可以根据请求参数等信息生成MD5,或者直接生成UUID来存入,防止并发时多个请求存入的score和value一样导致只存入一个数据。

    步骤如下:

    1. 定义时间窗口
    2. 请求到来,丢弃时间窗口之外的数据,ZREMRANGEBYSCORE KEYS[i], -inf, window_start
    3. 判断时间窗口内的请求个数是否达到阈值。ZCARD KEYS[i] 要小于阈值
    4. 如果小于则通过zadd加入,超过则返回不放行

    lua脚本:

    1. local window_start = tonumber(ARGV[1])- tonumber(ARGV[2])
    2. redis.call('ZREMRANGEBYSCORE', KEYS[1], '-inf', window_start)
    3. local current_requests = redis.call('ZCARD', KEYS[1])
    4. if current_requests < tonumber(ARGV[3]) then
    5. redis.call('ZADD', KEYS[1], tonumber(ARGV[1]), ARGV[4])
    6. return 1
    7. else
    8. return 0
    9. end

    java通过注解+切面实现限流

    在java中,我们的需求是对资源可以进行多种规则的限流。注解可以定义不同类型的限流,如:全局限流,根据IP限流,根据用户限流。对每种类型的限流可以在一个注解中定义多个限流规则。

    整体效果如下:

    1. @RateLimiter(rules = {@RateLimitRule(time = 50,count = 100),@RateLimitRule(time = 20,count = 10)}, type = LimitType.IP)
    2. @RateLimiter(rules = {@RateLimitRule(time = 60,count = 1000)}, type = LimitType.DEFAULT)
    3. public void update(){
    4. }

    定义注解

    定义了三个注解:

    1. RateLimiter:限流注解
    2. RateLimitRule:限流规则
    3. RateLimiters:存放多个限流注解的容器,为了可以重复使用该注解

    RateLimiter:

    1. @Target(ElementType.METHOD)
    2. @Retention(RetentionPolicy.RUNTIME)
    3. @Documented
    4. // 支持重复注解
    5. @Repeatable(value = RateLimiters.class)
    6. public @interface RateLimiter {
    7. /**
    8. * 限流键前缀
    9. *
    10. * @return
    11. */
    12. String key() default "rate_limit:";
    13. /**
    14. * 限流规则
    15. *
    16. * @return
    17. */
    18. RateLimitRule[] rules() default {};
    19. /**
    20. * 限流类型
    21. *
    22. * @return
    23. */
    24. LimitType type() default LimitType.DEFAULT;
    25. }

    RateLimitRule:

    1. public @interface RateLimitRule {
    2. /**
    3. * 时间窗口, 单位秒
    4. *
    5. * @return
    6. */
    7. int time() default 60;
    8. /**
    9. * 允许请求数
    10. *
    11. * @return
    12. */
    13. int count() default 100;
    14. }

    RateLimiters:

    1. @Target(ElementType.METHOD)
    2. @Retention(RetentionPolicy.RUNTIME)
    3. @Documented
    4. public @interface RateLimiters {
    5. RateLimiter[] value();
    6. }

    改造lua脚本

    在实现切面之前,我们需要对lua脚本进行改造。我们的需求对资源可以进行多种规则的限流。根据限流类型和限流规则可以组合出不同的key,比如我们要对某个资源进行以下规则限流:全局限流(60s,1000次; 600s,5000次),根据ip限流(2s,5次)。

    根据这些规则我们就需要使用3个zset分别来存放请求记录。并且当三个规则都没达到阈值时才放行请求,否则拒绝请求。

    对lua脚本改造,支持多个key。

     
    
    1. local flag = 1
    2. for i = 1, #KEYS do
    3. local window_start = tonumber(ARGV[1])- tonumber(ARGV[(i-1)*3+2])
    4. redis.call('ZREMRANGEBYSCORE', KEYS[i], '-inf', window_start)
    5. local current_requests = redis.call('ZCARD', KEYS[i])
    6. if current_requests < tonumber(ARGV[(i-1)*3+3]) then
    7. else
    8. flag = 0
    9. end
    10. end
    11. if flag == 1 then
    12. for i = 1, #KEYS do
    13. redis.call('ZADD', KEYS[i], tonumber(ARGV[1]), ARGV[(i-1)*3+4])
    14. end
    15. end
    16. return flag

    定义切面

    定义一个切面实现限流逻辑:RateLimiterAspect

    首先定义切点,由于我们可以重复使用注解,所以需要把RateLimiter和RateLimiters都定义为切点

    1. @Pointcut("@annotation(com.imgyh.framework.annotation.RateLimiter)")
    2. public void rateLimiter() {
    3. }
    4. @Pointcut("@annotation(com.imgyh.framework.annotation.RateLimiters)")
    5. public void rateLimiters() {
    6. }

    在前置通知中实现限流逻辑:

    主要流程如下:

    1. 把所有的RateLimiter都拿到,解析出限流规则和限流类型
    2. 根据限流规则和限流类型,获取所有的key和参数,为调用lua脚本做准备
    3. 调用lua脚本,根据返回值判断是否放行请求
    1. // 定义切点之前的操作
    2. @Before("rateLimiter() || rateLimiters()")
    3. public void doBefore(JoinPoint point) {
    4. try {
    5. // 从切点获取方法签名
    6. MethodSignature signature = (MethodSignature) point.getSignature();
    7. // 获取方法
    8. Method method = signature.getMethod();
    9. String name = point.getTarget().getClass().getName() + "." + signature.getName();
    10. // 获取日志注解
    11. RateLimiter rateLimiter = method.getAnnotation(RateLimiter.class);
    12. RateLimiters rateLimiters = method.getAnnotation(RateLimiters.class);
    13. List limiters = new ArrayList<>();
    14. if (ObjectUtils.isNotNull(rateLimiter)) {
    15. limiters.add(rateLimiter);
    16. }
    17. if (ObjectUtils.isNotNull(rateLimiters)) {
    18. limiters.addAll(Arrays.asList(rateLimiters.value()));
    19. }
    20. if (!allowRequest(limiters, name)) {
    21. throw new ServiceException("访问过于频繁,请稍候再试");
    22. }
    23. } catch (ServiceException e) {
    24. throw e;
    25. } catch (Exception e) {
    26. throw new RuntimeException("服务器限流异常,请稍候再试");
    27. }
    28. }
    29. /**
    30. * 是否允许请求
    31. *
    32. * @param rateLimiters 限流注解
    33. * @param name 方法全名
    34. * @return 是否放行
    35. */
    36. private boolean allowRequest(List rateLimiters, String name) {
    37. List keys = getKeys(rateLimiters, name);
    38. Object[] args = getArgs(rateLimiters);
    39. Object res = redisTemplate.execute(limitScript, keys, args);
    40. return ObjectUtils.isNotNull(res) && (Long) res == 1L;
    41. }
    42. /**
    43. * 获取限流的键
    44. *
    45. * @param rateLimiters 限流注解
    46. * @param name 方法全名
    47. * @return
    48. */
    49. private List getKeys(List rateLimiters, String name) {
    50. List keys = new ArrayList<>();
    51. for (RateLimiter rateLimiter : rateLimiters) {
    52. String key = rateLimiter.key();
    53. RateLimitRule[] rules = rateLimiter.rules();
    54. LimitType type = rateLimiter.type();
    55. StringBuilder sb = new StringBuilder();
    56. sb.append(key).append(name);
    57. if (LimitType.IP == type) {
    58. String ipAddr = IpUtils.getIpAddr();
    59. sb.append("_").append(ipAddr);
    60. } else if (LimitType.USER == type) {
    61. Long userId = SecurityUtils.getUserId();
    62. sb.append("_").append(userId);
    63. }
    64. for (RateLimitRule rule : rules) {
    65. int time = rule.time() * 1000;
    66. int count = rule.count();
    67. StringBuilder builder = new StringBuilder(sb);
    68. builder.append("_").append(time).append("_").append(count);
    69. keys.add(builder.toString());
    70. }
    71. }
    72. return keys;
    73. }
    74. /**
    75. * 获取需要的参数
    76. *
    77. * @param rateLimiters 限流注解
    78. * @return
    79. */
    80. private Object[] getArgs(List rateLimiters) {
    81. List args = new ArrayList<>();
    82. args.add(System.currentTimeMillis());
    83. for (RateLimiter rateLimiter : rateLimiters) {
    84. RateLimitRule[] rules = rateLimiter.rules();
    85. for (RateLimitRule rule : rules) {
    86. int time = rule.time() * 1000;
    87. int count = rule.count();
    88. args.add(time);
    89. args.add(count);
    90. args.add(IdUtils.fastSimpleUUID());
    91. }
    92. }
    93. return args.toArray();
    94. }
    95. 实例demo演示

      demo源码仓库:github.com/imgyh/devel…

      定义接口,并添加限流注解。

      限制对某个用户只能1s中访问2次。对接口整体10s中访问50次,60秒访问100次。

      当某个用户一秒钟请求超过两次时,抛出异常。

      参考资源

      1. Hollis,《Java面试宝典》
      2. 一文搞懂高频面试题之限流算法,从算法原理到实现,再到对比分析
    96. 相关阅读:
      canvas学习
      程序员脱单
      网络安全(黑客)-自学手册
      RabbitMQ系列【3】安装RabbitMQ
      Nacos注册中心
      ethernet eth0: Could not attach to PHY
      LaTex常用技巧6:矩阵编写总结
      2023年软件开发领域的发展趋势
      数据结构(六)单向循环链表
      【博客444】Linux Macvlan
    97. 原文地址:https://blog.csdn.net/WXF_Sir/article/details/136250213