• Java并发 JUC工具类:Semaphore详解


    Semaphore源码分析

    类的继承关系

    public class Semaphore implements java.io.Serializable {}
    
    • 1

    说明: Semaphore实现了Serializable接口,即可以进行序列化。

    类的内部类

    Semaphore总共有三个内部类,并且三个内部类是紧密相关的,下面先看三个类的关系。
    在这里插入图片描述
    说明: Semaphore与ReentrantLock的内部类的结构相同,类内部总共存在Sync、NonfairSync、FairSync三个类,NonfairSync与FairSync类继承自Sync类,Sync类继承自AbstractQueuedSynchronizer抽象类。下面逐个进行分析。

    类的内部类 - Sync类

    Sync类的源码如下:

    著作权归https://pdai.tech所有。
    链接:https://pdai.tech/md/java/thread/java-thread-x-juc-tool-semaphore.html
    
    // 内部类,继承自AQS
    abstract static class Sync extends AbstractQueuedSynchronizer {
        // 版本号
        private static final long serialVersionUID = 1192457210091910933L;
        
        // 构造函数
        Sync(int permits) {
            // 设置状态数
            setState(permits);
        }
        
        // 获取许可
        final int getPermits() {
            return getState();
        }
    
        // 共享模式下非公平策略获取
        final int nonfairTryAcquireShared(int acquires) {
            for (;;) { // 无限循环
                // 获取许可数
                int available = getState();
                // 剩余的许可
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining)) // 许可小于0或者比较并且设置状态成功
                    return remaining;
            }
        }
        
        // 共享模式下进行释放
        protected final boolean tryReleaseShared(int releases) {
            for (;;) { // 无限循环
                // 获取许可
                int current = getState();
                // 可用的许可
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next)) // 比较并进行设置成功
                    return true;
            }
        }
    
        // 根据指定的缩减量减小可用许可的数目
        final void reducePermits(int reductions) {
            for (;;) { // 无限循环
                // 获取许可
                int current = getState();
                // 可用的许可
                int next = current - reductions;
                if (next > current) // underflow
                    throw new Error("Permit count underflow");
                if (compareAndSetState(current, next)) // 比较并进行设置成功
                    return;
            }
        }
    
        // 获取并返回立即可用的所有许可
        final int drainPermits() {
            for (;;) { // 无限循环
                // 获取许可
                int current = getState();
                if (current == 0 || compareAndSetState(current, 0)) // 许可为0或者比较并设置成功
                    return current;
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70

    说明: Sync类的属性相对简单,只有一个版本号,Sync类存在如下方法和作用如下。
    在这里插入图片描述

    类的内部类 - NonfairSync类

    NonfairSync类继承了Sync类,表示采用非公平策略获取资源,其只有一个tryAcquireShared方法,重写了AQS的该方法,其源码如下:

    static final class NonfairSync extends Sync {
        // 版本号
        private static final long serialVersionUID = -2694183684443567898L;
        
        // 构造函数
        NonfairSync(int permits) {
            super(permits);
        }
        // 共享模式下获取
        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    说明: 从tryAcquireShared方法的源码可知,其会调用父类Sync的nonfairTryAcquireShared方法,表示按照非公平策略进行资源的获取。

    类的内部类 - FairSync类

    FairSync类继承了Sync类,表示采用公平策略获取资源,其只有一个tryAcquireShared方法,重写了AQS的该方法,其源码如下。

    protected int tryAcquireShared(int acquires) {
        for (;;) { // 无限循环
            if (hasQueuedPredecessors()) // 同步队列中存在其他节点
                return -1;
            // 获取许可
            int available = getState();
            // 剩余的许可
            int remaining = available - acquires;
            if (remaining < 0 ||
                compareAndSetState(available, remaining)) // 剩余的许可小于0或者比较设置成功
                return remaining;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    类的属性

    public class Semaphore implements java.io.Serializable {
        // 版本号
        private static final long serialVersionUID = -3222578661600680210L;
        // 属性
        private final Sync sync;
    }
      
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    说明: Semaphore自身只有两个属性,最重要的是sync属性,基于Semaphore对象的操作绝大多数都转移到了对sync的操作。

    类的构造函数

    • Semaphore(int)型构造函数
    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }
    
    • 1
    • 2
    • 3

    说明: 该构造函数会创建具有给定的许可数和非公平的公平设置的Semaphore。

    • Semaphore(int, boolean)型构造函数
    public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }
    
    • 1
    • 2
    • 3

    说明: 该构造函数会创建具有给定的许可数和给定的公平设置的Semaphore。

    核心函数分析 - acquire函数

    此方法从信号量获取一个(多个)许可,在提供一个许可前一直将线程阻塞,或者线程被中断,其源码如下

    public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
    
    • 1
    • 2
    • 3

    说明: 该方法中将会调用Sync对象的acquireSharedInterruptibly(从AQS继承而来的方法)方法,而acquireSharedInterruptibly方法在上一篇CountDownLatch中已经进行了分析。

    最终可以获取大致的方法调用序列(假设使用非公平策略)。如下图所示。
    在这里插入图片描述
    说明: 上图只是给出了大体会调用到的方法,和具体的示例可能会有些差别,之后会根据具体的示例进行分析。

    核心函数分析 - release函数

    此方法释放一个(多个)许可,将其返回给信号量,源码如下。

    public void release() {
        sync.releaseShared(1);
    }
    
    • 1
    • 2
    • 3

    说明: 该方法中将会调用Sync对象的releaseShared(从AQS继承而来的方法)方法,而releaseShared方法在上一篇CountDownLatch中已经进行了分析。

    最终可以获取大致的方法调用序列(假设使用非公平策略)。如下图所示:
    在这里插入图片描述
    说明: 上图只是给出了大体会调用到的方法,和具体的示例可能会有些差别,之后会根据具体的示例进行分析。

    Semaphore 示例

    原文链接https://pdai.tech/md/java/thread/java-thread-x-juc-tool-semaphore.html#semaphore%E7%A4%BA%E4%BE%8B

    import java.util.concurrent.Semaphore;
    
    class MyThread extends Thread {
        private Semaphore semaphore;
    
        public MyThread(String name, Semaphore semaphore) {
            super(name);
            this.semaphore = semaphore;
        }
    
        public void run() {
            int count = 3;
            System.out.println(Thread.currentThread().getName() + " trying to acquire");
            try {
                semaphore.acquire(count);
                System.out.println(Thread.currentThread().getName() + " acquire successfully");
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                semaphore.release(count);
                System.out.println(Thread.currentThread().getName() + " release successfully");
            }
        }
    }
    
    public class SemaphoreDemo {
        public final static int SEM_SIZE = 10;
    
        public static void main(String[] args) {
            Semaphore semaphore = new Semaphore(SEM_SIZE);
            MyThread t1 = new MyThread("t1", semaphore);
            MyThread t2 = new MyThread("t2", semaphore);
            t1.start();
            t2.start();
    
            //main线程
            int permits = 5;
            System.out.println(Thread.currentThread().getName() + " trying to acquire");
            try {
                semaphore.acquire(permits);
                System.out.println(Thread.currentThread().getName() + " acquire successfully");
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                semaphore.release();
                System.out.println(Thread.currentThread().getName() + " release successfully");
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51

    运行结果(某一次):

    main trying to acquire
    main acquire successfully
    t1 trying to acquire
    t1 acquire successfully
    t2 trying to acquire
    t1 release successfully
    main release successfully
    t2 acquire successfully
    t2 release successfully
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    说明: 首先,生成一个信号量,信号量有10个许可,然后,main,t1,t2三个线程获取许可运行,根据结果,可能存在如下的一种时序。

    在这里插入图片描述

    更深入理解

    单独使用Semaphore是不会使用到AQS的条件队列的

    不同于CyclicBarrier和ReentrantLock,单独使用Semaphore是不会使用到AQS的条件队列的,其实,只有进行await操作才会进入条件队列,其他的都是在同步队列中,只是当前线程会被park。

    场景问题

    • semaphore初始化有10个令牌,11个线程同时各调用1次acquire方法,会发生什么?
      答案:拿不到令牌的线程阻塞,不会继续往下运行。
    • semaphore初始化有10个令牌,一个线程重复调用11次acquire方法,会发生什么?
      答案:线程阻塞,不会继续往下运行。可能你会考虑类似于锁的重入的问题,很好,但是,令牌没有重入的概念。你只要调用一次acquire方法,就需要有一个令牌才能继续运行。
    • semaphore初始化有1个令牌,1个线程调用一次acquire方法,然后调用两次release方法,之后另外一个线程调用acquire(2)方法,此线程能够获取到足够的令牌并继续运行吗?
      答案:能,原因是release方法会添加令牌,并不会以初始化的大小为准。
  • 相关阅读:
    让我看看你们公司的代码规范都是啥样的?
    (未完待续)【Netty专题】Netty实战与核心组件详解
    Android 实现ProgressBar菊花旋转进度条的 效果(用于耗时loading时显示)
    如何在Linux系统中安装MySQL数据库
    什么是跨域?及7种跨域解决方法
    Redis 命令工具
    【自然语言处理之语言模型】讲解
    数据结构系列——先进先出队列queue
    Linux使用docker安装elasticsearch-head
    【网关路由测试】——诊断路由测试
  • 原文地址:https://blog.csdn.net/weixin_45773632/article/details/126682056