• 【Java编程系列】gateway限流实践时发生的问题和解决方案


    前期回顾:

    【Java编程系列】Springcloud-gateway自带限流方案实践篇


    1、实践中发生的问题

    主要有以下几个问题:

    1、限流返回的响应数据无法自定义

    (LogFormatUtils.java:91) - [7b93af46-20] Completed 429 TOO_MANY_REQUESTS

    返回后显示的情况如下:


    2、默认的限流器(RequestRateLimiterGatewayFilterFactory)会发生异常

    有一部分情况会出现:Error [java.lang.UnsupportedOperationException]

    详细信息如下:

    1. 2023-04-25 12:27:10.628 [NGIEGzOnuguYdozn] ERROR (HttpWebHandlerAdapter.java:295) - [7533b14d-23] Error [java.lang.UnsupportedOperationException] for HTTP POST "***/activity/interaction", but ServerHttpResponse already committed (429 TOO_MANY_REQUESTS)
    2. 2023-04-25 12:27:10.641 [NGIEGzOnuguYdozn] ERROR (Loggers.java:319) - [id: 0x7533b14d, L:/0:0:0:0:0:0:0:1:18085 - R:/0:0:0:0:0:0:0:1:48745] Error starting response. Replying error status
    3. java.lang.UnsupportedOperationException: null
    4. at org.springframework.http.ReadOnlyHttpHeaders.add(ReadOnlyHttpHeaders.java:91)
    5. Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
    6. Error has been observed at the following site(s):
    7. |_ checkpoint ⇢ springfox.boot.starter.autoconfigure.SwaggerUiWebFluxConfiguration$CustomWebFilter [DefaultWebFilterChain]
    8. |_ checkpoint ⇢ org.springframework.cloud.gateway.filter.WeightCalculatorWebFilter [DefaultWebFilterChain]
    9. |_ checkpoint ⇢ HTTP POST "/***/***/interact" [Exception***Handler]
    10. Stack trace:
    11. at org.springframework.http.ReadOnlyHttpHeaders.add(ReadOnlyHttpHeaders.java:91)
    12. at org.springframework.cloud.gateway.filter.factory.RequestRateLimiterGatewayFilterFactory.lambda$null$0(RequestRateLimiterGatewayFilterFactory.java:120)
    13. at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:118)
    14. at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:121)
    15. at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1705)
    16. at reactor.core.publisher.MonoReduceSeed$ReduceSeedSubscriber.onComplete(MonoReduceSeed.java:156)
    17. at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:1939)
    18. at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.deferredComplete(FluxUsingWhen.java:402)
    19. at reactor.core.publisher.FluxUsingWhen$CommitInner.onComplete(FluxUsingWhen.java:536)
    20. at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onComplete(MonoIgnoreElements.java:81)
    21. at reactor.core.publisher.FluxFlatMap$FlatMapMain.checkTerminated(FluxFlatMap.java:803)
    22. at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:589)
    23. at reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(FluxFlatMap.java:569)
    24. at reactor.core.publisher.FluxFlatMap$FlatMapMain.onComplete(FluxFlatMap.java:455)
    25. at reactor.core.publisher.FluxArray$ArraySubscription.slowPath(FluxArray.java:137)
    26. at reactor.core.publisher.FluxArray$ArraySubscription.request(FluxArray.java:99)
    27. at reactor.core.publisher.FluxFlatMap$FlatMapMain.onSubscribe(FluxFlatMap.java:363)
    28. at reactor.core.publisher.FluxMerge.subscribe(FluxMerge.java:69)
    29. at reactor.core.publisher.Mono.subscribe(Mono.java:4110)
    30. at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.onComplete(FluxUsingWhen.java:394)
    31. at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:136)
    32. at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:1939)
    33. at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:359)
    34. at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onComplete(FluxConcatMap.java:268)
    35. at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:1939)
    36. at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onComplete(MonoFlatMapMany.java:252)
    37. at io.lettuce.core.RedisPublisher$ImmediateSubscriber.onComplete(RedisPublisher.java:895)
    38. at io.lettuce.core.RedisPublisher$State.onAllDataRead(RedisPublisher.java:673)
    39. at io.lettuce.core.RedisPublisher$State$3.read(RedisPublisher.java:587)
    40. at io.lettuce.core.RedisPublisher$State$3.onDataAvailable(RedisPublisher.java:544)
    41. at io.lettuce.core.RedisPublisher$RedisSubscription.onDataAvailable(RedisPublisher.java:313)
    42. at io.lettuce.core.RedisPublisher$RedisSubscription.onAllDataRead(RedisPublisher.java:328)
    43. at io.lettuce.core.RedisPublisher$SubscriptionCommand.complete(RedisPublisher.java:758)
    44. at io.lettuce.core.protocol.CommandHandler.complete(CommandHandler.java:654)
    45. at io.lettuce.core.protocol.CommandHandler.decode(CommandHandler.java:614)
    46. at io.lettuce.core.protocol.CommandHandler.channelRead(CommandHandler.java:565)
    47. at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
    48. at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
    49. at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355)
    50. at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    51. at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
    52. at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
    53. at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    54. at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
    55. at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
    56. at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
    57. at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
    58. at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
    59. at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    60. at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    61. at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    62. at java.lang.Thread.run(Thread.java:748)

    3、在nacos配置gateway的限流配置不生效问题

    在nacos的配置中,配置gateway的限流配置,如下:

    1. - id: ***service
    2. uri: 'lb://***service'
    3. order: 0
    4. filters: []
    5. predicates:
    6. - args:
    7. pattern: /xxx/**
    8. name: Path
    9. - id: ***service-limiter
    10. uri: lb://***service
    11. order: 0
    12. predicates:
    13. - Path= /xxx/activity/**
    14. filters:
    15. - name: GatewayRequestRateLimiter
    16. args:
    17. key-resolver: "#{@remoteAddrKeyResolver}"
    18. redis-rate-limiter.replenishRate: 20 # 令牌桶填充的速率 秒为单位
    19. redis-rate-limiter.burstCapacity: 20 # 令牌桶总容量
    20. redis-rate-limiter.requestedTokens: 1 # 每次请求获取的令牌数

    你会发现这样配置是不会生效的~~~,测试时都有点不理解。。。


    2、解决方案

    首先,我们分析一下2种情况导致的原因,

    第一个问题,因为源码的过滤器RequestRateLimiterGatewayFilterFactory中,会将限流拦截的请求的http status code设置为429,但是具体的内容格式却不是JSON格式,导致我们看到的响应结果如上图所示。

    第二个问题,通过报错内容提示,我们可以找到源码的120行:

    at org.springframework.cloud.gateway.filter.factory.RequestRateLimiterGatewayFilterFactory.

    lambda$null$0(RequestRateLimiterGatewayFilterFactory.java:120)

    源码如下:

    这里有往header里加参数,但是提示显示,是 ReadOnlyHttpHeaders.add,只读的headers,是不可以添加操作的,所以抛出了UnsupportedOperationException的异常:

     所以,

    这2个问题基本都是由于源代码的过滤器所导致,这里要解决问题,我们可以自定义一个过滤器替代一下,代码如下:

    1. package ***.filters;
    2. import com.alibaba.fastjson.JSONObject;
    3. import lombok.extern.slf4j.Slf4j;
    4. import org.springframework.cloud.gateway.filter.GatewayFilter;
    5. import org.springframework.cloud.gateway.filter.factory.RequestRateLimiterGatewayFilterFactory;
    6. import org.springframework.cloud.gateway.filter.ratelimit.KeyResolver;
    7. import org.springframework.cloud.gateway.filter.ratelimit.RateLimiter;
    8. import org.springframework.cloud.gateway.route.Route;
    9. import org.springframework.cloud.gateway.support.ServerWebExchangeUtils;
    10. import org.springframework.core.io.buffer.DataBuffer;
    11. import org.springframework.http.HttpStatus;
    12. import org.springframework.http.server.reactive.ServerHttpResponse;
    13. import org.springframework.stereotype.Component;
    14. import reactor.core.publisher.Mono;
    15. import java.nio.charset.StandardCharsets;
    16. import java.util.Map;
    17. /**
    18. * @Date: 2023/4/25 10:44
    19. * @Description gateway限流limiter,重写限流原过滤器RequestRateLimiterGatewayFilterFactory
    20. */
    21. @Component
    22. @Slf4j
    23. public class GatewayRequestRateLimiter extends RequestRateLimiterGatewayFilterFactory {
    24. private final RateLimiter defaultRateLimiter;
    25. private final KeyResolver defaultKeyResolver;
    26. public GatewayRequestRateLimiter(RateLimiter defaultRateLimiter, KeyResolver defaultKeyResolver) {
    27. super(defaultRateLimiter, defaultKeyResolver);
    28. this.defaultRateLimiter = defaultRateLimiter;
    29. this.defaultKeyResolver = defaultKeyResolver;
    30. }
    31. @Override
    32. public GatewayFilter apply(Config config) {
    33. KeyResolver resolver = getOrDefault(config.getKeyResolver(), defaultKeyResolver);
    34. RateLimiter limiter = getOrDefault(config.getRateLimiter(), defaultRateLimiter);
    35. return (exchange, chain) -> resolver.resolve(exchange).flatMap(key -> {
    36. String routeId = config.getRouteId();
    37. if (routeId == null) {
    38. Route route = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR);
    39. routeId = route.getId();
    40. }
    41. String finalRouteId = routeId;
    42. return limiter.isAllowed(routeId, key).flatMap(response -> {
    43. for (Map.Entry header : response.getHeaders().entrySet()) {
    44. exchange.getResponse().getHeaders().add(header.getKey(), header.getValue());
    45. }
    46. if (response.isAllowed()) {
    47. return chain.filter(exchange);
    48. }
    49. ServerHttpResponse httpResponse = exchange.getResponse();
    50. //修改code为500
    51. httpResponse.setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR);
    52. if (!httpResponse.getHeaders().containsKey("Content-Type")) {
    53. httpResponse.getHeaders().add("Content-Type", "application/json");
    54. }
    55. //此处无法触发全局异常处理,手动返回
    56. JSONObject object = new JSONObject();
    57. object.put("status","429");
    58. object.put("message","请求已被限流");
    59. DataBuffer buffer = httpResponse.bufferFactory().wrap(object.toJSONString().getBytes(StandardCharsets.UTF_8));
    60. return httpResponse.writeWith(Mono.just(buffer));
    61. });
    62. });
    63. }
    64. private T getOrDefault(T configValue, T defaultValue) {
    65. return (configValue != null) ? configValue : defaultValue;
    66. }
    67. }
    68. 然后,替换掉原来的gateway中的配置项:

      1. filters:
      2. - name: GatewayRequestRateLimiter #替换原默认过滤器RequestRateLimiter
      3. args:
      4. key-resolver: "#{@remoteAddrKeyResolver}"
      5. redis-rate-limiter.replenishRate: 20 # 令牌桶填充的速率 秒为单位
      6. redis-rate-limiter.burstCapacity: 20 # 令牌桶总容量
      7. redis-rate-limiter.requestedTokens: 1 # 每次请求获取的令牌数

      然后再来看看效果,返回结果:

      第三个问题,是因为该服务的限流配置的过滤器优先级,和要限流的服务本身的路由配置,两者之间的执行前后优先级一样或是限流的在服务本身配置之后了,简单说就是order的值如果更大,就会越迟执行,所以才导致限流配置没生效。。改成如下即可:

      1. - id: ***service-limiter
      2. uri: lb://***service
      3. order: -1 ##将优先级改小,即提高优先级
      4. predicates:
      5. - Path= /xxx/activity/**
      6. filters:
      7. - name: GatewayRequestRateLimiter
      8. args:
      9. key-resolver: "#{@remoteAddrKeyResolver}"
      10. redis-rate-limiter.replenishRate: 20 # 令牌桶填充的速率 秒为单位
      11. redis-rate-limiter.burstCapacity: 20 # 令牌桶总容量
      12. redis-rate-limiter.requestedTokens: 1 # 每次请求获取的令牌数

       好啦,至此限流的优化处理,基本完成啦。。。

      最后,对于本文有疑问的地方,欢迎下方留言讨论,喜欢的朋友,请帮忙一键三连~~~

    69. 相关阅读:
      Python武器库开发-flask篇之error404(二十七)
      程序员必读书籍有哪些值得推荐?
      mediasoup webrtc音视频会议搭建
      chatgpt相关问题解答
      新型大数据架构之湖仓一体(Lakehouse)架构特性说明——Lakehouse 架构(一)
      go——内存分配机制
      C++制作游戏引擎之一 方向键控制地球上下左右乱跑
      C++项目:高并发内存池
      CAP理论和BASE思想
      探花交友_第3章_今日佳人功能实现
    70. 原文地址:https://blog.csdn.net/yy339452689/article/details/130362083