大家应该都用过synchronized 关键字加锁,用来保证某个时刻只允许一个线程运行。那么如果控制某个时刻允许指定数量的线程执行,有什么好的办法呢? 答案就是JUC提供的信号量Semaphore。
Semaphore(信号量)可以用来限制能同时访问共享资源的线程上限,它内部维护了一个许可的变量,也就是线程许可的数量Semaphore的许可数量如果小于0个,就会阻塞获取,直到有线程释放许可Semaphore是一个非重入锁public Semaphore(int permits):permits 表示许可线程的数量public Semaphore(int permits, boolean fair):fair 表示公平性,如果设为 true,表示是公平,那么等待最久的线程先执行public void acquire():表示一个线程获取1个许可,那么线程许可数量相应减少一个public void release():表示释放1个许可,那么线程许可数量相应会增加void acquire(int permits):表示一个线程获取n个许可,这个数量由参数permits决定void release(int permits):表示一个线程释放n个许可,这个数量由参数permits决定int availablePermits():返回当前信号量线程许可数量int getQueueLength(): 返回等待获取许可的线程数的预估值- public static void main(String[] args) {
- // 1. 创建 semaphore 对象
- Semaphore semaphore = new Semaphore(2);
- // 2. 10个线程同时运行
- for (int i = 0; i < 8; i++) {
- new Thread(() -> {
- // 3. 获取许可
- try {
- semaphore.acquire();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- try {
- log.debug("running...");
- sleep(1);
- log.debug("end...");
- } catch (InterruptedException e) {
- e.printStackTrace();
- } finally {
- // 4. 释放许可
- semaphore.release();
- }
- }).start();
- }
- }
- 复制代码
运行结果:


上面是Semaphore的类结构图,其中FairSync和NonfairSync是它的内部类,他们共同继承了AQS类,AQS的共享模式提供了Semaphore的加锁、解锁。
为了更好的搞懂原理,我们通过一个例子来帮助我们理解。
假设Semaphore 的 permits为 3,这时 5 个线程来获取资源,其中Thread-1,Thread-2,Thread-4CAS 竞争成功,permits 变为 0,而 Thread-0 和 Thread-3 竞争失败。

acquire()主方法会调用 sync.acquireSharedInterruptibly(1)方法acquireSharedInterruptibly()方法会先调用tryAcquireShared()方法返回许可的数量,如果小于0个,调用doAcquireSharedInterruptibly()方法进入阻塞- // acquire() -> sync.acquireSharedInterruptibly(1),可中断
- public final void acquireSharedInterruptibly(int arg) {
- if (Thread.interrupted())
- throw new InterruptedException();
- // 尝试获取通行证,获取成功返回 >= 0的值
- if (tryAcquireShared(arg) < 0)
- // 获取许可证失败,进入阻塞
- doAcquireSharedInterruptibly(arg);
- }
- 复制代码
tryAcquireShared()方法在终会调用到Sync#nonfairTryAcquireShared()方法nonfairTryAcquireShared()方法中会减去获取的许可数量,返回剩余的许可数量- // tryAcquireShared() -> nonfairTryAcquireShared()
- // 非公平,公平锁会在循环内 hasQueuedPredecessors()方法判断阻塞队列是否有临头节点(第二个节点)
- final int nonfairTryAcquireShared(int acquires) {
- for (;;) {
- // 获取 state ,state 这里【表示通行证】
- int available = getState();
- // 计算当前线程获取通行证完成之后,通行证还剩余数量
- int remaining = available - acquires;
- // 如果许可已经用完, 返回负数, 表示获取失败,
- if (remaining < 0 ||
- // 许可证足够分配的,如果 cas 重试成功, 返回正数, 表示获取成功
- compareAndSetState(available, remaining))
- return remaining;
- }
- }
- 复制代码
doAcquireSharedInterruptibly()方法将当前线程加入到阻塞队列中阻塞parkAndCheckInterrupt()阻塞当前线程- private void doAcquireSharedInterruptibly(int arg) {
- // 将调用 Semaphore.aquire 方法的线程,包装成 node 加入到 AQS 的阻塞队列中
- final Node node = addWaiter(Node.SHARED);
- // 获取标记
- boolean failed = true;
- try {
- for (;;) {
- final Node p = node.predecessor();
- // 前驱节点是头节点可以再次获取许可
- if (p == head) {
- // 再次尝试获取许可,【返回剩余的许可证数量】
- int r = tryAcquireShared(arg);
- if (r >= 0) {
- // 成功后本线程出队(AQS), 所在 Node设置为 head
- // r 表示【可用资源数】, 为 0 则不会继续传播
- setHeadAndPropagate(node, r);
- p.next = null; // help GC
- failed = false;
- return;
- }
- }
- // 不成功, 设置上一个节点 waitStatus = Node.SIGNAL, 下轮进入 park 阻塞
- if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
- throw new InterruptedException();
- }
- } finally {
- // 被打断后进入该逻辑
- if (failed)
- cancelAcquire(node);
- }
- }
- 复制代码
最终的AQS状态如下图所示:
Thread-1、Thread-2、Thread-4正常运行
AQS的state也就是等于0
Thread-0、Thread-3再阻塞队列中

现在Thread-4运行完毕,要释放许可,Thread-0、Thread-3又是如何恢复执行的呢?
release()方法释放许可,最终调用 Sync#releaseShared()方法tryReleaseShared(arg)尝试释放许可成功,那么调用doReleaseShared();进行唤醒- // release() -> releaseShared()
- public final boolean releaseShared(int arg) {
- // 尝试释放锁
- if (tryReleaseShared(arg)) {
- doReleaseShared();
- return true;
- }
- return false;
- }
- 复制代码
tryReleaseShared()方法主要是尝试释放许可- protected final boolean tryReleaseShared(int releases) {
- for (;;) {
- // 获取当前锁资源的可用许可证数量
- int current = getState();
- int next = current + releases;
- // 索引越界判断
- if (next < current)
- throw new Error("Maximum permit count exceeded");
- // 释放锁
- if (compareAndSetState(current, next))
- return true;
- }
- }
- 复制代码
doReleaseShared()方法唤醒队列中的线程unparkSuccessor()方法是唤醒的核心操作- // 唤醒
- private void doReleaseShared() {
- // 如果 head.waitStatus == Node.SIGNAL ==> 0 成功, 下一个节点 unpark
- // 如果 head.waitStatus == 0 ==> Node.PROPAGATE
- for (;;) {
- Node h = head;
- if (h != null && h != tail) {
- int ws = h.waitStatus;
- if (ws == Node.SIGNAL) {
- // 防止 unparkSuccessor 被多次执行
- if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
- continue;
- // 唤醒后继节点
- unparkSuccessor(h);
- }
- // 如果已经是 0 了,改为 -3,用来解决传播性
- else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
- continue;
- }
- if (h == head)
- break;
- }
- }
- 复制代码
最终AQS状态如下图所示:

Thread-0开始竞争,如果竞争成功,如下图所示:
Thread-3Semaphore信号量类基于AQS的共享锁实现,有公平锁和非公平锁两个版本,它用来限制能同时访问共享资源的线程上限,典型的应用场景是可以用来保护有限的公共资源,比如数据库连接等。