• 关于Semaphore信号量的源码解读


    Semaphore的简单使用

    利用Semaphore可以实现对线程数量的控制。比如如下的代码

    class SemaphoreTest{
        public static void main(String[] args) {
            Semaphore semaphore = new Semaphore(3);
    
            for (int i = 1; i <= 9; i++) {
                new Thread(()->{
                    try {
                        semaphore.acquire();
                        TimeUnit.SECONDS.sleep(2);
    
                        System.out.println("线程"+Thread.currentThread().getName()+"抢到了车位。");
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
                        semaphore.release();
                    }
    
                },""+i).start();
    
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    首先看一下Semaphore的构造函数,这里可以看到Semaphore的构造函数调用了另外一个构造函数。

    public Semaphore(int permits) {
            sync = new NonfairSync(permits);
        }
    
    • 1
    • 2
    • 3

    继续看一下这个被调用的构造函数,可以看到这个是Semaphore里面的一个静态内部类NonfairSync。从名字可以看出应该是一个非公平的东西,继承了Sync。

    static final class NonfairSync extends Sync {
            private static final long serialVersionUID = -2694183684443567898L;
    		#这里就是被调用的构造函数
            NonfairSync(int permits) {
                super(permits);
            }
    
            protected int tryAcquireShared(int acquires) {
                return nonfairTryAcquireShared(acquires);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    我们继续看一下这个Sync是什么,可以看到Sync继承了AbstractQueuedSynchronizer ,也就是我们常说的AQS,抽象队列同步器。而上面NonfairSync 调用了super的构造函数,也就是Sync类的构造函数,下面的setState(permits);

    abstract static class Sync extends AbstractQueuedSynchronizer {
            private static final long serialVersionUID = 1192457210091910933L;
    		#这里
            Sync(int permits) {
                setState(permits);
            }
    
            final int getPermits() {
                return getState();
            }
    
            final int nonfairTryAcquireShared(int acquires) {
                for (;;) {
                    int available = getState();
                    int remaining = available - acquires;
                    if (remaining < 0 ||
                        compareAndSetState(available, remaining))
                        return remaining;
                }
            }
    
            protected final boolean tryReleaseShared(int releases) {
                for (;;) {
                    int current = getState();
                    int next = current + releases;
                    if (next < current) // overflow
                        throw new Error("Maximum permit count exceeded");
                    if (compareAndSetState(current, next))
                        return true;
                }
            }
    
            final void reducePermits(int reductions) {
                for (;;) {
                    int current = getState();
                    int next = current - reductions;
                    if (next > current) // underflow
                        throw new Error("Permit count underflow");
                    if (compareAndSetState(current, next))
                        return;
                }
            }
    
            final int drainPermits() {
                for (;;) {
                    int current = getState();
                    if (current == 0 || compareAndSetState(current, 0))
                        return current;
                }
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51

    继续点进入setState(permits);的方法,这里调用的就是AQS的setState的方法,所以说Semaphore底层是基于AQS(抽象队列同步器实现的)。

    protected final void setState(int newState) {
            state = newState;
        }
    
    • 1
    • 2
    • 3

    我们继续研究semaphore.acquire();方法具体是怎么实现的

    public void acquire() throws InterruptedException {
            sync.acquireSharedInterruptibly(1);
        }
    
    • 1
    • 2
    • 3

    上面方法调用了AQS的方法

    public final void acquireSharedInterruptibly(int arg)
                throws InterruptedException {
                #这里返回false
            if (Thread.interrupted())
                throw new InterruptedException();
                #直接执行了这里的方法
            if (tryAcquireShared(arg) < 0)
                doAcquireSharedInterruptibly(arg);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    这里要注意了,我们直接点进入tryAcquireShared(arg)方法是下面这样子的,这个是AQS的默认实现,千万不要以为我们的代码直接执行了这个函数。

    protected int tryAcquireShared(int arg) {
            throw new UnsupportedOperationException();
        }
    
    • 1
    • 2
    • 3

    实际上执行的是下面这个函数,这个是Semaphore的静态内部类

    static final class NonfairSync extends Sync {
            private static final long serialVersionUID = -2694183684443567898L;
    
            NonfairSync(int permits) {
                super(permits);
            }
    		//注意了,执行的是这个函数
            protected int tryAcquireShared(int acquires) {
                return nonfairTryAcquireShared(acquires);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    这个函数跳转到了Semaphore的静态内部类Sync里面执行下面这个函数

    final int nonfairTryAcquireShared(int acquires) {
                for (;;) {
                	//这个地方会获取state的值
                    int available = getState();
                    int remaining = available - acquires;
                    //这个时候这个remaining如果是一个负数,直接返回,不是负数说明可以直接获取到锁,然后CAS直接更新state的状态
                    if (remaining < 0 ||
                        compareAndSetState(available, remaining))
                        return remaining;
                }
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    返回之后退出到刚才上面的这个方法,这部分可以得出,这个方法的流程

    • 获取同步状态值
    • 每个线程进来就减去请求的值,此处请求的值是1.然后用可用同步状态值减去请求的值得到同步状态剩余的值。
    • 如果请求的值大于可用的值或者CAS操作把可用值改为剩余可用的值那么就返回剩下可用的值。
    public final void acquireSharedInterruptibly(int arg)
                throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
                //在这个判断如果小于0则直接对请求的线程进行一个入队的操作,我们主要分析 一下这个函数
            if (tryAcquireShared(arg) < 0)
                doAcquireSharedInterruptibly(arg);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    这就是semphore处理锁的核心逻辑,我们在看一下sync调用的acquireSharedInterruptibly的方法。此方法主要的目的就是处理那些没有获取到锁的线程在队列中的一个处理。

    private void doAcquireSharedInterruptibly(int arg)
            throws InterruptedException {
            //addWaiter是把当前节点设置为共享模式然后添加到AQS维护的双向队列的尾部。
            final Node node = addWaiter(Node.SHARED);
            boolean failed = true;
            try {
                for (;;) {
                    final Node p = node.predecessor();
                    /*通过for循环不断的进行自旋操作,去判断当前节点的前继节点是不是头节点,如果前继节点
                    是头节点那么那么就去挣抢锁,如果争抢锁成功那么就把当前节点设置为头节点,同时唤醒队
                    列中的所有节点,一块在去争夺锁。*/
                    if (p == head) {
                        int r = tryAcquireShared(arg);
                        if (r >= 0) {
                            setHeadAndPropagate(node, r);
                            p.next = null; // help GC
                            failed = false;
                            return;
                        }
                    }
                    /*如果线程中断或者阻塞那么就抛出异常。最后如果方法中抛出了异常那么就把当前节点先设
                    置为取消状态然后在清除该节点。*/
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        throw new InterruptedException();
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    最后看一下主函数里面的semaphore.release();方法。

    public void release() {
            sync.releaseShared(1);
        }
    
    • 1
    • 2
    • 3
    public final boolean releaseShared(int arg) {
            if (tryReleaseShared(arg)) {
                doReleaseShared();
                return true;
            }
            return false;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    tryReleaseShared方法也是AQS模版方法中的一个,它会调用Semaphore重写的方法,我们看一下tryReleaseShared释放方法在Semaphore中是怎么实现的

    protected final boolean tryReleaseShared(int releases) {
                for (;;) {
                    int current = getState();
                    int next = current + releases;
                    if (next < current) // overflow
                        throw new Error("Maximum permit count exceeded");
                    if (compareAndSetState(current, next))
                        return true;
                }
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    下面这个代码,就是唤醒阻塞队列中的线程,一旦唤醒线程,在同步队列中排队的队首(不是头结点)线程就会获取许可证,获取成功后,就执行相应的代码。

    private void doReleaseShared() {
        for (;;) {  // 死循环
            Node h = head;  
            if (h != null && h != tail) {  // 至少存在两个节点
                int ws = h.waitStatus;  // 获取状态值
                if (ws == Node.SIGNAL) {  // h若是SIGNAL状态,那么其后继节点即为要唤醒的节点
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);  // 唤醒头节点的后继节点
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)  // 头结点未变,直接退出,头结点被改变的唯一条件是有其他的线程修改了头结点
                break;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    最后介绍一下公平锁和非公平锁

    公平锁,就是只有队首的节点线程,可以获取许可证,有其他的线程在获取许可证时,会被加入到队尾,等待获取锁。
    非公平锁,因为在对列中排队的线程,只有头结点的后继节点有资格可以获取锁,而在获取许可证时,有其他的线程(不是同步对列中的线程)进入,尝试去获取许可证,这两个线程都有可能获取到许可证,这就是非公平锁的特点。

    从上面的分析中可以看到,多态的特性,很多方法在执行的时候并不是我们直接点的进去的,而是根据实际的类型觉得调用哪些方法,所以分析的时候千万不要只是点进去某个方法,不然运行的时候可能不是这个方法。

    参考文章

  • 相关阅读:
    TS---类型设置
    新巨丰深交所上市:市值82亿 伊利是股东,贡献70%收入
    SpringMvc决战-【SpringMVC之自定义注解】
    文生视频综述
    centos7 设置默认登陆root
    【运行时数据区和程序计数器】
    【钩子函数和cookie与session简介】
    1732. 找到最高海拔
    【历史上的今天】6 月 30 日:冯·诺依曼发表第一份草案;九十年代末的半导体大战;CBS 收购 CNET
    SpringMVC详解
  • 原文地址:https://blog.csdn.net/qq_45401910/article/details/126338344