• How Spring Cloud Gateway load-balances requests when routing


    This is an original post by the author; please credit the source when reposting.

     

    1. Configuring routes in Spring Cloud Gateway

      Configure the route in the gateway module's configuration file:

    spring:
      cloud:
        gateway:
          routes:
            - id: user
              uri: lb://user-server
              predicates:
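                - Path=/api-web/**    # front-end requests must include this prefix, e.g. http://ip:port/api-web
              filters:
                - StripPrefix=1   # strips the first path segment (here api-web) before forwarding; required, otherwise the backend service is not found (alternatively, give the backend service a common base path)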

      Here lb means that load balancing is used, and user-server is the service name.
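
      To make the route definition concrete, here is a minimal plain-Java sketch (no Spring dependency) of what its two pieces do to a request: how lb://user-server splits into a scheme and a service name, and how StripPrefix=1 rewrites the path. RouteSketch and stripPrefix are hypothetical names made up for illustration, and the path handling is simplified (for example, a trailing slash is not preserved); the gateway does this work inside its own predicate and filter classes.

```java
import java.net.URI;
import java.util.ArrayList;
import java.util.List;

// Illustration only: RouteSketch and stripPrefix are made-up names, not gateway APIs.
public class RouteSketch {

    // Drops the first `parts` non-empty path segments, in the spirit of StripPrefix=N.
    public static String stripPrefix(String path, int parts) {
        List<String> kept = new ArrayList<>();
        int skipped = 0;
        for (String segment : path.split("/")) {
            if (segment.isEmpty()) {
                continue; // ignore the empty piece before the leading slash
            }
            if (skipped < parts) {
                skipped++; // this segment belongs to the stripped prefix
            } else {
                kept.add(segment);
            }
        }
        return "/" + String.join("/", kept);
    }

    public static void main(String[] args) {
        URI routeUri = URI.create("lb://user-server");
        System.out.println(routeUri.getScheme()); // lb
        System.out.println(routeUri.getHost());   // user-server
        System.out.println(stripPrefix("/api-web/users/1", 1)); // /users/1
    }
}
```

      With this route, a front-end call to /api-web/users/1 would reach the backend as /users/1, which is why the backend does not need to know about the api-web prefix.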

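      When the backend has multiple service nodes, the gateway load-balances requests across them. Once a node is shut down, later requests are no longer sent to it, although this takes effect only after some delay, for example about 30 seconds.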
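
      The delay can be pictured with a small sketch. It assumes, as a simplification, that the load balancer picks round-robin from a locally cached instance list and that this cache is only replaced by a periodic refresh. CachedRoundRobin, choose and refresh are hypothetical names and the addresses are invented; this is not Ribbon's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Illustration only: a round-robin chooser over a cached instance list.
public class CachedRoundRobin {
    private volatile List<String> cachedInstances;
    private final AtomicInteger counter = new AtomicInteger();

    public CachedRoundRobin(List<String> initialInstances) {
        this.cachedInstances = new ArrayList<>(initialInstances);
    }

    // Picks the next instance from the cache, or null when the cache is empty.
    public String choose() {
        List<String> snapshot = cachedInstances;
        if (snapshot.isEmpty()) {
            return null;
        }
        int index = Math.floorMod(counter.getAndIncrement(), snapshot.size());
        return snapshot.get(index);
    }

    // Stands in for the periodic refresh that replaces the cached list.
    public void refresh(List<String> liveInstances) {
        this.cachedInstances = new ArrayList<>(liveInstances);
    }

    public static void main(String[] args) {
        CachedRoundRobin lb = new CachedRoundRobin(Arrays.asList("10.0.0.1:8080", "10.0.0.2:8080"));
        System.out.println(lb.choose()); // 10.0.0.1:8080
        System.out.println(lb.choose()); // 10.0.0.2:8080
        // Suppose 10.0.0.2 has just shut down: until refresh() runs, it is still returned.
        System.out.println(lb.choose()); // 10.0.0.1:8080
        System.out.println(lb.choose()); // 10.0.0.2:8080 (stale entry)
        lb.refresh(Arrays.asList("10.0.0.1:8080"));
        System.out.println(lb.choose()); // 10.0.0.1:8080
    }
}
```

      Under this assumption, requests keep hitting the stopped node until the next refresh, which matches the delay observed above.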

     

    2. The GatewayLoadBalancerClientAutoConfiguration class

      This configuration class registers a filter, and that filter is what performs the load balancing.

    @Configuration(
        proxyBeanMethods = false
    )
    @ConditionalOnClass({LoadBalancerClient.class, RibbonAutoConfiguration.class, DispatcherHandler.class})
    @AutoConfigureAfter({RibbonAutoConfiguration.class})
    @EnableConfigurationProperties({LoadBalancerProperties.class})
    public class GatewayLoadBalancerClientAutoConfiguration {
        public GatewayLoadBalancerClientAutoConfiguration() {
        }
    
        @Bean
        @ConditionalOnBean({LoadBalancerClient.class})
        @ConditionalOnMissingBean({LoadBalancerClientFilter.class, ReactiveLoadBalancerClientFilter.class})
        public LoadBalancerClientFilter loadBalancerClientFilter(LoadBalancerClient client, LoadBalancerProperties properties) {
            // Registers the load-balancing filter: LoadBalancerClientFilter
            return new LoadBalancerClientFilter(client, properties);
        }
    }

     

    3. The LoadBalancerClientFilter implementation

      The filter is implemented as follows:

    public class LoadBalancerClientFilter implements GlobalFilter, Ordered {
        public static final int LOAD_BALANCER_CLIENT_FILTER_ORDER = 10100;
        private static final Log log = LogFactory.getLog(LoadBalancerClientFilter.class);
        protected final LoadBalancerClient loadBalancer;
        private LoadBalancerProperties properties;

        public LoadBalancerClientFilter(LoadBalancerClient loadBalancer, LoadBalancerProperties properties) {
            this.loadBalancer = loadBalancer;
            this.properties = properties;
        }

        public int getOrder() {
            return 10100;
        }

        public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
            URI url = (URI)exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR);
            String schemePrefix = (String)exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_SCHEME_PREFIX_ATTR);
            if (url != null && ("lb".equals(url.getScheme()) || "lb".equals(schemePrefix))) {
                ServerWebExchangeUtils.addOriginalRequestUrl(exchange, url);
                if (log.isTraceEnabled()) {
                    log.trace("LoadBalancerClientFilter url before: " + url);
                }

                ServiceInstance instance = this.choose(exchange);
                if (instance == null) {
                    throw NotFoundException.create(this.properties.isUse404(), "Unable to find instance for " + url.getHost());
                } else {
                    URI uri = exchange.getRequest().getURI();
                    String overrideScheme = instance.isSecure() ? "https" : "http";
                    if (schemePrefix != null) {
                        overrideScheme = url.getScheme();
                    }

                    URI requestUrl = this.loadBalancer.reconstructURI(new DelegatingServiceInstance(instance, overrideScheme), uri);
                    if (log.isTraceEnabled()) {
                        log.trace("LoadBalancerClientFilter url chosen: " + requestUrl);
                    }

                    exchange.getAttributes().put(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR, requestUrl);
                    return chain.filter(exchange);
                }
            } else {
                return chain.filter(exchange);
            }
        }

        protected ServiceInstance choose(ServerWebExchange exchange) {
            // This loadBalancer is the one configured by Ribbon; it selects an instance
            // according to the configured rule, which is round-robin by default.
            return this.loadBalancer.choose(((URI)exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR)).getHost());
        }
    }
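
      The core of filter() is the rewrite from the lb:// URL to a concrete instance address. A minimal sketch of that step in plain Java follows. Instance and reconstruct are hypothetical stand-ins for ServiceInstance and LoadBalancerClient.reconstructURI, and the hosts and ports are invented; the sketch only swaps scheme, host and port while keeping path and query.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Illustration only: mimics the lb:// -> http(s)://host:port rewrite with made-up types.
public class LbRewriteSketch {

    // Hypothetical stand-in for a chosen service instance.
    public static final class Instance {
        final String host;
        final int port;
        final boolean secure;

        public Instance(String host, int port, boolean secure) {
            this.host = host;
            this.port = port;
            this.secure = secure;
        }
    }

    // Keeps path and query of the request URI, swaps in the instance's scheme, host and port.
    public static URI reconstruct(Instance instance, URI requestUri) {
        String scheme = instance.secure ? "https" : "http";
        try {
            return new URI(scheme, null, instance.host, instance.port,
                    requestUri.getPath(), requestUri.getQuery(), null);
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Cannot rebuild URI from " + requestUri, e);
        }
    }

    public static void main(String[] args) {
        // Value that would sit under GATEWAY_REQUEST_URL_ATTR before the filter runs.
        URI routeUrl = URI.create("lb://user-server/users/1?verbose=true");
        // The request as the gateway sees it (address invented for the example).
        URI requestUri = URI.create("http://localhost:9000/users/1?verbose=true");
        if ("lb".equals(routeUrl.getScheme())) {
            Instance chosen = new Instance("10.0.0.1", 8080, false);
            System.out.println(reconstruct(chosen, requestUri));
            // http://10.0.0.1:8080/users/1?verbose=true
        }
    }
}
```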

      

      The NettyRoutingFilter filter:

      

    public class NettyRoutingFilter implements GlobalFilter, Ordered {
        private static final Log log = LogFactory.getLog(NettyRoutingFilter.class);
        private final HttpClient httpClient;
        private final ObjectProvider<List<HttpHeadersFilter>> headersFiltersProvider;
        private final HttpClientProperties properties;
        private volatile List<HttpHeadersFilter> headersFilters;
    
        public NettyRoutingFilter(HttpClient httpClient, ObjectProvider<List<HttpHeadersFilter>> headersFiltersProvider, HttpClientProperties properties) {
            this.httpClient = httpClient;
            this.headersFiltersProvider = headersFiltersProvider;
            this.properties = properties;
        }
    
        public List<HttpHeadersFilter> getHeadersFilters() {
            if (this.headersFilters == null) {
                this.headersFilters = (List)this.headersFiltersProvider.getIfAvailable();
            }
    
            return this.headersFilters;
        }
    
        public int getOrder() {
            return 2147483647;
        }
    
        public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
            URI requestUrl = (URI)exchange.getRequiredAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR);
            String scheme = requestUrl.getScheme();
            if (!ServerWebExchangeUtils.isAlreadyRouted(exchange) && ("http".equals(scheme) || "https".equals(scheme))) {
                ServerWebExchangeUtils.setAlreadyRouted(exchange);
                ServerHttpRequest request = exchange.getRequest();
                HttpMethod method = HttpMethod.valueOf(request.getMethodValue());
                String url = requestUrl.toASCIIString();
                HttpHeaders filtered = HttpHeadersFilter.filterRequest(this.getHeadersFilters(), exchange);
                DefaultHttpHeaders httpHeaders = new DefaultHttpHeaders();
                filtered.forEach(httpHeaders::set);
                boolean preserveHost = (Boolean)exchange.getAttributeOrDefault(ServerWebExchangeUtils.PRESERVE_HOST_HEADER_ATTRIBUTE, false);
                Route route = (Route)exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR);
                Flux<HttpClientResponse> responseFlux = ((RequestSender)this.getHttpClient(route, exchange).headers((headers) -> {
                    headers.add(httpHeaders);
                    headers.remove("Host");
                    if (preserveHost) {
                        String host = request.getHeaders().getFirst("Host");
                        headers.add("Host", host);
                    }
    
                }).request(method).uri(url)).send((req, nettyOutbound) -> {
                    if (log.isTraceEnabled()) {
                        nettyOutbound.withConnection((connection) -> {
                            log.trace("outbound route: " + connection.channel().id().asShortText() + ", inbound: " + exchange.getLogPrefix());
                        });
                    }
    
                    return nettyOutbound.send(request.getBody().map(this::getByteBuf));
                }).responseConnection((res, connection) -> {
                    exchange.getAttributes().put(ServerWebExchangeUtils.CLIENT_RESPONSE_ATTR, res);
                    exchange.getAttributes().put(ServerWebExchangeUtils.CLIENT_RESPONSE_CONN_ATTR, connection);
                    ServerHttpResponse response = exchange.getResponse();
                    HttpHeaders headers = new HttpHeaders();
                    res.responseHeaders().forEach((entry) -> {
                        headers.add((String)entry.getKey(), (String)entry.getValue());
                    });
                    String contentTypeValue = headers.getFirst("Content-Type");
                    if (StringUtils.hasLength(contentTypeValue)) {
                        exchange.getAttributes().put("original_response_content_type", contentTypeValue);
                    }
    
                    this.setResponseStatus(res, response);
                    HttpHeaders filteredResponseHeaders = HttpHeadersFilter.filter(this.getHeadersFilters(), headers, exchange, Type.RESPONSE);
                    if (!filteredResponseHeaders.containsKey("Transfer-Encoding") && filteredResponseHeaders.containsKey("Content-Length")) {
                        response.getHeaders().remove("Transfer-Encoding");
                    }
    
                    exchange.getAttributes().put(ServerWebExchangeUtils.CLIENT_RESPONSE_HEADER_NAMES, filteredResponseHeaders.keySet());
                    response.getHeaders().putAll(filteredResponseHeaders);
                    return Mono.just(res);
                });
                Duration responseTimeout = this.getResponseTimeout(route);
                if (responseTimeout != null) {
                    responseFlux = responseFlux.timeout(responseTimeout, Mono.error(new TimeoutException("Response took longer than timeout: " + responseTimeout))).onErrorMap(TimeoutException.class, (th) -> {
                        return new ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT, th.getMessage(), th);
                    });
                }
    
                return responseFlux.then(chain.filter(exchange));
            } else {
                return chain.filter(exchange);
            }
        }
    
        protected ByteBuf getByteBuf(DataBuffer dataBuffer) {
            if (dataBuffer instanceof NettyDataBuffer) {
                NettyDataBuffer buffer = (NettyDataBuffer)dataBuffer;
                return buffer.getNativeBuffer();
            } else if (dataBuffer instanceof DefaultDataBuffer) {
                DefaultDataBuffer buffer = (DefaultDataBuffer)dataBuffer;
                return Unpooled.wrappedBuffer(buffer.getNativeBuffer());
            } else {
                throw new IllegalArgumentException("Unable to handle DataBuffer of type " + dataBuffer.getClass());
            }
        }
    
        private void setResponseStatus(HttpClientResponse clientResponse, ServerHttpResponse response) {
            HttpStatus status = HttpStatus.resolve(clientResponse.status().code());
            if (status != null) {
                response.setStatusCode(status);
            } else {
                while(true) {
                    if (!(response instanceof ServerHttpResponseDecorator)) {
                        if (!(response instanceof AbstractServerHttpResponse)) {
                            throw new IllegalStateException("Unable to set status code " + clientResponse.status().code() + " on response of type " + response.getClass().getName());
                        }
    
                        ((AbstractServerHttpResponse)response).setStatusCodeValue(clientResponse.status().code());
                        break;
                    }
    
                    response = ((ServerHttpResponseDecorator)response).getDelegate();
                }
            }
    
        }
    
        protected HttpClient getHttpClient(Route route, ServerWebExchange exchange) {
            Object connectTimeoutAttr = route.getMetadata().get("connect-timeout");
            if (connectTimeoutAttr != null) {
                Integer connectTimeout = getInteger(connectTimeoutAttr);
                return this.httpClient.tcpConfiguration((tcpClient) -> {
                    return tcpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout);
                });
            } else {
                return this.httpClient;
            }
        }
    
        static Integer getInteger(Object connectTimeoutAttr) {
            Integer connectTimeout;
            if (connectTimeoutAttr instanceof Integer) {
                connectTimeout = (Integer)connectTimeoutAttr;
            } else {
                connectTimeout = Integer.parseInt(connectTimeoutAttr.toString());
            }
    
            return connectTimeout;
        }
    
        private Duration getResponseTimeout(Route route) {
            Object responseTimeoutAttr = route.getMetadata().get("response-timeout");
            Long responseTimeout = null;
            if (responseTimeoutAttr != null) {
                if (responseTimeoutAttr instanceof Number) {
                    responseTimeout = ((Number)responseTimeoutAttr).longValue();
                } else {
                    responseTimeout = Long.valueOf(responseTimeoutAttr.toString());
                }
            }
    
            return responseTimeout != null ? Duration.ofMillis(responseTimeout) : this.properties.getResponseTimeout();
        }
    }
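
      In the listing, getHttpClient() and getResponseTimeout() read per-route overrides from the route metadata under the keys connect-timeout and response-timeout, accepting either a number or a string. The sketch below replays the response-timeout lookup on a plain Map. TimeoutSketch and resolveResponseTimeout are hypothetical names, and the 5000 ms, 250 ms and 30 s values are invented for the example.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

// Illustration only: replays the metadata lookup from getResponseTimeout() on a plain Map.
public class TimeoutSketch {

    // Returns the per-route timeout (milliseconds) if present, otherwise the global default.
    public static Duration resolveResponseTimeout(Map<String, Object> metadata, Duration globalDefault) {
        Object value = metadata.get("response-timeout");
        if (value == null) {
            return globalDefault;
        }
        long millis = (value instanceof Number)
                ? ((Number) value).longValue()
                : Long.parseLong(value.toString());
        return Duration.ofMillis(millis);
    }

    public static void main(String[] args) {
        Duration globalDefault = Duration.ofSeconds(30); // invented global setting
        Map<String, Object> metadata = new HashMap<>();

        System.out.println(resolveResponseTimeout(metadata, globalDefault).toMillis()); // 30000

        metadata.put("response-timeout", 5000);   // numeric metadata value
        System.out.println(resolveResponseTimeout(metadata, globalDefault).toMillis()); // 5000

        metadata.put("response-timeout", "250");  // string metadata value
        System.out.println(resolveResponseTimeout(metadata, globalDefault).toMillis()); // 250
    }
}
```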

      

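      NettyRoutingFilter reads requestUrl from the GATEWAY_REQUEST_URL_ATTR attribute and then sends the actual request to that URL.

      LoadBalancerClientFilter applies to routes whose URL starts with lb. It uses loadBalancer to pick a service instance, builds the target requestUrl, and stores it in the GATEWAY_REQUEST_URL_ATTR attribute for NettyRoutingFilter to use.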
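
      The handoff between the two filters can be sketched as a tiny chain: filters run in ascending order (10100 for LoadBalancerClientFilter, Integer.MAX_VALUE for NettyRoutingFilter, as in the listings above) and communicate through a shared attribute map. Everything here (SimpleFilter, the attribute key string, the fixed instance address) is a hypothetical simplification and not the gateway's reactive API.

```java
import java.net.URI;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustration only: two ordered steps sharing one attribute map.
public class FilterChainSketch {
    static final String REQUEST_URL_ATTR = "requestUrl"; // made-up attribute key

    interface SimpleFilter {
        int order();
        void apply(Map<String, Object> attributes, List<String> log);
    }

    // Plays the role of LoadBalancerClientFilter: resolves lb:// to a concrete address.
    static class LoadBalancerStep implements SimpleFilter {
        public int order() { return 10100; }

        public void apply(Map<String, Object> attributes, List<String> log) {
            URI url = (URI) attributes.get(REQUEST_URL_ATTR);
            if (url != null && "lb".equals(url.getScheme())) {
                // A fixed, invented address stands in for loadBalancer.choose(...).
                URI resolved = URI.create("http://10.0.0.1:8080" + url.getRawPath());
                attributes.put(REQUEST_URL_ATTR, resolved);
                log.add("resolved " + url + " -> " + resolved);
            }
        }
    }

    // Plays the role of NettyRoutingFilter: only acts on http/https URLs.
    static class RoutingStep implements SimpleFilter {
        public int order() { return Integer.MAX_VALUE; }

        public void apply(Map<String, Object> attributes, List<String> log) {
            URI url = (URI) attributes.get(REQUEST_URL_ATTR);
            String scheme = url.getScheme();
            if ("http".equals(scheme) || "https".equals(scheme)) {
                log.add("would send request to " + url);
            }
        }
    }

    public static List<String> run(URI routeUrl) {
        List<SimpleFilter> filters = new ArrayList<>();
        filters.add(new RoutingStep());      // registered out of order on purpose
        filters.add(new LoadBalancerStep());
        filters.sort(Comparator.comparingInt(SimpleFilter::order));

        Map<String, Object> attributes = new HashMap<>();
        attributes.put(REQUEST_URL_ATTR, routeUrl);
        List<String> log = new ArrayList<>();
        for (SimpleFilter filter : filters) {
            filter.apply(attributes, log);
        }
        return log;
    }

    public static void main(String[] args) {
        run(URI.create("lb://user-server/users/1")).forEach(System.out::println);
    }
}
```

      Because the load-balancing step has the lower order value, the routing step always sees an http or https URL rather than the lb one.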
     

      

  • Original article: https://www.cnblogs.com/zjdxr-up/p/16361460.html