一些任务适合使用单独的线程去执行,而线程作为一项比较重的资源如果频繁创建对系统资源消耗较大。使用线程池,将线程重复使用,节省频繁创建线程的开销。
创建一个线程需要调用操作系统的API,然后操作系统要为线程分配一系列的资源,这个成本就很高了。所以应该避免线程的频繁创建和销毁。
在Java中表示线程池的类为ThreadPoolExecutor。定义时最多需要设置7个参数。
1.核心线程数,当线程池最少维护的线程个数
2.最大线程数,当提交任务时所有核心线程都在工作,切阻塞队列已经满了,则继续创建线程,这个参数定义了创建线程数的上限
3和4 线程活跃时间,当线程超过该时间空闲时销毁线程,当然要保留核心线程数个线程
5 阻塞队列,任务提交时,当无空闲线程且线程个数到达核心线程数,则先将任务保存到该队列中。为了避免OOM阻塞队列需要设置一个上限,不建议使用Executors定义好的几种线程池就是因为它的阻塞队列是无界的。
6 线程工厂,定义创建线程的方式,比如指定线程名字
7 拒绝策略,当阻塞队列满了,这时再提交任务则触发决绝策略。默认提供了4种:抛出异常、静默处理、调用者线程自身处理、删除最早提交的任务。也可以自定义处理方式,比如使用另一个补偿线程池执行、放入MQ让其他消费者执行、放入ySQL启动一个线程遍历执行等。
public static void func() {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, //核心线程数
5, //最大线程数
10, //线程活跃时间,超过这个时间没有执行任务释放该线程
TimeUnit.SECONDS, //活跃时间单位
new ArrayBlockingQueue<>(1024), //阻塞队列,设置一个上限避免OOM
new ThreadFactory() {
private final AtomicInteger threadId = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "MyThreadPool-" + threadId.getAndIncrement());
}
}, //线程工厂,指定如何生成线程。这里给线程指定一个有意义的名字。(线程可以重名吗?)
new ThreadPoolExecutor.CallerRunsPolicy() //拒绝策略,当所有线程都在忙碌,而阻塞队列到达上限时执行的操作。
// 有4种已定义的:抛出异常、不执行静默处理、提交任务的线程自己执行、抛弃最早的提交的任务;
// 也可以自定义处理方式,如放入消息队列进行补偿执行、放入数据库进行补偿(有个线程进行定时遍历)、使用专门的补偿线程池进行执行
);
}
1.提交Runnable,不关心返回值、执行情况
ThreadPoolExecutor.execute(Runnable r)
此方法没有返回值,适用于任务执行结束不需要有处理结果的情况。且不能进行任务的取消。
2.提交Runnable,返回future,可以取消任务、等待任务结果
Future<?> future = ThreadPoolExecutor.submit(Runnable r)
返回一个future,提供了isDone()判断任务是否执行完;cancel(true)取消任务,参数表示如果任务已经在执行了是否发出Innterrupted中断;future.get()阻塞获取执行结果,当然这里提交的Runnable总是返回null,但可以作为等待任务结束的方法。
3.提交Callable,返回future,通过future拿到执行结果
Future<T> ThreadPoolExecutor.submit(Callable<T> c)
这个future的get()方法就可以获取到Callable的call方法的返回值。当然isDone、cancel等方法还是可以正常使用的
1.池化资源思想
线程池是一种池,将线程创建后重复使用,是一种池化思想。池化思想在编程领域有广泛应用,比如对象池、连接池等。
2.生产者、消费者模型
和一般的池化资源不同,线程池并不是通过获取资源、使用资源、释放资源的步骤来使用的。而是一种生产者、消费者模型。提交任务是一种生产行为,线程池中的线程执行任务是消费行为。
1.CPU密集型任务
一般设置为CPU核心数+1,因为任务大部分时间都在利用CPU,每个线程利用一个CPU。设置+1是为了偶尔的内存缺页等导致线程中断时有线程可以顶上。
2.IO密集型任务
任务大部分时间都在执行IO操作,比如调用外部接口等。线程数就可以设置的大于CPU核心数。一个理想公式为:(1 + I/O时间 / CPU时间) * 核心数。比如IO和Cpu之比为9:1,那么设置10个线程。当9个线程使用完CPU后,剩下一个线程正好开始使用CPU。每时每刻都有1个线程使用CPU,9个线程使用IO,两者的利用率都达到了100%。
对于使用execute(Runnable r)提交的任务,抛出异常时提交任务的线程不会得到通知。对于另外两种有future返回的submit()方法提交的任务,如果调用了get方法,发生异常时get会抛出异常。如果不调用get也感知不到异常。
推荐的方式:在任务中按需处理所有异常!
可以定时获取以下指标进行监控埋点:
//核心线程数,这个应该不会动态变化吧?
int corePoolSize = executor.getCorePoolSize();
//最大线程数
int maximumPoolSize = executor.getMaximumPoolSize();
//历史最大线程数
int largestPoolSize = executor.getLargestPoolSize();
//活跃线程数,正在执行任务的线程数?
int activeCount = executor.getActiveCount();
//当前线程池中线程总数
int poolSize = executor.getPoolSize();
//队列中任务个数
int size = executor.getQueue().size();
Java通过Executors类提供了4种默认的线程池,但是在实际情况中要谨慎使用,因为他们都使用了无界的任务队列,存在OOM的风险。
1.newFixedThreadPool
固定线程数的线程池。核心线程数=最大线程数,无界任务队列
ExecutorService executorService = Executors.newFixedThreadPool(10);
# 实际定义
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
2.newCachedThreadPool
核心线程数=0
最大线程数=最大整数值
空闲时间=60秒
无界任务队列
即线程最多空闲时间是60秒,有任务提交时如果还有线程则直接执行,否则直接创建新线程执行。
ExecutorService executorService1 = Executors.newCachedThreadPool();
# 构造方法定义
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
疑问:既然优先将任务放入阻塞队列。对于newCachedThreadPool线程池设置核心线程数为0,最大线程数为最大整数,阻塞队列貌似也无界,它是怎么运行的?
解答:关键在于他使用的阻塞队列SynchronousQueue,它并不是无界的,而是容量为1的同步队列。生产者线程必须等待队列中的任务被消费后才能继续放入任务。所以放入第二个任务时就会开始创建线程执行第一个任务。那么如果只放入一个线程,那是不是就永远不会被执行?实测发现并不会出现这种情况,而是立即执行了。
关于这个问题找到了下面这个解答:
由于ThreadPoolExecutor内部实现任务提交的时候调用的是工作队列(BlockingQueue接口的实现类)的非阻塞式入队列方法(offer方法),因此,在使用SynchronousQueue作为工作队列的前提下,客户端代码向线程池提交任务时,而线程池中又没有空闲的线程能够从SynchronousQueue队列实例中取一个任务,那么相应的offer方法调用就会失败(即任务没有被存入工作队列)。此时,ThreadPoolExecutor会新建一个新的工作者线程用于对这个入队列失败的任务进行处理(假设此时线程池的大小还未达到其最大线程池大小)。
作者:go4it
链接:https://www.jianshu.com/p/b7f7eb2bc778
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
3.newSingleThreadExecutor
单线程线程池
核心线程数=最大线程数=1
ExecutorService executorService2 = Executors.newSingleThreadExecutor();
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
4.newScheduledThreadPool
可以执行定时、延迟任务的线程池
可以执行两种任务,一种是定时周期性执行的;另一种是在固定延迟后执行一次。
这四种线程池知道是怎么回事就行。使用时要仔细判断是否适用自己的场景,特别是注意OOM问题。第四种定时执行在需要执行一些简单定时任务时还是可以使用的。
上图来自:https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html
提供了shutdown()和shutdownNow()两个方法来终止线程池。
其中shutdown()方法调用后,线程池不再接收新提交的任务,会等待已提交的任务执行完成后终止线程池,包括正在执行的和阻塞队列中的任务。但shutdown()方法是立即返回的,不会等到线程池终止才返回。
shutdownNow()会立即终止线程池,正在执行的线程会收到Interrupted中断,调用了线程的interrupt()方法。队列中未执行的任务会返回一个Runnables列表,对于其中的Future可以进行cancel等处理。