• AQS之LimitLatch分析 (十)


    1.LimitLatch 介绍

    之前分析的ReentrantLock和CountDownLatch都是JDK中对AQS的利用,分别实现了独占锁和共享锁。

    接着我们再来看一个LimitLatch,来学习下我们可以怎么将AQS应用到我们自己的程序中。

    LimitLatch则更像是java中的Semaphore,用于控制资源的使用。用作限流器。

    2.LimitLatch使用场景

    Tomcat使用LimitLatch类实现了对请求连接数的控制。LimitLatch并不是JDK实现的,而是tomcat实现的。

    当有一个连接进来的时候就会调用

    AbstractEndpoint#countUpOrAwaitConnectio()

        protected void countUpOrAwaitConnection() throws InterruptedException {
            if (maxConnections==-1) return;
            LimitLatch latch = connectionLimitLatch;
            if (latch!=null) latch.countUpOrAwait();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    当一个请求进入到Tomcat的时候,就会调用latch.countUpOrAwaitConnection(),如果当前的链接数已经超过了最大限制,那么当前线程就会被阻塞,如果没有,那么当前线程就继续执行下去。

    当连接关闭或者取消的时候,就会调用countDown()方法来释放链接。在AbstractEndpoint#countDownConnection()方法中被调用。

    AbstractEndpoint#countDownConnection()

     protected long countDownConnection() {
            if (maxConnections==-1) return -1;
            LimitLatch latch = connectionLimitLatch;
            if (latch!=null) {
                long result = latch.countDown();
                if (result<0) {
                    getLog().warn(sm.getString("endpoint.warn.incorrectConnectionCount"));
                }
                return result;
            } else return -1;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    3.LimitLatch 源码分析

    在这里插入图片描述

    在这里插入图片描述

    private class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1L;
     
        public Sync() {
        }
    	// 获取资源 
        @Override
        protected int tryAcquireShared(int ignored) {
            long newCount = count.incrementAndGet();//定义了AtomicLong类型的count数量,每次获取锁之后会加1
            if (!released && newCount > limit) {//是否超过limit的限制
                // Limit exceeded
                count.decrementAndGet();//获取失败后减1
                return -1;//返回-1代表获取锁失败,这是就只能进入队列了
            } else {
                return 1;
            }
        }
     	// 释放资源
        @Override
        protected boolean tryReleaseShared(int arg) {
            count.decrementAndGet();//释放锁的时候count数量减1
            return true;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    在这里插入图片描述

    LimitLatch首先定义了一个limit,每次获取锁时都会累计获取成功的线程数量,如果大于limit,怎获取成功等待入队,释放锁的时候线程数量会减1。

    3.1 Tomcat 初始化

    tomcat的Nio2EndPoint启动的时候,会创建LimitLatch,而LimitLatch中的limit,正是我们tomcat中配置的最大连接数。

    @Override
    public void startInternal() throws Exception {
        if (!running) {
            allClosed = false;
            running = true;
            paused = false;
            //省略部分代码
            initializeConnectionLatch();//初始化LimitLatch
            startAcceptorThread();
        }
    }
    protected LimitLatch initializeConnectionLatch() {
        if (maxConnections==-1) return null;
        if (connectionLimitLatch==null) {
            connectionLimitLatch = new LimitLatch(getMaxConnections());//根据配置的最大连接数初始化LimitLatch
        }
        return connectionLimitLatch;
    }
    public LimitLatch(long limit) {
        this.limit = limit;
        this.count = new AtomicLong(0);
        this.sync = new Sync();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    3.2 Tomcat 获取和释放连接

    LimitLatch初始化后,就可以对连接的获取和释放进行管理了。下面我们看一下Nio2Endpoint中的内部类Nio2Acceptor

    protected class Nio2Acceptor extends Acceptor<AsynchronousSocketChannel> implements CompletionHandler<AsynchronousSocketChannel, Void> {
     
        protected int errorDelay = 0;
     
        public Nio2Acceptor(AbstractEndpoint<?, AsynchronousSocketChannel> endpoint) {
            super(endpoint);
        }
     
        @Override
        public void run() {
            // The initial accept will be called in a separate utility thread
            if (!isPaused()) {
                try {
                    countUpOrAwaitConnection();//已经达到了最大的连接数,则入队等待通知
                } catch (InterruptedException e) {
                    // Ignore
                }
                //省略部分代码
            } else {
                state = AcceptorState.PAUSED;
            }
        }
     
        @Override
        public void completed(AsynchronousSocketChannel socket,
                Void attachment) {
            // Successful accept, reset the error delay
            errorDelay = 0;
            // Continue processing the socket on the current thread
            // Configure the socket
            if (isRunning() && !isPaused()) {
                if (getMaxConnections() == -1) {
                    serverSock.accept(null, this);
                } else {
                    // Accept again on a new thread since countUpOrAwaitConnection may block
                    getExecutor().execute(this);
                }
                if (!setSocketOptions(socket)) {//处理socket失败,关闭
                    closeSocket(socket);
                }
            } else {
                if (isRunning()) {
                    state = AcceptorState.PAUSED;
                }
                destroySocket(socket);//调用closeSocket,关闭socket
            }
        }
     
        @Override
        public void failed(Throwable t, Void attachment) {
            if (isRunning()) {
                if (!isPaused()) {
                    if (getMaxConnections() == -1) {
                        serverSock.accept(null, this);
                    } else {
                        // Accept again on a new thread since countUpOrAwaitConnection may block
                        getExecutor().execute(this);
                    }
                } else {
                    state = AcceptorState.PAUSED;
                }
                // We didn't get a socket
                countDownConnection();
                //省略部分代码
            } else {
                // We didn't get a socket
                countDownConnection();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70

    如果连接数已经达到最大连接,则会调用countUpOrAwaitConnection方法, 入队等待

    protected void countUpOrAwaitConnection() throws InterruptedException {
        if (maxConnections==-1) return;
        LimitLatch latch = connectionLimitLatch;
        if (latch!=null) latch.countUpOrAwait();//入队等待
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    连接初始化失败,会调用countDownConnection方法,而连接处理结束后会调用closeSocket(destroySocket也调用closeSocket),最终调用countDownConnection

    protected long countDownConnection() {
        if (maxConnections==-1) return -1;
        LimitLatch latch = connectionLimitLatch;
        if (latch!=null) {
            long result = latch.countDown();//最终调用LimitLatch的countDown方法,见下面代码
            if (result<0) {
                getLog().warn(sm.getString("endpoint.warn.incorrectConnectionCount"));
            }
            return result;
        } else return -1;
    }
    public long countDown() {
        sync.releaseShared(0);//调用AQS中的releaseShared释放锁
        long result = getCount();
        if (log.isDebugEnabled()) {
            log.debug("Counting down["+Thread.currentThread().getName()+"] latch="+result);
    }
        return result;//返回count数量
    }
    public final boolean releaseShared(int arg) {//AQS中的代码
        if (tryReleaseShared(arg)) {//见文中开头的LimitLatch的中的锁代码
            doReleaseShared();
            return true;
        }
        return false;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    countUpOrAwait()

    public void countUpOrAwait() throws InterruptedException {
            if (log.isDebugEnabled()) {
                log.debug("Counting up["+Thread.currentThread().getName()+"] latch="+getCount());
            }
            sync.acquireSharedInterruptibly(1);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    和CountDownLatch一样,都是直接调用的AQS中的final方法acquireSharedInterruptibly(),来尝试获取共享锁。

    	    @Override
            protected int tryAcquireShared(int ignored) {
        	
                long newCount = count.incrementAndGet();
                //如果count大于最大的限制并且released标识为false(默认为false,在调用relaseAll的时候会置为true)
                if (!released && newCount > limit) {
                    // Limit exceeded
                    //原子性的减1,还原
                    count.decrementAndGet();
                    //这里返回-1 返回-1的时候就会将当前线程放入CLH队列中等待被唤醒
                    return -1;
                } else {
                    //返回1 则当前线程不需要被挂起
                    return 1;
                }
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    1. count是一个原子类,将count原子性的加一,然后将newCount与最大连接数比较,如果不超过最大连接数,那么成功获取共享锁,当前线程继续执行。
    2. 如果超过最大连接数,而且released属性为false(默认为false),那么就需要还原之前原子性的加一,然后返回-1,当前线程将进入自旋获取锁的过程,自旋过程中没有成功获取锁会被阻塞。
    3. 当前的链接数已经超过了最大限制,那么当前线程就会被阻塞,如果没有,那么当前线程就继续执行下去。
        public long countDown() {
            sync.releaseShared(0);
            long result = getCount();
            if (log.isDebugEnabled()) {
                log.debug("Counting down["+Thread.currentThread().getName()+"] latch="+result);
        }
            return result;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    AQS的final方法releaseShared()。

        protected boolean tryReleaseShared(int arg) {
                count.decrementAndGet();
                return true;
            }
    
    • 1
    • 2
    • 3
    • 4

    count原子性的减一,然后始终返回true。也就意味着调用LimitLatch.countDown()方法时,AQS.doReleaseShared()方法是一定会调用的。

    doReleaseShared(): 当头节点不是初始化的节点或者不为null时,就会去唤醒CLH队列中挂起的线程。而我们知道,当我们调用countUpOrAwait的时候,只有count超过limit限制的时候,才会构造一个Node,挂起当前线程。

  • 相关阅读:
    云主机、云服务器、VPS的区别性能比较
    C# 监测 Windows 设备变动事件
    计算机视觉与深度学习-经典网络解析-ZFNet-[北邮鲁鹏]
    Ipad2022可以用电容笔吗?双十一值得入手电容笔推荐
    Android扫码连接WIFI实现
    R中的min()函数 和max()函数
    亚远景科技-ASPICE评估输入
    Go语言结构体
    Nginx快速入门教程,域名转发、负载均衡
    双十二哪些数码好物值得入手?盘点双十二最值得入手的数码好物
  • 原文地址:https://blog.csdn.net/qq_43141726/article/details/127855315