• ES 客户端 RestHighLevelClient Connection reset by peer 亲测有效 2022-11-05


    导读#

      最新公司ES集群老出现连接关闭,进而导致查询|写入ES时报错,报错日志显示如下

    复制代码
    [2m2022-10-23 14:13:10.088 - ERROR - [NONE][NONE][NONE][0][1584065372948234240] - [XNIO-1 task-3] - com.mall.search.service.EsRestService:235 : 添加文档失败:{}
    
    java.io.IOException: Connection reset by peer
        at org.elasticsearch.client.RestClient$SyncResponseListener.get(RestClient.java:964)
        at org.elasticsearch.client.RestClient.performRequest(RestClient.java:233)
        at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1448)
        at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:1418)
        at org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:1388)
        at org.elasticsearch.client.RestHighLevelClient.index(RestHighLevelClient.java:836)
        at sun.reflect.GeneratedMethodAccessor117.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:190)
        at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:138)
        at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:105)
        at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:878)
        at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:792)
        at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87)
        at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1040)
        at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:943)
        at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006)
        at org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:909)
        at javax.servlet.http.HttpServlet.service(HttpServlet.java:523)
        at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883)
        at javax.servlet.http.HttpServlet.service(HttpServlet.java:590)
        at io.undertow.servlet.handlers.ServletHandler.handleRequest(ServletHandler.java:74)
        at io.undertow.servlet.handlers.FilterHandler$FilterChainImpl.doFilter(FilterHandler.java:129)
        at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100)
        at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)
        at io.undertow.servlet.core.ManagedFilter.doFilter(ManagedFilter.java:61)
        at io.undertow.servlet.handlers.FilterHandler$FilterChainImpl.doFilter(FilterHandler.java:131)
        at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93)
        at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)
        at io.undertow.servlet.core.ManagedFilter.doFilter(ManagedFilter.java:61)
        at io.undertow.servlet.handlers.FilterHandler$FilterChainImpl.doFilter(FilterHandler.java:131)
        at org.springframework.boot.actuate.metrics.web.servlet.WebMvcMetricsFilter.doFilterInternal(WebMvcMetricsFilter.java:108)
        at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)
        at io.undertow.servlet.core.ManagedFilter.doFilter(ManagedFilter.java:61)
        at io.undertow.servlet.handlers.FilterHandler$FilterChainImpl.doFilter(FilterHandler.java:131)
        at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201)
        at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)
        at io.undertow.servlet.core.ManagedFilter.doFilter(ManagedFilter.java:61)
        at io.undertow.servlet.handlers.FilterHandler$FilterChainImpl.doFilter(FilterHandler.java:131)
        at io.undertow.servlet.handlers.FilterHandler.handleRequest(FilterHandler.java:84)
        at io.undertow.servlet.handlers.security.ServletSecurityRoleHandler.handleRequest(ServletSecurityRoleHandler.java:62)
        at io.undertow.servlet.handlers.ServletChain$1.handleRequest(ServletChain.java:68)
        at io.undertow.servlet.handlers.ServletDispatchingHandler.handleRequest(ServletDispatchingHandler.java:36)
        at io.undertow.servlet.handlers.RedirectDirHandler.handleRequest(RedirectDirHandler.java:68)
        at io.undertow.servlet.handlers.security.SSLInformationAssociationHandler.handleRequest(SSLInformationAssociationHandler.java:132)
        at io.undertow.servlet.handlers.security.ServletAuthenticationCallHandler.handleRequest(ServletAuthenticationCallHandler.java:57)
        at io.undertow.server.handlers.PredicateHandler.handleRequest(PredicateHandler.java:43)
        at io.undertow.security.handlers.AbstractConfidentialityHandler.handleRequest(AbstractConfidentialityHandler.java:46)
        at io.undertow.servlet.handlers.security.ServletConfidentialityConstraintHandler.handleRequest(ServletConfidentialityConstraintHandler.java:64)
        at io.undertow.security.handlers.AuthenticationMechanismsHandler.handleRequest(AuthenticationMechanismsHandler.java:60)
        at io.undertow.servlet.handlers.security.CachedAuthenticatedSessionHandler.handleRequest(CachedAuthenticatedSessionHandler.java:77)
        at io.undertow.security.handlers.AbstractSecurityContextAssociationHandler.handleRequest(AbstractSecurityContextAssociationHandler.java:43)
        at io.undertow.server.handlers.PredicateHandler.handleRequest(PredicateHandler.java:43)
        at io.undertow.server.handlers.PredicateHandler.handleRequest(PredicateHandler.java:43)
        at io.undertow.servlet.handlers.ServletInitialHandler.handleFirstRequest(ServletInitialHandler.java:269)
        at io.undertow.servlet.handlers.ServletInitialHandler.access$100(ServletInitialHandler.java:78)
        at io.undertow.servlet.handlers.ServletInitialHandler$2.call(ServletInitialHandler.java:133)
        at io.undertow.servlet.handlers.ServletInitialHandler$2.call(ServletInitialHandler.java:130)
        at io.undertow.servlet.core.ServletRequestContextThreadSetupAction$1.call(ServletRequestContextThreadSetupAction.java:48)
        at io.undertow.servlet.core.ContextClassLoaderSetupAction$1.call(ContextClassLoaderSetupAction.java:43)
        at io.undertow.servlet.handlers.ServletInitialHandler.dispatchRequest(ServletInitialHandler.java:249)
        at io.undertow.servlet.handlers.ServletInitialHandler.access$000(ServletInitialHandler.java:78)
        at io.undertow.servlet.handlers.ServletInitialHandler$1.handleRequest(ServletInitialHandler.java:99)
        at io.undertow.server.Connectors.executeRootHandler(Connectors.java:376)
        at io.undertow.server.HttpServerExchange$1.run(HttpServerExchange.java:830)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: java.io.IOException: Connection reset by peer
        at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
        at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
        at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
        at sun.nio.ch.IOUtil.read(IOUtil.java:197)
        at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
        at org.apache.http.impl.nio.conn.LoggingIOSession$LoggingByteChannel.read(LoggingIOSession.java:204)
        at org.apache.http.impl.nio.reactor.SessionInputBufferImpl.fill(SessionInputBufferImpl.java:231)
        at org.apache.http.impl.nio.codecs.AbstractMessageParser.fillBuffer(AbstractMessageParser.java:136)
        at org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:241)
        at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81)
        at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39)
        at org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:114)
        at org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)
        at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)
        at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)
        at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)
        at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
        at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591)
        ... 1 common frames omitted
    复制代码

    优化&解决#

      因为这个项目是助农app(抖音+淘宝,杂交版app),app首页下拉获取最新推荐视频,需要通过第三方极光拉取推荐视频,存入本地数据库mysql后;一次拉取20条,然后通过线程池异步写入ES,大概架构如下

      初步判定,可能是app随着用户体量上去,线程池异步逐条写入ES,造成ES大量的写请求,然后将架构改为如图

      就这样以为问题就解决了呢,第二天去服务器里拉下日志,发现之前的问题还是存在,就上百度,github,ES官网,很多人遇到相同问题,各种解决方案,对我们这套架构来说,都不管用。大概意思是ES客户端如果没有设置KeepAlive的话,默认为-1就是永不过期,然后就设置ES客户端的KeepAlive时间,问题还是复现,这边最终做了如下修改,从此ES客户端连接关闭问题就没有复现过

      我们是通过Nginx反向代理,构建一个ES集群,架构如下

    问题修复#

    步骤一#

      修改客户端tcp keepalive,系统默认为7200,修改为300(单位秒,5分钟)

    修改为300

    复制代码
    [root@ybchen-1 ~]# cat /proc/sys/net/ipv4/tcp_keepalive_time 
    7200
    [root@ybchen-1 ~]# 
    [root@ybchen-1 ~]# 
    [root@ybchen-1 ~]# 
    [root@ybchen-1 ~]# cat /proc/sys/net/ipv4/tcp_keepalive_time 
    7200
    [root@ybchen-1 ~]# echo 300 > /proc/sys/net/ipv4/tcp_keepalive_time 
    [root@ybchen-1 ~]# cat /proc/sys/net/ipv4/tcp_keepalive_time 
    300
    [root@ybchen-1 ~]# 
    [root@ybchen-1 ~]# 
    [root@ybchen-1 ~]# 
    [root@ybchen-1 ~]# 
    复制代码

    步骤二#

      修改ES客户端的KeepAlive时间,并添加线程池大小等参数

    复制代码
    =========ES配置类-开始=============
    
    @Data
    @Component
    @ConfigurationProperties(prefix = "elastic.search")
    public class EsConfiguration {
        //主机
        private String host;
        //端口
        private int port;
        //集群名称
        private String clusterName;
        //访问协议,如:http
        private String schema;
        //用户名
        private String userName;
        //密码
        private String password;
    }
    
    =========ES配置类-结束=============
    
    ============ES客户端-开始=======================
        @Autowired
        EsConfiguration esConfiguration;
    
        private static RestHighLevelClient restHighLevelClient;
    
        public RestHighLevelClient getRestClient() {
    
            if (null != restHighLevelClient) {
                return restHighLevelClient;
            }
            List httpHosts = new ArrayList<>();
            //填充数据
            httpHosts.add(new HttpHost(esConfiguration.getHost(), esConfiguration.getPort()));
            //填充host节点
            RestClientBuilder builder = RestClient.builder(httpHosts.toArray(new HttpHost[0]));
    
            if (StringUtils.isNotBlank(esConfiguration.getUserName()) && StringUtils.isNotBlank(esConfiguration.getPassword())) {
                //填充用户名密码
                final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
                credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(esConfiguration.getUserName(), esConfiguration.getPassword()));
                //异步链接延时配置
                builder.setRequestConfigCallback(requestConfigBuilder ->
                        requestConfigBuilder
                                .setConnectTimeout(5000) //5秒
                                .setSocketTimeout(5000)
                                .setConnectionRequestTimeout(5000)
                );
                //异步链接数配置
                builder.setHttpClientConfigCallback(httpClientBuilder -> {
                    //最大连接数100个
                    httpClientBuilder.setMaxConnTotal(100);
                    //最大路由连接数
                    httpClientBuilder.setMaxConnPerRoute(100);
                    httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                    // 设置KeepAlive为5分钟的时间,不设置默认为-1,也就是持续连接,然而这会受到外界的影响比如Firewall,会将TCP连接单方面断开,从而会导致Connection reset by peer的报错
                    // 参考github解决方案:https://github.com/TFdream/Elasticsearch-learning/issues/30
                    httpClientBuilder.setKeepAliveStrategy((response, context) -> TimeUnit.MINUTES.toMillis(3))
                            .setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(1).setSoKeepAlive(true).build());
                    return httpClientBuilder;
                });
            }
    
            restHighLevelClient = new RestHighLevelClient(builder);
            return restHighLevelClient;
        }
    
    ==============ES客户端-结束================
    
    ========== ES客户端使用示例 ===============
    @Data
    @ApiModel("新增/更新ES通用数据结构")
    public class DocSearchVo implements Serializable {
    
        @ApiModelProperty(value = "文档ID")
        private String docId;
    
        @ApiModelProperty(value = "文档JSON")
        private String docStrJson;
    
    }
    
    
        /**
         * 批量添加文档
         *
         * @param indexName
         * @param docSearchVoList
         * @return
         */
        public boolean batchIndexDoc(String indexName, List docSearchVoList) {
            RestHighLevelClient client = getRestClient();
            BulkRequest request = new BulkRequest();
            for (DocSearchVo docSearchVo : docSearchVoList) {
                IndexRequest indexRequest = new IndexRequest(indexName).id(docSearchVo.getDocId()).source(docSearchVo.getDocStrJson(), XContentType.JSON);
                request.add(indexRequest);
            }
            client.bulkAsync(request, RequestOptions.DEFAULT, new ActionListener() {
                @Override
                public void onResponse(BulkResponse bulkItemResponses) {
                    log.debug("异步批量添加文档成功,indexName:{},docSearchVoList:{}", indexName, docSearchVoList);
                }
    
                @Override
                public void onFailure(Exception e) {
                    log.error("异步批量添加文档失败,indexName:{},docSearchVoList:{},错误信息:{}", indexName, docSearchVoList, e);
                }
            });
            return true;
        }
    复制代码

      注:细心的童鞋已经发现,创建ES客户端的时候,不是线程安全的单例模式(这块别的同事写的,我只是负责修改这个bug,然后就没管这个线程安全问题,其实是来背锅的,呜呜呜~~~~~~)

    步骤三#

      添加ES客户端心跳检查,30秒一次

    复制代码
    @Component
    @Slf4j
    public class EsSchedule {
        @Autowired
        EsRestService esRestService;
    
        /**
         * 30秒一次检查es状态
         */
        @Scheduled(fixedRate = 30 * 1000)
        public void heartbeatToES() {
            try {
                RequestOptions requestOptions = RequestOptions.DEFAULT.toBuilder().build();
                boolean result = esRestService.getRestClient().ping(requestOptions);
                log.info("检查ES状态:{}", result);
            } catch (Exception e) {
                log.error("检查ES状态发生异常:{}", e);
            }
        }
    }
    复制代码

    搞定~

  • 相关阅读:
    vue踩坑
    【云原生-Kurbernetes篇】K8s的存储卷/数据卷+PV与PVC
    网络学习(十) | 深入学习HTTPS与安全传输
    发布有关陕西省工程师职称评审申报细节注意事项
    安装插件时Vscode XHR Failed 报错ERR_CERT_AUTHORITY_INVALID
    理解系统内核linux phy驱动
    66.C++多态与虚函数
    linux脚本,导出oracle(spool)指定数据到txt
    模态贡献量在汽车NVH分析中的案例应用
    Java排序算法之贪心算法
  • 原文地址:https://www.cnblogs.com/chenyanbin/p/16860998.html