• Apache Tomcat如何高并发处理请求


    介绍

    作为常用的http协议服务器,tomcat应用非常广泛。tomcat也是遵循Servelt协议的,Servelt协议可以让服务器与真实服务逻辑代码进行解耦。各自只需要关注Servlet协议即可。
    对于tomcat是如何作为一个高性能的服务器的呢?你是不是也会有这样的疑问?

    tomcat是如何接收网络请求?

    如何做到高性能的http协议服务器?

    tomcat从8.0往后开始使用了NIO非阻塞io模型,提高了吞吐量,本文的源码是tomcat 9.0.48版本

    接收Socket请求

    org.apache.tomcat.util.net.Acceptor实现了Runnable接口,在一个单独的线程中以死循环的方式一直进行socket的监听

    线程的初始化及启动是在方法org.apache.tomcat.util.net.AbstractEndpoint#startAcceptorThread

    有个很重要的属性org.apache.tomcat.util.net.AbstractEndpoint;同时实现了run方法,方法中主要有以下功能:

    • 请求最大连接数限制: 最大为 8*1024;请你注意到达最大连接数后操作系统底层还是会接收客户端连接,但用户层已经不再接收
    • 获取socketChannel
    public void run() {
            int errorDelay = 0;
            try {
                // Loop until we receive a shutdown command
                while (!stopCalled) {
    					...
                    if (stopCalled) {
                        break;
                    }
                    state = AcceptorState.RUNNING;
    
                    try {
                        //if we have reached max connections, wait
                        // 如果连接超过了 8*1024,则线程阻塞等待; 是使用org.apache.tomcat.util.threads.LimitLatch类实现了分享锁(内部实现了AbstractQueuedSynchronizer)
                        // 请你注意到达最大连接数后操作系统底层还是会接收客户端连接,但用户层已经不再接收。
                        endpoint.countUpOrAwaitConnection();
    
                        // Endpoint might have been paused while waiting for latch
                        // If that is the case, don't accept new connections
                        if (endpoint.isPaused()) {
                            continue;
                        }
    
                        U socket = null;
                        try {
                            // Accept the next incoming connection from the server
                            // socket
                            // 抽象方法,不同的endPoint有不同的实现方法。NioEndPoint为例,实现方法为serverSock.accept(),这个方法主要看serverSock实例化时如果为阻塞,accept方法为阻塞;反之为立即返回,如果没有socket链接,则为null
                            socket = endpoint.serverSocketAccept();
                        } catch (Exception ioe) {
                            // We didn't get a socket
                            endpoint.countDownConnection();
                            if (endpoint.isRunning()) {
                                // Introduce delay if necessary
                                errorDelay = handleExceptionWithDelay(errorDelay);
                                // re-throw
                                throw ioe;
                            } else {
                                break;
                            }
                        }
                        // Successful accept, reset the error delay
                        errorDelay = 0;
    
                        // Configure the socket
                        if (!stopCalled && !endpoint.isPaused()) {
                            // setSocketOptions() will hand the socket off to
                            // an appropriate processor if successful
                            // endPoint类的抽象方法,不同的endPoint有不同的实现。处理获取到的socketChannel链接,如果该socket链接能正常处理,那么该方法会返回true,否则为false
                            if (!endpoint.setSocketOptions(socket)) {
                                endpoint.closeSocket(socket);
                            }
                        } else {
                            endpoint.destroySocket(socket);
                        }
                    } catch (Throwable t) {
                        ...
                    }
                }
            } finally {
                stopLatch.countDown();
            }
            state = AcceptorState.ENDED;
        }
    

    再来看下org.apache.tomcat.util.net.NioEndpoint#setSocketOptions方法的具体实现(NioEndpoint为例)

    这个方法中主要做的事:

    • 创建NioChannel
    • 设置socket为非阻塞
    • 将socket添加到Poller的队列中
     protected boolean setSocketOptions(SocketChannel socket) {
            NioSocketWrapper socketWrapper = null;
            try {
                // Allocate channel and wrapper
                // 优先使用已有的缓存nioChannel
                NioChannel channel = null;
                if (nioChannels != null) {
                    channel = nioChannels.pop();
                }
                if (channel == null) {
                    SocketBufferHandler bufhandler = new SocketBufferHandler(
                            socketProperties.getAppReadBufSize(),
                            socketProperties.getAppWriteBufSize(),
                            socketProperties.getDirectBuffer());
                    if (isSSLEnabled()) {
                        channel = new SecureNioChannel(bufhandler, this);
                    } else {
                        channel = new NioChannel(bufhandler);
                    }
                }
                // 将nioEndpoint与NioChannel进行包装
                NioSocketWrapper newWrapper = new NioSocketWrapper(channel, this);
                channel.reset(socket, newWrapper);
                connections.put(socket, newWrapper);
                socketWrapper = newWrapper;
    
                // Set socket properties
                // Disable blocking, polling will be used
                // 设置当前链接的socket为非阻塞
                socket.configureBlocking(false);
                if (getUnixDomainSocketPath() == null) {
                    socketProperties.setProperties(socket.socket());
                }
    
                socketWrapper.setReadTimeout(getConnectionTimeout());
                socketWrapper.setWriteTimeout(getConnectionTimeout());
                socketWrapper.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
                // 将包装后的nioChannel与nioEndpoint进行注册,注册到Poller,将对应的socket包装类添加到Poller的队列中,同时唤醒selector
                poller.register(socketWrapper);
                return true;
            } catch (Throwable t) {
                ExceptionUtils.handleThrowable(t);
                try {
                    log.error(sm.getString("endpoint.socketOptionsError"), t);
                } catch (Throwable tt) {
                    ExceptionUtils.handleThrowable(tt);
                }
                if (socketWrapper == null) {
                    destroySocket(socket);
                }
            }
            // Tell to close the socket if needed
            return false;
        }
    
    

    Socket请求轮询

    上一小节是接收到了socket请求,进行包装之后,将socket添加到了Poller的队列上,并可能唤醒了Selector,本小节就来看看,Poller是如何进行socket的轮询的。

    首先org.apache.tomcat.util.net.NioEndpoint.Poller也是实现了Runnable接口,是一个可以单独启动的线程

    初始化及启动是在org.apache.tomcat.util.net.NioEndpoint#startInternal

    重要的属性:

    • java.nio.channels.Selector:在Poller对象初始化的时候,就会启动轮询器
    • SynchronizedQueue<PollerEvent>:同步的事件队列

    再来看下具体处理逻辑,run方法的源码

    		public void run() {
                // Loop until destroy() is called
                while (true) {
    
                    boolean hasEvents = false;
    
                    try {
                        if (!close) {
                            // 去SynchronizedQueue事件队列中拉去,看是否已经有了事件,如果有,则返回true
                            // 如果从队列中拉取到了event(即上一步将NioSocketWrapper封装为PollerEvent添加到次队列中),将socketChannel注册到Selector上,标记为SelectionKey.OP_READ,添加处理函数attachment(为Accetpor添加到Poller时的    
                            // NioSocketWrapper)
                            hasEvents = events();
                            if (wakeupCounter.getAndSet(-1) > 0) {
                                // If we are here, means we have other stuff to do
                                // Do a non blocking select
                                keyCount = selector.selectNow();
                            } else {
                                keyCount = selector.select(selectorTimeout);
                            }
                            wakeupCounter.set(0);
                        }
                        if (close) {
                            events();
                            timeout(0, false);
                            try {
                                selector.close();
                            } catch (IOException ioe) {
                                log.error(sm.getString("endpoint.nio.selectorCloseFail"), ioe);
                            }
                            break;
                        }
                        // Either we timed out or we woke up, process events first
                        if (keyCount == 0) {
                            hasEvents = (hasEvents | events());
                        }
                    } catch (Throwable x) {
                        ExceptionUtils.handleThrowable(x);
                        log.error(sm.getString("endpoint.nio.selectorLoopError"), x);
                        continue;
                    }
    
                    Iterator<SelectionKey> iterator =
                        keyCount > 0 ? selector.selectedKeys().iterator() : null;
                    // Walk through the collection of ready keys and dispatch
                    // any active event.
                    // selector轮询获取已经注册的事件,如果有事件准备好,此时通过selectKeys方法就能拿到对应的事件
                    while (iterator != null && iterator.hasNext()) {
                        SelectionKey sk = iterator.next();
                        // 获取到事件后,从迭代器删除事件,防止事件重复轮询
                        iterator.remove();
                        // 获取事件的处理器,这个attachment是在event()方法中注册的,后续这个事件的处理,就交给这个wrapper去处理
                        NioSocketWrapper socketWrapper = (NioSocketWrapper) sk.attachment();
                        // Attachment may be null if another thread has called
                        // cancelledKey()
                        if (socketWrapper != null) {
                            processKey(sk, socketWrapper);
                        }
                    }
    
                    // Process timeouts
                    timeout(keyCount,hasEvents);
                }
    
                getStopLatch().countDown();
            }
    

    在这里,有一个很重要的方法,org.apache.tomcat.util.net.NioEndpoint.Poller#events(),他是从Poller的事件队列中获取Acceptor接收到的可用socket,并将其注册到Selector

    		/**
             * Processes events in the event queue of the Poller.
             *
             * @return <code>true</code> if some events were processed,
             *   <code>false</code> if queue was empty
             */
            public boolean events() {
                boolean result = false;
    
                PollerEvent pe = null;
                // 如果Acceptor将socket添加到队列中,那么events.poll()方法就能拿到对应的事件,否则拿不到就返回false
                for (int i = 0, size = events.size(); i < size && (pe = events.poll()) != null; i++ ) {
                    result = true;
                    NioSocketWrapper socketWrapper = pe.getSocketWrapper();
                    SocketChannel sc = socketWrapper.getSocket().getIOChannel();
                    int interestOps = pe.getInterestOps();
                    if (sc == null) {
                        log.warn(sm.getString("endpoint.nio.nullSocketChannel"));
                        socketWrapper.close();
                    } else if (interestOps == OP_REGISTER) {
                        // 如果是Acceptor刚添加到队列中的事件,那么此时的ops就是OP_REGISTER
                        try {,
                            // 将次socket注册到selector上,标记为OP_READ事件,添加事件触发时处理函数socketWrapper
                            sc.register(getSelector(), SelectionKey.OP_READ, socketWrapper);
                        } catch (Exception x) {
                            log.error(sm.getString("endpoint.nio.registerFail"), x);
                        }
                    } else {
                        // ??这里的逻辑,不清楚什么情况下会进入到这个分支里面
                        final SelectionKey key = sc.keyFor(getSelector());
                        if (key == null) {
                            // The key was cancelled (e.g. due to socket closure)
                            // and removed from the selector while it was being
                            // processed. Count down the connections at this point
                            // since it won't have been counted down when the socket
                            // closed.
                            socketWrapper.close();
                        } else {
                            final NioSocketWrapper attachment = (NioSocketWrapper) key.attachment();
                            if (attachment != null) {
                                // We are registering the key to start with, reset the fairness counter.
                                try {
                                    int ops = key.interestOps() | interestOps;
                                    attachment.interestOps(ops);
                                    key.interestOps(ops);
                                } catch (CancelledKeyException ckx) {
                                    cancelledKey(key, socketWrapper);
                                }
                            } else {
                                cancelledKey(key, socketWrapper);
                            }
                        }
                    }
                    if (running && !paused && eventCache != null) {
                        pe.reset();
                        eventCache.push(pe);
                    }
                }
    
                return result;
            }
    

    还有一个重要方法就是org.apache.tomcat.util.net.NioEndpoint.Poller#processKey,上一个方法是获取event,并注册到selector,那这个方法就是通过Selector获取到的数据准备好的event,并开始封装成对应的业务处理线程SocketProcessorBase,扔到线程池里开始处理

    	    protected void processKey(SelectionKey sk, NioSocketWrapper socketWrapper) {
                try {
                    if (close) {
                        cancelledKey(sk, socketWrapper);
                    } else if (sk.isValid()) {
                        if (sk.isReadable() || sk.isWritable()) {
                            if (socketWrapper.getSendfileData() != null) {
                                processSendfile(sk, socketWrapper, false);
                            } else {
                                unreg(sk, socketWrapper, sk.readyOps());
                                boolean closeSocket = false;
                                // Read goes before write
                                if (sk.isReadable()) {
                                    //这里如果是异步的操作,就会走这里
                                    if (socketWrapper.readOperation != null) {
                                        if (!socketWrapper.readOperation.process()) {
                                            closeSocket = true;
                                        }
                                    } else if (socketWrapper.readBlocking) {
                                        // readBlocking默认为false
                                        synchronized (socketWrapper.readLock) {
                                            socketWrapper.readBlocking = false;
                                            socketWrapper.readLock.notify();
                                        }
                                    } else if (!processSocket(socketWrapper, SocketEvent.OPEN_READ, true)) {
                                        // 处理正常的事件,这里的processSocket就要正式开始处理请求了。
                                        // 将对应的事件封装成对应的线程,然后交给线程池去处理正式的请求业务
                                        closeSocket = true;
                                    }
                                }
                                if (!closeSocket && sk.isWritable()) {
                                    if (socketWrapper.writeOperation != null) {
                                        if (!socketWrapper.writeOperation.process()) {
                                            closeSocket = true;
                                        }
                                    } else if (socketWrapper.writeBlocking) {
                                        synchronized (socketWrapper.writeLock) {
                                            socketWrapper.writeBlocking = false;
                                            socketWrapper.writeLock.notify();
                                        }
                                    } else if (!processSocket(socketWrapper, SocketEvent.OPEN_WRITE, true)) {
                                        closeSocket = true;
                                    }
                                }
                                if (closeSocket) {
                                    cancelledKey(sk, socketWrapper);
                                }
                            }
                        }
                    } else {
                        // Invalid key
                        cancelledKey(sk, socketWrapper);
                    }
                } catch (CancelledKeyException ckx) {
                    cancelledKey(sk, socketWrapper);
                } catch (Throwable t) {
                    ExceptionUtils.handleThrowable(t);
                    log.error(sm.getString("endpoint.nio.keyProcessingError"), t);
                }
            }
    

    请求具体处理

    上一步,Selector获取到了就绪的请求socket,然后根据socket注册的触发处理函数等,将这些数据进行封装,扔到了线程池里,开始具体的业务逻辑处理。本节就是从工作线程封装开始,org.apache.tomcat.util.net.SocketProcessorBase为工作线程类的抽象类,实现了Runnable接口,不同的Endpoint实现具体的处理逻辑,本节以NioEndpoint为例

    以下为org.apache.tomcat.util.net.AbstractEndpoint#processSocket方法源码

        /**
         * Process the given SocketWrapper with the given status. Used to trigger
         * processing as if the Poller (for those endpoints that have one)
         * selected the socket.
         *
         * @param socketWrapper The socket wrapper to process
         * @param event         The socket event to be processed
         * @param dispatch      Should the processing be performed on a new
         *                          container thread
         *
         * @return if processing was triggered successfully
         */
        public boolean processSocket(SocketWrapperBase<S> socketWrapper,
                SocketEvent event, boolean dispatch) {
            try {
                if (socketWrapper == null) {
                    return false;
                }
                // 优先使用已经存在的线程
                SocketProcessorBase<S> sc = null;
                if (processorCache != null) {
                    sc = processorCache.pop();
                }
                if (sc == null) {
                    sc = createSocketProcessor(socketWrapper, event);
                } else {
                    sc.reset(socketWrapper, event);
                }
                // 获取线程池。线程池的初始化,是在Acceptor、Poller这两个单独线程启动之前创建
                // tomcat使用了自定义的org.apache.tomcat.util.threads.TaskQueue,这块tomcat也进行了小的适配开发
                // 核心线程为10个,最大200线程
                Executor executor = getExecutor();
                if (dispatch && executor != null) {
                    executor.execute(sc);
                } else {
                    sc.run();
                }
            } catch (RejectedExecutionException ree) {
                getLog().warn(sm.getString("endpoint.executor.fail", socketWrapper) , ree);
                return false;
            } catch (Throwable t) {
                ExceptionUtils.handleThrowable(t);
                // This means we got an OOM or similar creating a thread, or that
                // the pool and its queue are full
                getLog().error(sm.getString("endpoint.process.fail"), t);
                return false;
            }
            return true;
        }
    

    上面的方法是得到了处理业务逻辑的线程SocketProcessorBase,NioEndpoint内部类org.apache.tomcat.util.net.NioEndpoint.SocketProcessor继承了这个抽象类,也就是具体的业务处理逻辑在org.apache.tomcat.util.net.NioEndpoint.SocketProcessor#doRun方法中,最终调用到我们的Servlet

            protected void doRun() {
                /*
                 * Do not cache and re-use the value of socketWrapper.getSocket() in
                 * this method. If the socket closes the value will be updated to
                 * CLOSED_NIO_CHANNEL and the previous value potentially re-used for
                 * a new connection. That can result in a stale cached value which
                 * in turn can result in unintentionally closing currently active
                 * connections.
                 */
                Poller poller = NioEndpoint.this.poller;
                if (poller == null) {
                    socketWrapper.close();
                    return;
                }
    
                try {
                    int handshake = -1;
                    try {
                        // 握手相关判断逻辑
                       ... 
                    } catch (IOException x) {
                      ...
                    }
                    // 三次握手成功了
                    if (handshake == 0) {
                        SocketState state = SocketState.OPEN;
                        // Process the request from this socket
                        // event为SocketEvent.OPEN_READ,这个变量是org.apache.tomcat.util.net.NioEndpoint.Poller#processKey方法赋值
                        if (event == null) {
                            state = getHandler().process(socketWrapper, SocketEvent.OPEN_READ);
                        } else {
                            // 这里就开始正式处理请求了
                            state = getHandler().process(socketWrapper, event);
                        }
                        if (state == SocketState.CLOSED) {
                            poller.cancelledKey(getSelectionKey(), socketWrapper);
                        }
                    } else if (handshake == -1 ) {
                        getHandler().process(socketWrapper, SocketEvent.CONNECT_FAIL);
                        poller.cancelledKey(getSelectionKey(), socketWrapper);
                    } else if (handshake == SelectionKey.OP_READ){
                        socketWrapper.registerReadInterest();
                    } else if (handshake == SelectionKey.OP_WRITE){
                        socketWrapper.registerWriteInterest();
                    }
                } catch (CancelledKeyException cx) {
                    poller.cancelledKey(getSelectionKey(), socketWrapper);
                } catch (VirtualMachineError vme) {
                    ExceptionUtils.handleThrowable(vme);
                } catch (Throwable t) {
                    log.error(sm.getString("endpoint.processing.fail"), t);
                    poller.cancelledKey(getSelectionKey(), socketWrapper);
                } finally {
                    socketWrapper = null;
                    event = null;
                    //return to cache
                    if (running && !paused && processorCache != null) {
                        processorCache.push(this);
                    }
                }
            }
    

    总结

    • Tomcat是如何接收网络请求?

      使用java nio的同步非阻塞去进行网络监听。

      org.apache.tomcat.util.net.AbstractEndpoint#bindWithCleanup中初始化网络监听、SSL

      		{	
                  ....
                  serverSock = ServerSocketChannel.open();
                  socketProperties.setProperties(serverSock.socket());
                  InetSocketAddress addr = new InetSocketAddress(getAddress(), getPortWithOffset());
                  // 当应用层面的连接数到达最大值时,操作系统可以继续接收连接,那么操作系统能继续接收的最大连接数就是这个队列长度,可以通过acceptCount 参数配置,默认是 100
                  serverSock.bind(addr, getAcceptCount());
              }
              serverSock.configureBlocking(true); //mimic APR behavior
      

      org.apache.tomcat.util.net.NioEndpoint#startInternal中初始化业务处理的线程池、连接限制器、Poller线程、Acceptor线程

    • 如何做到高性能的http协议服务器?

      Tomcat把接收连接、检测 I/O 事件以及处理请求进行了拆分,用不同规模的线程去做对应的事情,这也是tomcat能高并发处理请求的原因。不让线程阻塞,尽量让CPU忙起来

    tomcat-socket

    • 是怎么设计的呢?

      通过接口、抽象类等,将不同的处理逻辑拆分,各司其职

      • org.apache.tomcat.util.net.AbstractEndpoint:I/O事件的检测、处理逻辑都在这个类的实现类里面。使用模板方法,不同的协议有不同的实现方法。NioEndpoint/Nio2Endpoint/AprEndpoint
        • org.apache.tomcat.util.net.NioEndpoint.Poller:引用了java.nio.channels.Selector,内部有个事件队列,监听I/O事件具体就是在这里做的
        • org.apache.tomcat.util.net.NioEndpoint.NioSocketWrapper
        • org.apache.tomcat.util.net.NioEndpoint.SocketProcessor: 具体处理请求的线程类

    参考:

    NioEndpoint组件:Tomcat如何实现非阻塞I/O?

    Java NIO浅析

  • 相关阅读:
    解决mvn常用指令build failure的问题
    信道加密设备与网络加密设备差异性研究和运维保障管理
    docker-compose 网络配置- IP 主机名 hosts配置
    那些破釜沉舟入局Web3.0的互联网精英都怎么样了?
    【从零开始的Java开发】2-8-3 CSS浮动:DIV、盒子模型、浮动
    通过示例详细了解ES6导入导出模块
    这15个大模型LLM最新研究成果很有看点
    mysql varchar & int
    安全渗透测试基础知识之网络基础知识
    51单片机基础(C语言):定时器时钟
  • 原文地址:https://www.cnblogs.com/chenzw93/p/16072325.html