


其中ForkJoinTask类及其子类RecursiveTask类,RecursiveAction类,CountedCompleter类属于Fork/Join框架,SwingWorker类属于Swing应用开发,故在此不做阐述,本文主要讲解FutureTask类与CompletableFuture类
- public FutureTask(Callable
callable) { - if (callable == null)
- throw new NullPointerException();
- this.callable = callable;
- this.state = NEW; // ensure visibility of callable
- }
-
- public FutureTask(Runnable runnable, V result) {
- this.callable = Executors.callable(runnable, result);
- this.state = NEW; // ensure visibility of callable
- }
通过构造方法可以看出,可传入Callable接口或Runnable接口,但Runnable接口会被包装成Callable接口
传入Runnable接口的同时还可传入参数result,即执行结果;因为Runnable接口中的run方法无返回值,而Callable接口中的call方法有返回值,所以需要返回result实现特殊的"run方法返回值",当然,若不需要特殊的返回值,可将result参数设置为null
- public static
Callable callable(Runnable task, T result) { - if (task == null)
- throw new NullPointerException();
- return new RunnableAdapter
(task, result); - }
RunnableAdapter是Callable接口的实现类
- static final class RunnableAdapter
implements Callable { - final Runnable task;
- final T result;
- RunnableAdapter(Runnable task, T result) {
- this.task = task;
- this.result = result;
- }
- public T call() {
- task.run();
- return result;
- }
- }
- public class Demo {
- public static void main(String[] args) {
- Future
task = new FutureTask<>(()-> { - System.out.println(Thread.currentThread().getName() + " starts working");
- //模拟线程工作耗时
- TimeUnit.SECONDS.sleep(3);
- return Thread.currentThread().getName() + " execution completed";
- });
-
- new Thread((Runnable) task).start();
-
- //确保执行异步任务的线程已开启
- try {
- TimeUnit.SECONDS.sleep(1);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- //调用者线程继续执行其他任务
- System.out.println(Thread.currentThread().getName());
-
- //获取异步任务回调的返回值
- try {
- System.out.println("task return : " + task.get());
- } catch (InterruptedException | ExecutionException e) {
- e.printStackTrace();
- }
- }
- }

可以看到是异步执行的,主线程与分支线程各行其事
通常不通过构造函数创建CompeletableFuture对象,而是通过runAsync方法与supplyAsync方法
- public CompletableFuture() {
- }
-
- private CompletableFuture(Object r) {
- this.result = r;
- }
- public static CompletableFuture
runAsync(Runnable runnable) { - return asyncRunStage(asyncPool, runnable);
- }
-
-
- public static CompletableFuture
runAsync(Runnable runnable, - Executor executor) {
- return asyncRunStage(screenExecutor(executor), runnable);
- }
任务执行完成后,可通过get方法获取任务的执行结果(CompletableFuture类的实例的result属性),但值为null
翻看runAsync方法内部,发现Runnable接口的run方法在AsyncRun类的实例中被调用,紧接着调用completeNull方法将任务的执行结果设置为null(将CompletableFuture类的实例的result属性设置为null),所以get方法获取的执行结果为null
Runnable接口的run方法无返回值,即使想将任务的执行结果设置为返回值也是无法办到的,所以通过runAsync方法实现的异步回调被称为无返回值的异步回调
AsyncRun类实现了Runnable接口
- static CompletableFuture
asyncRunStage(Executor e, Runnable f) { - if (f == null) throw new NullPointerException();
- CompletableFuture
d = new CompletableFuture(); - e.execute(new AsyncRun(d, f));
- return d;
- }
- static final class AsyncRun extends ForkJoinTask
- implements Runnable, AsynchronousCompletionTask {
- CompletableFuture
dep; Runnable fn; - AsyncRun(CompletableFuture
dep, Runnable fn) { - this.dep = dep; this.fn = fn;
- }
-
- public final Void getRawResult() { return null; }
- public final void setRawResult(Void v) {}
- public final boolean exec() { run(); return true; }
-
- public void run() {
- CompletableFuture
d; Runnable f; - if ((d = dep) != null && (f = fn) != null) {
- dep = null; fn = null;
- if (d.result == null) {
- try {
- f.run();
- //completeNull() : 以null值完成 除非已经完成
- /** Completes with the null value, unless already completed. */
- d.completeNull();
- } catch (Throwable ex) {
- d.completeThrowable(ex);
- }
- }
- d.postComplete();
- }
- }
- }
示例代码
- public class Demo {
- public static void main(String[] args) {
- //泛型代表返回值的类型 runAsync方法无返回值就填Void
- CompletableFuture
completableFuture = CompletableFuture.runAsync(()->{ - System.out.println(Thread.currentThread().getName() + " starts working");
- try {
- TimeUnit.SECONDS.sleep(3);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- });
-
- //确保执行异步任务的线程已开启
- try {
- TimeUnit.SECONDS.sleep(1);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println(Thread.currentThread().getName());
-
- //通过get方法获取任务的执行结果
- try {
- System.out.println("get() return : " + completableFuture.get());
- } catch (InterruptedException | ExecutionException e) {
- e.printStackTrace();
- }
- }
- }
运行效果

可以看到是异步执行的,主线程与分支线程各行其事
- public static CompletableFuture supplyAsync(Supplier supplier) {
- return asyncSupplyStage(asyncPool, supplier);
- }
-
-
- public static CompletableFuture supplyAsync(Supplier supplier,
- Executor executor) {
- return asyncSupplyStage(screenExecutor(executor), supplier);
- }
任务执行完成后,可通过CompletableFuture类的实例的get方法获取任务的执行结果(CompletableFuture类的实例的result属性),值为Supplier接口的get方法的返回值
翻看supplyAsync方法内部后可发现,Supplier接口的get方法在AsyncSupply类的实例中被调用,其返回值被completeValue方法设置为任务的执行结果(设置CompletableFuture类的实例的result属性),所以通过CompletableFuture类的实例的get方法可用获取Supplier接口的get方法的返回值
由于任务的执行结果被设置为Supplier接口的get方法的返回值,所以通过supplyAsync方法实现的异步回调被称为有返回值的异步回调
AsyncSupply类实现了Runnable接口
- static CompletableFuture asyncSupplyStage(Executor e,
- Supplier f) {
- if (f == null) throw new NullPointerException();
- CompletableFuture d = new CompletableFuture();
- e.execute(new AsyncSupply(d, f));
- return d;
- }
- static final class AsyncSupply
extends ForkJoinTask - implements Runnable, AsynchronousCompletionTask {
- CompletableFuture
dep; Supplier fn; - AsyncSupply(CompletableFuture
dep, Supplier fn) { - this.dep = dep; this.fn = fn;
- }
-
- public final Void getRawResult() { return null; }
- public final void setRawResult(Void v) {}
- public final boolean exec() { run(); return true; }
-
- public void run() {
- CompletableFuture
d; Supplier f; - if ((d = dep) != null && (f = fn) != null) {
- dep = null; fn = null;
- if (d.result == null) {
- try {
- //completeValue() : 以非异常结果完成 除非已经完成
- /** Completes with a non-exceptional result, unless already completed. */
- d.completeValue(f.get());
- } catch (Throwable ex) {
- d.completeThrowable(ex);
- }
- }
- d.postComplete();
- }
- }
- }
示例代码
一:直接通过get()方法获取任务执行结果
- public class Demo {
- public static void main(String[] args) {
-
- CompletableFuture
completableFuture = CompletableFuture.supplyAsync(()->{ - System.out.println(Thread.currentThread().getName() + "starts working");
- try {
- TimeUnit.SECONDS.sleep(3);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- //任务正常完成的返回值
- return 1;
- });
-
- //确保执行任务的线程已开启
- try {
- TimeUnit.SECONDS.sleep(1);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println(Thread.currentThread().getName());
-
- //通过get方法获取任务的执行结果
- try {
- System.out.println("get() return : " + completableFuture.get());
- } catch (InterruptedException | ExecutionException e) {
- e.printStackTrace();
- }
- }
- }
运行效果

可以看到是异步执行的,主线程与分支线程各行其事
二:通过whenComplete方法或whenCompleteAsync方法获取执行结果
whenComplete() 该任务(whenComplete)由当前线程执行 whenCompleteAsync() 该任务(whenCompleteAsync)由其他线程池执行(可指定执行线程池 不指定则自动生成) 通常不选用该方法(因为交给线程池处理 需要切换一次上下文 会降低程序的运行效率)
- public class Demo {
- public static void main(String[] args) {
-
- CompletableFuture
completableFuture = CompletableFuture.supplyAsync(()->{ - System.out.println(Thread.currentThread().getName() + "starts working");
- try {
- TimeUnit.SECONDS.sleep(3);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- //任务正常完成的返回值
- return 1;
- });
-
- //确保执行异步任务的线程已开启
- try {
- TimeUnit.SECONDS.sleep(1);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println(Thread.currentThread().getName());
-
- try {
- Integer result = completableFuture.whenComplete((success, fail) -> {
- System.out.println("success = " + success); //success记录任务正常完成的返回值
- System.out.println("fail = " + fail); //fail记录错误信息
- //执行时发生错误的处理
- }).exceptionally((e) -> {
- System.out.println("exception message : " + e.getMessage());
- System.out.println("stack message : " + Arrays.toString(e.getStackTrace()));
- return -1; //任务发生错误时的返回值(用于判断任务发生了错误)
- //通过get方法获取任务的执行结果
- }).get();
- System.out.println("get() return : " + result);
- } catch (InterruptedException | ExecutionException e) {
- e.printStackTrace();
- }
- }
- }
运行效果

可以看到是异步执行的,主线程与分支线程各行其事
人工加入错误的运行效果(int i = 1/0)
