Java8 - 使用CompletableFuture 构建异步应用
Java8 - 使用工厂方法 supplyAsync创建 CompletableFuture
每日一博 - Java 异步编程的 Promise 模式 CompletableFuture的前世今生 (上)
Java8 - 自定义实现体会CompletableFuture的原理
Java 8 - CompletableFuture组合式异步编程
CompletableFuture是一个可以通过编程方式显式地设置计算结果和状态以便让任务结束的Future,并且其可以作为一个CompletionStage(计算阶段),当它的计算完成时可以触发一个函数或者行为;当多个线程企图调用同一个CompletableFuture的complete、cancel方式时只有一个线程会成功。
CompletableFuture除了含有可以直接操作任务状态和结果的方法外,还实现了CompletionStage接口的一些方法,这些方法遵循:
当CompletableFuture任务完成后,同步使用任务执行线程来执行依赖任务结果的函数或者行为。
所有异步的方法在没有显式指定Executor参数的情形下都是复用ForkJoinPool.commonPool()线程池来执行。
所有CompletionStage方法的实现都是相互独立的,以便一个方法的行为不会因为重载了其他方法而受影响。
一个CompletableFuture任务可能有一些依赖其计算结果的行为方法,这些行为方法被收集到一个无锁基于CAS操作来链接起来的链表组成的栈中;当Completable-Future的计算任务完成后,会自动弹出栈中的行为方法并执行。
需要注意的是,由于是栈结构,在同一个CompletableFuture对象上行为注册的顺序与行为执行的顺序是相反的。
由于默认情况下支撑CompletableFuture异步运行的是ForkJoinPool
所以这里我们有必要简单讲解下ForkJoinPool。ForkJoinPool本身也是一种ExecutorService,与其他ExecutorService(比如ThreadPoolExecutor)相比,不同点是它使用了工作窃取算法来提高性能,其内部每个工作线程都关联自己的内存队列,正常情况下每个线程从自己队列里面获取任务并执行,当本身队列没有任务时,当前线程会去其他线程关联的队列里面获取任务来执行。这在很多任务会产生子任务或者有很多小的任务被提交到线程池来执行的情况下非常高效。
ForkJoinPool中有一个静态的线程池commonPool可用且适用大多数情况。commonPool会被任何未显式提交到指定线程池的ForkJoinTask执行使用。使用commonPool通常会减少资源使用(其线程数量会在不活跃时缓慢回收,并在任务数比较多的时候按需增加)。默认情况下,commonPool的参数可以通过system properties中的三个参数来控制:
java.util.concurrent.ForkJoinPool.common.parallelism:并行度级别,非负整数。
java.util.concurrent.ForkJoinPool.common.threadFactory:ForkJoinWorker ThreadFactory的类名。
java.util.concurrent.ForkJoinPool.common.exceptionHandler:Uncaught ExceptionHandler的类名。
对于需要根据不同业务对线程池进行隔离或者定制的情况,可以使用ForkJoinPool的构造函数显式设置线程个数,默认情况下线程个数等于当前机器上可用的CPU个数。
ForkJoinPool中提供了任务执行、任务生命周期控制的方法,还提供了任务状态监测的方法,比如getStealCount可以帮助调整和监控fork/join应用程序。另外,toSring方法会非常方便地返回当前线程池的状态(运行状态、线程池线程个数、激活线程个数、队列中任务个数)。
另外,当线程池关闭或者内部资源被耗尽(比如当某个队列大小大于67108864时),再向线程池提交任务会抛出RejectedExecutionException异常。
CompletableFuture是一种可以通过编程显式设置结果的future,下面我们通过一个例子来演示下:
public class TestCompletableFutureSet {
// 0自定义线程池
private final static int AVALIABLE_PROCESSORS = Runtime.getRuntime().availableProcessors();
private final static ThreadPoolExecutor POOL_EXECUTOR = new ThreadPoolExecutor(AVALIABLE_PROCESSORS,
AVALIABLE_PROCESSORS * 2, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<>(5),
new ThreadPoolExecutor.CallerRunsPolicy());
public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
// 1.创建一个CompletableFuture对象
CompletableFuture<String> future = new CompletableFuture<String>();
// 2.开启线程计算任务结果,并设置
POOL_EXECUTOR.execute(() -> {
// 2.1休眠3s,模拟任务计算
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
// 2.2设置计算结果到future
System.out.println("----" + Thread.currentThread().getName() + " set future result----");
future.complete("hello,artisan");
});
// 3.等待计算结果
System.out.println("---main thread wait future result---");
System.out.println(future.get());
// System.out.println(future.get(1000,TimeUnit.MILLISECONDS));
System.out.println("---main thread got future result---");
}
}
由上述代码可知,代码0创建了一个线程池,代码1创建了一个CompletableFuture对象,代码2将提交任务到异步线程池中执行。
代码3调用future的get()方法企图获取future的结果,如果future的结果没有被设置,则调用线程会被阻塞。
在代码2创建的任务内,代码2.1表示休眠3s,模拟异步任务的执行,代码2.2则表示在休眠3s后,调用future的complete方法设置future的结果,设置完结果后,所有由于调用future的get()方法而被阻塞的线程会被激活,并返回设置的结果。
如上所述,这里使用CompletableFuture实现了通知等待模型,主线程调用future的get()方法等待future返回结果,一开始由于future结果没有设置,所以主线程被阻塞挂起,等异步任务休眠3s,然后调用future的complete方法模拟主线程等待的条件完成,这时候主线程就会从get()方法返回。
当你想异步执行一个任务,并且不需要任务的执行结果时可以使用该方法,比如异步打日志,异步做消息通知等:
public static void runAsync() throws InterruptedException, ExecutionException {
// 1.1创建异步任务,并返回future
CompletableFuture future = CompletableFuture.runAsync(new Runnable() {
@Override
public void run() {
// 1.1.1休眠2s模拟任务计算
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("over");
}
});
// 1.2 同步等待异步任务执行结束
System.out.println(future.get());
}
代码1.1创建了一个异步任务,并马上返回一个future对象,其创建了一个异步任务执行,任务内首先休眠2s,然后打印了一行日志。
代码1.2则调用返回的future的get()方法企图等待future任务执行完毕,由于runAsync方法不会有返回值,所以当任务执行完毕后,设置future的结果为null,即代码1.2等任务执行完毕后返回null。
需要注意的是,在默认情况下,runAsync(Runnable runnable)
方法是使用整个JVM内唯一的ForkJoinPool.commonPool()线程池来执行异步任务的,使用runAsync(Runnable runnable,Executor executor)
方法允许我们使用自己制定的线程池来执行异步任务。我们创建了一个自己的线程池bizPoolExecutor,在调用runAsync方法提交异步任务时,把其作为第二参数进行传递,则异步任务执行时会使用bizPoolExecutor中的线程执行,具体代码如下所示。
// 0.创建线程池
private static final ThreadPoolExecutor bizPoolExecutor = new ThreadPoolExecutor(8, 8, 1, TimeUnit.MINUTES,
new LinkedBlockingQueue<>(10));
//没有返回值的异步执行,异步任务由业务自己的线程池执行
public static void runAsyncWithBizExecutor() throws InterruptedException, ExecutionException {
// 1.1创建异步任务,并返回future
CompletableFuture future = CompletableFuture.runAsync(new Runnable() {
@Override
public void run() {
// 1.1.1休眠2s模拟任务计算
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("over");
}
}, bizPoolExecutor);
// 1.2 同步等待异步任务执行结束
System.out.println(future.get());
}
当你想异步执行一个任务,并且需要任务的执行结果时可以使用该方法,比如异步对原始数据进行加工,并需要获取到被加工后的结果等。
// 2. 有返回值的异步执行
public static void supplyAsync() throws InterruptedException, ExecutionException {
// 2.1创建异步任务,并返回future
CompletableFuture future = CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
// 2.1.1休眠2s模拟任务计算
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
// 2.1.2 返回异步计算结果
return "hello,jiaduo";
}
});
// 2.2 同步等待异步任务执行结束
System.out.println(future.get());
}
代码2.1使用supplyAsync开启了一个异步任务,执行后马上返回一个future对象;异步任务内线程休眠2s,然后返回了一个字符串结果,这个结果会被设置到future内部。
代码2.2则使用future的get()方法获取结果,一开始future结果并没有被设置,所以调用线程会被阻塞;等异步任务把结果设置到future后,调用线程就会从get()处返回异步任务执行的结果。
需要注意的是,在默认情况下,supplyAsync(Suppliersupplier)
方法是使用整个JVM内唯一的ForkJoinPool.commonPool()线程池来执行异步任务的,使用supply-Async(Suppliersupplier,Executor executor)
方法允许我们使用自己制定的线程池来执行异步任务,代码如下:
// 0.创建线程池
private static final ThreadPoolExecutor bizPoolExecutor = new ThreadPoolExecutor(8, 8, 1, TimeUnit.MINUTES,
new LinkedBlockingQueue<>(10));
// 2. 有返回值的异步执行
public static void supplyAsyncWithBizExecutor() throws InterruptedException, ExecutionException {
// 2.1创建异步任务,并返回future
CompletableFuture future = CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
// 2.1.1休眠2s模拟任务计算
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
// 2.1.2 返回异步计算结果
return "hello,jiaduo";
}
}, bizPoolExecutor);
// 2.2 同步等待异步任务执行结束
System.out.println(future.get());
}
需要注意的是,这种方式激活的异步任务B是拿不到任务A的执行结果的:
// I thenRun不能访问oneFuture的结果
public static void thenRun() throws InterruptedException, ExecutionException {
// 1.创建异步任务,并返回future
CompletableFuture<String> oneFuture = CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
// 1.1休眠2s,模拟任务计算
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 1.2返回计算结果
return "hello";
}
});
// 2.在future上施加事件,当future计算完成后回调该事件,并返回新future
CompletableFuture twoFuture = oneFuture.thenRun(new Runnable() {
@Override
public void run() {
// 2.1.1当oneFuture任务计算完成后做一件事情
try {
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName());
System.out.println("---after oneFuture over doSomething---");
}
});
// 3.同步等待twoFuture对应的任务完成,返回结果固定为null
System.out.println(twoFuture.get());
}
由上述代码可知,代码1创建异步任务,并返回oneFuture对象,代码2在oneFuture上调用thenRun方法添加异步执行事件,当oneFuture计算完成后回调该事件,并返回twoFuture,另外,在twoFuture上调用get()方法也会返回null,因为回调事件是没有返回值的。
默认情况下oneFuture对应的异步任务和在oneFuture上添加的回调事件都是使用ForkJoinPool.commonPool()
中的同一个线程来执行的,大家可以使用thenApplyAsync(Function super T,? extends U> fn, Executor executor)
来指定设置的回调事件使用自定义线程池线程来执行,也就是oneFuture对应的任务与在其上设置的回调执行将不会在同一个线程中执行。
需要注意的是,这种方式激活的异步任务B是可以拿到任务A的执行结果的:
public static void thenAccept() throws InterruptedException, ExecutionException {
// 1.创建异步任务,并返回future
CompletableFuture<String> oneFuture = CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
// 1.1休眠2s,模拟任务计算
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 1.2返回计算结果
return "hello";
}
});
// 2.在future上施加事件,当future计算完成后回调该事件,并返回新future
CompletableFuture twoFuture = oneFuture.thenAccept(new Consumer<String>() {
@Override
public void accept(String t) {
// 2.1.1对oneFuture返回的结果进行加工
try {
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("---after oneFuture over doSomething---" + t);
}
});
// 3.同步等待twoFuture对应的任务完成,返回结果固定为null
System.out.println(twoFuture.get());
}
在上述代码中,代码1创建异步任务,并返回oneFuture,代码2在oneFuture上调用thenAccept添加了一个任务,这个任务会在oneFuture对应的任务执行完毕后被激活执行。
需要注意的是,这里可以在回调的方法accept(String t)的参数t中来获取oneFuture对应的任务结果,另外需要注意的是,由于accept(String t)方法没有返回值,所以在twoFuture上调用get()方法最终也会返回null。
在默认情况下,oneFuture对应的异步任务和在oneFuture上添加的回调事件都是使用ForkJoinPool.commonPool()
中的同一个线程来执行的,大家可以使用thenAccept-Async(Consumeraction,Executor executor)
来指定设置的回调事件使用自定义线程池线程来执行,也就是oneFuture对应的任务与在其上设置的回调执行将不会在同一个线程中执行。
需要注意的是,这种方式激活的异步任务B是可以拿到任务A的执行结果的,并且可以获取到异步任务B的执行结果
public class TestCompletableFutureCallBack {
public static void main(String[] args) throws InterruptedException, ExecutionException {
// 1.创建异步任务,并返回future
CompletableFuture<String> oneFuture = CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
// 1.1休眠2s,模拟任务计算
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 1.2返回计算结果
return "hello";
}
});
// 2.在future上施加事件,当future计算完成后回调该事件,并返回新future
CompletableFuture<String> twoFuture = oneFuture.thenApply(new Function<String, String>() {
// 2.1在步骤1计算结果基础上进行计算,这里t为步骤1返回的hello
@Override
public String apply(String t) {
// 2.1.1对oneFuture返回的结果进行加工
try {
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
// 2.1.2返回加工后结果
return t + " artisan";
}
});
// 3.同步等待twoFuture对应的任务完成,并获取结果
System.out.println(twoFuture.get());
}
}
在上述代码中,代码1创建异步任务,并返回oneFuture,代码2在oneFuture上调用thenApply添加了一个任务,这个任务会在oneFuture对应的任务执行完毕后被激活执行。需要注意的是,这里可以在回调方法apply(String t)的参数t中获取oneFuture对应的任务结果,另外需要注意的是,由于apply(String t)方法有返回值,所以在twoFuture上调用get()方法最终也会返回回调方法返回的值。
默认情况下oneFuture对应的异步任务和在oneFuture上添加的回调事件都是使用ForkJoinPool.commonPool()
中的同一个线程来执行的,大家可以使用thenApplyAsync(Functionfn,Executor executor)
来指定设置的回调事件使用自定义线程池线程来执行,也就是oneFuture对应的任务与在其上设置的回调执行将不会在同一个线程中执行。
public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
// 1.创建一个CompletableFuture对象
CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
// 1.1模拟异步任务执行
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 1.2返回计算结果
return "hello,jiaduo";
}
});
// 2.添加回调函数
future.whenComplete(new BiConsumer<String, Throwable>() {
@Override
public void accept(String t, Throwable u) {
// 2.1如果没有异常,打印异步任务结果
if (null == u) {
System.out.println(t);
} else {
// 2.2打印异常信息
System.out.println(u.getLocalizedMessage());
}
}
});
// 3.挂起当前线程,等待异步任务执行完毕
Thread.currentThread().join();
}
这里代码1开启了一个异步任务,任务内先休眠1s,然后代码1.2返回计算结果;代码2则在返回的future上调用whenComplete设置一个回调函数,然后main线程就返回了。
在整个异步任务的执行过程中,main函数所在线程是不会被阻塞的,等异步任务执行完毕后会回调设置的回调函数,在回调函数内,代码2.1表示如果发现异步任务执行正常则打印执行结果,否则打印异常信息。这里代码3挂起了main函数所在线程,是因为具体执行异步任务的是ForkJoin的commonPool线程池,其中线程都是Deamon线程,所以,当唯一的用户线程main线程退出后整个JVM进程就退出了,会导致异步任务得不到执行。
如上所述,当我们使用CompletableFuture实现异步编程时,大多数时候是不需要显式创建线程池,并投递任务到线程池内的。
我们只需要简单地调用CompletableFuture的runAsync或者supplyAsync等方法把异步任务作为参数即可,其内部会使用ForkJoinPool线程池来进行异步执行的支持,这大大简化了我们异步编程的负担,实现了声明式编程(告诉程序我要执行异步任务,但是具体怎么实现我不需要管),当然如果你想使用自己的线程池来执行任务,也是可以非常方便地进行设置的。
CompletableFuture功能强大的原因之一是其可以让两个或者多个Completable-Future进行运算来产生结果,下面我们来看其提供的几组函数:
public class TestTwoCompletableFuture {
// 1.异步任务,返回future
public static CompletableFuture<String> doSomethingOne(String encodedCompanyId) {
// 1.1创建异步任务
return CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
// 1.1.1休眠1s,模拟任务计算
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 1.1.2 解密,并返回结果
String id = encodedCompanyId;
return id;
}
});
}
// 2.开启异步任务,返回future
public static CompletableFuture<String> doSomethingTwo(String companyId) {
return CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
// 2.1 休眠3s,模拟计算
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
// 2.2 查询公司信息,转换为str,并返回
String str = companyId + ":alibaba";
return str;
}
});
}
public static void main(String[] args) throws InterruptedException, ExecutionException {
// I,等doSomethingOne执行完毕后,接着执行doSomethingTwo
CompletableFuture result = doSomethingOne("123").thenCompose(id -> doSomethingTwo(id));
System.out.println(result.get());
}
}
上述main函数中首先调用方法doSomethingOne(“123”)开启了一个异步任务,并返回了对应的CompletableFuture对象,我们取名为future1,然后在future1的基础上调用了thenCompose方法,企图让future1执行完毕后,激活使用其结果作为doSomethingTwo(String companyId)方法的参数的任务。
CompletableFuture result = doSomethingOne("123").thenCompose(id -> doSomethingTwo(id));
修改为:
result = doSomethingOne("123").thenCombine(doSomethingTwo("456"), (one, two) -> {
return one + " " + two;
});
public static void allOf() throws InterruptedException, ExecutionException {
// 1.创建future列表
List<CompletableFuture<String>> futureList = new ArrayList<>();
futureList.add(doSomethingOne("1"));
futureList.add(doSomethingOne("2"));
futureList.add(doSomethingOne("3"));
futureList.add(doSomethingOne("4"));
// 2.转换多个future为一个
CompletableFuture<Void> result = CompletableFuture
.allOf(futureList.toArray(new CompletableFuture[futureList.size()]));
// 3.等待所有future都完成
System.out.println(result.get());
}
如上代码1调用了四次doSomethingOne方法,分别返回一个CompletableFuture对象,然后收集这些CompletableFuture到futureList列表。
代码2调用allOf方法把多个CompletableFuture转换为一个result,代码3在result上调用get()方法会阻塞调用线程,直到futureList列表中所有任务执行完毕才返回。
public static void anyOf() throws InterruptedException, ExecutionException {
// 1.创建future列表
List<CompletableFuture<String>> futureList = new ArrayList<>();
futureList.add(doSomethingOne("1"));
futureList.add(doSomethingOne("2"));
futureList.add(doSomethingTwo("3"));
// 2.转换多个future为一个
CompletableFuture<Object> result = CompletableFuture
.anyOf(futureList.toArray(new CompletableFuture[futureList.size()]));
// 3.等待某一个future完成
System.out.println(result.get());
}
如上代码1调用了四次doSomethingOne方法,分别返回一个CompletableFuture对象,然后收集这些CompletableFuture到futureList列表。
代码2调用anyOf方法把多个CompletableFuture转换为一个result,代码3在result上调用get()方法会阻塞调用线程,直到futureList列表中有一个任务执行完毕才返回。
前文的代码为我们演示的功能都是当异步任务内可以正常设置任务结果时的情况,但是情况并不总是这样的,比如下面这段代码:
public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
// 1.创建一个CompletableFuture对象
CompletableFuture<String> future = new CompletableFuture<String>();
// 2.开启线程计算任务结果,并设置
new Thread(() -> {
// 2.1休眠3s,模拟任务计算
try {
// 2.1.1抛出异常
if (true) {
throw new RuntimeException("excetion test");
}
// 2.1.2设置正常结果
future.complete("ok");
} catch (Exception e) {
}
// 2.2设置计算结果到future
System.out.println("----" + Thread.currentThread().getName() + " set future result----");
}, "thread-1").start();
// 3.等待计算结果
System.out.println(future.get());
}
由上述代码可知,在代码2.1.2设置正常结果前,代码2.1.1抛出了异常,这会导致代码3一直阻塞,所以我们不仅需要考虑正常设置结果的情况,还需要考虑异常的情况,其实CompletableFuture提供了completeExceptionally方法来处理异常情况,将上述代码修改为如下所示。
public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
// 1.创建一个CompletableFuture对象
CompletableFuture<String> future = new CompletableFuture<String>();
// 2.开启线程计算任务结果,并设置
new Thread(() -> {
// 2.1休眠3s,模拟任务计算
try {
// 2.1.1 抛出异常
if (true) {
throw new RuntimeException("excetion test");
}
// 2.1.2设置正常结果
future.complete("ok");
} catch (Exception e) {
// 2.1.3 设置异常结果
future.completeExceptionally(e);
}
// 2.2设置计算结果到future
System.out.println("----" + Thread.currentThread().getName() + " set future result----");
}, "thread-1").start();
// 3.等待计算结果
System.out.println(future.get());
}
如上代码2.1.3表示当出现异常时把异常信息设置到future内部,这样代码3就会在抛出异常后终止。
其实我们还可以修改代码3为:
System.out.println(future.exceptionally(t -> "default").get());// 默认值
实现当出现异常时返回默认值。