执行器(Executor, ExecutorService),
线程池(ThreadPoolExecutor, ScheduledThreadPoolExecutor, ForkJoinPool),
线程,线程创建工厂
任务队列(无界队列-LinkedBlockingQueue, 同步队列-SynchronousQueue, 延迟队列-DelayedWorkQueue),
任务(Runnable, Callable-FutureTask),任务拒绝策略
关闭线程池
5个以上线程时使用线程池
alibaba要求自定义线程池,不适用Executors工具类
/**
* 自定义线程池(阿里推荐)
* 1. 明确使用什么线程池
* 2. 得到线程数:明确I/O密集型还是CPU密集型
* 3. 确定任务队列
* 4. 线程工厂
* 5. 任务拒绝策略
*
* 线程数(CPU密集型)=CPU核心数的1到2倍
* 线程数(I/O密集型)=CPU核心数 × (1+平均等待时间/平均工作时间),IO时间越长线程数量越大
* java获取CPU核心数:Runtime.getRuntime().availableProcessors()
*/
public static void main(String[] args) {
/**
* corePoolSize:核心线程数,线程池维持的线程数
* maximumPoolSize:池中最多线程数
* keepAliveTime:大于核心线程数的线程存活时间
* unit:存活时间单位
* workQueue:任务队列
* threadFactory:线程工厂
* RejectedExecutionHandler:任务拒绝策略处理器
*/
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(),
Runtime.getRuntime().availableProcessors() * 2,
30,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(2),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread th = new Thread(r, "threadName");
if (th.getPriority() != Thread.NORM_PRIORITY)
th.setPriority(Thread.NORM_PRIORITY);
return th;
}
},
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// 日志
System.out.println("任务" + r.toString() + "被拒绝");
}
});
for (int i = 1; i < 10; i++) {
threadPoolExecutor.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
System.out.println("线程:" + Thread.currentThread().getName() + " 执行耗时任务");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
// 主线程执行任务
for (int i = 0; i < 1; i++) {
try {
Thread.sleep(1000);
System.out.println("主线程执行任务" + i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 关闭线程池,否则线程池会维持核心线程数一直运行
threadPoolExecutor.shutdown();
}
// Executors.newSingleThreadExecutor/execute(Runnable command)
public static void main(String[] args) {
Runnable duty = new Runnable() {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(1000);
System.out.println("Runnable执行任务" + i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
};
/**
* new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue())
* corePoolSize=1, maximumPoolSize=1, keepAliveTime=0L, TimeUnit.MILLISECONDS
* new LinkedBlockingQueue(){this(Integer.MAX_VALUE);}
*/
ExecutorService executorService = Executors.newSingleThreadExecutor();
// execute(Runnable command) 只能传入Runnable
executorService.execute(duty);
// 主线程执行任务
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(1000);
System.out.println("main执行任务" + i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 关闭线程池,否则线程池会维持核心线程数一直运行
executorService.shutdown();
}
// Executors.newSingleThreadExecutor/submit(Callable task)
public static void main(String[] args) throws ExecutionException, InterruptedException {
Callable<String> duty = new Callable<String>() {
@Override
public String call() {
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(1000);
System.out.println("Callable执行任务" + i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return "Callable任务执行完成";
}
};
/**
* new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue())
* corePoolSize=1, maximumPoolSize=1, keepAliveTime=0L, TimeUnit.MILLISECONDS
* new LinkedBlockingQueue(){this(Integer.MAX_VALUE);}
*/
ExecutorService executorService = Executors.newSingleThreadExecutor();
/**
* submit(Callable task)
* submit(Runnable task, T result)
* submit(Runnable task)
*/
Future<String> future = executorService.submit(duty);
// 主线程执行任务
for (int i = 0; i < 3; i++) {
try {
Thread.sleep(1000);
System.out.println("主线程执行任务" + i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("主线程执行完任务");
LocalDateTime start = LocalDateTime.now();
// futureTask.get()会阻塞调用线程,这里阻塞主线程
System.out.println("查看Callable线程执行结果:" + future.get());
LocalDateTime end = LocalDateTime.now();
Duration duration = Duration.between(start, end);
System.out.println("主线程运行完毕,主线程被阻塞时间ms:" + duration.toMillis());
// 关闭线程池,否则线程池会维持核心线程数一直运行
executorService.shutdown();
}
public static void main(String[] args) {
/**
* new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue())
* corePoolSize=3, maximumPoolSize=3, keepAliveTime=0L, TimeUnit.MILLISECONDS
* new LinkedBlockingQueue(){this(Integer.MAX_VALUE);}
*/
ExecutorService executorService = Executors.newFixedThreadPool(3);
for (int i = 0; i < 3; i++) {
Runnable duty = new Runnable() {
@Override
public void run() {
try {
Thread.sleep(3000);
System.out.println("线程:" + Thread.currentThread().getName() + " 执行任务");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
executorService.execute(duty);
}
// 主线程执行任务
for (int i = 0; i < 3; i++) {
try {
Thread.sleep(1000);
System.out.println("主线程执行任务" + i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 关闭线程池,否则线程池会维持核心线程数一直运行
executorService.shutdown();
}
public static void main(String[] args) {
/**
* new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS, new SynchronousQueue())
* corePoolSize=0, maximumPoolSize=Integer.MAX_VALUE, keepAliveTime=60L, TimeUnit.SECONDS
* new SynchronousQueue(){this(false);} true-公平,线程FIFO获取任务,false-不公平,线程争抢获取任务
*/
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < 3; i++) {
executorService.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(3000);
System.out.println("线程:" + Thread.currentThread().getName() + " 执行任务");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
// 主线程执行任务
for (int i = 0; i < 3; i++) {
try {
Thread.sleep(1000);
System.out.println("主线程执行任务" + i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 关闭线程池,否则线程池会维持核心线程数一直运行
executorService.shutdown();
}
public static void main(String[] args) {
/**
* new ScheduledThreadPoolExecutor(corePoolSize)
* corePoolSize=3, maximumPoolSize=Integer.MAX_VALUE, keepAliveTime=0L, NANOSECONDS
* new DelayedWorkQueue(){this(false);} true-公平,线程FIFO获取任务,false-不公平,线程争抢获取任务
*/
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3);
/**
* scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
* initialDelay:初始延迟时间
* period:执行间隔
* 执行耗时>执行间隔:任务执行完成后立马开始执行下一轮任务
* 执行耗时<等待时间:在任务执行完成后等待(等待时间-执行耗时),再执行下一轮任务
*/
for (int i = 0; i < 3; i++) {
scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
System.out.println("线程:" + Thread.currentThread().getName() + " 执行任务");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, 1, 2, TimeUnit.SECONDS);
}
// 主线程执行任务
for (int i = 0; i < 3; i++) {
try {
Thread.sleep(1000);
System.out.println("主线程执行任务" + i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 关闭线程池,否则线程池会维持核心线程数一直运行
scheduledExecutorService.shutdown();
}
public static void main(String[] args) {
/**
* new ScheduledThreadPoolExecutor(corePoolSize)
* corePoolSize=3, maximumPoolSize=Integer.MAX_VALUE, keepAliveTime=0L, NANOSECONDS
* new DelayedWorkQueue(){this(false);} true-公平,线程FIFO获取任务,false-不公平,线程争抢获取任务
*/
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3);
/**
* scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
* initialDelay:初始延迟时间
* delay:延迟时间
* 任务执行完成之后延迟delay时间再执行下一轮
*/
for (int i = 0; i < 3; i++) {
scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
System.out.println("线程:" + Thread.currentThread().getName() + "开始执行任务时间" + LocalDateTime.now().getSecond());
try {
Thread.sleep(1000);
System.out.println("线程:" + Thread.currentThread().getName() + " 执行任务");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, 1, 1, TimeUnit.SECONDS);
}
// 主线程执行任务
for (int i = 0; i < 5; i++) {
try {
Thread.sleep(1000);
System.out.println("主线程执行任务" + i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 关闭线程池,否则线程池会维持核心线程数一直运行
scheduledExecutorService.shutdown();
}
public static void main(String[] args) {
/**
* new ScheduledThreadPoolExecutor(corePoolSize)
* corePoolSize=3, maximumPoolSize=Integer.MAX_VALUE, keepAliveTime=0L, NANOSECONDS
* new DelayedWorkQueue(){this(false);} true-公平,线程FIFO获取任务,false-不公平,线程争抢获取任务
*/
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3);
/**
* schedule(Runnable command, long delay, TimeUnit unit)
* delay:延迟时间
* 在给定的延迟时间后执行1次任务
*/
System.out.println("当前时间:" + LocalDateTime.now().getSecond());
for (int i = 0; i < 3; i++) {
scheduledExecutorService.schedule(new Runnable() {
@Override
public void run() {
System.out.println("线程:" + Thread.currentThread().getName() + "开始执行任务时间" + LocalDateTime.now().getSecond());
try {
Thread.sleep(1000);
System.out.println("线程:" + Thread.currentThread().getName() + " 执行任务");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, 2, TimeUnit.SECONDS);
}
// 主线程执行任务
for (int i = 0; i < 5; i++) {
try {
Thread.sleep(1000);
System.out.println("主线程执行任务" + i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 关闭线程池,否则线程池会维持核心线程数一直运行
scheduledExecutorService.shutdown();
}
newWorkStealingPool()本质上是ForkJoinPool
工作窃取算法:在线程池中还有线程可以提供服务的时候帮忙分担一些已经被分配给某一个线程的耗时任务
适合于有大量耗时任务的情况
ForkJoinPool线程池中的线程是守护线程,主线程任务执行完毕,所有守护线程都将停止
public static void main(String[] args) {
/**
* new ForkJoinPool(Runtime.getRuntime().availableProcessors(), ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true)
* ForkJoinPool线程池中的线程是守护线程,主线程任务执行完毕,所有守护线程都将停止
* new ForkJoinPool(Runtime.getRuntime().availableProcessors(), ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true)
*/
ExecutorService executorService = Executors.newWorkStealingPool();
for (int i = 0; i < 5; i++) {
executorService.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
System.out.println("线程:" + Thread.currentThread().getName() + " 执行耗时任务");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
// 主线程执行任务
for (int i = 0; i < 1; i++) {
try {
Thread.sleep(1000);
System.out.println("主线程执行任务" + i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 关闭线程池,否则线程池会维持核心线程数一直运行
executorService.shutdown();
}
ExecutorService executorService = Executors.newSingleThreadExecutor();
// 停止ExecutorService添加新的任务, 继续执行正在执行的任务.
executorService.shutdown();
ExecutorService executorService = Executors.newSingleThreadExecutor();
// 立即停止线程池,正在执行的任务也就关闭
executorService.shutdownNow();
5个以上线程时使用线程池

核心线程数》任务队列满》最大线程数》任务拒绝策略;只有在队列满情况下才会创建大于corePoolSize的线程
ThreadPoolExecutor:
ScheduledThreadPoolExecutor
ForkJoinPool
BlockingQueue
ArrayBlockingQueue implements BlockingQueue
LinkedBlockingQueue implements BlockingQueue
SynchronousQueue implements BlockingQueue
DelayQueue implements BlockingQueue
DelayedWorkQueue implements BlockingQueue
线程数(CPU密集型)=CPU核心数的1到2倍
线程数(I/O密集型)=CPU核心数 × (1+平均等待时间/平均工作时间),IO时间越长线程数量越大
AbortPolicy(中断策略):
DiscardPolicy(丢弃策略):
DiscardOldestPolicy(丢弃最老的):
CallerRunsPolicy(调用者运行):
构造方法入参:核心线程数、最大线程数、存活时间、任务队列
Executors.newSingleThreadExecutor()
Executors.newFixedThreadPool(10)
Executors.newCachedThreadPool()
Executors.newScheduledThreadPool(5)
Executors.newWorkStealingPool(5)
自定义线程池:
shutdown():停止ExecutorService添加新的任务, 但是老任务还是会继续执行.
isShutdown():
isTerminated():
无界队列LinkedBlockingQueue,因为当处理任务的速度赶不上任务提交的速度的时候会造成OOM问题
无界线程池newCachedThreadPool、newScheduledThreadPool,因为线程的无限创建而导致OOM问题
因为:Executors.newSingleThreadExecutor的corePoolSize=1, maximumPoolSize=1,线程池中只有1个线程,因此任务来了只能放到任务队列中。
因为:ExecutorService executorService = Executors.newFixedThreadPool(3);的corePoolSize=3=maximumPoolSize=3,核心线程数和最大线程数相等
因为:-核心线程数=0,最大线程数=Integer.MAX_VALUE,新任务来了将直接创建新线程执行这个新任务,使用SynchronousQueue队列中转新任务,新线程创建完成后将马上执行新任务,不会在任务队列中存储很久,因此使用同步队列中转一会儿新任务即可
因为:CachedThreadPool使用同步队列SynchronousQueue中转任务,强调任务能够被新的线程执行
因为:调度线程池中线程需要执行的任务是定时任务,会在队列中存储一段时间,因此使用延迟队列DelayedWorkQueue