• 从零开发短视频电商 使用Spring WebClient发起远程Http调用



    Spring WebClient 是 Spring WebFlux 项目中 Spring 5 中引入的异步、反应式 HTTP 客户端,用于替换旧的 RestTemplate,以便在使用 Spring Boot 框架构建的应用程序中进行 REST API 调用。

    它支持同步、异步和流式场景。

    它是一种基于 HTTP/1.1 协议的反应式、非阻塞解决方案。

    依赖

    为了使用WebClient,我们需要添加对 Spring WebFlux 启动器模块的依赖:

      <dependency>
       <groupId>org.springframework.bootgroupId>
       <artifactId>spring-boot-starter-webfluxartifactId>
      dependency>
    
    • 1
    • 2
    • 3
    • 4

    使用

    创建WebClient实例

    WebClient client = WebClient.create("http://localhost:8080");
    
    WebClient client = WebClient.builder()
      .baseUrl("http://localhost:8080") // 基本 URL
      .defaultCookie("cookieKey", "cookieValue") // 定义默认cookie
      .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE) // 定义默认header
      .defaultUriVariables(Collections.singletonMap("url", "http://localhost:8080")) // 定义默认 quringstring参数
      .build();
    
    WebClient.builder()
          .baseUrl(host)
          .exchangeStrategies(ExchangeStrategies
    	  	.builder()
    	  	.codecs(codecs -> codecs
                .defaultCodecs()
                .maxInMemorySize(500 * 1024))// 编解码器内存中数据的缓冲,默认值为 262,144 字节
    	    .build())
          .build();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    创建带有超时的WebClient实例

    默认的 30 秒超时

    • 通过 ChannelOption.CONNECT_TIMEOUT_MILLIS 选项设置连接超时时间
    • 使用 ReadTimeoutHandler 和 WriteTimeoutHandler 分别设置读取写入超时时间
    • 使用 responseTimeout 指令配置响应超时时间
    HttpClient httpClient = HttpClient.create()
      .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000) // 连接超时时间
      .responseTimeout(Duration.ofMillis(5000)) // 响应超时时间
      .doOnConnected(conn -> 
        conn.addHandlerLast(new ReadTimeoutHandler(5000, TimeUnit.MILLISECONDS)) // 读超时
          .addHandlerLast(new WriteTimeoutHandler(5000, TimeUnit.MILLISECONDS))); // 写超时
    
    WebClient client = WebClient.builder()
      .clientConnector(new ReactorClientHttpConnector(httpClient))
      .build();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    示例
    // 配置资源工厂,以控制连接池、线程池和其他资源的创建和管理
    ReactorResourceFactory factory = new ReactorResourceFactory();
    // 设置是否使用全局资源。设置为 false将创建独立的资源。默认情况下,为 true,表示使用全局资源。
    factory.setUseGlobalResources(false);
    // 配置连接池的提供者,控制连接的创建、分配和回收。 创建名为"httpClient"的连接池,最大连接数为 50 默认是 max(cpu,8)*2 。
    factory.setConnectionProvider(ConnectionProvider.create("httpClient", 50));
    // 用于配置事件循环资源,它管理底层事件循环线程。创建一个名为 "httpClient" 的事件循环资源,最大线程数为 50,而第三个参数 线程是否在 JVM 关闭时释放 max(cpu,4)
    factory.setLoopResources(LoopResources.create("httpClient", 50, true));
    WebClient.builder().
            baseUrl("")
            // 用于配置请求和响应的处理策略 通常用于配置序列化和反序列化。
            .exchangeStrategies(ExchangeStrategies.builder()
                    .codecs(configurer -> configurer.defaultCodecs().maxInMemorySize(16 * 1024 * 1024))
                    .build())
            // 用于配置编解码器。
            .codecs(clientCodecConfigurer -> {
                clientCodecConfigurer.defaultCodecs().maxInMemorySize(16 * 1024 * 1024); // 设置最大内存大小
            })
            .clientConnector(new ReactorClientHttpConnector(factory, client -> client
                    // 设置连接建立的超时时间,单位为毫秒。
                    .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
                    // 启用或禁用 HTTP 响应的压缩。默认情况下,压缩是禁用的。
                    .compress(true)
                    // 启用或禁用 "wiretap",这将允许你记录请求和响应的详细信息,用于调试和监控。
                    .wiretap(true)
                    .responseTimeout(Duration.ofMillis(5000)) // 响应超时时间
                    .doOnConnected(connection -> {
                        // 添加读超时处理器,单位为毫秒
                        connection.addHandlerLast(new ReadTimeoutHandler(10));
                        // 添加写超时处理器,单位为毫秒
                        connection.addHandlerLast(new WriteTimeoutHandler(8));
                    })
                    // 启用跟随重定向,默认情况下启用
                    .followRedirect(true)
            ))
            .filter(ExchangeFilterFunction.ofRequestProcessor(clientRequest -> {
                        // 在请求发出之前记录请求信息
                        System.out.println("Request: " + clientRequest.method() + " " + clientRequest.url());
                        return Mono.just(clientRequest);
                    }).andThen(ExchangeFilterFunction.ofResponseProcessor(clientResponse -> {
                        // 在响应接收后记录响应信息
                        System.out.println("Response: " + clientResponse.statusCode());
                        return Mono.just(clientResponse);
                    }))
            ).build();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45

    请求准备

    // 1.方法
    UriSpec<RequestBodySpec> uriSpec = client.method(HttpMethod.POST);
    UriSpec<RequestBodySpec> uriSpec = client.post();
    // 2.uri
    RequestBodySpec bodySpec = uriSpec.uri("/resource");
    RequestBodySpec bodySpec = uriSpec.uri(
      uriBuilder -> uriBuilder.pathSegment("/resource").build());
    //  "/products/2"
    RequestBodySpec bodySpec = uriSpec.uri(uriBuilder - > uriBuilder
        .path("/products/{id}")
        .build(2))
    //  "/products/2/attributes/13"    
    RequestBodySpec bodySpec = uriSpec.uri(uriBuilder - > uriBuilder
        .path("/products/{id}/attributes/{attributeId}")
        .build(2, 13))
    //	"/products/?name=AndroidPhone&color=black&deliveryDate=13/04/2019"    
    RequestBodySpec bodySpec = uriSpec.uri(uriBuilder - > uriBuilder
        .path("/products/")
        .queryParam("name", "AndroidPhone")
        .queryParam("color", "black")
        .queryParam("deliveryDate", "13/04/2019")
        .build())
    //	"/products/?name=AndroidPhone&color=black&deliveryDate=13%2F04%2F2019" 这种'/'符被转义了
    RequestBodySpec bodySpec = uriSpec.uri(uriBuilder - > uriBuilder
        .path("/products/")
        .queryParam("name", "{title}")
        .queryParam("color", "{authorId}")
        .queryParam("deliveryDate", "{date}")
        .build("AndroidPhone", "black", "13/04/2019"))
    // 数组参数 "/products/?category=Phones&category=Tablets"
    webClient.get()
      .uri(uriBuilder - > uriBuilder
        .path("/products/")
        .queryParam("category", "Phones", "Tablets")
        .build())
    // 数组参数 "/products/?category=Phones,Tablets"
    webClient.get()
      .uri(uriBuilder - > uriBuilder
        .path("/products/")
        .queryParam("category", String.join(",", "Phones", "Tablets"))
        .build())    
        
    // 3.内容
    RequestHeadersSpec<?> headersSpec = bodySpec.bodyValue("data");
    RequestHeadersSpec<?> headersSpec = bodySpec.body(
      Mono.just(new Foo("name")), Foo.class);
    RequestHeadersSpec<?> headersSpec = bodySpec.body(
      BodyInserters.fromValue("data"));
    LinkedMultiValueMap map = new LinkedMultiValueMap();
    map.add("key1", "value1");
    map.add("key2", "value2");
    RequestHeadersSpec<?> headersSpec = bodySpec.body(
      BodyInserters.fromMultipartData(map));
    // 4.header标头
    ResponseSpec responseSpec = headersSpec.header(
        HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
      .accept(MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML)
      .acceptCharset(StandardCharsets.UTF_8)
      .ifNoneMatch("*")
      .ifModifiedSince(ZonedDateTime.now())
      .retrieve();
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62

    获取响应

    发送请求并接收响应。我们可以通过使用exchangeToMono/exchangeToFlux或retrieve方法来实现。

    // ExchangeToMono和ExchangeToFlux方法允许访问ClientResponse及其状态和标头
    Mono<String> response = headersSpec.exchangeToMono(response -> {
      if (response.statusCode().equals(HttpStatus.OK)) {
          return response.bodyToMono(String.class);
      } else if (response.statusCode().is4xxClientError()) {
          return Mono.just("Error response");
      } else {
          return response.createException()
            .flatMap(Mono::error);
      }
    });
    // 而retrieve方法是直接获取body
    Mono<String> response = headersSpec.retrieve()
      .bodyToMono(String.class);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    需要注意的是 ResponseSpec.bodyToMono 方法,如果状态码为4xx(客户端错误)或5xx(服务器错误),它将抛出一个 WebClientException。

    单个资源

    Mono<Employee> employeeMono = client.get()
      .uri("/employees/{id}", "1")
      .retrieve()
      .bodyToMono(Employee.class);
    
    employeeMono.subscribe(System.out::println);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    多个资源

    Flux<Employee> employeeFlux = client.get()
      .uri("/employees")
      .retrieve()
      .bodyToFlux(Employee.class);
            
    employeeFlux.subscribe(System.out::println);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    高级

    过滤器

    过滤器可以拦截、检查和修改客户端请求(或响应)。过滤器非常适合为每个请求添加功能,因为逻辑保留在一个位置。用例包括监视、修改、记录和验证客户端请求。

    一个请求具有一个有序链,包含零个或多个过滤器。

    在Spring Reactive中,过滤器是 ExchangeFilterFunction 的实例。过滤器函数有两个参数:要修改的 ClientRequest 和下一个 ExchangeFilterFunction。

    通常,过滤器函数通过调用过滤器链中的下一个函数来返回:

    ExchangeFilterFunction filterFunction = (clientRequest, nextFilter) -> {
        LOG.info("WebClient fitler executed");
        return nextFilter.exchange(clientRequest);
    };
    
    // 添加过滤器
    WebClient webClient = WebClient.builder()
      .filter(filterFunction)
      .build();
    
    WebClient.builder()
                    .filter((request, next) -> {  //过滤器,3次重试,header打印
                        log.info(String.format("请求地址: %s", request.url()));
                        log.info(String.format("请求头信息: %s", request.headers()));
                        Mono<ClientResponse> exchange = next.exchange(request).retry(3);
                        ClientResponse clientResponse = exchange.block();
                        log.info(String.format("响应头信息: %s", clientResponse.headers().asHttpHeaders()));
                        return exchange;
                    })
                    .clientConnector(connector).build();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    自定义过滤器

    让我们从一个对客户端发送的 HTTP GET 请求进行计数的过滤器开始。

    过滤器检查请求方法并在 GET 请求的情况下增加“全局”计数器:

    ExchangeFilterFunction countingFunction = (clientRequest, nextFilter) -> {
        HttpMethod httpMethod = clientRequest.method();
        if (httpMethod == HttpMethod.GET) {
            getCounter.incrementAndGet();
        }
        return nextFilter.exchange(clientRequest);
    };
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    我们将定义的第二个过滤器将版本号附加到请求 URL 路径。我们利用ClientRequest.from()方法从当前请求对象创建一个新的请求对象并设置修改后的 URL。

    随后,我们继续使用新修改的请求对象执行过滤器链:

    ExchangeFilterFunction urlModifyingFilter = (clientRequest, nextFilter) -> {
        String oldUrl = clientRequest.url().toString();
        URI newUrl = URI.create(oldUrl + "/" + version);
        ClientRequest filteredRequest = ClientRequest.from(clientRequest)
          .url(newUrl)
          .build();
        return nextFilter.exchange(filteredRequest);
    };
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    自定义线程池

    WebClient 默认使用 Project Reactor 提供的线程池来执行异步操作。但是,你可以根据应用程序的需求进行自定义线程池配置。以下是一些相关原理和最佳实践:

    • 调度器(Schedulers):WebClient 使用调度器来管理线程,例如 Schedulers.elastic() 用于 CPU 密集型操作,Schedulers.parallel() 用于 I/O 密集型操作。你可以通过 publishOnsubscribeOn 方法来切换调度器。
    webClient
        .get()
        .uri("/todos/1")
        .retrieve()
        .bodyToMono(Todo.class)
        .subscribeOn(Schedulers.elastic()) // 切换订阅线程
        .publishOn(Schedulers.parallel()) // 切换发布线程
        .subscribe(result -> {
            // 处理响应
        });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 自定义线程池:如果需要更精细的线程控制,你可以创建自定义的线程池,并在调度器中使用它。这对于控制并发度和资源管理非常有用。
    ExecutorService customExecutorService = Executors.newFixedThreadPool(10);
    
    webClient
        .get()
        .uri("/todos/1")
        .retrieve()
        .bodyToMono(Todo.class)
        .subscribeOn(Schedulers.fromExecutor(customExecutorService))
        .subscribe(result -> {
            // 处理响应
        });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    自定义WebClient连接池

    WebClient 使用 Reactor Netty 作为底层的 HTTP 客户端,它管理着连接池。连接池是一组可重用的连接,以提高性能和资源利用率。以下是有关连接池的原理和最佳实践:

    • 连接池大小:连接池的大小可以通过 WebClient 的配置进行设置。默认情况下,它会根据应用程序的需求动态分配连接。你可以使用 HttpClientmaxConnections 方法来设置最大连接数。
    HttpClient httpClient = HttpClient.create()
            .maxConnections(50); // 设置最大连接数为 50
    
    WebClient webClient = WebClient.builder()
            .clientConnector(new ReactorClientHttpConnector(httpClient))
            .baseUrl("https://jsonplaceholder.typicode.com")
            .build();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 连接超时:你可以配置连接超时和读写超时,以确保请求不会永远等待。这对于防止应用程序被慢速或不响应的服务挂起非常重要。
    HttpClient httpClient = HttpClient.create()
            .responseTimeout(Duration.ofSeconds(10)) // 设置响应超时时间为 10 秒
            .doOnConnected(connection -> {
                connection.addHandlerLast(new ReadTimeoutHandler(10)); // 设置读取超时
                connection.addHandlerLast(new WriteTimeoutHandler(10)); // 设置写入超时
            });
    
    WebClient webClient = WebClient.builder()
            .clientConnector(new ReactorClientHttpConnector(httpClient))
            .baseUrl("https://jsonplaceholder.typicode.com")
            .build();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 默认情况下,WebClient 会自动管理连接池,无需手动配置。
    • 通常情况下,你不需要担心连接池的具体细节,因为它会在后台自动处理连接的创建、重用和关闭。
    • 如果你需要更精细的控制,可以考虑配置连接池的大小和超时设置。

    开启日志

    在开发和调试期间,启用详细的 WebClient 日志记录可以帮助你识别问题。你可以使用 Spring Boot 的日志配置来启用 WebClient 的日志记录。在 application.propertiesapplication.yml 中添加以下配置:

    logging.level.org.springframework.web.reactive.function.client=DEBUG
    logging.level.reactor.netty.http.client=DEBUG
    logging.level.reactor.netty.tcp.client=DEBUG
    
    • 1
    • 2
    • 3

    这将为 WebClient 的 HTTP 请求和响应生成详细的日志信息,包括请求头、响应头和响应体。

    WebClient 提供了一种简单的方法来记录请求和响应的日志,以便进行调试和监控。你可以通过添加过滤器来实现日志记录。以下是一个示例:

    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
    import org.springframework.web.reactive.function.client.WebClient;
    
    @Configuration
    public class WebClientConfig {
    
        @Bean
        public WebClient.Builder webClientBuilder() {
            return WebClient.builder()
                    .filter(logRequest())
                    .filter(logResponse());
        }
    
        private ExchangeFilterFunction logRequest() {
            return ExchangeFilterFunction.ofRequestProcessor(request -> {
                // 记录请求日志
                System.out.println("Request: " + request.method() + " " + request.url());
                return Mono.just(request);
            });
        }
    
        private ExchangeFilterFunction logResponse() {
            return ExchangeFilterFunction.ofResponseProcessor(response -> {
                // 记录响应日志
                System.out.println("Response: " + response.statusCode());
                return Mono.just(response);
            });
        }
    
        // ... 省略其他配置
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    在上面的示例中,我们定义了两个过滤器 logRequestlogResponse,分别用于记录请求和响应的日志。你可以根据需要将日志输出到日志文件或其他监控工具。

    错误处理

    WebClient 提供了多种处理错误的方式。在上面的示例中,我们使用了 onStatus 方法来处理特定的 HTTP 状态码。这里有一些更多的错误处理策略和最佳实践:

    • onStatus 方法onStatus 方法允许你根据 HTTP 响应的状态码来处理错误。你可以根据需要定义不同的处理逻辑,例如重试、返回默认值或引发自定义异常。
    • onErrorResume 方法:使用 onErrorResume 方法,你可以在出现错误时返回一个备用的 Mono。这可以用于从缓存中获取数据或返回默认值。
    • 全局错误处理器:你可以注册一个全局错误处理器来处理 WebClient 的全局错误,例如连接失败、超时等。通过 ExchangeStrategies 可以定义一个全局错误处理器。
    @Bean
    public WebClient.Builder webClientBuilder() {
        return WebClient.builder()
                .exchangeStrategies(ExchangeStrategies.builder()
                    .codecs(configurer -> configurer.defaultCodecs().maxInMemorySize(16 * 1024 * 1024))
                    .build())
                .clientConnector(new ReactorClientHttpConnector(HttpClient.newConnection()
                    .compress(true)
                    .resolver(DefaultAddressResolverGroup.INSTANCE)))
                .baseUrl("https://jsonplaceholder.typicode.com")
                .filter((request, next) -> next.exchange(request)
                    .doOnError(throwable -> {
                        // 全局错误处理逻辑
                    }));
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    WebClient 提供了丰富的错误处理机制,可以通过 onStatus 和其他方法来捕获和处理不同类型的错误。例如,onStatus 方法用于根据 HTTP 响应的状态码来处理错误。你可以在 onStatus 方法中返回一个 Mono.error,将错误包装成异常并传播。

    最佳实践
    • 使用 onStatus 处理不同的 HTTP 状态码。这可以让你根据状态码执行不同的错误处理逻辑。
    • 使用 onErrorResume 来提供备用值或执行备用操作,以确保即使出现错误,也能返回有意义的响应。
    • 使用 onErrorReturn 来在发生错误时返回一个默认值。
    .retrieve()
    .onStatus(HttpStatus::is4xxClientError, response -> {
        // 处理 4xx 错误,或返回备用值
        return Mono.error(new CustomException("Client error: " + response.statusCode()));
    })
    .onStatus(HttpStatus::is5xxServerError, response -> {
        // 处理 5xx 错误,或返回备用值
        return Mono.error(new CustomException("Server error: " + response.statusCode()));
    })
    .bodyToMono(Todo.class)    
    .onErrorResume(CustomClientException.class, ex -> {
         // 处理自定义客户端异常
         return Mono.just(createDefaultTodo()); // 返回一个默认值
    })
    .onErrorResume(CustomServerException.class, ex -> {
         // 处理自定义服务器异常
         return Mono.error(new CustomFallbackException("Fallback error: " + ex.getMessage()));
    }) 
    .onErrorResume(Exception.class, error -> {
        // 处理其他类型的异常,或返回备用值
        return Mono.just(new DefaultResponse());
    })
    .onErrorReturn(CustomClientException.class, createDefaultTodo()) // 在客户端错误时返回默认值
    .onErrorReturn(CustomServerException.class, createDefaultTodo()) // 在服务器错误时返回默认值    
    .doOnError(error -> {
                        // 在发生错误时执行的操作
                        System.err.println("Error occurred: " + error.getMessage());
                    })
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    示例

    异步请求

    client
      .get()
      .uri(URLConstants.URL)
      .header(URLConstants.API_KEY_NAME, URLConstants.API_KEY_VALUE)
      .retrieve()
      .bodyToMono(String.class)
      .subscribe(result->System.out.println(result));
    
    
    // 创建 WebClient 实例
            WebClient webClient = webClientBuilder().baseUrl("https://jsonplaceholder.typicode.com").build();
    
            // 执行 GET 请求
            Mono<ResponseEntity<String>> responseMono = webClient.get()
                    .uri("/posts/1")
                    .retrieve()
                    .toEntity(String.class);
    
            // 订阅响应并处理结果
            responseMono.subscribe(responseEntity -> {
                if (responseEntity.getStatusCode().is2xxSuccessful()) {
                    System.out.println("Response Body: " + responseEntity.getBody());
                } else {
                    System.err.println("Request failed with status code: " + responseEntity.getStatusCode());
                }
            });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    以非阻塞的方式订阅subscribe(),该方法返回Mono的包装器。

    同步请求

    虽然Spring WebClient是异步的,但是我们仍然可以通过调用阻塞线程直到执行结束的方法block()来进行同步调用。方法执行后我们得到结果。

     String block = webClient.get().uri("https://jsonplaceholder.typicode.com/t3odos/1")
                    .retrieve()
                    .onStatus(HttpStatus::is4xxClientError, response -> {
                        // 处理 4xx 错误,或返回备用值
                        return Mono.error(new RuntimeException("Client error: " + response.statusCode()));
                    })
                    .onStatus(HttpStatus::is5xxServerError, response -> {
                        // 处理 5xx 错误,或返回备用值
                        return Mono.error(new RuntimeException("Server error: " + response.statusCode()));
                    })
                    .bodyToMono(String.class)
                    .onErrorResume(RuntimeException.class, ex -> {
                        // 处理自定义异常 // 返回一个默认值
                        return Mono.just("createDefaultTodo()");
                    })
                    .doOnError(error -> {
                        // 在发生错误时执行的操作
                        System.out.println("Error occurred: " + error.getMessage());
                    })
                    .block(Duration.ofSeconds(10));
            System.out.println(block);
    
    
    
    
    String result = client
                .post()
                .uri("https://reqbin.com/echo/post/json")
                .body(BodyInserters.fromValue(prepareRequest()))
                .exchange()
                .flatMap(response -> response.bodyToMono(String.class))
                .block();
        System.out.println("result::" + result);
    
    private String prepareRequest() {
        var values = new HashMap<String, String>() {
          {
            put("Id", "12345");
            put("Customer", "Roger Moose");
            put("Quantity", "3");
            put("Price", "167.35");
          }
        };
    
        var objectMapper = new ObjectMapper();
        String requestBody;
        try {
          requestBody = objectMapper.writeValueAsString(values);
        } catch (JsonProcessingException e) {
          e.printStackTrace();
          return null;
        }
        return requestBody;
      }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56

    创建了一个 JSON 字符串prepareRequest(),然后将该字符串作为请求正文发送到 HTTPPOST方法中。

    exchange()与之前使用的retrieve()方法相比,该方法通过提供对来自 HTTP 客户端的响应的访问来提供更多控制。

    上传文件

    Mono<HttpStatus> httpStatusMono = webClient.post()
        .uri(url)
        .contentType(MediaType.APPLICATION_PDF)
        .body(BodyInserters.fromResource(resource))
        .exchangeToMono(response -> {
            if (response.statusCode().equals(HttpStatus.OK)) {
                return response.bodyToMono(HttpStatus.class).thenReturn(response.statusCode());
            } else {
                throw new ServiceException("Error uploading file");
            }
         });
    // 从多部分资源上传文件
    MultipartBodyBuilder builder = new MultipartBodyBuilder();
    builder.part("file", multipartFile.getResource());
    
    Mono<HttpStatus> httpStatusMono = webClient.post()
        .uri(url)
        .contentType(MediaType.MULTIPART_FORM_DATA)
        .body(BodyInserters.fromMultipartData(builder.build()))
        .exchangeToMono(response -> {
            if (response.statusCode().equals(HttpStatus.OK)) {
                return response.bodyToMono(HttpStatus.class).thenReturn(response.statusCode());
            } else {
                throw new ServiceException("Error uploading file");
            }
          });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    重试

    // 使用retry方法
    public Mono<String> getData(String stockId) {
        return webClient.get()
            .uri(PATH_BY_ID, stockId)
            .retrieve()
            .bodyToMono(String.class)
            .retry(3); // 无论 Web 客户端返回什么错误,这都会重试最多 3 次。
    }
    // 使用retryWhen方法的可配置策略
    public Mono<String> getData(String stockId) {
        return webClient.get()
            .uri(PATH_BY_ID, stockId)
            .retrieve()
            .bodyToMono(String.class)
            .retryWhen(Retry.max(3));
    }
    // 固定延迟重试
    public Mono<String> getData(String stockId) {
        return webClient.get()
          .uri(PATH_BY_ID, stockId)
          .retrieve()
          .bodyToMono(String.class)
          // 尝试之间有两秒的延迟,这可能会增加成功的机会
          .retryWhen(Retry.fixedDelay(3, Duration.ofSeconds(2)));
    }
    // 不是按固定间隔重试
    public Mono<String> getData(String stockId) {
        return webClient.get()
          .uri(PATH_BY_ID, stockId)
          .retrieve()
          .bodyToMono(String.class)
          // 会逐渐增加尝试之间的延迟- 大约为 2 秒、4 秒,然后是 8 秒
          .retryWhen(Retry.backoff(3, Duration.ofSeconds(2)));
    }
    // 抖动重试 为计算的延迟间隔增加了随机性
    public Mono<String> getData(String stockId) {
        return webClient.get()
          .uri(PATH_BY_ID, stockId)
          .accept(MediaType.APPLICATION_JSON)
          .retrieve()
          .bodyToMono(String.class)
          .retryWhen(Retry.backoff(3, Duration.ofSeconds(2)).jitter(0.75));
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44

    过滤错误

    服务中的任何错误都将导致重试尝试,包括 4xx 错误,例如400:Bad Request或401:Unauthorized。

    显然,我们不应该重试此类客户端错误,因为服务器响应不会有任何不同。因此,让我们看看如何仅在出现特定错误的情况下应用重试策略

    public Mono<String> getData(String stockId) {
        return webClient.get()
          .uri(PATH_BY_ID, stockId)
          .retrieve()
          // 当是5xx 错误的异常,返回我们自定义的异常
          .onStatus(HttpStatus::is5xxServerError, 
              response -> Mono.error(new ServiceException("Server error", response.rawStatusCode())))
          .bodyToMono(String.class)
          .retryWhen(Retry.backoff(3, Duration.ofSeconds(5))
                     // 仅在抛出ServiceException时重试
              .filter(throwable -> throwable instanceof ServiceException));
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    所有重试尝试均不成功的时候。在这种情况下,该策略的默认行为是传播 RetryExhaustedException ,包装最后一个错误。

    public Mono<String> getData(String stockId) {
        return webClient.get()
          .uri(PATH_BY_ID, stockId)
          .retrieve()
          .onStatus(HttpStatus::is5xxServerError, response -> Mono.error(new ServiceException("Server error", response.rawStatusCode())))
          .bodyToMono(String.class)
          .retryWhen(Retry.backoff(3, Duration.ofSeconds(5))
              .filter(throwable -> throwable instanceof ServiceException)
              // 一系列失败的重试结束后,请求将失败并出现 ServiceException       
              .onRetryExhaustedThrow((retryBackoffSpec, retrySignal) -> {
                  throw new ServiceException("External Service failed to process after max retries", HttpStatus.SERVICE_UNAVAILABLE.value());
              }));
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    错误处理

    使用onStatus

    onStatus是一种内置机制,可用于处理WebClient响应。这使我们能够根据特定响应(例如 400、500、503 等)或状态类别(例如 4XX 和 5XX 等)应用细粒度的功能:

    WebClient
      .builder()
      .build()
      .post()
      .uri("/some-resource")
      .retrieve()
      .onStatus(
        HttpStatus.INTERNAL_SERVER_ERROR::equals,
        response -> response.bodyToMono(String.class).map(Exception::new))
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    onStatus方法需要两个参数。第一个是接收状态代码的谓词。第二个参数的执行基于第一个参数的输出。第二个是将响应映射到Mono或异常的函数。

    示例为,如果我们看到 INTERNAL_SERVER_ERROR (即 500),我们将使用bodyToMono获取主体,然后将其映射到新的Exception。

    我们可以链接onStatus调用,以便能够为不同的状态条件提供功能:

    Mono<String> response = WebClient
      .builder()
      .build()
      .post()
      .uri("some-resource")
      .retrieve()
      .onStatus( 
        HttpStatus.INTERNAL_SERVER_ERROR::equals,
        response -> response.bodyToMono(String.class).map(CustomServerErrorException::new)) 
      .onStatus(
        HttpStatus.BAD_REQUEST::equals,
        response -> response.bodyToMono(String.class).map(CustomBadRequestException::new))
      ... 
      .bodyToMono(String.class);
    
    // do something with response
    
    
    webClient.get()
                .uri("/todos/{id}", id)
                .retrieve()
                .onStatus(HttpStatus::is4xxClientError, response -> {
                    // 处理 4xx 错误
                    return Mono.error(new CustomException("Client error: " + response.statusCode()));
                })
                .onStatus(HttpStatus::is5xxServerError, response -> {
                    // 处理 5xx 错误
                    return Mono.error(new CustomException("Server error: " + response.statusCode()));
                })
                .bodyToMono(Todo.class);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    现在onStatus调用映射到我们的自定义异常。我们为这两种错误状态分别定义了异常类型。onStatus方法允许我们使用我们选择的任何类型。

    使用ExchangeFilterFunction

    ExchangeFilterFunction是处理特定状态代码和获取响应正文的另一种方法 。与onStatus不同,交换过滤器非常灵活,适用于基于任何布尔表达式的过滤器功能。

    我们可以受益于ExchangeFilterFunction的灵活性,涵盖与onStatus函数相同的类别。

    处理返回的逻辑:

    private static Mono<ClientResponse> exchangeFilterResponseProcessor(ClientResponse response) {
        HttpStatus status = response.statusCode();
        if (HttpStatus.INTERNAL_SERVER_ERROR.equals(status)) {
            return response.bodyToMono(String.class)
              .flatMap(body -> Mono.error(new CustomServerErrorException(body)));
        }
        if (HttpStatus.BAD_REQUEST.equals(status)) {
            return response.bodyToMono(String.class)
              .flatMap(body -> Mono.error(new CustomBadRequestException(body)));
        }
        return Mono.just(response);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    接下来,我们将定义过滤器并使用对处理程序的方法引用:

    ExchangeFilterFunction errorResponseFilter = ExchangeFilterFunction
      .ofResponseProcessor(WebClientStatusCodeHandler::exchangeFilterResponseProcessor);
    
    • 1
    • 2

    与onStatus调用类似,我们在错误时映射到异常类型。但是,使用Mono.error会将此异常包装ReactiveException 中。处理错误时应牢记这种嵌套。

    现在我们将其应用于WebClient的实例,以达到与onStatus链式调用相同的效果:

    Mono<String> response = WebClient
      .builder()
      .filter(errorResponseFilter)
      .build()
      .post()
      .uri("some-resource")
      .retrieve()
      .bodyToMono(String.class);
    
    // do something with response
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    参考

    • https://reflectoring.io/comparison-of-java-http-clients/
    • https://reflectoring.io/spring-webclient/
    • https://docs.flydean.com/spring-framework-documentation5/webreactive/2.webclient
    • https://blog.hanqunfeng.com/2020/04/18/http-utils/#WebClientUtil
  • 相关阅读:
    NodeJs模块化
    阿里云访问资源:NoSuchKey
    记录:2022-9-8 编辑距离 内核级线程的切换 分区 分段
    yum命令中的gcc是什么含义?答:是一种包,并不是参数。
    PFSK165 3BSE027778R1 VP74201-933CW07 允许用户对数据进行读写操作
    前端使用 Konva 实现可视化设计器(2)
    深入学习JVM(Java虚拟机)
    C++11 decltype 的简单使用
    【OnlyOffice】 桌面应用编辑器,版本8.0已发布,PDF表单、RTL支持、Moodle集成、本地界面主题
    Spring Cloud微服务治理框架深度解析
  • 原文地址:https://blog.csdn.net/abu935009066/article/details/132851517