• 【juc学习之路第9天】屏障衍生工具


    Semaphore

    在大部分的应用环境下,很多资源实际上都属于有限提供的,例如:服务器提供了4核8线程的CPU资源,这样所有的应用不管如何抢占,最终可以抢占的也只有这固定的8个线程来进行应用的处理,实际上这就属于一种有限的资源。在面对大规模并发访问的应用环境中,为了合理的安排有限资源的调度,在JUC中提供了 Semaphore(信号量)处理类。

    范例:使用信号量控制叫号系统

    package juc.semaphore;
    
    import java.util.concurrent.Semaphore;
    import java.util.concurrent.ThreadLocalRandom;
    import java.util.concurrent.TimeUnit;
    
    /**
     * @author birdy
     * @date 2022/6/25 2:53 PM
     */
    public class Main {
        public static void main(String[] args) {
            Semaphore semaphore = new Semaphore(2);
            for (int i = 0; i < 10; i ++) {
                new Thread(() -> {
                    try {
                        semaphore.acquire();
                        System.out.printf("【%s】开始办理业务, 当前等待人数:%d\n", 
                                Thread.currentThread().getName(),
                                semaphore.getQueueLength());
                        int randTime = ThreadLocalRandom.current().nextInt(1000);
                        TimeUnit.MILLISECONDS.sleep(randTime);
                        System.err.printf("【%s】办理业务完成\n", 
                                Thread.currentThread().getName());
                        semaphore.release();
                    }catch (Exception e) {
                        e.printStackTrace();
                    }
                }, "用户-" + i).start();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    CountDownLanch

    是一种基于倒计数同步的线程管理机制,例如:主线程里面创建了三个子线程,主线程必须在这三个子线程全部执行完成之后再继续向下执行,此时就可以基于CountDownLatch设置等待的线程数量,每当一个子线程执行完就将计数-1。

    范例:等待线程执行完毕的计数操作

    package juc.cdl;
    
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ThreadLocalRandom;
    import java.util.concurrent.TimeUnit;
    
    /**
     * @author birdy
     * @date 2022/6/25 3:09 PM
     */
    public class Main {
        public static final int TOTAL_VISITOR = 10;
        public static void main(String[] args) {
            System.out.println("观光旅行团正式出发~");
            CountDownLatch countDownLatch = new CountDownLatch(TOTAL_VISITOR);
            for (int i = 0; i < TOTAL_VISITOR; i ++) {
                new Thread(() -> {
                    int visitTime = ThreadLocalRandom.current().nextInt(500);
                    try {
                        TimeUnit.MILLISECONDS.sleep(visitTime);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }finally {
                        countDownLatch.countDown();
                        System.out.printf("【%s】我回来啦~ 还剩下%d位好兄弟\n",
                                Thread.currentThread().getName(), countDownLatch.getCount());
                    }
                }, "VISITOR - " + i).start();
            }
            try {
                boolean isComplete = countDownLatch.await(400, TimeUnit.MILLISECONDS);
                if (isComplete) {
                    System.out.print("人齐啦!");
                } else {
                    System.err.printf("还剩%d位,过时不候!\n", countDownLatch.getCount());
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                System.out.println("大巴车出发!");
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44

    CyclicBarrier

    Barrier表示的是一个栅栏,而Cyclic表示的是一个循环概念。当等待线程达到param个之后,可以执行相应的Rannable进程。但等待线程超时之后会打破屏障,因此所有处于等待状态的进程都会触发BrokenBarrierException异常。用于等待多少个线程触发操作的场景。

    范例:匹配玩家

    package juc.CyclicBarrier;
    
    import java.util.concurrent.*;
    
    /**
     * @author birdy
     * @date 2022/6/25 3:56 PM
     */
    public class Main {
        public static void main(String[] args) {
            CyclicBarrier cyclicBarrier = new CyclicBarrier(4, () -> {
                System.err.println("匹配成功,起飞!");
    
            });
            for (int i = 0; i < 11; i ++) {
                new Thread(() -> {
                    try {
                        int randTime = ThreadLocalRandom.current().nextInt(2000);
                        TimeUnit.MILLISECONDS.sleep(randTime);
                        System.out.printf("一名玩家已进入:%s \t", Thread.currentThread().getName());
                        cyclicBarrier.await(500, TimeUnit.MILLISECONDS);
                    } catch (TimeoutException e) {
                        System.err.println("玩家匹配失败:"+ Thread.currentThread().getName());
                        cyclicBarrier.reset();
                    } catch (BrokenBarrierException e) {
                        System.err.println(Thread.currentThread().getName() + "等不鸟了!");
                    }catch (Exception e) {
                        e.printStackTrace();
                    }
                }, "user - " + i).start();
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    一名玩家已进入:user - 9 一名玩家已进入:user - 6 一名玩家已进入:user - 4 一名玩家已进入:user - 7 匹配成功,起飞!
    一名玩家已进入:user - 2 一名玩家已进入:user - 5 一名玩家已进入:user - 10 user - 10等不鸟了!
    玩家匹配失败:user - 2
    user - 5等不鸟了!
    一名玩家已进入:user - 3 一名玩家已进入:user - 0 一名玩家已进入:user - 8 一名玩家已进入:user - 1 匹配成功,起飞!

    Exchanger

    交换器。一个线程在调用exchanger.exchange()之后,会到达exchange交换点进入等待状态,直到另一个线程的exchanger也到达交换点之后进行交换。

    范例:生产者和消费者的状态交换

    package juc.exchanger;
    
    import java.util.concurrent.Exchanger;
    import java.util.concurrent.ThreadLocalRandom;
    import java.util.concurrent.TimeUnit;
    
    /**
     * @author birdy
     * @date 2022/6/26 4:50 PM
     */
    public class Main {
        public static final int TOTAL = 10;
        public static void main(String[] args) {
            Exchanger<String> exchanger = new Exchanger<>();
            new Thread(() -> {
                for (int i = 0; i < TOTAL; i ++) {
                    try {
                        String msg = "hello - " + i;
                        int randInt = ThreadLocalRandom.current().nextInt(500);
                        TimeUnit.MILLISECONDS.sleep(randInt);
                        exchanger.exchange(msg);
                        System.out.println("【p】" + msg);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
            new Thread(() -> {
                for (int i = 0; i < TOTAL - 1; i ++) {
                    try {
                        String exchange = exchanger.exchange(null);
                        System.err.println("【c】" + exchange);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40

    CompletableFuture

    在JDK1.5之后提供了一个多线程的新的处理接口:Callable,该接口需要与Future接口整合在一起,而后再进行最终的异步操作调用,提升了多线程的处理性能。
    JDK 1.5提供的Future可以实现异步计算操作,虽然Future的相关方法提供了异步任务的执行能力,但是对于线程执行结果的获取只能够采用阻塞或轮询的方式进行处理,阻塞的方式与多线程异步处理的初衷产生了分歧,轮询的方式又会造成CPU资源的浪费,同时也无法及时的得到结果。为了解决这些设计问题从JDK1.8开始提供了Future的扩展实现类CompletableFuture,可以帮助开发者简化异步编程的复杂性,同时又可以结合函数式编程模式利用回调的方式进行异步处理计算操作。

    范例:模拟打车系统

    package juc.completable_future;
    
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.TimeUnit;
    
    /**
     * @author birdy
     * @date 2022/6/27 9:14 PM
     */
    public class Main {
        public static final int DRIVER_NUMBER = 6;
        public static void main(String[] args) {
            CompletableFuture<String> future = new CompletableFuture<>();
            for(int i = 0; i < DRIVER_NUMBER; i ++) {
                new Thread(() -> {
                    try {
                        System.out.printf("【%s】开始接单啦!\n", Thread.currentThread().getName());
                        String passenger = future.get();
                        System.out.printf("【%s】接到新单:%s\n", Thread.currentThread().getName(), passenger);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }, "滴滴车主 - " + i).start();
            }
            new Thread(() -> {
                try {
                    TimeUnit.MILLISECONDS.sleep(1000);
                    future.complete(Thread.currentThread().getName());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }, "去往腾讯大厦的乘客").start();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    【滴滴车主 - 3】开始接单啦!
    【滴滴车主 - 2】开始接单啦!
    【滴滴车主 - 0】开始接单啦!
    【滴滴车主 - 0】接到新单:去往腾讯大厦的乘客
    【滴滴车主 - 5】接到新单:去往腾讯大厦的乘客
    【滴滴车主 - 2】接到新单:去往腾讯大厦的乘客
    【滴滴车主 - 1】接到新单:去往腾讯大厦的乘客

    以上的处理是基于了同步的方式实现的,除了可以通过complete()方法解除所有线程的阻塞状态之外CompletableFuture中也可以通过runAsync()方法定义一个异步任务的处理线程(通过Runnable接口实现),并且在该线程执行完成后才会解除所有子线程的阻塞状态。

    在这里插入图片描述

    范例:使用runAsync()异步调用

    package juc.completable_future;
    
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.TimeUnit;
    
    /**
     * @author birdy
     * @date 2022/6/27 9:24 PM
     */
    public class Main {
        public static final int DRIVER_NUMBER = 6;
        public static void main(String[] args) {
            CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
                try {
                    TimeUnit.MILLISECONDS.sleep(5000);
                    System.err.println("附近暂无乘客,请稍后再试!");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
            for(int i = 0; i < DRIVER_NUMBER; i ++) {
                new Thread(() -> {
                    try {
                        System.out.printf("【%s】开始接单啦!\n", Thread.currentThread().getName());
                        future.get();  // 使用异步调用没有返回值
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }, "滴滴车主 - " + i).start();
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    【滴滴车主 - 2】开始接单啦!
    【滴滴车主 - 3】开始接单啦!
    【滴滴车主 - 4】开始接单啦!
    【滴滴车主 - 5】开始接单啦!
    附近暂无乘客,请稍后再试!

  • 相关阅读:
    图文详解JVM中的垃圾回收机制(GC)
    【值符】嵌入式必备知识点&面试点
    大文件分片上传、断点续传、秒传
    如何在centos上安装nvidia驱动
    麒麟软件操作系统下载
    如何批量删除 Word 文档的只读密码?
    ELK单机版部署踩坑及与Springboot整合
    自己动手实现一个深度学习算法——七、卷积神经网络
    基于libjpeg-turbo库的jpeg4py安装与使用记录
    基于SSM在线医疗服务系统+jsp【附开题|万字文档(LW)和搭建文档】
  • 原文地址:https://blog.csdn.net/AlexanderRon/article/details/125492315