线程池:创建并维护一定数量的空闲线程,当有需要执行的任务,就交付给线程池中的一个线程,任务执行结束后,该线程也不会死亡,而是回到线程池中重新变为空闲状态。
线程池优点:
1、重用线程池中的线程,避免频繁创建和销毁线程所带来的内存开销。
2、有效控制线程的最大并发数,避免因线程之间抢占资源而导致的阻塞现象。
3、能够对线程进行简单的管理,提供定时执行以及指定时间间隔循环执行等功能。
ThreadPoolExecutor
是线程池的实现,先来个小demo,核心代码如下:
private void threadPoolTest() {
//创建线程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4 , 3, TimeUnit.SECONDS,
new LinkedBlockingDeque<>(3), new ThreadPoolExecutor.DiscardOldestPolicy());
for (int i = 0; i < 5; i++) {
String data = "task@" + i;
Log.i(TAG, data);
//提交任务
threadPoolExecutor.execute(new ThreadPoolRunnable(data));
}
Log.i(TAG, "start test ---------------");
}
class ThreadPoolRunnable implements Runnable {
private Object taskData;
public ThreadPoolRunnable(Object taskData) {
this.taskData = taskData;
}
@Override
public void run() {
Log.i(TAG, "run - runnableData: " + taskData + " | " + Thread.currentThread());
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
taskData = null;
}
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
}
corePoolSize (int ):核心线程的个数。线程池中有两类线程,核心线程和非核心线程。核心线程默认情况下会一直存在于线程池中,即使这个核心线程什么都不干(铁饭碗),而非核心线程如果长时间的闲置,就会被销毁(临时工)。如果调用了线程池的prestartAllCoreThreads()
或prestartCoreThread()
方法,线程池会提前创建并启动所有核心线程。
maximumPoolSize (int ):线程池所能容纳的最大线程数。当活动线程数达到这个数值后,后续的新任务将会被阻塞。该值等于核心线程数量 + 非核心线程数量。如果任务队列使用无界的阻塞队列,该参数没有什么效果。
有界阻塞队列和无界阻塞队列含义和区别:
阻塞队列有一个非常重要的属性:容量的大小,分为有界和无界两种。无界阻塞队列:队列容量很大,近似无上界,例如 LinkedBlockingQueue 的上限是 Integer.MAX_VALUE,约为 2 的 31 次方,是非常大的一个数,可以近似认为是无限容量。
有界阻塞队列:队列容量有上界,例如 ArrayBlockingQueue 如果容量满了,也不会扩容,所以一旦满了就无法再往里放数据了。
keepAliveTime (long):非核心线程闲置时的超时时长,超过这个时长,非核心线程就会被回收。如果设置allowCoreThreadTimeOut(true)
,则会也作用于核心线程。
unit (TimeUnit ):keepAliveTime
的时间单位。
workQueue(BlockingQueue):线程池中的任务队列,通过线程池的execute
方法提交的Runnable
对象存储在这个参数中,遵循先进先出原则。
threadFactory(ThreadFactory ):创建线程的工厂 ,用于批量创建线程,统一在创建线程时设置一些参数,如是否守护线程、线程的优先级等。如果不指定,默认使用Executors.defaultThreadFactory()
来创建线程,线程具有相同的NORM_PRIORITY
优先级并且是非守护线程。
private static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
四种拒绝处理的策略如下:
使用总结:
demo中的构造代码如下,声明了2个核心进程,最大进程数为4,非核心进程空闲超时时长是3s。阻塞队列使用LinkedBlockingDeque
,并指定大小为3,防止队列无限膨胀,拒绝处理策略为DiscardOldestPolicy
,丢弃队列头部(最旧的)的任务。
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4 , 3, TimeUnit.SECONDS,
new LinkedBlockingDeque<>(3), new ThreadPoolExecutor.DiscardOldestPolicy());
1、ArrayBlockingQueue(常用)
ArrayBlockingQueue
内部,维护了一个定长数组,以便缓存队列中的数据对象,这是一个常用的阻塞队列。ArrayBlockingQueue
在生产者放入数据和消费者获取数据,都是共用同一个锁对象,由此也意味着两者无法真正并行运行,没有实现读写分离。ArrayBlockingQueue
时,我们还可以控制对象的内部锁是否采用公平锁,默认采用非公平锁。ArrayBlockingQueue 使用 ReentrantLock 的公平锁和非公平锁来实现该功能。简单理解就是,ReentrantLock 内部会维护一个有先后顺序的等待队列,假如有五个任务一起过来,都被阻塞了。如果是公平的,则等待队列中等待最久的任务就会先进入阻塞队列。如果是非公平的,那么这五个线程就需要抢锁,谁先抢到,谁就先进入阻塞队列。
2、LinkedBlockingQueue(常用)
LinkedBlockingQueue
不指定队列的大小时,默认值是 Integer.MAX_VALUE
。但是,建议指定一个固定大小。因为,如果生产者的速度比消费者的速度大的多的情况下,这会导致阻塞队列一直膨胀,直到系统内存被耗尽。LinkedBlockingQueue
实现了读写分离,可以实现数据的读和写互不影响,这在高并发的场景下,对于效率的提高无疑是非常巨大的。3、SynchronousQueue
size
为0
。当执行插入元素的操作时,必须等待一个取出操作。也就是说,put
元素的时候,必须等待 take
操作。Excutors.newCachedThreadPool
方法用的就是这种队列,我们还可以控制对象的内部锁是否采用公平锁,默认采用非公平锁。4、PriorityBlockingQueue
11
。也可以传入一个比较器,把元素按一定的规则排序,不指定比较器的话,默认是自然顺序。PriorityBlockingQueue
是基于二叉树最小堆实现的,每当取元素的时候,就会把优先级最高的元素取出来。5、DelayQueue
ThreadPoolExecutor
类中使用了一些final int
常量变量来表示线程池的状态 :
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
1、RUNNING
2、SHUTDOWN
3、STOP:
shutdownNow()
方法后,线程池的状态就会由RUNNING或SHUTDOWN转为STOP;4、TIDYING:
workCount
(有效线程数)为0
。5、TERMINATED:
1、newFixedThreadPool()
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
nThreads
。keepAliveTime = 0L
,即使线程池中的线程空闲,也不会被回收。除非调用shutDown()
或shutDownNow()
去关闭线程池。LinkedBlockingQueue()
链表结构的阻塞队列),意味着任务可以无限加入线程池。2、newScheduledThreadPool()
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
//默认闲置超时回收时常
private static final long DEFAULT_KEEPALIVE_MILLIS = 10L;
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue());
}
10s
会被回收。3、newSingleThreadExecutor()
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
ThreadPool
线程池,当前类型为ThreadExecutor
,执行线程。(相当于newFixedThreadPool
传入线程个数是1)。4、newCachedThreadPool()
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
SynchronousQueue()
队列,有新任务时使用空闲线程执行,没有空闲线程则创建新的线程来处理。基础知识差不多了,可以撸代码了,以下为核心代码:
public void fixedThreadPoolTest() {
Log.i(TAG, "----fixedThreadPoolTest----");
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(2);
for (int i = 0; i < 5; i++) {
String data = "task@" + i;
Log.i(TAG, data);
fixedThreadPool.execute(new ThreadPoolRunnable(data));
}
}
public void cachedThreadPoolTest() {
Log.i(TAG, "----cachedThreadPoolTest----");
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++) {
String data = "task@" + i;
Log.i(TAG, data);
cachedThreadPool.execute(new ThreadPoolRunnable(data));
}
}
public void singleThreadExecutorTest() {
Log.i(TAG, "----singleThreadExecutorTest----");
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
for (int i = 0; i < 5; i++) {
String data = "task@" + i;
Log.i(TAG, data);
singleThreadExecutor .execute(new ThreadPoolRunnable(data));
}
}
public void scheduledThreadPoolTest() {
Log.i(TAG, "----scheduledThreadPoolTest----");
//这里创建的是ScheduledExecutorService对象,ScheduledExecutorService是ExecutorService的子类
mScheduledThreadPool = Executors.newScheduledThreadPool(2);
for (int i = 0; i < 5; i++) {
String data = "task@" + i;
Log.i(TAG, data);
// 1000ms后执行runnable
mScheduledThreadPool.schedule(new ThreadPoolRunnable(data), 1000, TimeUnit.MILLISECONDS);
}
}
public void scheduledAtFixedRateTest() {
Log.i(TAG, "----scheduledAtFixedRateTest----");
//这里创建的是ScheduledExecutorService对象,ScheduledExecutorService是ExecutorService的子类
mScheduledThreadPool = Executors.newScheduledThreadPool(2);
// 1s后,每2s执行一次runnable
mScheduledThreadPool.scheduleAtFixedRate(new ThreadPoolRunnable(666), 1000, 2000, TimeUnit.MILLISECONDS);
}
public void scheduledCancelTest() {
Log.i(TAG, "----scheduledCancelTest----");
if (mScheduledThreadPool != null) {
mScheduledThreadPool.shutdown();
}
}
class ThreadPoolRunnable implements Runnable {
private Object taskData;
public ThreadPoolRunnable(Object taskData) {
this.taskData = taskData;
}
@Override
public void run() {
Log.i(TAG, "run - runnableData: " + taskData + " | " + Thread.currentThread());
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
taskData = null;
}
}
运行结果:
cachedThreadPoolTest
函数,看下图结果可知,一共新建了5个线程来执行这5个任务,所以newCachedThreadPoo()
只适合耗时较短的任务,否则会一直创建新线程。
参考文章:
线程池10:线程池的5种状态;
常用阻塞队列 BlockingQueue 有哪些?
Android多线程:理解和简单使用总结
Android 线程池
全方位解析-Android中的线程池