• SpringBoot之异步方法


    1、Future

    Future代表异步计算的结果。提供了检查计算是否完成、等待其完成以及检索计算结果的方法。只有在计算完成后,才能使用方法get检索结果,如有必要,将其阻塞,直到准备就绪。取消是通过取消方法执行的。还提供了其他方法来确定任务是否正常完成或被取消。

    	//等待异步任务完成,然后检索其结果
        V get() throws InterruptedException, ExecutionException;
    	//最多等待给定的时间以完成计算,然后检索其结果
        V get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException;
    	//如果此任务已完成,则返回true。完成可能是由于正常终止、异常或取消——在所有这些情况下,此方法将返回true
    	boolean isDone();
    
    
    	private static final ExecutorService executor = Executors.newCachedThreadPool(new ThreadFactory() {
            int i = 0;
    
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "test-" + i++);
            }
        });
    	
        public static void demo01() {
            log.info("创建异步任务");
            Future submit = executor.submit(new Callable() {
                @Override
                public String call() {
                    String result = "fail";
                    try {
                        log.info("开始执行异步任务");
                        // 执行任务耗时
                        Thread.sleep(10000);
                        result = "success";
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return result;
                }
            });
    
            try {
                String result = submit.get();
                log.info("获取异步任务结果 " + result);
            } catch (InterruptedException e) {
                System.out.println("中断异常");
            } catch (ExecutionException e) {
                System.out.println("执行异常");
            }
    
            log.info("Future的get方法,会使当前线程阻塞");
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47

    在这里插入图片描述

        public static void demo02() throws InterruptedException, ExecutionException {
            log.info("创建异步任务");
            Future submit = executor.submit(new Callable() {
                @Override
                public String call() {
                    String result = "fail";
                    try {
                        log.info("开始执行异步任务");
                        // 执行任务耗时
                        Thread.sleep(10000);
                        result = "success";
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return result;
                }
            });
    
            log.info("轮询调用isDone方法查询异步任务是否完成");
            while (true) {
                if (submit.isDone()) {
                    String result = submit.get();
                    log.info(result);
                    break;
                } else {
                    log.info("异步任务还未完成,先干点别的事");
                    Thread.sleep(1000);
                }
            }
    
            log.info("Future的get方法,会使当前线程阻塞");
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    在这里插入图片描述
    使用Future,并不能实现真正的异步,要么需要阻塞的获取结果,要么不断的轮询

    2、CompletableFuture

    CompletableFuture实现了CompletionStage接口和Future接口,增加了异步回调、流式处理、多个Future组合处理的能力,使Java在处理多任务的协同工作时更加顺畅便利。

    	//创建带返回值的异步任务,要么使用的默认线程池ForkJoinPool.commonPool(),要么入参时给定		
    	public static  CompletableFuture supplyAsync(Supplier supplier) {
            return asyncSupplyStage(asyncPool, supplier);
        }
        public static  CompletableFuture supplyAsync(Supplier supplier, Executor executor) {
            return asyncSupplyStage(screenExecutor(executor), supplier);
        }
        
    	//创建无返回值的异步任务,要么使用的默认线程池ForkJoinPool.commonPool(),要么入参时给定	
    	public static CompletableFuture runAsync(Runnable runnable) {
            return asyncRunStage(asyncPool, runnable);
        }
        public static CompletableFuture runAsync(Runnable runnable, Executor executor) {
            return asyncRunStage(screenExecutor(executor), runnable);
        }
    
    	//如果以任何方式完成,则返回true:正常、异常或通过取消
    	public boolean isDone() {
            return result != null;
        }
    	//等待此任务完成,然后返回其结果
        public T get() throws InterruptedException, ExecutionException {
            Object r;
            return reportGet((r = result) == null ? waitingGet(true) : r);
        }
    	//最多等待给定的时间,以完成此任务,然后返回其结果
        public T get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
            Object r;
            long nanos = unit.toNanos(timeout);
            return reportGet((r = result) == null ? timedGet(nanos) : r);
        }
        //如果任务完成则返回结果集,否则返回给定的valueIfAbsent
        public T getNow(T valueIfAbsent) {
            Object r;
            return ((r = result) == null) ? valueIfAbsent : reportJoin(r);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    thenApply / thenAccept / thenRun

    在流式处理中,等待上层任务正常执行完成后,再执行回调方法;
    thenApply:上层任务的结果值作为回调方法的入参值,该回调方法有返回值
    thenAccept:上层任务的结果值作为回调方法的入参值,该回调方法没有返回值
    thenRun:没有入参也没有返回值的回调方法

        public  CompletableFuture thenApply(Function fn) {
            return uniApplyStage(null, fn);
        }
        public  CompletableFuture thenApplyAsync(Function fn) {
            return uniApplyStage(asyncPool, fn);
        }
        public  CompletableFuture thenApplyAsync(Function fn, Executor executor) {
            return uniApplyStage(screenExecutor(executor), fn);
        }
    
    	public CompletableFuture thenAccept(Consumer action) {
            return uniAcceptStage(null, action);
        }
        public CompletableFuture thenAcceptAsync(Consumer action) {
            return uniAcceptStage(asyncPool, action);
        }
        public CompletableFuture thenAcceptAsync(Consumer action, Executor executor) {
            return uniAcceptStage(screenExecutor(executor), action);
        }
    
    
    	public CompletableFuture thenRun(Runnable action) {
            return uniRunStage(null, action);
        }
        public CompletableFuture thenRunAsync(Runnable action) {
            return uniRunStage(asyncPool, action);
        }
        public CompletableFuture thenRunAsync(Runnable action, Executor executor) {
            return uniRunStage(screenExecutor(executor), action);
        }
    
    
        public static void demo03() throws ExecutionException, InterruptedException {
            log.info("创建异步任务");
            CompletableFuture finalResult = CompletableFuture.supplyAsync(new Supplier() {
                @Override
                public String get() {
                    log.info("执行异步任务");
                    try {
                        Thread.sleep((long) (Math.random() * 5000));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return "success";
                }
            }, fixedThreadPool).thenApplyAsync((result) -> {
                log.info("上层任务结果: " + result);
                try {
                    Thread.sleep((long) (Math.random() * 5000));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "over";
            }, fixedThreadPool);
    
            log.info("最终结果 = " + finalResult.get());
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57

    在这里插入图片描述
    如果上层任务抛异常则不会进入回调方法中

        public static void demo03() throws ExecutionException, InterruptedException {
            log.info("创建异步任务");
            CompletableFuture finalResult = CompletableFuture.supplyAsync(new Supplier() {
                @Override
                public String get() {
                    log.info("执行异步任务");
                    try {
                        Thread.sleep((long) (Math.random() * 5000));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    //有异常
                    if (true) throw new RuntimeException("异常");
                    return "success";
                }
            }, fixedThreadPool).thenApplyAsync((result) -> {
                log.info("上层任务结果: " + result);
                try {
                    Thread.sleep((long) (Math.random() * 5000));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "over";
            }, fixedThreadPool);
    
            //异常通过future的get方法抛出,如果注释掉get的方法就不会打印异常信息
            log.info("最终结果 = " + finalResult.get());
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    在这里插入图片描述

    exceptionally

    上层任务执行中,若抛出异常可被该方法接收,异常即该方法的参数;
    若无异常,不会进入该方法并将上层的结果值继续下传。

        public static void demo03() throws ExecutionException, InterruptedException {
            log.info("创建异步任务");
            CompletableFuture finalResult = CompletableFuture.supplyAsync(new Supplier() {
                @Override
                public String get() {
                    log.info("执行异步任务");
                    try {
                        Thread.sleep((long) (Math.random() * 5000));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    //有异常
                    if (true) throw new RuntimeException("异常");
                    return "success";
                }
            }, fixedThreadPool).exceptionally((exception) -> {
                try {
                    log.info("异常处理 " + exception);
                    Thread.sleep((long) (Math.random() * 5000));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "exception";
            }).thenApplyAsync((result) -> {
                log.info("上层任务结果: " + result);
                try {
                    Thread.sleep((long) (Math.random() * 5000));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "over";
            }, fixedThreadPool);
    
            //异常通过future的get方法抛出,如果注释掉get的方法就不会打印异常信息
            log.info("最终结果 = " + finalResult.get());
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36

    异常情况
    在这里插入图片描述
    正常情况
    在这里插入图片描述

    whenComplete

    接收上层任务的结果值和异常,若上层任务无异常,则异常参数为null,该方法无返回值

        public CompletableFuture whenComplete(
            BiConsumer action) {
            return uniWhenCompleteStage(null, action);
        }
        public CompletableFuture whenCompleteAsync(
            BiConsumer action) {
            return uniWhenCompleteStage(asyncPool, action);
        }
        public CompletableFuture whenCompleteAsync(
            BiConsumer action, Executor executor) {
            return uniWhenCompleteStage(screenExecutor(executor), action);
        }
    
    
        public static void demo04() throws ExecutionException, InterruptedException {
            log.info("创建异步任务");
            CompletableFuture finalResult = CompletableFuture.supplyAsync(new Supplier() {
                @Override
                public String get() {
                    log.info("执行异步任务");
                    try {
                        Thread.sleep((long) (Math.random() * 5000));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    //有异常
                    if (true) throw new RuntimeException("异常");
                    return "success";
                }
            }, fixedThreadPool).whenCompleteAsync((result, exception) -> {
                if (exception == null) {
                    log.info("上层任务无异常,获取到上层结果为:" + result);
                } else {
                    log.info("上层任务有异常,获取到上层结果为:" + result);
                }
            }, fixedThreadPool);
    
            //异常通过future的get方法抛出,如果注释掉get的方法就不会打印异常信息
            log.info("最终结果 = " + finalResult.get());
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40

    无异常
    在这里插入图片描述
    有异常
    在这里插入图片描述

    handle

    接收上层任务的结果值和异常,若上层任务无异常,则异常参数为null,该方法有返回值

        public  CompletableFuture handle(
            BiFunction fn) {
            return uniHandleStage(null, fn);
        }
        public  CompletableFuture handleAsync(
            BiFunction fn) {
            return uniHandleStage(asyncPool, fn);
        }
        public  CompletableFuture handleAsync(
            BiFunction fn, Executor executor) {
            return uniHandleStage(screenExecutor(executor), fn);
        }
    
    
        public static void demo04() throws ExecutionException, InterruptedException {
            log.info("创建异步任务");
            CompletableFuture finalResult = CompletableFuture.supplyAsync(new Supplier() {
                @Override
                public String get() {
                    log.info("执行异步任务");
                    try {
                        Thread.sleep((long) (Math.random() * 5000));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    //有异常
                    //if (true) throw new RuntimeException("异常");
                    return "success";
                }
            }, fixedThreadPool).handleAsync((result, exception) -> {
                if (exception == null) {
                    log.info("上层任务无异常,获取到上层结果为:" + result);
                } else {
                    log.info("上层任务有异常,获取到上层结果为:" + result, exception);
                }
                return "handle " + result;
            }, fixedThreadPool);
    
            //异常通过future的get方法抛出,如果注释掉get的方法就不会打印异常信息
            log.info("最终结果 = " + finalResult.get());
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41

    无异常
    在这里插入图片描述
    有异常
    在这里插入图片描述

    thenCombine / thenAcceptBoth / runAfterBoth

    将两个CompletableFuture组合起来,当这两个future都正常执行完了才会执行回调任务
    thenCombine:2个future的返回值作为回调方法的入参值,该回调方法有返回值
    thenAcceptBoth:2个future的返回值作为回调方法的入参值,该回调方法没有返回值
    runAfterBoth:没有入参也没有返回值

        public  CompletableFuture thenCombine(
            CompletionStage other,
            BiFunction fn) {
            return biApplyStage(null, other, fn);
        }
        public  CompletableFuture thenCombineAsync(
            CompletionStage other,
            BiFunction fn) {
            return biApplyStage(asyncPool, other, fn);
        }
        public  CompletableFuture thenCombineAsync(
            CompletionStage other,
            BiFunction fn, Executor executor) {
            return biApplyStage(screenExecutor(executor), other, fn);
        }
    
        public  CompletableFuture thenAcceptBoth(
            CompletionStage other,
            BiConsumer action) {
            return biAcceptStage(null, other, action);
        }
        public  CompletableFuture thenAcceptBothAsync(
            CompletionStage other,
            BiConsumer action) {
            return biAcceptStage(asyncPool, other, action);
        }
        public  CompletableFuture thenAcceptBothAsync(
            CompletionStage other,
            BiConsumer action, Executor executor) {
            return biAcceptStage(screenExecutor(executor), other, action);
        }
    
        public CompletableFuture runAfterBoth(CompletionStage other, Runnable action) {
            return biRunStage(null, other, action);
        }
        public CompletableFuture runAfterBothAsync(CompletionStage other, Runnable action) {
            return biRunStage(asyncPool, other, action);
        }
        public CompletableFuture runAfterBothAsync(CompletionStage other, Runnable action, Executor executor) {
            return biRunStage(screenExecutor(executor), other, action);
        }
    
    
        public static void demo05() throws ExecutionException, InterruptedException {
            log.info("创建异步任务");
            CompletableFuture supplyAsync = CompletableFuture.supplyAsync(new Supplier() {
                @Override
                public Integer get() {
                    int i = 0;
                    try {
                        log.info("开始执行异步任务");
                        Thread.sleep((long) (Math.random() * 5000));
                        i = 1;
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return i;
                }
            }, fixedThreadPool);
    
            CompletableFuture supplyAsync2 = CompletableFuture.supplyAsync(new Supplier() {
                @Override
                public Integer get() {
                    int i = 0;
                    try {
                        log.info("开始执行异步任务");
                        Thread.sleep((long) (Math.random() * 8000));
                        i = 2;
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return i;
                }
            }, fixedThreadPool);
    
            CompletableFuture thenCombineAsync = supplyAsync.thenCombineAsync(supplyAsync2, (a, b) -> {
                log.info("a = " + a + ", b = " + b);
                return a + b;
            }, fixedThreadPool);
            log.info("thenCombineAsync = " + thenCombineAsync.get());
    
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82

    在这里插入图片描述
    其中任意一个有异常都会导致thenCombineAsync方法不执行

    applyToEither / acceptEither / runAfterEither

    将两个CompletableFuture组合起来,只要有一个future正常执行完了就可以执行回调任务
    applyToEither:较快执行完的任务结果值作为回调方法的入参值,该回调方法有返回值
    acceptEither:较快执行完的任务结果值作为回调方法的入参值,该回调方法没有返回值
    runAfterEither:只要有任务执行完就调用回调方法

        public  CompletableFuture applyToEither(
            CompletionStage other, Function fn) {
            return orApplyStage(null, other, fn);
        }
        public  CompletableFuture applyToEitherAsync(
            CompletionStage other, Function fn) {
            return orApplyStage(asyncPool, other, fn);
        }
        public  CompletableFuture applyToEitherAsync(
            CompletionStage other, Function fn, Executor executor) {
            return orApplyStage(screenExecutor(executor), other, fn);
        }
    
        public CompletableFuture acceptEither(
            CompletionStage other, Consumer action) {
            return orAcceptStage(null, other, action);
        }
        public CompletableFuture acceptEitherAsync(
            CompletionStage other, Consumer action) {
            return orAcceptStage(asyncPool, other, action);
        }
        public CompletableFuture acceptEitherAsync(
            CompletionStage other, Consumer action, Executor executor) {
            return orAcceptStage(screenExecutor(executor), other, action);
        }
    
        public CompletableFuture runAfterEither(CompletionStage other,Runnable action) {
            return orRunStage(null, other, action);
        }
        public CompletableFuture runAfterEitherAsync(CompletionStage other,Runnable action) {
            return orRunStage(asyncPool, other, action);
        }
        public CompletableFuture runAfterEitherAsync(CompletionStage other,Runnable action,Executor executor) {
            return orRunStage(screenExecutor(executor), other, action);
        }
    
    
        public static void demo06() throws ExecutionException, InterruptedException {
            log.info("创建异步任务");
            CompletableFuture supplyAsync = CompletableFuture.supplyAsync(new Supplier() {
                @Override
                public Integer get() {
                    int i = 0;
                    try {
                        log.info("执行异步任务");
                        Thread.sleep((long) (Math.random() * 5000));
                        i = 1;
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return i;
                }
            }, fixedThreadPool);
    
            CompletableFuture supplyAsync2 = CompletableFuture.supplyAsync(new Supplier() {
                @Override
                public Integer get() {
                    int i = 0;
                    try {
                        log.info("执行异步任务");
                        Thread.sleep((long) (Math.random() * 5000));
                        i = 2;
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return i;
                }
            }, fixedThreadPool);
    
            CompletableFuture thenCombineAsync = supplyAsync.applyToEitherAsync(supplyAsync2, (result) -> {
                log.info("result " + result);
                return 3;
            }, fixedThreadPool);
    
            log.info("final result = " + thenCombineAsync.get());
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76

    在这里插入图片描述
    在这里插入图片描述
    任意一个任务有异常,都不会进入applyToEitherAsync方法

    3、@Async

    基于@Async标注的方法,称之为异步方法;这些方法将在执行的时候,将会在独立的线程中被执行,调用者无需等待它的完成,即可继续其他的操作

    启用@Async注解

    package com.yzm.thread.async;
    
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.scheduling.annotation.AsyncConfigurer;
    import org.springframework.scheduling.annotation.EnableAsync;
    import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
    
    import java.util.concurrent.Executor;
    import java.util.concurrent.ThreadPoolExecutor;
    
    @Configuration
    @EnableAsync // 开启异步调用功能,即使@Async注解生效
    @Slf4j
    public class AsyncConfig implements AsyncConfigurer {
    
        @Bean(name = "default_async_pool", destroyMethod = "shutdown")
        public ThreadPoolTaskExecutor defaultAsyncPool() {
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            // 设置线程池前缀:方便排查
            executor.setThreadNamePrefix("default-async-");
            // 设置线程池的大小
            executor.setCorePoolSize(10);
            // 设置线程池的最大值
            executor.setMaxPoolSize(15);
            // 设置线程池的队列大小
            executor.setQueueCapacity(250);
            // 设置线程最大空闲时间,单位:秒
            executor.setKeepAliveSeconds(3000);
            // 饱和策略
            // AbortPolicy:直接抛出java.util.concurrent.RejectedExecutionException异常
            // CallerRunsPolicy:若已达到待处理队列长度,将由主线程直接处理请求
            // DiscardOldestPolicy:抛弃旧的任务;会导致被丢弃的任务无法再次被执行
            // DiscardPolicy:抛弃当前任务;会导致被丢弃的任务无法再次被执行
            executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
            return executor;
        }
    
        @Bean(name = "another_async_pool", destroyMethod = "shutdown")
        public ThreadPoolTaskExecutor anotherAsyncPool() {
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor.setThreadNamePrefix("another-task-");
            executor.setCorePoolSize(3);
            executor.setMaxPoolSize(6);
            executor.setQueueCapacity(5);
            executor.setKeepAliveSeconds(10);
            executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
            return executor;
        }
    
        /**
         * 自定义异步线程池,若不重写,则使用默认的
         */
        @Override
        public Executor getAsyncExecutor() {
            return defaultAsyncPool();
        }
    
        /**
         * 1.无参无返回值方法
         * 2.有参无返回值方法
         * 返回值为void的, 通过IllegalArgumentException异常, AsyncUncaughtExceptionHandler处理异常
         * 3.有参有返回值方法
         * 返回值是Future,不会被AsyncUncaughtExceptionHandler处理,需要我们在方法中捕获异常并处理
         * 或者在调用方在调用Future.get时捕获异常进行处理
         */
        @Override
        public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
            System.out.println("正在处理无返回值的@Async异步调用方法");
            return (throwable, method, objects) -> {
                log.info("Exception message - " + throwable.getMessage());
                log.info("Method name - " + method.getName());
                for (Object param : objects) {
                    log.info("Parameter value - " + param);
                }
            };
        }
    
    }
    
    
    package com.yzm.thread.async;
    
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.scheduling.annotation.Async;
    import org.springframework.scheduling.annotation.AsyncResult;
    import org.springframework.stereotype.Component;
    
    import java.util.concurrent.Future;
    
    @Slf4j
    @Component
    public class AsyncService {
    
        /**
         * 1.无参无返回值方法
         * 最简单的异步调用,返回值为void
         */
        @Async
        public void async() {
            log.info("无参无返回值方法,通过观察线程名称以便查看效果");
    //        int a = 1 / 0;
        }
    
        /**
         * 2.有参无返回值方法
         * 指定线程池
         *
         * @param i 传入参数
         */
        @Async("another_async_pool")
        public void async(int i) {
            log.info("有参无返回值方法, 参数={}", i);
        }
    
        /**
         * 3.有参有返回值方法
         *
         * @param i 传入参数
         * @return Future
         */
        @Async
        public Future asyncReturn(int i) throws InterruptedException {
            log.info("有参有返回值方法, 参数={}", i);
    //        int a = 1 / 0;
            Thread.sleep(100);
            return new AsyncResult("success:" + i);
        }
    
        /**
         * @Async  必须不同类间调用:
         */
        public void D() {
            log.info("在同类下调用 @Async 方法是同步执行的");
            async();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139

    调用无参无返回值的异步方法

    @Component
    public class AsyncDemo {
    
        private final AsyncService asyncService;
    
        public AsyncDemo(AsyncService asyncService) {
            this.asyncService = asyncService;
        }
    
        @PostConstruct
        public void demo() {
            asyncA();
        }
    
        public void asyncA() {
            asyncService.async();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    在这里插入图片描述

    调用有参无返回值的异步方法并指定线程池

    AsyncService类

        /**
         * 2.有参无返回值方法
         * 指定线程池
         *
         * @param i 传入参数
         */
        @Async("another_async_pool")
        public void async(int i) {
            log.info("有参无返回值方法, 参数={}", i);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    AsyncDemo类

        @PostConstruct
        public void demo() {
    //        asyncA();
            asyncB(1);
        }
    
        public void asyncA() {
            asyncService.async();
        }
    
        public void asyncB(int i) {
            asyncService.async(i);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    在这里插入图片描述

    调用有参有返回值的异步方法

        public void asyncC(int i) {
            try {
                Future future = asyncService.asyncReturn(i);
                // 这里使用了循环判断,等待获取结果信息
                while (true) {
                    // 判断是否执行完毕
                    if (future.isDone()) {
                        System.out.println("执行完毕,结果为:" + future.get());
                        break;
                    }
                    System.out.println("还未执行完毕,请稍等。。。");
                    Thread.sleep(1000);
                }
            } catch (InterruptedException | ExecutionException e) {
                System.out.println("异步调用失败");
                e.printStackTrace();
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    在这里插入图片描述

    调用方法内部调用一个异步方法是不行的,仍是同步调用

    AsyncService类

        /**
         * @Async  必须不同类间调用:
         */
        public void D() {
            log.info("在同类下调用 @Async 方法是同步执行的");
            // 调用本类的异步方法
            async();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    AsyncDemo类

    	public void asyncD() {
            asyncService.D();
        }
    
    • 1
    • 2
    • 3

    在这里插入图片描述

    异常处理

    AsyncConfig类

    	// 可处理无返回值的异步方法异常
        @Override
        public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
            System.out.println("正在处理无返回值的@Async异步调用方法");
            return (throwable, method, objects) -> {
                log.info("Exception message - " + throwable.getMessage());
                log.info("Method name - " + method.getName());
                for (Object param : objects) {
                    log.info("Parameter value - " + param);
                }
            };
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    AsyncService类

        /**
         * 1.无参无返回值方法
         * 最简单的异步调用,返回值为void
         */
        @Async
        public void async() {
            log.info("无参无返回值方法,通过观察线程名称以便查看效果");
            int a = 1 / 0;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    AsyncDemo类

        @PostConstruct
        public void demo() {
            asyncA();
    //        asyncB(1);
    //        asyncC(11);
    //        asyncD();
        }
    
        public void asyncA() {
            asyncService.async();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    在这里插入图片描述
    有返回值的异步方法异常,需要手动try{}catch(){}处理

    事务处理机制

    在@Async标注的方法,同时也适用了@Transactional进行了标注;在其调用数据库操作之时,将无法产生事务管理的控制,原因就在于其是基于异步处理的操作。
    那该如何给这些操作添加事务管理呢?可以将需要事务管理操作的方法放置到异步方法内部,在内部被调用的方法上添加@Transactional.
    例如:
    方法A,使用了@Async/@Transactional来标注,但是无法产生事务控制的目的。
    方法B,使用了@Async来标注, B中调用了C,C使用@Transactional做了标注,则可实现事务控制的目的。

    先自我介绍一下,小编13年上师交大毕业,曾经在小公司待过,去过华为OPPO等大厂,18年进入阿里,直到现在。深知大多数初中级java工程师,想要升技能,往往是需要自己摸索成长或是报班学习,但对于培训机构动则近万元的学费,着实压力不小。自己不成体系的自学效率很低又漫长,而且容易碰到天花板技术停止不前。因此我收集了一份《java开发全套学习资料》送给大家,初衷也很简单,就是希望帮助到想自学又不知道该从何学起的朋友,同时减轻大家的负担。添加下方名片,即可获取全套学习资料哦

  • 相关阅读:
    (N-144)基于微信小程序在线订餐系统
    开始报名!龙蜥社区走进 Arm MeetUp 议程硬核剧透来了
    【TCP】确认应答 与 超时重传
    GStreamer 进阶
    [userfaultfd] 2019-BalsnCTF_KrazyNote
    【每日一题】打卡 47
    java Netty通信例子
    Tomcat下载及使用说明
    安装MongoDb(mac系统)步骤以及踩坑笔记(图文详解)
    不科学,RocketMQ生产者在一个应用服务竟然不能向多个NameServer发送消息
  • 原文地址:https://blog.csdn.net/m0_67400973/article/details/126114397