Future接口(FutueTask实现类)定义了操作异步任务执行一些方法,如获取异步任务的执行结果、取消任务的执行、判断任务是否被取消、判断任务执行是否完毕等。
比如主线程让一个子线程去执行任务,子线程可能比较耗时,启动子线程开始执行任务后,
主线程就去做其他事情了,忙其它事情或者先执行完,过了一会才去获取子任务的执行结果或变更的任务状态。
Future接口可以为主线程开一个分支任务,专门为主线程处理耗时和费力的复杂业务。
Future接口能干什么?
Future是Java5新加的一个接口,它提供了一种异步并行计算的功能。
如果主线程需要执行一个很耗时的计算任务,我们就可以通过future把这个任务放到异步线程中执行。主线程继续处理其他任务或者先行结束,再通过Future获取计算结果。
代码说话:
Runnable接口Callable接口
Future接口和FutureTask实现类
目的:异步多线程任务执行且返回有结果,三个特点:多线程/有返回/异步任务
绿色虚线:表示实现的关系,实现一个接口
绿色实线:表示接口之间的继承
蓝色实线:表示类之间的继承
public class FutureTaskTest {
public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
ExecutorService executorService = Executors.newFixedThreadPool(3);
long start = System.currentTimeMillis();
FutureTask<String> task2 = new FutureTask<>(() -> {
TimeUnit.SECONDS.sleep(2);
return "2";
});
FutureTask<String> task1 = new FutureTask<>(() -> {
TimeUnit.SECONDS.sleep(1);
return "1";
});
FutureTask<String> task3 = new FutureTask<>(() -> {
TimeUnit.SECONDS.sleep(3);
return "3";
});
executorService.submit(task1);
executorService.submit(task2);
executorService.submit(task3);
System.out.println(task1.get());
System.out.println(task2.get(3,TimeUnit.SECONDS));
while (true){
if(task3.isDone()){
System.out.println(task3.get());
break;
}else {
TimeUnit.MILLISECONDS.sleep(200);
}
}
System.out.println("执行耗时:"+(System.currentTimeMillis()-start));
executorService.shutdown();
}
}
1
2
3
执行耗时:3066
优缺点分析
优点: future+线程池异步多线程任务配合,能显著提高程序的执行效率。
缺点
一旦调用get()方法求结果,如果计算没有完成容易导致程序阻塞
isDone()轮询的方式会耗费无谓的CPU资源,而且也不见得能及时地得到计算结果.
如果想要异步获取结果,通常都会以轮询的方式去获取结果尽量不要阻塞
Future对于结果的获取不是很友好,只能通过阻塞或轮询的方式得到任务的结果。
将多个异步任务的计算结果组合起来,后一个异步任务的计算结果需要前一个异步任务的值
将两个或多个异步计算合成一个异步计算,这几个异步计算互相独立,同时后面这个又依赖前一个处理的结果。
对计算速度选最快:当Future集合中某个任务最快结束时,返回结果,返回第一名处理
对于简单的业务场景使用Future完全OK,但想完成上述一些复杂的任务,使用Future之前提供的那点API就囊中羞涩,处理起来不够优雅,这时候还是让CompletableFuture以声明式的方式优雅的处理这些需求。Future能干的,CompletableFuture都能干。
CompletableFuture异步线程发生异常,不会影响主线程,用来记录日志特别方便。
CompletableFuture为什么出现
get()方法在Future 计算完成之前会一直处在阻塞状态下,isDone()方法容易耗费CPU资源,
对于真正的异步处理我们希望是可以通过传入回调函数,在Future结束时自动调用该回调函数,这样,我们就不用等待结果。
阻塞的方式和异步编程的设计理念相违背,而轮询的方式会耗费无谓的CPU资源。因此,JDK8设计出CompletableFuture。
CompletableFuture提供了一种观察者模式类似的机制,可以让任务执行完成后通知监听的一方。
CompletableFuture和CompletionStage
CompletionStage
CompletionStage代表异步计算过程中的某一个阶段,一个阶段完成以后可能会触发另外一个阶段
一个阶段的计算执行可以是一个Function, Consumer或者Runnable。比如: stage.thenApply(x -> square(x)).thenAccept(×->System.out.print(x)).thenRun(( ->systeh.out.println())
一个阶段的执行可能是被单个阶段的完成触发,也可能是由多个阶段一起触发
代表异步计算过程中的某一个阶段,一个阶段完成以后可能会触发另外一个阶段,有些类似Linux系统的管道分隔符传参数。
CompletableFuture
在Java8中,CompletableFuture提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,并且提供了函数式编程的能力,可以通过回调的方式处理计算结果,也提供了转换和组合CompletableFuture 的方法。
它可能代表一个明确完成的Future,也有可能代表一个完成阶段(CompletionStage ),它支持在计算完成以后触发一些函数或执行某些动作。
它实现了Future和CompletionStage接口
核心的四个静态方法,来创建一个异步任务
从Java8开始引入了CompletableFuture,它是Future的功能增强版。减少阻塞和轮询,可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法
CompletableFuture的优点
异步任务结束时,会自动回调某个对象的方法;
主线程设置好回调后,不再关心异步任务的执行,异步任务之间可以顺序执行
异步任务出错时,会自动回调某个对象的方法;
函数式编程已经主流
先说说join和get对比
说说你过去工作中的项目亮点?大厂业务需求说明
一波流Java8函数式编程带走-比价案例实战
Lambda表达式+Stream流式调用+Chain链式调用+Java8函数式编程
案例精讲-从电商网站的比价需求讲起
需求说明
同一款产品,同时搜索出同款产品在各大电商平台的售价;
输出返回:
出来结果希望是同款产品的在不同地方的价格清单列表,返回一个List《mysql》in jd price is 88.05
《mysql》in dangdang price is 86.11
《mysql》in taobao price is 90.43
解决方案,比对同一个商品在各个平台上的价格,要求获得一个清单列表,
1 )step by step,按部就班,查完京东查淘宝,查完淘宝查天猫
2 )all in,万箭齐发,一口气多线程异步任务同时查询
public class CompletableFutureDemo {
static List<NetMall> list = Arrays.asList(
new NetMall("vip"),
new NetMall("jd"),
new NetMall("tb"),
new NetMall("pdd")
);
public static void main(String[] args) {
long cur1 = System.currentTimeMillis();
getPrice("Phone").forEach(r-> System.out.println(r));
System.out.println("getPrice耗时"+(System.currentTimeMillis()-cur1));
long cur2 = System.currentTimeMillis();
getPriceByCompletableFuture("Phone").forEach(r-> System.out.println(r));
System.out.println("getPriceByCompletableFuture耗时"+(System.currentTimeMillis()-cur2));
}
private static List<String> getPrice(String productName){
return list.stream()
.map(r->String.format(productName+" in %s price is %.2f",r.getName(),r.calcPrice(productName)))
.collect(Collectors.toList());
}
private static List<String> getPriceByCompletableFuture(String productName){
return list.stream()
.map(r-> CompletableFuture.supplyAsync(()->String.format(productName+" in %s price is %.2f",r.getName(),r.calcPrice(productName))))
.collect(Collectors.toList())
.stream()
.map(s->s.join())
.collect(Collectors.toList());
}
}
class NetMall{
private String name;
public double calcPrice(String productName){
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return ThreadLocalRandom.current().nextDouble(100000000)+productName.hashCode();
}
public NetMall(String name) {
this.name = name;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
获得结果
public T get() 不见不散
public T get(long timeout,TimeUnit unit) 过时不候
public T join():join()和get()方法都是用来获取CompletableFuture异步之后的返回值。join()方法抛出的是uncheck异常(即未经检查的异常),不会强制开发者抛出。get()方法抛出的是经过检查的异常,ExecutionException, InterruptedException 需要用户手动处理(抛出或者 try catch)
public T getNow(T valuelfAbsent):没有计算完成的情况下,给我一个替代结果。计算完,返回计算完成后的结果。立即获取结果不阻赛。没算完,返回设定的valuelfAbsent值
主动触发计算
public bgolean complete(T value)
是否打断get方法立即返回括号值
public class CompletableFutureTest {
public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hello CompletableFuture";
});
System.out.println(completableFuture.getNow("心急吃不了热豆腐"));
System.out.println(completableFuture.get());
System.out.println(completableFuture.get(1500, TimeUnit.MILLISECONDS));
System.out.println(completableFuture.join());
System.out.println(completableFuture.complete("未雨绸缪")+"\t"+completableFuture.join());
}
}
thenApply
计算结果存在依赖关系,这两个线程串行化
异常相关:由于存在依赖关系(当前步错,不走下一步),当前步骤有异常的话就叫停。
public class CompletableFutureTest2 {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(2);
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 6;
},executorService).thenApply((r)-> {
int i=2/0;
return r * 5;
}).thenApply((r)-> {
System.out.println(r);
return r - 2;
}).whenComplete((v, e) -> {
System.out.println("计算结果:"+v);
}).exceptionally(e -> {
System.out.println(e.getMessage());
System.out.println(e);
return null;
});
System.out.println("============主线程==========");
executorService.shutdown();
}
}
发生异常后进入exceptionally代码块,但是thenApply中的代码不会执行,whenComplete依旧会执行
============主线程==========
计算结果:null
java.lang.ArithmeticException: / by zero
java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
handle
计算结果存在依赖关系,这两个线程串行化
异常相关:有异常也可以往下一步走,根据带的异常参数可以进步处理
public class CompletableFutureTest2 {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(2);
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 6;
},executorService).handle((r,e)-> {
int i=2/0;
return r * 5;
}).handle((r,e)-> {
System.out.println(r);
return r - 2;
}).whenComplete((v, e) -> {
System.out.println("计算结果:"+v);
}).exceptionally(e -> {
System.out.println(e.getMessage());
System.out.println(e);
return null;
});
System.out.println("============主线程==========");
executorService.shutdown();
}
}
发生异常后进入exceptionally代码块,但是handle和whenComplete依旧会执行
============主线程==========
null
计算结果:null
java.lang.NullPointerException
java.util.concurrent.CompletionException: java.lang.NullPointerException
接收任务的处理结果,并消费处理,无返回结果thenAccept
public class CompletableFutureTest3 {
public static void main(String[] args) {
CompletableFuture.supplyAsync(()->{
return 3;
}).thenApply(r->{
return r*8;
}).thenApply(r->{
return r/2;
}).thenAccept(r-> System.out.println(r));
System.out.println(CompletableFuture.supplyAsync(()->"6666").thenRun(()->{}).join());
System.out.println(CompletableFuture.supplyAsync(()->"6666").thenAccept(r-> System.out.println(r)).join());
System.out.println(CompletableFuture.supplyAsync(()->"6666").thenApply(r->r+"9999").join());
}
}
12
null
6666
null
66669999
completableFuture和线程池说明
以thenRun和thenRunAsync为例,有什么区别?
没有传入自定义线程池,都用默认线程池ForkJoinPool;
传入了一个自定义线程池,
如果你执行第一个任务的时候,传入了一个自定义线程池:
调用thenRun方法执行第二个任务时,则第二个任务和第一个任务是共用同一个线程池。
调用thenRunAsync执行第二个任务时,则第一个任务使用的是你自己传入的线程池,第二个任务使用的是ForkJoin线程池
有可能处理太快,系统优化切换原则,直接使用main线程处理
其它如: thenAccept和thenAcceptAsync,thenApply和thenApplyAsync等,它们之间的区别也是同理
applyToEither:谁快用谁
thenCombine:两个completionStage任务都完成后,最终能把两个任务的结果一起交给thenCombine来处理。先完成的先等着,等待其它分支任务
public class CompletableFutureTest4 {
public static void main(String[] args) {
CompletableFuture<String> first = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "1号选手";
});
CompletableFuture<String> second = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "2号选手";
});
CompletableFuture<String> result = first.applyToEither(second, r -> r + "is winner");
CompletableFuture<String> res = first.thenCombine(second, (x, y) -> x + y);
System.out.println(result.join());
System.out.println(res.join());
}
}
1号选手is winner
1号选手2号选手
allOf():当所有给定的 CompletableFuture 完成时,返回一个新的 CompletableFuture
anyOf():当任何一个给定的CompletablFuture完成时,返回一个新的CompletableFuture
public static void testAllOf(){
CompletableFuture<String> future1 = CompletableFuture.runAsync(() -> {
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("future1执行完成");
});
CompletableFuture<String> future2 = CompletableFuture.runAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("future2执行完成");
});
CompletableFuture<Void> all = CompletableFuture.allOf(future1, future2);
try {
all.get(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
注:本文是学习B站周阳老师《尚硅谷2022版JUC并发编程》课程所做学习笔记。