• 【Java】实现一个简单的线程池


      📝个人主页:哈__

    期待您的关注 

    一、线程池的模式

    线程池顾名思义就是管理线程的一个池子,我们把创建线程的过程交给线程池来处理,而这个线程池当中的线程都会从阻塞队列当中取获取任务执行。

    我们不在直接把任务的创建过程写到我们初始化的线程对象中,而是通过调用线程池的execute()方法,同时把我们的具体任务交作为参数传给线程池,之后线程池就会把任务添加到阻塞队列当中,而线程池当中的线程会从阻塞队列当中获取任务并执行。

    二、线程池的一些参数 

    1. corePoolSize:线程池核心线程大小,即最小线程数(初始化线程数)。线程池会维护当前数量的线程在线程池中,即使这些线程一直处于闲置状态,也不会被销毁,除非设置了allowCoreThreadTimeOut。
    2. maximumPoolSize:线程池最大线程数量。当任务提交到线程池后,如果当前线程数小于核心线程数,则会创建新线程来处理任务;如果当前线程数大于或等于核心线程数,但小于最大线程数,并且任务队列已满,则会创建新线程来处理任务。
    3. keepAliveTime:空闲线程的存活时间。当线程池中的线程数量大于核心线程数且线程处于空闲状态时,在指定时间后,这个空闲线程将会被销毁,从而逐渐恢复到稳定的核心线程数数量。
    4. unit:keepAliveTime的存活时间的计量单位,通常使用TimeUnit枚举类中的方法,如TimeUnit.SECONDS表示秒级。
    5. workQueue:任务队列。用于存放等待执行的任务,常见的实现类有LinkedBlockingQueue、ArrayBlockingQueue等。
    6. threadFactory:线程工厂。用于创建新的线程,可以自定义线程的名称、优先级等。
    7. handler:拒绝策略。当任务无法执行(如线程池已满)时,可以选择的策略有:AbortPolicy(抛出异常)、CallerRunsPolicy(调用者运行)、DiscardOldestPolicy(丢弃最老的任务)、DiscardPolicy(无声丢弃)。

    三、代码实现

    因为我们只是简单的实现,所以有一些情况和实际不太相似。

    1.BlockingQueue

    先来看看我们阻塞队列当中的一些参数,为了在多线程环境下防止并发问题,我使用了ReentrantLock,使用它的目的是为了创建多个不同的阻塞条件。

    在我们调用一个对象的await()方法后,我们的当前线程就会加入到一个特定的队列当中去等待,直到有调用了这个对象的notify()方法后才会从这个队列中抽取一个线程唤醒。

    举个例子,我们去医院的时候,一个医生同一时间只能看一个病人,剩下的人都只能等待,如果只有一个大厅的话,看不同病的病人都只能等待在一个候诊室中。使用ReentrentLock的意思就是为了创建多个不同的候诊室,将不同医生的病人分开在不同的候诊室当中。

    1. //1.阻塞队列
    2. private Deque deque = new ArrayDeque<>();
    3. //2.实现阻塞的锁
    4. private ReentrantLock lock = new ReentrantLock();
    5. //3. 生产者等待条件
    6. private Condition fullWaitSet = lock.newCondition();
    7. //4.消费者等待条件
    8. private Condition emptyWaitSet = lock.newCondition();
    9. //5.阻塞队列的大小
    10. private int CAPACITY;

    在自定义的阻塞队列中,我使用了一个双向队列来存储任务,并且设置了一个队列大小的属性,在我们创建这个队列的时候我们可以进行初始化。

    先来看看阻塞队列任务的添加过程。这个逻辑并不难,我们在代码的上方上锁,在finally中解锁。如果这时我们的队列是满的,就无法在继续添加任务了,这个时候我们就把当前线程挂起(注意我们的挂起条件)。如果队列不是满的话那我们就加入到队尾,同时把另一类挂起的线程唤醒(这类线程在队列为空的时候挂起,等待任务的添加)

    1. // 生产者放入数据
    2. public void put(T t) {
    3. lock.lock();
    4. try {
    5. while (deque.size() == CAPACITY) {
    6. fullWaitSet.await();
    7. }
    8. deque.addLast(t);
    9. emptyWaitSet.signal();
    10. } catch (InterruptedException e) {
    11. e.printStackTrace();
    12. } finally {
    13. lock.unlock();
    14. }
    15. }

    在看看我们取任务的过程。同样加锁,当我们的队列为空的时候,线程挂起,等待任务的添加之后线程唤醒,如果队列不为空的话,我们从队列头部取出一个任务,并且唤起一类线程(这类线程在任务已经满了的时候无法在添加任务了,进行挂起,等待队列不为满)

    1. // 消费者从线程池当中获取任务
    2. public T take(){
    3. T t = null;
    4. lock.lock();
    5. try {
    6. while(deque.size() == 0){
    7. emptyWaitSet.await();
    8. }
    9. t = deque.removeFirst();
    10. fullWaitSet.signal();
    11. }catch (Exception e){
    12. e.printStackTrace();
    13. }finally {
    14. lock.unlock();
    15. }
    16. return t;
    17. }

    我们上边的代码展示的队列的存取的过程都是死等状态,什么是死等状态?就是任务添加不进去或者取不出来的时候,线程会被一直挂起。真实并不是如此,这里只是简单的展示。

    阻塞队列需要的就是这两个存取的过程。

    2.ThreadPool

    先看看线程池当中的属性。把刚才创建的任务队列加进去,因为线程池要时常和任务队列沟通。然后创建了一个HashSet结构用于存储我们的线程。下边的都是我们线程池需要的一些参数了,拒绝策略在这里没有写。

    1. // 任务队列
    2. private BlockedQueue taskQueue;
    3. // 线程集合
    4. private HashSet workers = new HashSet<>();
    5. //核心线程数
    6. private int coreSize;
    7. // 超时时间
    8. private int timeout;
    9. // 超时单位
    10. private TimeUnit timeUnit;

    来看看我们的线程池是如何工作的吧,可以看到我们线程池保存的是Worker对象,我们来看看这个Worker对象是干啥的。这个Worker对象实现了Runnable接口,我们可以把这个类当作线程类,这个类中有一个task属性,因为我们线程池当中的线程是要获取任务执行的,这个任务就用这个task属性代表。

    这个Worker类一直在干一件事情,就是不断地从我们的任务队列当中获取任务(Worker类是ThreadPool的内部类),如果获取的任务不为空的话就执行任务,一旦没有任务可以执行那么就把当前的线程从线程池当中移除。

    1. class Worker implements Runnable{
    2. private Runnable task;
    3. public Worker(Runnable task){
    4. this.task = task;
    5. }
    6. @Override
    7. public void run() {
    8. while(task!=null || (task = taskQueue.take())!=null){
    9. System.out.println("取出的任务是"+task);
    10. try {
    11. task.run();
    12. }catch (Exception e){
    13. e.printStackTrace();
    14. }finally {
    15. task = null;
    16. }
    17. synchronized (workers){
    18. workers.remove(this);
    19. }
    20. }
    21. }
    22. }

    那什么时候用到这个Worker类呢?当我们调用ThreadPool中的execute()方法时,线程池中的线程会就调用这个run()方法。

    来看我们的execute()方法。当我们的线程数小于我们的核心线程数的时候,我们可以直接创建一个新的线程,并且把我们的任务直接交给这个核心线程。反之我们不能创建,而是把任务添加到我们的任务队列当中,等待核心线程去执行这个任务。

    1. // 任务执行
    2. public void execute(Runnable task){
    3. synchronized (workers){
    4. if(workers.size() < coreSize){
    5. // 创建核心线程
    6. Worker worker = new Worker(task);
    7. workers.add(worker);
    8. Thread thread = new Thread(worker);
    9. thread.start();
    10. }else {
    11. taskQueue.put(task);
    12. }
    13. }
    14. }

    写完了上边的代码我们测试一下。

    1. public static void main(String[] args) {
    2. ThreadPool threadPool = new ThreadPool(2,10,TimeUnit.MILLISECONDS,10);
    3. for(int i = 0;i<12;i++){
    4. int j = i;
    5. threadPool.execute(()->{
    6. System.out.println("当前线程"+Thread.currentThread().getName()+"task "+j+" is running");
    7. try {
    8. Thread.currentThread().sleep(1000);
    9. } catch (InterruptedException e) {
    10. e.printStackTrace();
    11. }
    12. });
    13. }
    14. }

    方法运行了之后,即使任务全部执行,线程也不会结束。这是因为我们的worker类中的run方法调用了任务队列的take()方法,而take方法是会一直挂起的。

    我们现在换一种带超时获取,在规定时间内获取不到任务就自动结束任务。这时候就用到我们传入的时间参数了,我们不再调用await()方法了,而是调用awaitNanos()方法,方法可以接收一个时间参数,这个方法可以消耗我们的nanos时间,在这个时间内如果获取不到的话线程就不在挂起了,这时还会进入到我们的while循环当中,判断我们的nanos是不是被消耗完了,如果被消耗完了就说明在规定时间内获取不到任务,直接return结束线程。

    1. // 带超时获取
    2. public T poll(int timeout,TimeUnit timeUnit){
    3. T t = null;
    4. lock.lock();
    5. try {
    6. long nanos = timeUnit.toNanos(timeout);
    7. while(deque.size() == 0){
    8. if(nanos <= 0){
    9. return null;
    10. }
    11. nanos = emptyWaitSet.awaitNanos(nanos);
    12. }
    13. t = deque.removeFirst();
    14. fullWaitSet.signal();
    15. }catch (Exception e){
    16. e.printStackTrace();
    17. }finally {
    18. lock.unlock();
    19. }
    20. return t;
    21. }

     修改Worker类。

    1. class Worker implements Runnable{
    2. private Runnable task;
    3. public Worker(Runnable task){
    4. this.task = task;
    5. }
    6. @Override
    7. public void run() {
    8. while(task!=null || (task = taskQueue.poll(timeout,timeUnit))!=null){
    9. System.out.println("取出的任务是"+task);
    10. try {
    11. task.run();
    12. }catch (Exception e){
    13. e.printStackTrace();
    14. }finally {
    15. task = null;
    16. }
    17. synchronized (workers){
    18. workers.remove(this);
    19. }
    20. }
    21. }
    22. }

    现在就可以正常结束了。

    四、拒绝策略

    全部代码如下。要使用拒绝策略,我们定义一个函数式接口,同时写一个参数传给线程池,参数的具体内容就是拒绝策略的拒绝方法,是我们自己定义的。

    同时我们的execute()方法不在使用put来添加任务了,而是使用tryPut,如果大家对这一块感兴趣的话,可以在bilibili上观看黑马程序员的课程学习一下。

    1. /**
    2. * 自定义线程池
    3. */
    4. public class TestPool {
    5. public static void main(String[] args) {
    6. ThreadPool threadPool = new ThreadPool(2,10,TimeUnit.SECONDS,10,((queue, task) -> {queue.put(task);}));
    7. for(int i = 0;i<12;i++){
    8. int j = i;
    9. threadPool.execute(()->{
    10. System.out.println("当前线程"+Thread.currentThread().getName()+"task "+j+" is running");
    11. try {
    12. Thread.currentThread().sleep(1000);
    13. } catch (InterruptedException e) {
    14. e.printStackTrace();
    15. }
    16. });
    17. }
    18. }
    19. }
    20. /**
    21. * 拒绝策略
    22. */
    23. @FunctionalInterface
    24. interface RejectPolicy{
    25. void reject(BlockedQueue queue,T task);
    26. }
    27. /**
    28. * 阻塞队列
    29. */
    30. class BlockedQueue {
    31. //1.阻塞队列
    32. private Deque deque = new ArrayDeque<>();
    33. //2.实现阻塞的锁
    34. private ReentrantLock lock = new ReentrantLock();
    35. //3. 生产者等待条件
    36. private Condition fullWaitSet = lock.newCondition();
    37. //4.消费者等待条件
    38. private Condition emptyWaitSet = lock.newCondition();
    39. //5.阻塞队列的大小
    40. private int CAPACITY;
    41. public BlockedQueue(int queueCapacity) {
    42. this.CAPACITY = queueCapacity;
    43. }
    44. // 生产者放入数据
    45. public void put(T t) {
    46. lock.lock();
    47. try {
    48. while (deque.size() == CAPACITY) {
    49. fullWaitSet.await();
    50. }
    51. deque.addLast(t);
    52. emptyWaitSet.signal();
    53. } catch (InterruptedException e) {
    54. e.printStackTrace();
    55. } finally {
    56. lock.unlock();
    57. }
    58. }
    59. // 带超时添加
    60. public boolean offer(T t,int timeout,TimeUnit timeUnit) {
    61. lock.lock();
    62. long nanos = timeUnit.toNanos(timeout);
    63. try {
    64. while (deque.size() == CAPACITY) {
    65. if(nanos <= 0){
    66. return false;
    67. }
    68. nanos = fullWaitSet.awaitNanos(nanos);
    69. }
    70. deque.addLast(t);
    71. emptyWaitSet.signal();
    72. return true;
    73. } catch (InterruptedException e) {
    74. e.printStackTrace();
    75. } finally {
    76. lock.unlock();
    77. }
    78. return true;
    79. }
    80. // 带超时获取
    81. public T poll(int timeout,TimeUnit timeUnit){
    82. T t = null;
    83. lock.lock();
    84. try {
    85. long nanos = timeUnit.toNanos(timeout);
    86. while(deque.size() == 0){
    87. if(nanos <= 0){
    88. return null;
    89. }
    90. nanos = emptyWaitSet.awaitNanos(nanos);
    91. }
    92. t = deque.removeFirst();
    93. fullWaitSet.signal();
    94. }catch (Exception e){
    95. e.printStackTrace();
    96. }finally {
    97. lock.unlock();
    98. }
    99. return t;
    100. }
    101. // 消费者从线程池当中获取任务
    102. public T take(){
    103. T t = null;
    104. lock.lock();
    105. try {
    106. while(deque.size() == 0){
    107. emptyWaitSet.await();
    108. }
    109. t = deque.removeFirst();
    110. fullWaitSet.signal();
    111. }catch (Exception e){
    112. e.printStackTrace();
    113. }finally {
    114. lock.unlock();
    115. }
    116. return t;
    117. }
    118. public void tryPut(RejectPolicy rejectPolicy, T task) {
    119. lock.lock();
    120. try {
    121. if(deque.size()==CAPACITY){
    122. rejectPolicy.reject(this,task);
    123. }else{
    124. deque.addLast(task);
    125. emptyWaitSet.signal();
    126. }
    127. } finally {
    128. lock.unlock();
    129. }
    130. }
    131. }
    132. /**
    133. * 线程池
    134. */
    135. class ThreadPool{
    136. // 任务队列
    137. private BlockedQueue taskQueue;
    138. // 线程集合
    139. private HashSet workers = new HashSet<>();
    140. //核心线程数
    141. private int coreSize;
    142. // 超时时间
    143. private int timeout;
    144. // 超时单位
    145. private TimeUnit timeUnit;
    146. //拒绝策略
    147. private RejectPolicy rejectPolicy;
    148. // 任务执行
    149. public void execute(Runnable task){
    150. synchronized (workers){
    151. if(workers.size() < coreSize){
    152. // 创建核心线程
    153. Worker worker = new Worker(task);
    154. workers.add(worker);
    155. Thread thread = new Thread(worker);
    156. thread.start();
    157. }else {
    158. // 任务队列
    159. //taskQueue.offer(task,timeout,timeUnit);
    160. taskQueue.tryPut(rejectPolicy,task);
    161. //taskQueue.put(task);
    162. }
    163. }
    164. }
    165. public ThreadPool(int coreSize, int timeout, TimeUnit timeUnit,int queueCapacity,RejectPolicy rejectPolicy){
    166. this.coreSize = coreSize;
    167. this.timeout = timeout;
    168. this.timeUnit = timeUnit;
    169. this.taskQueue = new BlockedQueue<>(queueCapacity);
    170. this.rejectPolicy = rejectPolicy;
    171. }
    172. class Worker implements Runnable{
    173. private Runnable task;
    174. public Worker(Runnable task){
    175. this.task = task;
    176. }
    177. @Override
    178. public void run() {
    179. while(task!=null || (task = taskQueue.poll(timeout,timeUnit))!=null){
    180. System.out.println("取出的任务是"+task);
    181. try {
    182. task.run();
    183. }catch (Exception e){
    184. e.printStackTrace();
    185. }finally {
    186. task = null;
    187. }
    188. synchronized (workers){
    189. workers.remove(this);
    190. }
    191. }
    192. }
    193. }
    194. }

    这个代码我自己觉得是有些问题,因为如果我的任务队列大小有10的时候,我给出了13个任务,两个交给核心线程不占任务队列大小,另外10个任务正好占满,剩下一个放不进去,这时就会卡住不输出。---------未解决

  • 相关阅读:
    macos 不支持svn安装
    简单聊聊 分布式
    PostgreSQL文本搜索(七)——自定义配置
    Mathorcup数学建模竞赛第三届-【妈妈杯】C题:语音识别技术的应用(附带赛题解析&获奖论文&MATLAB代码)(一)
    EMIF 接口
    Flutter 文件读写-path_provider
    K-优字符串(冬季每日一题 11)
    【重学C++】04 | 说透C++右值引用、移动语义、完美转发(上)
    人工智能-线性回归1--损失函数、正规方程、梯度下降法
    讯飞AI算法挑战大赛-校招简历信息完整性检测挑战赛-三等奖方案
  • 原文地址:https://blog.csdn.net/qq_61024956/article/details/138031570