目录
3.1 runAsync:无返回值 和 SupplyAsync:有返回值
CompletableFuture 是对 Future 的扩展和增强。CompletableFuture 实现了Future和CompletionStage接口,其中最重要的是CompletionStage,它定义了CompletableFuture 实现任务编排的一些基本方法。只能说非常好用。
其继承结构如下图所示,在IDEA中使用ctrl + alt + U 实现
CompletionStage
接口定义了任务编排的方法,执行某一阶段,可以向下执行后续阶段。异步执行的,默认线程池是ForkJoinPool.commonPool()
,但为了业务之间互不影响,且便于定位问题,强烈推荐使用自定义线程池。
主要有同步和异步两大类,这里只讲异步方法。同步方法可以类比。
创建类型的方法
中间处理类型的方法
结果处理类型的方法
- package cn.itcast.n6.c3;
-
- import java.util.concurrent.*;
-
- /**
- * @author : msf
- * @date : 2022/12/4
- * completable 讲解。
- */
- public class CompletableFutureD1 {
- static ThreadPoolExecutor executor = new ThreadPoolExecutor(
- 5,50,10, TimeUnit.SECONDS,
- new LinkedBlockingQueue<>(100),Executors.defaultThreadFactory()
- ,new ThreadPoolExecutor.AbortPolicy()
- );
- public static void main(String[] args) throws ExecutionException, InterruptedException {
- System.out.println("主线程 start ...");
- CompletableFuture
future = CompletableFuture.runAsync(()->{ - System.out.println("任务1子线程执行..." + Thread.currentThread().getName());
- },executor);
-
- CompletableFuture
futureInt = CompletableFuture.supplyAsync(() -> { - System.out.println("任务2子线程执行..." + Thread.currentThread().getName());
- return 10;
- }, executor);
-
- System.out.println(futureInt.get());
- System.out.println("主线程 end ...");
- }
- }
上述代码执行结果:
- package cn.itcast.n6.c3;
-
- import java.util.concurrent.*;
-
- /**
- * @author : msf
- * @date : 2022/12/4
- * completable 讲解--串行编排
- */
- public class CompletableFutureD4 {
- static ThreadPoolExecutor executor = new ThreadPoolExecutor(
- 5, 50, 10, TimeUnit.SECONDS,
- new LinkedBlockingQueue<>(100), Executors.defaultThreadFactory()
- , new ThreadPoolExecutor.AbortPolicy()
- );
-
- public static void main(String[] args) throws ExecutionException, InterruptedException {
- System.out.println("主线程 start ...");
- CompletableFuture
future = CompletableFuture.runAsync(() -> { - System.out.println("任务1子线程执行..." + Thread.currentThread().getName());
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }, executor).thenRunAsync(()->{
- // 等任务1 执行完毕后,任务2执行
- System.out.println("任务2执行了" +Thread.currentThread().getName());
- },executor);
-
-
- CompletableFuture.supplyAsync(() -> {
- System.out.println("任务3子线程执行..." + Thread.currentThread().getName());
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- return 100;
- }, executor).thenAcceptAsync((t)->{
- System.out.println("任务4子线程执行..." + Thread.currentThread().getName());
- System.out.println("t = " + t);
- },executor);
-
- CompletableFuture
future1 = CompletableFuture.supplyAsync(() -> { - System.out.println("任务5子线程执行..." + Thread.currentThread().getName());
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- return 100;
- }, executor).thenApplyAsync((t) -> {
- System.out.println("任务6子线程执行..." + Thread.currentThread().getName());
- System.out.println("t = " + t);
- return 120;
- }, executor);
- System.out.println("主线程 end ..." + future1.get());
-
- }
- }
上述代码执行结果:
开启一个任务1,然后开启任务2,我们需要的是,当任务1和2都执行结束后任务3再启动。
场景:例如两个人从不同地方去吃饭,都有两个人都到了然后开始点餐。
- package cn.itcast.n6.c3;
-
- import java.util.concurrent.*;
-
- /**
- * @author : msf
- * @date : 2022/12/4
- * completable 讲解-- 两个一起完成,或者两者其中之一完成
- */
- public class CompletableFutureD5 {
- static ThreadPoolExecutor executor = new ThreadPoolExecutor(
- 5, 50, 10, TimeUnit.SECONDS,
- new LinkedBlockingQueue<>(100), Executors.defaultThreadFactory()
- , new ThreadPoolExecutor.AbortPolicy()
- );
-
- public static void main(String[] args) throws ExecutionException, InterruptedException {
- System.out.println("主线程 start ...");
- CompletableFuture
future = CompletableFuture.runAsync(() -> { - System.out.println("任务1子线程执行..." + Thread.currentThread().getName());
- try {
- Thread.sleep(500);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }, executor);
-
-
- CompletableFuture
future1 = CompletableFuture.supplyAsync(() -> { - System.out.println("任务2子线程执行..." + Thread.currentThread().getName());
- try {
- Thread.sleep(600);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- return 100;
- }, executor);
-
- future.runAfterBothAsync(future1, () -> {
- System.out.println("任务3..." + Thread.currentThread().getName());
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("任务3.... end " + Thread.currentThread().getName());
- }, executor);
-
- System.out.println("主线程 end ..." + future1.get());
-
- }
- }
上述代码执行结果:
场景:当两个人都出去吃饭,其中一个人到了可以先点餐。
- package cn.itcast.n6.c3;
-
- import java.util.concurrent.*;
-
- /**
- * @author : msf
- * @date : 2022/12/4
- * completable 讲解-- 两个一起完成,或者两者其中之一完成
- */
- public class CompletableFutureD6 {
- static ThreadPoolExecutor executor = new ThreadPoolExecutor(
- 5, 50, 10, TimeUnit.SECONDS,
- new LinkedBlockingQueue<>(100), Executors.defaultThreadFactory()
- , new ThreadPoolExecutor.AbortPolicy()
- );
-
- public static void main(String[] args) throws ExecutionException, InterruptedException {
- System.out.println("主线程 start ...");
- CompletableFuture
future = CompletableFuture.runAsync(() -> { - System.out.println("任务1子线程执行..." + Thread.currentThread().getName());
- try {
- Thread.sleep(500);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("任务1子线程执行...end " + Thread.currentThread().getName());
- }, executor);
-
-
- CompletableFuture
future1 = CompletableFuture.supplyAsync(() -> { - System.out.println("任务2子线程执行..." + Thread.currentThread().getName());
- try {
- Thread.sleep(600);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("任务2子线程执行...end " + Thread.currentThread().getName());
- return 100;
- }, executor);
-
- future.runAfterEitherAsync(future1, () -> {
- System.out.println("任务3..." + Thread.currentThread().getName());
- }, executor);
-
- System.out.println("主线程 end ..." + future1.get());
-
- }
- }
上述代码执行如下:
handleAsync:相当于whenCompleteAsync和exceptionally的结合体,可以收集结果和异常信息然后进行下一步处理。
- package cn.itcast.n6.c3;
-
- import java.util.concurrent.*;
-
- /**
- * @author : msf
- * @date : 2022/12/4
- * completable 讲解。
- */
- public class CompletableFutureD3 {
- static ThreadPoolExecutor executor = new ThreadPoolExecutor(
- 5, 50, 10, TimeUnit.SECONDS,
- new LinkedBlockingQueue<>(100), Executors.defaultThreadFactory()
- , new ThreadPoolExecutor.AbortPolicy()
- );
-
- public static void main(String[] args) throws ExecutionException, InterruptedException {
- System.out.println("主线程 start ...");
- CompletableFuture
future = CompletableFuture.runAsync(() -> { - System.out.println("任务1子线程执行..." + Thread.currentThread().getName());
- }, executor).handleAsync((res, exec) -> {
- System.out.println("res1 = " + res);
- System.out.println("exec1 = " + exec);
- return null;
- });
- CompletableFuture
futureInt = CompletableFuture.supplyAsync(() -> { - System.out.println("任务2子线程执行..." + Thread.currentThread().getName());
- int i = 10 / 0;
- return 10;
- }, executor).handleAsync((res, exec) -> {
- System.out.println("res2 = " + res);
- System.out.println("exec2 = " + exec);
- return 55;
- });
-
- System.out.println(futureInt.get());
- System.out.println("主线程 end ...");
- }
- }
上述代码执行结果:
模拟:三人吃饭,只要一个人到了就可以点餐,但是只有所有人到了,并且厨子把菜做好了才能上菜;
- package cn.itcast.n6.c3;
-
- import java.util.concurrent.CompletableFuture;
- import java.util.concurrent.ExecutionException;
-
- /**
- * @author : msf
- * @date : 2022/12/5
- * 模拟三人点餐,只要有一个人到了就可以点餐。然后交给厨子进行做饭
- */
- public class Test {
- public static void main(String[] args) throws ExecutionException, InterruptedException {
- long begin = System.currentTimeMillis();
- CompletableFuture
client1 = CompletableFuture.runAsync(() -> { - System.out.println("客户1开始出发了.....1小时后到");
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("客户1到了");
- });
- CompletableFuture
client2 = CompletableFuture.runAsync(()->{ - System.out.println("客户2开始出发了.....半小时后到");
- try {
- Thread.sleep(500);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("客户2到了");
- });
- CompletableFuture
client3 = CompletableFuture.runAsync(()->{ - System.out.println("客户3开始出发了.....1.5小时后到");
- try {
- Thread.sleep(1500);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("客户3到了");
- });
- CompletableFuture
- System.out.println("已经有客户到了开始点餐" +result.get());
-
- CompletableFuture
cook = CompletableFuture.runAsync(() -> { - System.out.println("厨师开始做饭");
- try {
- Thread.sleep(2000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("厨师做完饭了");
- });
- CompletableFuture
reuslt1 = CompletableFuture.allOf(client1, client2, client3, cook); - reuslt1.join();
- System.out.println("人全部来齐了,厨师也做好饭了,上菜!");
- long end = System.currentTimeMillis();
- System.out.println("总共时间盲猜2.5秒, 真实时间" + (end - begin) );
- }
- }