• HttpClient connection leaks in practice -_-


    Usage

    Typical construction

    private HttpClient newHttpClient() {
        Registry<ConnectionSocketFactory> socketFactoryRegistry = RegistryBuilder
                .<ConnectionSocketFactory>create()
                .register("http", PlainConnectionSocketFactory.INSTANCE)
                .register("https", Objects.nonNull(this.connectionSocketFactory) ? this.connectionSocketFactory : SSLConnectionSocketFactory.getSocketFactory()).build();
    
        // Configure the connection pool size
        PoolingHttpClientConnectionManager connManager = new PoolingHttpClientConnectionManager(socketFactoryRegistry);
        // Maximum number of connections overall
        connManager.setMaxTotal(this.maxConnPerTotal);
        // Maximum number of connections per route
        connManager.setDefaultMaxPerRoute(this.maxConnPerRoute);
    
        return HttpClients.custom().setKeepAliveStrategy(new DefaultDurationConnectionKeepAliveStrategy())
                .setConnectionManager(connManager).build();
    }
    

    Request configuration

    protected RequestConfig buildRequestConfig() {
        return RequestConfig.custom()
                .setConnectTimeout(this.connectTimeout)
                .setSocketTimeout(this.socketTimeout)
                .setCookieSpec(CookieSpecs.STANDARD)
                .build();
    }
    

    Response handling

    Note: EntityUtils.toString reads the entity to the end and closes its stream, which releases the connection

    private final ResponseHandler<String> responseHandler = response -> {
        int status = response.getStatusLine().getStatusCode();
        HttpEntity entity;
        if (status >= HttpStatus.SC_OK && status < HttpStatus.SC_MULTIPLE_CHOICES) {
            entity = response.getEntity();
            return entity != null ? EntityUtils.toString(entity) : null;
        } else {
            String errorMsg = null;
            if ((entity = response.getEntity()) != null) {
                errorMsg = EntityUtils.toString(entity);
            }
            throw new HttpResponseException(status, errorMsg);
        }
    };
    

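    Model

    PoolEntry

    A connection held in the pool, together with its route.

    1. route: the route, e.g. org.apache.http.conn.routing.HttpRoute
    2. conn: the pooled connection
    3. state: state information; it can be set both when releasing and when leasing a connection, so this field can be used to customize the connection-reuse strategy
    4. created: creation timestamp
    5. updated: last-update timestamp
    6. validityDeadline: timestamp at which the PoolEntry stops being valid
    7. expiry: expiration timestamp, like validityDeadline; the difference is that expiry may be modified while validityDeadline may not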

    RouteSpecificPool

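    The per-route connection pool, a subset of the CPool connection pool.
    Fields

    1. available: a linked list of reusable PoolEntry objects (each a pooled connection plus its route); entries move here from leased when a lease is released
    2. leased: the set of leased entries; an entry arrives here either as a new connection, created when the RouteSpecificPool's allocated count is below maxPerRoute and the pool total is below maxTotal, or from available when a reusable connection exists
    3. pending: when the RouteSpecificPool's allocated count has reached maxPerRoute, or the pool total has reached maxTotal, the future requesting a PoolEntry is queued in pending and woken up once a resource becomes free. If a timeout is set and no resource arrives in time, a timeout exception is thrown: throw new TimeoutException("Timeout waiting for connection"). Note: with no timeout set, the request waits forever

    Methods

    1. getFree: returns a free resource; if one exists, the LeaseRequest is marked as completed (isDone=true) and the resource is moved from available to leased in CPool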

    CPool

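    The connection pool.
    Fields

    1. maxTotal: global maximum number of available resources
    2. available: RouteSpecificPool.available is a subset of it
    3. leased: RouteSpecificPool.leased is a subset of it
    4. pending: RouteSpecificPool.pending is a subset of it
    5. leasingRequests: linked list of lease requests in progress; a LeaseRequest is added here while it is not yet completed and processPendingRequest has not completed it
    6. completedRequests: queue of completed requests; a LeaseRequest is added here once it completes
    7. totalUsed = maxTotal - pending - leased

    Methods

    1. fireCallbacks: processes the completedRequests queue
    2. release: releases a leased resource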
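
    The available/leased/pending bookkeeping described above can be illustrated with a toy single-route pool. This is only a sketch under stated assumptions: `TinyRoutePool` and its methods are hypothetical names, connections are plain strings, and there is no maxTotal or multi-route handling. It is not HttpClient's implementation.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Toy single-route pool; illustrates available/leased/pending, not HttpClient's code. */
final class TinyRoutePool {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition released = lock.newCondition();
    private final Deque<String> available = new ArrayDeque<>(); // idle, reusable
    private final Set<String> leased = new HashSet<>();         // handed out to callers
    private final int maxPerRoute;
    private int created;

    TinyRoutePool(int maxPerRoute) {
        this.maxPerRoute = maxPerRoute;
    }

    /** Leases a connection; timeoutMillis <= 0 means wait forever. */
    String lease(long timeoutMillis) throws InterruptedException, TimeoutException {
        long nanos = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        lock.lock();
        try {
            while (true) {
                String conn = available.pollFirst();            // reuse an idle connection
                if (conn == null && leased.size() < maxPerRoute) {
                    conn = "conn-" + (++created);               // or create a new one
                }
                if (conn != null) {
                    leased.add(conn);
                    return conn;
                }
                // Pool exhausted: this caller is now "pending" until someone releases.
                if (timeoutMillis <= 0) {
                    released.await();
                } else if (nanos <= 0) {
                    throw new TimeoutException("Timeout waiting for connection");
                } else {
                    nanos = released.awaitNanos(nanos);
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /** Moves a connection from leased back to available and wakes one pending caller. */
    void release(String conn) {
        lock.lock();
        try {
            if (leased.remove(conn)) {
                available.addFirst(conn);
                released.signal();
            }
        } finally {
            lock.unlock();
        }
    }
}
```

    With `new TinyRoutePool(2)`, two leases succeed and a third either times out or, with a timeout of 0, waits until some caller invokes `release`.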

    HttpResponseProxy

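    A proxy for the response. It enhances the plain response: the enhanced response's Entity is itself a proxy, ResponseEntityProxy, which adds the following

    1. It implements the EofSensorWatcher interface, so it can watch the InputStream and release the connection when the stream is closed, aborted, or reaches EOF
    2. It extends HttpEntityWrapper and overrides getContent to wrap the original InputStream in an EofSensorInputStream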
        public HttpResponseProxy(final HttpResponse original, final ConnectionHolder connHolder) {
            this.original = original;
            this.connHolder = connHolder;
            // Enhance the response entity
            ResponseEntityProxy.enchance(original, connHolder);
        }
        public static void enchance(final HttpResponse response, final ConnectionHolder connHolder) {
            final HttpEntity entity = response.getEntity();
            if (entity != null && entity.isStreaming() && connHolder != null) {
                response.setEntity(new ResponseEntityProxy(entity, connHolder));
            }
        }
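
    The "release on EOF or close" idea behind the entity proxy can be sketched with the JDK alone. `ReleasingInputStream` is a hypothetical, simplified stand-in: the release action is just a `Runnable`, and the abort path of the real EofSensorWatcher is left out.

```java
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.atomic.AtomicBoolean;

/** Simplified stand-in for the EOF-sensing stream idea; not the HttpClient class itself. */
final class ReleasingInputStream extends FilterInputStream {
    private final Runnable releaseConnection;
    private final AtomicBoolean released = new AtomicBoolean();

    ReleasingInputStream(InputStream in, Runnable releaseConnection) {
        super(in);
        this.releaseConnection = releaseConnection;
    }

    @Override
    public int read() throws IOException {
        int b = super.read();
        if (b == -1) {
            releaseOnce(); // EOF detected: the body is fully consumed
        }
        return b;
    }

    @Override
    public int read(byte[] buf, int off, int len) throws IOException {
        int n = super.read(buf, off, len);
        if (n == -1) {
            releaseOnce();
        }
        return n;
    }

    @Override
    public void close() throws IOException {
        try {
            super.close();
        } finally {
            releaseOnce(); // closing early also gives the connection back
        }
    }

    /** Runs the release action at most once, however the stream ends. */
    private void releaseOnce() {
        if (released.compareAndSet(false, true)) {
            releaseConnection.run();
        }
    }
}
```

    A stream that is neither read to EOF nor closed never triggers the release action, which is exactly the situation the rest of this article is about.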
    

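    Flow

    Connection pool: leasing a connection

    Why are available connections released only when totalAvailable > freeCapacity - 1?
    Because at this point in the logic a new connection is about to be created, which reduces freeCapacity by 1. Without the "- 1", the connection count could overflow by 1 (by 1 per route, so the more routes, the larger the overflow).
    [Figure: connection pool flow (连接池.drawio.png)]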
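
    A small worked example makes the off-by-one visible. The sketch assumes that freeCapacity is maxTotal minus the number of leased connections and that at most one idle entry is closed per lease; `FreeCapacityCheck` and its methods are hypothetical names used only for illustration.

```java
/** Worked arithmetic for the "totalAvailable > freeCapacity - 1" check. */
final class FreeCapacityCheck {
    /** The check from the text: must one idle entry be closed before creating a connection? */
    static boolean mustEvictIdle(int totalAvailable, int freeCapacity) {
        return totalAvailable > freeCapacity - 1;
    }

    /** Open connections (leased + idle) right after one more connection has been created. */
    static int openAfterCreate(int maxTotal, int leased, int idle, boolean subtractOne) {
        int freeCapacity = Math.max(maxTotal - leased, 0); // assumed definition
        int threshold = subtractOne ? freeCapacity - 1 : freeCapacity;
        if (idle > threshold && idle > 0) {
            idle--;                  // close one idle entry to make room
        }
        return (leased + 1) + idle;  // the new connection is leased immediately
    }
}
```

    With maxTotal = 10, 4 leased and 6 idle connections, `openAfterCreate(10, 4, 6, true)` stays at 10, while the variant without the "- 1" ends at 11, one above the limit.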

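    Sending the request to the server

    1. If staleConnectionCheckEnabled is on, check whether the connection is stale and close it if so
    2. If HttpExecutionAware is not null, call back org.apache.http.client.methods.HttpExecutionAware#setCancellable
    3. If HttpExecutionAware is not null, check whether the request has been aborted; if so, throw new RequestAbortedException("Request aborted");
    4. If the connection is not open, establish the route to the target: org.apache.http.impl.execchain.MainClientExec#establishRoute
    5. Set the socketTimeout
    6. Check again whether the request has been aborted
    7. Handle authentication: org.apache.http.impl.auth.HttpAuthenticator#generateAuthResponse
    8. Execute the request: org.apache.http.protocol.HttpRequestExecutor
      1. Catch IOException/HttpException/RuntimeException; if one occurs, close the connection
    9. Apply the connection-reuse strategy: org.apache.http.ConnectionReuseStrategy#keepAlive
    10. If authentication is needed: org.apache.http.impl.execchain.MainClientExec#needAuthentication
    11. If the connection can be reused, consume the response entity and close its stream
    12. Otherwise: close the connection
    13. Handle the userToken
    14. If the response HttpEntity is null, or it is not a streaming entity (org.apache.http.HttpEntity#isStreaming)
    15. Release the connection and return a response proxy (without the connection holder connHolder): org.apache.http.impl.execchain.HttpResponseProxy#HttpResponseProxy
    16. Otherwise return a response proxy (holding connHolder): org.apache.http.impl.execchain.HttpResponseProxy#HttpResponseProxy
    17. Catch exceptions
    18. HttpException/IOException/RuntimeException/Error: release the connection
    19. ConnectionShutdownException: the connection does not need to be released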

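    Releasing the connection

    Summary of when httpclient releases a connection

    1. The connection is released when one of the following exceptions occurs while executing the request
      1. While executing the request: IOException/HttpException/RuntimeException
      2. During the whole request + response cycle: HttpException/IOException/RuntimeException/Error
    2. After the request completes, the connection is released if the response HttpEntity is null
    3. After the request completes, the connection is released if the response is non-empty but the HttpEntity is not streaming
    4. After the request completes, the connection is released once the HttpEntity has been fully consumed, e.g. EntityUtils.toString(entity)

    Note: if the request completes with a non-empty response whose HttpEntity is streaming, the connection is not released automatically; the caller must release it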
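
    With HttpClient 4.x, releasing on the caller side typically means fully consuming the entity (for example with EntityUtils.consume(entity)) or closing the CloseableHttpResponse. The idiom itself, "always release, even when only the status code is needed", can be sketched without the library. `ToyResponse` and `StatusOnlyClient` are hypothetical stand-ins, and a `Semaphore` plays the role of the pool.

```java
import java.util.concurrent.Semaphore;

/** Toy response that holds one pool permit until closed; a stand-in, not an HttpClient type. */
final class ToyResponse implements AutoCloseable {
    private final Semaphore pool;
    private final int status;
    private boolean released;

    private ToyResponse(Semaphore pool, int status) {
        this.pool = pool;
        this.status = status;
    }

    /** Leases a permit (blocking if none is free) and returns a response with a streaming body. */
    static ToyResponse execute(Semaphore pool, int status) {
        pool.acquireUninterruptibly();
        return new ToyResponse(pool, status);
    }

    int status() {
        return status;
    }

    /** Gives the permit back; calling it twice is harmless. */
    @Override
    public void close() {
        if (!released) {
            released = true;
            pool.release();
        }
    }
}

final class StatusOnlyClient {
    /** Reads only the status code, yet still returns the connection to the pool. */
    static int statusOf(Semaphore pool, int status) {
        try (ToyResponse response = ToyResponse.execute(pool, status)) {
            return response.status();
        }
    }
}
```

    Because the release sits in try-with-resources, it runs on every path, including responses whose body the caller never looks at.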

    Production connection-leak case

    Tomcat: tomcat-embed-core-8.5.72.jar

    Client code

    Long-polling thread

    new KafkaThread("KM-LONG-POLLING", new LongPollingTask(), true).start();
    class LongPollingTask implements Runnable {
        @Override
        public void run() {
            while (true) {
                try {
                    int httpResponseCode = RestClient.longPollingStatus(KM_CONFIG_LISTENER_URL);
                    if (HttpStatus.SC_CREATED == httpResponseCode) {
                        CONFIG.topicsAndClusterInfo = loadFromKM();
                        CONFIG.version.getAndIncrement();
                        log.info("KM Config Has Change:{} ", CONFIG.topicsAndClusterInfo);
                    }
                    log.debug("KM long polling .................");
                } catch (Exception e) {
                    log.error("KM long polling failed , ", e);
                }
            }
        }
    }
    

    Requesting data from the server

        public static int longPollingStatus(String uri) {
    
            try {
                HttpResponse httpResponse = LONG_REST_CLIENT.get(uri, HttpResponse.class);
                return httpResponse.getStatusLine().getStatusCode();
            } catch (Exception e) {
            }
    
            return HttpStatus.SC_BAD_GATEWAY;
        }
    

    HttpClient utility class

    1. socketTimeout: 60 s
    2. Connection pool size maxPerRoute: 2
        public <T> T get(String uri, Class<T> clazz, Map params, Map headers) throws RestClientException {
            String aUri = buildUrl(uri, params);
            log.debug("Start GET uri[{}] with params[{}] header[{}]", uri, params, headers);
            String body = null;
            try {
                HttpGet httpGet = buildHttpGet(aUri, headers);
                log.debug("Done GET uri[{}] with params[{}] header[{}], and response is {}", uri, params, headers, body);
                if (clazz == HttpResponse.class) {
                    return (T) httpClient.execute(httpGet);
                }
                body = executeRequest(httpGet);
                if (clazz == String.class) {
                    return (T) body;
                }
                if (body != null) {
                    return JacksonUtil.from(body, clazz);
                }
            } catch (Exception e) {
                log.warn("Occur error when GET uri[{}] with params[{}] headers[{}], response is {}, error: {}", uri, params, headers, body, e);
                throw new RestClientException(e.getMessage(), e);
            }
            return null;
        }
    

    Server code

    controller

        @RequestMapping("/listener")
        public void fetchTopicAndClusterInfo(@RequestParam String appName,
                                             HttpServletRequest request,
                                             HttpServletResponse response) {
            topicAndClusterInfoManager.addAsyncRequest(renameAppName(appName), request, response);
        }
    

    Async request

        public void addAsyncRequest(String appName, HttpServletRequest request, HttpServletResponse response) {
            AsyncContext asyncContext = request.startAsync(request, response);
            AsyncTask asyncTask = new AsyncTask(asyncContext, true);
            APPID_CONTEXT.put(appName, asyncTask);
    
            // Write a 304 response after 30 s
            TIMEOUT_CHECKER.schedule(() -> {
                if (asyncTask.isTimeout()) {
                    try {
                        APPID_CONTEXT.remove(appName, asyncTask);
                        response.setStatus(HttpServletResponse.SC_NOT_MODIFIED);
                        asyncContext.complete();
                    } catch (Exception e) {
    
                    }
                }
            }, 30, TimeUnit.SECONDS);
        }
    

    Why does the connection leak in this case?

    The leak does not occur every time: of the 9 production instances, 3 leaked.
    The scene: thread dump

    "KM-LONG-POLLING" #90 daemon prio=5 os_prio=0 cpu=7591.61ms elapsed=3429768.95s tid=0x00007f52e0a86d30 nid=0x11f waiting on condition  [0x00007f52ce2c8000]
       java.lang.Thread.State: WAITING (parking)
    	at jdk.internal.misc.Unsafe.park(java.base@11.0.10/Native Method)
    	- parking to wait for  <0x00001000bead9b18> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    	at java.util.concurrent.locks.LockSupport.park(java.base@11.0.10/LockSupport.java:194)
    	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(java.base@11.0.10/AbstractQueuedSynchronizer.java:2081)
    	at org.apache.http.pool.AbstractConnPool.getPoolEntryBlocking(AbstractConnPool.java:379)
    	at org.apache.http.pool.AbstractConnPool.access$200(AbstractConnPool.java:69)
    	at org.apache.http.pool.AbstractConnPool$2.get(AbstractConnPool.java:245)
    	- locked <0x00001000de891e20> (a org.apache.http.pool.AbstractConnPool$2)
    	at org.apache.http.pool.AbstractConnPool$2.get(AbstractConnPool.java:193)
    	at org.apache.http.impl.conn.PoolingHttpClientConnectionManager.leaseConnection(PoolingHttpClientConnectionManager.java:304)
    	at org.apache.http.impl.conn.PoolingHttpClientConnectionManager$1.get(PoolingHttpClientConnectionManager.java:280)
    	at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:190)
    	at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186)
    	at org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:89)
    	at org.apache.http.impl.execchain.RedirectExec.execute(RedirectExec.java:110)
    	at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
    	at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
    	at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:108)
    	at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
    	at ....framework.net.RestClient.get(RestClient.java:315)
    	at ....framework.net.RestClient.get(RestClient.java:161)
    	at ....framework.net.RestClient.get(RestClient.java:154)
    	at ....kafka.core.util.RestClient.longPollingStatus(RestClient.java:31)
    	at ....kafka.core.metadata.KafkaMetaDataManager$LongPollingTask.run(KafkaMetaDataManager.java:171)
    	at java.lang.Thread.run(java.base@11.0.10/Thread.java:829)
    
       Locked ownable synchronizers:
    	- None
    

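    All 3 machines were stuck waiting to lease a connection. The thread runs an endless loop and should not sit in WAITING; moreover it is a single-threaded task with a pool size of 2, so it should never have to wait for a connection. The problem is almost certainly a connection leak. But why?

    Client timeout?
    1. The client timeout is 60 s
    2. The server's scheduled task fires at 30 s, so the client always gets a 304 status after roughly 30 s
    3. The client therefore does not time out: ruled out
    Client failing to release the connection itself?
    1. A normal response carries no HttpEntity, so the connection is released automatically: ruled out
    2. Whether the client hits a timeout exception or receives data normally, httpclient releases the connection automatically: ruled out
    3. Reproducing locally with a packet capture showed that the server indeed did not respond normally: it returned a 500 error, which matches the scenario where the connection is not released automatically

    Root cause: a 5xx response from the server triggers the connection leak.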
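
    The way a few 500 responses drain a pool of size 2 can be replayed with a toy loop. This is a sketch under stated assumptions: permits of a `Semaphore` stand in for pooled connections, a 500 is assumed to carry a streaming error body while every other status carries no entity, and `LeakSimulation` is a hypothetical name.

```java
import java.util.concurrent.Semaphore;

/** Toy replay of the polling loop; permits stand in for pooled connections. */
final class LeakSimulation {
    /**
     * Returns the index of the first poll that finds the pool empty, or -1 if none does.
     * Assumption: a 500 carries a streaming error body, any other status carries no entity.
     */
    static int firstBlockedPoll(int maxPerRoute, int[] statusCodes) {
        Semaphore pool = new Semaphore(maxPerRoute);
        for (int i = 0; i < statusCodes.length; i++) {
            if (!pool.tryAcquire()) {
                return i; // in the case study the thread parks here, waiting for a connection
            }
            boolean streamingBody = statusCodes[i] == 500;
            if (!streamingBody) {
                pool.release(); // no entity: the connection is released automatically
            }
            // Streaming body: the caller reads only the status code, so nothing releases it.
        }
        return -1;
    }
}
```

    For the sequence 304, 304, 500, 304, 500, 304 the sixth poll (index 5) is the first one left without a connection; a run of 304 responses alone never blocks. This matches the observation that only some instances leaked.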

    Why would the server respond with a 5xx error?

    Reading the code behind the server's async request revealed the catch -_-! The code: org.apache.catalina.connector.Request#startAsync(javax.servlet.ServletRequest, javax.servlet.ServletResponse)

        @Override
        public AsyncContext startAsync(ServletRequest request,
                ServletResponse response) {
            if (!isAsyncSupported()) {
                IllegalStateException ise =
                        new IllegalStateException(sm.getString("request.asyncNotSupported"));
                log.warn(sm.getString("coyoteRequest.noAsync",
                        StringUtils.join(getNonAsyncClassNames())), ise);
                throw ise;
            }
    
            if (asyncContext == null) {
                asyncContext = new AsyncContextImpl(this);
            }
    
            asyncContext.setStarted(getContext(), request, response,
                    request==getRequest() && response==getResponse().getResponse());
            asyncContext.setTimeout(getConnector().getAsyncTimeout());
    
            return asyncContext;
        }
    

    Sure enough, Tomcat's async request implementation has its own timeout, and that timeout happens to be exactly 30 s
    [Screenshot: 截屏2023-05-24 23.01.40.png]

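    Summary

    Causes of the connection leak

    1. The client did not release the connection itself
    2. The server's 30 s long-polling scheduled task coincides with Tomcat's 30 s async request timeout
    3. When no data is ready and the server's scheduled tasks (ScheduledThreadPoolExecutor) come due at 30 s, some tasks have not yet been scheduled to run when the Tomcat async request times out, so Tomcat responds to the client first. The client does not release the connection after receiving this abnormal response, and the connection leaks

    Fixes

    1. Release the connection explicitly
    2. Set the client timeout below 30 s
    3. Timeout monitoring: run a scheduled task that periodically checks whether a leased connection has timed out and releases it if so

    Option 3 is recommended. For a similar implementation see the tomcat-jdbc connection pool, which has a timeout-based eviction mechanism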
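
    The timeout-monitoring idea can be sketched as a small lease tracker. Everything here is hypothetical and only models the bookkeeping: `LeaseTracker`, its method names and the string connection ids are not HttpClient or tomcat-jdbc APIs, and time is passed in explicitly so that the behaviour is deterministic.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy lease tracker with timeout-based reclamation; a model, not library code. */
final class LeaseTracker {
    private final Map<String, Long> leasedAtMillis = new LinkedHashMap<>();
    private final long maxLeaseMillis;

    LeaseTracker(long maxLeaseMillis) {
        this.maxLeaseMillis = maxLeaseMillis;
    }

    /** Records the moment a connection was handed out. */
    synchronized void onLease(String connId, long nowMillis) {
        leasedAtMillis.put(connId, nowMillis);
    }

    /** Forgets a connection that was released normally. */
    synchronized void onRelease(String connId) {
        leasedAtMillis.remove(connId);
    }

    /** Removes and returns every connection leased for longer than maxLeaseMillis. */
    synchronized List<String> reclaimAbandoned(long nowMillis) {
        List<String> reclaimed = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = leasedAtMillis.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMillis - entry.getValue() > maxLeaseMillis) {
                reclaimed.add(entry.getKey());
                it.remove();
            }
        }
        return reclaimed;
    }

    synchronized int leasedCount() {
        return leasedAtMillis.size();
    }
}
```

    In a real client, `reclaimAbandoned(System.currentTimeMillis())` could be driven by a ScheduledExecutorService, and each reclaimed connection would still have to be closed and returned to the pool by whatever hook the surrounding code provides.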

  • Original article: https://blog.csdn.net/u010597819/article/details/130905916