目录
Java多线程的学习是绕不开线程池的,线程池可以帮助我们管理线程,处理任务。在学习多线程时我已经实现过一个简单的线程池《Java并发-线程池的实现》这个这是简单的阻塞等待。
如下图所示:线程池最根本的应该是生产者消费者模式,生产者向线程池中提供任务,线程池中的线程进行任务消费。
在本节我们将设计一个线程池
属性
take方法
put方法
tryPut方法:用于拒绝策略
- {
- // 1. 队列的长度
- private int capacity;
-
- // 2.任务队列
- private volatile Deque
queue = new ArrayDeque<>(); -
- // 3. 锁
- private ReentrantLock lock = new ReentrantLock();
-
- // 4.阻塞条件
- // 4.1 队列为满时,生产者进入
- private Condition fullWaitSet = lock.newCondition();
- // 4.1 队列为空时,消费者进入
- private Condition emptyWaitSet = lock.newCondition();
-
- public BlockQueue(int capacity) {
- this.capacity = capacity;
- }
-
- // 阻塞获取
- public T take() {
- lock.lock(); // 每次只能有一个线程可以消费
- try {
- while (queue.isEmpty()) {
- try {
- emptyWaitSet.await();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- T t = queue.removeFirst();
- fullWaitSet.signal();
- return t;
- } finally {
- lock.unlock();
- }
- }
-
- // 超时时间的获取
- public T take(long time, TimeUnit timeUnit) {
- lock.lock(); // 每次只能有一个线程可以消费
- try {
- long nanos = timeUnit.toNanos(time);
- while (queue.isEmpty()) {
- try {
- if (nanos <= 0) {
- // 等待超时---结束循环
- return null;
- }
- nanos = emptyWaitSet.awaitNanos(nanos);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- T t = queue.removeFirst();
- fullWaitSet.signal();
- return t;
- } finally {
- lock.unlock();
- }
- }
-
- // 阻塞 添加
- public void put(T task) {
- lock.lock();
- try {
- while (queue.size() == capacity) {
- try {
- System.out.println("等待加入任务对列" + task);
- fullWaitSet.await();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- queue.addLast(task);
- System.out.println("加入任务队列" + task);
- emptyWaitSet.signal();
- } finally {
- lock.unlock();
- }
- }
-
- // 超时 添加
- public boolean put(T task, long time, TimeUnit unit) {
- lock.lock();
- try {
- long nanos = unit.toNanos(time);
- while (queue.size() == capacity) {
- try {
- if (nanos <= 0) {
- return false;
- }
- System.out.println("等待加入任务队列 ..." + task);
- nanos = fullWaitSet.awaitNanos(nanos);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- queue.addLast(task);
- System.out.println("加入任务队列 ..." + task);
- emptyWaitSet.signal();
- return true;
- } finally {
- lock.unlock();
- }
- }
-
- // 获取任务的大小。
- public int size() {
- lock.lock();
- try {
- return queue.size();
- } finally {
- lock.unlock();
- }
- }
-
- public void tryput(RejectPolicy
rejectPolicy, T task) { - lock.lock();
- try {
- // 1. 队列是否满了
- if (queue.size() == capacity) {
- // 2. 实行拒绝策略
- rejectPolicy.reject(this,task);
- }else {
- // 有空闲
- queue.addLast(task);
- System.out.println("加入任务队列 ..." + task);
- emptyWaitSet.signal();
- }
- }finally {
- lock.unlock();
- }
- }
- }
属性
- class ThreadPool {
- private final BlockQueue
taskQueue; -
- private final int coreSize;
-
- private long time;
-
- private TimeUnit timeUnit;
-
- private final HashSet
workers = new HashSet<>(); -
- private RejectPolicy
rejectPolicy; -
- public ThreadPool(int coreSize, long time, TimeUnit timeUnit, int queueCapacity ,RejectPolicy
rejectPolicy) { - this.coreSize = coreSize;
- this.time = time;
- this.timeUnit = timeUnit;
- this.taskQueue = new BlockQueue<>(queueCapacity);
- this.rejectPolicy = rejectPolicy;
- }
-
- public void execute(Runnable task) {
- // 当任务数没有超过核心线程数时直接执行。
- // 当任务数超过核心线程数时候,选择拒绝策略进行执行
- synchronized (workers) {
- if (workers.size() < coreSize) {
- Worker worker = new Worker(task);
- System.out.println("新增 work" + worker);
- workers.add(worker);
- worker.start();
- } else {
- // 拒绝策略
- taskQueue.tryput(rejectPolicy,task);
-
- }
- }
- }
-
- // 真正工作的线程
- class Worker extends Thread {
- private Runnable task;
-
- public Worker(Runnable task) {
- this.task = task;
- }
-
- @Override
- public void run() {
- // 当task不为空,执行任务
- // 当task执行完毕,再接着任务队列获取任务并执行---take会一直等待有没有新的任务。因此在没有任务时是阻塞的
- while (task != null || (task = taskQueue.take(time, timeUnit)) != null) {
- try {
- System.out.println("正在执行..." + task);
- task.run();
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- task = null;
- }
- }
- synchronized (workers) {
- System.out.println("当前worker使用完 被移除 " + this);
- workers.remove(this);
- }
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- Worker worker = (Worker) o;
-
- return task != null ? task.equals(worker.task) : worker.task == null;
- }
-
- @Override
- public int hashCode() {
- return task != null ? task.hashCode() : 0;
- }
- }
-
- public int size() {
- return taskQueue.size();
- }
- }
拒绝策略是一个接口,作为线程池的一个参数,在线程初始化时候确定。
- // 拒绝策略
- @FunctionalInterface
- interface RejectPolicy
{ - void reject(BlockQueue
queue, T task) ; - }
- package cn.itcast.n6.c5;
-
- import java.util.ArrayDeque;
- import java.util.Deque;
- import java.util.HashSet;
- import java.util.concurrent.TimeUnit;
- import java.util.concurrent.locks.Condition;
- import java.util.concurrent.locks.ReentrantLock;
-
- /**
- * @author : msf
- * @date : 2022/12/5
- */
- public class MyThreadPool {
-
- public static void main(String[] args) {
- ThreadPool execuror = new ThreadPool(
- 1, 1, TimeUnit.SECONDS, 1,
- (queue, task) -> {
- // 1) 死等
- //queue.put(task);
- // 2) 带超时等待
- //queue.put(task,1500,TimeUnit.MILLISECONDS);
- // 3) 让调用者放弃任务执行 --- 什么都不写。任务没有加在队列里面,也没有尝试等待
- // 4) 让调用者抛出异常---剩余的任务都不在执行了。
- //throw new RuntimeException("任务执行失败" + task);
- // 5) 让调用者自己执行任
- task.run();
- });
-
-
- for (int i = 0; i < 3; i++) {
- int j = i;
- execuror.execute(() -> {
- try {
- Thread.sleep(500);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println(Thread.currentThread().getName() + "执行了" + j);
- // System.out.println(execuror.size());
- });
- }
- }
- }
-
- // 拒绝策略
- @FunctionalInterface
- interface RejectPolicy
{ - void reject(BlockQueue
queue, T task) ; - }
-
-
- class ThreadPool {
- private final BlockQueue
taskQueue; -
- private final int coreSize;
-
- private long time;
-
- private TimeUnit timeUnit;
-
- private final HashSet
workers = new HashSet<>(); -
- private RejectPolicy
rejectPolicy; -
- public ThreadPool(int coreSize, long time, TimeUnit timeUnit, int queueCapacity ,RejectPolicy
rejectPolicy) { - this.coreSize = coreSize;
- this.time = time;
- this.timeUnit = timeUnit;
- this.taskQueue = new BlockQueue<>(queueCapacity);
- this.rejectPolicy = rejectPolicy;
- }
-
- public void execute(Runnable task) {
- // 当任务数没有超过核心线程数时直接执行。
- // 当任务数超过核心线程数时候,选择拒绝策略进行执行
- synchronized (workers) {
- if (workers.size() < coreSize) {
- Worker worker = new Worker(task);
- System.out.println("新增 work" + worker);
- workers.add(worker);
- worker.start();
- } else {
- // 拒绝策略
- taskQueue.tryput(rejectPolicy,task);
-
- }
- }
- }
-
- // 真正工作的线程
- class Worker extends Thread {
- private Runnable task;
-
- public Worker(Runnable task) {
- this.task = task;
- }
-
- @Override
- public void run() {
- // 当task不为空,执行任务
- // 当task执行完毕,再接着任务队列获取任务并执行---take会一直等待有没有新的任务。因此在没有任务时是阻塞的
- while (task != null || (task = taskQueue.take(time, timeUnit)) != null) {
- try {
- System.out.println("正在执行..." + task);
- task.run();
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- task = null;
- }
- }
- synchronized (workers) {
- System.out.println("当前worker使用完 被移除 " + this);
- workers.remove(this);
- }
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- Worker worker = (Worker) o;
-
- return task != null ? task.equals(worker.task) : worker.task == null;
- }
-
- @Override
- public int hashCode() {
- return task != null ? task.hashCode() : 0;
- }
- }
-
- public int size() {
- return taskQueue.size();
- }
- }
-
- class BlockQueue
{ - // 1. 队列的长度
- private int capacity;
-
- // 2.任务队列
- private volatile Deque
queue = new ArrayDeque<>(); -
- // 3. 锁
- private ReentrantLock lock = new ReentrantLock();
-
- // 4.阻塞条件
- // 4.1 队列为满时,生产者进入
- private Condition fullWaitSet = lock.newCondition();
- // 4.1 队列为空时,消费者进入
- private Condition emptyWaitSet = lock.newCondition();
-
- public BlockQueue(int capacity) {
- this.capacity = capacity;
- }
-
- // 阻塞获取
- public T take() {
- lock.lock(); // 每次只能有一个线程可以消费
- try {
- while (queue.isEmpty()) {
- try {
- emptyWaitSet.await();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- T t = queue.removeFirst();
- fullWaitSet.signal();
- return t;
- } finally {
- lock.unlock();
- }
- }
-
- // 超时时间的获取
- public T take(long time, TimeUnit timeUnit) {
- lock.lock(); // 每次只能有一个线程可以消费
- try {
- long nanos = timeUnit.toNanos(time);
- while (queue.isEmpty()) {
- try {
- if (nanos <= 0) {
- // 等待超时---结束循环
- return null;
- }
- nanos = emptyWaitSet.awaitNanos(nanos);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- T t = queue.removeFirst();
- fullWaitSet.signal();
- return t;
- } finally {
- lock.unlock();
- }
- }
-
- // 阻塞 添加
- public void put(T task) {
- lock.lock();
- try {
- while (queue.size() == capacity) {
- try {
- System.out.println("等待加入任务对列" + task);
- fullWaitSet.await();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- queue.addLast(task);
- System.out.println("加入任务队列" + task);
- emptyWaitSet.signal();
- } finally {
- lock.unlock();
- }
- }
-
- // 超时 添加
- public boolean put(T task, long time, TimeUnit unit) {
- lock.lock();
- try {
- long nanos = unit.toNanos(time);
- while (queue.size() == capacity) {
- try {
- if (nanos <= 0) {
- return false;
- }
- System.out.println("等待加入任务队列 ..." + task);
- nanos = fullWaitSet.awaitNanos(nanos);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- queue.addLast(task);
- System.out.println("加入任务队列 ..." + task);
- emptyWaitSet.signal();
- return true;
- } finally {
- lock.unlock();
- }
- }
-
- // 获取任务的大小。
- public int size() {
- lock.lock();
- try {
- return queue.size();
- } finally {
- lock.unlock();
- }
- }
-
- public void tryput(RejectPolicy
rejectPolicy, T task) { - lock.lock();
- try {
- // 1. 队列是否满了
- if (queue.size() == capacity) {
- // 2. 实行拒绝策略
- rejectPolicy.reject(this,task);
- }else {
- // 有空闲
- queue.addLast(task);
- System.out.println("加入任务队列 ..." + task);
- emptyWaitSet.signal();
- }
- }finally {
- lock.unlock();
- }
- }
- }