• JUC实战经验-CompletableFuture 异步编程利器


    开发中为什么使用线程池

    • 降低资源的消耗
      • 通过重复利用已经创建好的线程降低线程的创建和销毁带来的损耗
    • 提高响应速度
      • 因为线程池中的线程数没有超过线程池的最大上限时,有的线程处于等待分配任务的状态,当任务来时无需创建新的线程就能执行
    • 提高线程的可管理性
      • 线程池会根据当前系统特点对池内的线程进行优化处理,减少创建和销毁线程带来的系统开销。无限的创建和销毁线程不仅消耗系统资源,还降低系统的稳定性,使用线程池进行统一分配
    • 控制并发数
      • 线程池可以限制并发线程的数量,避免系统过载。通过设置线程池的核心线程数、最大线程数、队列大小等参数,可以控制系统能够同时处理的请求数量,防止资源耗尽和性能下降。

    使用线程池可以提高系统的资源利用率、响应速度和并发控制,同时也简化了线程管理和资源管理的复杂性,是开发中常用的并发编程工具。

    常见线程池

    • newCachedThreadPool
      • 创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若
        无可回收,则新建线程。
    • newFixedThreadPool
      • 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
    • newScheduledThreadPool
      • 创建一个定长线程池,支持定时及周期性任务执行。
    • newSingleThreadExecutor
      • 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行
    • ThreadPoolExecutor
      • ThreadPoolExecutor 是 Java 标准库 java.util.concurrent 包中提供的类,是 Java 原生的线程池实现。它是一个基本的线程池实现,提供了一系列构造函数和属性来配置线程池的核心属性,如核心线程数、最大线程数、阻塞队列、拒绝策略等。由于是原生的线程池实现,使用起来相对灵活,但需要手动配置参数和管理线程池的生命周期。
    • ThreadPoolTaskExecutor
      • ThreadPoolTaskExecutor 是 Spring 框架中的类,位于 org.springframework.scheduling.concurrent 包下。它是 Spring 对 ThreadPoolExecutor 进行了封装和增强,提供了更多功能和便利的配置选项,使得使用更加方便和高级。ThreadPoolTaskExecutor 实现了 AsyncTaskExecutor 接口,可以很方便地在 Spring 中使用异步任务。它还支持 Spring 的事务管理,可以将异步任务与事务结合,保证在同一个事务中执行。

    线程池的核心属性

    核心线程数(Core Pool Size):指定线程池中的核心线程数量,即线程池保持的最小线程数。即使线程处于空闲状态,核心线程也不会被回收。当有新的任务提交时,如果当前线程数小于核心线程数,将会创建新的线程来执行任务。

    最大线程数(Maximum Pool Size):指定线程池中允许的最大线程数。当任务提交的数量超过核心线程数且工作队列已满时,线程池会创建新的线程来执行任务,直到达到最大线程数。超过最大线程数的任务将会被拒绝执行,可以根据具体需求选择合适的值。

    空闲线程存活时间(Keep-Alive Time):当线程池中的线程数大于核心线程数,并且处于空闲状态时,空闲线程的存活时间。超过存活时间后,空闲线程将会被回收,以减少资源消耗。

    阻塞队列(Blocking Queue):用于存储等待执行的任务的队列。当任务提交的数量超过核心线程数时,超过核心线程数的任务会被放入阻塞队列中等待执行。常用的阻塞队列有 ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue 等,可以根据需求选择合适的队列类型和大小。

    线程工厂(Thread Factory):用于创建线程的工厂类。线程池通过线程工厂来创建新的线程对象。可以自定义线程工厂来设置线程名称、优先级等属性。

    拒绝策略(Rejected Execution Policy):当线程池无法接受新的任务时,即达到了最大线程数且阻塞队列已满时,定义了如何处理被拒绝的任务。常见的拒绝策略有抛出异常、丢弃任务、丢弃最旧的任务、将任务回退给调用者等。

    线程池执行步骤

    1. 核心线程处理:如果线程池中的线程数小于核心线程数,线程池会创建新的线程来执行任务。

    2. 阻塞队列存储:如果线程池中的线程数已经达到核心线程数,并且有新的任务提交,但是阻塞队列未满,线程池会将任务放入阻塞队列中等待执行。

    3. 创建非核心线程:如果线程池中的线程数已经达到核心线程数,并且阻塞队列已满,但是线程池中的线程数还没有达到最大线程数,线程池会创建新的非核心线程来执行任务。

    4. 执行拒绝策略:如果线程池中的线程数已经达到最大线程数,并且阻塞队列已满,此时线程池无法接受新的任务,会根据配置的拒绝策略来处理被拒绝的任务。常见的拒绝策略有抛出异常、丢弃任务、丢弃最旧的任务、将任务回退给调用者等。

    如果线程池无法处理提交的任务,一般会采取以下几种处理方式:

    1. 抛出异常:使用默认的拒绝策略,将无法处理的任务抛出一个 RejectedExecutionException 异常。

    2. 丢弃任务:使用 ThreadPoolExecutor.DiscardPolicy 拒绝策略,直接丢弃无法处理的任务,不做任何处理。

    3. 丢弃最旧的任务:使用 ThreadPoolExecutor.DiscardOldestPolicy 拒绝策略,丢弃阻塞队列中最旧的任务,然后尝试再次提交当前任务。

    4. 调用者运行:使用 ThreadPoolExecutor.CallerRunsPolicy 拒绝策略,将任务退回给提交任务的线程来执行,由提交任务的线程直接执行该任务。

    5. 自定义拒绝策略:根据具体需求,可以自定义拒绝策略来处理无法处理的任务,例如将任务记录到日志中、进行异步处理等。

      • DiscardOldestPolicy丢弃最旧的任务
      • AbortPolicy 丢弃新任务并抛出异常
      • CallerRunsPolicy 峰值同步调用
      • DiscardPolicy 丢弃新任务不抛异常
      • 一个线程池core 7; max 20 , queue: 5e,100并发进来怎么分配的;
        7个会立即得到执行,50个会进入队列,再开13个进行执行。剩下的30个就使用拒绝策略。
      • CPU密集型:核心线程数 = CPU核数 + 1
      • IO密集型:核心线程数 = CPU核数 * 2

    我们异步执行一个任务时,一般是用线程池Executor去创建。如果不需要有返回值,任务实现Runnable接口;如果需要有返回值,任务实现Callable接口,调用Executor的submit方法,再使用Future获取即可。如果多个线程存在依赖组合的话,则可以使用CompeletableFuture。

    Future和Callable接口

    因为CompletableFuture实现了Future接口,我们先从Future接口说起

    Future是Java5新加的一个接口定义了操作异步任务执行一些方法,如获取异步任务的执行结果、取消任务的执行、判断任务是否被取消、判断任务执行是否完毕等。,它提供了一种异步并行计算的功能。如果主线程需要执行一个很耗时的计算任务,我们就可以通过future把这个任务放到异步线程中执行。主线程继续处理其他任务,处理完成后,再通过Future获取计算结果。

    如下例,假设我们有两个任务服务,一个是sku基本信息获取,一个是获取spu的销售属性组合。

    public class GoodsSkuService {
    
        public GoodsSkuInfo getGoodsSkuInfo(Long userId) throws InterruptedException {
            Thread.sleep(300);//模拟调用耗时  一般是查数据库,或者远程调用返回的结果
        }
    }
    
    public class GoodsSpuService {
    
        public GoodsSpuInfo getGoodsSpu(long userId) throws InterruptedException {
            Thread.sleep(500); //模拟调用耗时
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    接下来,我们来演示下,在主线程中是如何使用Future来进行异步调用的。

    public class FutureTest {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            //调用用户服务sku基本信息获取
            FutureTask<GoodsSkuInfo> goodsSkuInfo= new FutureTask<>(new Callable<GoodsSkuInfo>() {
                @Override
                public GoodsSkuInfocall() throws Exception {
                    return goodsSkuInfoService.getGoodsSkuInfo(goodsId);
                }
            });
            executorService.submit(userInfoFutureTask);
            Thread.sleep(300); //模拟主线程其它操作耗时
            FutureTask<GoodsSpuInfo> goodsSpuInfo= new FutureTask<>(new Callable<GoodsSpuInfo>() {
                @Override
                public GoodsSkuInfocall() throws Exception {
                    return goodsSpuInfoService.getGoodsSpuInfo(goodsId);
                }
            });
            executorService.submit(medalInfoFutureTask);
            GoodsSkuInfo goodsSkuInfo= userInfoFutureTask.get();//sku基本信息获取
            GoodsSpuInfo goodsSpuInfo= medalInfoFutureTask.get();//获取spu的销售属性组合。
        }
    }
        
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    future+线程池异步配合,可以提高程序的执行效率,但是Future对于结果的获取,不是很友好,只能通过阻塞或者轮询的方式得到任务的结果。

    • Future.get() 就是阻塞调用,在线程获取结果之前get方法会一直阻塞。
    • Future提供了一个isDone方法,可以在程序中轮询这个方法查询执行结果。

    阻塞的方式和异步编程的设计理念相违背,而轮询的方式会耗费无谓的CPU资源。因此,JDK8设计出CompletableFuture。CompletableFuture提供了一种观察者模式类似的机制,可以让任务执行完成后通知监听的一方。

    对Future的改进

    CompletableFuture

    • 在java8中,CompletableFuture提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,并且提供了函数式编程的能力,可以通过回调的方式处埋计算结果,也提供了转换和组合CompletableFuture的方法。
    • 它可能代表一个明确完成的Future,也有可能代表一个完成阶段(CompletionStage),它支持在计算完成以后触发一些函数或执行某些动作。

    上述例子可以改写成:

    ublic class FutureTest {
    
        public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
    
            //调用用户服务sku基本信息获取
           FutureTask<GoodsSkuInfo> completablegoodsSkuInfo = CompletableFuture.supplyAsync(() ->  goodsSkuInfoService.getGoodsSkuInfo(goodsId));
    
            Thread.sleep(300); //模拟主线程其它操作耗时
    
            FutureTask<GoodsSpuInfo> completablegoodsSpuInfo = CompletableFuture.supplyAsync(() -> goodsSpuInfoService.getGoodsSpuInfo(goodsId)); 
    		GoodsSkuInfo goodsSkuInfo= completablegoodsSkuInfo Future.get(2,TimeUnit.SECONDS);;//sku基本信息获取
            GoodsSpuInfo goodsSpuInfo= completablegoodsSpuInfo Future.get();//获取spu的销售属性组合。
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    可以发现,使用CompletableFuture,代码简洁了很多。CompletableFuture的supplyAsync方法,提供了异步执行的功能,线程池也不用单独创建了。实际上,它CompletableFuture使用了默认线程池是ForkJoinPool.commonPool。

    CompletionStage(不做详细叙述)

    • CompletionStage代表异步计算过程中的某一个阶段,一个阶段完成以后可能会触发另外一个阶段
    • 一个阶段的计算执行可以是一个Function,Consumer或者Runnable。.比如:stage.thenApply(X -> square(x).thenAccept( x -System.out.print(x) ).thenRun(() -> System.out.printIn())
    • 一个阶段的执行可能是被单个阶段的完成触发,也可能是由多个阶段一起触发

    CompletableFuture的优点

    异步任务结束时,会自动回调某个对象的方法;
    异步任务出错时,会自动回调某个对象的方法;
    主线程设置好回调后,不再关心异步任务的执行,异步任务之间可以顺序执行。

    核心的四个静态方法

    CompletableFuture提供了几十种方法,辅助我们的异步任务场景。这些方法包括创建异步任务、任务异步回调、多个任务组合处理等方面。再次仅仅比较重要的四个核心的静态方法。
    1、runAsync 无 返回值

    public static CompletableFuture<Void> runAsync(Runnable runnable)
    public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor)  
    
    • 1
    • 2

    代码示例

    public class CompletableFutureDemo3{
        public static void main(String[] args) throws ExecutionException, InterruptedException{
            CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
                System.out.println(Thread.currentThread().getName()+"\t"+"-----come in");
                //暂停几秒钟线程
                try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
                System.out.println("-----task is over");
            });
            System.out.println(future.get());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    2、supplyAsync 有 返回值

    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor)
    
    • 1
    • 2

    代码示例

    public class CompletableFutureDemo3{
        public static void main(String[] args) throws ExecutionException, InterruptedException{
            CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
                System.out.println(Thread.currentThread().getName() + "\t" + "-----come in");
                //暂停几秒钟线程
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return ThreadLocalRandom.current().nextInt(100);
            });
            System.out.println(completableFuture.get());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    上述Executor executor参数说明

    没有指定Executor的方法,直接使用默认的ForkJoinPool.commonPool() 作为它的线程池执行异步代码。如果指定线程池,则使用我们自定义的或者特别指定的线程池执行异步代码

    计算完成时回调方法

    public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action);
    public CompletableFuture<T> whencompleteAsync(BiConsumer<? super T,? super Throwable> action);
    public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,super Throwable> action,Executor executor);
    public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn);
    
    • 1
    • 2
    • 3
    • 4

    whenComplete可以处理正常和异常的计算结果,exceptionally处理异常情况。
    whenComplete和whenCompleteAsync的区别:

    • whenComplete:是执行当前任务的线程执行继续执行whenComplete的任务。
    • whenCompleteAsync:是执行把whenCompleteAsync这个任务继续提交给线程池来进行执行。

    方法不以Async结尾,意味着Action使用相同的线程执行,而Async可能会使用其他线程执行(如果是使用相同的线程池,也可能会被同一个线程选中执行)

    减少阻塞和轮询

    从Java8开始引入了CompletableFuture,它是Future的功能增强版,可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法

    public class CompletableFutureDemo3{
        public static void main(String[] args) throws Exception{
            CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
                System.out.println(Thread.currentThread().getName() + "\t" + "-----come in");
                int result = ThreadLocalRandom.current().nextInt(10);
                //暂停几秒钟线程
                try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
                System.out.println("-----计算结束耗时1秒钟,result: "+result);
                if(result > 6){
                    int age = 10/0;
                }
                return result;
            }).whenComplete((v,e) ->{
                if(e == null){
                    System.out.println("-----result: "+v);
                }
            }).exceptionally(e -> {
                System.out.println("-----exception: "+e.getCause()+"\t"+e.getMessage());
                return -44;
            });
    
            //主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:暂停3秒钟线程
            try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    线程串行化方法

    public <U> CompletableFuture<U> thenApply(Function<? super T,extends U> fn)
    thenApply 方法:当一个线程依赖另一个线程时,获取上一个任务返回的结果,并返回当前
    任务的返回值。
    
    public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,extends U> fn)
    public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn,Executor executor)
    public Completionstage<Void> thenAccept(Consumer<? super T> action);
    thenAccept 方法:消费处理结果。接收任务的处理结果,并消费处理,无返回结果。
    
    public Completionstage<Void> thenAcceptAsync(Consumer<? super T> action);
    public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);
    public CompletionStage<Void> thenRun(Runnable action);
    thenRun 方法:只要上面的任务执行完成,就开始执行 thenRun,只是处理完任务后,执行
    thenRun 的后续操
    
    public Completionstage<Void> thenRunAsync(Runnable action);
    public Completionstage<Void> thenRunAsync(Runnable action,Executor executor )
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    带有 Async 默认是异步执行的。

    多任务组合

    public static CompletableFuture<Void>allof(CompletableFuture<?>... cfs);
    public static CompletableFuture<Object>anyof(CompletableFuture<?>... cfs);
    
    • 1
    • 2

    allOf:等待所有任务完成
    anyOf:只要有一个任务完成

    实战案例

    业务场景:
    查询商品详情页的逻辑比较复杂,有些数据还需要远程调用,必然需要花费更多的时间。

    //1,获取sku的基本信息				0.5s
    
    //2.获取sku的图片信息				0.5s
    
    //3.获取sku的促销信息				1s
    
    //4.获取spu的所有销售属性			1s
    
    //5.获取规格参数组及组下的规格参数	1.5s
    
    //6.spu详情						1s
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    假如商品详情页的每个查询,需要如下标注的时间才能完成那么,用户需要 5.5s 后才能看到商品详情页的内容。很显然是不能接受的。如果有多个线程同时完成这 6 步操作,也许只需要 1.5s。

    以下是实战项目中利用线程池加CompletableFuture 异步编排来优化代码一定程度上增大了接口的QPS。

    @Override
        public SkuItemVo item(Long skuId) throws ExecutionException, InterruptedException {
            SkuItemVo skuItemVo = new SkuItemVo();
            CompletableFuture<SkuInfoEntity> infoFuture = CompletableFuture.supplyAsync(() -> {
                //1、sku基本信息获取 pms_sku_info
                SkuInfoEntity info = getById(skuId);
                skuItemVo.setInfo(info);
                return info;
            }, executor);
            CompletableFuture<Void> saleAttrFuture = infoFuture.thenAcceptAsync(res -> {
                //3、获取spu的销售属性组合
                List<SkuItemSaleAttrVo> saleAttrVos = skuSaleAttrValueService.getSaleAttrsBySkuId(res.getSkuId());
                skuItemVo.setSaleAttr(saleAttrVos);
            }, executor);
            CompletableFuture<Void> descFuture = infoFuture.thenAcceptAsync(res -> {
                //4、获取spu的介绍 pms_sku_info_desc
                SpuInfoDescEntity spuInfoDescEntity = spuInfoDescService.getById(res.getSpuId());
                skuItemVo.setDesc(spuInfoDescEntity);
            }, executor);
            CompletableFuture<Void> baseAttrFuture = infoFuture.thenAcceptAsync(res -> {
                //5、获取spu的规格参数信息
                List<SpuItemAttrGroupVo> attrGroupVos = attrGroupService.getAttrGroupWithAttrsBySpuId(res.getSpuId(), res.getCatalogId());
                skuItemVo.setGroupAttrs(attrGroupVos);
            }, executor);
    
            //2、sku的图片信息 pms_sku_images
            CompletableFuture<Void> imageFuture = CompletableFuture.runAsync(() -> {
                List<SkuImagesEntity> images = imagesService.getIamImagesBySkuId(skuId);
                skuItemVo.setImages(images);
            }, executor);
    
            CompletableFuture<Void> seckillFuture = CompletableFuture.runAsync(() -> {
                //3、远程调用查询当前sku是否参与秒杀优惠活动
                R skuSeckilInfo = seckillFeignService.getSkuSeckilInfo(skuId);
                if (skuSeckilInfo.getCode() == 0) {
                    //查询成功
                    SeckillSkuVo seckilInfoData = skuSeckilInfo.getData("data", new TypeReference<SeckillSkuVo>() {
                    });
                    skuItemVo.setSeckillSkuVo(seckilInfoData);
                    if (seckilInfoData != null) {
                        long currentTime = System.currentTimeMillis();
                        if (currentTime > seckilInfoData.getEndTime()) {
                            skuItemVo.setSeckillSkuVo(null);
                        }
                    }
                }
            }, executor);
            //等到所有任务都完成
            CompletableFuture.allOf(saleAttrFuture,descFuture,baseAttrFuture,imageFuture,seckillFuture).get();
    
            return skuItemVo;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52

    上述代码是本此业务的核心代码,涉及微服务的远程调用等与CompletableFuture 无关的代码就给予忽视了

  • 相关阅读:
    微信小程序python+nodejs+php+springboot+vue 健身教练私教预约系统
    使用电力系统稳定器 (PSS) 和静态 VAR 补偿器 (SVC) 提高瞬态稳定性(Matlab代码实现)
    SCI投稿经验(三) 回复审稿人
    Vue2的12种组件通信
    MS Access 教程之 如何在不覆盖标题字段的情况下将 Excel 数据导入 MS Access 现有表?
    【离网逆变器】离网逆变器型号由一个高频DC-DC升压转换器与全桥PI控制电压源逆变器级联组成、逆变器使用带LC滤波器的SPWM调制(Simulink)
    论文投稿指南——中文核心期刊推荐(计算机技术)
    拍照扫描怎么弄?你可以试一下这两个方法
    机器人中的数值优化(十二)——带约束优化问题简介、LP线性规划
    Java&数组
  • 原文地址:https://blog.csdn.net/weixin_51596697/article/details/127651724