// 创建线程池Demo代码
// 1. 继承Thread类
public class MyThread extends Thread{
@Override
public void run() {
for(int i=0;i<1000;i++){
System.out.println(Thread.currentThread().getName()+"线程"+(i*2/10+23*12-38)+(i*3/10+23*12-38));
}
}
}
// 1.1 继承Thread类-启动线程
MyThread myThread = new MyThread();
myThread.start();
// 2. 实现Runnable接口创建及启动
new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 1000; i++) {
System.out.println(Thread.currentThread().getName() + "线程=============");
}
}
}).start();
// 线程池实现下文分解
......
为什么不直接创建Thread对象调用start()方法?
因为直接创建Thread对象调用start()方法,在该线程中没有我们想要的逻辑代码执行,所以要继承Thread类重写run()方法。run()方法里面的代码就是该线程要执行的逻辑代码。main方法里面的代码其实就是main线程的执行代码,其他线程需放在run()方法中。
多线程内存图解
// 图解素材代码
public static void main(String[] args) throws InterruptedException {
// 线程0
MyThread myThread = new MyThread();
myThread.start();
// 线程1
new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 1000; i++) {
System.out.println(Thread.currentThread().getName() + "线程=============");
}
}
}).start();
// main线程
for (int i = 0; i < 1000; i++) {
System.out.println(Thread.currentThread().getName() + "线程");
}
}
| 上面代码片段内存图解: |
|---|
![]() |
线程从创建到消亡经历几种状态的转变 创建(new)、就绪(runable)、运行(running)、阻塞(block)、time waiting、waiting、消亡(dead);
其中block、time wating、 wating不分先后就是在running和dead的中间步骤。
七种状态的转化图示
| 线程状态转化图示: |
|---|
![]() |
对于单核CPU来说(对于多核CPU,此处就理解为一个核),CPU在一个时刻只能运行一个线程,当在运行一个线程的过程中转去运行另外一个线程,这个叫做线程上下文切换(对于进程也是类似)。
由于可能当前线程的任务并没有执行完毕,所以在切换时需要保存线程的运行状态,以便下次重新切换回来时能够继续切换之前的状态运行。举个简单的例子:比如一个线程A正在读取一个文件的内容,正读到文件的一半,此时需要暂停线程A,转去执行线程B,当再次切换回来执行线程A的时候,我们不希望线程A又从文件的开头来读取。
因此需要记录线程A的运行状态,那么会记录哪些数据呢?因为下次恢复时需要知道在这之前当前线程已经执行到哪条指令了,所以需要记录程序计数器的值,另外比如说线程正在进行某个计算的时候被挂起了,那么下次继续执行的时候需要知道之前挂起时变量的值时多少,因此需要记录CPU寄存器的状态。所以一般来说,线程上下文切换过程中会记录程序计数器、CPU寄存器状态等数据。
说简单点的:对于线程的上下文切换实际上就是 存储和恢复CPU状态的过程,它使得线程执行能够从中断点恢复执行。
虽然多线程可以使得任务执行的效率得到提升,但是由于在线程切换时同样会带来一定的开销代价,并且多个线程会导致系统资源占用的增加,所以在进行多线程编程时要注意这些因素。
private volatile String name; :线程的名字private int priority;:线程的优先级(最大值为10,最小值为1,默认值为5)private boolean daemon = false;:是否为守护线程private Runnable target;:要执行的任务start() 方法:
run() 方法:
sleep() 方法:
两个重载方法:
sleep(long millis) //参数为毫秒
sleep(long millis,intnanoseconds) //第一参数为毫秒,第二个参数为纳秒
但是有一点要非常注意,sleep方法不会释放锁,也就是说如果当前线程持有对某个对象的锁,则即使调用sleep方法,其他线程也无法访问这个对象。看下面这个例子就清楚了:
// sleep()方法在休眠的过程中不会释放锁的演示代码
// 线程类
public class MyThread extends Thread{
private Object object = new Object();
@Override
public void run() {
synchronized (MyThread.class){
System.out.println("进入同步块拿到锁"+new Date());
System.out.println(Thread.currentThread().getName()+"进入休眠状态");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
// 测试方法
public static void main(String[] args) {
MyThread myThread0 = new MyThread();
myThread0.start();
MyThread myThread1 = new MyThread();
myThread1.start();
}
// 结果
进入同步块拿到锁Thu Mar 03 16:04:04 CST 2022
Thread-0进入休眠状态
进入同步块拿到锁Thu Mar 03 16:04:06 CST 2022
Thread-1进入休眠状态
yield() 方法:
join() 方法:
// join()方法代码示例
// 线程逻辑代码
public class MyThread2 extends Thread{
private Object object = new Object();
@Override
public void run() {
try {
System.out.println("当前线程"+Thread.currentThread().getName()+"开始执行");
Thread.sleep(2000);
System.out.println("当前线程"+Thread.currentThread().getName()+"执行完毕");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
// main线程等待其他线程执行完毕
public static void main(String[] args) throws InterruptedException {
System.out.println(Thread.currentThread().getName()+"开始执行");
MyThread2 myThread = new MyThread2();
myThread.start();
System.out.println(Thread.currentThread().getName()+"等待");
myThread.join();
System.out.println(Thread.currentThread().getName()+"继续执行");
}
// 结果
main开始执行
main等待
当前线程Thread-0开始执行
当前线程Thread-0执行完毕
main继续执行
** 结论:可以看出,myThread.join()方法后,main线程会进入等待,然后等待Thread-0执行完之后再继续执行。
interrupt() 方法
阻塞的线程,不能中断正常**执行的// 线程
public class MyThread2 extends Thread{
private Object object = new Object();
@Override
public void run() {
try {
System.out.println("当前线程"+Thread.currentThread().getName()+"开始执行");
System.out.println("当前线程"+Thread.currentThread().getName()+"开始休眠");
Thread.sleep(2000);
System.out.println("当前线程"+Thread.currentThread().getName()+"执行完毕");
} catch (InterruptedException e) {
System.out.println("当前线程"+Thread.currentThread().getName()+"发生异常"+e.getMessage());
e.printStackTrace();
}
}
}
// 测试
public static void main(String[] args) {
System.out.println("主线程开始工作");
MyThread2 myThread2 = new MyThread2();
myThread2.start();
myThread2.interrupt();
}
// 结果
主线程开始工作
当前线程Thread-0开始执行
当前线程Thread-0开始休眠
当前线程Thread-0发生异常sleep interrupted
java.lang.InterruptedException: sleep interrupted
at java.lang.Thread.sleep(Native Method)
at com.zhw.thread.pool.thread.MyThread2.run(MyThread2.java:15)
总结:通过interrupt方法可以中断处于阻塞状态的线程;补充不能中断非阻塞的线程

线程池的好处:
任务(Runnable 或者 Callable)
任务的执行(Executor)
异步执行结果(Future)
public class MyRunnable implements Runnable{
@Override
public void run() {
// 要执行的任务代码
......
}
}
public class MyCallable implements Callable<String> {
@Override
public String call() throws Exception {
// 要执行的任务代码
......
return "任务执行完成的返回值";
}
}
| Executor类图: |
|---|
![]() |
如上图所示:
包括任务执行机制的核心接口 Executor ,以及继承自 Executor 接口的 ExecutorService 接口。
ThreadPoolExecutor 和 ScheduledThreadPoolExecutor 这两个关键类实现了 ExecutorService 接口。
ThreadPoolExecutor 这个类,这个类在我们实际使用线程池的过程中,使用频率还是非常高的。
异步执行结果(Future)
Future 接口以及 Future 接口的实现类 FutureTask 类都可以代表异步计算的结果。
当我们把 Runnable接口 或 Callable 接口 的实现类提交给 ThreadPoolExecutor 或 ScheduledThreadPoolExecutor 执行,
可以调用execute()方法和 submit()方法执行(调用 submit() 方法时会返回一个 FutureTask 对象)
Executor框架的执行步骤
主线程首先要创建实现 Runnable 或者 Callable 接口的任务对象。
把创建完成的实现 Runnable/Callable接口的 任务对象直接交给 ExecutorService 执行:
ExecutorService.execute(Runnable command))/ExecutorService.execute(Callable task) )ExecutorService.submit(Runnable task)/ExecutorService.submit(Callable task) )如果执行 ExecutorService.submit(…),ExecutorService 将返回一个实现Future接口的对象
最后,主线程可以执行 FutureTask.get()方法来等待任务执行完成。主线程也可以执行 FutureTask.cancel(boolean mayInterruptIfRunning)来取消此任务的执行。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

线程工厂(ThreadFactory)
拒绝策略
Executor 框架的工具类 Executors 来实现 我们可以创建三种类型的 ThreadPoolExecutor第二种方式存在的问题:
FixedThreadPool 和 SingleThreadExecutor : 允许请求的队列长度为 Integer.MAX_VALUE,可能堆积大量的请求,从而导致 OOM。
CachedThreadPool 和 ScheduledThreadPool : 允许创建的线程数量为 Integer.MAX_VALUE ,可能会创建大量线程,从而导致 OOM。
ThreadPoolExecutor 构造函数创建线程池
public static void main(String[] args) {
// 创建线程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 20, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<>(30), new ThreadPoolExecutor.AbortPolicy());
// 执行线程
for (int i = 0; i < 50; i++) {
// threadPoolExecutor.execute(new MyRunnable("dd"));
threadPoolExecutor.execute(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName());
}
});
}
//终止线程池
threadPoolExecutor.shutdown();
while (!threadPoolExecutor.isTerminated()) {
}
System.out.println("Finished all threads");
}
线程池原理分析
① ThreadPoolExecutor类重要 属性标识
// 用来标记线程池状态和线程池工作线程个数的int数值
// 32位 前3位标记线程池状态 & 后29位标记线程池工作线程个数
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 固定值 29 (方便后面做位运算)
private static final int COUNT_BITS = Integer.SIZE - 3;
// 线程池中工作线程最大容量 00011111 11111111 11111111 11111111
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 线程池状态
// runState is stored in the high-order bits
// -1 32个1 ;左移29位 11100000 00000000 00000000 00000000
// 111 RUNNING : 正常接收任务
private static final int RUNNING = -1 << COUNT_BITS;
// 000 SHUTDOWN : 不接受任务、队列和正在执行的继续
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 001 STOP : 不接受任务、队列和正在执行的中断
private static final int STOP = 1 << COUNT_BITS;
// 010 TIDYING : 过渡状态;清空工作线程,为结束作准备
private static final int TIDYING = 2 << COUNT_BITS;
// 011 TERMINATED : 结束
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
// 线程池的状态
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 线程池的工作线程数
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
| 线程池各个状态及它们之间的转换 |
|---|
![]() |
② execute()方法源码分析:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// 32位ctl值,线程池的状态和正在工作的线程数
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
// 1.当前正在跑的线程数 < 核心线程数 ,通过addWorker(command, true)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。
if (addWorker(command, true))
return;
// 可能由于并发第二个if没有进来,重新获取32位ctl值
c = ctl.get();
}
// 2.如果当前之行的任务数量大于等于 corePoolSize 的时候就会走到这里(核心线程已满,队列未满)
// 通过 isRunning 方法判断线程池状态,线程池处于 RUNNING 状态并且队列可以加入任务,该任务才会被加入进去
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 再次获取线程池状态,如果线程池状态不是 RUNNING 状态,就移除任务并拒绝
if (! isRunning(recheck) && remove(command))
reject(command);
// 线程池是Running状态,但是线程池没有正在工作的线程
else if (workerCountOf(recheck) == 0)
// 启动一个工作线程
addWorker(null, false);
}
//3. 通过addWorker(command, false)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。
//如果addWorker(command, false)执行失败,则通过reject()执行相应的拒绝策略的内容。
else if (!addWorker(command, false))
reject(command);
}
| 启用线程池线程三部曲图示: |
|---|
![]() |
③ execute()方法的执行核心是addWorker(Runnable firstTask, boolean core)方法;这个方法主要用来创建新的工作线程,如果返回 true 说明创建和启动工作线程成功,否则的话返回的就是 false。
firstTask:需要往线程池里面添加的任务
core:使用的是否是核心线程
// 全局锁,并发操作必备
private final ReentrantLock mainLock = new ReentrantLock();
// 跟踪线程池的最大大小,只有在持有全局锁mainLock的前提下才能访问此集合
private int largestPoolSize;
// 工作线程集合,存放线程池中所有的(活跃的)工作线程,只有在持有全局锁mainLock的前提下才能访问此集合
private final HashSet<Worker> workers = new HashSet<>();
//获取线程池状态
private static int runStateOf(int c) { return c & ~CAPACITY; }
//判断线程池的状态是否为 Running
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// rs >= SHUTDOWN:线程池非Running状态
// 1. 线程池是stop及后面的状态 2. firstTask不为null 3. 阻塞队列为空 (只要满足任何一个)
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
// 获取线程池的工作线程个数
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//原子操作将workcount的数量加1
if (compareAndIncrementWorkerCount(c))
// 跳出两层循环
break retry;
c = ctl.get(); // Re-read ctl
// 如果线程的状态被其他线程改变了,就再次执行上述操作重新尝试启动线程执行任务
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// 工作线程启动状态
boolean workerStarted = false;
// 工作线程创建状态
boolean workerAdded = false;
Worker w = null;
// 创建并启动线程
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 将任务添加带集合
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
// 任务创建成功
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 任务启动成功
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
知识补充:
punlic void template(){
retry
for(;;){
for(;;){
break retry;
}
}
}
Runnable vs CallableRunnable接口自 Java 1.0 以来一直存在,但Callable 接口仅在 Java 1.5 中引入,目的就是为了来处理Runnable`不支持的用例。
Runnable 接口不会返回结果或抛出检查异常,但是 Callable 接口可以。所以,如果任务不需要返回结果或抛出异常推荐使用 Runnable 接口,这样代码看起来会更加简洁。
工具类 Executors 可以实现将 Runnable 对象转换成 Callable 对象。
Runnable:
@FunctionalInterface
public interface Runnable {
public abstract void run();
}
Callable:
@FunctionalInterface
public interface Callable<V> {
V call() throws Exception;
}
execute() vs submit()execute()方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功与否;
submit()方法用于提交需要返回值的任务。线程池会返回一个 Future 类型的对象,通过这个 Future 对象可以判断任务是否执行成功,并且可以通过 Future 的 get()方法来获取返回值,get()方法会阻塞当前线程直到任务完成,而使用 get(long timeout,TimeUnit unit)方法的话,如果在 timeout 时间内任务还没有执行完,就会抛出 java.util.concurrent.TimeoutException。
// submit()方法使用示例
public class SubmitMethod {
public static void main(String[] args) {
MyThreadPool myThreadPool = new MyThreadPool();
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 20, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<>(30), new ThreadPoolExecutor.AbortPolicy());
try {
Future<String> future = threadPoolExecutor.submit(new Callable<String>() {
@Override
public String call() throws InterruptedException {
// 休眠5秒
Thread.sleep(1000 * 5l);
return "dd";
}
});
String s1 = null;
// 4秒没结果,抛出异常
s1 = future.get(4, TimeUnit.SECONDS);
System.out.println(s1 + "输出");
threadPoolExecutor.shutdown();
}catch (Exception e){
e.printStackTrace();
}
}
}
结果:
java.util.concurrent.TimeoutException
at java.util.concurrent.FutureTask.get(FutureTask.java:205)
at com.zhw.thread.pool.SubmitMethod.main(SubmitMethod.java:23)
shutdown()VSshutdownNow()shutdown() :关闭线程池,线程池的状态变为 SHUTDOWN。线程池不再接受新任务了,但是队列里的任务得执行完毕。shutdownNow() :关闭线程池,线程的状态变为 STOP。线程池会终止当前正在运行的任务,并停止处理排队的任务并返回正在等待执行的 List。isTerminated() VS isShutdown()isShutDown 当调用 shutdown() 方法后返回为 true。isTerminated 当调用 shutdown() 方法后,并且所有提交的任务完成后返回为 true。简介:ScheduledThreadPoolExecutor 主要用来在给定的延迟后运行任务,或者定期执行任务。 这个在实际项目中基本不会被用到,也不推荐使用,大家只需要简单了解一下它的思想即可。
public static void main(String[] args) {
ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(5);
// 固定延迟执行
scheduledThreadPoolExecutor.schedule(() -> {
System.out.println(".....");
}, 2, TimeUnit.SECONDS);
// 固定频率执行
scheduledThreadPoolExecutor.scheduleAtFixedRate(() -> {
System.out.println(".....");
}, 2, 5, TimeUnit.SECONDS);
......
}
多线程编程中一般线程的个数都大于 CPU 核心的个数,而一个 CPU 核心在任意时刻只能被一个线程使用,为了让这些线程都能得到有效执行,CPU 采取的策略是为每个线程分配时间片并轮转的形式。
当一个线程的时间片用完的时候就会重新处于就绪状态让给其他线程使用,这个过程就属于一次上下文切换。
概括来说就是:当前任务在执行完 CPU 时间片切换到另一个任务之前会先保存自己的状态,以便下次再切换回这个任务时,可以再加载这个任务的状态。
任务从保存到再加载的过程就是一次上下文切换。
上下文切换通常是计算密集型的。也就是说,它需要相当可观的处理器时间,在每秒几十上百次的切换中,每次切换都需要纳秒量级的时间。所以,上下文切换对系统来说意味着消耗大量的 CPU 时间,事实上,可能是操作系统中时间消耗最大的操作。 Linux 相比与其他操作系统(包括其他类 Unix 系统)有很多的优点,其中有一项就是,其上下文切换和模式切换的时间消耗非常少。
CPU 密集型任务:与I/O 密集型任务
CPU 密集型简单理解就是利用 CPU 计算能力的任务比如你在内存中对大量数据进行排序。但凡涉及到网络读取,文件读取这类都是 IO 密集型,这类任务的特点是 CPU 计算耗费时间相比于等待 IO 操作完成的时间来说很少,大部分时间都花在了等待 IO 操作完成上。
调用某个对象的wait()方法能让当前线程阻塞,并且当前线程必须拥有此对象的monitor(即锁);
调用某个对象的notify()方法能够唤醒一个正在等待这个对象的monitor的线程,如果有多个线程都在等待这个对象的monitor,则只能唤醒其中一个线程;
调用notifyAll()方法能够唤醒所有正在等待这个对象的monitor的线程;
| 举个例子: |
|---|
| 假如有三个线程Thread1、Thread2和Thread3都在等待对象objectA的monitor,此时Thread4拥有对象objectA的monitor,当在Thread4中调用objectA.notify()方法之后,Thread1、Thread2和Thread3只有一个能被唤醒。注意,被唤醒不等于立刻就获取了objectA的monitor。假若在Thread4中调用objectA.notifyAll()方法,则Thread1、Thread2和Thread3三个线程都会被唤醒,至于哪个线程接下来能够获取到objectA的monitor就具体依赖于操作系统的调度了。尤其要注意一点,一个线程被唤醒不代表立即获取了对象的monitor,只有等调用完notify()或者notifyAll()并退出synchronized块,释放对象锁后,其余线程才可获得锁执行。 |
// 实例代码
// 线程一作为被唤醒的线程
public class MyThread1 extends Thread {
@Override
public void run() {
try {
synchronized (MyThread2.object) {
Thread.currentThread().setName("线程一");
System.out.println(Thread.currentThread().getName()+"等待");
MyThread2.object.wait();
System.out.println(Thread.currentThread().getName()+"被唤醒");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
// 线程二作为唤醒线程一的线程
public class MyThread2 extends Thread{
static Object object = new Object();
@Override
public void run() {
try {
synchronized (MyThread2.object){
Thread.currentThread().setName("线程二");
System.out.println(Thread.currentThread().getName() + "唤醒等待该对象锁的线程");
object.notify();
System.out.println(Thread.currentThread().getName() + "交出对象锁唤醒线程");
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
// 测试
public static void main(String[] args) {
MyThread1 myThread1 = new MyThread1();
myThread1.start();
MyThread2 myThread2 = new MyThread2();
myThread2.start();
}
// 结果
线程一等待
线程二唤醒等待该对象锁的线程
线程二交出对象锁唤醒线程
线程一被唤醒
** 总结:
1. 无论是等待操作、还是唤醒操作都在同步代码块中进行
2. 在线程A中只有拿到某个对象的锁才能让线程A wait(), wait()之后就是使得该线程交出对象锁
3. 此时线程B就可以拿到对象锁执行 notify(),这个方法将唤醒等待该对象锁(处于wait的线程);当有多个线程都在等待该对象的monitor的话,则只能唤醒其中 一个线程。只是唤醒线程具体是否执行要看CPU调度。
wait() 和 notify() 实现生产者-消费者模型
// 生产者
public class MyThread1 extends Thread {
static int queueLength = 10;
static public PriorityQueue queue = new PriorityQueue(queueLength);
@Override
public void run() {
try {
while (true) {
synchronized (MyThread2.object) {
while (queue.size() == 0) {
System.out.println("消费队列空");
// 把对象锁释放,使得生产者可以获取到对象锁
MyThread2.object.wait();
}
Object poll = queue.poll();
System.out.println("弹出元素"+poll+"剩余元素个数"+queue.size());
// 把对象锁释放,生产者就可以拿到锁生产
MyThread2.object.notify();
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
// 消费者
public class MyThread2 extends Thread {
static Object object = new Object();
@Override
public void run() {
try {
while (true) {
synchronized (MyThread2.object) {
while (MyThread1.queue.size() == MyThread1.queueLength){
System.out.println("生产者队列已满");
MyThread2.object.wait();
}
System.out.println("填充元素");
MyThread1.queue.offer(Math.random());
MyThread2.object.notify();
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
// 测试
public class WaitTest {
public static void main(String[] args) {
MyThread1 myThread1 = new MyThread1();
myThread1.start();
MyThread2 myThread2 = new MyThread2();
myThread2.start();
}
}
基本概念
Condition是个接口,基本的方法就是await()和signal()方法;
Condition依赖于Lock接口,生成一个Condition的基本代码是lock.newCondition() ;
调用Condition的await()和signal()方法,都必须在lock保护之内,就是说必须在lock.lock()和lock.unlock之间才可以使用;
Conditon中的await()对应Object的wait();
Condition中的signal()对应Object的notify();
Condition中的signalAll()对应Object的notifyAll()。
生产者-消费者模型的实现
异步编排不同于多线程,它会有任务的执行顺序的要求
启动任务的四个方法
// 返回值 ,不使用指定线程池
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
return asyncSupplyStage(asyncPool, supplier);
}
// 返回值 ,使用指定线程池
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor) {
return asyncSupplyStage(screenExecutor(executor), supplier);
}
// 无返回值 ,不使用指定线程池
public static CompletableFuture<Void> runAsync(Runnable runnable) {
return asyncRunStage(asyncPool, runnable);
}
// 无返回值 ,使用指定线程池
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) {
return asyncRunStage(screenExecutor(executor), runnable);
}
代码示例
// 线程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 20, 10, TimeUnit.SECONDS, new ArrayBlockingQueue(30), new ThreadPoolExecutor.AbortPolicy());
// ===============================runAsync示例==============================================
CompletableFuture.runAsync(()->{
System.out.println("无返回值的异步方法执行");
},threadPoolExecutor);
//==================================supplyAsync示例=============================================
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("有返回值的异步方法执行");
return 1;
}, threadPoolExecutor);
System.out.println(future.get());
}
一个任务执行结束之后,需要执行业务或者抛出异常之后需要执行的业务
// ==============================whenComplete:没有返回值===============================================
public CompletableFuture<T> whenComplete(
BiConsumer<? super T, ? super Throwable> action) {
return uniWhenCompleteStage(null, action);
}
// 对上一步执行的结果和异常处理,非自定义线程池
public CompletableFuture<T> whenCompleteAsync(
BiConsumer<? super T, ? super Throwable> action) {
return uniWhenCompleteStage(asyncPool, action);
}
// 对上一步执行的结果和异常处理,自定义线程池
public CompletableFuture<T> whenCompleteAsync(
BiConsumer<? super T, ? super Throwable> action, Executor executor) {
return uniWhenCompleteStage(screenExecutor(executor), action);
}
// ===================================handle:有返回值==========================================
public <U> CompletableFuture<U> handle(
BiFunction<? super T, Throwable, ? extends U> fn) {
return uniHandleStage(null, fn);
}
// 对上一步执行的结果和异常处理,非自定义线程池
public <U> CompletableFuture<U> handleAsync(
BiFunction<? super T, Throwable, ? extends U> fn) {
return uniHandleStage(asyncPool, fn);
}
// 对上一步执行的结果和异常处理,自定义线程池
public <U> CompletableFuture<U> handleAsync(
BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) {
return uniHandleStage(screenExecutor(executor), fn);
}
代码示例
// ============================whenCompleteAsync示例========================================
// 无返回值不能对异常处理
CompletableFuture.supplyAsync(()-> {
System.out.println(10);
System.out.println(1/0);
return 10;
// res:上一步操作的返回值 ; exception: 上一步操作的异常
},threadPoolExecutor ).whenCompleteAsync((res,exception)->{
System.out.println(res);
System.out.println(exception.getMessage());
},threadPoolExecutor);
// ============================handle示例========================================
// handle有返回值可以对异常处理
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println(10);
System.out.println(1 / 0);
return 10;
// res:上一步操作的返回值 ; exception: 上一步操作的异常
}, threadPoolExecutor).handleAsync((res, exception) -> {
System.out.println(res);
System.out.println(exception.getMessage());
if (res != null) {
return 1;
}
if (exception != null) {
return 0;
}
return null;
}, threadPoolExecutor);
System.out.println("============"+future.get());
串行化是任务的执行顺序的控制,把多个任务按照业务需求排序执行
// =====================thenRun:不能感知上一步结果无返回值===============================================
public CompletableFuture<Void> thenRun(Runnable action) {
return uniRunStage(null, action);
}
public CompletableFuture<Void> thenRunAsync(Runnable action) {
return uniRunStage(asyncPool, action);
}
public CompletableFuture<Void> thenRunAsync(Runnable action,
Executor executor) {
return uniRunStage(screenExecutor(executor), action);
}
// ========================thenAccept:能感知上一步结果无返回值============================================
public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
return uniAcceptStage(null, action);
}
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {
return uniAcceptStage(asyncPool, action);
}
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,
Executor executor) {
return uniAcceptStage(screenExecutor(executor), action);
}
// ======================thenApply:能感知上一步结果有返回值==============================================
public <U> CompletableFuture<U> thenApply(
Function<? super T,? extends U> fn) {
return uniApplyStage(null, fn);
}
public <U> CompletableFuture<U> thenApplyAsync(
Function<? super T,? extends U> fn) {
return uniApplyStage(asyncPool, fn);
}
public <U> CompletableFuture<U> thenApplyAsync(
Function<? super T,? extends U> fn, Executor executor) {
return uniApplyStage(screenExecutor(executor), fn);
}
// ======================thenCombine:组合任务能感知上一步结果有返回值==================================
public <U,V> CompletableFuture<V> thenCombine(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn) {
return biApplyStage(null, other, fn);
}
public <U,V> CompletableFuture<V> thenCombineAsync(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn) {
return biApplyStage(asyncPool, other, fn);
}
public <U,V> CompletableFuture<V> thenCombineAsync(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn, Executor executor) {
return biApplyStage(screenExecutor(executor), other, fn);
}
// ======================thenAcceptBoth:组合任务能感知上一步结果无返回值==================================
public <U> CompletableFuture<Void> thenAcceptBoth(
CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action) {
return biAcceptStage(null, other, action);
}
public <U> CompletableFuture<Void> thenAcceptBothAsync(
CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action) {
return biAcceptStage(asyncPool, other, action);
}
public <U> CompletableFuture<Void> thenAcceptBothAsync(
CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action, Executor executor) {
return biAcceptStage(screenExecutor(executor), other, action);
}
// ======================thenCombine:组合任务不能感知上一步结果无返回值==================================
public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,
Runnable action) {
return biRunStage(null, other, action);
}
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
Runnable action) {
return biRunStage(asyncPool, other, action);
}
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
Runnable action,
Executor executor) {
return biRunStage(screenExecutor(executor), other, action);
}
thenCombine、thenAcceptBoth 和runAfterBoth
这三个方法都是将两个CompletableFuture组合起来处理,只有两个任务都正常完成时,才进行下阶段任务。
区别:
测试代码:
// ================thenCombine===================
CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread() + " cf1 do something....");
return 1;
});
CompletableFuture<Integer> cf2 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread() + " cf2 do something....");
return 2;
});
CompletableFuture<Integer> cf3 = cf1.thenCombine(cf2, (a, b) -> {
System.out.println(Thread.currentThread() + " cf3 do something....");
return a + b;
});
System.out.println("cf3结果->" + cf3.get());
// ===============thenAcceptBoth====================
CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread() + " cf1 do something....");
return 1;
});
CompletableFuture<Integer> cf2 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread() + " cf2 do something....");
return 2;
});
CompletableFuture<Void> cf3 = cf1.thenAcceptBoth(cf2, (a, b) -> {
System.out.println(Thread.currentThread() + " cf3 do something....");
System.out.println(a + b);
});
System.out.println("cf3结果->" + cf3.get());
// ==============runAfterBoth========================
CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread() + " cf1 do something....");
return 1;
});
CompletableFuture<Integer> cf2 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread() + " cf2 do something....");
return 2;
});
CompletableFuture<Void> cf3 = cf1.runAfterBoth(cf2, () -> {
System.out.println(Thread.currentThread() + " cf3 do something....");
});
System.out.println("cf3结果->" + cf3.get());