• 什么是长轮询


    轮询 vs 长轮询

    在看apollo和nacos等配置中心的源码的时候发现,配置更新的实时感知都是采用的长轮询的方式。那么什么是长轮询的呢?在讲解长轮询之前我们先了解一下什么是短轮询。

    短轮询

    首先说轮询的场景:轮询,顾名思义,就是一遍一遍的查询。比如配置中心修改配置的这种场景,我们业务方的系统需要及时感知到关心的配置是否有更新。能想到最简单的方式就是不断地发http请求,然后配置中心接收到请求之后,实时返回结果,告诉客户端关心的配置是否有更新。

    image-20220728095940953

    这是最简单也是最容易想到的实现方式,但是它有自己的弊端:到底多久请求一次呢?如果频率较高,那么就会导致服务端压力大;如果请求的频率放低,那么客户端感知变更的及时性就会降低。

    长轮询就不存在这样的问题,下面对长轮询进行介绍。

    长轮询

    长轮询的含义就是:客户端发起请求,如果服务端的数据没有发生变更,那么就hold住请求,直到服务端的数据发生了变更,或者达到了一定的时间就会返回。这样就减少了客户端和服务端不断频繁连接和传递数据的过程,并且不会消耗服务端太多资源。

    image-20220728205157523

    这里大家可能会有两个疑问:

    1. 为什么达到时间就返回,既然 是长轮询,为什么不一直hold住请求,直到数据发生变更再返回呢?
    2. 服务端hold住难道不消耗线程吗,不是线程一直阻塞在那里吗?

    第一个问题主要有两个层面的考虑,一是连接稳定性的考虑,长轮询在传输层本质上还是走的 TCP 协议,如果服务端假死、fullgc 等异常问题,或者是重启等常规操作,长轮询没有应用层的心跳机制,仅仅依靠 TCP 层的心跳保活很难确保可用性,所以一次长轮询设置一定的超时时间也是在确保可用性。二是在配置中心的使用过程中,用户可能随时新增配置监听,而在此之前,长轮询可能已经发出,新增的配置监听无法包含在旧的长轮询中,所以在配置中心的设计中,一般会在一次长轮询结束后,将新增的配置监听给捎带上,而如果长轮询没有超时时间,只要配置一直不发生变化,响应就无法返回,新增的配置也就没法设置监听了。

    第二个问题可以这样理解:既然是通过让服务端长时间hold住请求实现长轮询的,那么必然不会是让服务端线程阻塞在那里等待数据变更的,我们都知道tomcat线程池默认是200,话句话就是说,服务端的线程是很宝贵的资源,如果有200个这样的长轮询的请求把线程阻塞在那里,也这个服务器基本就属于宕机的状态了,其他什么请求也处理不了了。所以这里说的hold住请求,并不是让线程一直阻塞着,而是tomcat线程把request和response引用放在服务端全局的集合中,由单独的一个或几个线程处理这些请求,而tomcat线程把本次请求的request和response引用放入全局集合中之后,当前的使命就算是完成了,从而可以被调度去处理其他请求。

    长轮询的原理

    在上面长轮询的图中我们看到,客户端侧标注了一个timeout时间90s,服务端最长的hold时间是80s,两个时间只是个示例,代表的是服务端hold的时间要小于客户端设置的超时时间。这也很容易理解,如果服务端的hold时间大于客户端设置的超时时间,那么大概率客户端会出现timeout异常,这是非常不优雅的。

    上节讲了,长轮询不可能一直占用tomcat的线程池,所以需要采用异步响应的方式去实现,而比较方便实现异步http的方式就是Servlet3.0提供的AsyncContext 机制。

    asyncContext是为了把主线程返回给tomcat线程池,不影响服务对其他客户端请求。会有线程专门处理这个长轮询,但并不是说每一个长轮询的http请求都要用一个线程阻塞在那。而是把长轮询的request的引用在一个集合中存起来,用一个或几个线程专门处理一批客户端的长轮询请求,这样就不需要为每一个长轮询单独分配线程阻塞在那了,从而大大降低了资源的消耗,

    demo

    @RestController
    public class ConfigServer {
    
        @Data
        private static class AsyncTask {
            // 长轮询请求的上下文,包含请求和响应体
            private AsyncContext asyncContext;
            // 超时标记
            private boolean timeout;
    
            public AsyncTask(AsyncContext asyncContext, boolean timeout) {
                this.asyncContext = asyncContext;
                this.timeout = timeout;
            }
        }
    
        // guava 提供的多值 Map,一个 key 可以对应多个 value,这个就是我们上节说的全局集合,不会随着请求的结束而销毁
        private volatile Multimap<String, AsyncTask> dataIdContext = Multimaps.synchronizedSetMultimap(HashMultimap
            .create());
    
        private ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("longPolling-timeout-checker-%d")
            .build();
      // 创建一个延时任务,这个相当于单独的一个守护线程,所有长轮询的任务的超时检查都由这个线程处理
        private ScheduledExecutorService timeoutChecker = new ScheduledThreadPoolExecutor(1, threadFactory);
    
        //  客户端通过请求这个接口用于感知到配置是否有变更
        @RequestMapping("/listener")
        public void addListener(HttpServletRequest request, HttpServletResponse response) {
    
            String dataId = request.getParameter("dataId");
    
            // 开启异步,这里是将客户端请求的request和response包装成AsyncContext对象,AsyncContext对象又被asyncTask包装
            AsyncContext asyncContext = request.startAsync(request, response);
            AsyncTask asyncTask = new AsyncTask(asyncContext, true);
    
          // 把asyncTask放入到dataIdContext中,这样即使走下面的异步任务,当前主线程的任务结束,当前请求也会被hold住
            dataIdContext.put(dataId, asyncTask);
            // 启动定时器,30s 后写入 304 响应,
            timeoutChecker.schedule(() -> {
                if (asyncTask.isTimeout()) {
                    dataIdContext.remove(dataId, asyncTask);
                    response.setStatus(HttpServletResponse.SC_NOT_MODIFIED);
                    asyncContext.complete();
                }
            }, 30000, TimeUnit.MILLISECONDS);
        }
      
      
      //  配置发布接入点,这里是修改配置的入口,
        @RequestMapping("/publishConfig")
        public String publishConfig(String dataId, String configInfo) {
            
          // 对应的配置从dataIdContext中取出
            Collection<AsyncTask> asyncTasks = dataIdContext.removeAll(dataId);
    
            for (AsyncTask asyncTask : asyncTasks) {
                asyncTask.setTimeout(false);
              // 设置response并返回客户端
                HttpServletResponse response = (HttpServletResponse)asyncTask.getAsyncContext().getResponse();
                response.setStatus(HttpServletResponse.SC_OK);
                response.getWriter().println(configInfo);
                asyncTask.getAsyncContext().complete();
            }
            return "success";
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65

    tomcat线程池

    上面是长轮询的demo,有些同学可能还是不太理解线程和request的区别,下面大概讲解一个请求在服务端处理的整个过程。

    • tcp三次握手后
    • Acceptor线程处理 socket accept
    • Acceptor线程处理 注册registered OP_READ到多路复用器
    • ClientPoller线程 监听多路复用器的事件(OP_READ)触发
    • 从tomcat的work线程池取一个工作线程来处理socket[http-nio-8080-exec-xx],下面几个步骤也都是在work线程中进行处理的
    • 因为是http协议所以用Http11Processor来解析协议
    • CoyoteAdapter来适配包装成Request和Response对象
    • 开始走pipeline管道(Valve),最后一个invoke的是把我们的servlet对象包装的StandardWrapperValve管道

    接下来就走到我们的servlet,由于是我们是异步的servlet,当在tomcat的work线程中调用startAsync(),会创建了一个异步的上下文(AsyncContext),并且异步的上下文(AsyncContext)会设置这个状态机状态为 STARTING, 然后把这个异步上下文放到了我们的自定义线程池中去执行,对于tomcat的work线程而言,servlet调用就结束了!在这个时候request和response由于被AsyncContext对象引用,是不会被释放的。虽然Request和Response没有释放,但是这根work线程回到tomcat的线程池中去了。

    过程可以通过下图解释:

    image-20220803095200111

    AsyncContext源码分析

    AsyncContext asyncContext = request.startAsync(request, response);
    asyncContext.complete();
    
    • 1
    • 2

    实现异步主要就是这两步

    1. 通过request创建一个asyncContext对象
    2. 通过asyncContext调用complete方法完成数据返回

    先看第一步的源码:

    Requestpublic AsyncContext startAsync(ServletRequest request, ServletResponse response) {
        if (!this.isAsyncSupported()) {  //要开启异步处理支持,否则在这一步直接就会抛异常
            IllegalStateException ise = new IllegalStateException(sm.getString("request.asyncNotSupported"));
            log.warn(sm.getString("coyoteRequest.noAsync", new Object[]{StringUtils.join(this.getNonAsyncClassNames())}), ise);
            throw ise;
        } else {
            if (this.asyncContext == null) {
                this.asyncContext = new AsyncContextImpl(this);
            }
    
          // 创建并调用setStarted方法
            this.asyncContext.setStarted(this.getContext(), request, response, request == this.getRequest() && response == this.getResponse().getResponse());
          // 设置超时时间
            this.asyncContext.setTimeout(this.getConnector().getAsyncTimeout());
            return this.asyncContext;
        }
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    1. 业务方法开启异步化上下文AsynContext;释放tomcat当前处理线程;
    2. tomcat判断当前请求是否开启了异步化,如果开启则不关闭响应流Response,也不进行用户响应的返回;
    AsyncContextImplpublic void setStarted(Context context, ServletRequest request, ServletResponse response, boolean originalRequestResponse) {
        Object var5 = this.asyncContextLock;
        synchronized(this.asyncContextLock) {
            this.request.getCoyoteRequest().action(ActionCode.ASYNC_START, this);
            this.context = context;
            this.servletRequest = request;
            this.servletResponse = response;
            this.hasOriginalRequestAndResponse = originalRequestResponse;
            this.event = new AsyncEvent(this, request, response);
            List<AsyncListenerWrapper> listenersCopy = new ArrayList();
            listenersCopy.addAll(this.listeners);
            this.listeners.clear();
            Iterator i$ = listenersCopy.iterator();
    
            while(i$.hasNext()) {
                AsyncListenerWrapper listener = (AsyncListenerWrapper)i$.next();
    
                try {
                    listener.fireOnStartAsync(this.event);
                } catch (Throwable var11) {
                    ExceptionUtils.handleThrowable(var11);
                    log.warn("onStartAsync() failed for listener of type [" + listener.getClass().getName() + "]", var11);
                }
            }
    
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    第二步的源码

    public void complete() {
        if (log.isDebugEnabled()) {
            this.logDebug("complete   ");
        }
    
        this.check();
        this.request.getCoyoteRequest().action(ActionCode.ASYNC_COMPLETE, (Object)null);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    源码暂时没看太深入,先记个todo吧,等后续深入看下。。。

    参考:

    https://blog.csdn.net/weixin_45727359/article/details/113533256

    https://www.shuzhiduo.com/A/o75NNk1x5W/

  • 相关阅读:
    Library <iconv2.4.0> not found 解决方法
    风控违约场景如何预测,来看看这份常见的三种模型实现算法对比
    Zookeeper集群搭建及原理
    SpringBoot如何自定义starter启动器?看这里
    Linux系统查看有几块硬盘
    【Windows Server 2019】NTP服务的配置和管理——使用GUI与CLI设置
    美团二面:SpringBoot读取配置优先级顺序是什么?
    基于控制性能指标的重放攻击编码检测方案
    Java框架 Spring5--JdbcTemplate
    不同类型时间戳
  • 原文地址:https://blog.csdn.net/u013978512/article/details/126186884