之前所有的例子中,线程的执行都具有一定的随机性。如果希望线程能够有序的执行,必须使用线程通信技术。Java语言提供了一些线程通信的机制能够保证线程的有序执行,本小节将详细讲解线程通信技术。
假如有两个线程,它们分别代表生产者和消费者。当生产者生产了一个产品时,消费者才能取走这个产品,而当消费者没有取走产品时,生产者不能再次生产产品。不难看出:这两个线程必须交替执行才能保证生产者不会重复生产对象以及消费者不能连续两次取走产品。实现这种线程交替执行的模式必须使用线程通信技术完成。
为实现线程通信,Java语言定义了三个方法:分别是wait()、notify()和notifyAll(),下面的表14-2展示了这三个方法的作用。
表14-2线程通信的方法
方法 | 功能 |
void wait() | 使当前线程进入等待状态,直到其他对象调用相同对象的notify()或notifyAll()方法唤醒该线程 |
void wait(long millis) | 使当前线程进入等待状态,如果在millis毫秒内该线程没有被唤醒,则该线程在millis毫秒后自动苏醒 |
void wait(long millis, int nanos) | 使当前线程进入等待状态,如果在millis毫秒内该线程没有被唤醒,则该线程在millis毫秒加nanos纳秒后自动苏醒 |
void notify() | 唤醒被相同对象设置为等待状态的一个线程,如果有多个线程都处于等待状态则随机唤醒其中之一 |
void notifyAll() | 唤醒被相同对象设置为等待状态的所有线程 |
从表14-2可以看出:这三个方法能够控制线程的执行。必须强调:这三个方法是定义在Object类中的,因此Java语言的任何对象都可以控制线程的执行。线程在进入等待状态后会让出CPU以便其他线程有执行机会。此外,表14-2列出的这些控制线程执行的方法必须在synchronized所修饰的方法或代码块中才能调用,否则会出现异常。
在生产者和消费者模式中,程序员可以设置一个boolean型变量flag来表示是否生产出了产品。当flag为false时,表示当前没有生产出产品,此时生产者线程可以生产产品,而消费者线程必须等待。当产品生产出来后,由生产者把flag设置为true,并调用notify()方法唤醒消费者线程。而当flag为true时,表示已经有了产品,此时消费者线程可以取走产品,生产者线程必须必须等待。当消费者线程取走产品后,把flag值设为false,并再次唤醒消费者线程。生产者和消费者两个线程就是这样不断等待以及被唤醒从而保证两个线程交替执行。下面的【例14_13】展示了如何通过线程通信技术实现生产者和消费者两个线程的交替执行。
【例14_13 传统的线程通信】
Exam14_13.java
- class WorkShop1//车间
- {
- int n;//产品编号
- boolean flag = false;//是否生产了产品
- synchronized void put(int n)//生产产品
- {
- if (flag==true){//如果已经有产品则生产者等待
- try
- {
- wait();//等待
- } catch (InterruptedException e)
- {
- e.printStackTrace();
- }
- }
- this.n = n;
- flag = true;
- System.out.println("生产者生产了 " + n+"号产品");
- notify();//唤醒消费者线程
- }
- synchronized int get()//取走产品
- {
- if (flag==false)//如果没有产品则消费者等待
- {
- try
- {
- wait();//等待
- } catch (InterruptedException e)
- {
- e.printStackTrace();
- }
- }
- System.out.println("消费者取走了 "+n+"号产品");
- flag = false;
- notify();//唤醒生产者线程
- return n;
- }
- }
-
- class Producer1 extends Thread//生产者线程
- {
- WorkShop1 workShop;
-
- Producer1(WorkShop1 workShop)
- {
- this.workShop = workShop;
- }
-
- public void run()
- {
- for (int i = 1; i <= 5; i++)
- {
- workShop.put(i);//生产产品
- }
- }
- }
-
- class Consumer1 extends Thread//消费者
- {
- WorkShop1 workShop;
-
- Consumer1(WorkShop1 workShop)
- {
- this.workShop = workShop;
- }
-
- public void run()
- {
- for (int i = 1; i <=5; i++)
- {
- workShop.get();//取走产品
- }
- }
- }
- public class Exam14_13 {
- public static void main(String[] args) {
- WorkShop1 workShop = new WorkShop1();
- new Producer1(workShop).start();//创建并启动生产者线程
- new Consumer1(workShop).start();//创建并启动消费者线程
- }
- }
【例14_13】中,WorkShop1类表示一个生产车间,它个put()方法表示生产一个产品,而get()方法则表示取走该产品。在生产和取走产品时,都会判断当前有没有产品,如果当前有产品,则生产者进入等待状态,而如果当前没有产品则消费者进入等待状态。【例14_13】的运行结果如图14-14所示。
图14-14【例14_13】运行结果
从图14-14可以很明显的看出:两个线程实现了相互交替执行,能够实现线程交替执行的原理是设置了线程的执行条件,并规定在条件不允许的情况下线程要等待。
14.6.1小节讲过:wait()、notify()和notifyAll()方法只有在被synchronized所修饰的方法和代码块中才能使用。如果程序没有使用synchronized关键字来保证同步,而是使用同步锁Lock来保证同步,那么就不能在使用wait()、notify()和notifyAll()这三个方法完成线程通信。为解决这个问题,Java语言提供了一个Condition接口来实现线程通信。
Condition接口也定义了让线程等待和唤醒线程的相关方法,这些方法由下面的表14-3列出。
表14-3 Condition类的方法
方法 | 功能 |
void await() | 使线程进入等待状态 |
long awaitNanos(long nanos) | 使线程进入等待状态,但等待时间不超过nanos纳秒 |
boolean awaitUntil(Date deadline) | 使线程进入等待状态,但等待最晚在deadline这个时间点结束 |
void signal() | 唤醒同一个同步锁的一个等待状态线程,如果有多个线程都处于等待状态,则随机选择一个唤醒 |
void signalAll() | 唤醒同一个同步锁的所有等待状态线程 |
从表14-3可以看到:这些用于线程通信的方法实际上与Object类中所定义的那些方法非常类似。由于一个Condition要与一个同步锁相关联,所以Condition接口的实现类对象要通过与之相关的同步锁对象的newCondition()方法来获得。下面的【例14_14】展示了使用Condition实现线程通信的过程。
【例14_14 Condition线程通信】
Exam14_14.java
- import java.util.concurrent.locks.*;
- class WorkShop2//车间
- {
- int n;//产品编号
- boolean flag = false;//是否生产了产品
- ReentrantLock lock = new ReentrantLock();
- Condition cond = lock.newCondition();
- void put(int n)//生产产品
- {
- lock.lock();//加锁
- try {
- if (flag==true){//如果已经有产品则生产者等待
- cond.await();
- }
- this.n = n;
- flag = true;
- System.out.println("生产者生产了 " + n+"号产品");
- cond.signal();
- }catch (InterruptedException e){
- e.printStackTrace();
- }
- lock.unlock();//解锁
- }
- synchronized int get()//取走产品
- {
- lock.lock();//加锁
- try
- {
- if (flag==false){//如果没有产品则消费者等待
- cond.await();
- }
- System.out.println("消费者取走了 "+n+"号产品");
- flag = false;
- cond.signal();
- } catch (InterruptedException e)
- {
- e.printStackTrace();
- }
- lock.unlock();//解锁
- return n;
- }
- }
-
- class Producer2 extends Thread//生产者线程
- {
- WorkShop2 workShop;
-
- Producer2(WorkShop2 workShop)
- {
- this.workShop = workShop;
- }
-
- public void run()
- {
- for (int i = 1; i <= 5; i++)
- {
- workShop.put(i);//生产产品
- }
- }
- }
-
- class Consumer2 extends Thread//消费者
- {
- WorkShop2 workShop;
-
- Consumer2(WorkShop2 workShop)
- {
- this.workShop = workShop;
- }
-
- public void run()
- {
- for (int i = 1; i <=5; i++)
- {
- workShop.get();//取走产品
- }
- }
- }
- public class Exam14_14 {
- public static void main(String[] args) {
- WorkShop2 workShop = new WorkShop2();
- new Producer2(workShop).start();//创建并启动生产者线程
- new Consumer2(workShop).start();//创建并启动消费者线程
- }
- }
仔细阅读【例14_14】的代码不难发现:它与【例14_13】的代码逻辑是一样的,只是实现同步和线程通信的方式发生了变化,不再使用synchronized关键字和wait()以及notify()方法,而是换成了同步锁以及Condition的await()和signal()方法。【例14_14】的运行结果与【例14_13】的运行结果完全相同,读者可以自行运行这个例子从而体会使用Condition实现线程通信的效果。
从JDK1.5开始,Java语言提供了一个BlockingQueue接口,这个接口被称作“阻塞队列”。虽然BlockingQueue是Queue的子接口,但它的作用并不是用来当作容器,而是被当作同步线程的工具。BlockingQueue之所以能够被当作线程同步的工具,是因为当一个线程向BlockingQueue中放入元素时如果队列已满,则线程被阻塞,直到队列中能够放入元素时线程才恢复运行。相反的,当一个线程从BlockingQueue中取出元素时,如果队列是空的,则线程也会被阻塞,直到线程中有至少一个元素时线程才恢复运行。因此,如果两个线程分别向队列中放入和取出元素,并且队列的长度为1,那么这两个线程就能交替运行。
BlockingQueue是Queue的子接口,所以当然可以使用Queue接口中定义的各种方法,但Queue接口所定义的那些存取元素的方法并不能导致线程阻塞,因此BlockingQueue专门定义了能够阻塞线程的存取元素的方法:put()方法用于把元素放入队尾,如果队列已满则线程阻塞,take()方法用于把元素从队头取出,如果队列为空则线程阻塞。需要注意:take()方法执行“取出”操作时,会把元素从队列中删除而不只是获取元素的引用。
BlockingQueue接口有5个实现类,下面的表14-4展示了每种实现类的功能和特点。
表14-4 BlockingQueue接口的实现类
实现类 | 特点 |
ArrayBlockingQueue | 基于数组实现的BlockingQueue队列 |
LinkedBlockingQueue | 基于链表实现的BlockingQueue队列 |
PriorityBlockingQueue | 能够实现元素自动排序的队列,因此调用remove()、take()方法取出的是队列中最小的元素 |
SynchronousQueue | 同步队列。对该队列的存、取操作必须交替进行 |
DelayQueue | 底层基于PriorityBlockingQueue实现,但要求集合元素都实现Delay接口 |
下面的【例14_15】以ArrayBlockingQueue为例演示使用阻塞队列实现生产者和消费者两个线程交替执行的实现过程。
【例14_15阻塞队列线程通信】
Exam14_15.java
- import java.util.concurrent.ArrayBlockingQueue;
- class Producer3 extends Thread//生产者线程
- {
- ArrayBlockingQueue abq;
- Producer3(ArrayBlockingQueue abq)
- {
- this.abq = abq;
- }
-
- public void run()
- {
- try {
- for (int i = 1; i <= 5; i++)
- {
- abq.put(i);//生产产品
- System.out.println("生产者生产了"+i+"号产品");//打印所做操作
-
- }
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
-
- class Consumer3 extends Thread//消费者
- {
- ArrayBlockingQueue abq;
- Consumer3(ArrayBlockingQueue abq)
- {
- this.abq = abq;
- }
-
- public void run()
- {
- try {
- for (int i = 1; i <=5; i++)
- {
- abq.take();//取走产品
- System.out.println("消费者取走了"+i+"号产品");//打印所做操作
- }
- }catch (InterruptedException e){
- e.printStackTrace();
- }
-
- }
- }
- public class Exam14_15 {
- public static void main(String[] args) {
- ArrayBlockingQueue abq = new ArrayBlockingQueue(1);//设置阻塞队列长度为1
- new Producer3(abq).start();//创建并启动生产者线程
- new Consumer3(abq).start();//创建并启动消费者线程
- }
- }
从【例14_15】的代码中可以很明显的看到:使用阻塞队列实现线程通信不需要用WorkShop这样表示生产车间的类。线程直接向阻塞队列中存入或取出数据(即产品)即可,由于队列的长度为1,因此生产者线程放入数据后不能再次放入数据,只能进入阻塞状态并由消费者线程取走数据。同理,消费者线程取走数据后,队列变空,这时消费者线程不能再次取走数据,只能进入阻塞状态并由生产者线程向队列中放入数据。这样的话就任意一个线程都不能连续两次执行任务,线程只能交替执行。
如果读者执行【例14_15】,会发现运行结果不一定如图14-14所示的那样。这是因为每个线程每次所要执行的任务都分为两个部分:第一部分是存入或取出数据,第二部分是打印所做的操作。实际上,存取数据的操作是严格按照交替的方式执行的,但打印操作确未必是交替完成的。这是因为线程从阻塞状态恢复到可运行状态后会有可能会立刻抢占CPU,从而导致打印的结果不一定与存取数据的顺序相对应。例如:一开始消费者线程抢到了CPU,但由于队列为空因此进入阻塞状态,紧接着生产者线程向队列存入数据。本来在存入数据后应该打印“生产者生产了1号产品”,但当存入数据后消费者线程会脱离阻塞进入就绪状态,如果此时消费者线程抢到了CPU,那么还没等生产者线程打印出这条信息,消费者线程就执行取出数据的操作,如果消费者线程继续占据CPU,就会打印出“消费者取走了1号产品”,这样的话控制台上就会出现“先消费后生产”的情况,有时甚至打印结果还会出现连续两次生产或连续两次消费的情况。因此可以看出:使用阻塞队列实现线程通信,如果只是控制两个线程交替存取数据确实是很容易的,但如果除了存取数据还有其他操作的话,那么这种方式具有一定的局限性。