• Java并发-实现带拒绝策略的线程池


    目录

    1 前言

    2 线程池

     2.1 线程池的设计

    2.2 阻塞队列

    2.3 线程池实现

    2.4 拒绝策略

    2.5 完整代码


    1 前言

    Java多线程的学习是绕不开线程池的,线程池可以帮助我们管理线程,处理任务。在学习多线程时我已经实现过一个简单的线程池《Java并发-线程池的实现》这个这是简单的阻塞等待。

    2 线程池

    如下图所示:线程池最根本的应该是生产者消费者模式,生产者向线程池中提供任务,线程池中的线程进行任务消费。

     2.1 线程池的设计

    在本节我们将设计一个线程池

    • 阻塞队列
    • 线程池
    • 拒绝策略
      • 阻塞等待
      • 超时等待
      • 放弃任务
      • 报错
      • 调用者线程自己执行

    2.2 阻塞队列

    属性

    • 队列容量
    • 真实队列
    • 锁ReentrantLock
    • 生产者的Condition
    • 消费者的Condition

    take方法

    • 阻塞实现
    • 超时等待实现:是对阻塞等待的改进,可以在没有任务的时候将Worker移除,防止线程一直阻塞,有任务时候在进行创建。

    put方法

    • 阻塞实现
    • 超时等待实现

    tryPut方法:用于拒绝策略

    1. {
    2. // 1. 队列的长度
    3. private int capacity;
    4. // 2.任务队列
    5. private volatile Deque queue = new ArrayDeque<>();
    6. // 3. 锁
    7. private ReentrantLock lock = new ReentrantLock();
    8. // 4.阻塞条件
    9. // 4.1 队列为满时,生产者进入
    10. private Condition fullWaitSet = lock.newCondition();
    11. // 4.1 队列为空时,消费者进入
    12. private Condition emptyWaitSet = lock.newCondition();
    13. public BlockQueue(int capacity) {
    14. this.capacity = capacity;
    15. }
    16. // 阻塞获取
    17. public T take() {
    18. lock.lock(); // 每次只能有一个线程可以消费
    19. try {
    20. while (queue.isEmpty()) {
    21. try {
    22. emptyWaitSet.await();
    23. } catch (InterruptedException e) {
    24. e.printStackTrace();
    25. }
    26. }
    27. T t = queue.removeFirst();
    28. fullWaitSet.signal();
    29. return t;
    30. } finally {
    31. lock.unlock();
    32. }
    33. }
    34. // 超时时间的获取
    35. public T take(long time, TimeUnit timeUnit) {
    36. lock.lock(); // 每次只能有一个线程可以消费
    37. try {
    38. long nanos = timeUnit.toNanos(time);
    39. while (queue.isEmpty()) {
    40. try {
    41. if (nanos <= 0) {
    42. // 等待超时---结束循环
    43. return null;
    44. }
    45. nanos = emptyWaitSet.awaitNanos(nanos);
    46. } catch (InterruptedException e) {
    47. e.printStackTrace();
    48. }
    49. }
    50. T t = queue.removeFirst();
    51. fullWaitSet.signal();
    52. return t;
    53. } finally {
    54. lock.unlock();
    55. }
    56. }
    57. // 阻塞 添加
    58. public void put(T task) {
    59. lock.lock();
    60. try {
    61. while (queue.size() == capacity) {
    62. try {
    63. System.out.println("等待加入任务对列" + task);
    64. fullWaitSet.await();
    65. } catch (InterruptedException e) {
    66. e.printStackTrace();
    67. }
    68. }
    69. queue.addLast(task);
    70. System.out.println("加入任务队列" + task);
    71. emptyWaitSet.signal();
    72. } finally {
    73. lock.unlock();
    74. }
    75. }
    76. // 超时 添加
    77. public boolean put(T task, long time, TimeUnit unit) {
    78. lock.lock();
    79. try {
    80. long nanos = unit.toNanos(time);
    81. while (queue.size() == capacity) {
    82. try {
    83. if (nanos <= 0) {
    84. return false;
    85. }
    86. System.out.println("等待加入任务队列 ..." + task);
    87. nanos = fullWaitSet.awaitNanos(nanos);
    88. } catch (InterruptedException e) {
    89. e.printStackTrace();
    90. }
    91. }
    92. queue.addLast(task);
    93. System.out.println("加入任务队列 ..." + task);
    94. emptyWaitSet.signal();
    95. return true;
    96. } finally {
    97. lock.unlock();
    98. }
    99. }
    100. // 获取任务的大小。
    101. public int size() {
    102. lock.lock();
    103. try {
    104. return queue.size();
    105. } finally {
    106. lock.unlock();
    107. }
    108. }
    109. public void tryput(RejectPolicy rejectPolicy, T task) {
    110. lock.lock();
    111. try {
    112. // 1. 队列是否满了
    113. if (queue.size() == capacity) {
    114. // 2. 实行拒绝策略
    115. rejectPolicy.reject(this,task);
    116. }else {
    117. // 有空闲
    118. queue.addLast(task);
    119. System.out.println("加入任务队列 ..." + task);
    120. emptyWaitSet.signal();
    121. }
    122. }finally {
    123. lock.unlock();
    124. }
    125. }
    126. }

    2.3 线程池实现

    属性

    • 阻塞队列
    • 超时时间
    • 时间单位
    • 工作线程集合
    • 拒绝策略
    • 工作线程
    1. class ThreadPool {
    2. private final BlockQueue taskQueue;
    3. private final int coreSize;
    4. private long time;
    5. private TimeUnit timeUnit;
    6. private final HashSet workers = new HashSet<>();
    7. private RejectPolicy rejectPolicy;
    8. public ThreadPool(int coreSize, long time, TimeUnit timeUnit, int queueCapacity ,RejectPolicy rejectPolicy) {
    9. this.coreSize = coreSize;
    10. this.time = time;
    11. this.timeUnit = timeUnit;
    12. this.taskQueue = new BlockQueue<>(queueCapacity);
    13. this.rejectPolicy = rejectPolicy;
    14. }
    15. public void execute(Runnable task) {
    16. // 当任务数没有超过核心线程数时直接执行。
    17. // 当任务数超过核心线程数时候,选择拒绝策略进行执行
    18. synchronized (workers) {
    19. if (workers.size() < coreSize) {
    20. Worker worker = new Worker(task);
    21. System.out.println("新增 work" + worker);
    22. workers.add(worker);
    23. worker.start();
    24. } else {
    25. // 拒绝策略
    26. taskQueue.tryput(rejectPolicy,task);
    27. }
    28. }
    29. }
    30. // 真正工作的线程
    31. class Worker extends Thread {
    32. private Runnable task;
    33. public Worker(Runnable task) {
    34. this.task = task;
    35. }
    36. @Override
    37. public void run() {
    38. // 当task不为空,执行任务
    39. // 当task执行完毕,再接着任务队列获取任务并执行---take会一直等待有没有新的任务。因此在没有任务时是阻塞的
    40. while (task != null || (task = taskQueue.take(time, timeUnit)) != null) {
    41. try {
    42. System.out.println("正在执行..." + task);
    43. task.run();
    44. } catch (Exception e) {
    45. e.printStackTrace();
    46. } finally {
    47. task = null;
    48. }
    49. }
    50. synchronized (workers) {
    51. System.out.println("当前worker使用完 被移除 " + this);
    52. workers.remove(this);
    53. }
    54. }
    55. @Override
    56. public boolean equals(Object o) {
    57. if (this == o) {
    58. return true;
    59. }
    60. if (o == null || getClass() != o.getClass()) {
    61. return false;
    62. }
    63. Worker worker = (Worker) o;
    64. return task != null ? task.equals(worker.task) : worker.task == null;
    65. }
    66. @Override
    67. public int hashCode() {
    68. return task != null ? task.hashCode() : 0;
    69. }
    70. }
    71. public int size() {
    72. return taskQueue.size();
    73. }
    74. }

    2.4 拒绝策略

    拒绝策略是一个接口,作为线程池的一个参数,在线程初始化时候确定。

    1. // 拒绝策略
    2. @FunctionalInterface
    3. interface RejectPolicy {
    4. void reject(BlockQueue queue, T task);
    5. }

    2.5 完整代码

    1. package cn.itcast.n6.c5;
    2. import java.util.ArrayDeque;
    3. import java.util.Deque;
    4. import java.util.HashSet;
    5. import java.util.concurrent.TimeUnit;
    6. import java.util.concurrent.locks.Condition;
    7. import java.util.concurrent.locks.ReentrantLock;
    8. /**
    9. * @author : msf
    10. * @date : 2022/12/5
    11. */
    12. public class MyThreadPool {
    13. public static void main(String[] args) {
    14. ThreadPool execuror = new ThreadPool(
    15. 1, 1, TimeUnit.SECONDS, 1,
    16. (queue, task) -> {
    17. // 1) 死等
    18. //queue.put(task);
    19. // 2) 带超时等待
    20. //queue.put(task,1500,TimeUnit.MILLISECONDS);
    21. // 3) 让调用者放弃任务执行 --- 什么都不写。任务没有加在队列里面,也没有尝试等待
    22. // 4) 让调用者抛出异常---剩余的任务都不在执行了。
    23. //throw new RuntimeException("任务执行失败" + task);
    24. // 5) 让调用者自己执行任
    25. task.run();
    26. });
    27. for (int i = 0; i < 3; i++) {
    28. int j = i;
    29. execuror.execute(() -> {
    30. try {
    31. Thread.sleep(500);
    32. } catch (InterruptedException e) {
    33. e.printStackTrace();
    34. }
    35. System.out.println(Thread.currentThread().getName() + "执行了" + j);
    36. // System.out.println(execuror.size());
    37. });
    38. }
    39. }
    40. }
    41. // 拒绝策略
    42. @FunctionalInterface
    43. interface RejectPolicy {
    44. void reject(BlockQueue queue, T task);
    45. }
    46. class ThreadPool {
    47. private final BlockQueue taskQueue;
    48. private final int coreSize;
    49. private long time;
    50. private TimeUnit timeUnit;
    51. private final HashSet workers = new HashSet<>();
    52. private RejectPolicy rejectPolicy;
    53. public ThreadPool(int coreSize, long time, TimeUnit timeUnit, int queueCapacity ,RejectPolicy rejectPolicy) {
    54. this.coreSize = coreSize;
    55. this.time = time;
    56. this.timeUnit = timeUnit;
    57. this.taskQueue = new BlockQueue<>(queueCapacity);
    58. this.rejectPolicy = rejectPolicy;
    59. }
    60. public void execute(Runnable task) {
    61. // 当任务数没有超过核心线程数时直接执行。
    62. // 当任务数超过核心线程数时候,选择拒绝策略进行执行
    63. synchronized (workers) {
    64. if (workers.size() < coreSize) {
    65. Worker worker = new Worker(task);
    66. System.out.println("新增 work" + worker);
    67. workers.add(worker);
    68. worker.start();
    69. } else {
    70. // 拒绝策略
    71. taskQueue.tryput(rejectPolicy,task);
    72. }
    73. }
    74. }
    75. // 真正工作的线程
    76. class Worker extends Thread {
    77. private Runnable task;
    78. public Worker(Runnable task) {
    79. this.task = task;
    80. }
    81. @Override
    82. public void run() {
    83. // 当task不为空,执行任务
    84. // 当task执行完毕,再接着任务队列获取任务并执行---take会一直等待有没有新的任务。因此在没有任务时是阻塞的
    85. while (task != null || (task = taskQueue.take(time, timeUnit)) != null) {
    86. try {
    87. System.out.println("正在执行..." + task);
    88. task.run();
    89. } catch (Exception e) {
    90. e.printStackTrace();
    91. } finally {
    92. task = null;
    93. }
    94. }
    95. synchronized (workers) {
    96. System.out.println("当前worker使用完 被移除 " + this);
    97. workers.remove(this);
    98. }
    99. }
    100. @Override
    101. public boolean equals(Object o) {
    102. if (this == o) {
    103. return true;
    104. }
    105. if (o == null || getClass() != o.getClass()) {
    106. return false;
    107. }
    108. Worker worker = (Worker) o;
    109. return task != null ? task.equals(worker.task) : worker.task == null;
    110. }
    111. @Override
    112. public int hashCode() {
    113. return task != null ? task.hashCode() : 0;
    114. }
    115. }
    116. public int size() {
    117. return taskQueue.size();
    118. }
    119. }
    120. class BlockQueue {
    121. // 1. 队列的长度
    122. private int capacity;
    123. // 2.任务队列
    124. private volatile Deque queue = new ArrayDeque<>();
    125. // 3. 锁
    126. private ReentrantLock lock = new ReentrantLock();
    127. // 4.阻塞条件
    128. // 4.1 队列为满时,生产者进入
    129. private Condition fullWaitSet = lock.newCondition();
    130. // 4.1 队列为空时,消费者进入
    131. private Condition emptyWaitSet = lock.newCondition();
    132. public BlockQueue(int capacity) {
    133. this.capacity = capacity;
    134. }
    135. // 阻塞获取
    136. public T take() {
    137. lock.lock(); // 每次只能有一个线程可以消费
    138. try {
    139. while (queue.isEmpty()) {
    140. try {
    141. emptyWaitSet.await();
    142. } catch (InterruptedException e) {
    143. e.printStackTrace();
    144. }
    145. }
    146. T t = queue.removeFirst();
    147. fullWaitSet.signal();
    148. return t;
    149. } finally {
    150. lock.unlock();
    151. }
    152. }
    153. // 超时时间的获取
    154. public T take(long time, TimeUnit timeUnit) {
    155. lock.lock(); // 每次只能有一个线程可以消费
    156. try {
    157. long nanos = timeUnit.toNanos(time);
    158. while (queue.isEmpty()) {
    159. try {
    160. if (nanos <= 0) {
    161. // 等待超时---结束循环
    162. return null;
    163. }
    164. nanos = emptyWaitSet.awaitNanos(nanos);
    165. } catch (InterruptedException e) {
    166. e.printStackTrace();
    167. }
    168. }
    169. T t = queue.removeFirst();
    170. fullWaitSet.signal();
    171. return t;
    172. } finally {
    173. lock.unlock();
    174. }
    175. }
    176. // 阻塞 添加
    177. public void put(T task) {
    178. lock.lock();
    179. try {
    180. while (queue.size() == capacity) {
    181. try {
    182. System.out.println("等待加入任务对列" + task);
    183. fullWaitSet.await();
    184. } catch (InterruptedException e) {
    185. e.printStackTrace();
    186. }
    187. }
    188. queue.addLast(task);
    189. System.out.println("加入任务队列" + task);
    190. emptyWaitSet.signal();
    191. } finally {
    192. lock.unlock();
    193. }
    194. }
    195. // 超时 添加
    196. public boolean put(T task, long time, TimeUnit unit) {
    197. lock.lock();
    198. try {
    199. long nanos = unit.toNanos(time);
    200. while (queue.size() == capacity) {
    201. try {
    202. if (nanos <= 0) {
    203. return false;
    204. }
    205. System.out.println("等待加入任务队列 ..." + task);
    206. nanos = fullWaitSet.awaitNanos(nanos);
    207. } catch (InterruptedException e) {
    208. e.printStackTrace();
    209. }
    210. }
    211. queue.addLast(task);
    212. System.out.println("加入任务队列 ..." + task);
    213. emptyWaitSet.signal();
    214. return true;
    215. } finally {
    216. lock.unlock();
    217. }
    218. }
    219. // 获取任务的大小。
    220. public int size() {
    221. lock.lock();
    222. try {
    223. return queue.size();
    224. } finally {
    225. lock.unlock();
    226. }
    227. }
    228. public void tryput(RejectPolicy rejectPolicy, T task) {
    229. lock.lock();
    230. try {
    231. // 1. 队列是否满了
    232. if (queue.size() == capacity) {
    233. // 2. 实行拒绝策略
    234. rejectPolicy.reject(this,task);
    235. }else {
    236. // 有空闲
    237. queue.addLast(task);
    238. System.out.println("加入任务队列 ..." + task);
    239. emptyWaitSet.signal();
    240. }
    241. }finally {
    242. lock.unlock();
    243. }
    244. }
    245. }

  • 相关阅读:
    tensorRT是怎么构建和编译一个模型的
    【Linux】冯诺依曼体系结构
    二叉树层级遍历(深度优先、广度优先算法)
    IDEA中Tomcat启动后提交表单,请求的资源[/servlet_demo2/book-add.html]不可用
    docker部署nacos集群
    xss跨站脚本攻击姿势大全
    第六篇Android--ImageView、Bitmap
    Dubbo和Spring Cloud相关
    【腾讯云 Cloud Studio 实战训练营】使用云IDEA,快速构建React完成点餐H5页面
    50个常用的Numpy函数解释,参数和使用示例
  • 原文地址:https://blog.csdn.net/abc123mma/article/details/128199245