• HttpClient遭遇Connection Reset异常分析(浅析部分源码)


    背景

      前段时间测试反馈页面会偶先错误提示,刷新后就没有了。根据测试反馈的时间点查看kibana日志,发现有Connection reset异常。下面通过分析客户端和服务端相关源码进行分析根本原因,给出解决方案。
    在这里插入图片描述

    源码分析

    HttpClient分析(version:4.5.13)

    1. 测试代码如下
      @Test
      public void shouldAnswerWithTrue() throws Exception {
      
          HttpClient httpClient = HttpClientBuilder.create()
                  .build();
      
          HttpGet httpGet = new HttpGet("http://localhost:8080/demo/test");
      
          HttpResponse httpResponse = httpClient.execute(httpGet);
          System.out.println(EntityUtils.toString(httpResponse.getEntity(), "UTF-8"));
      
          Thread.sleep(2000);
      
          HttpResponse httpResponse1 = httpClient.execute(httpGet);
          System.out.println(EntityUtils.toString(httpResponse1.getEntity(), "UTF-8"));
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
    2. 连接池获取连接
      2.1 调用栈截图如下 在这里插入图片描述
      2.2 下面展示MainClientExec#execute相关代码
      public CloseableHttpResponse execute(
              final HttpRoute route,
              final HttpRequestWrapper request,
              final HttpClientContext context,
              final HttpExecutionAware execAware) throws IOException, HttpException {
       		/*
       		 ** 1. 从连接池CPool取出Future
       		 **    ---> AbstractConnPool#lease
       		 ** 2. 返回ConnectionRequest
       		 **     2.1 操作上面future从连接池取连接
       		 */
       		final ConnectionRequest connRequest = connManager.requestConnection(route, userToken);
       		/**
       		   ** 1. 从连接池取连接:AbstractConnPool#getPoolEntryBlocking
       		           1.1 根据路由获取连接池:AbstractConnPool#getPool,没有则新建映射关系:routeToPool
       		           1.2 从连接池中获取空闲连接:RouteSpecificPool#getFree
       		           1.3 判断连接是否过期:PoolEntry#isExpired (见下面响应头的keep-alive的timeout值)
       		           1.4 如果没有,则创建该路由的连接:PoolingHttpClientConnectionManager#create,放入连接池中
       		   ** 2. 检查连接是否可用:BHttpConnectionBase#isStale
       		           2.1 检查间隔2s
       		           2.2 连接不是open状态时不需要检查
       		           2.3 尝试从socket读取数据,如果有问题重新执行第一步获取连接
       		HttpClientConnection managedConn = connRequest.get(timeout > 0 ? timeout : 0, TimeUnit.MILLISECONDS);
       		// 如果不是open状态,则建立路由连接:MainClientExec#establishRoute
       		establishRoute(proxyAuthState, managedConn, route, request, context);
       		/**
       		  ** 1. 发送请求:HttpRequestExecutor#doSendRequest
       		  ** 2. 处理响应:HttpRequestExecutor#doReceiveResponse
       		  		2.1 通过socket读取数据:SocketInputStream#read (异常点)
       		  		2.2 注意:如果在服务端连接关闭,此时执行SocketInputStream#read是没有异常
       		  */
       		response = requestExecutor.execute(request, managedConn, context);
       		// 如果请求头的Connection为keep-alive则为true:DefaultClientConnectionReuseStrategy#keepAlive
       		if (reuseStrategy.keepAlive(response, context)) {
       		    // 从响应头获取Keep-Alive,timeout=60s :DefaultConnectionKeepAliveStrategy#getKeepAliveDuration
       			final long duration = keepAliveStrategy.getKeepAliveDuration(response, context);
       		}
       		
       }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39

    tomcat分析(version:9.0.65)

    在这里插入图片描述

    1. 在connector启动时,会开始acceptor线程等待接收请求(下面展示Acceptor#run重要节点逻辑)

      public void run() {
      	while (!stopCalled) {
      	
      		// 如果我们已经到达最大连接,等待(最大连接配置:server.tomcat.max-connections=1)
      		endpoint.countUpOrAwaitConnection();
      		
      		/** 
      		   ** 等待接收请求 NioEndpoint#serverSocketAccept
      		   ** 1. ServerSocketChannelImpl#serverSocketAccept 如果没有客户端请求则会阻塞
      		   */
      		socket = endpoint.serverSocketAccept();
      		/**
      		  ** 将socket传递给适当的处理器
      		  ** 1. 新建一个指定buffer的NioChannel : NioChannel channel = new NioChannel(bufhandler)
      		  ** 2. 将channel和endipoint包装为NioSocketWrapper,设置读写超时时间(默认1分钟)
      		  ** 3. NioEndpoint$Poller#register
      		  		3.1 socket注册SelectionKey.OP_READ事件
      		  **/
      		endpoint.setSocketOptions(socket)
      	}
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
    2. NioEndpoint$Poller在初始化时开启selector,selector循环获取事件(监听读写事件)

      public void run() {
      	while (true) {
      		// selector获取注册的事件数
      		int keyCount = selector.select(selectorTimeout);
      		Iterator iterator =
                      keyCount > 0 ? selector.selectedKeys().iterator() : null;
               while (iterator != null && iterator.hasNext()) {
               	SelectionKey sk = iterator.next();
               	// 获取绑定在SelectionKey的socket
               	NioSocketWrapper socketWrapper = (NioSocketWrapper) sk.attachment();
               	
               	/** 
               	  ** 1. AbstractEndpoint#processSocket
               	  **     1.1 新建NioEndpoint$SocketProcessor
               	  **     1.2 将SocketProcessor丢入线程池执行
               	  */
               	processKey(sk, socketWrapper);
               }
      		/**
      		  ** 超时处理
      		  ** 1. 计算当前时间到上次socket读数据的时间的间隔:long delta = now - socketWrapper.getLastRead()
      		  ** 2. socket超时时间:long timeout = socketWrapper.getReadTimeout()
      		  ** 3. 如果delta>timeout则读超时,重置SocketProcessor的event为SocketEvent.ERROR
      		timeout(keyCount,hasEvents);
      	}
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
    3. SocketProcessor最终会调业务的controller方法
      3.1 下面展示SocketProcessor#doRun部分代码

      protected void doRun() {
      	 if (event == null) {
                  state = getHandler().process(socketWrapper, SocketEvent.OPEN_READ);
            } else {
            		/**
            		  ** AbstractProtocol#process
            		  ** 1. 如果SocketEvent为SocketEvent.ERROR,则修改SocketState为CLOSED
            		  */
                  state = getHandler().process(socketWrapper, event);
          }
          // 超时,则AbstractEndpoint#countDownConnection连接次数减1,重新等待连接(参考上面acceptor线程)
          if (state == SocketState.CLOSED) {
                poller.cancelledKey(getSelectionKey(), socketWrapper);
            }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15

      3.2 下图展示调用堆栈
      在这里插入图片描述

    问题复现

    1. 启动springboot的web项目,在socket过期关闭的地方打上断点等待执行
      在这里插入图片描述
    2. 在通过socket连接发起http请求地方打上断点(MainClientExec#execute)
      2.1 第一个http请求直接跳过
      2.2 第二个同一个路由的http请求断点停留,等到服务端sokcet过期关闭该socket连接后执行
      在这里插入图片描述
      2.3 此时执行下一步SocketInputStream#socketRead就会捕获到ConnectionResetException异常
      在这里插入图片描述
    3. 最终在RetryExec#execute捕获异常并打印日志
      3.1 在捕获异常后,通过DefaultHttpRequestRetryHandler#retryRequest判断是否需要重试
      (1)默认连续重试三次
      (2)是否包含在不能重试的异常 在这里插入图片描述
      3.2 打印出连接重置的debug日志
      在这里插入图片描述
      3.3 如果超过重试次数会打印如下日志
      在这里插入图片描述

    束语

       虽然本地模拟复现了重置连接的异常,但只是抛出的debug日志,实际遇到异常却是error级别的,所以问题没有完全复现。在网上看到一篇类似的文章《HttpClient遭遇Connection Reset异常,如何正确配置》,对比发现确实自己也没有配空闲连接驱逐器,就死马当作活马医加了一下配置,发到生产查看效果,神奇的是发了之后确实再也没有发现该问题了。

  • 相关阅读:
    数字化转型导师坚鹏:科技金融政策、案例及发展研究
    基于MySQL内核的SQL限流设计与实现|得物技术
    vim配置
    《TCP/IP详解 卷一》第11章 DNS
    2023.10.19 关于设计模式 —— 单例模式
    【python学习第11节:numpy】
    PLC-Recorder离线分析软件Ana里为什么不能显示变量的编号?
    K8S 控制器 service ingress
    13个学习技巧,让你每天进步一点点
    支付功能、支付平台、支持渠道如何测试?
  • 原文地址:https://blog.csdn.net/weixin_40803011/article/details/126723928