• Java多线程同步工具类:Semaphore原理剖析


    Java多线程同步工具类:Semaphore原理剖析

    前驱知识准备:AbstractQueuedSynchronizer队列同步器

    [Java多线程之:队列同步器AbstractQueuedSynchronizer原理剖析]

    Semaphore原理

    Semaphore也就是信号量,提供了资源数量的并发访问控制,可以用于限制访问某些资源(物理或逻辑的)的线程数目。Semaphore是AQS队列同步器中对共享锁的子类实现。Semaphore维护了一个许可证集合,有多少资源需要限制就维护多少许可证集合,假如这里有N个资源,那就对应于N个许可证,同一时刻也只能有N个线程访问。一个线程获取许可证就调用acquire方法,用完了释放资源就调用release方法。

    Semaphore核心代码功能使用如下所示:

    // ⼀开始有5份共享资源。第⼆个参数表示是否是公平
    Semaphore myResources = new Semaphore(5, true);
    
    
    // 工作线程每获取⼀份资源,就在该对象上记下来
    // 在获取的时候是按照公平的方式还是非公平的方式,就要看上⼀行代码的第二个参数了。
    // ⼀般⾮公平抢占效率较高。
    myResources.acquire();
    
    // 工作线程每归还⼀份资源,就在该对象上记下来
    // 此时资源可以被其他线程使⽤
    myResources.release();
    
    /*
    释放指定数目的许可,并将它们归还给信标。
    可用许可数加上该指定数目。
    如果线程需要获取N个许可,在有N个许可可用之前,该线程阻塞。
    如果线程获取了N个许可,还有可用的许可,则依次将这些许可赋予等待获取许可的其他线程。
    */
    semaphore.release(2);
    
    /*
    从信标获取指定数⽬的许可。如果可用许可数目不够,则线程阻塞,直到被中断。
    
    该⽅法效果与循环相同,
    for (int i = 0; i < permits; i++) acquire();
    只不过该方法是原⼦操作。
    
    如果可用许可数不够,则当前线程阻塞,直到:(⼆选⼀)
    1. 如果其他线程释放了许可,并且可用的许可数满足当前线程的请求数字;
    2. 其他线程中断了当前线程。
    
    permits – 要获取的许可数
    */
    semaphore.acquire(3);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    Semaphore在争抢资源时的示意图如下图所示,假设有n个线程来获取Semaphore⾥⾯的10份资源(n > 10), n个线程中只有10个线程能获取到,其他线程都会阻塞。直到有线程释放了资源,其他线程才能获取到。

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-L7ePxdyV-1670158909096)(E:\笔记\截图\SemaPhore争抢资源示意图.png)]

    当初始的资源个数为1的时候, Semaphore退化为排他锁。正因为如此, Semaphone的实现原理和锁十分类似,是基于AQS,有公平和非公平之分

    public void acquire() throws InterruptedException {
    	sync.acquireSharedInterruptibly(1);
    }
    public void release() {
    	sync.releaseShared(1);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    SemaPhore也是使用了队列同步器AbstractQueuedSynchronizer来实现多线程间的同步操作,abstract static class Sync extends AbstractQueuedSynchronizer

    acquire()方法通过判断剩余资源数和线程所需资源数的差值是否小于0,如果小于0,则当前线程进行阻塞,如果大于0,对state变量(state变量在AQS中表示同步状态)进行CAS减操作,减到0之后,线程阻塞。在release里对state变量进行CAS加操作。

    public class Semaphore {
    	protected final boolean tryReleaseShared(int releases) {
    	for (;;) {
    		int current = getState();
    		int next = current + releases;
    		if (next < current) // overflow
    			throw new Error("Maximum permit count exceeded");
    		if (compareAndSetState(current, next))
    			return true;
    		}
    	}
    	static final class FairSync extends Sync {
    		// ...
    		FairSync(int permits) {
    		super(permits);
    	}
    	protected int tryAcquireShared(int acquires) {
    		for (;;) {
    			if (hasQueuedPredecessors())
    				return -1;
    			int available = getState();
                // 判断剩余资源数,如果<0,说明资源数不够了,获取资源失败
    			int remaining = available - acquires;
    			if (remaining < 0 || compareAndSetState(available, remaining))
    				return remaining;
    			}
    		}
    	}
    }
    
    /**
         * Acquires in shared interruptible mode.
         * @param arg the acquire argument
         */
        private void doAcquireSharedInterruptibly(int arg)
            throws InterruptedException {
            // addWaiter将当前线程封装成共享节点,放在等待队列尾部
            final Node node = addWaiter(Node.SHARED);
            boolean failed = true;
            try {
                for (;;) {
                    // 当前线程前驱节点
                    final Node p = node.predecessor();
                    if (p == head) {
                        // 尝试获取args数量的资源数,如果资源数不够,则返回<0
                        int r = tryAcquireShared(arg);
                        if (r >= 0) {
                            // 向后传播唤醒当前节点后的节点
                            setHeadAndPropagate(node, r);
                            p.next = null; // help GC
                            failed = false;
                            return;
                        }
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        throw new InterruptedException();
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63

    关于该方法的更多详细原理剖析,可以看AQS介绍的相关博客:[Java多线程之:队列同步器AbstractQueuedSynchronizer原理剖析](https://blog.csdn.net/Urbanears/article/details/128177063?spm=1001.2014.3001.5502)

    实战案例

    下面给出一个案例来测试Semaphore的使用:

    case: 自习室抢座,写作业:

    抢座位的线程:

    import java.util.Random;
    import java.util.concurrent.Semaphore;
    
    public class MyThread extends Thread {
        private final Semaphore semaphore;
        private final Random random = new Random();
        public MyThread(String name, Semaphore semaphore) {
            super(name);
            this.semaphore = semaphore;
        }
        @Override
        public void run() {
            try {
                semaphore.acquire();
                System.out.println(Thread.currentThread().getName() + " - 抢座成功,开始写作业");
                Thread.sleep(random.nextInt(1000));
                System.out.println(Thread.currentThread().getName() + " - 作业完成,腾出座位");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            semaphore.release();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    主方法:

    import java.util.concurrent.Semaphore;
    public class Demo {
        public static void main(String[] args) throws InterruptedException {
            Semaphore semaphore = new Semaphore(2);
            for (int i = 0; i < 5; i++) {
                new MyThread("学⽣-" + (i + 1), semaphore).start();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    上面主方法中,调用new Semaphore(2)定义了2份共享资源,或者两份许可证,也就是同一时刻最多只允许2个线程执行。输出结果:

    学⽣-1 - 抢座成功,开始写作业
    学⽣-2 - 抢座成功,开始写作业
    学⽣-2 - 作业完成,腾出座位
    学⽣-3 - 抢座成功,开始写作业
    学⽣-1 - 作业完成,腾出座位
    学⽣-4 - 抢座成功,开始写作业
    学⽣-3 - 作业完成,腾出座位
    学⽣-5 - 抢座成功,开始写作业
    学⽣-5 - 作业完成,腾出座位
    学⽣-4 - 作业完成,腾出座位
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    出结果:

    学⽣-1 - 抢座成功,开始写作业
    学⽣-2 - 抢座成功,开始写作业
    学⽣-2 - 作业完成,腾出座位
    学⽣-3 - 抢座成功,开始写作业
    学⽣-1 - 作业完成,腾出座位
    学⽣-4 - 抢座成功,开始写作业
    学⽣-3 - 作业完成,腾出座位
    学⽣-5 - 抢座成功,开始写作业
    学⽣-5 - 作业完成,腾出座位
    学⽣-4 - 作业完成,腾出座位
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
  • 相关阅读:
    目前最先进的神经网络算法,神经网络算法发展
    工业4.0 资产管理壳学习笔记(1)
    Redis单线程为什么这么快
    MySQL字符集设置、密码管理
    C++版本的OpenCV 5.x编译生成opencv-python==5.x(GPU版本)接口并进行调用
    OpenHamony开发笔记一:在HarmonyOS虚拟机上运行openharmony工程
    SQL Server教程 - SQL Server 压缩(Compression)
    go语言操作数据库
    SpringMVC
    FPGA SERDESE2 (SDR收发仿真)
  • 原文地址:https://blog.csdn.net/Urbanears/article/details/128177406