• CompletableFuture之批量上传


    前言

    最近接到一个需求,批量上传图片到服务器及实时更新上传进度。当处理大量文件上传任务时,效率是一个关键因素。传统的串行方式会导致任务耗时较长,而使用并发处理可以极大地提高上传效率。想到很久之前用CompletableFuture优化过一些多统计的业务场景,效果都还不错,因此在这里也使用它来优化一下上传的效率。

     

    CompletableFuture简介

    CompletableFuture类是Java 8引入的,它实现了FutureCompletionStage接口,提供了更强大和灵活的异步编程功能。CompletableFuture除了具有Future的特性外,还提供了更多的操作和组合方式来处理异步任务。它可以更方便地处理异步任务,实现并发编程,并提供更好的异常处理和结果转换机制。在进行异步编程时,CompletableFuture是一个更为强大和推荐的选择。

    主要特点:

    1. 异步执行:允许将任务提交给后台线程,在任务执行期间不会阻塞主线程。这样可以提高应用程序的响应性能,特别是在处理I/O密集型操作时,如网络请求或数据库查询。

    2. 链式调用和组合操作:支持链式调用,可以将多个异步任务按照顺序连接起来形成一个任务流水线。每个任务的执行依赖于前一个任务的结果,这种串行的处理方式可以简化异步任务的编写和管理。

    3. 异常处理:提供了异常处理的机制,可以通过异常回调方法来捕获和处理任务执行过程中的异常情况。这样可以更好地控制和处理任务执行过程中的异常,提供更健壮的代码。

    4. 转换和合并结果:提供了一系列的转换和合并操作,可以对任务的结果进行映射、转换和合并。这样可以方便地对任务的结果进行处理和转换,得到最终期望的结果。

    5. 多任务并行执行:支持等待多个任务并行执行,并等待它们全部完成或任意一个完成。这种能力使得在处理并发任务时可以更好地利用系统资源,提高任务执行的效率。

     

    串行和并行的效率对比

    测试批量上传了1000张图片,每张图片在579KB,一共564MB。使用串行方式上传,总时长为501秒,使用并行方式上传,总时长是108秒,通过对比优化前后的代码,可以明显看出使用CompletableFuture并发处理方式的效率更高。由于任务是并行执行的,多核处理器的能力得到了充分的利用,从而大大提高了批量上传的速度。

     

    串行处理方式

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    /**
     * describe: 批量上传图片
     *
     * @param files 图片文件集合
     * @param fileId 文件夹id
     * @param scheduleKey 上传进度key
     * @date 2023年06月28日 11:42:03
     * @author Tang
     */
    @Override
    public BatchUploadVO batchUpload2(MultipartFile[] files, Long fileId, String scheduleKey) {
        //取上传配置
        String jsonStr = CacheConfigure.getValue(CacheKeyConstant.IMG_RESOURCE_UPLOAD_CONFIG, String.class);
        ImgResourceUploadConfigDTO config = JSONObject.toJavaObject(JSONObject.parseObject(jsonStr), ImgResourceUploadConfigDTO.class);
        List imgTypeList = Arrays.asList(config.getImgType().split(","));
     
        List errorNames = Lists.newCopyOnWriteArrayList();
        String userName = SecurityAuthorHolder.getSecurityUser().getUsername();
     
        for(MultipartFile file : files){
            try {
                RedisUtil.setInteger(CacheKeyConstant.UPLOAD_SCHEDULE_TOTAL + scheduleKey, files.length, CacheTimeConstant.BATCH_UPLOAD_EXPIRED_TIME);
                String suffix = Objects.requireNonNull(file.getOriginalFilename()).substring(file.getOriginalFilename().lastIndexOf(".") + 1);
                ServerException.Assert(!imgTypeList.contains(suffix), "文件格式不正确,支持" + String.join(",", imgTypeList));
                ServerException.Assert(file.getSize() > config.getMaxSize() * 1024, "文件最大不能超过" + config.getMaxSize() + "K");
                //上传
                ImgResourceEntity saveData = upload(file, config);
                saveData.setFileId(fileId);
                saveData.setCreator(userName);
                baseMapper.insert(saveData);
                //缓存自增  供轮询查询实时进度
                RedisUtil.incrementValue(CacheKeyConstant.UPLOAD_SCHEDULE_SUCCESS + scheduleKey, CacheTimeConstant.BATCH_UPLOAD_EXPIRED_TIME);
            } catch (Exception e) {
                errorNames.add(file.getOriginalFilename());
                RedisUtil.incrementValue(CacheKeyConstant.UPLOAD_SCHEDULE_ERROR + scheduleKey, CacheTimeConstant.BATCH_UPLOAD_EXPIRED_TIME);
            }
        }
     
        BatchUploadVO vo = schedule(scheduleKey);
        vo.setErrFileNames(errorNames);
        return vo;
    }

      

    串行处理调用时间

     

    并行处理方式

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    /**
     * describe: 批量上传图片
     *
     * @param files 图片文件集合
     * @param fileId 文件夹id
     * @param scheduleKey 上传进度key
     * @date 2023年06月28日 11:42:03
     * @author Tang
     */
    @Override
    public BatchUploadVO batchUpload(MultipartFile[] files, Long fileId, String scheduleKey) {
        ExecutorService executor = Executors.newFixedThreadPool(10);
     
        //取上传配置
        String jsonStr = CacheConfigure.getValue(CacheKeyConstant.IMG_RESOURCE_UPLOAD_CONFIG, String.class);
        ImgResourceUploadConfigDTO config = JSONObject.toJavaObject(JSONObject.parseObject(jsonStr), ImgResourceUploadConfigDTO.class);
        List imgTypeList = Arrays.asList(config.getImgType().split(","));
     
        List errorNames = Lists.newCopyOnWriteArrayList();
        String userName = SecurityAuthorHolder.getSecurityUser().getUsername();
     
        CompletableFuture allFutures = CompletableFuture.allOf(
                Arrays.stream(files).map(v ->
                        CompletableFuture.runAsync(
                                () -> {
                                    try {
                                        RedisUtil.setInteger(CacheKeyConstant.UPLOAD_SCHEDULE_TOTAL + scheduleKey, files.length, CacheTimeConstant.BATCH_UPLOAD_EXPIRED_TIME);
                                        String suffix = Objects.requireNonNull(v.getOriginalFilename()).substring(v.getOriginalFilename().lastIndexOf(".") + 1);
                                        ServerException.Assert(!imgTypeList.contains(suffix), "文件格式不正确,支持" + String.join(",", imgTypeList));
                                        ServerException.Assert(v.getSize() > config.getMaxSize() * 1024, "文件最大不能超过" + config.getMaxSize() + "K");
                                        //上传
                                        ImgResourceEntity saveData = upload(v, config);
                                        saveData.setFileId(fileId);
                                        saveData.setCreator(userName);
                                        baseMapper.insert(saveData);
                                        //缓存自增  供轮询查询实时进度
                                        RedisUtil.incrementValue(CacheKeyConstant.UPLOAD_SCHEDULE_SUCCESS + scheduleKey, CacheTimeConstant.BATCH_UPLOAD_EXPIRED_TIME);
                                    } catch (Exception e) {
                                        errorNames.add(v.getOriginalFilename());
                                        RedisUtil.incrementValue(CacheKeyConstant.UPLOAD_SCHEDULE_ERROR + scheduleKey, CacheTimeConstant.BATCH_UPLOAD_EXPIRED_TIME);
                                    }
                                }, executor)
                ).toArray(CompletableFuture[]::new)
        );
        // 等待所有 CompletableFuture 完成
        allFutures.join();
     
        // 关闭线程池
        executor.shutdown();
     
        BatchUploadVO vo = schedule(scheduleKey);
        vo.setErrImgFileNames(errorNames);
        return vo;
    }

      

     并行调用处理时间

     

    实现过程中的注意事项

    1. 线程池的使用:为了实现并发处理,可以使用线程池来管理并执行异步任务。通过合理设置线程池的大小和参数,可以控制并发线程的数量和资源的利用率。

    2. 异常处理:在并发处理中,每个任务都是独立执行的,因此需要适当处理任务中可能出现的异常情况,避免异常的影响扩散。

    3. 进度更新:为了实时更新上传进度,可以将每个任务的进度信息保存到Redis中,并在前端通过轮询查询的方式获取最新的进度信息。

    4. 线程安全:确保上传逻辑的线程安全性,避免多线程环境下的竞态条件和数据一致性问题。

     

    总结

    使用CompletableFuture来优化批量上传任务是一种高效且灵活的方式。通过并发处理,我们可以充分利用多核处理器的能力,提高任务的执行效率。同时,通过实时更新上传进度并返回总体的上传结果,可以给用户更好的体验。
    在实现过程中,我们需要合理使用线程池、处理异常、保证数据同步和线程安全,以确保上传任务的稳定性和性能。同时,我们还可以利用CompletableFuture提供的方法来处理任务的结果、异常和其他相关操作,以满足具体的业务需求。
    通过使用CompletableFuture进行批量上传任务的优化,可以显著提高系统的性能和用户体验,适用于需要处理大量并发任务的场景。

     

  • 相关阅读:
    苏格拉底、柏拉图、亚里士多德,走进希腊三贤的世界
    糟糕,数据库异常不可用怎么办?
    GreenPlum AOCO列存读IO原理
    公司应该如何招人?
    Python实现整蛊恶搞程序生成exe文件小弹窗祝福发给好兄弟好闺蜜好室友
    练习4
    创客匠人抖音小程序引流转化三步走
    数据库学习之基础内容
    python虚拟环境的安装和搭建
    HarmonyOS 应用开发之@Concurrent装饰器:@Sendable装饰器:声明并校验Sendable类
  • 原文地址:https://www.cnblogs.com/-tang/p/17517164.html