• Java-多线程-CompletionService(优先处理)


    java.util.concurrent.CompletionService 是对 ExecutorService 的一个功能增强封装,优化了获取异步操作结果的接口。

    假设我们要向线程池提交一批任务,并获取任务结果。一般的方式是提交任务后,从线程池得到一批 Future 对象集合,然后依次调用其 get() 方法。进行阻塞所有线程执行完毕,然后按线程执行的顺序获取到结果

    这里有个问题:因为我们会要按固定的顺序来遍历 Future 元素,而 get() 方法又是阻塞的,因此如果某个 Future 对象执行时间太长,会使得我们的遍历过程阻塞在该元素上,无法及时从后面早已完成的 Future 当中取得结果。

    CompletionService 解决了这个问题。下面介绍如何创建和使用 CompletionService。

    CompletionService 本身不包含线程池,创建它的实例之前,先要创建一个 ExecutorService。下面是一个例子:

    ExecutorService executor = Executors.newFixedThreadPool(4);
    CompletionService<String> completionService = new ExecutorCompletionService<>(executor);
    
    • 1
    • 2

    向 CompletionService 提交任务的方式与 ExecutorService 一样:completionService.submit(() -> "Hello");

    两者的区别在于取结果的方式。有了 CompletionService,你不需要再持有 Future 集合。如果要得到最早的执行结果,只需要像下面这样:
    String result = completionService.take().get(); 这个 take() 方法返回的是最早完成的任务的结果,这个就解决了一个任务被另一个任务阻塞的问题。下面是一个封装好的例子:

        //不用等待所有线程执行完毕,而是谁先执行完毕,就返回谁的结果 ,以此类推,等待全部线程执行完毕
        public  static void createCompletionServicesAll(String key, List<Callable> callables, Consumer< Future> consumer)  {
            CompletionService<?> completionService = new ExecutorCompletionService<>(getExecutor(key));
            for (Callable callable : callables) {
                completionService.submit(callable);
            }
            try {
                for (int i = 0; i < callables.size(); i++) {
                    consumer.accept(completionService.take());
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        //一堆线程同时执行,谁先执行完毕那么就采用谁的结果,其余线程结果不管
        public  static Object createCompletionServicesOne(String key, List<Callable> callables) throws ExecutionException {
            CompletionService<?> completionService = new ExecutorCompletionService<>(getExecutor(key));
            for (Callable callable : callables) {
                completionService.submit(callable);
            }
            Object result=null;
            try {
                result=completionService.take().get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
            return result;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    测试

        @Test
        public  void show(){
    
            List<Callable> callables=new LinkedList<>();
            for (int i = 0; i < 100; i++) {
                int finalI = i;
                callables.add(()->{
    //                throw  new Exception("---");
                    int i1 = new Random().nextInt(100);
                    Thread.sleep(i1 );
                    System.out.println("睡眠:"+i1+"返回结果:"+finalI);
                    return finalI;
                });
            }
    //        Collections.shuffle(callables);
    
            ExecutorUtils.createCompletionServicesAll("test",callables,(o)->{
                try {
                    System.out.println(o.get()); //如果某个线程出现异常则抛出异常我们这里可以捕捉到
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                }
            });
        }
    
        @Test
        public  void show1(){
    
            List<Callable> callables=new LinkedList<>();
            for (int i = 0; i < 100; i++) {
                int finalI = i;
                callables.add(()->{
    //                throw  new Exception("---");
                    int i1 = new Random().nextInt(1000);
                    Thread.sleep(i1);
                    System.out.println("睡眠:"+i1+"返回结果:"+finalI);
                    return finalI;
                });
            }
    
            try {
                //获取第一个执行完毕的结果
                Integer test = (Integer)ExecutorUtils.createCompletionServicesOne("test", callables);
                System.out.println("result:"+test);
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
    
    
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50

    在这里插入图片描述

    点赞 -收藏-关注-便于以后复习和收到最新内容
    有其他问题在评论区讨论-或者私信我-收到会在第一时间回复
    在本博客学习的技术不得以任何方式直接或者间接的从事违反中华人民共和国法律,内容仅供学习、交流与参考
    免责声明:本文部分素材来源于网络,版权归原创者所有,如存在文章/图片/音视频等使用不当的情况,请随时私信联系我、以迅速采取适当措施,避免给双方造成不必要的经济损失。
    感谢,配合,希望我的努力对你有帮助^_^
  • 相关阅读:
    Haproxy 透传IP配置方法及测试
    ArcGIS for js 缓冲(vue项目)
    synchronized修饰类的注意事项
    2020年最全Java面试汇总整理(含答案),再也不用担心面试被挂了
    C# WebSocket 服务器
    83 # 静态服务中间件 koa-static 的使用以及实现
    解决pip install时ssl报错的问题
    asp.net core mvc之路由
    头条服务端一面经典10道面试题解析
    echarts5.0引入地图,背景渐变色,航线图,地图阴影
  • 原文地址:https://blog.csdn.net/weixin_45203607/article/details/126269272