限流,就是在某个时间窗口对资源访问做次数限制,如设定每秒最多100个访问请求。限流具有两个维度,如下
在实际开发中,不止设置一种限流规则,而是设置多个限流规则共同作用,常见限流规则如下
限流常见方法如下
在springboot工程中,通过aop的方式实现限流,应用场景有采用guava的速率限流、采用redis的指定时间指定次数限制、socket请求限制、http请求限制。
实例1:guava速率限流
@Documented
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface RateLimit {
//默认每秒桶中漏掉的token
int limit() default 10;
}
@Aspect
@Component
public class RateLimitAspect {
/**
* 用来存放不同接口的RateLimiter(key为接口名称,value为RateLimiter)
*/
private ConcurrentHashMap map = new ConcurrentHashMap<>();
private RateLimiter rateLimiter;
@Pointcut("@annotation(com..annotation.RateLimit)")
public void limitPointCut() {
}
@Around("limitPointCut()")
public Object around(ProceedingJoinPoint joinPoint) throws Throwable {
MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature();
RateLimit annotation = methodSignature.getMethod().getAnnotation(RateLimit.class);
int limit = annotation.limit();
// 注解所在方法名区分不同的限流策略
String key = methodSignature.getName();
//获取rateLimiter
if (!map.containsKey(key)) {
map.put(key, RateLimiter.create(limit-1));
}
rateLimiter = map.get(key);
if (rateLimiter.tryAcquire()) {
//执行方法
return joinPoint.proceed();
} else {
System.out.println("在1秒内已经超过" + limit + "次请求,不再向下发起请求.");
return null;
}
}
}
@RateLimit(limit = 3)
@GetMapping(value = "/hello2", produces = "application/json;charset=UTF-8")
public ResponseEntity hello2() {
String result = testService.hello();
return ResponseEntity.ok("每秒内只接受3个请求:" + System.currentTimeMillis() + result);
}
实例2:redis指定时间指定次数限制
@Documented
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface RequestLimit {
int limit() default 10;
int time() default 10;
}
@Aspect
@Component
public class RequestLimitAspect {
private final HttpServletRequest request;
private final RedisService redisService;
@Autowired
public RequestLimitAspect(HttpServletRequest request, RedisService redisService) {
this.request = request;
this.redisService = redisService;
}
@Pointcut("@annotation(com..annotation.RequestLimit)")
public void limitPointCut() {
}
@Around("limitPointCut()")
public Object around(ProceedingJoinPoint joinPoint) throws Throwable {
MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature();
RequestLimit requestLimit = methodSignature.getMethod().getAnnotation(RequestLimit.class);
if (requestLimit != null) {
String key = IPUtil.getIpAddress(request) + ":" + methodSignature.getName();
int limit = requestLimit.limit();
int time = requestLimit.time();
if (redisService.hasKey(key)) {
String count = redisService.get(key).toString();
int countRequest = Integer.parseInt(count);
if (countRequest >= limit) {
System.out.println("在" + time + "秒内已经超过" + limit + "次请求,不再向下发起请求.");
return null;
} else {
redisService.incr(key, 1);
//return joinPoint.proceed();
}
} else {
redisService.set(key, 1, time);
//return joinPoint.proceed();
}
}
return joinPoint.proceed();
}
}
@RequestLimit(time = 60, limit = 3)
@GetMapping(value = "/hello", produces = "application/json;charset=UTF-8")
public ResponseEntity hello() {
String result = testService.hello();
return ResponseEntity.ok("60s内只接受3个请求:" + System.currentTimeMillis() + result);
}
或切面环绕无返回,请求返回为空
@Aspect
@Component
public class RequestLimitAspect2 {
private final HttpServletRequest request;
private final RedisService redisService;
@Autowired
public RequestLimitAspect2(HttpServletRequest request, RedisService redisService) {
this.request = request;
this.redisService = redisService;
}
@Pointcut("@annotation(com.ldc.springboot_ratelimiter.annotation.RequestLimit2)")
public void limitPointCut() {
}
@Around("limitPointCut()")
public void around(ProceedingJoinPoint joinPoint) throws Throwable {
MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature();
RequestLimit2 requestLimit2 = methodSignature.getMethod().getAnnotation(RequestLimit2.class);
if (requestLimit2 != null) {
String key = IPUtil.getIpAddress(request) + ":" + methodSignature.getName();
int limit = requestLimit2.limit();
int time = requestLimit2.time();
if (redisService.hasKey(key)) {
String count = redisService.get(key).toString();
int countRequest = Integer.parseInt(count);
if (countRequest >= limit) {
System.out.println("在" + time + "秒内已经超过" + limit + "次请求,不再向下发起请求.");
return;
} else {
redisService.incr(key, 1);
joinPoint.proceed();
}
} else {
redisService.set(key, 1, time);
joinPoint.proceed();
}
}
}
}
实例3:socket请求限流
@Documented
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface SocketRequestLimit {
String event() default "";
int limit() default 10;
int time() default 10;
}
@Aspect
@Component
public class SocketRequestLimitAspect {
private static final Logger logger = LoggerFactory.getLogger(SocketRequestLimitAspect.class);
private final RedisService redisService;
@Autowired
public SocketRequestLimitAspect(RedisService redisService) {
this.redisService = redisService;
}
@Pointcut("@annotation(com..annotation.SocketRequestLimit)")
public void pointcut(){}
@Around("pointcut()")
public void around(ProceedingJoinPoint joinPoint) throws Throwable {
MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature();
Method method = joinPoint.getTarget().getClass().getMethod(methodSignature.getName(), methodSignature.getMethod().getParameterTypes());
SocketRequestLimit annotation = method.getAnnotation(SocketRequestLimit.class);
String event = annotation.event();
SocketIOClient socketIOClient = null;
Object[] args = joinPoint.getArgs();
for (Object arg : args) {
if(arg instanceof SocketIOClient) {
socketIOClient = (SocketIOClient) arg;
}
}
if (annotation != null) {
String ip = socketIOClient.getRemoteAddress().toString();
String key = ip.substring(0, ip.lastIndexOf(":")) + ":" + event;
int limit = annotation.limit();
int time = annotation.time();
if (redisService.hasKey(key)) {
String count = redisService.get(key).toString();
int countRequest = Integer.parseInt(count);
if (countRequest >= limit) {
System.out.println("在" + time + "秒内已经超过" + limit + "次请求,不再向下发起请求.");
return;
} else {
redisService.incr(key, 1);
joinPoint.proceed();
}
} else {
redisService.set(key, 1, time);
joinPoint.proceed();
}
}
}
}
@SocketRequestLimit(event = "EVENT",limit = 3,time = 60)
@OnEvent(value = "EVENT")
public void EVENT(SocketIOClient client) {
String result = testService.hello2();
client.sendEvent("EVENT", JSON.toJSONString(result));
}
页面1秒发起1个请求,在10秒内共发起了10个请求,在60秒内允许向下请求3个,丢弃了7个请求,下游服务10秒响应1个,页面最终共收到3个响应结果。
实例4:多个切面组件,指定增强顺序
通过@Order注解即可指定执行的先后顺序
@Documented
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface SocketRequestLimit {
String event() default "";
int limit() default 10;
int time() default 10;
}
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface SocketRequestTimeOutAnnotation {
long timeout() default 3;
String event() default "";
}
@Order(1)
@Aspect
@Component
public class SocketRequestLimitAspect {
private static final Logger logger = LoggerFactory.getLogger(SocketRequestLimitAspect.class);
private final RedisService redisService;
@Autowired
public SocketRequestLimitAspect(RedisService redisService) {
this.redisService = redisService;
}
@Pointcut("@annotation(com..SocketRequestLimit)")
public void pointcut(){}
@Around("pointcut()")
public void around(ProceedingJoinPoint joinPoint) throws Throwable {
MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature();
Method method = joinPoint.getTarget().getClass().getMethod(methodSignature.getName(), methodSignature.getMethod().getParameterTypes());
SocketRequestLimit annotation = method.getAnnotation(SocketRequestLimit.class);
String event = annotation.event();
SocketIOClient socketIOClient = null;
Object[] args = joinPoint.getArgs();
for (Object arg : args) {
if(arg instanceof SocketIOClient) {
socketIOClient = (SocketIOClient) arg;
}
}
if (annotation != null) {
String ip = socketIOClient.getRemoteAddress().toString();
String key = ip.substring(0, ip.lastIndexOf(":")) + ":" + event;
int limit = annotation.limit();
int time = annotation.time();
if (redisService.hasKey(key)) {
String count = redisService.get(key).toString();
int countRequest = Integer.parseInt(count);
if (countRequest >= limit) {
System.out.println("在" + time + "秒内已经超过" + limit + "次请求,不再向下发起请求.");
return;
} else {
redisService.incr(key, 1);
joinPoint.proceed();
}
} else {
redisService.set(key, 1, time);
joinPoint.proceed();
}
}
}
}
@Order(2)
@Aspect
@Component
public class SocketRequestTimeOutAspect {
private static final Logger logger = LoggerFactory.getLogger(SocketRequestTimeOutAspect.class);
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue(2);
ExecutorService executorService = new ThreadPoolExecutor(2,2,60, TimeUnit.MILLISECONDS,blockingQueue,new ThreadPoolExecutor.DiscardOldestPolicy());
@Pointcut("@annotation(com..annotation.SocketRequestTimeOutAnnotation)")
public void pointcut(){}
@Around("pointcut()")
public void around(ProceedingJoinPoint joinPoint) throws NoSuchMethodException {
MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature();
Method method = joinPoint.getTarget().getClass().getMethod(methodSignature.getName(), methodSignature.getMethod().getParameterTypes());
SocketRequestTimeOutAnnotation annotation = method.getAnnotation(SocketRequestTimeOutAnnotation.class);
long timeout = annotation.timeout();
String event = annotation.event();
SocketIOClient socketIOClient = null;
Object[] args = joinPoint.getArgs();
for (Object arg : args) {
if(arg instanceof SocketIOClient) {
socketIOClient = (SocketIOClient) arg;
}
}
Future
请求次数限制先行增强,将请求只放行3个,然后进行3个请求超时增强,由于响应处理超时,页面收到3个超时响应,待3个请求响应完成,页面收到3个响应结果,进行了双重限流,第一层口子小,第二层没有充分利用线程池及其拒绝策略。
继续思考限流的应用场景,加油吧,少年。