• CompletableFuture异步编程


    Future和Callable接口

    • Future接口定义了操作异步任务执行一些方法,如获取异步任务的执行结果、取消任务的执行、判断任务是否被取消、判断任务执行是否完毕等。
    • Callable接口中定义了需要有返回的任务需要实现的方法
    • join和get对比,get会抛出异常,join不需要
    • 示例代码如下
    /**
     * FutureTask & Callable
     * 异步任务执行
     */
    public class FutureTaskTest {
    
        public static void main(String[] args) {
            FutureTask<String> task = new FutureTask<>(new MyThread());
            //task.run(); 执行任务线程
            Thread t1 = new Thread(task, "t1");
            t1.start();
            try {
                String res = task.get(); //获取返回结果,容易导致程序执行阻塞
                System.out.println(res);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
    }
    
    class MyThread implements Callable<String>{
    
        @Override
        public String call() throws Exception {
            System.out.println("come in call()...");
            return "hello callable";
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    CompletableFuture类

    这里是引用
    在这里插入图片描述
    在这里插入图片描述

    核心的四个静态方法

    //runAsync 无返回值
    public static CompletableFuture<Void> runAsync(Runnable runnable)
    public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor)
    
    //supplyAsync 有返回值  
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor)
    //没有指定Executor的方法,
    //直接使用默认的ForkJoinPool.commonPool() 作为它的线程池执行异步代码。
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    runAsync无返回值
    /**
     * CompletableFuture
     * 静态方法 runAsync 无返回值
     */
    public class CompletableFutureTest01 {
    
        public static void main(String[] args) {
            ExecutorService threadPool = Executors.newFixedThreadPool(1); //创建线程池
    
            CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
                System.out.println("run task...");
                System.out.println(Thread.currentThread().getName());
                try {
                    TimeUnit.SECONDS.sleep(3);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            },threadPool); //默认有线程池,也可自定义指定
    
            try {
                System.out.println(completableFuture.get());
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
            threadPool.shutdown(); //关闭线程池
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    supplyAsync有返回值
    /**
     * CompletableFuture
     * 静态方法 supplyAsync 有返回值
     */
    public class CompletableFutureTest02 {
    
        public static void main(String[] args) {
            ExecutorService threadPool = Executors.newFixedThreadPool(1); //创建线程池
    
            CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
                System.out.println("run task...");
                System.out.println(Thread.currentThread().getName());
                try {
                    TimeUnit.SECONDS.sleep(3);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "hello supplyAsync";
            },threadPool); //默认有线程池,也可自定义指定
    
            try {
                System.out.println(completableFuture.get()); //获取返回结果
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
            threadPool.shutdown(); //关闭线程池
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    CompletableFuture常用方法

    supplyAsync有返回值,回调函数 & 异常处理
    /**
     * CompletableFuture
     * 静态方法 supplyAsync 有返回值
     * 回调函数 & 异常处理
     */
    public class CompletableFutureTest03 {
    
        public static void main(String[] args) {
            ExecutorService threadPool = Executors.newFixedThreadPool(1); //创建线程池
            try {
    
                CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
                    System.out.println("run task...");
                    System.out.println(Thread.currentThread().getName());
                    try {
                        TimeUnit.SECONDS.sleep(3);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    int res = ThreadLocalRandom.current().nextInt(10);//产生随机数
                    //res/=0; //模拟异常
                    return res;
                },threadPool).whenComplete((v,e)->{ //任务执行完毕,回调函数
                    if (e==null){
                        System.out.println("任务执行完毕,没有出错!res="+v);
                    }
                }).exceptionally(e->{ //任务执行出现异常,异常处理
                    System.err.println("出现的异常:=>>"+e.getCause());
                    return null;
                });
    
                System.out.println(completableFuture.get()); //获取返回结果
    
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                threadPool.shutdown(); //关闭线程池
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    supplyAsync有返回值,返回结果处理
    • thenApply(无异常处理) & handle(有异常处理)
    /**
     * CompletableFuture
     * 返回结果处理 thenApply(无异常处理) & handle(有异常处理)
     *
     *             thenApply(f->{
     *                 return f+2;
     *             })
     *
     *            handle((v,e)->{
     *                 if (e==null){
     *                     return v+2;
     *                 }
     *                 System.err.println(e.getCause());
     *                 return null;
     *             })
     */
    public class CompletableFutureTest04 {
    
        public static void main(String[] args) {
            ExecutorService threadPool = Executors.newFixedThreadPool(1); //创建线程池
            try {
    
                CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
                    System.out.println("run task...");
                    System.out.println(Thread.currentThread().getName());
                    try {
                        TimeUnit.SECONDS.sleep(3);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    int res = 1;
                    //res/=0; //模拟异常
                    return res;
                },threadPool).handle((v,e)->{
                    if (e==null){
                        return v+2;
                    }
                    System.err.println(e.getCause());
                    return null;
                }).whenComplete((v, e)->{ //任务执行完毕,回调函数
                    if (e==null){
                        System.out.println("任务执行完毕,没有出错!res="+v);
                    }
                }).exceptionally(e->{ //任务执行出现异常,异常处理
                    System.err.println("出现的异常:=>>"+e.getCause());
                    return null;
                });
    
                System.out.println(completableFuture.get()); //获取返回结果
    
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                threadPool.shutdown(); //关闭线程池
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    supplyAsync有返回值,返回结果消费 (消费完 无返回值)
    • thenRun & thenAccept
    /**
     * CompletableFuture
     * 返回结果消费 (消费完 无返回值)
     * thenRun & thenAccept
     */
    public class CompletableFutureTest05 {
    
        public static void main(String[] args) {
            //重新启动一个线程,无需上一个任务结果和传入参数
            System.out.println(CompletableFuture.supplyAsync(()->"A").thenRun(()->{}).join());
    
            //消费上一个任务结果
            System.out.println(CompletableFuture.supplyAsync(()->"A").thenAccept(r->{
                System.out.println(r);
            }).join());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    supplyAsync有返回值,比较两个任务计算速度 (谁快)
    • applyToEither
    /**
     * CompletableFuture
     * 比较两个任务计算速度 (谁快)
     * applyToEither
     */
    public class CompletableFutureTest06 {
    
        public static void main(String[] args) {
            CompletableFuture<String> res1 = CompletableFuture.supplyAsync(() -> {
                try {
                    TimeUnit.SECONDS.sleep(3);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "1号";
            });
    
            CompletableFuture<String> res2 =CompletableFuture.supplyAsync(()->{
                try {
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "2号";
            });
    
            CompletableFuture<String> ans = res1.applyToEither(res2, f -> {
                return f + "is win";
            });
    
            System.out.println(ans.join());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    supplyAsync有返回值,合并两个任务计算结果
    • thenCombine
    /**
     * CompletableFuture
     * 合并两个任务计算结果
     * thenCombine
     */
    public class CompletableFutureTest07 {
    
        public static void main(String[] args) {
            CompletableFuture<Integer> res1 = CompletableFuture.supplyAsync(() -> {
                try {
                    TimeUnit.SECONDS.sleep(3);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return 10;
            });
    
            CompletableFuture<Integer> res2 =CompletableFuture.supplyAsync(()->{
                try {
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return 20;
            });
    
            CompletableFuture<Integer> ans = res1.thenCombine(res2,(x,y)->{
                return x+y;
            });
    
            System.out.println(ans.join());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
  • 相关阅读:
    C++语法基础知识面经汇总
    skywalking9.4 链路追踪
    【WSL】SSH 远程连接及宿主机端口转发配置
    海量小文件数据传输如何确保安全性
    STM32将FreeRTOS移植到用CubeMX生成的HAL库中
    将Excel表中数据导入MySQL数据库
    docker下安装apollo多环境(DEV 和UAT)
    SQL 经典50题(题目+解答)(1)
    java基于springboot +vue的图书馆图书借阅系统
    unity学习之汇总
  • 原文地址:https://blog.csdn.net/qq_54429571/article/details/127697841