利用核心的四个静态方法创建一个异步操作 | 不建议用new
关键就是 |有没有返回值|是否用了线程池|
参数说明:
没有指定Executor的方法,直接使用默认的ForkJoinPool.commPool()作为它的线程池执行异步代码。
如果指定线程池,则使用我们定义的或者特别指定的线程池执行异步代码。
- public class CompletableFutureBuildDemo {
- public static void main(String[] args) throws ExecutionException, InterruptedException {
- CompletableFuture<Void> voidCompletableFuture = CompletableFuture.runAsync(() -> {
- System.out.println(Thread.currentThread().getName());
- //停顿几秒线程
- try {
- TimeUnit.SECONDS.sleep(1);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- });
- System.out.println(voidCompletableFuture.get());
- }
- }
- //ForkJoinPool.commonPool-worker-9 //默认的线程池
- //null --- 没有返回值
- public class CompletableFutureBuildDemo {
- public static void main(String[] args) throws ExecutionException, InterruptedException {
-
- ExecutorService executorService = Executors.newFixedThreadPool(3);//加入线程池
-
- CompletableFuture
voidCompletableFuture = CompletableFuture.runAsync(() -> { -
- System.out.println(Thread.currentThread().getName());
- //停顿几秒线程
- try {
- TimeUnit.SECONDS.sleep(1);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- },executorService);
- System.out.println(voidCompletableFuture.get());
- executorService.shutdown();
- }
- }
- //pool-1-thread-1 ----指定的线程池
- //null ----没有返回值
- public class CompletableFutureBuildDemo {
- public static void main(String[] args) throws ExecutionException, InterruptedException {
-
- ExecutorService executorService = Executors.newFixedThreadPool(3);//加入线程池
-
- CompletableFuture
objectCompletableFuture = CompletableFuture.supplyAsync(()->{ - System.out.println(Thread.currentThread().getName());
- try {
- TimeUnit.SECONDS.sleep(1);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- return "helllo supplyasync";
- });
- System.out.println(objectCompletableFuture.get());
- }
- }
- //ForkJoinPool.commonPool-worker-9---------默认的线程池
- //helllo supplyasync-------------supplyasync有返回值了
- public class CompletableFutureBuildDemo {
- public static void main(String[] args) throws ExecutionException, InterruptedException {
-
- ExecutorService executorService = Executors.newFixedThreadPool(3);//加入线程池
-
- CompletableFuture
objectCompletableFuture = CompletableFuture.supplyAsync(()->{ - System.out.println(Thread.currentThread().getName());
- try {
- TimeUnit.SECONDS.sleep(1);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- return "helllo supplyasync";
- },executorService);
- System.out.println(objectCompletableFuture.get());
- executorService.shutdown();
- }
- }
- //pool-1-thread-1
- //helllo supplyasync-------------supplyasync有返回值了
CompletableFuture
通过whenComplete
来减少阻塞和轮询(自动回调)
- public class CompletableFutureUseDemo {
- public static void main(String[] args) throws ExecutionException, InterruptedException {
- CompletableFuture.supplyAsync(()->{
- System.out.println(Thread.currentThread().getName()+"--------副线程come in");
- int result = ThreadLocalRandom.current().nextInt(10);//产生随机数
- try {
- TimeUnit.SECONDS.sleep(1);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- return result;
- }).whenComplete((v,e) -> {//没有异常,v是值,e是异常
- if(e == null){
- System.out.println("------------------计算完成,更新系统updataValue"+v);
- }
- }).exceptionally(e->{//有异常的情况
- e.printStackTrace();
- System.out.println("异常情况"+e.getCause()+"\t"+e.getMessage());
- return null;
- });
-
- //线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:暂停3秒钟线程
- //ForkJoinPool 类似于守护线程mian线程结束的太快,CompletableFuture还没执行完也会结束
- System.out.println(Thread.currentThread().getName()+"线程先去忙其他任务");
- try {
- TimeUnit.SECONDS.sleep(3);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- //ForkJoinPool.commonPool-worker-9--------副线程come in(这里用的是默认的ForkJoinPool)
- //main线程先去忙其他任务
- //------------------计算完成,更新系统updataValue3
int i = 10 / 0 ;
- public class CompletableFutureUseDemo
- {
- public static void main(String[] args) throws ExecutionException, InterruptedException
- {
-
- ExecutorService threadPool = Executors.newFixedThreadPool(3);
-
- try
- {
- CompletableFuture.supplyAsync(() -> {
- System.out.println(Thread.currentThread().getName() + "----come in");
- int result = ThreadLocalRandom.current().nextInt(10);
- try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
- System.out.println("-----1秒钟后出结果:" + result);
- if(result > 2)
- {
- int i=10/0;
- }
- return result;
- },threadPool).whenComplete((v,e) -> {
- if (e == null) {
- System.out.println("-----计算完成,更新系统UpdateValue:"+v);
- }
- }).exceptionally(e -> {
- e.printStackTrace();
- System.out.println("异常情况:"+e.getCause()+"\t"+e.getMessage());
- return null;
- });
-
- System.out.println(Thread.currentThread().getName()+"线程先去忙其它任务");
- }catch (Exception e){
- e.printStackTrace();
- }finally {
- threadPool.shutdown();
- }
-
-
- //主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:暂停3秒钟线程
- //try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); }
-
- }
-
- }
功能几乎一样,区别在于编码时是否需要抛出异常
get()方法需要抛出异常
join()方法不需要抛出异常
- public class Chain {
- public static void main(String[] args) throws ExecutionException, InterruptedException {//抛出异常
- CompletableFuture
completableFuture = CompletableFuture.supplyAsync(() -> { - return "hello 12345";
- });
- System.out.println(completableFuture.get());
- }
-
- }
-
- public class Chain {
- public static void main(String[] args) {//不需要抛出异常
- CompletableFuture
completableFuture = CompletableFuture.supplyAsync(() -> { - return "hello 12345";
- });
- System.out.println(completableFuture.join());
- }
- }
getNow调用的时候如果计算完了,就拿取这个计算完的值;否则就拿备胎值
1.获得结果和触发计算
获取结果
public T get() 不见不散,容易阻塞
public T get(long timeout,TimeUnit unit) 过时不候,超过时间会爆异常
public T join() 类似于get(),区别在于是否需要抛出异常
public T getNow(T valueIfAbsent) 没有计算完成的情况下,给一个替代结果
立即获取结果不阻塞
计算完,返回计算完成后的结果
没算完,返回设定的valueAbsent(直接返回了备胎值xxx)
主动触发计算
public boolean complete(T value) 是否立即打断get()方法返回括号值
(执行要2s,等待只有1s,所以还没执行完就被打断了。返回true表示打断了获取这个过程,直接返回了备胎值complete;如果没打断,返回false 和原来的abc)
- public class CompletableFutureAPIDemo {
- public static void main(String[] args) throws ExecutionException, InterruptedException {
- CompletableFuture
uCompletableFuture = CompletableFuture.supplyAsync(() -> { - try {
- TimeUnit.SECONDS.sleep(2);//执行需要2秒
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- return "abc";
- });
-
- try {
- TimeUnit.SECONDS.sleep(1);//等待需要1秒
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- // System.out.println(uCompletableFuture.getNow("xxx"));//执2-等1 返回xxx
-
- //执2-等1 返回true+备胎值1111111
- // 反之 则是 false 输出 abc
-
- System.out.println(uCompletableFuture.complete("1111111")+"\t"+uCompletableFuture.get());
- }
- }
thenApply
计算结果存在在依赖关系,使得线程串行化。因为依赖关系,所以一旦有异常,直接叫停。
- public class CompletableFutureDemo2
- {
- public static void main(String[] args) throws ExecutionException, InterruptedException
- {
- ExecutorService threadPool = Executors.newFixedThreadPool(3);
-
- //当一个线程依赖另一个线程时用 thenApply 方法来把这两个线程串行化,
- CompletableFuture.supplyAsync(() -> {
- //暂停几秒钟线程
- try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
- System.out.println("111");
- return 1024;
- },threadPool ).thenApply(f -> {
- System.out.println("222");
- return f + 1;
- }).thenApply(f -> {
- //int age = 10/0; // 异常情况:那步出错就停在那步。
- System.out.println("333");
- return f + 1;
- }).whenCompleteAsync((v,e) -> {
- System.out.println("*****v: "+v);
- }).exceptionally(e -> {
- e.printStackTrace();
- return null;
- });
-
- System.out.println("-----主线程结束,END");
- threadPool .shutdown();
-
- }
- }
- //-----正常情况
- //111
- //222
- //333
- //----计算结果: 6
-
- //-----异常情况
- //111
- //异常.....
handle
类似于thenApply,但是有异常的话仍然可以往下走一步。
- public class CompletableFutureDemo2
- {
-
- public static void main(String[] args) throws ExecutionException, InterruptedException
- {
- //当一个线程依赖另一个线程时用 handle 方法来把这两个线程串行化,
- // 异常情况:有异常也可以往下一步走,根据带的异常参数可以进一步处理
- CompletableFuture.supplyAsync(() -> {
- //暂停几秒钟线程
- try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
- System.out.println("111");
- return 1024;
- }).handle((f,e) -> {
- int age = 10/0;//异常语句
- System.out.println("222");
- return f + 1;
- }).handle((f,e) -> {
- System.out.println("333");
- return f + 1;
- }).whenCompleteAsync((v,e) -> {
- System.out.println("*****v: "+v);
- }).exceptionally(e -> {
- e.printStackTrace();
- return null;
- });
-
- System.out.println("-----主线程结束,END");
-
- // 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
- try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
- }
- }
- //-----异常情况
- //111
- //333
- //异常,可以看到多走了一步333
接收任务的处理结果,并消费处理,无返回结果|消费型函数式接口
thenAccept
不需要return
- public static void main(String[] args) throws ExecutionException, InterruptedException
- {
- CompletableFuture.supplyAsync(() -> {
- return 1;
- }).thenApply(f -> {
- return f + 2;
- }).thenApply(f -> {
- return f + 3;
- }).thenAccept(r -> System.out.println(r));
- }
- //6
- //消费一下,直接得到6
补充:Code之任务之间的顺序执行
1.thenRun
thenRun(Runnable runnable)
任务A执行完执行B,并且B不需要A的结果
2.thenAccept
thenAccept(Consumer action)
任务A执行完执行B,B需要A的结果,但是任务B无返回值
3.thenApply
thenApply(Function fn)
任务A执行完执行B,B需要A的结果,同时任务B有返回值
-
- System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenRun(() -> {}).join());
- //null
-
- System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenAccept(resultA -> {}).join());
- //resultA打印出来的 null因为没有返回值
-
- System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenApply(resultA -> resultA + " resultB").join());
- //resultAresultB 返回值
上面的几个方法都有普通版本和后面加Async的版本
以 thenRun 和 thenRunAsync 为例,有什么区别?
先看结论
1. 没有传入自定义线程池,都用默认线程池ForkJoinPool
2. 传入了一个自定义线程池如果你执行第一个任务的时候,传入了一个自定义线程池
1.调用thenRun方法执行第二个任务的时候,则第二个任务和第一个任务是用同一个线程池
2.调用thenRunAsync执行第二个任务的时候,则第一个任务使用的是你自己传入的线程池,第二个任务使用的是ForkJoin线程池
3.也有可能处理太快,系统优化切换原则,直接使用main线程处理(把sleep去掉)
2-1
-
- public class CompletableFutureAPIDemo {
- public static void main(String[] args) throws ExecutionException, InterruptedException {
- ExecutorService threadPool = Executors.newFixedThreadPool(5);
- CompletableFuture
completableFuture = CompletableFuture.supplyAsync(()->{ - try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
- System.out.println("1号任务"+"\t"+Thread.currentThread().getName());
- return "abcd";
- },threadPool).thenRun(()->{
- try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
- System.out.println("2号任务"+"\t"+Thread.currentThread().getName());
- }).thenRun(()->{
- try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
- System.out.println("3号任务"+"\t"+Thread.currentThread().getName());
- }).thenRun(()->{
- try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
- System.out.println("4号任务"+"\t"+Thread.currentThread().getName());
- });
- }
- }
- //1号任务 pool-1-thread-1
- //2号任务 pool-1-thread-1
- //3号任务 pool-1-thread-1
- //4号任务 pool-1-thread-1
2-2
-
- public class CompletableFutureAPIDemo {
- public static void main(String[] args) throws ExecutionException, InterruptedException {
- ExecutorService threadPool = Executors.newFixedThreadPool(5);
- CompletableFuture
completableFuture = CompletableFuture.supplyAsync(()->{ - try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
- System.out.println("1号任务"+"\t"+Thread.currentThread().getName());
- return "abcd";
- },threadPool).thenRunAsync(()->{
- try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
- System.out.println("2号任务"+"\t"+Thread.currentThread().getName());
- }).thenRun(()->{
- try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
- System.out.println("3号任务"+"\t"+Thread.currentThread().getName());
- }).thenRun(()->{
- try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
- System.out.println("4号任务"+"\t"+Thread.currentThread().getName());
- });
- }
- }
- //1号任务 pool-1-thread-1
- //2号任务 ForkJoinPool.commonPool-worker-9---这里另起炉灶重新调用了默认的ForkJoinPool
- //3号任务 ForkJoinPool.commonPool-worker-9
- //4号任务 ForkJoinPool.commonPool-worker-9
3
- public class CompletableFutureAPIDemo {
- public static void main(String[] args) throws ExecutionException, InterruptedException {
- ExecutorService threadPool = Executors.newFixedThreadPool(5);
- CompletableFuture
completableFuture = CompletableFuture.supplyAsync(()->{ - // try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
- System.out.println("1号任务"+"\t"+Thread.currentThread().getName());
- return "abcd";
- },threadPool).thenRun(()->{
- // try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
- System.out.println("2号任务"+"\t"+Thread.currentThread().getName());
- }).thenRun(()->{
- // try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
- System.out.println("3号任务"+"\t"+Thread.currentThread().getName());
- }).thenRun(()->{
- //try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
- System.out.println("4号任务"+"\t"+Thread.currentThread().getName());
- });
- }
- }
- //1号任务 1号任务 pool-1-thread-1
- //2号任务 main
- //3号任务 main
- //4号任务 main
applyToEither
方法,那个快用哪个- public class CompletableFutureDemo2 {
- public static void main(String[] args) throws ExecutionException, InterruptedException
- {
- CompletableFuture<String> play1 = CompletableFuture.supplyAsync(() -> {
- System.out.println(Thread.currentThread().getName() + "\t" + "---come in ");
- //暂停几秒钟线程
- try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
- return "play1 ";
- });
-
- CompletableFuture<String> play2 = CompletableFuture.supplyAsync(() -> {
- System.out.println(Thread.currentThread().getName() + "\t" + "---come in ");
- try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
- return "play2";
- });
-
- CompletableFuture<String> thenCombineResult = play1.applyToEither(play2, f -> {//对计算速度进行选用
- return f + " is winner";
- });
-
- System.out.println(Thread.currentThread().getName() + "\t" + thenCombineResult.get());
- }
- }
- //ForkJoinPool.commonPool-worker-9 ---come in
- //ForkJoinPool.commonPool-worker-2 ---come in
- //main play2 is winner
thenCombine
合并
两个CompletionStage任务都完成后,最终能把两个任务的结果一起交给thenCOmbine来处理
先完成的先等着,等待其它分支任务
- public class CompletableFutureDemo2
- {
- public static void main(String[] args) throws ExecutionException, InterruptedException
- {
- CompletableFuture
completableFuture1 = CompletableFuture.supplyAsync(() -> { - System.out.println(Thread.currentThread().getName() + "\t" + "---come in ");
- return 10;
- });
-
- CompletableFuture
completableFuture2 = CompletableFuture.supplyAsync(() -> { - System.out.println(Thread.currentThread().getName() + "\t" + "---come in ");
- return 20;
- });
-
- CompletableFuture
thenCombineResult = completableFuture1.thenCombine(completableFuture2, (x, y) -> { - System.out.println(Thread.currentThread().getName() + "\t" + "---come in ");
- return x + y;
- });
-
- System.out.println(thenCombineResult.get());
- }
- }
- //30
- public class CompletableFutureDemo2
- {
- public static void main(String[] args) throws ExecutionException, InterruptedException
- {
- CompletableFuture
thenCombineResult = CompletableFuture.supplyAsync(() -> { - System.out.println(Thread.currentThread().getName() + "\t" + "---come in 1");
- return 10;
- }).thenCombine(CompletableFuture.supplyAsync(() -> {
- System.out.println(Thread.currentThread().getName() + "\t" + "---come in 2");
- return 20;
- }), (x,y) -> {
- System.out.println(Thread.currentThread().getName() + "\t" + "---come in 3");
- return x + y;
- }).thenCombine(CompletableFuture.supplyAsync(() -> {
- System.out.println(Thread.currentThread().getName() + "\t" + "---come in 4");
- return 30;
- }),(a,b) -> {
- System.out.println(Thread.currentThread().getName() + "\t" + "---come in 5");
- return a + b;
- });
- System.out.println("-----主线程结束,END");
- System.out.println(thenCombineResult.get());
-
-
-
- }
- }