• 【JUC】信号量Semaphore详解


    前言

    大家应该都用过synchronized 关键字加锁,用来保证某个时刻只允许一个线程运行。那么如果控制某个时刻允许指定数量的线程执行,有什么好的办法呢? 答案就是JUC提供的信号量Semaphore

    介绍和使用

    • Semaphore(信号量)可以用来限制能同时访问共享资源的线程上限,它内部维护了一个许可的变量,也就是线程许可的数量
    • Semaphore的许可数量如果小于0个,就会阻塞获取,直到有线程释放许可
    • Semaphore是一个非重入锁

    API介绍

    1. 构造方法
    • public Semaphore(int permits)permits 表示许可线程的数量
    • public Semaphore(int permits, boolean fair)fair 表示公平性,如果设为 true,表示是公平,那么等待最久的线程先执行
    1. 常用API
    • public void acquire():表示一个线程获取1个许可,那么线程许可数量相应减少一个
    • public void release():表示释放1个许可,那么线程许可数量相应会增加
    1. 其他API
    • void acquire(int permits):表示一个线程获取n个许可,这个数量由参数permits决定
    • void release(int permits):表示一个线程释放n个许可,这个数量由参数permits决定
    • int availablePermits():返回当前信号量线程许可数量
    • int getQueueLength(): 返回等待获取许可的线程数的预估值

    基本使用

    1. public static void main(String[] args) {
    2. // 1. 创建 semaphore 对象
    3. Semaphore semaphore = new Semaphore(2);
    4. // 2. 10个线程同时运行
    5. for (int i = 0; i < 8; i++) {
    6. new Thread(() -> {
    7. // 3. 获取许可
    8. try {
    9. semaphore.acquire();
    10. } catch (InterruptedException e) {
    11. e.printStackTrace();
    12. }
    13. try {
    14. log.debug("running...");
    15. sleep(1);
    16. log.debug("end...");
    17. } catch (InterruptedException e) {
    18. e.printStackTrace();
    19. } finally {
    20. // 4. 释放许可
    21. semaphore.release();
    22. }
    23. }).start();
    24. }
    25. }
    26. 复制代码

    运行结果:

    原理介绍

    上面是Semaphore的类结构图,其中FairSyncNonfairSync是它的内部类,他们共同继承了AQS类,AQS的共享模式提供了Semaphore的加锁、解锁。

    如果对AQS不了解的请移步深入浅出理解Java并发AQS的共享锁模式

    为了更好的搞懂原理,我们通过一个例子来帮助我们理解。

    假设Semaphorepermits为 3,这时 5 个线程来获取资源,其中Thread-1Thread-2Thread-4CAS 竞争成功,permits 变为 0,而 Thread-0 Thread-3 竞争失败。

    获取许可acquire()

    • acquire()主方法会调用 sync.acquireSharedInterruptibly(1)方法
    • acquireSharedInterruptibly()方法会先调用tryAcquireShared()方法返回许可的数量,如果小于0个,调用doAcquireSharedInterruptibly()方法进入阻塞
    1. // acquire() -> sync.acquireSharedInterruptibly(1),可中断
    2. public final void acquireSharedInterruptibly(int arg) {
    3. if (Thread.interrupted())
    4. throw new InterruptedException();
    5. // 尝试获取通行证,获取成功返回 >= 0的值
    6. if (tryAcquireShared(arg) < 0)
    7. // 获取许可证失败,进入阻塞
    8. doAcquireSharedInterruptibly(arg);
    9. }
    10. 复制代码
    • tryAcquireShared()方法在终会调用到Sync#nonfairTryAcquireShared()方法
    • nonfairTryAcquireShared()方法中会减去获取的许可数量,返回剩余的许可数量
    1. // tryAcquireShared() -> nonfairTryAcquireShared()
    2. // 非公平,公平锁会在循环内 hasQueuedPredecessors()方法判断阻塞队列是否有临头节点(第二个节点)
    3. final int nonfairTryAcquireShared(int acquires) {
    4. for (;;) {
    5. // 获取 state ,state 这里【表示通行证】
    6. int available = getState();
    7. // 计算当前线程获取通行证完成之后,通行证还剩余数量
    8. int remaining = available - acquires;
    9. // 如果许可已经用完, 返回负数, 表示获取失败,
    10. if (remaining < 0 ||
    11. // 许可证足够分配的,如果 cas 重试成功, 返回正数, 表示获取成功
    12. compareAndSetState(available, remaining))
    13. return remaining;
    14. }
    15. }
    16. 复制代码
    • 如果剩余的许可数量<0, 会调用doAcquireSharedInterruptibly()方法将当前线程加入到阻塞队列中阻塞
    • 方法中调用parkAndCheckInterrupt()阻塞当前线程
    1. private void doAcquireSharedInterruptibly(int arg) {
    2. // 将调用 Semaphore.aquire 方法的线程,包装成 node 加入到 AQS 的阻塞队列中
    3. final Node node = addWaiter(Node.SHARED);
    4. // 获取标记
    5. boolean failed = true;
    6. try {
    7. for (;;) {
    8. final Node p = node.predecessor();
    9. // 前驱节点是头节点可以再次获取许可
    10. if (p == head) {
    11. // 再次尝试获取许可,【返回剩余的许可证数量】
    12. int r = tryAcquireShared(arg);
    13. if (r >= 0) {
    14. // 成功后本线程出队(AQS), 所在 Node设置为 head
    15. // r 表示【可用资源数】, 为 0 则不会继续传播
    16. setHeadAndPropagate(node, r);
    17. p.next = null; // help GC
    18. failed = false;
    19. return;
    20. }
    21. }
    22. // 不成功, 设置上一个节点 waitStatus = Node.SIGNAL, 下轮进入 park 阻塞
    23. if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
    24. throw new InterruptedException();
    25. }
    26. } finally {
    27. // 被打断后进入该逻辑
    28. if (failed)
    29. cancelAcquire(node);
    30. }
    31. }
    32. 复制代码

    最终的AQS状态如下图所示:

    • Thread-1Thread-2Thread-4正常运行

    • AQS的state也就是等于0

    • Thread-0Thread-3再阻塞队列中

    释放许可release()

    现在Thread-4运行完毕,要释放许可,Thread-0Thread-3又是如何恢复执行的呢?

    • 调用release()方法释放许可,最终调用 Sync#releaseShared()方法
    • 如果方法tryReleaseShared(arg)尝试释放许可成功,那么调用doReleaseShared();进行唤醒
    1. // release() -> releaseShared()
    2. public final boolean releaseShared(int arg) {
    3. // 尝试释放锁
    4. if (tryReleaseShared(arg)) {
    5. doReleaseShared();
    6. return true;
    7. }
    8. return false;
    9. }
    10. 复制代码
    • tryReleaseShared()方法主要是尝试释放许可
    • 获取当前许可数量 + 释放的数量,然后通过cas设置回去
    1. protected final boolean tryReleaseShared(int releases) {
    2. for (;;) {
    3. // 获取当前锁资源的可用许可证数量
    4. int current = getState();
    5. int next = current + releases;
    6. // 索引越界判断
    7. if (next < current)
    8. throw new Error("Maximum permit count exceeded");
    9. // 释放锁
    10. if (compareAndSetState(current, next))
    11. return true;
    12. }
    13. }
    14. 复制代码
    • 调用doReleaseShared()方法唤醒队列中的线程
    • 其中unparkSuccessor()方法是唤醒的核心操作
    1. // 唤醒
    2. private void doReleaseShared() {
    3. // 如果 head.waitStatus == Node.SIGNAL ==> 0 成功, 下一个节点 unpark
    4. // 如果 head.waitStatus == 0 ==> Node.PROPAGATE
    5. for (;;) {
    6. Node h = head;
    7. if (h != null && h != tail) {
    8. int ws = h.waitStatus;
    9. if (ws == Node.SIGNAL) {
    10. // 防止 unparkSuccessor 被多次执行
    11. if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
    12. continue;
    13. // 唤醒后继节点
    14. unparkSuccessor(h);
    15. }
    16. // 如果已经是 0 了,改为 -3,用来解决传播性
    17. else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
    18. continue;
    19. }
    20. if (h == head)
    21. break;
    22. }
    23. }
    24. 复制代码

    最终AQS状态如下图所示:

    • 许可state变回1
    • 然后Thread-0开始竞争,如果竞争成功,如下图所示:

    • 由于Thread-0竞争成功,再次获取到许可,许可数量减1,最终又变回0
    • 然后等待队列中剩余Thread-3

    总结

    Semaphore信号量类基于AQS的共享锁实现,有公平锁和非公平锁两个版本,它用来限制能同时访问共享资源的线程上限,典型的应用场景是可以用来保护有限的公共资源,比如数据库连接等。

  • 相关阅读:
    搞懂 Vue3 中的各种 ref:toRef,toRefs,isRef,unref...
    相等全等运算符
    MyBatis-Plus
    Python利用pandas、request写自动化读取测试用例脚本
    谁说 Mysql 单表最大 2000 W ?我硬要塞它 1 个亿
    Openlayers 使用天地图Web服务
    Git版本管理
    Axios 中的文件上传(Upload File)方法
    14.PyQt5应用程序主窗口QmainWindow详解
    现在啥软件都有开源,BI 呢?
  • 原文地址:https://blog.csdn.net/BASK2311/article/details/128109729