线程我们可以使用 new 的方式去创建,但如果并发的线程很多,每个线程执行的时间又不长,这样频繁的创建线程会大大的降低系统处理的效率,因为创建和销毁进程都需要消耗资源,线程池就是用来解决类似问题。
线程池实现了一个线程在执行完一段任务后,不销毁,继续执行下一段任务。用《Java并发编程艺术》提到线程池的优点:
1、降低资源的消耗:使得线程可以重复使用,不需要在创建线程和销毁线程上浪费资源
2、提高响应速度:任务到达时,线程可以不需要创建即可以执行
3、线程的可管理性:线程是稀缺资源,如果无限制的创建会严重影响系统效率,线程池可以对线程进行管理、监控、调优。
Excutor框架是线程池处理线程的核心,包括创建任务,传递任务,任务的执行三个方面
执行的任务需要实现 Runnable 或者 Callable 接口,然后重写里面的 run 方法,这两个接口区别下面会写
以前执行线程都是直接创建线程然后调用 start() 方法去执行线程,现在我们需要把任务传递到线程池里面去,传递任务的核心接口就是 Excutor 接口,而它下面有几个实现的方法,如图所示:
可以看到真正实现了功能的其实就是两个类 ThreadPoolExecutor 和 ScheduledTreadPollExecutor(可以延时定时执行任务),而其中用的最多的就是这个 ThreadPoolExecutor ,下面会详细介绍,我们把任务通过这个方法的对象进行传递。
任务传递给线程池执行完毕后,对于不同的传递方式,会有不同的返回策略,对于利用 excutor 方法传递的任务,不管执行的怎么样都不会有值传递回来,而对于 submit 方法传递的任务会返回一个 FutureTask 对象,返回用户希望接受的值。
Excutor 框架使用示意图如图:
ThreadPoolExecutor 是线程池最为核心的一个类,而线程池为它提供了四个构造方法,我们先来看一下其中最原始的一个构造方法,其余三个都是由它衍生而来
/**
* 用给定的初始参数创建一个新的ThreadPoolExecutor。
*/
public ThreadPoolExecutor(int corePoolSize,//线程池的核心线程数量
int maximumPoolSize,//线程池的最大线程数
long keepAliveTime,//当线程数大于核心线程数时,多余的空闲线程存活的最长时间
TimeUnit unit,//时间单位
BlockingQueue<Runnable> workQueue,//任务队列,用来储存等待执行任务的队列
ThreadFactory threadFactory,//线程工厂,用来创建线程,一般默认即可
RejectedExecutionHandler handler//拒绝策略,当提交的任务过多而不能及时处理时,我们可以定制策略来处理任务
) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
可以看到这里有6个参数,这些参数直接影响到线程池的效果,以下是具体分析每个参数的意义
饱和策略的介绍: 即如果当前同时运行的线程数量达到最大线程数量并且队列也已经被放满了任务时,ThreadPoolTaskExecutor 定义一些策略:
class worker implements Runnable{
@Override
public void run(){
System.out.println(Thread.currentThread().getName());
}
}
public class Test {
private static final int corePoolSize = 4;
private static final int maximumPoolSize = 6;
private static final long keepAliveTime = 2;
private static final TimeUnit unit = TimeUnit.SECONDS;
private static final int QueueSize = 5;//将参数设定为固定值
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
keepAliveTime,
unit,
new ArrayBlockingQueue<>(QueueSize),
new ThreadPoolExecutor.CallerRunsPolicy()
);//构造线程池
for(int i = 0; i < 10; i++){
worker w = new worker();
executor.execute(w);
}//传入10个任务
executor.shutdown();//关闭线程池
while(!executor.isTerminated()){
}
System.out.println("finish");
}}
pool-1-thread-4
pool-1-thread-2
pool-1-thread-3
pool-1-thread-1
pool-1-thread-5
pool-1-thread-1
pool-1-thread-4
pool-1-thread-5
pool-1-thread-3
pool-1-thread-2
finish
我们可以根据运行的结果就知道我传进去的 10 个任务是通过 5 个线程复用所得到。
在ThreadPoolExecutor中定义了一个volatile变量,另外定义了几个static final变量表示线程池的各个状态:
1 volatile int runState;
2 static final int RUNNING = 0;
3 static final int SHUTDOWN = 1;
4 static final int STOP = 2;
5 static final int TERMINATED = 3;
runState表示当前线程池的状态,它是一个volatile变量用来保证线程之间的可见性;
下面的几个static final变量表示runState可能的几个取值。
Runnable:
@FunctionalInterface
public interface Runnable {
/**
* 被线程执行,没有返回值也无法抛出异常
*/
public abstract void run();//单纯的run
}
```go
**Callable:**
@FunctionalInterface
public interface Callable<V> {
/**
* 计算结果,或在无法这样做时抛出异常。
* @return 计算得出的结果
* @throws 如果无法计算结果,则抛出异常
*/
V call() throws Exception;//无法计算则抛出异常
}
Future<String> submit = executorService.submit(() -> {
try {
Thread.sleep(5000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "abc";//带有返回值
});
String s = submit.get();//get返回值
System.out.println(s);
executorService.shutdown();
称为可重用固定线程池,其源码:
/**
* 创建一个可重用固定数量线程的线程池
*/
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
从源码可以看出,这个这个线程池的核心线程数 和 最大线程数量都设置为 nThreads ,这个参数是我们在创建线程池的时候传递的
上图说明:
使用FixThreadPool的缺点
在 FixThreadPool 中的等待队列是使用的 LinkedBlockingQueue,这是一个无界队列,最大的容量为 Integer.MAX_VALUE,可以无限制的接受任务,从而导致线程池的最大线程数参数生无效的,在核心线程全部被占有时,新来的任务就会被安置在无界队列中,当任务很多的时候,线程池没有拒绝任务的策略,就可能导致OOM(内存不足)
当线程池中的线程数达到核心线程数后,新任务将在无界队列中等待,因此线程池中的线程数不会超过 corePoolSize;
由于使用无界队列时 最大线程数 将是一个无效参数,因为不可能存在任务队列满的情况。所以,通过创建 的源码可以看出创建的 FixThreadPool 的 核心线程数和 最大线程数 被设置为同一个值。
运行中的 FixThreadPool(未执行 shutdown()或 shutdownow())不会拒绝任务,在任务比较多的时候会导致 OOM(内存溢出)。
/**
*返回只有一个线程的线程池
*/
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
可以看到特点是只有一个核心线程
SingleThreadExcutor的缺点
同样的其等待队列也是用的 LinkedBlockingQueue 无界队列,同样也可能出现 OOM。
/**
* 创建一个线程池,根据需要创建新线程,但会在先前构建的线程可用时重用它。
*/
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
从源码可以看到,其核心线程数为0,但最大线程数目为 Integer.MAX_VALUE,即是无界,其只要来任务需要线程就会创建线程,极端情况会造成线程耗尽的情况
其执行过程是当有任务传递给线程池会先查看现有的线程是否够,如果不够,则会创建新的线程。 其缺点就是可能创建大量线程,导致OOM
线程池数量的确定一直是困扰着程序员的一个难题,大部分程序员在设定线程池大小的时候就是随心而定。
很多人甚至可能都会觉得把线程池配置过大一点比较好!我觉得这明显是有问题的。就拿我们生活中非常常见的一例子来说:并不是人多就能把事情做好,增加了沟通交流成本。你本来一件事情只需要 3 个人做,你硬是拉来了 6 个人,会提升做事效率嘛?我想并不会。 线程数量过多的影响也是和我们分配多少人做事情一样,对于多线程这个场景来说主要是增加了上下文切换成本。不清楚什么是上下文切换的话,可以看我下面的介绍。
上下文切换:
多线程编程中一般线程的个数都大于 CPU 核心的个数,而一个 CPU 核心在任意时刻只能被一个线程使用,为了让这些线程都能得到有效执行,CPU 采取的策略是为每个线程分配时间片并轮转的形式。当一个线程的时间片用完的时候就会重新处于就绪状态让给其他线程使用,这个过程就属于一次上下文切换。概括来说就是:当前任务在执行完 CPU 时间片切换到另一个任务之前会先保存自己的状态,以便下次再切换回这个任务时,可以再加载这个任务的状态。任务从保存到再加载的过程就是一次上下文切换。
上下文切换通常是计算密集型的。也就是说,它需要相当可观的处理器时间,在每秒几十上百次的切换中,每次切换都需要纳秒量级的时间。所以,上下文切换对系统来说意味着消耗大量的 CPU 时间,事实上,可能是操作系统中时间消耗最大的操作。
Linux 相比与其他操作系统(包括其他类 Unix 系统)有很多的优点,其中有一项就是,其上下文切换和模式切换的时间消耗非常少。
类比于实现世界中的人类通过合作做某件事情,我们可以肯定的一点是线程池大小设置过大或者过小都会有问题,合适的才是最好。
如果我们设置的线程池数量太小的话,如果同一时间有大量任务/请求需要处理,可能会导致大量的请求/任务在任务队列中排队等待执行,甚至会出现任务队列满了之后任务/请求无法处理的情况,或者大量任务堆积在任务队列导致 OOM。这样很明显是有问题的! CPU 根本没有得到充分利用。
但是,如果我们设置线程数量太大,大量线程可能会同时在争取 CPU 资源,这样会导致大量的上下文切换,从而增加线程的执行时间,影响了整体执行效率。