为什么要用线程池, 什么情况下要用线程池;
当线程涉及到频繁的创建于销毁时, 适合使用线程池;
如果线程只涉及单纯的创建, 并没有销毁时, 直接创建既可; (例如在创建长连接时, 保持心跳的线程、接收服务端消息推送的线程);
线程池的作用
创建线程池
线程池的真正实现类是 ThreadPoolExecutor
根据上面流程我们知道 线程池的最大容量 = maximumPoolSize(最大线程数) + workQueue(任务队列最大容量), 如果线程数超过线程池的最大容量就会执行handler(拒绝策略), 当线程闲置时会根据**超时时间(keepAliveTime)**回收线程, 那么我们带着这几个结论来测试线程池;
测试代码:
下面的代码创建了一个 核心线程数为3, 最大线程数为5, 超时时间为1s, 超时单位为秒, 任务队列长度为2, 拒绝策略为抛异常的线程池;
package org.example;
import java.util.concurrent.*;
public class Main {
public static void main(String[] args) {
// 创建线程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(3, 5,
1L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(2),
new ThreadPoolExecutor.AbortPolicy());
for (int i = 0; i < 5; i++) {
// 向线程池提交任务
threadPoolExecutor.execute(() -> {
System.out.println(Thread.currentThread().getName() + "->办理业务");
});
}
// 关闭线程池
threadPoolExecutor.shutdown();
// threadPoolExecutor.shutdownNow();
}
}
执行这段代码我们会发现, for创建了5个线程, 核心为3, 队列为2, 那么线程池不会扩容, 任务由三个线程抢占消费;
接下来我们让for创建7个线程, 发现当核心和队列满了之后, 线程池会进行扩容到最大线程数;
接下来我们让for创建8个线程, 发现当线程池最大容量满了之后会执行拒绝策略;
接下来我们测试线程限制超时后被回收
package org.example;
import java.util.concurrent.*;
public class Main {
public static void main(String[] args) throws InterruptedException {
// 创建线程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(3, 5,
1L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(2),
new ThreadPoolExecutor.AbortPolicy());
for (int i = 0; i < 7; i++) {
// 向线程池提交任务
threadPoolExecutor.execute(() -> {
System.out.println(Thread.currentThread().getName() + "->办理业务");
});
}
System.out.println("回收前线程数量: " + threadPoolExecutor.getPoolSize());
// 让当前线程等待3秒, 让线程闲置超时
Thread.sleep(3000);
// 获取当前线程数量
System.out.println("回收后线程数量: " + threadPoolExecutor.getPoolSize());
// 关闭线程池
threadPoolExecutor.shutdown();
// threadPoolExecutor.shutdownNow();
}
}
结果我们可以看到, 线程池启动的5个线程, 当闲置超时候非核心线程会被回收(如果将 allowCoreThreadTimeout 设置为 true 时, 核心线程也会超时回收; ), 然后我们获取当前线程数量发现只有三个核心线程;
任务队列是基于阻塞队列实现的, 即采用生产者消费者模式, 在 Java 中需要实现 BlockingQueue 接口; 但 Java 已经为我们提供了 7 种阻塞队列的实现:
有界队列: 可以指定队列大小, 当队列饱和时并超过最大线程数时就会执行拒绝策略;
无界队列: 任务队列永远都可以添加任务, 直到资源耗尽, 所以设置 maximumPoolSize 没有任何意义;
阻塞队列: 在队列为空时, 获取元素的线程会等待队列变为非空, 当队列满时, 存储元素的线程会等待队列可用;
支持排序: PriorityBlockingQueue,
延时执行: DelayQueue
先进先出|先进后出: LinkedBlockingDeque
当线程池的线程数达到最大线程数时, 需要执行拒绝策略; 拒绝策略需要实现 RejectedExecutionHandler 接口, 并实现 rejectedExecution(Runnable r, ThreadPoolExecutor executor) 方法; 不过 ThreadPoolExecutor 已经为我们实现了 4 种拒绝策略:
CallerRunsPolicy策略源码: ThreadPoolExecutor创建了内部类并实现RejectedExecutionHandler 接口;
Executors已经为我们封装好了 4 种常见的功能线程池, 如下:
源码:
从源码我们可以看出来这四种线程池都是hreadPoolExecutor的封装类, 实际上就是java创建者给我们提供了四种常见的线程池;
使用示例:
public class Main1 {
public static void main(String[] args) {
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
for (int i = 0; i < 10; i++) {
fixedThreadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + "->办理业务");
});
}
fixedThreadPool.shutdown();
}
}
源码:
使用示例:
public class Main3 {
public static void main(String[] args) {
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(3);
Runnable task = () -> System.out.println(Thread.currentThread().getName() + "->办理业务");
// 延迟三秒后执行
// scheduledThreadPool.schedule(task, 3, TimeUnit.SECONDS);
// 延迟三秒后执行 每个1秒执行任务
scheduledThreadPool.scheduleAtFixedRate(task, 3, 1, TimeUnit.SECONDS);
}
}
源码:
使用示例:
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
Runnable task = () -> System.out.println(Thread.currentThread().getName() + "->办理业务");
cachedThreadPool.execute(task);
cachedThreadPool.shutdown();
源码:
使用示例:
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
Runnable task = () -> System.out.println(Thread.currentThread().getName() + "->办理业务");
singleThreadExecutor.execute(task);
singleThreadExecutor.shutdown();
Executors 的 4 个功能线程池虽然方便, 但现在已经不建议使用了, 而是建议直接通过使用 ThreadPoolExecutor 的方式, 这样的处理方式让同学更加明确线程池的运行规则, 规避资源耗尽的风险;
其实 Executors 的 4 个功能线程有如下弊端:
FixedThreadPool 和 SingleThreadExecutor:主要问题是堆积的请求处理队列均采用 LinkedBlockingQueue, 可能会耗费非常大的内存, 甚至 OOM;
CachedThreadPool 和 ScheduledThreadPool:主要问题是线程数最大数是 Integer.MAX_VALUE, 可能会创建数量非常多的线程, 甚至 OOM;
自定义线程池步骤:
使用示例:
public class ThreadPoolExecutorTest {
private static ThreadPoolExecutor threadPool = null;
public static void main(String[] args) {
threadPool = getThreadPool(
3, 5, 3, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(2), customFactory(), new BlockPolicy());
for (int i = 0; i < 10; i++) {
threadPool.execute(task());
}
// 关闭池
poolShutdown();
// 测试处理策略 调用shutdown后 后续任务会直接进入处理策略
for (int i = 0; i < 10; i++) {
threadPool.execute(task());
}
}
/**
* 创建线程池
*
* @param corePoolSize 核心线程数 保留在池中的线程数, 即使它们是空闲的, 除非设置了allowCoreThreadTimeOut(true);
* @param maximumPoolSize 最大线程数 池中允许的最大线程数;
* @param keepAliveTime 超时时间 当线程数大于corePoolSize时, 这是多余空闲线程在终止前等待新任务的最大时间;
* @param unit 超时时间单位;
* @param workQueue 任务的队列 在任务执行前用于保存任务的队列; 这个队列只保存execute()方法提交的Runnable任务;
* @param threadFactory 当执行程序创建新线程时使用的工厂;
* @param handler 处理策略 当执行因到达线程边界和队列容量而被阻塞时使用的处理程序;
* @return ThreadPoolExecutor
*/
public static ThreadPoolExecutor getThreadPool(
int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
return new ThreadPoolExecutor(
corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}
/**
* 线程任务
*
* @return Runnable
*/
public static Runnable task() {
return () -> {
// 业务逻辑
System.out.println(Thread.currentThread().getName() + "正在执行线程任务");
};
}
/**
* 返回用于创建新线程的线程工厂;
*
* @return CustomThreadFactory
*/
public static CustomThreadFactory customFactory() {
return new CustomThreadFactory();
}
/**
* explain:自定义线程工厂
*
* @author Hope
* @date 2022/6/22
*/
public static class CustomThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
public CustomThreadFactory() {
this.namePrefix = ThreadPoolExecutorTest.class.getSimpleName() + "-" +
poolNumber.getAndIncrement() + "-" + CustomThreadFactory.class.getSimpleName() + "-thread-";
}
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
String threadName = namePrefix + threadNumber.getAndIncrement();
t.setName(threadName);
return t;
}
}
/**
* explain:自定义异常处理策略
*/
public static class BlockPolicy implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
// 线程池如果开始执行Shutdown之后还有任务添加 那么线程池会执行处理策略
if (executor.isShutdown()) {
return;
}
// 核心改造点, 由BlockingQueue的offer改成put阻塞方法
executor.getQueue().put(r);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* 关闭线程池
*/
public static void poolShutdown() {
if (threadPool != null) {
threadPool.shutdown();
}
}
}
参考链接:https://jimmysun.blog.csdn.net/article/details/95225769