从JDK 8开始,在Concurrent包中提供了一个强大的异步编程工具CompletableFuture。在JDK8之前,异步编程可以通过线程池和Future来实现,但功能还不够强大。
demo1,示例代码:
public class CompletableFutureDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> future = new CompletableFuture();
new Thread() {
@Override
public void run() {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 另一个线程执行任务,将结果赋值给future
future.complete("hello 伟大的架构师");
}
}.start();
System.out.println("任务已经提交");
// 阻塞的方法
String result = future.get();
System.out.println(result);
}
}
CompletableFuture实现了Future接口,所以它也具有Future的特性:调用get()方法会阻塞在那,直到结果返回。
另外1个线程调用complete方法完成该Future,则所有阻塞在get()方法的线程都将获得返回结果。
以下是各api的详细详解
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 通过异步的方式给future指派任务,future没有返回值
CompletableFuture future = CompletableFuture.runAsync(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务执行完毕");
}
});
Object o = future.get();
System.out.println(o);
}
CompletableFuture.runAsync(…)传入的是一个Runnable接口。
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "异步线程返回结果";
}
});
String result = future.get();
System.out.println(result);
}
例2和例1的区别在于,例2的任务有返回值。没有返回值的任务,提交的是Runnable,返回的是
CompletableFuture;有返回值的任务,提交的是 Supplier,返回的是CompletableFuture。Supplier和前面的Callable很相似。
通过上面两个例子可以看出,在基本的用法上,CompletableFuture和Future很相似,都可以提交
两类任务:一类是无返回值的,另一类是有返回值的。
对于 Future,在提交任务之后,只能调用 get()等结果返回;但对于 CompletableFuture,可以在结果上面再加一个callback,当得到结果之后,再接着执行callback。
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hello 阿瑞";
}
});
CompletableFuture<Void> voidCompletableFuture = future.thenRun(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务执行结束后的代码执行");
}
});
voidCompletableFuture.get();
System.out.println("任务执行结束");
}
该案例最后不能获取到结果,只会得到一个null。
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Void> future = CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hello 伟大的架构师";
}
}).thenAccept(new Consumer<String>() {
@Override
public void accept(String s) {
// 可以获取上个任务的执行结果,接着进行处理
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(s.length());
}
});
Void aVoid = future.get();
System.out.println(aVoid);
}
上述代码在thenAccept中可以获取任务的执行结果,接着进行处理。
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hello arui";
}
}).thenApply(new Function<String, Integer>() {
@Override
public Integer apply(String s) {
// 接收上个任务的返回值,接着处理,同时将处理结果返回给future
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return s.length();
}
});
Integer integer = future.get();
System.out.println(integer);
}
三个例子都是在任务执行完成之后,接着执行回调,只是回调的形式不同:
在上面的例子中,thenApply接收的是一个Function,但是这个Function的返回值是一个通常的基本数据类型或一个对象,而不是另外一个CompletableFuture。如果 Function 的返回值也是一个CompletableFuture,就会出现嵌套的CompletableFuture。考虑下面的例子:
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<CompletableFuture<Integer>> future
= CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
return "hello lagou";
}
}).thenApply(new Function<String, CompletableFuture<Integer>>() {
@Override
public CompletableFuture<Integer> apply(String s) {
return CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
return s.length();
}
});
}
});
Integer integer = future.get().get();
System.out.println(integer);
}
如果希望返回值是一个非嵌套的CompletableFuture,可以使用thenCompose:
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
return "返回结果";
}
}).thenCompose(new Function<String, CompletableFuture<Integer>>() {
@Override
public CompletableFuture<Integer> apply(String s) {
return CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
return s.length();
}
});
}
});
System.out.println(future.get());
}
下面是thenCompose方法的接口定义:
CompletableFuture中的实现:
从该方法的定义可以看出,它传入的参数是一个Function类型,并且Function的返回值必须是
CompletionStage的子类,也就是CompletableFuture类型
thenCombine方法的接口定义如下,从传入的参数可以看出,它不同于thenCompose。
第1个参数是一个CompletableFuture类型,第2个参数是一个方法,并且是一个BiFunction,也就
是该方法有2个输入参数,1个返回值。
从该接口的定义可以大致推测,它是要在2个 CompletableFuture 完成之后,把2个
CompletableFuture的返回值传进去,再额外做一些事情。实例如下:
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
return "hello";
}
}).thenCombine(CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
return "hello1";
}
}), new BiFunction<String, String, Integer>() {
@Override
public Integer apply(String s, String s2) {
System.out.println("s = " + s);
System.out.println("s2 = " + s2);
return s.length() + s2.length();
}
});
System.out.println(future.get());
}
上面的thenCompose和thenCombine只能组合2个CompletableFuture,而接下来的allOf 和anyOf 可以组合任意多个CompletableFuture。方法接口定义如下所示。
首先,这两个方法都是静态方法,参数是变长的CompletableFuture的集合。其次,allOf和anyOf
的区别,前者是“与”,后者是“或”。
allOf的返回值是CompletableFuture类型,这是因为每个传入的CompletableFuture的返回
值都可能不同,所以组合的结果是无法用某种类型来表示的,索性返回Void类型。
anyOf 的含义是只要有任意一个 CompletableFuture 结束,就可以做接下来的事情,而无须像
AllOf那样,等待所有的CompletableFuture结束。
但由于每个CompletableFuture的返回值类型都可能不同,任意一个,意味着无法判断是什么类
型,所以anyOf的返回值是CompletableFuture类型。
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture[] futures = new CompletableFuture[10];
for (int i = 0; i < 10; i++) {
CompletableFuture<Void> myFuture = CompletableFuture.runAsync(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000 + RANDOM.nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
result++;
}
});
futures[i] = myFuture;
}
// for (int i = 0; i < 10; i++) {
// futures[i].get();
// System.out.println(result);
// }
// CompletableFuture future = CompletableFuture.allOf(futures).thenRun(new Runnable() {
// @Override
// public void run() {
// System.out.println("计算完成");
// }
// });
//
// future.get();
// System.out.println(result);
CompletableFuture myfuture = CompletableFuture.anyOf(futures).thenRun(new Runnable() {
@Override
public void run() {
System.out.println(result);
}
});
myfuture.get();
}
通过上面的例子可以总结出,提交给CompletableFuture执行的任务有四种类型:Runnable、Consumer、Supplier、Function。下面是这四种任务原型的对比。
runAsync 与 supplierAsync 是 CompletableFuture 的静态方法;而 thenAccept、thenAsync、thenApply是CompletableFutre的成员方法。
因为初始的时候没有CompletableFuture对象,也没有参数可传,所以提交的只能是Runnable或者Supplier,只能是静态方法;通过静态方法生成CompletableFuture对象之后,便可以链式地提交其他任务了,这个时候就可以提交Runnable、Consumer、Function,且都是成员方法。