• JUC_Future异步回调


    概述

    关于Future接口

    其中ForkJoinTask类及其子类RecursiveTask类,RecursiveAction类,CountedCompleter类属于Fork/Join框架,SwingWorker类属于Swing应用开发,故在此不做阐述,本文主要讲解FutureTask类与CompletableFuture类

    FutureTask

    构造方法

    1. public FutureTask(Callable callable) {
    2. if (callable == null)
    3. throw new NullPointerException();
    4. this.callable = callable;
    5. this.state = NEW; // ensure visibility of callable
    6. }
    7. public FutureTask(Runnable runnable, V result) {
    8. this.callable = Executors.callable(runnable, result);
    9. this.state = NEW; // ensure visibility of callable
    10. }

    通过构造方法可以看出,可传入Callable接口或Runnable接口,但Runnable接口会被包装成Callable接口

    传入Runnable接口的同时还可传入参数result,即执行结果;因为Runnable接口中的run方法无返回值,而Callable接口中的call方法有返回值,所以需要返回result实现特殊的"run方法返回值",当然,若不需要特殊的返回值,可将result参数设置为null

    Executors.callable()

    1. public static Callable callable(Runnable task, T result) {
    2. if (task == null)
    3. throw new NullPointerException();
    4. return new RunnableAdapter(task, result);
    5. }

    RunnableAdapter(task,result)

    RunnableAdapter是Callable接口的实现类

    1. static final class RunnableAdapter implements Callable {
    2. final Runnable task;
    3. final T result;
    4. RunnableAdapter(Runnable task, T result) {
    5. this.task = task;
    6. this.result = result;
    7. }
    8. public T call() {
    9. task.run();
    10. return result;
    11. }
    12. }

    示例代码:以传入Callable接口为例

    1. public class Demo {
    2. public static void main(String[] args) {
    3. Future task = new FutureTask<>(()-> {
    4. System.out.println(Thread.currentThread().getName() + " starts working");
    5. //模拟线程工作耗时
    6. TimeUnit.SECONDS.sleep(3);
    7. return Thread.currentThread().getName() + " execution completed";
    8. });
    9. new Thread((Runnable) task).start();
    10. //确保执行异步任务的线程已开启
    11. try {
    12. TimeUnit.SECONDS.sleep(1);
    13. } catch (InterruptedException e) {
    14. e.printStackTrace();
    15. }
    16. //调用者线程继续执行其他任务
    17. System.out.println(Thread.currentThread().getName());
    18. //获取异步任务回调的返回值
    19. try {
    20. System.out.println("task return : " + task.get());
    21. } catch (InterruptedException | ExecutionException e) {
    22. e.printStackTrace();
    23. }
    24. }
    25. }

    运行效果

    可以看到是异步执行的,主线程与分支线程各行其事

    CompeletableFuture

    构造函数

    通常不通过构造函数创建CompeletableFuture对象,而是通过runAsync方法与supplyAsync方法

    1. public CompletableFuture() {
    2. }
    3. private CompletableFuture(Object r) {
    4. this.result = r;
    5. }

    runAsync方法:无返回值的异步回调

    1. public static CompletableFuture runAsync(Runnable runnable) {
    2. return asyncRunStage(asyncPool, runnable);
    3. }
    4. public static CompletableFuture runAsync(Runnable runnable,
    5. Executor executor) {
    6. return asyncRunStage(screenExecutor(executor), runnable);
    7. }

    任务执行完成后,可通过get方法获取任务的执行结果(CompletableFuture类的实例的result属性),但值为null

    翻看runAsync方法内部,发现Runnable接口的run方法在AsyncRun类的实例中被调用,紧接着调用completeNull方法将任务的执行结果设置为null(将CompletableFuture类的实例的result属性设置为null),所以get方法获取的执行结果为null

    Runnable接口的run方法无返回值,即使想将任务的执行结果设置为返回值也是无法办到的,所以通过runAsync方法实现的异步回调被称为无返回值的异步回调

    AsyncRun类实现了Runnable接口

    1. static CompletableFuture asyncRunStage(Executor e, Runnable f) {
    2. if (f == null) throw new NullPointerException();
    3. CompletableFuture d = new CompletableFuture();
    4. e.execute(new AsyncRun(d, f));
    5. return d;
    6. }
    1. static final class AsyncRun extends ForkJoinTask
    2. implements Runnable, AsynchronousCompletionTask {
    3. CompletableFuture dep; Runnable fn;
    4. AsyncRun(CompletableFuture dep, Runnable fn) {
    5. this.dep = dep; this.fn = fn;
    6. }
    7. public final Void getRawResult() { return null; }
    8. public final void setRawResult(Void v) {}
    9. public final boolean exec() { run(); return true; }
    10. public void run() {
    11. CompletableFuture d; Runnable f;
    12. if ((d = dep) != null && (f = fn) != null) {
    13. dep = null; fn = null;
    14. if (d.result == null) {
    15. try {
    16. f.run();
    17. //completeNull() : 以null值完成 除非已经完成
    18. /** Completes with the null value, unless already completed. */
    19. d.completeNull();
    20. } catch (Throwable ex) {
    21. d.completeThrowable(ex);
    22. }
    23. }
    24. d.postComplete();
    25. }
    26. }
    27. }

    示例代码

    1. public class Demo {
    2. public static void main(String[] args) {
    3. //泛型代表返回值的类型 runAsync方法无返回值就填Void
    4. CompletableFuture completableFuture = CompletableFuture.runAsync(()->{
    5. System.out.println(Thread.currentThread().getName() + " starts working");
    6. try {
    7. TimeUnit.SECONDS.sleep(3);
    8. } catch (InterruptedException e) {
    9. e.printStackTrace();
    10. }
    11. });
    12. //确保执行异步任务的线程已开启
    13. try {
    14. TimeUnit.SECONDS.sleep(1);
    15. } catch (InterruptedException e) {
    16. e.printStackTrace();
    17. }
    18. System.out.println(Thread.currentThread().getName());
    19. //通过get方法获取任务的执行结果
    20. try {
    21. System.out.println("get() return : " + completableFuture.get());
    22. } catch (InterruptedException | ExecutionException e) {
    23. e.printStackTrace();
    24. }
    25. }
    26. }

    运行效果

    可以看到是异步执行的,主线程与分支线程各行其事

    supplyAsync方法:有返回值的异步回调

    1. public static CompletableFuture supplyAsync(Supplier supplier) {
    2. return asyncSupplyStage(asyncPool, supplier);
    3. }
    4. public static CompletableFuture supplyAsync(Supplier supplier,
    5. Executor executor) {
    6. return asyncSupplyStage(screenExecutor(executor), supplier);
    7. }

    任务执行完成后,可通过CompletableFuture类的实例的get方法获取任务的执行结果(CompletableFuture类的实例的result属性),值为Supplier接口的get方法的返回值

    翻看supplyAsync方法内部后可发现,Supplier接口的get方法在AsyncSupply类的实例中被调用,其返回值被completeValue方法设置为任务的执行结果(设置CompletableFuture类的实例的result属性),所以通过CompletableFuture类的实例的get方法可用获取Supplier接口的get方法的返回值

    由于任务的执行结果被设置为Supplier接口的get方法的返回值,所以通过supplyAsync方法实现的异步回调被称为有返回值的异步回调

    AsyncSupply类实现了Runnable接口

    1. static CompletableFuture asyncSupplyStage(Executor e,
    2. Supplier f) {
    3. if (f == null) throw new NullPointerException();
    4. CompletableFuture d = new CompletableFuture();
    5. e.execute(new AsyncSupply(d, f));
    6. return d;
    7. }
    1. static final class AsyncSupply extends ForkJoinTask
    2. implements Runnable, AsynchronousCompletionTask {
    3. CompletableFuture dep; Supplier fn;
    4. AsyncSupply(CompletableFuture dep, Supplier fn) {
    5. this.dep = dep; this.fn = fn;
    6. }
    7. public final Void getRawResult() { return null; }
    8. public final void setRawResult(Void v) {}
    9. public final boolean exec() { run(); return true; }
    10. public void run() {
    11. CompletableFuture d; Supplier f;
    12. if ((d = dep) != null && (f = fn) != null) {
    13. dep = null; fn = null;
    14. if (d.result == null) {
    15. try {
    16. //completeValue() : 以非异常结果完成 除非已经完成
    17. /** Completes with a non-exceptional result, unless already completed. */
    18. d.completeValue(f.get());
    19. } catch (Throwable ex) {
    20. d.completeThrowable(ex);
    21. }
    22. }
    23. d.postComplete();
    24. }
    25. }
    26. }

    示例代码

    一:直接通过get()方法获取任务执行结果

    1. public class Demo {
    2. public static void main(String[] args) {
    3. CompletableFuture completableFuture = CompletableFuture.supplyAsync(()->{
    4. System.out.println(Thread.currentThread().getName() + "starts working");
    5. try {
    6. TimeUnit.SECONDS.sleep(3);
    7. } catch (InterruptedException e) {
    8. e.printStackTrace();
    9. }
    10. //任务正常完成的返回值
    11. return 1;
    12. });
    13. //确保执行任务的线程已开启
    14. try {
    15. TimeUnit.SECONDS.sleep(1);
    16. } catch (InterruptedException e) {
    17. e.printStackTrace();
    18. }
    19. System.out.println(Thread.currentThread().getName());
    20. //通过get方法获取任务的执行结果
    21. try {
    22. System.out.println("get() return : " + completableFuture.get());
    23. } catch (InterruptedException | ExecutionException e) {
    24. e.printStackTrace();
    25. }
    26. }
    27. }

    运行效果

    可以看到是异步执行的,主线程与分支线程各行其事

    二:通过whenComplete方法或whenCompleteAsync方法获取执行结果

    whenComplete()        该任务(whenComplete)由当前线程执行
    whenCompleteAsync()   该任务(whenCompleteAsync)由其他线程池执行(可指定执行线程池 不指定则自动生成) 通常不选用该方法(因为交给线程池处理 需要切换一次上下文 会降低程序的运行效率)
    1. public class Demo {
    2. public static void main(String[] args) {
    3. CompletableFuture completableFuture = CompletableFuture.supplyAsync(()->{
    4. System.out.println(Thread.currentThread().getName() + "starts working");
    5. try {
    6. TimeUnit.SECONDS.sleep(3);
    7. } catch (InterruptedException e) {
    8. e.printStackTrace();
    9. }
    10. //任务正常完成的返回值
    11. return 1;
    12. });
    13. //确保执行异步任务的线程已开启
    14. try {
    15. TimeUnit.SECONDS.sleep(1);
    16. } catch (InterruptedException e) {
    17. e.printStackTrace();
    18. }
    19. System.out.println(Thread.currentThread().getName());
    20. try {
    21. Integer result = completableFuture.whenComplete((success, fail) -> {
    22. System.out.println("success = " + success); //success记录任务正常完成的返回值
    23. System.out.println("fail = " + fail); //fail记录错误信息
    24. //执行时发生错误的处理
    25. }).exceptionally((e) -> {
    26. System.out.println("exception message : " + e.getMessage());
    27. System.out.println("stack message : " + Arrays.toString(e.getStackTrace()));
    28. return -1; //任务发生错误时的返回值(用于判断任务发生了错误)
    29. //通过get方法获取任务的执行结果
    30. }).get();
    31. System.out.println("get() return : " + result);
    32. } catch (InterruptedException | ExecutionException e) {
    33. e.printStackTrace();
    34. }
    35. }
    36. }

    运行效果

    可以看到是异步执行的,主线程与分支线程各行其事 

    人工加入错误的运行效果(int i = 1/0)

  • 相关阅读:
    1024程序员节主题征文 | 35种语言输出1024
    mac的终端显示分支名称?mac的终端和idea中的terminal同时修改
    java的匿名内部类的使用
    Linux之从进程角度来理解文件描述符
    【我的前端】CSS启示录:CSS写出超级美观的阴影效果
    Node.js 零基础入门 Node.js 零基础入门第二天 2.4 模块的加载机制
    java学生综合能力的教学评估系统ssm
    Docker-harbor私有仓库部署与管理
    GO面试题集锦
    塑钢门窗三位焊接机中工位设计
  • 原文地址:https://blog.csdn.net/Mudrock__/article/details/127080134