• JUC常见的线程池源码学习 01 ( 常见的 Java 线程池体系 )


    1. 常见的 Java 线程池体系

    学习一个技术,至少要先了解它的基本框架。

    在这里插入图片描述

    注意 ! Executors 线程池工具类,相当于一个线程工厂

    在这里插入图片描述

    2. Executor(执行器)

    线程池顶级接口
    执行无返回值的任务,根据Executor的实现判断,可能在新线程、线程池、线程调用中执行。

    public interface Executor {
        void execute(Runnable command);
    }
    
    • 1
    • 2
    • 3

    3. ExecutorService(执行器 服务)

    线程池次级接口,对Executor做了一些扩展,增加了一些功能

    • Executor + Service

    在Executor的基础上增加了线程池的服务

    public interface ExecutorService extends Executor {
    
        /**
    	关闭线程池,不再接受新任务,但已经提交的任务会执行完成
         */
        void shutdown();
    
        /**
    	立即关闭线程池,尝试停止正在运行的任务,未执行的任务将不再执行
    	被迫停止及未执行的任务会以列表的形式返回
         */
        List<Runnable> shutdownNow();
    
        /**
    	检查线程池是否已关闭
         */
        boolean isShutdown();
    
        /**
    	检查线程池是否已终止,只有在shutdown()或shutdownNow()之后调用才可能为true
         */
        boolean isTerminated();
    
        /**
    	在指定时间内 线程池  达到  终止状态了  才会  返回  true
         */
        boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException;
    
        /**
    	执行有返回值的任务,任务的返回值为 task.call()的结果
         */
        <T> Future<T> submit(Callable<T> task);
    	
    	/**
    	执行有返回值的任务,任务的返回值为这里传入的result
    	只有当任务执行完了,调用get()时才会返回
        */
        <T> Future<T> submit(Runnable task, T result);
        /**
    	执行有返回值的任务,返回值为null,只有当任务执行完了调用get()时才会返回
         */
        Future<?> submit(Runnable task);
        
        /**
    	批量执行任务,只有当这些任务都完成了这个方法才会返回
         */
        <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
        /**
    	在指定时间内批量执行任务,未执行完成的任务将被取消
    	timeout是所有任务的总时间,不是单个任务的时间
         */
        <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                      long timeout, TimeUnit unit)
            throws InterruptedException;
    
        /**
    	返回任意一个已完成任务的执行结果,未执行完成的任务将被取消
         */
        <T> T invokeAny(Collection<? extends Callable<T>> tasks)
            throws InterruptedException, ExecutionException;
    
        /**
    	在指定时间内如果有任务已完成,则返回任意一个已完成任务的执行结果,未执行完成的任务将被取消
         */
        <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                        long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException;
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70

    4. AbstractExecutorService

    抽象类,运用模板方法设计模式实现了一部分方法

    • Abstract + Executor + Service
    • 抽象 执行器 服务
    public abstract class AbstractExecutorService implements ExecutorService {
        /**
    	批量执行任务,只有当这些任务都完成了这个方法才会返回
         */
        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
            throws InterruptedException {
            if (tasks == null)
                throw new NullPointerException();
            //所有的结果代理,既能拿结果,又能取消任务的执行
            ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
            boolean done = false;
            try {
            //把需要执行的任务放到线程池去执行,存储代理结果
                for (Callable<T> t : tasks) {
                    RunnableFuture<T> f = newTaskFor(t);
                    futures.add(f);
                    execute(f);
                }
                for (int i = 0, size = futures.size(); i < size; i++) {
                    Future<T> f = futures.get(i);
                    if (!f.isDone()) {
                        try {
                        // 查看存储的结果,如果任务没有完成。就阻塞
                            f.get();
                        } catch (CancellationException ignore) {
                        //取消异常  忽略
                        } catch (ExecutionException ignore) {
                        //执行异常  忽略
                        }
                    }
                }
                done = true;
                return futures;
            } finally {
            	//如果不是我想忽略的异常,就 done=false,因为=true那行没执行
                if (!done)
                    for (int i = 0, size = futures.size(); i < size; i++)
                    //取消所有任务的执行,执行过的取消也没用,主要取消未做的
                        futures.get(i).cancel(true);
            }
        }
    
        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                             long timeout, TimeUnit unit)
            throws InterruptedException {
            if (tasks == null)
                throw new NullPointerException();
            // unit 时间单位
            long nanos = unit.toNanos(timeout);
            ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
            boolean done = false;
            try {
                for (Callable<T> t : tasks)
                    futures.add(newTaskFor(t));
    
                final long deadline = System.nanoTime() + nanos;
                final int size = futures.size();
                //循环执行任务列表
                for (int i = 0; i < size; i++) {
                    execute((Runnable)futures.get(i));
                    nanos = deadline - System.nanoTime();
                    if (nanos <= 0L)
                        return futures;
                }
    
                for (int i = 0; i < size; i++) {
                    Future<T> f = futures.get(i);
                    if (!f.isDone()) {
                        if (nanos <= 0L)
                        	//如果不等直接返回
                            return futures;
                        try {
                        	//隔了最大的等待时间再次获取
                            f.get(nanos, TimeUnit.NANOSECONDS);
                        } catch (CancellationException ignore) {
                        } catch (ExecutionException ignore) {
                        } catch (TimeoutException toe) {
                            return futures;
                        }
                        nanos = deadline - System.nanoTime();
                    }
                }
                done = true;
                return futures;
            } finally {
                if (!done)
                    for (int i = 0, size = futures.size(); i < size; i++)
                        futures.get(i).cancel(true);
            }
        }
    
       private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                                  boolean timed, long nanos)
            throws InterruptedException, ExecutionException, TimeoutException {
            if (tasks == null)
                throw new NullPointerException();
                //获取所有的任务数
            int ntasks = tasks.size();
            if (ntasks == 0)
                throw new IllegalArgumentException();
            ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
            ExecutorCompletionService<T> ecs =
                new ExecutorCompletionService<T>(this);
    
    
            try {
    
                ExecutionException ee = null;
                final long deadline = timed ? System.nanoTime() + nanos : 0L;
                Iterator<? extends Callable<T>> it = tasks.iterator();
                futures.add(ecs.submit(it.next()));
                //当前未执行的任务
                --ntasks;
                //当前正在执行的任务
                int active = 1;
    
                for (;;) {
                //如果这个任务没执行完
                    Future<T> f = ecs.poll();
                    if (f == null) {
                    //如果还有没执行的任务
                        if (ntasks > 0) {
                        //当前未执行任务 - 1
                            --ntasks;
                            //添加当前任务到执行队列
                            futures.add(ecs.submit(it.next()));
                            // 正在执行的任务数 + 1
                            ++active;
                        }
                        else if (active == 0)
                        //如果都执行完了
                            break;
                        else if (timed) {
                        //如果设置了超时时间
                            f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
                            if (f == null)
                                throw new TimeoutException();
                            nanos = deadline - System.nanoTime();
                        }
                        else
                        //如果没设置超时时间,就无限期等待
                            f = ecs.take();
                    }
                    if (f != null) {
                    //如果执行完了,就是这个任务返回结果都出来了
                    //那么正在执行任务数 - 1
                        --active;
                        try {
                        //返回这个结果,其他不再执行
                            return f.get();
                        } catch (ExecutionException eex) {
                            ee = eex;
                        } catch (RuntimeException rex) {
                            ee = new ExecutionException(rex);
                        }
                    }
                }
    
                if (ee == null)
                    ee = new ExecutionException();
                throw ee;
    
            } finally {
            //只要里面有预料之外的异常,任务全部取消
            //这里没有直接取消正在执行的,否则造成死锁,因为不会释放锁
            //而是设置中断位,虽然可能多个任务执行了,但只会返回第一个结果
                for (int i = 0, size = futures.size(); i < size; i++)
                    futures.get(i).cancel(true);
            }
        }
    
        public Future<?> submit(Runnable task) {
            if (task == null) throw new NullPointerException();
        	// task封装成一个RunnableFuture
            RunnableFuture<Void> ftask = newTaskFor(task, null);
            //执行task
            execute(ftask);
            return ftask;
        }
        
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175
    • 176
    • 177
    • 178
    • 179
    • 180
    • 181

    newTaskFor(task, null)点进去就是创建一个FutureTask,而返回值类型是RunnableFuture,说明FutureTask就是一个特殊的RunnableFuture

        protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
            return new FutureTask<T>(runnable, value);
        }
    
    • 1
    • 2
    • 3

    走进去RunnableFuture看看
    接口整合就是把功能整合
    一个Future就是一个异步的结果的代理

    public interface RunnableFuture<V> extends Runnable, Future<V> {
        void run();
    }
    
    • 1
    • 2
    • 3
  • 相关阅读:
    网络安全系列-四十四:使用Filebeat、ElasticSearch、Kinaba 针对Suricata的分析结果eve.json进行可视化展示
    tictoc例子理解 16-18
    大数据HBASE的详细使用
    Allegro DFM Ravel Rule 丝印线段到PAD 间距检查
    【Prometheus+Grafana系列】监控MySQL服务
    MODBUS转PROFINET网关将电力智能监控仪表接入PROFINET网络案例
    java学习之包
    CEC2015:动态多目标野狗优化算法求解CEC2015(提供完整MATLAB代码,含GD、IGD、HV和SP评价指标)
    HTML概述_入门篇
    关于 WordPress 你了解多少?
  • 原文地址:https://blog.csdn.net/niTaoTaoa/article/details/126032359