• Java 多线程的返回对象和资源独享线程


    1. 多线程的返回对象-Future

    1.1 Future 

    如果你在创建线程时,使用的是 Runnable 接口,那么此时你是无法获取线程执行结果的,如果想要获取线程的执行结果,需要实现 Callable 接口,示例如下:

    1. public class J0_Callable {
    2.     static class Task implements Callable {
    3.         @Override
    4.         public Integer call() throws InterruptedException {
    5.             Thread.sleep(3000);
    6.             return 100;
    7.         }
    8.     }
    9.     public static void main(String[] args) throws ExecutionException, InterruptedException {
    10.         ExecutorService executors = Executors.newSingleThreadExecutor();
    11.         Future submit = executors.submit(new Task());
    12.         System.out.println("计算结果为:" + submit.get());
    13.         executors.shutdown();
    14.     }
    15. }

    此时通过 ExecutorService.submit() 进行提交,得到的是一个 Future 对象,它包含了线程的执行结果,当你调用其 get() 方法时,它会阻塞直至获取到线程的返回结果。

    1.2  FutureTask

    使用 Callable 接口的限制是:其只能使用线程池提交,而不能使用单独的线程进行提交。如果想要使用单独的线程提交,可以使用 FutureTask 对其进行包装,FutureTask 是 Runnable 接口的实现类,可以用于任何场景下的提交,示例如下:

    1. static class Task implements Callable<Integer> {
    2. @Override
    3. public Integer call() throws InterruptedException {
    4.         Thread.sleep(3000);
    5. return 100;
    6. }
    7. }
    8. public static void main(String[] args) throws ExecutionException, InterruptedException {
    9.     FutureTask<Integer> futureTask01 = new FutureTask<>(new Task());
    10.     FutureTask<Integer> futureTask02 = new FutureTask<>(new Task());
    11. // 使用独立的线程执行
    12. new Thread(futureTask01).start();
    13.     ExecutorService executorService = Executors.newSingleThreadExecutor();
    14. // 使用线程池提交
    15.     executorService.submit(futureTask02);
    16.     System.out.println("futureTask01 计算结果为:" + futureTask01.get());
    17.     System.out.println("futureTask02 计算结果为:" + futureTask01.get());
    18.     executorService.shutdown();
    19. }

    1.3  CompletableFuture

    CompletableFuture 是 JDK 8 提供的增强后 Future ,它支持流式调用,等待唤醒等一系列新的功能:

    1.3.1 等待唤醒

    1. public class J2_CompletableFuture {
    2. static class Compute implements Runnable {
    3. private CompletableFuture<Integer> future;
    4. Compute(CompletableFuture<Integer> future) {
    5. this.future = future;
    6. }
    7. @Override
    8. public void run() {
    9. try {
    10.                 System.out.println("子线程等待主线程运算完成····");
    11.                 Integer integer = future.get();
    12.                 System.out.println("子线程完成后续运算:" + integer * integer);
    13. } catch (InterruptedException | ExecutionException e) {
    14.                 e.printStackTrace();
    15. }
    16. }
    17. }
    18. public static void main(String[] args) throws InterruptedException {
    19.         int intermediateResult;
    20.         CompletableFuture<Integer> future = new CompletableFuture<>();
    21. // 启动子线程
    22. new Thread(new Compute(future)).start();
    23.         System.out.println("启动主线程");
    24.         Thread.sleep(2000);
    25.         System.out.println("主线程计算完成");
    26. // 假设主线程计算结果为 100
    27.         intermediateResult = 100;
    28. // 传递主线程的计算结果给子线程
    29.         future.complete(intermediateResult);
    30. }
    31. }
    32. // 输出
    33. 启动主线程
    34.     子线程等待主线程运算完成····
    35.     主线程计算完成
    36.     子线程完成后续运算:10000

    1.3.2 supplyAsync

    CompletableFuture 的 supplyAsync 可以将一个正常的方法以异步的方式来执行:

    1. public class J3_SupplyAsync {
    2. private static Integer compute() {
    3. try {
    4.             Thread.sleep(2000);
    5. } catch (InterruptedException e) {
    6.             e.printStackTrace();
    7. }
    8.         System.out.println("子线程计算完成");
    9. return 100;
    10. }
    11. public static void main(String[] args) throws ExecutionException, InterruptedException {
    12.         CompletableFuture<Integer> supplyAsync = CompletableFuture.supplyAsync(J3_SupplyAsync::compute);
    13.         System.out.println("主线程等待子线程计算完成");
    14.         Integer integer = supplyAsync.get();
    15.         System.out.println("主线程计算完成:" + integer * integer);
    16. }
    17. }

    1.3.3 流式调用

    CompletableFuture 支持大部分流式处理的特性,示例如下:

    1. public class J4_StreamingCall {
    2. private static Integer compute() {
    3.         System.out.println("compute所在线程:" + Thread.currentThread().getId());
    4. try {
    5.             Thread.sleep(1000);
    6. } catch (InterruptedException e) {
    7.             e.printStackTrace();
    8. }
    9. return 100;
    10. }
    11. private static Integer multi(Integer integer) {
    12. try {
    13.             System.out.println("multi所在线程:" + Thread.currentThread().getId());
    14.             Thread.sleep(1000);
    15. } catch (InterruptedException e) {
    16.             e.printStackTrace();
    17. }
    18. return integer * integer;
    19. }
    20. private static void accept(Integer integer) {
    21.         System.out.println("accept所在线程:" + Thread.currentThread().getId());
    22.         System.out.println("accept方法消费掉计算结果:" + integer);
    23. }
    24. public static void main(String[] args) throws ExecutionException, InterruptedException {
    25.         CompletableFuture<Void> future = CompletableFuture.supplyAsync(J4_StreamingCall::compute)
    26. .thenApply(J4_StreamingCall::multi)
    27. .thenAccept(J4_StreamingCall::accept) //值在这一步被消费掉了
    28. .thenAccept(x -> System.out.println("运算结果:" + x));
    29.         future.get(); //类似于流式计算的惰性求值,如果缺少这一步,不会有任何输出
    30. }
    31. }

    1.3.4 组合多个 CompletableFuture

    除了使用单个的 CompletableFuture,还可以通过 thenCompose 或 thenCombineAsync 来组合多个 CompletableFuture:

    1. public class J6_Combination {
    2. private static Integer compute() {
    3.         System.out.println("compute 所在线程:" + Thread.currentThread().getId());
    4. return 100;
    5. }
    6. private static Integer multi(Integer integer) {
    7.         System.out.println("epr 所在线程:" + Thread.currentThread().getId());
    8. return integer * integer;
    9. }
    10. public static void main(String[] args) throws ExecutionException, InterruptedException {
    11. // 组合实现方式1 thenCompose 一个计算的输入依赖另外一个计算的结果
    12.         CompletableFuture<Void> future01 = CompletableFuture.supplyAsync(J6_Combination::compute)
    13. .thenCompose(x -> CompletableFuture.supplyAsync(() -> multi(x)))
    14. .thenAccept(x -> System.out.println("运算结果:" + x)); // 运算结果:10000
    15.         future01.get();
    16.         System.out.println();
    17. // 组合实现方式2 thenCombineAsync 两个计算之间不依赖
    18.         CompletableFuture<Integer> future02 = CompletableFuture.supplyAsync(J6_Combination::compute);
    19.         CompletableFuture<Integer> future03 = CompletableFuture.supplyAsync(() -> J6_Combination.multi(100));
    20.         CompletableFuture<Integer> futureAll = future02.thenCombineAsync(future03, (x, y) -> x + y);
    21.         System.out.println("运算结果:" + futureAll.get()); // 运算结果:10100
    22. }
    23. }

    2.资源独享线程-ThreadLocal

    ThreadLocal 是以增加资源的方式来避免竞态,它会为每一个线程创建一份私有的资源,从而避免对公共资源的竞争。实例如下:

    1. /**
    2.  * 线程不安全的SimpleDateFormat
    3.  */
    4. public class J1_ThreadUnsafe {
    5. private static final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    6. private static int sum = 1000;
    7. private static CountDownLatch countDownLatch = new CountDownLatch(sum);
    8. private static AtomicInteger atomicInteger = new AtomicInteger(0);
    9. static class Task implements Runnable {
    10. @Override
    11. public void run() {
    12. try {
    13.                 Date parse = sdf.parse("2018-08-08 08:08:08");
    14.                 System.out.println(parse);
    15.                 atomicInteger.incrementAndGet();
    16. } catch (ParseException e) {
    17.                 e.printStackTrace();
    18. finally {
    19.                 countDownLatch.countDown();
    20. }
    21. }
    22. }
    23. public static void main(String[] args) throws InterruptedException {
    24.         ExecutorService executorService = Executors.newFixedThreadPool(10);
    25. for (int i = 0; i < sum; i++) {
    26.             executorService.execute(new Task());
    27. }
    28.         countDownLatch.await();
    29.         System.out.println("格式化成功次数为:" + atomicInteger.get());
    30. }
    31. }

    因为 SimpleDateFormat 是线程不安全的,因此其格式化成功的次数总是小于 100 次,此时可以使用 ThreadLocal 进行改写,让每个线程都持有自己独立的格式化器,具体如下:

    1. public class J2_ThreadSafe {
    2. private static ThreadLocal threadLocal = new ThreadLocal<>();
    3. private static int sum = 1000;
    4. private static CountDownLatch countDownLatch = new CountDownLatch(sum);
    5. private static AtomicInteger atomicInteger = new AtomicInteger(0);
    6. static class Task implements Runnable {
    7. @Override
    8. public void run() {
    9. try {
    10. // 如果当前线程中不存在该值,则创建一个
    11. if (threadLocal.get() == null) {
    12.                     threadLocal.set(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));
    13. }
    14. // 使用线程私有的SimpleDateFormat
    15.                 Date parse = threadLocal.get().parse("2018-08-08 08:08:08");
    16.                 System.out.println(parse);
    17.                 atomicInteger.incrementAndGet();
    18. } catch (ParseException e) {
    19.                 e.printStackTrace();
    20. finally {
    21.                 countDownLatch.countDown();
    22. }
    23. }
    24. }
    25. public static void main(String[] args) throws InterruptedException {
    26.         ExecutorService executorService = Executors.newFixedThreadPool(10);
    27. for (int i = 0; i < sum; i++) {
    28.             executorService.execute(new Task());
    29. }
    30.         countDownLatch.await();
    31.         System.out.println("格式化成功次数为:" + atomicInteger.get());
    32.         executorService.shutdown();
    33. }
    34. }
  • 相关阅读:
    6、ES单机设置用户名密码、集群设置用户名密码、es-head登录、如何去掉密码
    “如何实现高效的应用交付”鲁班会开发者训练营厦门站进行时
    84.(cesium之家)cesium模型在地形上运动
    过五关,斩六将!「网易/美团/菜鸟」已拿offer【Java岗】
    如何为开源项目和社区做贡献 -- 你应该知道的十件事
    Java 的基本类型与引用类型的深拷贝和浅拷贝
    uniapp项目实践总结(十三)封装文件操作方法
    这可能是目前最全的Redis高可用技术解决方案
    mysql之搭建MHA架构实现高可用
    Vue3中全局配置 axios 的两种方式
  • 原文地址:https://blog.csdn.net/shangjg03/article/details/134229437