• 并发编程CompletableFuture用法


    1.配置线程池

    1. /**
    2. * int corePoolSize,
    3. * int maximumPoolSize,
    4. * long keepAliveTime,
    5. * TimeUnit unit,
    6. * BlockingQueue workQueue,
    7. * ThreadFactory threadFactory,
    8. * RejectedExecutionHandler handler
    9. *
    10. * @return
    11. */
    12. @Bean
    13. public ThreadPoolExecutor threadPoolExecutor(ThreadPoolConfigProperties pool) {
    14. return new ThreadPoolExecutor(pool.getCoreSize(),
    15. pool.getMaxSize(),
    16. pool.getKeepAliveTime(),
    17. TimeUnit.SECONDS,
    18. new LinkedBlockingQueue<>(100000),
    19. Executors.defaultThreadFactory(),
    20. new ThreadPoolExecutor.AbortPolicy()
    21. );
    22. }


    2.线程池参数配置类

    1. import lombok.Data;
    2. import org.springframework.boot.context.properties.ConfigurationProperties;
    3. import org.springframework.stereotype.Component;
    4. /**
    5. my.thread.core-size=20
    6. my.thread.max-size=200
    7. my.thread.keep-alive-time=10
    8. */
    9. @ConfigurationProperties(prefix = "my.thread")
    10. @Component
    11. @Data
    12. public class ThreadPoolConfigProperties {
    13. //核心线程数
    14. private Integer coreSize;
    15. //最大线程数
    16. private Integer maxSize;
    17. //空余线程的存活时间
    18. private Integer keepAliveTime;
    19. }


    3.测试异步任务

    1. @Autowired
    2. private ThreadPoolExecutor executor;
    3. //在这里开启一个异步任务,提交给线程池,runAsync()方法没有返回值,需要有返回值的可使用supplyAsync()方法
    4. @Test
    5. void testCompletableFuture() {
    6. CompletableFuture runAsync = CompletableFuture.runAsync(() -> {
    7. int result = 0;
    8. for (int i = 0; i <= 100; i++) {
    9. result += i;
    10. }
    11. System.out.println(result);
    12. }, executor);
    13. }


    4.关于CompletableFuture的其他相关用法
    4.1 CompletableFuture的**get()**方法可以获取异步的结果,get方法是一个阻塞式等待的方法,也即get方法会等待异步任务的完成

    1. CompletableFuture completableFuture2 = CompletableFuture.supplyAsync(() -> {
    2. for (int i = 0; i <= 100; i++) {
    3. sum2.addAndGet(i);
    4. }
    5. return sum2;
    6. }, executor);
    7. //获取异步结果
    8. AtomicInteger integer = completableFuture2.get();


    4.2 allOf : 等待所有任务完成完成

    1. AtomicInteger sum = new AtomicInteger();
    2. AtomicInteger sum2 = new AtomicInteger();
    3. CompletableFuture completableFuture1 = CompletableFuture.supplyAsync(() -> {
    4. for (int i = 0; i <= 100; i++) {
    5. sum.addAndGet(i);
    6. }
    7. return sum;
    8. }, executor);
    9. CompletableFuture completableFuture2 = CompletableFuture.supplyAsync(() -> {
    10. for (int i = 0; i <= 100; i++) {
    11. sum2.addAndGet(i);
    12. }
    13. return sum2;
    14. }, executor);
    15. AtomicInteger integer = completableFuture2.get();
    16. //allOf : 等待所有任务完成完成,注意get方法,是阻塞式等待,等待上面的异步任务都完成
    17. CompletableFuture.allOf(completableFuture1,completableFuture2).get();
    18. //获取异步结果
    19. AtomicInteger atomicInteger1 = completableFuture1.get();
    20. AtomicInteger atomicInteger2 = completableFuture2.get();
    21. System.out.println("结果是--->"+atomicInteger1.addAndGet(atomicInteger2.intValue()));


    4.3 异步任务完成时,whenComplete,exceptionally

    1. CompletableFuture completableFuture3 = CompletableFuture.supplyAsync(() -> {
    2. for (int i = 0; i <= 10; i++) {
    3. sum2.addAndGet(i);
    4. }
    5. return sum2;
    6. }, executor).whenComplete((res, exception) -> {
    7. //当出现异常,可以拿到异常信息,但是无法修改返回数据
    8. System.out.println("结果是:" + res + ",异常:" + exception);
    9. }).exceptionally(throwable -> {
    10. //可以感知异常,同时返回默认值
    11. return new AtomicInteger(10);
    12. });


    4.4 handle,方法完成后的后续处理

    1. CompletableFuture completableFuture4 = CompletableFuture.supplyAsync(() -> {
    2. int i = 10 / 2;
    3. return i;
    4. }, executor).handle((res, throwable) -> {
    5. //res 为结果,throwable 为异常
    6. if (res != null) {
    7. return res * 2;
    8. }
    9. if (throwable != null) {
    10. return -1;
    11. }
    12. return 0;
    13. });
    14. System.out.println("completableFuture4--结果是:"+completableFuture4.get());


    4.5 异步任务串行化

    1. /**
    2. * 异步任务串行化
    3. * thenAcceptAsync 可以接收上一步获取的结果,但是无返回值
    4. * thenApplyAsync 可以接收上一步获取的结果,有返回值
    5. */
    6. CompletableFuture future = CompletableFuture.supplyAsync(() -> {
    7. int i = 10 / 2;
    8. return i;
    9. }, executor).thenApplyAsync(res -> {
    10. //res为上一步的结果
    11. return res * 2;
    12. }, executor).thenAcceptAsync((res) -> {
    13. System.out.println("hello ...thenAcceptAsync");
    14. }, executor);


     

  • 相关阅读:
    数字金融场景下数据安全建设方案
    AI绘图Stable Diffusion中关键技术:U-Net的应用
    ceph集群巡检项
    21天学习第八天--面向对象三大特征之:继承
    一文学习yolov5 实例分割:从训练到部署
    后台管理系统中,实现修改功能时,数据回显导致table-column数据消失。罪魁祸首竟是浅拷贝
    域名里边的门道
    【CCF】第30次csp认证——202305-1重复局面
    管理学名词解释
    安全框架springSecurity+Jwt+Vue-2(后端开发)
  • 原文地址:https://blog.csdn.net/weixin_43837268/article/details/133961814