同步阻塞模式下,如果服务端接口响应较慢,那会直接影响客户端接口请求的吞吐量,虽然可以通过在应用代码中通过异步线程的方式优化,但是会增加客户端的线程开销。所以考虑用异步模式来解决这个问题
因此测试时,主要是针对线程数设置比较小的情况下,客户端发起请求的吞吐量来进行对比
用spring boot写一个最简单的接口:sleep 1s,然后返回ok
客户端程序引入httpClient依赖:
org.apache.httpcomponents.client5
httpclient5
5.1.3
代码:
import lombok.SneakyThrows;
import org.apache.hc.client5.http.ClientProtocolException;
import org.apache.hc.client5.http.classic.methods.HttpGet;
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.apache.hc.client5.http.impl.classic.HttpClients;
import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager;
import org.apache.hc.core5.http.ClassicHttpResponse;
import org.apache.hc.core5.http.HttpEntity;
import org.apache.hc.core5.http.HttpStatus;
import org.apache.hc.core5.http.ParseException;
import org.apache.hc.core5.http.io.HttpClientResponseHandler;
import org.apache.hc.core5.http.io.entity.EntityUtils;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
public class SyncClientHttpTest {
static final CloseableHttpClient httpclient;
static {
PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager();
connectionManager.setMaxTotal(1000);
connectionManager.setDefaultMaxPerRoute(100);
httpclient = HttpClients.custom().setConnectionManager(connectionManager).build();
}
public static void main(final String[] args) throws Exception {
AtomicInteger atomicInteger = new AtomicInteger(0);
AtomicBoolean stop = new AtomicBoolean(false);
for (int i = 0; i < 10; i++) {
new Thread(() -> {
while (!stop.get()) {
httpGet();
atomicInteger.incrementAndGet();
}
}).start();
}
Thread.sleep(30000);
stop.set(true);
Thread.sleep(1000);
System.out.println(atomicInteger.get());
System.exit(0);
}
@SneakyThrows
private static void httpGet() {
final HttpGet httpget = new HttpGet("http://localhost:8080/test");
// Create a custom response handler
final HttpClientResponseHandler<String> responseHandler = new HttpClientResponseHandler<String>() {
@Override
public String handleResponse(
final ClassicHttpResponse response) throws IOException {
final int status = response.getCode();
if (status >= HttpStatus.SC_SUCCESS && status < HttpStatus.SC_REDIRECTION) {
final HttpEntity entity = response.getEntity();
try {
return entity != null ? EntityUtils.toString(entity) : null;
} catch (final ParseException ex) {
throw new ClientProtocolException(ex);
}
} else {
throw new ClientProtocolException("Unexpected response status: " + status);
}
}
};
final String responseBody = httpclient.execute(httpget, responseHandler);
// System.out.println(responseBody);
if(!responseBody.equals("ok")) {
throw new RuntimeException("error");
}
}
}
}
开启5个线程,循环发起请求30s
打印结果:150
差不多每秒5个请求,符合预期
改为10个线程
打印结果:300
改为100个线程
打印结果:3000
请求吞吐和线程数呈线性增长关系(线程数应小于maxPerRoute)
代码:
import lombok.SneakyThrows;
import org.apache.hc.client5.http.async.methods.*;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.client5.http.impl.async.HttpAsyncClients;
import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.reactor.IOReactorConfig;
import org.apache.hc.core5.util.Timeout;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Example of asynchronous HTTP/1.1 request execution.
*/
public class AsyncClientHttpTest {
static final CloseableHttpAsyncClient client;
static final AtomicInteger atomicInteger = new AtomicInteger(0);
static final AtomicBoolean stop = new AtomicBoolean(false);
static {
PoolingAsyncClientConnectionManager connectionManager = new PoolingAsyncClientConnectionManager();
connectionManager.setMaxTotal(1000);
connectionManager.setDefaultMaxPerRoute(100);
IOReactorConfig ioReactorConfig = IOReactorConfig.custom()
.setSoTimeout(Timeout.ofSeconds(5))
.setIoThreadCount(5) //IO线程数
.build();
client = HttpAsyncClients.custom()
.setIOReactorConfig(ioReactorConfig)
.setConnectionManager(connectionManager)
.build();
client.start();
}
public static void main(final String[] args) throws Exception {
new Thread(()->{
while (!stop.get()) {
httpGet();
}
}).start();
Thread.sleep(5000);
stop.set(true);
Thread.sleep(25000);
System.out.println(atomicInteger.get());
// client.close(CloseMode.GRACEFUL);
System.exit(0);
}
@SneakyThrows
private static void httpGet() {
final SimpleHttpRequest request = SimpleRequestBuilder.get()
.setUri("http://localhost:8080//test")
.build();
final Future<SimpleHttpResponse> future = client.execute(
SimpleRequestProducer.create(request),
SimpleResponseConsumer.create(),
new FutureCallback<SimpleHttpResponse>() {
@Override
public void completed(final SimpleHttpResponse response) {
// System.out.println(request + "->" + new StatusLine(response));
// System.out.println(response.getBody().getBodyText());
if(!response.getBody().getBodyText().equals("ok")) {
throw new RuntimeException("error");
}
atomicInteger.incrementAndGet();
}
@Override
public void failed(final Exception ex) {
System.out.println(request + "->" + ex);
}
@Override
public void cancelled() {
System.out.println(request + " cancelled");
}
});
}
}
ps: 这里代码其实不够严谨,不过测试结果对比已经很悬殊了,不影响最终结论
开启5个IO线程(不设置默认为cpu核数)
客户端1个线程循环发起请求5s,之后再sleep 25s打印结果
打印结果:2700
修改代码:connectionManager.setDefaultMaxPerRoute(100);
->connectionManager.setDefaultMaxPerRoute(200);
调大maxPerRoute为200
打印结果:5400
可以看到异步模式下,每秒的吞吐受maxPerRoute的影响较大(基本持平)
注意如果不手动设置,这个默认值为5,所以如果不进行ConnectionManager设置,异步的测试结果会很差
异步模式下因为使用了多路复用,一个IO线程管理多个连接,所以只需少量线程即可进行大量的远程接口调用