public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask<String> futureTask = new FutureTask<>(() ->{
System.out.println("我进来了");
return "老六是我";
});
Thread t1 = new Thread(futureTask, "t1");
t1.start();
System.out.println(futureTask.get());
}
get()方法在Future 计算完成之前会一直处在阻寒状态下
isDone()方法容易耗费CPU资源
对于真正的异步处理我们希望是可以通过传入回调函数,在Future结束时自动调用该回调函数,这样,我们就不用等待结果
阻塞的方式和异步编程的设计理念相违背,而轮询的方式会耗费无谓的CPU资源。
因此,JDK8设计出CompletableFuture
CompletableFuture提供了一种观察者模式类似的机制,可以让任务执行完成后通知监听的一方
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
参数说明:
它是Future的功能增强版,减少阻塞和轮询,可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法
public static void main(String[] args) {
CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName());
try {
TimeUnit.SECONDS.sleep(1);
} catch (Exception e) {
throw new RuntimeException(e);
}
System.out.println(1/0);
return "卧槽,外挂";
}).whenComplete((res, e) -> {
if (e != null) {
System.out.println("有异常");
e.printStackTrace();
} else {
System.out.println("计算结果:" + res);
}
}).exceptionally(e -> {
System.out.println("出异常了");
e.printStackTrace();
return null;
});
System.out.println("主线程:"+ Thread.currentThread().getName());
try {
// ForkJoinPool线程池是守护线程,必须要保持存在用户线程
TimeUnit.SECONDS.sleep(2);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
运行结果
ForkJoinPool.commonPool-worker-9
主线程:main
有异常
出异常了
java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)
at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1596)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1067)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1703)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172)
Caused by: java.lang.ArithmeticException: / by zero
at Main.lambda$main$0(Main.java:20)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
... 5 more
java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)
at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1596)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1067)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1703)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172)
Caused by: java.lang.ArithmeticException: / by zero
at Main.lambda$main$0(Main.java:20)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
... 5 more
Process finished with exit code 0
以下方法带Async的对应方法会使用默认线程池ForkJoinPool进行线程管理
public T get() // 阻塞
public T get(long timeout, TimeUnit unit) // 阻塞,超时则抛出异常
public T join() // 阻塞,代码中无需抛出编译异常
public T getNow(T valueIfAbsent) // 获得当前计算结果,如线程已经计算完成则返回计算结果;如当前未计算出结果则返回valueIfAbsent;不会打断线程的执行
public boolean complete(T value) // 返回值表示是否有打断get方法,有打断则后续join获取到的为括号内的值,未打断则获取到的是线程执行完的结果值
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) // 计算结果存在依赖关系,线程串行化执行;有异常不执行下一步
public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn) // 同上,但是有异常时会带着异常到下一步
public CompletableFuture<Void> thenAccept(Consumer<? super T> action) // 接收任务的处理结果,并消费处理,无返回结果
// 调用该方法的返回值的join会返回速度执行快的线程的执行结果,同时执行慢的线程也会继续执行而不会中断
public <U> CompletableFuture<U> applyToEither(
CompletionStage<? extends T> other, Function<? super T, U> fn)
// 两个CompletionStage任务都完成后,最终把两个任务的结果一起交给thenCombine来处理,先完成的先等着,等待其它分支任务
public <U,V> CompletableFuture<V> thenCombine(
CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn)