线程是并发编程中基础的基础,只有先了解如何创建线程,才能进一步学习并发相关的知识。
常见的实现线程的方法有以下几种:
可以通过实现Runnable接口,并重写run()方法,之后将实现Runnable接口的类传入Thread中即可创建线程。
public class RunnableThread implements Runnable {
@Override
public void run() {
System.out.println('用实现Runnable接口实现线程');
}
}
继承Thread类,重写其中的run()方法创建线程:
public class ExtendsThread extends Thread {
@Override
public void run() {
System.out.println('用Thread类实现线程');
}
}
通过线程池,可以指定线程数量,由线程池创建线程。
对于线程池而言,本质上是通过线程工厂创建线程的,默认采用 DefaultThreadFactory ,它会给线程池创建的线程设置一些默认值,比如:线程的名字、是否是守护线程,以及线程的优先级等。
/**
* 描述: 用固定线程数的线程池执行10000个任务
*/
public class ThreadPoolDemo {
public static void main(String[] args) {
ExecutorService service = Executors.newFixedThreadPool(5);
for (int i = 0; i < 10000; i++) {
service.execute(new Task());
}
System.out.println(Thread.currentThread().getName());
}
static class Task implements Runnable {
public void run() {
System.out.println("Thread Name: " + Thread.currentThread().getName());
}
}
}
实现Callable接口,并重写call()方法,之后将实现Callable接口的类传入Thread中创建线程。
这种方式与Runnable接口的方式相类似,但是Callable接口通过Future将线程执行的结果作为返回值返回。
class CallableTask implements Callable<Integer> {
@Override
public Integer call() throws Exception {
return new Random().nextInt();
}
}
//创建线程池
ExecutorService service = Executors.newFixedThreadPool(10);
//提交任务,并用 Future提交返回结果
Future<Integer> future = service.submit(new CallableTask());
a. 定时器
新建一个 Timer,令其每隔一段时间之后,执行一些任务,这时它也会创建线程并执行任务。
public class Run {
private static Timer timer=new Timer();
public static void main(String[] args) throws ParseException
{
timer.schedule(new Mytask(), TimeUtil.df.get().parse("2017-09-14 09:19:30"));
}
}
b. 匿名内部类或 lambda 表达式
/**
*描述:匿名内部类创建线程
*/
new Thread(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName());
}
}).start();
/**
*描述:lambda表达式创建线程
*/
new Thread(() -> System.out.println(Thread.currentThread().getName())).start();
}
前面介绍了5中创建线程的方法,按理来说,创建线程的方法应该是很多的,然而实际上,创建线程的方法只有一种而已。
对于线程池而言,跟进DefaultThreadFactory 的源码查看具体创建线程的实现:
static class DefaultThreadFactory implements ThreadFactory {
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
可以看到,线程池底层还是通过Thread类创建线程的,只是在创建时传入的参数更多一些。
对于Callable接口,其底层实现依赖FutureTask,而FutureTask实际上实现了Runnable接口,并进行了一系列封装后能够返回返回值。因此Callable接口还是离不开Runnable接口。
对于定时器Timer,其内部实现,本质上还是会有一个继承自 Thread 类的 TimerThread,因此还是通过Thread类创建线程的。
置于匿名内部类和lambda表达式,它们仅仅是一种语法实现,本质上还是基于Thread类。
那么归根到底,其实只有Thread类和Runnable接口两种方式咯?其实这两种方式本质上也是一种。
线程的启动需要调用Thread.start()方法,而start()方法最终会调用run()方法:
@Override
public void run() {
if (target != null) {
target.run();
}
}
这里有两个run()方法,一个是外层的run(),一个是内层的run()。
上述代码中的target其实就是一个 Runnable,实现Runnable并重写run()方法,实际上就是重写这里内层的run()的内部逻辑。
而实现Thread类,重写run()方法,实际上是重写上述代码的外层run()方法。
因此可以这样说,本质上,实现线程只有一种方式,而要想实现线程执行的内容,却有两种方式,也就是可以通过 实现 Runnable 接口的方式,或是继承 Thread 类重写 run() 方法的方式。
通常情况下,我们不会手动停止一个线程,而是允许线程运行到结束,然后让它自然停止。但是如果遇到特殊情况比如:用户突然关闭程序,或程序运行出错重启等,这种情况下,如何安全停止线程就很重要了。
对于 Java 而言,最正确的停止线程的方式是使用 interrupt。但 interrupt 仅仅起到通知被停止线程的作用。而对于被停止的线程而言,它拥有完全的自主权,它既可以选择立即停止,也可以选择一段时间后停止,也可以选择压根不停止。
那么为什么 Java 不提供强制停止线程的能力呢?因为如果不了解对方正在做的工作,贸然强制停止线程就可能会造成一些安全的问题,为了避免造成问题就需要给对方一定的时间来整理收尾工作。比如:线程正在写入一个文件,这时收到终止信号,它就需要根据自身业务判断,是选择立即停止,还是将整个文件写入成功后停止,而如果选择立即停止就可能造成数据不完整,不管是中断命令发起者,还是接收者都不希望数据出现问题。
一旦调用某个线程的 interrupt() 之后,这个线程的中断标记位就会被设置成 true。每个线程都有这样的标记位,当线程执行时,应该定期检查这个标记位,如果标记位被设置成 true,就说明有程序想终止该线程。
要想使用interrupt正确停止线程,应当在线程中做如下设计,在 while 循环体判断语句中,首先通过 Thread.currentThread().isInterrupt() 判断线程是否被中断,随后检查是否还有工作要做。&& 逻辑表示只有当两个判断条件同时满足的情况下,才会去执行下面的工作。
while (!Thread.currentThread().isInterrupted() && more work to do) {
do more work
}
以下是interrupt的使用示例:
public class StopThread implements Runnable {
@Override
public void run() {
int count = 0;
while (!Thread.currentThread().isInterrupted() && count < 1000) {
System.out.println("count = " + count++);
}
}
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(new StopThread());
thread.start();
Thread.sleep(5);
thread.interrupt();
}
}
run() 方法中,首先判断线程是否被中断,然后判断 count 值是否小于 1000。线程中打印 0~999 的数字,每打印一个数字 count 值加 1。线程会在每次循环开始之前,检查是否被中断了。接下来在 main 函数中会启动该线程,然后休眠 5 毫秒后立刻中断线程,该线程会检测到中断信号,于是在还没打印完1000个数的时候就会停下来,这种就属于通过 interrupt 正确停止线程的情况。
现在考虑一种特殊情况,如果线程在执行任务期间有休眠需求,也就是每打印一个数字,就进入一次 sleep ,而此时将 Thread.sleep() 的休眠时间设置为 1000 秒钟
Runnable runnable = () -> {
int num = 0;
try {
while (!Thread.currentThread().isInterrupted() &&
num <= 1000) {
System.out.println(num);
num++;
Thread.sleep(1000000);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
};
在主线程中,休眠 5 毫秒后,通知子线程中断,此时子线程仍在执行 sleep 语句,处于休眠中。那么就需要考虑一点,在休眠中的线程是否能够感受到中断通知呢?是否需要等到休眠结束后才能中断线程呢?如果是这样,就会带来严重的问题,因为响应中断太不及时了。
public class StopDuringSleep {
public static void main(String[] args) throws InterruptedException {
Runnable runnable = () -> {
int num = 0;
try {
while (!Thread.currentThread().isInterrupted() && num <= 1000) {
System.out.println(num);
num++;
Thread.sleep(1000000);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
};
Thread thread = new Thread(runnable);
thread.start();
Thread.sleep(5);
thread.interrupt();
}
}
Java 设计者在设计之初就考虑到以上的问题,如果 sleep、wait 等可以让线程进入阻塞的方法使线程休眠了,而处于休眠中的线程被中断,那么线程是可以感受到中断信号的,并且会抛出一个 InterruptedException 异常,同时清除中断信号,将中断标记位设置成 false。这样一来就不用担心长时间休眠中线程感受不到中断了,因为即便线程还在休眠,仍然能够响应中断通知,并抛出异常。
上面提到, sleep、wait 等方法可以使线程休眠,而处于休眠中的线程被中断,将会抛出一个 InterruptedException 异常,并清除中断信号,将中断标记位设置成 false。
在实际的开发者中,不同的人负责编写不同的方法,然后相互调用来实现整个业务的逻辑。那么如果我们负责编写的方法需要被别人调用,同时我们的方法内调用了 sleep 或者 wait 等能响应中断的方法时,要怎么处理?
a. 方法签名抛异常 / run() 强制 try/catch
如果只是捕捉到异常,没有进行任何处理逻辑,相当于把中断信号给隐藏了,这样做非常不合理。
调用方要不使用 try/catch 并在 catch 中正确处理异常,要不将异常声明到方法签名中。如果每层逻辑都遵守规范,便可以将中断信号层层传递到顶层,最终让 run() 方法可以捕获到异常。而对于 run() 方法而言,它本身没有抛出 checkedException 的能力,只能通过 try/catch 来处理异常。层层传递异常的逻辑保障了异常不会被遗漏,而对 run() 方法而言,就可以根据不同的业务逻辑来进行相应的处理。
void subTask2() throws InterruptedException {
Thread.sleep(1000);
}
b. 再次中断
在 catch 语句中再次中断线程。如代码所示,需要在 catch 语句块中调用 Thread.currentThread().interrupt() 函数。因为如果线程在休眠期间被中断,那么会自动清除中断信号。如果这时手动添加中断信号,中断信号依然可以被捕捉到。这样后续执行的方法依然可以检测到这里发生过中断,可以做出相应的处理,整个线程可以正常退出。
private void reInterrupt() {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
e.printStackTrace();
}
}
除了interrupt之外, volatile标记位也能起到停止线程的目的,但是这种方法有局限性,适用的场景有限。
a. 适用的场景
public class VolatileCanStop implements Runnable {
private volatile boolean canceled = false;
@Override
public void run() {
int num = 0;
try {
while (!canceled && num <= 1000000) {
if (num % 10 == 0) {
System.out.println(num + "是10的倍数。");
}
num++;
Thread.sleep(1);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws InterruptedException {
VolatileCanStop r = new VolatileCanStop();
Thread thread = new Thread(r);
thread.start();
Thread.sleep(3000);
r.canceled = true;
}
}
声明一个叫作 VolatileStopThread 的类, 它实现了 Runnable 接口,然后在 run() 中进行 while 循环,在循环体中又进行了两层判断,首先判断 canceled 变量的值,canceled 变量是一个被 volatile 修饰的初始值为 false 的布尔值,当该值变为 true 时,while 跳出循环,while 的第二个判断条件是 num 值小于1000000(一百万),在while 循环体里,只要是 10 的倍数就打印出来,然后 num++。
首先启动线程,然后经过 3 秒钟的时间,把用 volatile 修饰的布尔值的标记位设置成 true,这样,正在运行的线程就会在下一次 while 循环中判断出 canceled 的值已经变成 true 了,这样就不再满足 while 的判断条件,跳出整个 while 循环,线程就停止了,这种情况是演示 volatile 修饰的标记位可以正常工作的情况
b. 不适用的场景
class Producer implements Runnable {
public volatile boolean canceled = false;
BlockingQueue storage;
public Producer(BlockingQueue storage) {
this.storage = storage;
}
@Override
public void run() {
int num = 0;
try {
while (num <= 100000 && !canceled) {
if (num % 50 == 0) {
storage.put(num);
System.out.println(num + "是50的倍数,被放到仓库中了。");
}
num++;
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("生产者结束运行");
}
}
}
声明了一个生产者 Producer,通过 volatile 标记的初始值为 false 的布尔值 canceled 来停止线程。而在 run() 方法中,while 的判断语句是 num 是否小于 100000 及 canceled 是否被标记。while 循环体中判断 num 如果是 50 的倍数就放到 storage 仓库中,storage 是生产者与消费者之间进行通信的存储器,当 num 大于 100000 或被通知停止时,会跳出 while 循环并执行 finally 语句块,告诉大家“生产者结束运行”。
class Consumer {
BlockingQueue storage;
public Consumer(BlockingQueue storage) {
this.storage = storage;
}
public boolean needMoreNums() {
if (Math.random() > 0.97) {
return false;
}
return true;
}
}
对于消费者 Consumer,它与生产者共用同一个仓库 storage,并且在方法内通过 needMoreNums() 方法判断是否需要继续使用更多的数字,刚才生产者生产了一些 50 的倍数供消费者使用,消费者是否继续使用数字的判断条件是产生一个随机数并与 0.97 进行比较,大于 0.97 就不再继续使用数字。
public static void main(String[] args) throws InterruptedException {
ArrayBlockingQueue storage = new ArrayBlockingQueue(8);
Producer producer = new Producer(storage);
Thread producerThread = new Thread(producer);
producerThread.start();
Thread.sleep(500);
Consumer consumer = new Consumer(storage);
while (consumer.needMoreNums()) {
System.out.println(consumer.storage.take() + "被消费了");
Thread.sleep(100);
}
System.out.println("消费者不需要更多数据了。");
//一旦消费不需要更多数据了,我们应该让生产者也停下来,但是实际情况却停不下来
producer.canceled = true;
System.out.println(producer.canceled);
}
}
main 函数中,首先创建了生产者/消费者共用的仓库 BlockingQueue storage,仓库容量是 8,并且建立生产者并将生产者放入线程后启动线程,启动后进行 500 毫秒的休眠,休眠时间保障生产者有足够的时间把仓库塞满,而仓库达到容量后就不会再继续往里塞,这时生产者会阻塞,500 毫秒后消费者也被创建出来,并判断是否需要使用更多的数字,然后每次消费后休眠 100 毫秒。
当消费者不再需要数据,就会将 canceled 的标记位设置为 true,理论上此时生产者会跳出 while 循环,并打印输出“生产者运行结束”。
但实际结果是,尽管已经把 canceled 设置成 true,但生产者仍然没有停止,这是因为在这种情况下,生产者在执行 storage.put(num) 时发生阻塞,在它被叫醒之前是没有办法进入下一次循环判断 canceled 的值的,所以在这种情况下用 volatile 是没有办法让生产者停下来的
总结以上两种场景,volatile标记位停止线程的方法会在存在阻塞的时候失效。这种情况下,使用interrupt 语句来中断,即使生产者处于阻塞状态,仍然能够感受到中断信号,并做响应处理。因此相比interrupt ,volatile的使用还是有局限性的。
常见停止线程的方法有: stop(),suspend() 和 resume(),这些方法已经被 Java 直接标记为 @Deprecated。
对于stop()方法,它会直接把线程停止,这样就没有给线程足够的时间来处理想要在停止前保存数据的逻辑,任务戛然而止,会导致出现数据完整性等问题。
对于 suspend() 和 resume() 而言,它们的问题在于如果线程调用 suspend(),它并不会释放锁,就开始进入休眠,但此时有可能仍持有锁,这样就容易导致死锁问题,因为这把锁在线程被 resume() 之前,是不会被释放的。
在Java中,线程的生命周期一共有6种状态:
可以通过getState() 方法获取当前线程所处的状态,线程在运行期间的任何时刻只能处于一种状态。
线程被创建但尚未启动的状态,当我们用 new Thread() 新建一个线程时,如果线程没有开始运行 start() 方法,那么此时它的状态就是 New。而一旦线程调用了 start(),它的状态就会从 New 变成 Runnable。
该状态对应操作系统线程状态中的两种状态,分别是 Running 和 Ready。也就是说,Java 中处于 Runnable 状态的线程有可能正在执行,也有可能没有正在执行,正在等待被分配 CPU 资源。
如果一个正在运行的线程是 Runnable 状态,当它运行到任务的一半时,执行该线程的 CPU 被调度去做其他事情,导致该线程暂时不运行,它的状态依然不变,还是 Runnable,因为它有可能随时被调度回来继续执行任务。
这三种状态被统称为阻塞状态
阻塞 – Blocked
从 Runnable 状态进入 Blocked 状态只有一种可能,就是进入 synchronized 保护的代码时没有抢到 monitor 锁,无论是进入 synchronized 代码块,还是 synchronized 方法,都是一样。
当处于 Blocked 的线程抢到 monitor 锁,就会从 Blocked 状态回到Runnable 状态。
等待 – Waiting
线程从 Runnable 状态进入 Waiting 状态有三种可能性:
Blocked 与 Waiting 的区别是 Blocked 在等待其他线程释放 monitor 锁,而 Waiting 则是在等待某个条件,比如 join 的线程执行完毕,或者是 notify()/notifyAll() 。进入 waiting 状态是线程主动的,而进入 blocked 状态是被动的。
计时等待 – Timed Waiting
计时等待与等待相似,区别仅在于有没有时间限制,Timed Waiting 会等待超时,由系统自动唤醒,或者在超时前被唤醒信号唤醒。
线程从 Runnable 状态进入 Timed Waiting 状态有四种可能性:
三种状态的流转
Blocked -> Runnable
Blocked 状态进入 Runnable 状态,要求线程获取 monitor 锁
Waiting -> Runnable
从 Waiting 状态流转到其他状态则比较特殊,因为首先 Waiting 是不限时的,也就是说无论过了多长时间它都不会主动恢复。只有当执行了 LockSupport.unpark(),或者 join 的线程运行结束,或者被中断时才可以进入 Runnable 状态。
Waiting -> Blocked
如果其他线程调用 notify() 或 notifyAll()来唤醒它,当前Waiting的线程会直接进入 Blocked 状态。因为唤醒 Waiting 线程的线程如果调用 notify() 或 notifyAll(),要求必须首先持有该 monitor 锁,所以处于 Waiting 状态的线程被唤醒时拿不到该锁,就会进入 Blocked 状态,直到执行了 notify()/notifyAll() 的唤醒它的线程执行完毕并释放 monitor 锁,才可能轮到它去抢夺这把锁,如果它能抢到,就会从 Blocked 状态回到 Runnable 状态。
Timed Waiting -> Blocked
Timed Waiting 中执行 notify() 和 notifyAll() 也是一样的道理,它们会先进入 Blocked 状态,然后抢夺锁成功后,再回到 Runnable 状态。
Timed Waiting -> Runnable
Timed Waiting状态的线程,当超时时间到了且能直接获取到锁/join的线程运行结束/被中断/调用了LockSupport.unpark(),会直接恢复到 Runnable 状态,而无需经历 Blocked 状态。
进入Terminated状态有两种可能:
线程的状态切换中,wait / notify / notifyAll / sleep等方法都起到了很重要的作用,而这些方法在实际使用中有不少注意事项。
在使用 wait 方法时,必须把 wait 方法写在 synchronized 保护的 while 代码块中,并始终判断执行条件是否满足,如果满足就往下继续执行,如果不满足就执行 wait 方法,而在执行 wait 方法之前,必须先持有对象的 monitor 锁,也就是通常所说的 synchronized 锁。
假如不按照以上的方法使用wait ,会出现什么问题?下面使用案例进行分析
class BlockingQueue {
Queue<String> buffer = new LinkedList<String>();
public void give(String data) {
buffer.add(data);
notify(); // Since someone may be waiting in take
}
public String take() throws InterruptedException {
while (buffer.isEmpty()) {
wait();
}
return buffer.remove();
}
}
give 方法负责往 buffer 中添加数据,添加完之后执行 notify 方法来唤醒之前等待的线程,而 take 方法负责检查整个 buffer 是否为空,如果为空就进入等待,如果不为空就取出一个数据。
如果这段代码并没有受 synchronized 保护,会有以下场景:
虽然消费者判断了 buffer.isEmpty 条件,但真正执行 wait 方法时,之前的 buffer.isEmpty 的结果已经过期了。原因在于这里的“判断-执行”不是一个原子操作,它在中间被打断了,是线程不安全的。
如果这时没有更多的生产者进行生产,消费者便有可能陷入无穷无尽的等待,因为它错过了刚才 give 方法内的 notify 的唤醒。
为了解决上述问题,需要用synchronized方法,确保 notify 方法永远不会在 buffer.isEmpty 和 wait 方法之间被调用,提升了程序的安全性。另外,wait 方法会释放 monitor 锁,这也要求我们必须首先进入到 synchronized 内持有这把锁。
public void give(String data) {
synchronized (this) {
buffer.add(data);
notify();
}
}
public String take() throws InterruptedException {
synchronized (this) {
while (buffer.isEmpty()) {
wait();
}
return buffer.remove();
}
}
另外,实际上还会发生“虚假唤醒”(spurious wakeup)的问题,线程可能在既没有被notify/notifyAll,也没有被中断或者超时的情况下被唤醒。虽然虚假唤醒发生的概率很小,但是程序依然需要保证在发生虚假唤醒的时候的正确性,所以就需要采用while循环的结构。即便被虚假唤醒了,也会再次检查while里面的条件,如果不满足条件,就会继续wait,也就消除了虚假唤醒的风险。
while (condition does not hold)
obj.wait();
这两种方式的相同点在于:
这两种方式的区别在于:
生产者消费者模式是程序设计中非常常见的一种设计模式,被广泛运用在解耦、消息队列等场景。把生产商品的一方称为生产者,把消费商品的一方称为消费者,有时生产者的生产速度特别快,但消费者的消费速度跟不上,俗称“产能过剩”,有时是多个生产者对应多个消费者。这时在生产者和消费者之间就需要一个中介来进行调度,于是便诞生了生产者消费者模式。
消费者模式通常需要在两者之间增加一个阻塞队列作为媒介,生产者在生产数据后将数据存放在阻塞队列中,消费者获取阻塞队列中的数据。而当阻塞队列已满或是已空都可能会产生线程阻塞。
那么什么时候阻塞线程需要被唤醒?第一种情况是当消费者看到阻塞队列为空时,开始进入等待,这时生产者一旦往队列中放入数据,就会通知所有的消费者,唤醒阻塞的消费者线程。另一种情况是如果生产者发现队列已经满了,也会被阻塞,而一旦消费者获取数据之后就相当于队列空了一个位置,这时消费者就会通知所有正在阻塞的生产者进行生产。
那么如何实现生产者消费者模式呢?这里介绍三种方法:
public static void main(String[] args) {
BlockingQueue<Object> queue = new ArrayBlockingQueue<>(10);
Runnable producer = () -> {
while (true) {
queue.put(new Object());
}
};
new Thread(producer).start();
new Thread(producer).start();
Runnable consumer = () -> {
while (true) {
queue.take();
}
};
new Thread(consumer).start();
new Thread(consumer).start();
}
首先,创建了一个 ArrayBlockingQueue 类型的 BlockingQueue,命名为 queue 并将它的容量设置为 10;其次,创建一个简单的生产者,while(true) 循环体中的queue.put() 负责往队列添加数据;然后,创建两个生产者线程并启动;同样消费者也非常简单,while(true) 循环体中的 queue.take() 负责消费数据,同时创建两个消费者线程并启动。为了简介设计,代码里省略了 try/catch 检测。
BlockingQueue实现生产者消费者模式看上去很简单,但实际上 ArrayBlockingQueue 已经在背后完成了很多工作,比如队列满了就去阻塞生产者线程,队列有空就去唤醒生产者线程等。
public class MyBlockingQueue {
private Queue queue;
private int max = 16;
private ReentrantLock lock = new ReentrantLock();
private Condition notEmpty = lock.newCondition();
private Condition notFull = lock.newCondition();
public MyBlockingQueue (int size) {
this.max = size;
queue = new LinkedList();
}
public void put(Object o) throws InterruptedException {
lock.lock();
try {
while (queue.size() == max) {
notFull.await();
}
queue.add(o);
notEmpty.signalAll();
} finally {
lock.unlock();
}
}
public Object take() throws InterruptedException {
lock.lock();
try {
while (queue.size() == 0) {
notEmpty.await();
}
Object item = queue.remove();
notFull.signalAll();
return item;
} finally {
lock.unlock();
}
}
}
利用Condition自己实现BlockingQueue:
首先,定义了一个队列变量 queue 并设置最大容量为 16;其次,定义了一个 ReentrantLock 类型的 Lock 锁,并在 Lock 锁的基础上创建两个 Condition,一个是 notEmpty,另一个是 notFull,分别代表队列没有空和没有满的条件;最后,声明了 put 和 take 这两个核心方法。
生产者消费者模式通常是面对多线程的场景,因此需要一定的同步措施保障线程安全。
对于put 方法,先将 Lock 锁上,然后,在 while 的条件里检测 queue 是不是已经满了,如果已经满了,则调用 notFull 的 await() 阻塞生产者线程并释放 Lock,如果没有满,则往队列放入数据并利用 notEmpty.signalAll() 通知正在等待的所有消费者并唤醒它们。最后在 finally 中利用 lock.unlock() 方法解锁,把 unlock 方法放在 finally 中是一个基本原则,否则可能会产生无法释放锁的情况。
对于take方法,实际上是与 put 方法相互对应的,同样是通过 while 检查队列是否为空,如果为空,消费者开始等待,如果不为空则从队列中获取数据并通知生产者队列有空余位置,最后在 finally 中解锁。
上述代码中,生产者和消费者为什么要用while循环检查队列情况,而不用if判断呢?以消费者为例,生产者消费者往往是多线程的,如果将while( queue.size() == 0 ) 改为 if( queue.size() == 0 ),第一个消费者线程获取数据时,发现队列为空,便进入等待状态;因为第一个线程在等待时会释放 Lock 锁,所以第二个消费者可以进入并执行 if( queue.size() == 0 ),也发现队列为空,于是第二个线程也进入等待;而此时,如果生产者生产了一个数据,便会唤醒两个消费者线程,而两个线程中只有一个线程可以拿到锁,并执行 queue.remove 操作,另外一个线程因为没有拿到锁而卡在被唤醒的地方,而第一个线程执行完操作后会在 finally 中通过 unlock 解锁,而此时第二个线程便可以拿到被第一个线程释放的锁,继续执行操作,也会去调用 queue.remove 操作,然而这个时候队列已经为空了,所以会抛出 NoSuchElementException 异常。而如果用 while 做检查,当第一个消费者被唤醒得到锁并移除数据之后,第二个线程在执行 remove 前仍会进行 while 检查,发现此时依然满足 queue.size() == 0 的条件,就会继续执行 await 方法,避免了获取的数据为 null 或抛出异常的情况。
利用wait/notify自己实现BlockingQueue:
class MyBlockingQueue {
private int maxSize;
private LinkedList<Object> storage;
public MyBlockingQueue(int size) {
this.maxSize = size;
storage = new LinkedList<>();
}
public synchronized void put() throws InterruptedException {
while (storage.size() == maxSize) {
wait();
}
storage.add(new Object());
notifyAll();
}
public synchronized void take() throws InterruptedException {
while (storage.size() == 0) {
wait();
}
System.out.println(storage.remove());
notifyAll();
}
}
仍然是take 与 put 方法。对于put 方法,put 方法被 synchronized 保护,while 检查队列是否为满,如果不满就往里放入数据并通过 notifyAll() 唤醒其他线程。同样,take 方法也被 synchronized 修饰,while 检查队列是否为空,如果不为空就获取数据并唤醒其他线程。
基于Condition和wait/notify实现BlockingQueue之后,以下为生产者消费者代码:
public class Test {
public static void main(String[] args) {
MyBlockingQueue myBlockingQueue = new MyBlockingQueue(10);
Producer producer = new Producer(myBlockingQueue);
Consumer consumer = new Consumer(myBlockingQueue);
new Thread(producer).start();
new Thread(consumer).start();
}
}
class Producer implements Runnable {
private MyBlockingQueue storage;
public Producer(MyBlockingQueue storage) {
this.storage = storage;
}
@Override
public void run() {
for (int i = 0; i < 100; i++) {
try {
storage.put();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class Consumer implements Runnable {
private MyBlockingQueue storage;
public Consumer(MyBlockingQueue storage) {
this.storage = storage;
}
@Override
public void run() {
for (int i = 0; i < 100; i++) {
try {
storage.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}