Semaphore,俗称信号量,它是操作系统中PV操作的原语在java的实现,它也是基于AbstractQueuedSynchronizer实现的
Semaphore的功能非常强大,大小为1的信号量就类似于互斥锁,通过同时只能有一个线程获取信号量实现。大小为n(n>0)的信号量可以实现限流的功能,它可以实现只能有n个线程同时获取信号量

PV操作是操作系统一种实现进程互斥与同步的有效方法。PV操作与信号量(S)的处理相关,P表示通过的意思,V表示释放的意思。用PV操作来管理共享资源时,首先要确保PV操作自身执行的正确性。
P操作的主要动作是:
①S减1
②若S减1后仍大于或等于0,则进程继续执行
③若S减1后小于0,则该进程被阻塞后放入等待该信号量的等待队列中,然后转进程调度
V操作的主要动作是:
①S加1
②若相加后结果大于0,则进程继续执行
③若相加后结果小于或等于0,则从该信号的等待队列中释放一个等待进程,然后再返回原进程继续执行或转进程调度

用于做流量控制,特别是公用资源有限的应用场景
@Slf4j
public class SemaphoreTest2 {
/**
* 实现一个同时只能处理5个请求的限流器
*/
private static final Semaphore semaphore = new Semaphore(5);
/**
* 定义一个线程池
*/
private static final ThreadPoolExecutor executor = new ThreadPoolExecutor
(10, 50, 60,
TimeUnit.SECONDS, new LinkedBlockingDeque<>(200));
/**
* 模拟执行方法
*/
public static void exec() {
try {
//占用1个资源
semaphore.acquire(1);
//TODO 模拟业务执行
log.info("执行exec方法");
Thread.sleep(2000);
} catch (Exception e) {
e.printStackTrace();
} finally {
//释放一个资源
semaphore.release(1);
}
}
public static void main(String[] args) throws InterruptedException {
for (; ; ) {
Thread.sleep(100);
// 模拟请求以10个/s的速度
executor.execute(SemaphoreTest2::exec);
}
}
}
运行结果
同一时间,只会有五个线程在执行
12:22:38.087 [pool-1-thread-1] INFO com.example.demo.seven_two_six.concurrent.lock.SemaphoreTest2 - 执行exec方法
12:22:38.197 [pool-1-thread-2] INFO com.example.demo.seven_two_six.concurrent.lock.SemaphoreTest2 - 执行exec方法
12:22:38.301 [pool-1-thread-3] INFO com.example.demo.seven_two_six.concurrent.lock.SemaphoreTest2 - 执行exec方法
12:22:38.411 [pool-1-thread-4] INFO com.example.demo.seven_two_six.concurrent.lock.SemaphoreTest2 - 执行exec方法
12:22:38.523 [pool-1-thread-5] INFO com.example.demo.seven_two_six.concurrent.lock.SemaphoreTest2 - 执行exec方法
12:22:40.092 [pool-1-thread-6] INFO com.example.demo.seven_two_six.concurrent.lock.SemaphoreTest2 - 执行exec方法
12:22:40.200 [pool-1-thread-7] INFO com.example.demo.seven_two_six.concurrent.lock.SemaphoreTest2 - 执行exec方法
12:22:40.307 [pool-1-thread-8] INFO com.example.demo.seven_two_six.concurrent.lock.SemaphoreTest2 - 执行exec方法
12:22:40.415 [pool-1-thread-4] INFO com.example.demo.seven_two_six.concurrent.lock.SemaphoreTest2 - 执行exec方法
12:22:40.538 [pool-1-thread-9] INFO com.example.demo.seven_two_six.concurrent.lock.SemaphoreTest2 - 执行exec方法
12:22:42.098 [pool-1-thread-10] INFO com.example.demo.seven_two_six.concurrent.lock.SemaphoreTest2 - 执行exec方法
12:22:42.206 [pool-1-thread-1] INFO com.example.demo.seven_two_six.concurrent.lock.SemaphoreTest2 - 执行exec方法
12:22:42.314 [pool-1-thread-2] INFO com.example.demo.seven_two_six.concurrent.lock.SemaphoreTest2 - 执行exec方法
12:22:42.418 [pool-1-thread-4] INFO com.example.demo.seven_two_six.concurrent.lock.SemaphoreTest2 - 执行exec方法
12:22:42.540 [pool-1-thread-3] INFO com.example.demo.seven_two_six.concurrent.lock.SemaphoreTest2 - 执行exec方法
CountDownLatch(闭锁)是一个同步协助类,允许一个或多个线程等待,直到其他线程完成操作集
CountDownLatch使用给定的计数值(count)初始化。await方法会阻塞直到当前的计数值(count)由于countDown方法的调用达到0,count为0之后所有等待的线程都会被释放,并且随后对await方法的调用都会立即返回。这是一个一次性现象 —— count不会被重置。如果你需要一个重置count的版本,那么请考虑使用CyclicBarrier


