• Java多线程获取执行结果


    ps:我采用了方式3,时间=2022-09-20 21:40

    原文链接:Java多线程获取执行结果_Lucifer Zhao的博客-CSDN博客_java多线程获取结果

    Java多线程获取执行结果

    Lucifer Zhao

    已于 2022-03-08 18:37:51 修改

    1660
     收藏 3
    分类专栏: 线程 文章标签: 多线程
    版权

    线程
    专栏收录该内容
    9 篇文章0 订阅
    订阅专栏
    一、Thread.join()
    特点:一个线程等待另一个线程结束后才能执行。利用此原理我们可以设置一个监控线程用来等待程序线程执行完毕后输出返回结果。

    public class Result {
        private String value;
        public String getValue() {
            return value;
        }
        public void setValue(String value) {
            this.value = value;
        }
    }
    // 定义工作线程,模拟程序执行并输出线程执行结果
    public class WorkThread extends Thread {
        private Result result ;
        
        public void init(Result result) {
            this.result = result;
        }
        
        public void run() {
            try {
                Thread.sleep(1000*10);//模拟程序执行
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            result.setValue("线程执行完毕,输出结果");
        }
    }
    // 主线程等待工作线程执行,并获取工作线程的执行成果
    public class MainThread {
        public static void main(String[] args) throws InterruptedException {
            Result result = new Result();
            WorkThread workThread = new WorkThread();
            workThread.init(result);
            System.out.println("线程启动");
            workThread.start();
            System.out.println("线程等待");
            // 等待work线程运行完再继续运行
            workThread.join();
            System.out.println("线程执行结果:"+result.getValue());
        }
    }
    缺点:

    获取多个线程返回结果时繁琐,需要自己实现;
    无法与线程池配合使用;
    本质上还是同步返回结果,主线程被阻塞;
    二、CountDownLatch
    CountDownLatch是jdk提供的多线程同步工具,CountDownLatch其实本质上可以看做一个线程计数器,计数为0所有线程全部执行,统计多个线程执行完成的情况,适用于控制一个或多个线程等待,直到所有线程都执行完毕的场景;因此我们可以利用其功能特点实现获取多个线程的执行结果,一定程度上弥补了Thread.join的不足。

    public class WorkThread extends Thread {
        private Map resultMap;
        private CountDownLatch countDownLatch;
        
        public WorkThread(CountDownLatch countDownLatch, Map resultMap) {
            this.countDownLatch=countDownLatch;
            this.resultMap = resultMap;
        }
        
        public void run() {
            try {
                Thread.sleep(1000*3);//模拟程序执行
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            String threadName = Thread.currentThread().getName();
            resultMap.put(threadName, threadName + "线程执行完毕,输出结果");
            countDownLatch.countDown();
        }
    }
    public class MainThread {
        public static void main(String[] args) throws InterruptedException {
            Map resultMap = new HashMap();//定义一个Map做为存储返回结果的容器
            final CountDownLatch countDownLatch = new CountDownLatch(5);
            // 启动多个工作线程
            for (int i = 0; i < 5; i++) {
                WorkThread workThread = new WorkThread(countDownLatch, resultMap);
                workThread.start();
            }
            System.out.println("主线程等待工作线程执行");
            countDownLatch.await();
            System.out.println("resultMap:" + resultMap);        
        }
    }
    应用场景:如果启动的线程数是固定的,且需要等待执行结果全部返回后统一处理,使用CountDownLatch是一个不错的选择。

    三、Future与FutureTask
    使用Future配合线程池,获取线程池执行线程的返回结果

    public class WorkThread implements Callable {
        public Result call() throws Exception {
            Thread.sleep(5000);
            Result result = new Result();
            result.setValue(Thread.currentThread().getName()+"线程执行完毕,输出结果");
            return result;
        }
    }
    public class MainThread {
         public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
             ExecutorService taskPool = new ThreadPoolExecutor(5, 15, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(10), new ThreadPoolExecutor.CallerRunsPolicy());
             Future future = taskPool.submit(new WorkThread());
             System.out.println("线程池执行工作线程");
             Result result = future.get();//注意这里get操作是阻塞,future仍属于同步返回,主线程需要阻塞等待结果返回
             //result = future.get(3,TimeUnit.SECONDS);//设置阻塞超时时间
             System.out.println(result.getValue());
         }
    }
    Future与FutureTask实现方式基本类似,FutureTask是对Futue的进一步封装。

    优点:Future可以配合线程池获取线程执行结果,并可以设置返回结果超时时间,避免长时间阻塞堆积任务。

    缺点:但是Future获取结果的get方法是阻塞的,本质上是同步返回。

    四、CompletionService
    CompletionService可以看作是FutureTask的进阶版,通过FutureTask + 阻塞队列的方式能够按照线程执行完毕的先后顺序获取执行结果,有点类似CountDownLatch。如果需要执行的线程数是固定的,且需要等待执行结果全部返回后统一处理,可以使用CompletionService。

    public class WorkThread implements Callable{
        private Map resultMap;
        
        public WorkThread(Map resultMap) {
            this.resultMap=resultMap;
        }
        
        public Map call() throws InterruptedException {
            String threadName = Thread.currentThread().getName();
            resultMap.put(threadName, threadName + "线程执行完毕,输出结果");
            return resultMap;
        }
    }
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService exec = new ThreadPoolExecutor(10, 20, 1000, TimeUnit.MILLISECONDS,
                    new ArrayBlockingQueue(5), Executors.defaultThreadFactory(),
                    new ThreadPoolExecutor.CallerRunsPolicy());
        //定义一个阻塞队列
        BlockingQueue> futureQueue =    new LinkedBlockingQueue>();
        //传入ExecutorService与阻塞队列,构造一个completionService
        CompletionService completionService = new ExecutorCompletionService(exec,futureQueue);
        private Map resultMap = new HashMap();
        for(int i=0;i<10;i++) {
            completionService.submit(new WorkThread(resultMap));
        }
        for(int i=0;i<10;i++) {
            resultMap = completionService.take().get();//如果获取不到数据时处于阻塞状态
        }
        System.out.println(resultMap);
    }
    确定:completionService获取返回结果是阻塞的,本质上还是同步,会阻塞主线程

    五、生产者消费者模式
    生产者消费者模式通过一个阻塞队列来解决生产者、消费者之间的强耦合问题。相当于一个缓冲区,平衡消费者和生产者的处理能力。

    public class Container {
        public static int QUEUE_SIZE = 20;
        public static ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue(QUEUE_SIZE);
        public static AtomicInteger TASK_SIZE = new AtomicInteger(0);
        public static List list = new ArrayList(10);
    }
    public class ProducerThread extends Thread {
        public void run() {
            ExecutorService executorService = new ThreadPoolExecutor(10, 20, 1000, TimeUnit.MILLISECONDS,
                    new ArrayBlockingQueue(5), Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());
            for (int i = 0; i < 100; i++) {
                executorService.submit(new ProducerHandler(i));
            }
        }
    }
    public class ProducerHandler extends Thread {
        private int i;
     
        public ProducerHandler(int i) {
            this.i = i;
        }
     
        public void run() {
            while (true) {
                    try {
                        if (Container.blockingQueue.size() == Container.QUEUE_SIZE) {
                            System.out.println("仓库满了============================================");
                        }
                        Container.blockingQueue.put("product" + i);
                        System.out.println(Thread.currentThread().getName() + "生产了一个product" + i + ", 当前仓库数量:" + Container.blockingQueue.size());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
            }
        }
    }

    public class ConsumerThread extends Thread {
        @Override
        public void run() {
            ExecutorService executorService = new ThreadPoolExecutor(10, 20, 1000, TimeUnit.MILLISECONDS,
                    new ArrayBlockingQueue(5), Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());
            for (int i = 0; i < 2; i++) {
                executorService.submit(new ConsumerHandler());
            }
        }
    }
    public class ConsumerHandler extends Thread {
        @Override
        public void run() {
            while (true) {
                    try {
                        if (Container.blockingQueue.isEmpty()) {
                            System.out.println("仓库空了============================================");
                        }
                        String result = Container.blockingQueue.take(); //有数据就消费,没有就阻塞等待
                        System.out.println("消费者消费了一个产品" + result + ", 当前仓库数量:" + Container.blockingQueue.size());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
        }
    }

    public class PCModelThreadPool {
        public static void main(String[] args) {
            new ProducerThread().start();
            new ConsumerThread().start();
        }
    }
    一个完善的生产者消费者模式需要考虑很多方面,  最关键的还是以下两个要素:

    1、线程安全,生产者与消费者分别执行读写操作,特别是在多个生产线程与消费线程时,一定会存在数据读写的并发操作,所以数据队列一定要保证线程安全;

    2、生产与消费的协调,数据队列满时生产线程停止写入,数据队列空时消费线程停止消费,需要考虑不必要的性能浪费;

    六、异步回调
    ————————————————
    版权声明:本文为CSDN博主「Lucifer Zhao」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
    原文链接:https://blog.csdn.net/luciferlongxu/article/details/123217376

  • 相关阅读:
    HTML做一个简单的页面(纯html代码)地球专题学习网站
    【短文】Linux怎么删除全部带有指定关键字的进程
    wxWidgets从空项目开始Hello World
    python打包原理
    F - 爱丽丝、鲍勃和巧克力
    GBASE 8s 通过light scan优化查询性能
    Acwing 835. Trie字符串统计
    【软考高级信息系统项目管理师--第五章:信息系统工程下】
    大规模神经网络的实现(什么是异步梯度下降?怎样进行模型压缩?)
    leetcode动态规划之买卖股票+打家劫舍
  • 原文地址:https://blog.csdn.net/wangwenzhe222/article/details/126962316