前置知识:Future及其唯一实现类FutureTask的作用: 对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果。必要时可以通过get方法获取执行结果,该方法会阻塞直到任务返回结果。
详情可查看之前我的博文->链接
但是有一些弊端:
它有可能已经完成了计算并返回结果,也有可能至今还没完成。 我们只能手动的判断时候处理完成,以及处理完成后,怎么做。比如:有结果对结果怎么处理?如果出现异常怎么处理?等等。
由此就引入了CompletableFuture和ListenableFuture。
- public T get()
该方法为阻塞方法,会等待计算结果完成,会抛出三种异常
CancellationException -如果这个future被取消
ExecutionException -如果这个future异常完成 InterruptedException -如果当前线程在等待时被中断
- public T get(long timeout,TimeUnit unit)
有时间限制的阻塞方法,会抛出四种异常:
CancellationException -如果这个future被取消
ExecutionException -如果这个future异常完成 InterruptedException -如果当前线程在等待时被中断
TimeoutException -如果等待超时
- public T getNow(T valueIfAbsent)
立即获取方法结果,如果没有计算结束则返回传的值
- public T join()
完成时返回结果值,或在异常完成时抛出(未检查的)异常。为了更好地使用常见的函数形式,如果在CompletableFuture的完成过程中涉及的计算抛出了一个异常,此方法将抛出一个(未检查的) CompletionException,其原因是底层异常。
public boolean complete(T value)
立即完成计算,并把结果设置为传的值,返回是否设置成功
如果 CompletableFuture 没有关联任何的Callback、异步任务等,如果调用get方法,那会一直阻塞下去,可以使用complete方法主动完成计算
public boolean completeExceptionally(Throwable ex)
如果尚未完成,则会导致调用get()和相关方法抛出给定的异常。
demo
public class Test {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//demo1
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 10 / 1;
});
System.out.println(future.join());//10
System.out.println(future.get());//10
System.out.println(future.getNow(10));//10
future.complete(10);
System.out.println(future.get());//10
try {
future.completeExceptionally(new TimeoutException("超时了!"));
future.get();//引起超时异常
}catch (Exception e){
System.out.println(e.getMessage());
}
}
}
public static CompletableFuture runAsync(Runnable runnable)
public static CompletableFuture runAsync(Runnable runnable, Executor executor)
public static CompletableFuture supplyAsync(Supplier supplier)
public static CompletableFuture supplyAsync(Supplier supplier, Executor executor)
补:1.run表示执行没有返回值的线程,以 Runable 类型为参数 。
2.有Executor 参数的表示可以传入自己线程池,否则默认使用 ForkJoinPool.commonPool() 作为它的线程池执行异步代码 。 ForkJoinPool始自JDK7,叫做分支/合并框架。可以通过将一个任务递归分成很多分子任务,形成不同的流,进行并行执行,同时还伴随着强大的工作窃取算法。极大的提高效率。
demo:
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> System.out.println("runAsync"));
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "supplyAsync");
System.out.println(future1.get());
System.out.println(future2.get());
}
//输出:
runAsync
null
supplyAsync
public CompletableFuture whenComplete(BiConsumer super T,? super Throwable> action)
public CompletableFuture whenCompleteAsync(BiConsumer super T,? super Throwable> action)
public CompletableFuture whenCompleteAsync(BiConsumer super T,? super Throwable> action, Executor executor)
public CompletableFuture exceptionally(Function
fn) public CompletableFuture handle(BiFunction super T,Throwable,? extends U> fn)
public CompletableFuture handleAsync(BiFunction super T,Throwable,? extends U> fn)
public CompletableFuture handleAsync(BiFunction super T,Throwable,? extends U> fn, Executor executor)
补:1.whenCompleteAsync:可以获取异步任务的返回值和抛出的异常信息,但是不能修改返回结果
2.但是handleAsync可以获取异步任务的返回值和抛出的异常信息,而且可以显示的修改返回的结果
3.exceptionally当异步任务跑出了异常后会触发的方法,如果没有抛出异常该方法不会执行,可以执行返回值
demo
public static void main(String[] args) throws Exception {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
ThreadUtil.sleep(100);
return 20;
}).whenCompleteAsync((v, e) -> {
//v是上面返回的结果,无返回值v为null
//e是上面抛出的异常
System.out.println(v);
System.out.println(e);
});
System.out.println(future.get());
}
//也可以写
public static void main(String[] args) throws Exception {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
ThreadUtil.sleep(100);
return 10 / 0;
}).whenCompleteAsync((v, e) -> {
System.out.println(v);
System.out.println(e);
}).exceptionally((e) -> {
System.out.println(e.getMessage());
return 30;
});
System.out.println(future.get());
}
//run
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("线程开始了");
int i = 100 / 10;
System.out.println("线程结束了");
return i;
}, executor).handleAsync((v, e) -> {
System.out.println("res = " + v+" throwable="+e);
return res*10;//这个既可以获得上面执行的结果 也可以修改 返回值
});
}
public CompletableFuture thenApply(Function super T,? extends U> fn)
public CompletableFuture thenApplyAsync(Function super T,? extends U> fn)
public CompletableFuture thenApplyAsync(Function super T,? extends U> fn, Executor executor)
补充: 这些方法不是马上执行的,也不会阻塞,而是前一个执行完成后继续执行下一个。 和 handle 方法的区别是,handle 会处理正常计算值和异常,不会抛出异常。而 thenApply 只会处理正常计算值,有异常则抛出。
demo:
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> future = CompletableFuture
.supplyAsync(() -> 1)
.thenApply((a) -> {
System.out.println(a);//1
return a * 10;
}).thenApply((a) -> {
System.out.println(a);//10
return a + 10;
}).thenApply((a) -> {
System.out.println(a);//20
return a - 5;
});
System.out.println(future.get());//15
}
public CompletableFuture thenAccept(Consumer super T> action)
public CompletableFuture thenAcceptAsync(Consumer super T> action)
public CompletableFuture thenAcceptAsync(Consumer super T> action, Executor executor)
**补:**和thenApply相比, 其单纯的去消费结果而不会返回新的值,因些计算结果为 Void;。
public CompletableFuture thenAcceptBoth(CompletionStage extends U> other, BiConsumer super T,? super U> action)
public CompletableFuture thenAcceptBothAsync(CompletionStage extends U> other, BiConsumer super T,? super U> action)
public CompletableFuture thenAcceptBothAsync(CompletionStage extends U> other, BiConsumer super T,? super U> action, Executor executor)
public CompletableFuture runAfterBoth(CompletionStage> other, Runnable action)
**补:**runAfterBoth和thenAcceptBoth不同的是, 传一个 Runnable 类型的参数,不接收上一级的返回值
public CompletableFuture thenRun(Runnable action)
public CompletableFuture thenRunAsync(Runnable action)
public CompletableFuture thenRunAsync(Runnable action, Executor executor)
**补:**run的方法都不接收参数,且是void类型
demo:
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Void> future = CompletableFuture
.supplyAsync(() -> 1)
.thenAccept(System.out::println) //消费 上一级返回值 1
.thenAcceptAsync(System.out::println); //上一级没有返回值 输出null
System.out.println(future.get()); //消费函数没有返回值 输出null
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture
.supplyAsync(() -> 1)
//第一个参数是当前CompletableFuture,后边的函数式上面接收的CompletableFuture和当前CompletableFuture的返回结果
.thenAcceptBoth(CompletableFuture.supplyAsync(() -> 2), (a, b) -> {
System.out.println(a);
System.out.println(b);
}).get();
}
public CompletableFuture thenCompose(Function super T,? extends CompletionStage> fn)
public CompletableFuture thenComposeAsync(Function super T,? extends CompletionStage> fn)
public CompletableFuture thenComposeAsync(Function super T,? extends CompletionStage> fn, Executor executor)
demo:
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> future = CompletableFuture
.supplyAsync(() -> 1)
.thenApply((a) -> {
ThreadUtil.sleep(1000);
return a + 10;
})
.thenCompose((s) -> {
System.out.println(s); //11
return CompletableFuture.supplyAsync(() -> s * 5);
});
System.out.println(future.get());//55
}
public CompletableFuture thenCombine(CompletionStage extends U> other, BiFunction super T,? super U,? extends V> fn)
public CompletableFuture thenCombineAsync(CompletionStage extends U> other, BiFunction super T,? super U,? extends V> fn)
public CompletableFuture thenCombineAsync(CompletionStage extends U> other, BiFunction super T,? super U,? extends V> fn, Executor executor)
demo:
public static void main(String[] args) throws ExecutionException, InterruptedException {
Random random = new Random();
//注意:supplyAsync和thenCombine 不一定哪个先哪个后,执行
CompletableFuture<Integer> future = CompletableFuture
.supplyAsync(() -> {
ThreadUtil.sleep(random.nextInt(1000));
System.out.println("supplyAsync");
return 2;
}).thenApply((a) -> {
ThreadUtil.sleep(random.nextInt(1000));
System.out.println("thenApply");
return a * 3;
})
.thenCombine(CompletableFuture.supplyAsync(() -> {
ThreadUtil.sleep(random.nextInt(1000));
System.out.println("thenCombineAsync");
return 10;
}), (a, b) -> {
System.out.println(a);
System.out.println(b);
return a + b;
});
System.out.println(future.get());
}
public CompletableFuture acceptEither(CompletionStage extends T> other, Consumer super T> action)
public CompletableFuture acceptEitherAsync(CompletionStage extends T> other, Consumer super T> action)
public CompletableFuture acceptEitherAsync(CompletionStage extends T> other, Consumer super T> action, Executor executor)
补: acceptEither方法是当任意一个 CompletionStage 完成的时候,action 这个消费者就会被执行。
demo
有时输出A,有时输出B,哪个Future先执行完就会根据它的结果计算。
public static void main(String[] args) throws ExecutionException, InterruptedException {
Random random = new Random();
CompletableFuture
.supplyAsync(() -> {
ThreadUtil.sleep(random.nextInt(1000));
return "A";
})
.acceptEither(CompletableFuture.supplyAsync(() -> {
ThreadUtil.sleep(random.nextInt(1000));
return "B";
}), System.out::println)
.get();
}
- public static CompletableFuture allOf(CompletableFuture>… cfs)
- public static CompletableFuture anyOf(CompletableFuture>… cfs)
demo:
//这个方法的意思是把有方法都执行完才往下执行,没有返回值
public static void main(String[] args) throws ExecutionException, InterruptedException {
Random random = new Random();
CompletableFuture.allOf(
CompletableFuture.runAsync(() -> {
ThreadUtil.sleep(random.nextInt(1000));
System.out.println(1);
}),
CompletableFuture.runAsync(() -> {
ThreadUtil.sleep(random.nextInt(1000));
System.out.println(2);
}))
.get();
}
//有返回的例子
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "f1";
});
f1.whenCompleteAsync((s, throwable) -> System.out.println(System.currentTimeMillis() + ":" + s));
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "f2";
});
f2.whenCompleteAsync((s, throwable) -> System.out.println(System.currentTimeMillis() + ":" + s));
CompletableFuture<Void> all = CompletableFuture.allOf(f1, f2);
//阻塞,直到所有任务结束。
System.out.println(System.currentTimeMillis() + ":阻塞");
all.join();
System.out.println(System.currentTimeMillis() + ":阻塞结束");
//一个需要耗时2秒,一个需要耗时3秒,只有当最长的耗时3秒的完成后,才会结束。
System.out.println("任务均已完成。");
}
//输出结果有时为1 有时间为 2
public static void main(String[] args) throws ExecutionException, InterruptedException {
Random random = new Random();
Object obj = CompletableFuture.anyOf(
CompletableFuture.supplyAsync(() -> {
ThreadUtil.sleep(random.nextInt(1000));
return 1;
}),
CompletableFuture.supplyAsync(() -> {
ThreadUtil.sleep(random.nextInt(1000));
return 2;
})).get();
System.out.println(obj);
}
ListenableFuture
可以让你注册一个回调函数,一旦计算完毕,就会执行它。或者,这个任务早已经执行完毕,那就立刻执行这个回调函数。ListenableFuture
增加了这一项简单的功能,就可以高效的支持到许多基础的Future
无法支持的操作。
ListenableFuture
的基本操作就是addListener(Runnable, Executor)
方法,它指定了当这个Future
代表的计算执行完成,指定的Runnable
将会被指定的Executor
运行。
更加常用的是:
Futures.addCallback(ListenableFuture<V>, FutureCallback<V>, Executor)
根据JDK的
ExecutorService.submit(Callable)
这个方法的返回,可以初始化一个异步的计算future,Guava 提供了ListeningExecutorService
接口,这个接口无论在ExecutorService
的哪里返回一个正常的Future
,都会返回一个ListenableFuture
,将ExecutorService
转化为ListeningExecutorService
,很简单:MoreExecutors.listeningDecorator(ExecutorService).
基于FutureTask
如果你是基于
FutureTask
来转化的的,你可以选择Guava提供的ListenableFutureTask.create(Callable)
和ListenableFutureTask.create(Runnable, V)
demo
ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
ListenableFuture<Explosion> explosion = service.submit(new Callable<Explosion>() {
public Explosion call() {
return pushBigRedButton();
}
});
Futures.addCallback(explosion, new FutureCallback<Explosion>() {
// we want this handler to run immediately after we push the big red button!
public void onSuccess(Explosion explosion) {
walkAwayFrom(explosion);
}
public void onFailure(Throwable thrown) {
battleArchNemesis(); // escaped the explosion!
}
});
//函数表达式写法
Futures.addCallback(result->{},ex->{})
参考博文:https://www.jianshu.com/p/220d05525f27
https://juejin.cn/post/6844903892728168461
https://cloud.tencent.com/developer/article/1706034