CountDownLatch一般用作多线程倒计时计数器,强制它们等待其他一组(CountDownLatch的初始化决定)任务执行完成
两种使用场景:
@Slf4j
public class CountDownLatchTest {
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
for (int i = 0; i < 5; i++) {
new Thread(() -> {
try {
log.info("准备完成");
countDownLatch.await();
log.info("开始赛跑");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
Thread.sleep(2000);// 裁判准备发令
countDownLatch.countDown();// 发令枪:执行发令
}
}
运行结果
12:35:01.108 [Thread-4] INFO com.example.demo.seven_two_six.concurrent.lock.CountDownLatchTest - 准备完成
12:35:01.107 [Thread-1] INFO com.example.demo.seven_two_six.concurrent.lock.CountDownLatchTest - 准备完成
12:35:01.107 [Thread-0] INFO com.example.demo.seven_two_six.concurrent.lock.CountDownLatchTest - 准备完成
12:35:01.108 [Thread-2] INFO com.example.demo.seven_two_six.concurrent.lock.CountDownLatchTest - 准备完成
12:35:01.108 [Thread-3] INFO com.example.demo.seven_two_six.concurrent.lock.CountDownLatchTest - 准备完成
12:35:03.112 [Thread-1] INFO com.example.demo.seven_two_six.concurrent.lock.CountDownLatchTest - 开始赛跑
12:35:03.112 [Thread-3] INFO com.example.demo.seven_two_six.concurrent.lock.CountDownLatchTest - 开始赛跑
12:35:03.112 [Thread-0] INFO com.example.demo.seven_two_six.concurrent.lock.CountDownLatchTest - 开始赛跑
12:35:03.112 [Thread-2] INFO com.example.demo.seven_two_six.concurrent.lock.CountDownLatchTest - 开始赛跑
12:35:03.112 [Thread-4] INFO com.example.demo.seven_two_six.concurrent.lock.CountDownLatchTest - 开始赛跑
public class CountDownLatchTest2 {
public static void main(String[] args) throws Exception {
CountDownLatch countDownLatch = new CountDownLatch(5);
for (int i = 0; i < 5; i++) {
final int index = i;
new Thread(() -> {
try {
Thread.sleep(1000 +
ThreadLocalRandom.current().nextInt(1000));
System.out.println(Thread.currentThread().getName()
+ " finish task" + index);
countDownLatch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
// 主线程在阻塞,当计数器==0,就唤醒主线程往下执行。
countDownLatch.await();
System.out.println("主线程:在所有任务运行完成后,进行结果汇总");
}
}
运行结果
Thread-1 finish task1
Thread-4 finish task4
Thread-0 finish task0
Thread-3 finish task3
Thread-2 finish task2
主线程:在所有任务运行完成后,进行结果汇总
底层基于 AbstractQueuedSynchronizer 实现,CountDownLatch 构造函数中指定的count直接赋给AQS的state;每次countDown()则都是release(1)减1,最后减到0时unpark阻塞线程;这一步是由最后一个执行countdown方法的线程执行的
而调用await()方法时,当前线程就会判断state属性是否为0,如果为0,则继续往下执行,如果不为0,则使当前线程进入等待状态,直到某个线程将state属性置为0,其就会唤醒在await()方法中等待的线程
CountDownLatch和CyclicBarrier都能够实现线程之间的等待,只不过它们侧重点不同:
字面意思回环栅栏(循环屏障),通过它可以实现让一组线程等待至某个状态(屏障点)之后再全部同时执行。叫做回环是因为当所有等待线程都被释放以后,CyclicBarrier可以被重用

parties表示屏障拦截的线程数量,每个线程调用 await 方法告诉 CyclicBarrier 我已经到达了屏障,然后当前线程被阻塞
用于在线程到达屏障时,优先执行 barrierAction,方便处理更复杂的业务场景(该线程的执行时机是在到达屏障之后再执行)
指定数量的线程全部调用await()方法时,这些线程不再阻塞
通过reset()方法可以进行重置计数器
示例
public class CyclicBarrierTest2 {
//保存每个学生的平均成绩
private final ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
private final ExecutorService threadPool = Executors.newFixedThreadPool(3);
private final CyclicBarrier cb = new CyclicBarrier(3, () -> {
int result = 0;
Set<String> set = map.keySet();
for (String s : set) {
result += map.get(s);
}
System.out.println("三人平均成绩为:" + (result / 3) + "分");
});
public void count() {
for (int i = 0; i < 3; i++) {
threadPool.execute(new Runnable() {
@Override
public void run() {
//获取学生平均成绩
int score = (int) (Math.random() * 40 + 60);
map.put(Thread.currentThread().getName(), score);
System.out.println(Thread.currentThread().getName() + "同学的平均成绩为:" + score);
try {
//执行完运行await(),等待所有学生平均成绩都计算完毕
cb.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
});
}
}
public static void main(String[] args) {
CyclicBarrierTest2 cb = new CyclicBarrierTest2();
cb.count();
}
}
运行结果:
pool-1-thread-1同学的平均成绩为:80
pool-1-thread-3同学的平均成绩为:78
pool-1-thread-2同学的平均成绩为:82
三人平均成绩为:80分
示例
@Slf4j
public class CyclicBarrierTest3 {
public static void main(String[] args) throws InterruptedException {
AtomicInteger counter = new AtomicInteger();
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 5, 1000, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100), (r) -> new Thread(r, counter.addAndGet(1) + " 号 "), new ThreadPoolExecutor.AbortPolicy());
CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> System.out.println("裁判:比赛开始~~"));
for (int i = 0; i < 10; i++) {
threadPoolExecutor.submit(new Runner(cyclicBarrier));
}
TimeUnit.SECONDS.sleep(2);
threadPoolExecutor.shutdown();
}
static class Runner extends Thread {
private final CyclicBarrier cyclicBarrier;
public Runner(CyclicBarrier cyclicBarrier) {
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
try {
int sleepMills = ThreadLocalRandom.current().nextInt(1000);
Thread.sleep(sleepMills);
System.out.println(Thread.currentThread().getName() + " 选手已就位, 准备共用时: " + sleepMills + "ms" + cyclicBarrier.getNumberWaiting());
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
}
运行结果
1 号 选手已就位, 准备共用时: 57ms0
3 号 选手已就位, 准备共用时: 273ms1
4 号 选手已就位, 准备共用时: 290ms2
2 号 选手已就位, 准备共用时: 830ms3
5 号 选手已就位, 准备共用时: 991ms4
裁判:比赛开始~~
3 号 选手已就位, 准备共用时: 267ms0
2 号 选手已就位, 准备共用时: 605ms1
4 号 选手已就位, 准备共用时: 706ms2
1 号 选手已就位, 准备共用时: 788ms3
5 号 选手已就位, 准备共用时: 906ms4
裁判:比赛开始~~