目录
CyclicBarrier 和 CountDownLatch 的区别
在JDK的并发包里提供了几个非常有用的并发工具类。CountDownLatch、CyclicBarrier 和 Semaphore 工具类提供了一种并发流程控制的手段,Exchanger 工具类则提供了在线程间交换数据的一种手段。
CountDownLatch 允许一个或多个线程等待其他线程完成的操作。
集齐 5 个不同的勋章可以解锁成就 - “勋章达人”
- public class CountDownLatchDemo {
-
- static CountDownLatch medalCount = new CountDownLatch(5);
-
- public static void main(String[] args) throws InterruptedException {
- getMedal();
- // 等待所有勋章集齐
- medalCount.await();
- System.out.println("解锁新成就 - 勋章达人");
- }
-
- private static void getMedal() throws InterruptedException {
- for (int i = 1; i <= 5; i++) {
- new Thread(() -> {
- System.out.println("集齐 " + Thread.currentThread().getName() + " 号勋章...");
- medalCount.countDown();
- }, String.valueOf(i)).start();
- }
-
- }
-
- }

CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。
模拟百米赛跑比赛现场,裁判员等待每组选手准备好后鸣枪开赛。共2组选手,每组3人。
- public class CyclicBarrierDemo {
-
- static CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
-
- public static void main(String[] args) throws InterruptedException {
- raceGame();
- }
-
- private static void raceGame() throws InterruptedException {
- for (int i = 1; i <= 2; i++) {
- System.out.println("第 " + i + " 轮准备开始...");
- for (int j = 1; j <= 3; j++) {
- new Thread(() -> {
- System.out.println("第 " + Thread.currentThread().getName() + " 号选手已经准备好了...");
- try {
- // 等待其它选手准备
- cyclicBarrier.await();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }, String.valueOf(j)).start();
- }
- Thread.sleep(100);
- // 比赛结束重置
- cyclicBarrier.reset();
- System.out.println("第 " + i + " 轮结束...");
- }
-
- }
-
- }

CountDownLatch 的计数器只能使用一次,而 CyclicBarrier 的计数器可以使用 reset() 方法重置。所以 CyclicBarrier 能处理更为复杂的业务场景。例如,如果计算发生错误,可以重置计数器,并让线程重新执行一次。
Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。
有 6 辆车在一个只有 2 个停车位的停车场抢车位,模拟随机进出顺序。
- public class SemaphoreDemo {
-
- static Semaphore semaphore = new Semaphore(2);
-
- public static void main(String[] args) {
- parkingGame();
- }
-
- private static void parkingGame() {
- for (int i = 0; i < 6; i++) {
- new Thread(() -> {
- try {
- semaphore.acquire();
- System.out.println(Thread.currentThread().getName() + " 号车抢到了车位");
- Thread.sleep(200);
- System.out.println(Thread.currentThread().getName() + " 号车释放了车位");
- semaphore.release();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
-
- }, String.valueOf(i + 1)).start();
- }
- }
-
- }

Exchanger(交换者)是一个用于线程间协作的工具类。Exchanger 用于进行线程间的数据交换。它提供一个同步点,在这个同步点,两个线程可以交换彼此的数据。这两个线程通过 exchange 方法交换数据,如果第一个线程先执行 exchange() 方法,它会一直等待第二个线程也执行 exchange 方法,当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产出来的数据传递给对方。
A、B 两人分别统计公司流水后进行对账,为了避免流水错误。
- public class ExchangerDemo {
-
- static Exchanger
exchanger = new Exchanger<>(); -
- public static void main(String[] args) {
- reconciliationGame();
- }
-
- private static void reconciliationGame() {
- new Thread(() -> {
- Integer moneyA = 500 * 10000;
- try {
- Integer exchangeB = exchanger.exchange(moneyA);
- System.out.println("A 与 B 是否一致: " + exchangeB.equals(moneyA) + " A: " + moneyA + " B: " + exchangeB);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }, "A").start();
-
- new Thread(() -> {
- Integer moneyB = 500 * 10000;
- try {
- Integer exchangeA = exchanger.exchange(moneyB);
- System.out.println("A 与 B 是否一致: " + moneyB.equals(exchangeA) + " A: " + exchangeA + " B: " + moneyB);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }, "B").start();
- }
-
- }

若将 A 流水修改为与 B 不一致,结果如下:

