目录
接口限流,支持通过配置文件设置是否开启限流,限流的大小,以及超时时间
常用限流算法:漏桶算法、令牌桶算法、滑动窗口(计数器)算法
漏桶非常均匀的控制流量,如果漏桶满了,后续的水全部会溢出,用它来作为应用层限流是不合适的。如果有大量的用户访问,会导致后面的用户全部拒绝服务,给人的感觉就像服务挂了一样。
令牌桶算法恰好相反,桶里放的不是请求,而是令牌。当请求到来时,需要从桶中拿到一个令牌才能获取服务,否则该请求会被拒绝。由于令牌桶是动态变化的,令牌消耗完了会继续往里放,因此就不存在漏桶那样后面的用户拿不到令牌的情况,是一个比较平滑的过程。
令牌桶是按照固定速率往桶中添加令牌,请求是否被处理需要看桶中令牌是否足够,当令牌数减为零时则拒绝新的请求;漏桶则是按照常量固定速率流出请求,流入请求速率任意,当流入的请求数累积到漏桶容量时,则新流入的请求被拒绝;
令牌桶限制的是平均流入速率,允许突发请求,只要有令牌就可以处理,支持一次拿3个令牌,4个令牌;漏桶限制的是常量流出速率,即流出速率是一个固定常量值,比如都是1的速率流出,而不能一次是1,下次又是2,从而平滑突发流入速率;
令牌桶允许一定程度的突发,而漏桶主要目的是平滑流出速率;
guava的RateLimiter使用的是令牌桶算法,有两种实现:平滑突发限流(SmoothBursty)和平滑预热限流(SmoothWarmingUp)
AOP
-
-
org.springframework.boot -
spring-boot-starter-aop -
guava
-
-
com.google.guava -
guava -
20.0 -
-
- import org.springframework.beans.factory.annotation.Required;
-
- import java.lang.annotation.Documented;
- import java.lang.annotation.ElementType;
- import java.lang.annotation.Retention;
- import java.lang.annotation.RetentionPolicy;
- import java.lang.annotation.Target;
- import java.util.concurrent.TimeUnit;
-
- /**
- * 自定义注解,用于接口的限流
- *
- * @author lizf
- * date: 2022/7/18 11:53
- */
- @Target({ElementType.METHOD})
- @Retention(RetentionPolicy.RUNTIME)
- @Documented
- public @interface RateLimit {
- /**
- * 限流器名称,如果不设置,默认是类名加方法名。如果多个接口设置了同一个名称,那么使用同一个限流器
- *
- * @return
- */
- String name() default "";
-
- /**
- * 一秒内允许通过的请求数QPS
- *
- * @return
- */
- @Required
- String permitsPerSecond();
-
- /**
- * 获取令牌超时时间
- *
- * @return
- */
- String timeout() default "0";
-
- /**
- * 获取令牌超时时间单位
- *
- * @return
- */
- TimeUnit timeUnit() default TimeUnit.SECONDS;
- }
-
- import com.google.common.util.concurrent.RateLimiter;
-
- import java.util.concurrent.TimeUnit;
-
- /**
- * 接口限流器
- *
- * @author lizf
- * date: 2022/7/18 14:20
- */
- public class EfRateLimiter {
- private RateLimiter rateLimiter;
- private long timeout;
- private TimeUnit timeUnit;
-
- public RateLimiter getRateLimiter() {
- return rateLimiter;
- }
-
- public void setRateLimiter(RateLimiter rateLimiter) {
- this.rateLimiter = rateLimiter;
- }
-
- public long getTimeout() {
- return timeout;
- }
-
- public void setTimeout(long timeout) {
- this.timeout = timeout;
- }
-
- public TimeUnit getTimeUnit() {
- return timeUnit;
- }
-
- public void setTimeUnit(TimeUnit timeUnit) {
- this.timeUnit = timeUnit;
- }
-
- public boolean tryAcquire() {
- return rateLimiter.tryAcquire(timeout, timeUnit);
- }
-
- public boolean tryAcquire(int permits) {
- return rateLimiter.tryAcquire(permits, timeout, timeUnit);
- }
-
- }
这里使用的切点、通知,采用了增强的方式,可以直接在通知的参数里获取自定义注解里的内容,省却了通过反射来获取注解里的内容。参考链接:
AOP高级特性,Advice Parameters,在拦截方法里配置参数、自定义注解对象等_lzhfdxhxm的博客-CSDN博客
-
- import com.google.common.util.concurrent.RateLimiter;
- import org.aspectj.lang.ProceedingJoinPoint;
- import org.aspectj.lang.annotation.Around;
- import org.aspectj.lang.annotation.Aspect;
- import org.aspectj.lang.annotation.Pointcut;
- import org.aspectj.lang.reflect.MethodSignature;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
- import org.springframework.context.EnvironmentAware;
- import org.springframework.core.env.Environment;
- import org.springframework.stereotype.Component;
-
- import java.util.Map;
- import java.util.concurrent.ConcurrentHashMap;
- import java.util.concurrent.TimeUnit;
-
- /**
- * 接口限流切面
- * 配合@RateLimit使用
- *
- * @author lizf
- * date: 2022/7/18 14:20
- */
- @Aspect
- @Component
- @ConditionalOnProperty(prefix = "rate.limit.default", name = "enabled", havingValue = "true")
- public class SmoothBurstyInterceptor implements EnvironmentAware {
- private static final Log log = LogFactory.getLog(SmoothBurstyInterceptor.class);
- private static final Map
EF_RATE_LIMITER_MAP = new ConcurrentHashMap<>(); - private Environment environment;
-
- @Value("${rate.limit.default.permitsPerSecond:1000}")
- private double defaultPermitsPerSecond;
-
- @Pointcut("@annotation(rateLimit)")
- public void pointCut(RateLimit rateLimit) {
- }
-
- @Around(value = "pointCut(rateLimit)")
- public Object around(ProceedingJoinPoint pjp, RateLimit rateLimit) throws Throwable {
- MethodSignature signature = (MethodSignature) pjp.getSignature();
- String className = pjp.getTarget().getClass().getSimpleName();
- String methodName = signature.getName();
- String rateLimitName = environment.resolvePlaceholders(rateLimit.name());
- if (EmptyUtil.isEmpty(rateLimitName) || rateLimitName.contains("${")) {
- rateLimitName = className + "-" + methodName;
- }
-
- EfRateLimiter rateLimiter = this.getRateLimiter(rateLimitName, rateLimit);
- boolean success = rateLimiter.tryAcquire();
- Object[] args = pjp.getArgs();
- if (success) {
- return pjp.proceed(args);
- } else {
- log.error("MQ_HDL > {}.{}(), rate limiting, parameters[{}]", className, methodName, args);
- throw new BizException(EfMessageCode.ERR_INTCPT_RATE_LIMIT, "接口访问太过频繁,请稍候再试");
- }
- }
-
- private EfRateLimiter getRateLimiter(String key, RateLimit rateLimit) {
- EfRateLimiter efRateLimiter = EF_RATE_LIMITER_MAP.get(key);
- if (efRateLimiter == null) {
- synchronized (this) {
- if ((efRateLimiter = EF_RATE_LIMITER_MAP.get(key)) == null) {
- String permitsPerSecondStr = environment.resolvePlaceholders(rateLimit.permitsPerSecond());
- double permitsPerSecond = defaultPermitsPerSecond;
- if (EmptyUtil.isNotEmpty(permitsPerSecondStr) && !permitsPerSecondStr.contains("${")) {
- permitsPerSecond = Double.valueOf(permitsPerSecondStr);
- }
- efRateLimiter = new EfRateLimiter();
- RateLimiter rateLimiter = RateLimiter.create(permitsPerSecond);
- String timeoutStr = environment.resolvePlaceholders(rateLimit.timeout());
- long timeout = 0L;
- if (EmptyUtil.isNotEmpty(timeoutStr) && !timeoutStr.contains("${")) {
- timeout = Math.max(Integer.valueOf(timeoutStr), 0L);
- }
- TimeUnit timeUnit = rateLimit.timeUnit();
-
- efRateLimiter.setRateLimiter(rateLimiter);
- efRateLimiter.setTimeout(timeout);
- efRateLimiter.setTimeUnit(timeUnit);
- EF_RATE_LIMITER_MAP.putIfAbsent(key, efRateLimiter);
- }
- }
- }
-
- return efRateLimiter;
- }
-
- /**
- * Set the {@code Environment} that this component runs in.
- *
- * @param environment
- */
- @Override
- public void setEnvironment(Environment environment) {
- this.environment = environment;
- }
- }
- @Controller
- @RequestMapping(value = "/rttp")
- public class AdapterController {
- @ApiOperation("外部使用json调用")
- @PostMapping(path = "/service4OutAppJson", consumes = "text/plain", produces = "text/plain")
- @ResponseBody
- @RateLimit(permitsPerSecond = "${rate.limit.adapter.out.permitsPerSecond:}", timeout = "${rate.limit.adapter.out.timeout:}")
- public String service4OutAppJson(@RequestBody String json) {
- Request req = new Request();
- req.setJson(json);
- Response
resp = new Response<>(); - Transaction
tran = new Transaction<>(req, resp, outAppCodeLocId); - handler.handle(tran);
- return tran.getResp().getContent();
- }
- }