• CompletableFuture


    1.Future

    1.1 Future接口理论知识复习

    Future接口(FutueTask实现类)定义了操作异步任务执行一些方法,如获取异步任务的执行结果、取消任务的执行、判断任务是否被取消、判断任务执行是否完毕等。
    比如主线程让一个子线程去执行任务,子线程可能比较耗时,启动子线程开始执行任务后,
    主线程就去做其他事情了,忙其它事情或者先执行完,过了一会才去获取子任务的执行结果或变更的任务状态。

    Future接口可以为主线程开一个分支任务,专门为主线程处理耗时和费力的复杂业务。

    Future接口能干什么?

    Future是Java5新加的一个接口,它提供了一种异步并行计算的功能。
    如果主线程需要执行一个很耗时的计算任务,我们就可以通过future把这个任务放到异步线程中执行。主线程继续处理其他任务或者先行结束,再通过Future获取计算结果。

    代码说话:
    Runnable接口Callable接口
    Future接口和FutureTask实现类
    目的:异步多线程任务执行且返回有结果,三个特点:多线程/有返回/异步任务

    1.2 FutureTask架构

    在这里插入图片描述绿色虚线:表示实现的关系,实现一个接口
    绿色实线:表示接口之间的继承
    蓝色实线:表示类之间的继承

    1.3 Future编码实战
    public class FutureTaskTest {
        public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
            ExecutorService executorService = Executors.newFixedThreadPool(3);
            long start = System.currentTimeMillis();
            FutureTask<String> task2 = new FutureTask<>(() -> {
                TimeUnit.SECONDS.sleep(2);
                return "2";
            });
            FutureTask<String> task1 = new FutureTask<>(() -> {
                TimeUnit.SECONDS.sleep(1);
                return "1";
            });
            FutureTask<String> task3 = new FutureTask<>(() -> {
                TimeUnit.SECONDS.sleep(3);
                return "3";
            });
            executorService.submit(task1);
            executorService.submit(task2);
            executorService.submit(task3);
            System.out.println(task1.get());
            System.out.println(task2.get(3,TimeUnit.SECONDS));
            while (true){
                if(task3.isDone()){
                    System.out.println(task3.get());
                    break;
                }else {
                    TimeUnit.MILLISECONDS.sleep(200);
                }
            }
            System.out.println("执行耗时:"+(System.currentTimeMillis()-start));
            executorService.shutdown();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    1
    2
    3
    执行耗时:3066
    
    • 1
    • 2
    • 3
    • 4

    优缺点分析
    优点: future+线程池异步多线程任务配合,能显著提高程序的执行效率。

    缺点
    一旦调用get()方法求结果,如果计算没有完成容易导致程序阻塞
    isDone()轮询的方式会耗费无谓的CPU资源,而且也不见得能及时地得到计算结果.
    如果想要异步获取结果,通常都会以轮询的方式去获取结果尽量不要阻塞

    Future对于结果的获取不是很友好,只能通过阻塞或轮询的方式得到任务的结果。

    将多个异步任务的计算结果组合起来,后一个异步任务的计算结果需要前一个异步任务的值
    将两个或多个异步计算合成一个异步计算,这几个异步计算互相独立,同时后面这个又依赖前一个处理的结果。
    对计算速度选最快:当Future集合中某个任务最快结束时,返回结果,返回第一名处理

    对于简单的业务场景使用Future完全OK,但想完成上述一些复杂的任务,使用Future之前提供的那点API就囊中羞涩,处理起来不够优雅,这时候还是让CompletableFuture以声明式的方式优雅的处理这些需求。Future能干的,CompletableFuture都能干。

    2. CompletableFuture

    2.1 CompletableFuture对Future的改进

    CompletableFuture异步线程发生异常,不会影响主线程,用来记录日志特别方便。

    CompletableFuture为什么出现
    get()方法在Future 计算完成之前会一直处在阻塞状态下,isDone()方法容易耗费CPU资源,
    对于真正的异步处理我们希望是可以通过传入回调函数,在Future结束时自动调用该回调函数,这样,我们就不用等待结果。

    阻塞的方式和异步编程的设计理念相违背,而轮询的方式会耗费无谓的CPU资源。因此,JDK8设计出CompletableFuture。
    CompletableFuture提供了一种观察者模式类似的机制,可以让任务执行完成后通知监听的一方。

    CompletableFuture和CompletionStage
    在这里插入图片描述CompletionStage
    CompletionStage代表异步计算过程中的某一个阶段,一个阶段完成以后可能会触发另外一个阶段
    一个阶段的计算执行可以是一个Function, Consumer或者Runnable。比如: stage.thenApply(x -> square(x)).thenAccept(×->System.out.print(x)).thenRun(( ->systeh.out.println())
    一个阶段的执行可能是被单个阶段的完成触发,也可能是由多个阶段一起触发
    代表异步计算过程中的某一个阶段,一个阶段完成以后可能会触发另外一个阶段,有些类似Linux系统的管道分隔符传参数。

    CompletableFuture
    在Java8中,CompletableFuture提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,并且提供了函数式编程的能力,可以通过回调的方式处理计算结果,也提供了转换和组合CompletableFuture 的方法。
    它可能代表一个明确完成的Future,也有可能代表一个完成阶段(CompletionStage ),它支持在计算完成以后触发一些函数或执行某些动作。
    它实现了Future和CompletionStage接口

    核心的四个静态方法,来创建一个异步任务
    在这里插入图片描述从Java8开始引入了CompletableFuture,它是Future的功能增强版。减少阻塞和轮询,可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法

    CompletableFuture的优点
    异步任务结束时,会自动回调某个对象的方法;
    主线程设置好回调后,不再关心异步任务的执行,异步任务之间可以顺序执行
    异步任务出错时,会自动回调某个对象的方法;

    2.2 案例精讲-从电商网站的比价需求说开去

    函数式编程已经主流
    先说说join和get对比
    说说你过去工作中的项目亮点?大厂业务需求说明
    一波流Java8函数式编程带走-比价案例实战

    Lambda表达式+Stream流式调用+Chain链式调用+Java8函数式编程
    在这里插入图片描述案例精讲-从电商网站的比价需求讲起

    需求说明
    同一款产品,同时搜索出同款产品在各大电商平台的售价;

    输出返回:
    出来结果希望是同款产品的在不同地方的价格清单列表,返回一个List《mysql》in jd price is 88.05
    《mysql》in dangdang price is 86.11
    《mysql》in taobao price is 90.43

    解决方案,比对同一个商品在各个平台上的价格,要求获得一个清单列表,
    1 )step by step,按部就班,查完京东查淘宝,查完淘宝查天猫
    2 )all in,万箭齐发,一口气多线程异步任务同时查询

    public class CompletableFutureDemo {
        static List<NetMall> list = Arrays.asList(
                new NetMall("vip"),
                new NetMall("jd"),
                new NetMall("tb"),
                new NetMall("pdd")
        );
        public static void main(String[] args) {
            long cur1 = System.currentTimeMillis();
            getPrice("Phone").forEach(r-> System.out.println(r));
            System.out.println("getPrice耗时"+(System.currentTimeMillis()-cur1));
            long cur2 = System.currentTimeMillis();
            getPriceByCompletableFuture("Phone").forEach(r-> System.out.println(r));
            System.out.println("getPriceByCompletableFuture耗时"+(System.currentTimeMillis()-cur2));
        }
    
        private static List<String> getPrice(String productName){
            return list.stream()
                    .map(r->String.format(productName+" in %s price is %.2f",r.getName(),r.calcPrice(productName)))
                    .collect(Collectors.toList());
        }
    
        private static List<String> getPriceByCompletableFuture(String productName){
            return list.stream()
                    .map(r-> CompletableFuture.supplyAsync(()->String.format(productName+" in %s price is %.2f",r.getName(),r.calcPrice(productName))))
                    .collect(Collectors.toList())
                    .stream()
                    .map(s->s.join())
                    .collect(Collectors.toList());
        }
    }
    class NetMall{
        private String name;
    
        public double calcPrice(String productName){
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return  ThreadLocalRandom.current().nextDouble(100000000)+productName.hashCode();
        }
    
        public NetMall(String name) {
            this.name = name;
        }
    
        public String getName() {
            return name;
        }
    
        public void setName(String name) {
            this.name = name;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    2.3 CompletableFuture常用方法
    2.3.1 获得结果和触发计算

    获得结果
    public T get() 不见不散
    public T get(long timeout,TimeUnit unit) 过时不候
    public T join():join()和get()方法都是用来获取CompletableFuture异步之后的返回值。join()方法抛出的是uncheck异常(即未经检查的异常),不会强制开发者抛出。get()方法抛出的是经过检查的异常,ExecutionException, InterruptedException 需要用户手动处理(抛出或者 try catch)
    public T getNow(T valuelfAbsent):没有计算完成的情况下,给我一个替代结果。计算完,返回计算完成后的结果。立即获取结果不阻赛。没算完,返回设定的valuelfAbsent值

    主动触发计算
    public bgolean complete(T value)
    是否打断get方法立即返回括号值

    public class CompletableFutureTest {
    
      public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
          CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
              try {
                  Thread.sleep(2000);
              } catch (InterruptedException e) {
                  e.printStackTrace();
              }
              return "hello CompletableFuture";
          });
    
          System.out.println(completableFuture.getNow("心急吃不了热豆腐"));
          System.out.println(completableFuture.get());
          System.out.println(completableFuture.get(1500, TimeUnit.MILLISECONDS));
          System.out.println(completableFuture.join());
          System.out.println(completableFuture.complete("未雨绸缪")+"\t"+completableFuture.join());
    
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    2.3.2 对计算结果进行处理

    thenApply
    计算结果存在依赖关系,这两个线程串行化
    异常相关:由于存在依赖关系(当前步错,不走下一步),当前步骤有异常的话就叫停。

    public class CompletableFutureTest2 {
        public static void main(String[] args) {
            ExecutorService executorService = Executors.newFixedThreadPool(2);
            CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return 6;
            },executorService).thenApply((r)-> {
                int i=2/0;
                return r * 5;
            }).thenApply((r)-> {
                System.out.println(r);
                return r - 2;
            }).whenComplete((v, e) -> {
                System.out.println("计算结果:"+v);
            }).exceptionally(e -> {
                System.out.println(e.getMessage());
                System.out.println(e);
                return null;
            });
            System.out.println("============主线程==========");
            executorService.shutdown();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    发生异常后进入exceptionally代码块,但是thenApply中的代码不会执行,whenComplete依旧会执行

    ============主线程==========
    计算结果:null
    java.lang.ArithmeticException: / by zero
    java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
    
    • 1
    • 2
    • 3
    • 4

    handle
    计算结果存在依赖关系,这两个线程串行化
    异常相关:有异常也可以往下一步走,根据带的异常参数可以进步处理

    public class CompletableFutureTest2 {
        public static void main(String[] args) {
            ExecutorService executorService = Executors.newFixedThreadPool(2);
            CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return 6;
            },executorService).handle((r,e)-> {
                int i=2/0;
                return r * 5;
            }).handle((r,e)-> {
                System.out.println(r);
                return r - 2;
            }).whenComplete((v, e) -> {
                System.out.println("计算结果:"+v);
            }).exceptionally(e -> {
                System.out.println(e.getMessage());
                System.out.println(e);
                return null;
            });
            System.out.println("============主线程==========");
            executorService.shutdown();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    发生异常后进入exceptionally代码块,但是handle和whenComplete依旧会执行

    ============主线程==========
    null
    计算结果:null
    java.lang.NullPointerException
    java.util.concurrent.CompletionException: java.lang.NullPointerException
    
    • 1
    • 2
    • 3
    • 4
    • 5

    在这里插入图片描述

    2.3.3 对计算结果进行消费

    接收任务的处理结果,并消费处理,无返回结果thenAccept

    public class CompletableFutureTest3 {
        public static void main(String[] args) {
            CompletableFuture.supplyAsync(()->{
                return 3;
            }).thenApply(r->{
                return r*8;
            }).thenApply(r->{
                return r/2;
            }).thenAccept(r-> System.out.println(r));
            System.out.println(CompletableFuture.supplyAsync(()->"6666").thenRun(()->{}).join());
            System.out.println(CompletableFuture.supplyAsync(()->"6666").thenAccept(r-> System.out.println(r)).join());
            System.out.println(CompletableFuture.supplyAsync(()->"6666").thenApply(r->r+"9999").join());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    12
    null
    6666
    null
    66669999
    
    • 1
    • 2
    • 3
    • 4
    • 5

    completableFuture和线程池说明

    在这里插入图片描述以thenRun和thenRunAsync为例,有什么区别?

    没有传入自定义线程池,都用默认线程池ForkJoinPool;
    传入了一个自定义线程池,
    如果你执行第一个任务的时候,传入了一个自定义线程池:
    调用thenRun方法执行第二个任务时,则第二个任务和第一个任务是共用同一个线程池。
    调用thenRunAsync执行第二个任务时,则第一个任务使用的是你自己传入的线程池,第二个任务使用的是ForkJoin线程池

    有可能处理太快,系统优化切换原则,直接使用main线程处理
    其它如: thenAccept和thenAcceptAsync,thenApply和thenApplyAsync等,它们之间的区别也是同理

    2.3.4 对计算速度进行选用与对计算结果进行合并

    applyToEither:谁快用谁
    thenCombine:两个completionStage任务都完成后,最终能把两个任务的结果一起交给thenCombine来处理。先完成的先等着,等待其它分支任务

    public class CompletableFutureTest4 {
        public static void main(String[] args) {
            CompletableFuture<String> first = CompletableFuture.supplyAsync(() -> {
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "1号选手";
            });
            CompletableFuture<String> second = CompletableFuture.supplyAsync(() -> {
                try {
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "2号选手";
            });
            CompletableFuture<String> result = first.applyToEither(second, r -> r + "is winner");
            CompletableFuture<String> res = first.thenCombine(second, (x, y) -> x + y);
            System.out.println(result.join());
            System.out.println(res.join());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    1号选手is winner
    1号选手2号选手
    
    • 1
    • 2
    2.3.5 并行执行
    allOf():当所有给定的 CompletableFuture 完成时,返回一个新的 CompletableFuture
    anyOf():当任何一个给定的CompletablFuture完成时,返回一个新的CompletableFuture
    
    • 1
    • 2
    	public static void testAllOf(){
            CompletableFuture<String> future1 = CompletableFuture.runAsync(() -> {
                try {
                    TimeUnit.SECONDS.sleep(3);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("future1执行完成");
            });
    
            CompletableFuture<String> future2 = CompletableFuture.runAsync(() -> {
                try {
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("future2执行完成");
            });
    
            CompletableFuture<Void> all = CompletableFuture.allOf(future1, future2);
    			try {
                all.get(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    注:本文是学习B站周阳老师《尚硅谷2022版JUC并发编程》课程所做学习笔记。

  • 相关阅读:
    2023Web前端开发面试手册
    基于虚拟力优化的无线传感器网络覆盖率matlab仿真
    模拟信号转换器模块
    java ssm框架的点歌系统的设计与实现源码
    MSDC 4.3 接口规范(4)
    Java Gradle
    【Unity】思考方式与构造 | 碰撞器/刚体/预设/组件
    关于你所不知道的双机互备,确定不了解一下?
    电脑使用小常识(7):Word如何批量修改题注
    (附源码)springboot公益慈善管理系统 毕业设计 281454
  • 原文地址:https://blog.csdn.net/qq_44300280/article/details/127829959