2.1 runAsync(Runnable runnable)
2.2 runAsync(Runnable runnable, Executor executor)
2.3 supplyAsync(Supplier supplier)
2.4 supplyAsync(Supplier supplier, Executor executor)
4.电商小案例——简单应用CompletableFuture
5.4 CompletableFuture和线程池(thenRun、thenRunAsync)
在上一篇文章中和大家分享了Future和FutureTask,也在文末指明了这二者的缺点,链接:Java——聊聊JUC中的Future和FutureTask
由此引出了异步编程界的大佬 CompletableFuture。
由图可知,二者在都实现了 Future 接口,而 CompletableFuture 又实现了 CompletionStage 这个接口,强大之处就在这里。
CompletableFuture 提供了非常强大的针对Future的扩展功能,可以帮助我们简化异步编程的复杂性,并且提供了函数式编程的能力,可以通过回调方式处理计算结果,也提供了转换和组合 CompletableFuture 的一系列方法,它可以代表一个明确完成的 Future,也可以代表一个完成阶段 CompletionStage。
我们现在都知道了 CompletableFuture 是一个类,学Java的最清楚了,给我个类,老子就 new,你别说别的,老子就是new,new就完事了,但是翻了翻jdk官方文档,打脸了。。。
他告诉我们对于 CompletableFuture 的无参构造,其实是创建了一个不完整的 CompletableFuture,那还new个屁啊。 所以这就需要它提供的四大静态方法来获取 CompletableFuture 对象了。(实际开发中常用的是第二组),对于这两组,我们都可以单独的传入 Runnable 接口或者 Supplier 供给型接口,也可以结合线程池传入。如果没有传入线程池参数,那么将使用默认的 ForkJoinPool.commonPool() ;如果指定了线程池参数,将以我们自定义的为主。
- package com.szh.demo;
-
- import java.util.concurrent.CompletableFuture;
- import java.util.concurrent.ExecutionException;
- import java.util.concurrent.TimeUnit;
-
- public class CompletableFutureDemo1 {
- public static void main(String[] args) throws ExecutionException, InterruptedException {
- CompletableFuture
completableFuture = CompletableFuture.runAsync(() -> { - System.out.println(Thread.currentThread().getName());
- try {
- TimeUnit.SECONDS.sleep(1);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- });
- System.out.println(completableFuture.get());
- }
- }
可以看到,这是没有返回值、没有指定线程池的静态方法,所以使用默认的 ForkJoinPool.commonPool,返回值因为没有所以就是 null。
- package com.szh.demo;
-
- import java.util.concurrent.*;
-
- public class CompletableFutureDemo2 {
- public static void main(String[] args) throws ExecutionException, InterruptedException {
- ExecutorService threadPool = Executors.newFixedThreadPool(3);
-
- CompletableFuture
completableFuture = CompletableFuture.runAsync(() -> { - System.out.println(Thread.currentThread().getName());
- try {
- TimeUnit.SECONDS.sleep(1);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }, threadPool);
- System.out.println(completableFuture.get());
-
- threadPool.shutdown();
- }
- }
这里,我们自定义了线程池,所以就使用我们自己定义的了。
- package com.szh.demo;
-
- import java.util.concurrent.CompletableFuture;
- import java.util.concurrent.ExecutionException;
- import java.util.concurrent.TimeUnit;
-
- public class CompletableFutureDemo3 {
- public static void main(String[] args) throws ExecutionException, InterruptedException {
- CompletableFuture
completableFuture = CompletableFuture.supplyAsync(() -> { - System.out.println(Thread.currentThread().getName());
- try {
- TimeUnit.SECONDS.sleep(1);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- return "hello supplyAsync";
- });
- System.out.println(completableFuture.get());
- }
- }
这是有返回值的一组,不指定线程池就使用默认的,get可以正常获取到异步线程的执行结果。
- package com.szh.demo;
-
- import java.util.concurrent.*;
-
- public class CompletableFutureDemo4 {
- public static void main(String[] args) throws ExecutionException, InterruptedException {
- ExecutorService threadPool = Executors.newFixedThreadPool(3);
-
- CompletableFuture
completableFuture = CompletableFuture.supplyAsync(() -> { - System.out.println(Thread.currentThread().getName());
- try {
- TimeUnit.SECONDS.sleep(1);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- return "hello supplyAsync";
- }, threadPool);
- System.out.println(completableFuture.get());
-
- threadPool.shutdown();
- }
- }
这里,我们自定义了线程池,所以就使用我们自己定义的了。 get可以正常获取到异步线程的执行结果。
- package com.szh.demo;
-
- import java.util.Objects;
- import java.util.concurrent.*;
-
- public class CompletableFutureDemo5 {
- public static void main(String[] args) {
- ExecutorService threadPool = Executors.newFixedThreadPool(1);
- try {
- CompletableFuture
completableFuture = CompletableFuture.supplyAsync(() -> { - System.out.println(Thread.currentThread().getName() + " ---- come in");
- int result = ThreadLocalRandom.current().nextInt(10);
- try {
- TimeUnit.SECONDS.sleep(3);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- // if (result > 2) {
- // int ans = 10 / 0;
- // }
- System.out.println("出结果了:" + result);
- return result;
- }, threadPool).whenComplete((v, e) -> {
- if (Objects.isNull(e)) {
- System.out.println("计算完成:" + v);
- }
- }).exceptionally(e -> {
- e.printStackTrace();
- System.out.println("异常情况:" + e.getMessage());
- return -1;
- });
- System.out.println(Thread.currentThread().getName() + " 线程在忙其他事情....");
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- threadPool.shutdown();
- }
- }
- }
这里的异步线程计算结果需要3秒,而main线程在忙自己的业务,当异步线程计算完毕之后,会自动回调 whenComplete 方法。
将代码中注释的部分打开,出现异常之后,当异步线程计算结果超过2之后,就会发生异常。
总结:
- 异步线程执行任务结束时,会自动回调某个对象的方法。(上面的案例是 whenComplete )
- 主线程设置好回调之后,不再关心异步线程的任务执行的究竟怎样,异步任务之间可以顺序执行。
- 异步线程执行任务出异常时,会自动回调某个对象的方法。(上面的案例是 exceptionally )
对于同一款产品,同时搜索出本产品在各大电商平台的售价,案例中的产品就拿 mysql 书籍为例。
解决方案:
- step by step:一步一步执行,按部就班,先查京东,再查当当,最后查淘宝。
- all in:异步线程执行,万箭齐发,京东、当当、淘宝同时多任务同时查询。
- package com.szh.demo;
-
- import lombok.Getter;
-
- import java.util.Arrays;
- import java.util.List;
- import java.util.concurrent.CompletableFuture;
- import java.util.concurrent.ThreadLocalRandom;
- import java.util.concurrent.TimeUnit;
- import java.util.stream.Collectors;
-
- @Getter
- class NetMail {
- private String netMailName;
-
- public NetMail(String netMailName) {
- this.netMailName = netMailName;
- }
-
- //根据商品名称模拟一个随机商品价格,睡了一秒模拟在电商网站中搜索的耗时
- //这里偷懒,价格这块就使用double简单点,不再使用BigDecimal了
- public double calculatePrice(String productName) {
- try {
- TimeUnit.SECONDS.sleep(1);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- return ThreadLocalRandom.current().nextDouble() * 2 + productName.charAt(0);
- }
- }
-
- public class CompletableFutureDemo6 {
- static List
netMailList = Arrays.asList( - new NetMail("jd"),
- new NetMail("dangdang"),
- new NetMail("taobao"),
- new NetMail("tmail"),
- new NetMail("pdd")
- );
-
- //step by step,一家一家搜索
- public static List
getPrice(List netMailList, String productName) { - return netMailList.stream()
- .map(netMail -> String.format(productName + " in %s price is %.2f", netMail.getNetMailName(), netMail.calculatePrice(productName)))
- .collect(Collectors.toList());
- }
-
- //CompletableFuture,万箭齐发同时搜索
- //List
---> List> ---> List - public static List
getPriceByCompletableFuture(List netMailList, String productName) { - return netMailList.stream()
- //开启异步多线程模式同时搜索,将每一个搜索任务都映射成一个CompletableFuture异步任务
- .map(netMail -> CompletableFuture.supplyAsync(() -> String.format(productName + " in %s price is %.2f", netMail.getNetMailName(), netMail.calculatePrice(productName))))
- .collect(Collectors.toList())
- .stream()
- //将异步多线程的执行结果再次映射成新的List流
- //join方法和get方法是一样的,获取到异步线程的执行结果,区别是join方法不会产生异常
- .map(CompletableFuture::join)
- .collect(Collectors.toList());
- }
-
- public static void main(String[] args) {
- //方式一耗时
- long startTime1 = System.currentTimeMillis();
- List
list1 = getPrice(netMailList, "Redis"); - list1.forEach(System.out::println);
- long endTime1 = System.currentTimeMillis();
- System.out.println("step by step原始方式耗时:" + (endTime1 - startTime1) + " ms");
-
- System.out.println("---------------------------------------");
-
- //方式二耗时
- long startTime2 = System.currentTimeMillis();
- List
list2 = getPriceByCompletableFuture(netMailList, "Redis"); - list2.forEach(System.out::println);
- long endTime2 = System.currentTimeMillis();
- System.out.println("CompletableFuture方式耗时:" + (endTime2 - startTime2) + " ms");
- }
- }
step by step 就是一个一个搜索,代码中模拟搜索一次就是1秒,所以五家电商平台就搜索了5次,大概5秒。
all in 就是开启异步多线程,在五家电商平台同时搜索,所以整体就是这1秒完事。
- package com.szh.demo;
-
- import java.util.concurrent.CompletableFuture;
- import java.util.concurrent.ExecutionException;
- import java.util.concurrent.TimeUnit;
- import java.util.concurrent.TimeoutException;
-
- public class CompletableFutureDemo7 {
- public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
- CompletableFuture
completableFuture = CompletableFuture.supplyAsync(() -> { - try {
- TimeUnit.SECONDS.sleep(2);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- return "abc";
- });
- //等待拿到结果之后再走人
- System.out.println(completableFuture.get());
-
- //异步线程需要2秒才可以计算完成,主线程这边最多等待1秒,过期不候
- //System.out.println(completableFuture.get(1, TimeUnit.SECONDS));
-
- //和get()方法一样,只是join不会抛出异常
- //System.out.println(completableFuture.join());
-
- //如果主线程这边立刻获取的时候,异步线程还没有计算完成,则返回getNow中设定的备胎值
- //打开下面的try-catch语句,主线程会等待3秒之后再去获取异步线程的计算结果,而异步线程只需要2秒就可以计算完成,所以主线程可以拿到异步线程的计算结果
- // try {
- // TimeUnit.SECONDS.sleep(3);
- // } catch (InterruptedException e) {
- // e.printStackTrace();
- // }
- // System.out.println(completableFuture.getNow("xyz"));
-
- //根据主线程、异步线程的执行时间来决定是否打断异步线程的计算过程,直接返回complete设定的值
- //异步线程计算耗时2秒,主线程等待3秒,则不会打断,正常拿到异步线程的计算结果,false
- //异步线程计算耗时2秒,主线程等待1秒,则会打断,此时直接输出主线程complete设定的值,true
- try {
- TimeUnit.SECONDS.sleep(1);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- //System.out.println(completableFuture.complete("rst") + " " + completableFuture.join());
- }
- }
public T get() 方法执行结果如下:↓↓↓
public T get(long timeout, TimeUnit unit) 打开注释,执行结果如下:↓↓↓
public T join() 打开注释,执行结果如下:↓↓↓
public T getNow(T valueIfAbsent) 打开注释,执行结果如下:↓↓↓ (详情参考上面代码中的注释,写的很详细了)
public boolean complete(T value) 打开注释,执行结果如下:↓↓↓(详情参考上面代码中的注释,写的很详细了)
这几个API的计算结果存在依赖关系,多个线程串行化。第一个任务执行完成后,执行第二个回调方法任务,会将第一个任务的执行结果,作为第二个任务的入参,传递到回调方法中。
下面的代码是针对 handle 方法走的,它和 thenApply 的区别主要在遇到异常的时候。
handle如果遇到异常,那么仅仅是异常所在的这一步出错,而之后的会继续正常执行。(有点类似于 try-catch-finally)
thenApply如果遇到异常,后续的就都不会再执行了。(有点类似于 try-catch)
- package com.szh.demo;
-
- import java.util.Objects;
- import java.util.concurrent.CompletableFuture;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
- import java.util.concurrent.TimeUnit;
-
- public class CompletableFutureDemo8 {
- public static void main(String[] args) {
- ExecutorService threadPool = Executors.newFixedThreadPool(2);
-
- CompletableFuture
completableFuture = CompletableFuture.supplyAsync(() -> { - try {
- TimeUnit.SECONDS.sleep(1);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("111");
- return 1;
- }, threadPool).handle((f, e) -> {
- //int x = 10 / 0;
- System.out.println("222");
- return f + 2;
- }).handle((f, e) -> {
- System.out.println("333");
- return f + 3;
- }).whenComplete((v, e) -> {
- if (Objects.isNull(e)) {
- System.out.println("计算结果:" + v);
- }
- }).exceptionally(e -> {
- e.printStackTrace();
- System.out.println(e.getMessage());
- return -1;
- });
-
- System.out.println(Thread.currentThread().getName() + " 在忙其他任务");
-
- threadPool.shutdown();
- }
- }
出现异常之后
将上述代码中的 handle 方法换成 thenApply之后(二者的函数式接口不一致,所以代码做如下修改),在没有异常情况下,和 handle 执行结果是一样的,就不再截图了。 如果出现异常了,thenApply是下图这种情况:↓↓↓
- .thenApply(f -> {
- int x = 10 / 0;
- System.out.println("222");
- return f + 2;
- }).thenApply(f -> {
- System.out.println("333");
- return f + 3;
- })
- package com.szh.demo;
-
- import java.util.concurrent.CompletableFuture;
-
- /**
- * thenRun: 任务A执行完再执行任务B,并且B不需要A的结果
- * thenAccept: 任务A执行完再执行任务B,并且B需要A的结果,但是B无返回值
- * thenApply: 任务A执行完再执行任务B,并且B需要A的结果,同时B有返回值
- */
- public class CompletableFutureDemo9 {
- public static void main(String[] args) {
- System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenRun(() -> {}).join());
- System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenAccept(r -> System.out.println(r)).join());
- System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenApply(r -> r + "resultB").join());
- }
- }
如果没有传入自定义线程池,那么大家都用默认的 ForkJoinPool。
如果你执行第一个任务的时候,传入了一个自定义线程池:
- 调用thenRun方法执行第二个任务时,则第二个任务和第一个任务是共用同一个线程池。
- 调用thenRunAsync方法执行第二个任务时,则第一个任务使用的是你自己传入的线程池,第二个任务使用的是ForkJoin线程池。
这样的方法有好多组,thenAccept和thenAcceptAsync,thenApply和thenApplyAsync等,它们之间的区别也是这个。
- package com.szh.demo;
-
- import java.util.concurrent.CompletableFuture;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
- import java.util.concurrent.TimeUnit;
-
- public class CompletableFutureDemo10 {
- public static void main(String[] args) {
- ExecutorService threadPool = Executors.newFixedThreadPool(3);
-
- try {
- CompletableFuture
completableFuture = CompletableFuture.supplyAsync(() -> { - try {
- TimeUnit.MILLISECONDS.sleep(20);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("1号任务" + "\t" + Thread.currentThread().getName());
- return "abcd";
- }, threadPool).thenRun(() -> {
- try {
- TimeUnit.MILLISECONDS.sleep(20);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("2号任务" + "\t" + Thread.currentThread().getName());
- }).thenRun(() -> {
- try {
- TimeUnit.MILLISECONDS.sleep(10);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("3号任务" + "\t" + Thread.currentThread().getName());
- }).thenRun(() -> {
- try {
- TimeUnit.MILLISECONDS.sleep(10);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("4号任务" + "\t" + Thread.currentThread().getName());
- });
- System.out.println(completableFuture.get(2L, TimeUnit.SECONDS));
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- threadPool.shutdown();
- }
- }
- }
全部都是 thenRun,则会使用我们自定义的线程池。当然了如果全部都是thenRunAsync,也一样使用的是我们自定义的线程池。
将代码中的第一个 thenRun 改成 thenRunAsync,就不一样了。
在源码中是可以看到的,如果你调用的 thenRunAsync,那么他就会给你用默认的线程池。
下面那个布尔方法表示CPU的内核数是否大于1,现在的电脑基本都满足了,所以可以理解为恒定的true。
public CompletableFuture applyToEither( CompletionStage extends T> other, Function super T, U> fn)
- package com.szh.demo;
-
- import java.util.concurrent.CompletableFuture;
- import java.util.concurrent.TimeUnit;
-
- public class CompletableFutureDemo11 {
- public static void main(String[] args) {
- CompletableFuture
playA = CompletableFuture.supplyAsync(() -> { - System.out.println("A come in");
- try {
- TimeUnit.SECONDS.sleep(2);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- return "playA";
- });
-
- CompletableFuture
playB = CompletableFuture.supplyAsync(() -> { - System.out.println("B come in");
- try {
- TimeUnit.SECONDS.sleep(3);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- return "playB";
- });
-
- CompletableFuture
completableFuture = playA.applyToEither(playB, f -> f + " is winner...."); - System.out.println(completableFuture.join());
- }
- }
public CompletableFuturethenCombine( CompletionStage extends U> other, BiFunction super T,? super U,? extends V> fn)
- package com.szh.demo;
-
- import java.util.concurrent.CompletableFuture;
- import java.util.concurrent.TimeUnit;
-
- public class CompletableFutureDemo12 {
- public static void main(String[] args) {
- CompletableFuture
completableFuture1 = CompletableFuture.supplyAsync(() -> { - System.out.println(Thread.currentThread().getName() + " 启动....");
- try {
- TimeUnit.SECONDS.sleep(2);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- return 10;
- });
-
- CompletableFuture
completableFuture2 = CompletableFuture.supplyAsync(() -> { - System.out.println(Thread.currentThread().getName() + " 启动....");
- try {
- TimeUnit.SECONDS.sleep(1);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- return 20;
- });
-
- CompletableFuture
result = completableFuture1.thenCombine(completableFuture2, (x, y) -> { - System.out.println("开始对两个结果进行合并....");
- return x * y;
- });
- System.out.println(result.join());
- }
- }
第二种写法如下
- package com.szh.demo;
-
- import java.util.concurrent.CompletableFuture;
- import java.util.concurrent.TimeUnit;
-
- public class CompletableFutureDemo12 {
- public static void main(String[] args) {
- CompletableFuture
completableFuture1 = CompletableFuture.supplyAsync(() -> { - System.out.println(Thread.currentThread().getName() + " 启动....");
- try {
- TimeUnit.SECONDS.sleep(2);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- return 10;
- }).thenCombine(CompletableFuture.supplyAsync(() -> {
- System.out.println(Thread.currentThread().getName() + " 启动....");
- try {
- TimeUnit.SECONDS.sleep(1);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- return 20;
- }), (x, y) -> {
- System.out.println("开始对两个结果进行合并....");
- return x * y;
- });
- System.out.println(completableFuture1.join());
- }
- }