有时候,系统需要处理非常多的执行时间很短的请求
,如果每一个请求都开启一个新线程的话
,系统就要不断的进行线程的创建和销毁
,有时花在创建和销毁线程上的时间
会比线程真正执行的时间还长
。而且当线程数量太多时
,系统不一定能受得了
使用线程池
主要为了解决一下几个问题:
降低资源消耗:通过重用线程池中的线程,来减少每个线程创建和销毁的性能开销
提高响应速度:当任务到达时,任务可以不需要等到线程创建就能立即执行
提高线程的可管理性:线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一维护和管理,分配、调优和监控
线程池
的线程执行规则
跟任务队列
有很大的关系。
下面都假设任务队列没有大小限制:
如果线程数量 <= 核心线程数量,那么 直接启动一个核心线程 来执行任务,不会放入队列中。
如果线程数量 > 核心线程数,但 <= 最大线程数,并且任务队列是LinkedBlockingDeque的时候,超过核心线程数量的任务会放在任务队列中排队。
如果线程数量 > 核心线程数,但 <= 最大线程数,并且任务队列是SynchronousQueue的时候,线程池会创建新线程执行任务,这些任务也不会被放在任务队列中。这些线程属于非核心线程,在任务完成后,闲置时间达到了超时时间就会被清除。
如果线程数量 > 核心线程数,并且 > 最大线程数,当任务队列是LinkedBlockingDeque,会将超过核心线程的任务放在任务队列中排队。也就是当任务队列是LinkedBlockingDeque并且没有大小限制时,线程池的最大线程数设置是无效的,他的线程数最多不会超过核心线程数。
如果线程数量 > 核心线程数,并且 > 最大线程数,当任务队列是SynchronousQueue的时候,会因为线程池 拒绝添加 任务而抛出异常。
任务队列大小有限时
当LinkedBlockingDeque塞满时,新增的任务会直接创建新线程来执行,当创建的线程数量超过最大线程数量时会抛异常。
SynchronousQueue没有数量限制。因为他根本不保持这些任务,而是直接交给线程池去执行。当任务数量超过最大线程数时会直接抛异常。
Executor是一个接口
,跟线程池有关的基本都要跟他打交道。下面是常用的ThreadPoolExecutor的关系
Executor接口
很简单,只有一个execute方法
ExecutorService
是Executor的子接口
,增加了一些常用的对线程的控制方法
,之后使用线程池主要也是使用这些方法
AbstractExecutorService
是一个抽象类
。ThreadPoolExecutor
就是实现了这个类
。
构造方法
构造方法参数说明
corePoolSize
核心线程数
,默认情况下核心线程会一直存活
,即使处于闲置状态也不会受存keepAliveTime限制
。除非将allowCoreThreadTimeOut设置为true。
当提交一个任务到线程池
时,线程池会创建一个线程来执行任务
,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池基本大小时就不再创建
。如果调用了prestartAllCoreThreads()方法
,线程池会提前创建并启动所有基本线程
maximumPoolSize
线程池所能容纳的最大线程数
。超过这个数的线程将被阻塞
。当任务队列为没有设置大小的LinkedBlockingDeque时,这个值无效
。
如果队列满了
,并且已创建的线程数小于最大线程数
,则线程池会再创建新的线程执行任务
。值得注意的是,如果使用了无界的任务队列这个参数无效
keepAliveTime
非核心线程
的闲置超时时间
,工作线程空闲后保持存活的时间
,超过这个时间就会被回收
,如果任务很多,并且每个任务执行的时间比较短
,可以调大时间,提高线程利用率
unit
指定keepAliveTime的单位
,如TimeUnit.SECONDS
。当将allowCoreThreadTimeOut设置为true
时对corePoolSize生效
天(Days)、小时(HOURS)、分钟(MINUTES)、毫秒(MILLISECONDS)、微秒(MICROSECONDS,千分之一毫秒)和纳秒(NANOSECONDS,千分之一微秒)
workQueue
线程池中常用的三种任务队列
,SynchronousQueue
, LinkedBlockingDeque
, ArrayBlockingQueue
1 ArrayBlockingQueue
: 是一个基于数组结构
的有界阻塞队列
,按FIFO原则进行排序
2 LinkedBlockingQueue
: 一个基于链表结构
的阻塞队列
,吞吐量高于ArrayBlockingQueue。
静态工厂方法Excutors.newFixedThreadPool()
使用了这个队列
3 SynchronousQueue
: 一个不存储元素的阻塞队列
。每个插入操作必须等到另一个线程调用移除操作
,否则插入操作一直处于阻塞状态
,吞吐量高于LinkedBlockingQueue
,静态工厂方法Excutors.newCachedThreadPool()
使用了这个队列
4 PriorityBlockingQueue
: 一个具有优先级的无限阻塞队列
threadFactory
线程工厂
,提供创建新线程
的功能。ThreadFactory是一个接口
,只有一个方法
通过线程工厂可以对线程的一些属性进行定制,比如为每个创建出来的线程设置更有意义的名字
public interface ThreadFactory {
Thread newThread(Runnable r);
}
RejectedExecutionHandler
RejectedExecutionHandler也是一个接口
,只有一个方法,当线程池中的队列和线程池已经全部使用
,说明线程池处于饱和状态
,添加新线程被拒绝
时,会调用RejectedExecutionHandler的rejectedExecution方法
AbortPolicy : 直接抛出异常,默认情况下采用这种策略
CallerRunsPolicy :只用调用者所在线程来运行任务
DiscardOldestPolicy : 丢弃队列里最近的一个任务,并执行当前任务
DiscardPolicy : 不处理,丢弃掉
更多的时候,我们应该通过实现RejectedExecutionHandler 接口来自定义策略,比如记录日志或持久化存储等
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable var1, ThreadPoolExecutor var2);
}
可以使用execute和submit
两个方法向线程池提交任务
execute
方法用于提交不需要返回值的任务
,利用这种方式提交的任务无法得知是否正常执行
threadPoolExecutor.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
submit方法
用于提交一个任务并带有返回值
,这个方法将返回一个Future类型对象
。可以通过 这个返回对象判断任务是否执行成功
,并且可以通过future.get()
方法来获取返回值
,get()
方法会阻塞当前线程直到任务完成
Future> future = threadPoolExecutor.submit(futureTask);
Object value = future.get();
可以通过调用线程池的shutdown
或shutdownNow
方法来关闭线程池
。他们的原理是遍历线程池中的工作线程
,然后逐个调用线程的interrupt方法来中断线程
,所以无响应中断的任务可能永远无法停止
。
但是他们存在一定的区别
shutdownNow 首先 将线程池的状态 设置为STOP,然后 尝试停止 所有 正在执行或暂停任务 的线程,并 返回等待执行任务的列表
shutdown 只是 将线程池的状态 设置成 SHUTDOWN 状态,然后 中断 所有正在执行的任务
只要调用了这两个关闭方法的一个
,isShutdown就会返回true
。当所有的任务都关闭后
,才表示线程池关闭成功
,这时调用isTerminated方法会返回true
至于应该调用哪一种方法
来关闭线程池
,应该由提交到线程池的任务特性决定
,通常调用shutdown方法来关闭线程池
,如果任务不一定执行完,则可以调用shutdownNow方法
。
要想合理地配置线程池,首先要分析任务特性
任务的性质:CPU密集型任务、IO密集型任务和混合型任务。
任务的优先级:高、中和低。
任务的执行时间:长、中和短。
任务的依赖性:是否依赖其他系统资源,如数据库连接。
性质不同的任务可以用不同规模的线程池分开处理
。
CPU密集型
任务应该配置尽可能少的线程
,如配置N+1
个线程,N位CPU
的个数。
IO密集型
任务线程并不是一直在执行任务
,则应配置尽可能多的线程
,如2*N
。
混合型任务
,如果可以拆分,将其拆分成一个CPU密集型任务和一个IO密集型任务
,只要这两个任务执行的时间相差不是太大
,那么分解后执行的吞吐量将高于串行执行的吞吐量
优先级不同的任务
可以交给优先级队列PriorityBlcokingQueue
来处理。
执行时间不同
的任务可以交给不同规模的线程池
来处理。
依赖数据库的任务
,因此线程提交SQL后需要等待数据库返回结果
,等待的时间越长,则CPU空闲时间越长,那么线程数应该设置的越大,这样能更好滴利用CPU。
首先构造一个线程池,用ArrayBlockingQueue
作为其等待队列
,队列初始化容量为10
。该线程池核心容量为 10,最大容量为20,线程存活时间为1分钟
static BlockingQueue blockingQueue=new ArrayBlockingQueue<>(10);
static ThreadPoolExecutor threadPoolExecutor=new ThreadPoolExecutor(10, 20, 1, TimeUnit.MINUTES, blockingQueue);
构造一个实现Runable接口的类TaskWithoutResult
,其逻辑很简单,睡眠1秒
/**
* 无返回值的任务
*/
class TaskWithoutResult implements Runnable {
private int sleepTime=1000;//默认睡眠时间1s
public TaskWithoutResult(int sleepTime) {
this.sleepTime=sleepTime;
}
@Override
public void run() {
System.out.println("线程"+Thread.currentThread()+"开始运行");
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e) {//捕捉中断异常
System.out.println("线程"+Thread.currentThread()+"被中断");
}
System.out.println("线程"+Thread.currentThread()+"结束运行");
}
}
/**
* 中断测试
*/
public static void test1() {
for(int i=0;i<10;i++) {
Runnable runnable=new TaskWithoutResult(1000);
threadPoolExecutor.submit(runnable);
}
//threadPoolExecutor.shutdown();//不会触发中断
threadPoolExecutor.shutdownNow();//会触发中断
}
分别测试shutdown和shutdownNow()
方法,结果shutdown()
方法的调用并不会引发中断
,而shutdownNow()
方法则会引发中断
。
这也正验证前面所说的,shutdown
方法只是发出了停止信号
,等所有线程执行完毕会关闭线程池
;而shutdownNow
则是立即停止所有任务
。
1 Spring是通过任务执行器(TaskExecutor)
来实现多线程和并发编程
2 使用ThreadPoolTaskExecutor
来创建一个基于线城池的TaskExecutor
3 实际开发
任务大多数情况下都是异步非阻塞的
。我们配置注解@EnableAsync可以开启异步任务
。然后在实际执行的方法上
配置注解@Async上声明是异步任务
。
同步交互:指发送一个请求, 需要等待返回, 然后才能够发送下一个请求,有个等待过程;
异步交互:指发送一个请求, 不需要等待返回, 随时可以再发送下一个请求,即不需要等待。
区别:一个需要等待,一个不需要等待。在部分情况下,我们的项目开发中都会优先选择不需要等待的异步交互方式。
ThredPoolTaskExcutor的处理流程
当池子大小小于corePoolSize,就新建线程,并处理请求
当池子大小等于corePoolSize,把请求放入workQueue中,池子里的空闲线程就去workQueue中取任务并处理
当workQueue放不下任务时,就新建线程入池,并处理请求,如果池子大小撑到了maximumPoolSize,就用RejectedExecutionHandler来做拒绝处理
当池子的线程数大于corePoolSize时,多余的线程会等待keepAliveTime长时间,如果无请求可处理就自行销毁
引入 Maven 依赖
org.springframework.boot
spring-boot-starter
2.4.4
异步执行的配置类 AsyncConfig
/**
* @classname AsyncConfig
* @description 开启异步执行的配置类
*/
@Configuration
@EnableAsync //开启异步执行
@ComponentScan("com.melodyjerry.thread")
public class AsyncConfig implements AsyncConfigurer {
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
//线程池中的线程的名称前缀
threadPoolTaskExecutor.setThreadNamePrefix("SpringBoot线程池的前缀-");
//线程池的核心线程数大小
threadPoolTaskExecutor.setCorePoolSize(4);
//线程池的最大线程数
threadPoolTaskExecutor.setMaxPoolSize(8);
//等待队列的大小
threadPoolTaskExecutor.setQueueCapacity(25);
//执行初始化
threadPoolTaskExecutor.initialize();
return threadPoolTaskExecutor;
}
}
异步任务的执行类
/**
* @classname AsyncTaskService
* @description 异步任务的执行类
*/
@Service
public class AsyncTaskService {
@Async //异步方法 不加的话就是同步方法
public void executeAsyncTask(Integer i) {
System.out.println(Thread.currentThread().getName() + ": "+i);
}
@Async //异步方法
//@Async("getAsyncExecutor")
public void executeAsyncTaskPlus(Integer i) {
System.out.println(Thread.currentThread().getName() + "+1: " + (i+1));
}
}
@Async注解表明该方法是个异步方法。
从Async注解接口可以看到,Target即可以在方法也可以在类型上,如果注解在类型上,表明该类所有的方法都是异步方法。
@Async("getAsyncExecutor")注解,它是刚刚我们在线程池配置类的里的那个配置方法的名字 getAsyncExecutor() ,加上这个后每次执行这个方法都会开启一个线程放入线程池中
测试 TestThreadApplication
/**
* @classname TestThreadApplication
* @description 测试异步任务
*/
@SpringBootApplication
public class TestThreadApplication {
public static void main(String[] args) {
ConfigurableApplicationContext context = SpringApplication.run(TestThreadApplication.class, args);
AsyncTaskService asyncTaskService = context.getBean(AsyncTaskService.class);
for (int i = 0; i < 10; i++) {
asyncTaskService.executeAsyncTask(i);
asyncTaskService.executeAsyncTaskPlus(i);
}
System.out.println("This Program has Begun successfully");
}
}
或者
@RunWith (SpringRunner.class)
@SpringBootTest
public class SpringbootLearnApplicationTests {
@Autowired
private AsyncTaskService asyncTaskService;
@Test
public void contextLoads() {
}
@Test
public void threadTest() {
for (int i = 0; i < 20; i++) {
asyncTaskService.executeAsyncTask(i);
}
}
}
异步
同步
案例:用户在商品列表进行检索,结果集大约有100W商品,点击批量上架/下架。
文本里面有很多行url地址,需要的字段都包含在这些url中。最开始是使用的正常的普通方式去写入,但是量太大了,所以就尝试使用多线程来写入
ThreadPoolTaskExecutor多线程批量插入百万级数据
先自我介绍一下,小编13年上师交大毕业,曾经在小公司待过,去过华为OPPO等大厂,18年进入阿里,直到现在。深知大多数初中级java工程师,想要升技能,往往是需要自己摸索成长或是报班学习,但对于培训机构动则近万元的学费,着实压力不小。自己不成体系的自学效率很低又漫长,而且容易碰到天花板技术停止不前。因此我收集了一份《java开发全套学习资料》送给大家,初衷也很简单,就是希望帮助到想自学又不知道该从何学起的朋友,同时减轻大家的负担。添加下方名片,即可获取全套学习资料哦