CompletableFuture
实现了CompletionStage
接口和Future
接口,并在此基础上进行了丰富的扩展,完美弥补了Future
的局限性,同时CompletableFuture
实现了对任务编排的能力,增加了异步回调、流式处理、组合处理的能力。
CompletableFuture提供了4种构建方式,如下
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
// 异步执行一个任务,带返回结果
supplyAsync(Supplier<U> supplier)
// 异步执行一个任务,可以指定一个线程池,带返回结果
supplyAsync(Supplier<U> supplier, Executor executor)
使用方式
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
System.out.println("异步执行任务");
return "异步执行结果";
});
// 阻塞等待执行结果
System.out.println(future.get());
// 也可以指定一个线程池
ExecutorService executorService = Executors.newFixedThreadPool(1);
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
System.out.println("异步执行任务");
return "异步执行结果";
}, executorService);
// 阻塞等待执行结果
System.out.println(future.get());
// 异步执行一个任务,不带返回结果
runAsync(Runnable runnable)
// 异步执行一个任务,可以指定一个线程池,不带返回结果
runAsync(Runnable runnable, Executor executor)
使用方式
CompletableFuture future = CompletableFuture.runAsync(() -> {
System.out.println("异步执行任务");
});
// 阻塞等待执行结果
future.get();
// 也可以指定一个线程池
ExecutorService executorService = Executors.newFixedThreadPool(1);
CompletableFuture future = CompletableFuture.runAsync(() -> {
System.out.println("异步执行任务");
}, executorService);
// 阻塞等待执行结果
future.get();
主动完成一个任务,并告诉它执行结果,可以用来异步数据传递
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture future = new CompletableFuture();
// CompletableFuture没有执行任务,调用get方法会阻塞
new Thread(new TestCompletable(future)).start();
future.complete("传递一个数据");
}
static class TestCompletable implements Runnable {
private CompletableFuture future;
public TestCompletable(CompletableFuture future) {
this.future = future;
}
@Override
public void run() {
System.out.println("等待唤醒并输出异步结果");
try {
System.out.println(future.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
同步等待一个异步执行结果,与get方法一样,只是抛出的异常不一样
传递一个异常信息,与complete类似
public boolean completeExceptionally(Throwable ex)
传递多个CompletableFuture,等待所有传入的CompletableFuture执行完成后,返回一个没有返回值的CompletableFuture
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
传递多个CompletableFuture,任何一个传入的CompletableFuture执行完成后,返回完成的那个CompletableFuture
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
依赖一个异步执行结果
CompletableFuture future = CompletableFuture.supplyAsync(() -> "异步执行结果");
// 接收一个异步执行结果
future.thenAccept((rs) -> {
System.out.println("接收一个异步执行结果: " + rs);
});
// 阻塞等待执行结果
System.out.println(future.get());
还可以使用链式风格
CompletableFuture.supplyAsync(() -> "异步执行结果").thenAccept((rs) -> {
System.out.println("接收一个异步执行结果: " + rs);
}).get();
依赖两个任务执行结果
CompletableFuture.supplyAsync(() -> "future1")
.thenAcceptBoth(CompletableFuture.supplyAsync(() -> "future2"), (f1, f2) -> {
System.out.println("执行结果:" + f1 + f2);
}).get();
依赖任意一个任务的执行结果
CompletableFuture.supplyAsync(() -> "future1")
.acceptEither(CompletableFuture.supplyAsync(() -> "future2"), (f) -> {
System.out.println("执行结果: " + f);
});
依赖一个异步执行结果,并且返回新的执行结果
CompletableFuture future = CompletableFuture.supplyAsync(() -> "future1")
.thenApply(f -> f + "apply");
System.out.println(future.get());
依赖两个任务执行结果,并且返回新的执行结果
CompletableFuture future = CompletableFuture.supplyAsync(() -> "future1")
.thenCombineAsync(CompletableFuture.supplyAsync(() -> "furure2"), (f1, f2) -> f1 + f2);
System.out.println(future.get());
// 出现异常时,后面的所有任务都无法执行
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
throw new RuntimeException("异常");
}).runAfterBoth(CompletableFuture.supplyAsync(() -> "future"), () -> {
System.out.println("任务执行");
});
System.out.println(future.get());
可以通过whenComplete捕获前置任务异常
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
throw new RuntimeException("异常");
}).whenComplete((r, e) -> {
if(null != e) {
System.out.println("出现异常");
} else{
System.out.println(r);
}
});
也可以使用handleAsync,不管前置任务是否异常都会执行
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
throw new RuntimeException("异常");
}).handleAsync((r, e) -> null == e ? r : null);
System.out.println(future.get());