ThreadPoolExecutor 是线程池的底层实现,如下所示
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
ThreadPoolExecutor 具有 7 个主要参数
示意图
线程池行为模式
预设拒绝策略
有下列预设拒绝策略:
AbortPolicy 示例
ExecutorService pool = new ThreadPoolExecutor(
2,4,120L, TimeUnit.SECONDS,new LinkedBlockingQueue<>(3),
Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
CallerRunsPolicy 示例
public static void main(String[] args) {
ExecutorService pool = new ThreadPoolExecutor(
2,4,120L, TimeUnit.SECONDS,new LinkedBlockingQueue<>(3),
Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());
try {
for (int i = 0; i < 10; i++) {
pool.execute(()->{
System.out.println(Thread.currentThread().getName()+" is working: "+System.currentTimeMillis());
});
}
}finally {
pool.shutdown();
}
}
DiscardOldestPolicy 示例
public static void main(String[] args) {
ExecutorService pool = new ThreadPoolExecutor(
2,4,120L, TimeUnit.SECONDS,new LinkedBlockingQueue<>(3),
Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardOldestPolicy());
try {
for (int i = 0; i < 10; i++) {
int finalI = i;
pool.execute(()->{
try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); }
System.out.println(Thread.currentThread().getName()+" is working for: "+ finalI);
});
}
}finally {
pool.shutdown();
}
}
说明
0、1 任务进入核心线程
2、3、4 进入任务队列并将其填满
5、6 进入任务队列未果,创建新线程并进入新线程
7、8、9 进入任务队列未果,创建新线程未果,触发拒绝策略
拒绝策略从最久的任务开始扔,扔了 3 个,正好把 7、8、9 放进去
7、8、9 被完成上一轮任务的线程消费
DiscardPolicy 示例
ExecutorService pool = new ThreadPoolExecutor(
2,4,120L, TimeUnit.SECONDS,new LinkedBlockingQueue<>(3),
Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardPolicy());
说明
道理和 DiscardOldestPolicy 基本一致,只不过丢弃的是最新的任务
Executors 是线程池的工具类
名字 | 说明 | 特点 | 适用场景 |
---|---|---|---|
newFixedThreadPoo | 固定线程数的线程池 | 线程数量固定 | 执行长期任务,性能较好 |
newSingleThreadExecutor | 单线程线程池 | 线程数量固定为 1 | 需要任务按序单个消费的场景 |
newCachedThreadPool | 缓存线程池 | 自动扩容 | 短期执行很多异步小任务 或负载较轻的服务 的场景 |
newScheduledThreadPool | 任务调度线程池 | ||
newWorkStealingPool | |||
阿里规范
【强制】 线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,
这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
说明:Executors各个方法的弊端:
1)newFixedThreadPool和newSingleThreadExecutor:
允许的请求处理队列长度是Integer.MAX_VALUE,可能会耗费非常大的内存,甚至OOM。
2)newCachedThreadPool和newScheduledThreadPool:
允许的线程数最大数是Integer.MAX_VALUE,可能会创建数量非常多的线程,甚至OOM。
newFixedThreadPoo
public static void main(String[] args) {
// demo 中简单示例,从简了,下同
ExecutorService pool = Executors.newFixedThreadPool(5);
try {
for (int i = 0; i < 15; i++) {
pool.execute(()->{
System.out.println(Thread.currentThread().getName()+" is working: "+System.currentTimeMillis());
});
}
}finally {
pool.shutdown();
}
}
newSingleThreadExecutor
public static void main(String[] args) {
ExecutorService pool = Executors.newSingleThreadExecutor();
try {
for (int i = 0; i < 15; i++) {
pool.execute(()->{
System.out.println(Thread.currentThread().getName()+" is working: "+System.currentTimeMillis());
});
}
}finally {
pool.shutdown();
}
}
newCachedThreadPool
public static void main(String[] args) {
ExecutorService pool = Executors.newCachedThreadPool();
try {
for (int i = 0; i < 15; i++) {
pool.execute(()->{
System.out.println(Thread.currentThread().getName()+" is working: "+System.currentTimeMillis());
});
}
}finally {
pool.shutdown();
}
}
newScheduledThreadPool
自定义方式
使用 ThreadPoolExecutor 手动指定相关参数即可
参考上文 主要参数
ExecutorService pool = new ThreadPoolExecutor(
2,10,120L, TimeUnit.SECONDS,new LinkedBlockingQueue<>(30),
Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardPolicy());
线程池各参数配置原则
需要参考线程池处理的业务的类型
CPU 密集型
定义
需要大量计算,其性能瓶颈在于主要在于 CPU 的计算能力
这种业务主要是 CPU 在工作,很少阻塞,CPU 全速运转
原则
CPU 密集型业务需要尽量少的线程数量
目的是为了减少 CPU 切换线程
推荐值
CPU 核数 + 1
理论上应该 == CPU 核心数
但业务在处理中,即使再 CPU 密集,也不可能一点 IO 都没有
当 CPU 等待此线程 IO 时,可以处理另一个线程的计算,因此 +1
服务器的逻辑 cpu 核数 可以通过下面代码或指令获取
Runtime.getRuntime().availableProcessors();
cat /proc/cpuinfo| grep "processor"
注意
只有在多核 CPU 时才能得到性能提升
相当于以前只用一个 CPU 核,现在用多个核一起计算
单核 CPU 即使使用多线程也会受限于物理 CPU 总算力没有提升
类似网络带宽只有 1 M,买了迅雷白金也不可能提高下载速度
IO 密集型
定义
需要大量 IO,等待 IO 就绪时,CPU 可以处理其他线程的计算
原则
可以多配置线程,以便把 CPU 等待 IO 的时间利用起来
但同时还要考虑当前服务在服务器(或容器)上是否独占,独占说明整个服务器或容器的资源都是当前服务的,否则需要根据经验取一个保守值
还可以根据 业务流量 和 服务点数 估算
也可以依赖压测,从实际触发
推荐值
计算方式 1 :CPU 核数 * 2
计算方式 2 :CPU 核数 / CPU 执行率
计算方式 3 : 每秒业务量 * 每业务评价处理时间 / 服务节点数量(相当于计算得把 1 秒当 几秒使才能满足业务,然后按服务容器个数均分)
服务非服务器独占时使用 1
服务是服务器独占时使用 2
情况复杂时使用 3,因为反正业务需要处理完,物理资源就这么多
CPU 执行率 = 单位时间中处理线程池业务的时间/总时间
CPU 执行率的估算方式