当我们得到包含结果的Future时,我们可以使用get方法等待线程完成并获取返回值,Future的get() 方法会阻塞主线程。Future获取得线程执行结果前,我们的主线程get()得到结果需要一直阻塞等待
Future无法实现下面的场景
实例化
一种是supply开头的方法,一种是run开头的方法
- public static CompletableFuture supplyAsync(Supplier supplier);
- public static CompletableFuture supplyAsync(Supplier supplier, Executor executor);
-
- public static CompletableFuture
runAsync(Runnable runnable); - public static CompletableFuture
runAsync(Runnable runnable, Executor executor); -
- CompletableFuture
completableFuture = new CompletableFuture();
注意:
异步回调方法可以选择是否传递线程池参数Executor,这里我们建议强制传线程池,且根据实际情况做线程池隔离。
当不传递线程池时,会使用ForkJoinPool.commonPool()系统级公共线程池,这里所有调用将共用该线程池,核心线程数=处理器数量-1(单核核心线程数为1),所有异步回调都会共用该CommonPool,核心与非核心业务都竞争同一个池中的线程,很容易成为系统瓶颈。手动传递线程池参数可以更方便的调节参数,并且可以给不同的业务分配不同的线程池,以求资源隔离,减少不同业务之间的相互干扰
获取结果
- /**
- * Waits if necessary for this future to complete,
- * and then returns its result.
- **/
- public T get()
-
- /**
- * Waits if necessary for at most the given time for this future
- * to complete, and then returns its result, if available.
- **/
- public T get(long timeout, TimeUnit unit)
-
- /**
- * Returns the result value (or throws any encountered exception)
- * if completed, else returns the given valueIfAbsent.
- **/
- public T getNow(T valueIfAbsent)
-
- /**
- *Returns the result value when complete, or throws an
- * (unchecked) exception if completed exceptionally. To better
- * conform with the use of common functional forms, if a
- * computation involved in the completion of this
- * CompletableFuture threw an exception, this method throws an
- * (unchecked) {@link CompletionException} with the underlying
- * exception as its cause.
- **/
- public T join()
get() 方法同样会阻塞直到任务完成,主线程会一直阻塞或等待参数指定的时间,主线程为完成时状态会一直是not completed
getNow() 直接获取结果不等待,参数valueIfAbsent的意思是当计算结果不存在或者Now时刻没有完成任务,给定一个确定的值
join() 与get() 区别在于join() 返回计算的结果或抛出一个unchecked异常(CompletionException),而get() 返回一个具体的异常
- public CompletableFuture
whenComplete(BiConsumer super T,? super Throwable> action) - public CompletableFuture
whenCompleteAsync(BiConsumer super T,? super Throwable> action) - public CompletableFuture
whenCompleteAsync(BiConsumer super T,? super Throwable> action, Executor executor)
方法1和2的区别在于是否使用异步处理,2和3的区别在于是否使用自定义的线程池,前三个方法都会提供一个返回结果和可抛出异常,我们可以使用lambda表达式的来接收这两个参数,然后自己处理。
- CompletableFuture
future = CompletableFuture.supplyAsync(() -> { - return = "result";
- });
- future.whenComplete((result, error) -> {
- log.info("result:{}",result);
- error.printStackTrace();
- });
- public CompletableFuture handle(BiFunction super T,Throwable,? extends U> fn)
- public CompletableFuture handleAsync(BiFunction super T,Throwable,? extends U> fn)
- public CompletableFuture handleAsync(BiFunction super T,Throwable,? extends U> fn, Executor executor)
handle方法集和上面的complete方法集没有区别,同样有两个参数一个返回结果和可抛出异常,区别就在于返回值,没错,虽然同样返回CompletableFuture类型,但是里面的参数类型,handle方法是可以自定义的
- // 注意这里的两个CompletableFuture包含的返回类型不同
- CompletableFuture
future = CompletableFuture.supplyAsync(() -> {
- List
list = new ArrayList<>(); - list.add("语文");
- list.add("数学");
- // 获取得到今天的所有课程
- return list;
- });
- // 使用handle()方法接收list数据和error异常
- CompletableFuture
future2 = future.handle((list,error)-> { - // 如果报错,就打印出异常
- error.printStackTrace();
- // 如果不报错,返回一个包含Integer的全新的CompletableFuture
- return list.size();
- });
- public CompletableFuture thenApply(Function super T,? extends U> fn)
- public CompletableFuture thenApplyAsync(Function super T,? extends U> fn)
- public CompletableFuture thenApplyAsync(Function super T,? extends U> fn, Executor executor)
apply方法和handle方法一样,都是结束计算之后的后续操作,唯一的不同是,handle方法会给出异常,可以让用户自己在内部处理,而apply方法只有一个返回结果,如果异常了,会被直接抛出,交给上一层处理。 如果不想每个链式调用都处理异常,那么就使用apply吧
- public CompletableFuture
thenAccept(Consumer super T> action) - public CompletableFuture
thenAcceptAsync(Consumer super T> action) - public CompletableFuture
thenAcceptAsync(Consumer super T> action, Executor executor)
accept()三个方法只做最终结果的消费,注意此时返回的CompletableFuture是空返回。只消费,无返回,有点像流式编程的终端操作
捕获中间产生的异常——exceptionally
public CompletableFuture exceptionally(Function fn)
exceptionally() 可以帮我们捕捉到所有中间过程的异常,方法会给我们一个异常作为参数,我们可以处理这个异常,同时返回一个默认值,跟服务降级 有点像,默认值的类型和上一个操作的返回值相同
- CompletableFuture
future = CompletableFuture.supplyAsync(() -> { - // 返回null
- return null;
- });
-
- CompletableFuture
exceptionally = future.thenApply(result -> { - // 制造一个空指针异常NPE
- int i = result;
- return i;
- }).exceptionally(error -> {
- // 我们选择在这里打印出异常
- error.printStackTrace();
- // 并且当异常发生的时候,我们返回一个默认的值
- return "发生了异常";
- });
将两个异步计算合并为一个,这两个异步计算之间相互独立,同时第二个又依赖于第一个的结果
- CompletableFuture
> future = CompletableFuture.supplyAsync(() -> {
- // 根据学生姓名获取学生信息
- return StudentService.getStudent(name);
- }).thenApply(student -> {
- // 再根据学生信息获取今天的课程
- return LessonsService.getLessons(student);
- });
将两个异步计算合并为一个,这两个异步计算之间相互独立,互不依赖
- CompletableFuture
> painting = CompletableFuture.supplyAsync(() -> {
- // 第一个任务获取美术课需要带的东西,返回一个list
- List
stuff = new ArrayList<>(); - stuff.add("画笔");
- stuff.add("颜料");
- return stuff;
- });
- CompletableFuture
> handWork = CompletableFuture.supplyAsync(() -> {
- // 第二个任务获取劳技课需要带的东西,返回一个list
- List
stuff = new ArrayList<>(); - stuff.add("剪刀");
- stuff.add("折纸");
- return stuff;
- });
- CompletableFuture
> total = painting
- // 传入handWork列表,然后得到两个CompletableFuture的参数Stuff1和2
- .thenCombine(handWork, (stuff1, stuff2) -> {
- // 合并成新的list
- List
totalStuff = Stream.of(stuff1, stuff1) - .flatMap(Collection::stream)
- .collect(Collectors.toList());
- return totalStuff;
- });
当所有给定的任务完成后,返回一个全新的已完成CompletableFuture
- final ExecutorService executorService = Executors.newFixedThreadPool(100);
-
- List
futures = Lists.newArrayList(); -
- futures.add(CompletableFuture.runAsync(() -> {
- // do something
- },executorService).thenRunAsync(() -> {
- long end = System.currentTimeMillis();
- log.info("combine.combine.time2:{}",(end - start));
- },executorService));
-
- futures.add(CompletableFuture.runAsync(() -> {
- // do something
- },executorService).thenRunAsync(() -> {
- long end = System.currentTimeMillis();
- log.info("combine.combine.time2:{}",(end - start));
- },executorService));
-
- CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).thenRun(() -> {
- long end = System.currentTimeMillis();
- log.info("combine.combine.time3:{}",(end - start));
- }).join();
仅等待Future集合种最快结束的任务完成(有可能因为他们试图通过不同的方式计算同一个值),并返回它的结果。 小贴士 :如果最快完成的任务出现了异常,也会先返回异常,如果害怕出错可以加个exceptionally() 去处理一下可能发生的异常并设定默认返回值
- CompletableFuture
future1 = CompletableFuture.supplyAsync(() -> { - return 1;
- });
-
- CompletableFuture
future2 = CompletableFuture.supplyAsync(() -> { - return 2;
- });
-
- CompletableFuture
- .anyOf(future1, future2)
- .exceptionally(error -> {
- error.printStackTrace();
- return 2;
- });