• 【多线程】优雅使用线程池结合CompletableFuture实现异步编排


    参考

    Java中线程池,你真的会用吗?

    深入理解线程池及相关面试题

    线程池创建之后,会立即创建核心线程吗

    【Java多线程】CompletableFuture实现多线程异步编排

    另外一种接收文件集合上传返回图片链接的上传方式:【OSS】SpringBoot搭配线程池整合阿里云OSS实现图片异步上传

    1、线程池引入

    所谓线程池,通俗来讲,就是一个管理线程的池子。它可以容纳多个线程,其中的线程可以反复利用,省去了频繁创建线程对象的操作。

    在 Java 并发编程框架中的线程池是运用场景最多的技术,几乎所有需要异步或并发执行任务的程序都可以使用线程池。在开发过程中,合理地使用线程池能够带来至少以下4个好处:

    • 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗;

    • 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行;

    • 提高线程的可管理性。线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。

    • 提供更强大的功能,比如延时定时线程池;

    2、Executors

    2.1、概述


    Executors 是一个Java中的工具类。提供工厂方法来创建不同类型的线程池。

    核心概念:这四个线程池的本质都是ThreadPoolExecutor对象:

    • newFiexedThreadPool(int Threads): 创建固定数目线程的线程池。

    • newCachedThreadPool(): 创建一个可缓存的线程池,调用execute 将重用以前构造的线程(如果线程可用)。如果没有可用的线程,则创建一个新线程并添加到池中。终止并从缓存中移除那些已有 60 秒钟未被使用的线程。

    • newSingleThreadExecutor(): 创建一个单线程化的Executor。

    • newScheduledThreadPool(int corePoolSize): 创建一个支持定时及周期性的任务执行的线程池,多数情况下可用来替代Timer类。

    2.2、Executors缺陷


    在阿里巴巴Java开发手册中明确指出,不允许使用Executors创建线程池,这是因为使用Executors创建线程池可能会导致OOM(OutOfMemory ,内存溢出)。

    img

    3、优雅的创建线程池

    3.1、正确挑选方法


    避免使用Executors创建线程池,主要是避免使用其中的默认实现,那么我们可以自己直接调用ThreadPoolExecutor的构造函数来自己创建线程池。在创建的同时,给BlockQueue指定容量就可以了。

    public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    上面放出来的是ThreadPoolExecutor的全参构造函数,其中的参数分别为:

    • corePoolSize:线程池中的核心线程数。指定线程数回一直存在与线程池中,除非设置了 allowCoreThreadTimeOut参数。当创建完成之后就会准备好等待接收异步任务去执行;
    • maximumPoolSize:最大线程数。当请求的线程超过最大线程数时,将会扩充线程数量到最大线程数,但不会无限扩充,达到控制资源的效果;
    • keepAliveTime:非核心线程的存活时间。如果当前存活的线程数量大于核心线程数corePoolSize,则会释放空闲的线程直到线程数回到最大线程数corePoolSize
    • unit:keepAliveTime 参数的时间单位,如TimeUnit.SECONDS
    • workQueue:阻塞队列。用于保存多余的任务,如果任务很多,就会将多的任务存放进队列中,只要有空闲的线程就会去队列中取出新的任务执行直到队列为空;
    • threadFactory:线程池工厂,标识线程,即为线程起一个具有意义的名称,可自定义;
    • handler:拒绝策略。如果阻塞队列满了,就会按照我们指定的拒绝策略拒绝后续任务,默认为丢弃任务。

    在线程创建过程中有一个细节,即创建阻塞队列时,队列默认的最大值为Integer的最大值,这很显然是不合理的,容易内存不够造成oom,因此一般都需要在创建时设定容量,如new LinkedBlockingDeque<>(1000)

    3.2、线程池配置类


    在开发过程中一般会将线程池的创建抽取成一个配置类,其中的各类参数则会配置在配置文件中去。

    这里有个细节,就是创建线程池的时候并不会立马准备好corePoolSize数量的线程来准备接收任务,而是要等到有任务提交时才会启动。

    这一部分在下面的4、线程池执行流程/线程池创建中也有提及,这里使用了prestartCoreThread方法在初始化线程池的时候开启一个核心线程,避免在执行异步操作的时候初始化核心线程耗时巨大(可自行尝试,在后面的例子中因为加上了这一方法,接口的耗时减少了50倍

    @Configuration
    public class ThreadPoolConfig {
    
        @Bean
        public ThreadPoolExecutor threadPoolExecutor(
                @Value("${thread.pool.coreSize}") Integer coreSize,
                @Value("${thread.pool.maxSize}") Integer maxSize,
                @Value("${thread.pool.keepalive}") Integer keepalive,
                @Value("${thread.pool.blockQueueSize}") Integer blockQueueSize
        ) {
            ThreadPoolExecutor executor = new ThreadPoolExecutor(
                    coreSize,
                    maxSize,
                    keepalive,
                    TimeUnit.SECONDS,
                    new ArrayBlockingQueue<>(blockQueueSize),
                    Executors.defaultThreadFactory(),
                    new ThreadPoolExecutor.AbortPolicy()
            );
            executor.prestartCoreThread();
            return executor;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    4、线程池执行流程

    当向线程池提交一个任务之后,线程池是如何处理这个任务的呢?下面就先来看一下它的主要处理流程。

    img

    下面详细介绍线程池的详细运行流程:

    1. 线程池创建,但是并不会立马准备好corePoolSize数量的线程来准备接收任务,线程并不会立即启动,而是要等到有任务提交时才会启动。除非调用了prestartCoreThread/prestartAllCoreThreads 事先启动核心线程:

      1.1. prestartCoreThread:Starts a core thread, causing it to idly wait for work. This overrides the default policy of starting core threads only when new t asks are executed;

      1.2. prestartAllCoreThreads:Starts all core threads。

    2. 任务到来,用准备好的corePoolSize个空闲线程执行:

      2.1. 核心线程数已满,就将再进来的任务放入阻塞队列中,期间如果运行中的线程小于核心线程数时,就会去阻塞队列中获取任务执行;

      2.2. 阻塞队列已满,就会创建新线程去执行阻塞队列中的任务,但最大只能创建到最大线程数maximumPoolSize

      2.3. 存活且运行的线程数达到最大线程数maximumPoolSize,即线程已满时,根据设定的拒绝策略handler来对后来任务进行相应处理;

      2.4. 当所有线程都执行完,在指定时间keepAliveTime之后,将会自动销毁线程,最终保持在corePoolSize大小。

    3. 在线程创建过程中,所有的线程都由指定的工厂threadFactory进行创建,并为线程设置标识,即起名。

    线程池场景模拟:

    一个线程池corePoolSize为7,maximumPoolSize为20,阻塞队列最大50,100并发进来怎么分配的?

    答案:先有7个线程能够直接处理7个任务,接下来50个进入队列排队,再多开13个继续执行。此时所有线程池和阻塞队列都已满,但只有70个被安排上,剩下的30个走设定好的拒绝策略进行相对应操作。

    最终以一张图来总结和概括下线程池的执行示意图:

    image-20210322145600064

    5、CompletableFuture

    5.1、概述


    Java 8中, 新增加了一个包含50个方法左右的类: CompletableFuture,提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果,并且提供了转换和组合CompletableFuture的方法。

    CompletableFuture类实现了Future接口,所以你还是可以像以前一样通过get方法阻塞或者轮询的方式获得结果,但是这种方式不推荐使用。

    CompletableFuture和FutureTask同属于Future接口的实现类,都可以获取线程的执行结果。

    img

    5.2、核心三词


    CompletableFuture中API众多,其中对于线程串行化方法的可大致分为三种类型,而三种类型对应着三种不同的单词前缀:

    • run:不能接收上次任务的执行结果,也没有返回值;
    • accept:可以接收上次任务的执行结果,但没有返回值;
    • apply:可以接收上次任务的执行结果,也拥有返回值。

    5.4、单异步任务


    5.4.1、runAsync

    runAsync为单个异步任务中其中之一的API,其没有返回值。

    public void testRunAsync() {
        for (int i = 0; i < 5; i++) {
            System.err.println("第" + i + "个循环开始……");
            CompletableFuture.runAsync(() -> {
                System.out.println("当前线程" + Thread.currentThread().getId());
                int calc = 10 / 2;
                System.out.println("运行结果:" + calc);
            }, executor);
            System.err.println("第" + i + "个循环结束……");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    运行结果:

    image-20220914180521900

    5.4.2、supplyAsync

    supplyAsync相对于前面的runAsyns的方法,则是多了一个返回值,其可以结合以下两个方法进行使用:

    • whenComplete:能感知异常,能感知结果,但无返回值。当执行完成supplyAsync时会执行方法中的逻辑;
    • exceptionally:能感知异常,不能感知结果,有返回值。当执行supplyAsync时出现异常终端之后会先执行whenComplete方法再执行本方法,对异常进行处理;
    • handle:相当于整合了上面的两个方法,即可感知结果,也可处理异常。
    public void testSupplyAsync() {
        CompletableFuture<Integer> exceptionally = CompletableFuture.supplyAsync(() -> {
            System.out.println("当前线程" + Thread.currentThread().getId());
            int calc = 10 / 0;
            System.out.println("运行结果:" + calc);
            return calc;
        }, executor).whenComplete((res, exception) -> {
            // 当执行完成之后获取任务执行结果和异常
            System.out.println("异步任务成功完成...结果是:" + res + ";异常是:" + exception);
        }).exceptionally(throwable -> {
            // 异常之后对结果进行处理
            return 10;
        });
        System.out.println(exceptionally.join());
        System.err.println("=====不严谨的分割线=====");
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("当前线程" + Thread.currentThread().getId());
            int i = 10 / 4;
            System.out.println("运行结果:" + i);
            return i;
        }, executor).handle((res, exception) -> {
            // 当执行完成之后获取任务执行结果和异常
            if (exception != null) {
                // 存在异常,处理结果
                return 0;
            } else {
                // 正常运行,返回正确处理后的结果
                return res;
            }
        });
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    运行结果:

    image-20220914181535725

    5.4、双异步任务


    5.4.1、thenRunAsync

    thenRunAsync是CompletableFuture中线程串行化方法中的其中之一,其不能接收上一次的执行结果,也没返回值。

    public void testThenRunAsync() {
        CompletableFuture.supplyAsync(() -> {
            System.out.println("当前线程" + Thread.currentThread().getId());
            int i = 10 / 4;
            System.out.println("运行结果:" + i);
            return i;
        }, executor).thenRunAsync(() -> {
            System.out.println("任务2启动了...");
        }, executor);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    运行结果:

    image-20220914183412536

    5.4.2、thenAcceptAsync

    正如5.2中所说,accept与上方的run不同,是可以获取到返回值的,但是其本身则是没有返回值。

    public void testThenAcceptAsync() {
        CompletableFuture.supplyAsync(() -> {
            System.out.println("当前线程" + Thread.currentThread().getId());
            int i = 10 / 4;
            System.out.println("运行结果:" + i);
            return i;
        }, executor).thenAcceptAsync(res -> {
            System.out.println("任务2启动了..." + res);
        }, executor);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    运行结果:

    image-20220914183702640

    5.4.3、thenApplyAsync

    同样如5.2中所说,apply是拥有返回值,同样与前面的两个不同,其是存在返回值的。

    public void test() {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("当前线程" + Thread.currentThread().getId());
            int i = 10 / 4;
            System.out.println("运行结果:" + i);
            return i;
        }, executor).thenApplyAsync(res -> {
            System.out.println("任务2启动了..." + res);
            return "hello " + res;
        }, executor);
        System.out.println(future.join());
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    运行结果:

    image-20220914183947520

    5.5、多异步任务


    5.5.1、前置准备

    因为是多异步任务的形式,因此在这里先准备好三个任务供下面的共用。

    public CompletableFuture<Object> getFuture01() {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("任务1线程" + Thread.currentThread().getId());
            int i = 10 / 4;
            System.out.println("任务1结束:");
            return i;
        }, executor);
    }
    public CompletableFuture<Object> getFuture02() {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("任务2线程" + Thread.currentThread().getId());
            try {
                Thread.sleep(3000);
                System.out.println("任务2结束:");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "hello";
        }, executor);
    }
    
    public CompletableFuture<Object> getFuture03() {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("任务3线程" + Thread.currentThread().getId());
            System.out.println("任务3结束:");
            return "haha";
        }, executor);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    5.5.2、runAfterBothAsync

    runAfterBothAsync是针对于有两个前置任务的一个方法,其会等待任务01 任务02都完成了,再开始执行后续任务,不感知任务1、2的结果的,也没返回值。

    public void testRunAfterBothAsync() {
        System.out.println("starting……");
        // 等待任务一和任务二完成之后再执行action即箭头函数中的任务
        CompletableFuture<Void> future = getFuture01().runAfterBothAsync(getFuture02(), () -> {
            System.out.println("任务一和任务二都完成了");
        }, executor);
        System.out.println("ending……");
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    运行结果:

    image-20220915014139132

    5.5.3、thenAcceptBothAsync

    thenAcceptBothAsync同样是针对有两个前置任务的一个方法,其会等待任务01 任务02都完成了,再开始执行任务3,能感知到任务1、2的结果,但没返回值。

    public void testRun() {
        System.out.println("start……");
        CompletableFuture<Object> future01 = getFuture01();
        CompletableFuture<Object> future02 = getFuture02();
        // 等待任务一和任务二完成之后再执行action即箭头函数中的任务,且可以获取到任务一和任务二的返回结果
        future01.thenAcceptBothAsync(future02, (f1, f2) -> {
            System.out.println("任务一和任务二都完成了");
            System.out.println("任务一拿到结果:" + f1);
            System.out.println("任务二拿到结果:" + f2);
        }, executor);
        System.out.println("ending……");
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    运行结果:

    image-20220915014737705

    5.5.4、thenCombineAsync

    thenCombineAsync也是针对有两个前置任务的一个方法,其会等待任务01 任务02都完成了,再开始执行任务3,能感知到任务1、2的结果,而且自己可以带返回值。

    public void testThenCombineAsync() {
        System.out.println("start……");
        CompletableFuture<Object> future01 = getFuture01();
        CompletableFuture<Object> future02 = getFuture02();
        // 等待任务一和任务二完成之后再执行action即箭头函数中的任务,且可以获取到任务一和任务二的返回结果并返回结果
        CompletableFuture<String> future = future01.thenCombineAsync(future02, (f1, f2) -> {
            System.out.println("任务一和任务二都完成了");
            return "任务一拿到结果:" + f1 + "任务二拿到结果:" + f2;
        }, executor);
        System.out.println("ending……");
        System.out.println(future.join());
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    运行结果:

    image-20220915015224554

    5.5.5、runAfterEitherAsync

    runAfterBothAsync不同的是,runAfterEitherAsync两个任务只要有一个完成,就执行任务3,不感知结果,自己没返回值。

    public void testRunAfterEitherAsync() {
        System.out.println("starting……");
        // 等待任务一和任务二完成之后再执行action即箭头函数中的任务
        getFuture01().runAfterEitherAsync(getFuture02(), () -> {
            System.out.println("任务一和任务二有其中一个完成了");
        }, executor);
        System.out.println("ending……");
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    运行结果:

    image-20220915020455200

    5.5.6、acceptEitherAsync

    thenAcceptBothAsync不同的是,acceptEitherAsync两个任务只要有一个完成,就执行任务3,感知结果,自己没返回值。

    public void testAcceptEitherAsync() {
        System.out.println("starting……");
        // 等待任务一和任务二完成之后再执行action即箭头函数中的任务
        getFuture01().acceptEitherAsync(getFuture03(), (res) -> {
            System.out.println("任务一和任务三有其中一个完成了");
            System.out.println(res);
        }, executor);
        System.out.println("ending……");
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    运行结果:

    image-20220915020617498

    5.5.7、applyToEitherAsync

    thenCombineAsync不同的是,applyToEitherAsync两个任务只要有一个完成,就执行任务3,感知结果,自己有返回值。

    public void testApplyToEitherAsync() {
        System.out.println("starting……");
        // 等待任务一和任务二完成之后再执行action即箭头函数中的任务
        CompletableFuture<Object> future = getFuture01().applyToEitherAsync(getFuture03(), (res) -> {
            System.out.println("任务一和任务三有其中一个完成了");
            return res;
        }, executor);
        System.out.println("ending……");
        System.out.println("执行完成的结果为:" + future.join());
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    运行结果:

    image-20220915020712534

  • 相关阅读:
    ActiveMQ、RabbitMQ、RocketMQ、Kafka介绍
    Jackson公司蛋白质印迹指南丨样品制备
    软件开发毕业4年后,靠自学自动化测试月入2W,本人亲身经历供大家参考
    pytest框架之fixture测试夹具详解
    【RL+Transformer综述】A Survey on Transformers in Reinforcement Learning论文笔记
    【甄选靶场】Vulnhub百个项目渗透——项目二十八:zico2-1(目录遍历,sqlite数据库写入,脏牛提权)
    最近少更新的原因
    thinkphp6 入门(4)--数据库操作 增删改查
    QML(26)——多层qml界面传递信号
    ZooKeeper常见面试题
  • 原文地址:https://blog.csdn.net/Aqting/article/details/126863478