• Spring Boot自定义注解+AOP,使用guava的RateLimiter实现接口的限流


    目录

    一、需求

    二、设计

    漏桶算法

    令牌桶算法

    几种算法对比

    三、相关代码

    1. 引入相关依赖

    2.自定义注解 @RateLimit

    3.封装限流器 EfRateLimiter

    4.定义AOP切面

    5.在接口中使用@RateLimit来开启限流:


    一、需求

            接口限流,支持通过配置文件设置是否开启限流,限流的大小,以及超时时间

    二、设计

    常用限流算法:漏桶算法、令牌桶算法、滑动窗口(计数器)算法

    漏桶算法

            漏桶非常均匀的控制流量,如果漏桶满了,后续的水全部会溢出,用它来作为应用层限流是不合适的。如果有大量的用户访问,会导致后面的用户全部拒绝服务,给人的感觉就像服务挂了一样。

    令牌桶算法

            令牌桶算法恰好相反,桶里放的不是请求,而是令牌。当请求到来时,需要从桶中拿到一个令牌才能获取服务,否则该请求会被拒绝。由于令牌桶是动态变化的,令牌消耗完了会继续往里放,因此就不存在漏桶那样后面的用户拿不到令牌的情况,是一个比较平滑的过程。

     

     

    几种算法对比

            令牌桶是按照固定速率往桶中添加令牌,请求是否被处理需要看桶中令牌是否足够,当令牌数减为零时则拒绝新的请求;漏桶则是按照常量固定速率流出请求,流入请求速率任意,当流入的请求数累积到漏桶容量时,则新流入的请求被拒绝;
            令牌桶限制的是平均流入速率,允许突发请求,只要有令牌就可以处理,支持一次拿3个令牌,4个令牌;漏桶限制的是常量流出速率,即流出速率是一个固定常量值,比如都是1的速率流出,而不能一次是1,下次又是2,从而平滑突发流入速率;
            令牌桶允许一定程度的突发,而漏桶主要目的是平滑流出速率;

    guava的RateLimiter使用的是令牌桶算法,有两种实现:平滑突发限流(SmoothBursty)和平滑预热限流(SmoothWarmingUp)

     

    三、相关代码

    1. 引入相关依赖

    AOP

    1. org.springframework.boot
    2. spring-boot-starter-aop

    guava 

    1. com.google.guava
    2. guava
    3. 20.0

    2.自定义注解 @RateLimit

    1. import org.springframework.beans.factory.annotation.Required;
    2. import java.lang.annotation.Documented;
    3. import java.lang.annotation.ElementType;
    4. import java.lang.annotation.Retention;
    5. import java.lang.annotation.RetentionPolicy;
    6. import java.lang.annotation.Target;
    7. import java.util.concurrent.TimeUnit;
    8. /**
    9. * 自定义注解,用于接口的限流
    10. *
    11. * @author lizf
    12. * date: 2022/7/18 11:53
    13. */
    14. @Target({ElementType.METHOD})
    15. @Retention(RetentionPolicy.RUNTIME)
    16. @Documented
    17. public @interface RateLimit {
    18. /**
    19. * 限流器名称,如果不设置,默认是类名加方法名。如果多个接口设置了同一个名称,那么使用同一个限流器
    20. *
    21. * @return
    22. */
    23. String name() default "";
    24. /**
    25. * 一秒内允许通过的请求数QPS
    26. *
    27. * @return
    28. */
    29. @Required
    30. String permitsPerSecond();
    31. /**
    32. * 获取令牌超时时间
    33. *
    34. * @return
    35. */
    36. String timeout() default "0";
    37. /**
    38. * 获取令牌超时时间单位
    39. *
    40. * @return
    41. */
    42. TimeUnit timeUnit() default TimeUnit.SECONDS;
    43. }

    3.封装限流器 EfRateLimiter

    1. import com.google.common.util.concurrent.RateLimiter;
    2. import java.util.concurrent.TimeUnit;
    3. /**
    4. * 接口限流器
    5. *
    6. * @author lizf
    7. * date: 2022/7/18 14:20
    8. */
    9. public class EfRateLimiter {
    10. private RateLimiter rateLimiter;
    11. private long timeout;
    12. private TimeUnit timeUnit;
    13. public RateLimiter getRateLimiter() {
    14. return rateLimiter;
    15. }
    16. public void setRateLimiter(RateLimiter rateLimiter) {
    17. this.rateLimiter = rateLimiter;
    18. }
    19. public long getTimeout() {
    20. return timeout;
    21. }
    22. public void setTimeout(long timeout) {
    23. this.timeout = timeout;
    24. }
    25. public TimeUnit getTimeUnit() {
    26. return timeUnit;
    27. }
    28. public void setTimeUnit(TimeUnit timeUnit) {
    29. this.timeUnit = timeUnit;
    30. }
    31. public boolean tryAcquire() {
    32. return rateLimiter.tryAcquire(timeout, timeUnit);
    33. }
    34. public boolean tryAcquire(int permits) {
    35. return rateLimiter.tryAcquire(permits, timeout, timeUnit);
    36. }
    37. }

    4.定义AOP切面

    这里使用的切点、通知,采用了增强的方式,可以直接在通知的参数里获取自定义注解里的内容,省却了通过反射来获取注解里的内容。参考链接:

    AOP高级特性,Advice Parameters,在拦截方法里配置参数、自定义注解对象等_lzhfdxhxm的博客-CSDN博客

    1. import com.google.common.util.concurrent.RateLimiter;
    2. import org.aspectj.lang.ProceedingJoinPoint;
    3. import org.aspectj.lang.annotation.Around;
    4. import org.aspectj.lang.annotation.Aspect;
    5. import org.aspectj.lang.annotation.Pointcut;
    6. import org.aspectj.lang.reflect.MethodSignature;
    7. import org.springframework.beans.factory.annotation.Value;
    8. import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
    9. import org.springframework.context.EnvironmentAware;
    10. import org.springframework.core.env.Environment;
    11. import org.springframework.stereotype.Component;
    12. import java.util.Map;
    13. import java.util.concurrent.ConcurrentHashMap;
    14. import java.util.concurrent.TimeUnit;
    15. /**
    16. * 接口限流切面
    17. * 配合@RateLimit使用
    18. *
    19. * @author lizf
    20. * date: 2022/7/18 14:20
    21. */
    22. @Aspect
    23. @Component
    24. @ConditionalOnProperty(prefix = "rate.limit.default", name = "enabled", havingValue = "true")
    25. public class SmoothBurstyInterceptor implements EnvironmentAware {
    26. private static final Log log = LogFactory.getLog(SmoothBurstyInterceptor.class);
    27. private static final Map EF_RATE_LIMITER_MAP = new ConcurrentHashMap<>();
    28. private Environment environment;
    29. @Value("${rate.limit.default.permitsPerSecond:1000}")
    30. private double defaultPermitsPerSecond;
    31. @Pointcut("@annotation(rateLimit)")
    32. public void pointCut(RateLimit rateLimit) {
    33. }
    34. @Around(value = "pointCut(rateLimit)")
    35. public Object around(ProceedingJoinPoint pjp, RateLimit rateLimit) throws Throwable {
    36. MethodSignature signature = (MethodSignature) pjp.getSignature();
    37. String className = pjp.getTarget().getClass().getSimpleName();
    38. String methodName = signature.getName();
    39. String rateLimitName = environment.resolvePlaceholders(rateLimit.name());
    40. if (EmptyUtil.isEmpty(rateLimitName) || rateLimitName.contains("${")) {
    41. rateLimitName = className + "-" + methodName;
    42. }
    43. EfRateLimiter rateLimiter = this.getRateLimiter(rateLimitName, rateLimit);
    44. boolean success = rateLimiter.tryAcquire();
    45. Object[] args = pjp.getArgs();
    46. if (success) {
    47. return pjp.proceed(args);
    48. } else {
    49. log.error("MQ_HDL > {}.{}(), rate limiting, parameters[{}]", className, methodName, args);
    50. throw new BizException(EfMessageCode.ERR_INTCPT_RATE_LIMIT, "接口访问太过频繁,请稍候再试");
    51. }
    52. }
    53. private EfRateLimiter getRateLimiter(String key, RateLimit rateLimit) {
    54. EfRateLimiter efRateLimiter = EF_RATE_LIMITER_MAP.get(key);
    55. if (efRateLimiter == null) {
    56. synchronized (this) {
    57. if ((efRateLimiter = EF_RATE_LIMITER_MAP.get(key)) == null) {
    58. String permitsPerSecondStr = environment.resolvePlaceholders(rateLimit.permitsPerSecond());
    59. double permitsPerSecond = defaultPermitsPerSecond;
    60. if (EmptyUtil.isNotEmpty(permitsPerSecondStr) && !permitsPerSecondStr.contains("${")) {
    61. permitsPerSecond = Double.valueOf(permitsPerSecondStr);
    62. }
    63. efRateLimiter = new EfRateLimiter();
    64. RateLimiter rateLimiter = RateLimiter.create(permitsPerSecond);
    65. String timeoutStr = environment.resolvePlaceholders(rateLimit.timeout());
    66. long timeout = 0L;
    67. if (EmptyUtil.isNotEmpty(timeoutStr) && !timeoutStr.contains("${")) {
    68. timeout = Math.max(Integer.valueOf(timeoutStr), 0L);
    69. }
    70. TimeUnit timeUnit = rateLimit.timeUnit();
    71. efRateLimiter.setRateLimiter(rateLimiter);
    72. efRateLimiter.setTimeout(timeout);
    73. efRateLimiter.setTimeUnit(timeUnit);
    74. EF_RATE_LIMITER_MAP.putIfAbsent(key, efRateLimiter);
    75. }
    76. }
    77. }
    78. return efRateLimiter;
    79. }
    80. /**
    81. * Set the {@code Environment} that this component runs in.
    82. *
    83. * @param environment
    84. */
    85. @Override
    86. public void setEnvironment(Environment environment) {
    87. this.environment = environment;
    88. }
    89. }

    5.在接口中使用@RateLimit来开启限流:

    1. @Controller
    2. @RequestMapping(value = "/rttp")
    3. public class AdapterController {
    4. @ApiOperation("外部使用json调用")
    5. @PostMapping(path = "/service4OutAppJson", consumes = "text/plain", produces = "text/plain")
    6. @ResponseBody
    7. @RateLimit(permitsPerSecond = "${rate.limit.adapter.out.permitsPerSecond:}", timeout = "${rate.limit.adapter.out.timeout:}")
    8. public String service4OutAppJson(@RequestBody String json) {
    9. Request req = new Request();
    10. req.setJson(json);
    11. Response resp = new Response<>();
    12. Transaction tran = new Transaction<>(req, resp, outAppCodeLocId);
    13. handler.handle(tran);
    14. return tran.getResp().getContent();
    15. }
    16. }

  • 相关阅读:
    Azure DevOps (九) 通过流水线推送镜像到Registry
    MR混合现实情景实训教学系统在旅游管理专业中的应用
    解决新创建的anaconda环境在C:\Users\xxx\.conda\envs\,而不在anaconda安装目录下的envs中
    基于PCIe的NVMe协议在FPGA中实现方法
    CC2642 OAD文件合成
    在Java中使用XxlCrawler时防止被反爬的几种方式
    为什么大家都开始做游戏化产品?
    如何修改别人的神经网络,人工神经网络通过调整
    记录一次centos7增加crontab定时任务
    CVPR 2022 Paper Reading List
  • 原文地址:https://blog.csdn.net/lzhfdxhxm/article/details/125872086