private HttpClient newHttpClient() {
Registry socketFactoryRegistry = RegistryBuilder
.create()
.register("http", PlainConnectionSocketFactory.INSTANCE)
.register("https", Objects.nonNull(this.connectionSocketFactory) ? this.connectionSocketFactory : SSLConnectionSocketFactory.getSocketFactory()).build();
//设置连接池大小
PoolingHttpClientConnectionManager connManager = new PoolingHttpClientConnectionManager(socketFactoryRegistry);
// 全局最大连接数
connManager.setMaxTotal(this.maxConnPerTotal);
// 每个路由的最大连接数
connManager.setDefaultMaxPerRoute(this.maxConnPerRoute);
return HttpClients.custom().setKeepAliveStrategy(new DefaultDurationConnectionKeepAliveStrategy())
.setConnectionManager(connManager).build();
}
protected RequestConfig buildRequestConfig() {
return RequestConfig.custom()
.setConnectTimeout(this.connectTimeout)
.setSocketTimeout(this.socketTimeout)
.setCookieSpec(CookieSpecs.STANDARD)
.build();
}
注意:EntityUtils.toString处理会关闭并释连接
private final ResponseHandler responseHandler = response -> {
int status = response.getStatusLine().getStatusCode();
HttpEntity entity;
if (status >= HttpStatus.SC_OK && status < HttpStatus.SC_MULTIPLE_CHOICES) {
entity = response.getEntity();
return entity != null ? EntityUtils.toString(entity) : null;
} else {
String errorMsg = null;
if ((entity = response.getEntity()) != null) {
errorMsg = EntityUtils.toString(entity);
}
throw new HttpResponseException(status, errorMsg);
}
};
包含连接池中的一个连接与它的路由
每个路由的连接池,是连接池CPool的子集
属性
方法
连接池
属性
方法
响应体代理对象,该对象会对普通响应体进行增强,增强后的响应体Entity为代理:ResponseEntityProxy,主要增强功能如下
public HttpResponseProxy(final HttpResponse original, final ConnectionHolder connHolder) {
this.original = original;
this.connHolder = connHolder;
// 增强响应体
ResponseEntityProxy.enchance(original, connHolder);
}
public static void enchance(final HttpResponse response, final ConnectionHolder connHolder) {
final HttpEntity entity = response.getEntity();
if (entity != null && entity.isStreaming() && connHolder != null) {
response.setEntity(new ResponseEntityProxy(entity, connHolder));
}
}
为什么是totalAvailable > freeCapacity - 1才释放available连接?
因为逻辑走到此处,紧接着会创建一个新的连接,也就是说freeCapacity的数量会减少1。如果不减1的话则会出现连接数溢出1的场景(每个路由会溢出1,路由越多溢出越多)
httpclient释放连接场景的总结
注意:请求完成后,响应体不为空,且HttpEntity为流式时,不会自动释放连接,需要客户端自行释放连接
tomcat:tomcat-embed-core-8.5.72.jar
长轮询线程
new KafkaThread("KM-LONG-POLLING", new LongPollingTask(), true).start();
class LongPollingTask implements Runnable {
@Override
public void run() {
while (true) {
try {
int httpResponseCode = RestClient.longPollingStatus(KM_CONFIG_LISTENER_URL);
if (HttpStatus.SC_CREATED == httpResponseCode) {
CONFIG.topicsAndClusterInfo = loadFromKM();
CONFIG.version.getAndIncrement();
log.info("KM Config Has Change:{} ", CONFIG.topicsAndClusterInfo);
}
log.debug("KM long polling .................");
} catch (Exception e) {
log.error("KM long polling failed , ", e);
}
}
}
}
请求服务端获取数据
public static int longPollingStatus(String uri) {
try {
HttpResponse httpResponse = LONG_REST_CLIENT.get(uri, HttpResponse.class);
return httpResponse.getStatusLine().getStatusCode();
} catch (Exception e) {
}
return HttpStatus.SC_BAD_GATEWAY;
}
public T get(String uri, Class clazz, Map params, Map headers) throws RestClientException {
String aUri = buildUrl(uri, params);
log.debug("Start GET uri[{}] with params[{}] header[{}]", uri, params, headers);
String body = null;
try {
HttpGet httpGet = buildHttpGet(aUri, headers);
log.debug("Done GET uri[{}] with params[{}] header[{}], and response is {}", uri, params, headers, body);
if (clazz == HttpResponse.class) {
return (T) httpClient.execute(httpGet);
}
body = executeRequest(httpGet);
if (clazz == String.class) {
return (T) body;
}
if (body != null) {
return JacksonUtil.from(body, clazz);
}
} catch (Exception e) {
log.warn("Occur error when GET uri[{}] with params[{}] headers[{}], response is {}, error: {}", uri, params, headers, body, e);
throw new RestClientException(e.getMessage(), e);
}
return null;
}
controller
@RequestMapping("/listener")
public void fetchTopicAndClusterInfo(@RequestParam String appName,
HttpServletRequest request,
HttpServletResponse response) {
topicAndClusterInfoManager.addAsyncRequest(renameAppName(appName), request, response);
}
异步请求
public void addAsyncRequest(String appName, HttpServletRequest request, HttpServletResponse response) {
AsyncContext asyncContext = request.startAsync(request, response);
AsyncTask asyncTask = new AsyncTask(asyncContext, true);
APPID_CONTEXT.put(appName, asyncTask);
// 30s 后写入 304 响应
TIMEOUT_CHECKER.schedule(() -> {
if (asyncTask.isTimeout()) {
try {
APPID_CONTEXT.remove(appName, asyncTask);
response.setStatus(HttpServletResponse.SC_NOT_MODIFIED);
asyncContext.complete();
} catch (Exception e) {
}
}
}, 30, TimeUnit.SECONDS);
}
案例并非一定会泄漏,生产9台服务,有3台出现了泄漏
案发现场
"KM-LONG-POLLING" #90 daemon prio=5 os_prio=0 cpu=7591.61ms elapsed=3429768.95s tid=0x00007f52e0a86d30 nid=0x11f waiting on condition [0x00007f52ce2c8000]
java.lang.Thread.State: WAITING (parking)
at jdk.internal.misc.Unsafe.park(java.base@11.0.10/Native Method)
- parking to wait for <0x00001000bead9b18> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(java.base@11.0.10/LockSupport.java:194)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(java.base@11.0.10/AbstractQueuedSynchronizer.java:2081)
at org.apache.http.pool.AbstractConnPool.getPoolEntryBlocking(AbstractConnPool.java:379)
at org.apache.http.pool.AbstractConnPool.access$200(AbstractConnPool.java:69)
at org.apache.http.pool.AbstractConnPool$2.get(AbstractConnPool.java:245)
- locked <0x00001000de891e20> (a org.apache.http.pool.AbstractConnPool$2)
at org.apache.http.pool.AbstractConnPool$2.get(AbstractConnPool.java:193)
at org.apache.http.impl.conn.PoolingHttpClientConnectionManager.leaseConnection(PoolingHttpClientConnectionManager.java:304)
at org.apache.http.impl.conn.PoolingHttpClientConnectionManager$1.get(PoolingHttpClientConnectionManager.java:280)
at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:190)
at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186)
at org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:89)
at org.apache.http.impl.execchain.RedirectExec.execute(RedirectExec.java:110)
at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:108)
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
at ....framework.net.RestClient.get(RestClient.java:315)
at ....framework.net.RestClient.get(RestClient.java:161)
at ....framework.net.RestClient.get(RestClient.java:154)
at ....kafka.core.util.RestClient.longPollingStatus(RestClient.java:31)
at ....kafka.core.metadata.KafkaMetaDataManager$LongPollingTask.run(KafkaMetaDataManager.java:171)
at java.lang.Thread.run(java.base@11.0.10/Thread.java:829)
Locked ownable synchronizers:
- None
3台机器均处于等待获取连接中,线程为死循环,不应该是waiting状态,并且单线程任务,连接池大小为2,不应该出现等待连接场景。问题基本定位是连接泄漏导致,为什么呢?
问题定位,因为服务端响应5xx错误时会触发连接泄漏。
查看服务端异步请求代码,发现了猫腻-_-!代码如下:org.apache.catalina.connector.Request#startAsync(javax.servlet.ServletRequest, javax.servlet.ServletResponse)
@Override
public AsyncContext startAsync(ServletRequest request,
ServletResponse response) {
if (!isAsyncSupported()) {
IllegalStateException ise =
new IllegalStateException(sm.getString("request.asyncNotSupported"));
log.warn(sm.getString("coyoteRequest.noAsync",
StringUtils.join(getNonAsyncClassNames())), ise);
throw ise;
}
if (asyncContext == null) {
asyncContext = new AsyncContextImpl(this);
}
asyncContext.setStarted(getContext(), request, response,
request==getRequest() && response==getResponse().getResponse());
asyncContext.setTimeout(getConnector().getAsyncTimeout());
return asyncContext;
}
没错,tomcat的异步请求实现是有超时时间,而且超时时间恰好是:30S
连接泄漏的原因小结
解决方法
推荐方案3,类似实现可以参考:tomcat-jdbc连接池,其中就有超时驱赶机制