Java Platform, Standard Edition (Java SE) 11
官网DEV
Java教程是为JDK 8编写的。本页中描述的示例和实践没有利用以后版本中引入的改进,并且可能使用不再可用的技术。
计算机用户理所当然地认为他们的系统一次可以做不止一件事。他们认为,当其他应用程序下载文件、管理打印队列和流式传输音频时,他们可以继续在文字处理器中工作。即使是单个应用程序也经常被期望一次做不止一件事。例如,流式音频应用程序必须同时从网络上读取数字音频、解压缩、管理播放并更新其显示。即使是文字处理器也应该随时准备好响应键盘和鼠标事件,无论它有多忙,都要重新格式化文本或更新显示。能够做这些事情的软件被称为并发软件(concurrent software)。
Java平台从一开始就被设计为支持并发编程,在Java编程语言和Java类库中具有基本的并发支持。从5.0版本开始,Java平台还包含了高级并发api。本课介绍了平台的基本并发支持,并总结了java.util.concurrent包中的一些高级api。
在并发编程中,有两个基本的执行单位:进程(processes)和线程(threads)。在Java编程语言中,并发编程主要与线程有关。然而,进程也很重要。
一个计算机系统通常有许多活动的进程和线程。即使在只有一个执行核心的系统中也是如此,因此在任何给定时刻只有一个线程实际执行。单个内核的处理时间通过称为时间分切(time slicing)的操作系统特性在进程和线程之间共享。
计算机系统拥有多个处理器或具有多个执行核的处理器正变得越来越普遍。这极大地增强了系统并发执行进程和线程的能力——但即使在没有多个处理器或执行核心的简单系统上,并发也是可能的。
进程具有自包含的执行环境。一个进程通常拥有一套完整的、私有的基本运行时资源;特别是,每个进程都有自己的内存空间。
进程通常被视为程序或应用程序的同义词。然而,用户所看到的单个应用程序实际上可能是一组协作进程。为了方便进程间的通信,大多数操作系统都支持进程间通信(Inter Process Communication,IPC)资源,比如管道(pipe)和套接字(socket)。IPC不仅用于同一系统上的进程之间的通信,也用于不同系统上的进程之间的通信。
Java虚拟机的大多数实现作为单个进程运行。Java应用程序可以使用ProcessBuilder对象创建额外的进程。多进程应用程序超出了本课的范围。
线程有时被称为轻量级进程(lightweight processes)。进程和线程都提供了执行环境,但是创建一个新线程比创建一个新进程需要更少的资源。
线程存在于进程中——每个进程至少有一个线程。线程共享进程的资源,包括内存和打开的文件。这使得沟通更有效率,但也有潜在的问题。
多线程执行是Java平台的一个基本特性。每个应用程序至少有一个线程——如果算上执行内存管理和信号处理等任务的“系统”线程,则至少有几个。但是从应用程序程序员的角度来看,您只从一个线程开始,称为主线程(main thread)。这个线程能够创建其他线程,我们将在下一节中演示。
每个线程都与Thread类的一个实例相关联。使用Thread对象创建并发应用程序有两种基本策略。
Thread 即可。executor)。本节记录了Thread对象的使用。执行器(executor)与其他高阶并发对象一起讨论。
创建Thread实例的应用程序必须提供将在该线程中运行的代码。有两种方法:
Runnable对象。Runnable接口定义了一个方法run,用于包含线程中执行的代码。Runnable对象被传递给Thread构造函数,如HelloRunnable示例:public class HelloRunnable implements Runnable {
public void run() {
System.out.println("Hello from a thread!");
}
public static void main(String args[]) {
(new Thread(new HelloRunnable())).start();
}
}
Thread。Thread类本身实现了Runnable,尽管它的run方法什么也不做。应用程序可以继承Thread,提供自己的run实现,如HelloThread示例:public class HelloThread extends Thread {
public void run() {
System.out.println("Hello from a thread!");
}
public static void main(String args[]) {
(new HelloThread()).start();
}
}
注意,这两个例子都调用了Thread.start 以启动新线程。
你应该使用哪种风格呢?第一种习惯用法使用Runnable对象,它更通用,因为Runnable对象可以继承Thread以外的类。第二种习惯用法在简单的应用程序中更容易使用,但它受到任务类必须是Thread的后代这一事实的限制。本课重点介绍第一种方法,它将Runnable任务与执行该任务的Thread对象分开。这种方法不仅更加灵活,而且适用于后面介绍的高阶线程管理api。
Thread类定义了许多对线程管理有用的方法。这些方法包括静态方法,它提供有关调用该方法的线程的信息或影响其状态。其他方法由管理线程和Thread 对象所涉及的其他线程调用。我们将在下面的部分中研究其中的一些方法。
Thread.sleep 引起当前线程暂停执行一段指定的时间。这是将处理器时间提供给应用程序的其他线程或可能在计算机系统上运行的其他应用程序的有效方法。sleep方法还可以用于调整速度(如下面的示例所示),以及等待另一个需要时间的任务的线程(如后面小节中的SimpleThreads示例)。
提供了两个重载版本的sleep: 一个将睡眠时间指定为毫秒,另一个将睡眠时间指定为纳秒。但是,不能保证这些睡眠时间是精确的,因为它们受到底层操作系统提供的功能的限制。此外,睡眠周期可以通过中断(interrupts)终止,我们将在后面的部分中看到这一点。在任何情况下,都不能假设调用sleep会在指定的时间段内挂起线程。
SleepMessages示例使用sleep每隔4秒打印消息:
public class SleepMessages {
public static void main(String args[])
throws InterruptedException {
String importantInfo[] = {
"Mares eat oats",
"Does eat oats",
"Little lambs eat ivy",
"A kid will eat ivy too"
};
for (int i = 0;
i < importantInfo.length;
i++) {
//Pause for 4 seconds
Thread.sleep(4000);
//Print a message
System.out.println(importantInfo[i]);
}
}
}
注意main声明它抛出InterruptedException。这是当另一个线程在sleep处于活动状态时中断当前线程时,sleep 抛出的异常。由于此应用程序没有定义导致中断的另一个线程,因此它不需要捕获InterruptedException。
中断(interrupt)是对线程的一个指示,它应该停止正在做的事情并做其他事情。由程序员来决定线程如何准确地响应中断,但是线程终止是很常见的。这是本课所强调的用法。
线程通过调用被中断线程 Thread 对象上的interrupt来发送中断。要使中断机制正常工作,被中断的线程必须支持自己的中断。
线程如何支持自己的中断?这取决于它当前正在做什么。如果线程频繁地调用抛出InterruptedException的方法,它只需在捕获该异常后从run方法返回即可。例如,假设SleepMessages示例中的中心消息循环位于线程的Runnable对象的run方法中。然后可以这样修改以支持中断:
for (int i = 0; i < importantInfo.length; i++) {
// Pause for 4 seconds
try {
Thread.sleep(4000);
} catch (InterruptedException e) {
// We've been interrupted: no more messages.
return;
}
// Print a message
System.out.println(importantInfo[i]);
}
许多抛出InterruptedException的方法(比如sleep)被设计为取消它们当前的操作,并在接收到中断时立即返回。
如果线程长时间没有调用抛出InterruptedException的方法,该怎么办?然后,它必须定期调用Thread.interrupted,如果接收到中断,则返回true。例如:
for (int i = 0; i < inputs.length; i++) {
heavyCrunch(inputs[i]);
if (Thread.interrupted()) {
// We've been interrupted: no more crunching.
return;
}
}
在这个简单的示例中,代码只是测试中断,并在收到中断时退出线程。在更复杂的应用程序中,抛出InterruptedException可能更有意义:
if (Thread.interrupted()) {
throw new InterruptedException();
}
这允许中断处理代码集中在catch子句中。
中断机制是使用称为中断状态(interrupt status)的内部标志来实现的。调用Thread.interrupt设置这个标志。当线程通过调用静态方法Thread.interrupted检查中断时,中断状态将被清除。非静态的isInterrupted方法被一个线程用来查询另一个线程的中断状态,它不会改变中断状态标志。
按照惯例,任何通过抛出InterruptedException而退出的方法都会在退出时清除中断状态。然而,总是有可能中断状态会立即被另一个调用interrupt的线程再次设置。
join方法允许一个线程等待另一个线程的完成。如果t是一个线程正在执行的Thread对象,
t.join();
导致当前线程暂停执行,直到t的线程终止。join的重载允许程序员指定一个等待时间。但是,与sleep一样,join依赖于操作系统的计时,因此您不应该假设join将完全按照您指定的时间等待。
与sleep类似,join通过使用InterruptedException退出来响应中断。
下面的示例汇集了本节的一些概念。SimpleThreads由两个线程组成。第一个是每个Java应用程序都有的主线程(main thread)。主线程从Runnable对象,MessageLoop,创建一个新线程,并等待它完成。如果MessageLoop线程花了太长时间才完成,主线程就会中断它。
MessageLoop线程打印出一系列消息。如果在打印所有消息之前被中断,MessageLoop线程将打印一条消息并退出。
public class SimpleThreads {
// Display a message, preceded by
// the name of the current thread
static void threadMessage(String message) {
String threadName =
Thread.currentThread().getName();
System.out.format("%s: %s%n",
threadName,
message);
}
private static class MessageLoop
implements Runnable {
public void run() {
String importantInfo[] = {
"Mares eat oats",
"Does eat oats",
"Little lambs eat ivy",
"A kid will eat ivy too"
};
try {
for (int i = 0;
i < importantInfo.length;
i++) {
// Pause for 4 seconds
Thread.sleep(4000);
// Print a message
threadMessage(importantInfo[i]);
}
} catch (InterruptedException e) {
threadMessage("I wasn't done!");
}
}
}
public static void main(String args[])
throws InterruptedException {
// Delay, in milliseconds before
// we interrupt MessageLoop
// thread (default one hour).
long patience = 1000 * 60 * 60;
// If command line argument
// present, gives patience
// in seconds.
if (args.length > 0) {
try {
patience = Long.parseLong(args[0]) * 1000;
} catch (NumberFormatException e) {
System.err.println("Argument must be an integer.");
System.exit(1);
}
}
threadMessage("Starting MessageLoop thread");
long startTime = System.currentTimeMillis();
Thread t = new Thread(new MessageLoop());
t.start();
threadMessage("Waiting for MessageLoop thread to finish");
// loop until MessageLoop
// thread exits
while (t.isAlive()) {
threadMessage("Still waiting...");
// Wait maximum of 1 second
// for MessageLoop thread
// to finish.
t.join(1000);
if (((System.currentTimeMillis() - startTime) > patience)
&& t.isAlive()) {
threadMessage("Tired of waiting!");
t.interrupt();
// Shouldn't be long now
// -- wait indefinitely
t.join();
}
}
threadMessage("Finally!");
}
}
线程主要通过共享对字段和引用字段所引用的对象的访问进行通信。这种形式的通信非常有效,但是可能产生两种错误:线程干扰(thread interference)和内存一致性错误(memory consistency errors)。防止这些错误所需的工具是同步(synchronization)。
但是,同步可能会引入线程竞争(thread contention),当两个或多个线程试图同时访问相同的资源时,就会发生竞争,并导致Java运行时执行一个或多个线程的速度变慢,甚至暂停它们的执行。 Starvation and livelock 是线程争用的形式。有关更多信息,请参阅生命周期部分。
本节涵盖以下主题:
考虑一个名为Counter的简单类
class Counter {
private int c = 0;
public void increment() {
c++;
}
public void decrement() {
c--;
}
public int value() {
return c;
}
}
Counter被设计成每次调用increment都会给c加1,每次调用decrement 都会从c减1。然而,如果一个Counter对象被多个线程引用,线程之间的干扰可能会阻止这种情况按期望的发生。
当两个操作在不同的线程中运行,但交错作用于相同的数据时,就会发生干扰。这意味着这两个操作由多个步骤组成,并且步骤序列重叠。
Counter实例上的操作似乎不可能交错,因为c上的两个操作都是单个的简单语句。然而,即使是简单的语句也可以被虚拟机转换成多个步骤。我们不会检查虚拟机所采取的具体步骤——知道单个表达式c++;可以分解为三个步骤就足够了:
c--;可以用同样的方法分解,除了第二步是递减而不是递增。假设线程A在调用increment 的同时,线程B调用decrement。如果c的初始值为0,它们的交错动作可能遵循以下顺序:
线程A的结果丢失了,被线程b覆盖。这种特殊的交错只是一种可能性。在不同的情况下,可能是线程B的结果丢失了,或者根本就没有错误。因为线程干扰错误是不可预测的,所以很难检测和修复。
当不同的线程对应该是相同的数据有不一致的视图时,就会发生内存一致性错误(Memory consistency errors)。内存一致性错误的原因很复杂,超出了本教程的范围。幸运的是,程序员不需要详细了解这些原因。所需要的只是一个避免它们的策略。
避免内存一致性错误的关键是理解happens-before关系。这种关系只是保证一个特定语句的内存写入对另一个特定语句可见。要了解这一点,请考虑以下示例。假设定义并初始化了一个简单的int字段:
int counter = 0;
counter 字段在两个线程A和B之间共享。假设线程A增加counter:
counter++;
然后,不久之后,线程B打印出counter:
System.out.println(counter);
如果这两个语句是在同一个线程中执行的,那么可以放心地假设输出的值是“1”。但是,如果这两个语句在单独的线程中执行,则输出的值很可能是“0”,因为不能保证线程A对counter的更改对线程B是可见的——除非程序员在这两个语句之间建立了happens-before关系。
有几个方法可以创造“happens-before”的关系。其中之一是同步,我们将在下面几节中看到。
我们已经看到了两个创建“ happens-before”关系的动作。
Thread.start时,每个与该语句有happens-before关系的语句也与新线程执行的每个语句都有happens-before关系。导致创建新线程的代码的效果对新线程是可见的。Thread.join返回,那么终止线程执行的所有语句与成功连接之后的所有语句都具有happens-before关系。执行连接的线程现在可以看到线程中代码的效果。有关创建happens-before关系的操作列表,请参阅java.util.concurrent 包的Summary页面。
Java编程语言提供了两种基本的同步习惯用法:同步方法( synchronized methods)和同步语句(synchronized statements)。这两种语句中更复杂的同步语句将在下一节中描述。本节介绍同步方法。
要使一个方法变成同步,只需在它的声明中添加synchronized关键字:
public class SynchronizedCounter {
private int c = 0;
public synchronized void increment() {
c++;
}
public synchronized void decrement() {
c--;
}
public synchronized int value() {
return c;
}
}
如果count是SynchronizedCounter的一个实例,那么使这些方法同步有两个效果:
这保证了对象状态的改变对所有线程都是可见的。注意,构造函数不能同步——在构造函数中使用synchronized关键字会导致语法错误。同步构造函数没有意义,因为只有创建对象的线程才能在对象被构造时访问它。
警告:
在构造将在线程之间共享的对象时,要非常小心,以免对该对象的引用过早地“泄漏”。例如,假设您希望维护一个名为instances的List,其中包含类的每个实例。你可能想在构造函数中添加下面这行代码:
instances.add(this);
但是,在对象的构造完成之前,其他线程可以使用instances访问该对象。
同步方法支持一种简单的策略来防止线程干扰和内存一致性错误:如果一个对象对多个线程可见,那么对该对象变量的所有读写都通过同步synchronized完成。(一个重要的例外:final字段,在对象构造之后不能修改,一旦对象构造完成,就可以通过非同步方法安全地读取)这种策略是有效的,但是可能会出现liveness问题,我们将在本课后面看到。
同步是围绕称为内部锁(intrinsic lock)或监视锁(monitor lock)的内部实体构建的。(API规范通常将此实体简单地称为“监视器”)内部锁在同步的两个方面都发挥作用: 强制对对象状态的独占访问,以及建立对可见性至关重要的happens-before关系。
每个对象都有一个与之相关的内部锁。按照惯例,需要独占和一致访问对象字段的线程必须在访问它们之前获得(acquire)对象的内部锁,然后在使用完它们时释放(release)内部锁。一个线程在获得锁和释放锁之间拥有(own)该锁。只要一个线程拥有一个内部锁,其他线程就不能获得这个锁。当另一个线程试图获取锁时,它将阻塞。
当线程释放一个内部锁时,在该操作和任何后续获取该锁之间建立了happens-before关系。
当线程调用同步方法时,它会自动获取该方法对象的内部锁,并在方法返回时释放锁。即使返回是由未捕获的异常引起的,也会发生锁释放。
您可能想知道当调用静态同步方法时会发生什么,因为静态方法与类而不是对象相关联。在这种情况下,线程获取与类关联的Class对象的内部锁。因此,对类静态字段的访问由一个锁控制,该锁与类的任何实例的锁不同。
另一种创建同步代码的方法是使用同步语句(synchronized statements)。与同步方法不同,同步语句必须指定提供内部锁的对象:
public void addName(String name) {
synchronized(this) {
lastName = name;
nameCount++;
}
nameList.add(name);
}
在本例中,addName方法需要同步对lastName和nameCount的更改,但也需要避免同步调用其他的对象方法。(从同步代码中调用其他对象的方法可能会产生一些问题,这些问题将在livveness部分中描述。)如果没有同步语句,就必须有一个单独的非同步方法,其唯一目的是调用nameList.add。
同步语句对于通过细粒度同步提高并发性也很有用。例如,假设MsLunch类有两个实例字段c1和c2,它们永远不会一起使用。这些字段的所有更新都必须同步,但是没有理由阻止c1的更新与c2的更新交织在一起——这样做会创建不必要的阻塞,从而降低并发性。我们没有使用同步方法,也没有使用与this相关的锁,而是创建了两个对象来单独提供锁。
public class MsLunch {
private long c1 = 0;
private long c2 = 0;
private Object lock1 = new Object();
private Object lock2 = new Object();
public void inc1() {
synchronized(lock1) {
c1++;
}
}
public void inc2() {
synchronized(lock2) {
c2++;
}
}
}
要非常小心地使用这个习语。您必须绝对确定交叉访问受影响的字段确实是安全的。
回想一下,一个线程不能获得另一个线程拥有的锁。但是线程可以获得它已经拥有的锁。允许一个线程多次获取同一个锁可以实现可重入同步(reentrant synchronization)。这描述了一种情况,即同步代码直接或间接地调用了一个也包含同步代码的方法,并且两组代码使用相同的锁。如果没有可重入同步,同步代码将不得不采取许多额外的预防措施,以避免线程导致自身阻塞。
在编程中,原子操作是有效地同时发生的操作。原子操作不会中途停止:它要么完全发生,要么根本不发生。在操作完成之前,原子操作的副作用是不可见的。
我们已经看到,增加表达式(如c++)不被描述成原子操作。即使是非常简单的表达式也可以定义复杂的操作,这些操作可以分解为其他操作。然而,你可以指定原子操作:
long和double类型之外的所有类型),读和写都是原子的。volatile的变量(包括long和double),读和写都是原子性的。原子操作不能交错,因此可以不用担心线程干扰而使用它们。但是,这并不能消除同步原子操作的所有需要,因为仍然可能出现内存一致性错误。使用volatile 变量降低了内存一致性错误的风险,因为对volatile变量的任何写入都会与对该变量的后续读取建立happens-before关系。这意味着对volatile变量的更改对于其他线程总是可见的。此外,这还意味着,当线程读取一个volatile 变量时,它不仅会看到对该volatile 变量的最新更改,还会看到导致更改的代码的副作用。
使用简单的原子变量访问比通过同步代码访问这些变量更有效,但需要程序员更加小心,以避免内存一致性错误。额外的努力是否值得取决于应用程序的大小和复杂性。
java.util.concurrent 包中的一些类提供了不依赖于同步的原子方法。我们将在高阶并发对象一节中讨论它们。
并发应用程序及时执行的能力称为其存活性(liveness)。本节描述了最常见的生存问题——死锁( deadlock),并接着简要描述了另外两种生存问题——饥饿和活锁(starvation and livelock)。
死锁描述了两个或多个线程被永久阻塞,彼此等待的情况。这里有一个例子。
阿尔方斯和加斯顿是朋友,他们都非常相信礼貌。礼貌的一条严格规则是,当你向朋友鞠躬时,你必须一直鞠躬,直到你的朋友有机会回礼。不幸的是,这条规则并没有考虑到两个朋友同时互相鞠躬的可能性。这个示例应用程序Deadlock模拟了这种可能性:
public class Deadlock {
static class Friend {
private final String name;
public Friend(String name) {
this.name = name;
}
public String getName() {
return this.name;
}
public synchronized void bow(Friend bower) {
System.out.format("%s: %s"
+ " has bowed to me!%n",
this.name, bower.getName());
bower.bowBack(this);
}
public synchronized void bowBack(Friend bower) {
System.out.format("%s: %s"
+ " has bowed back to me!%n",
this.name, bower.getName());
}
}
public static void main(String[] args) {
final Friend alphonse =
new Friend("Alphonse");
final Friend gaston =
new Friend("Gaston");
new Thread(new Runnable() {
public void run() { alphonse.bow(gaston); }
}).start();
new Thread(new Runnable() {
public void run() { gaston.bow(alphonse); }
}).start();
}
}
当Deadlock运行时,两个线程在试图调用bowBack时极有可能会阻塞。两个块都不会结束,因为每个线程都在等待另一个线程退出。
与死锁相比,饥饿和活锁(Starvation and livelock)不太常见,但仍然是每个并发软件设计人员都可能遇到的问题。
饥饿(Starvation )描述了线程无法获得对共享资源的常规访问并且无法取得进展的情况。当共享资源被“贪婪”线程长时间占有不可用时,就会发生这种情况。例如,假设对象提供了一个同步方法,该方法通常需要很长时间才能返回。如果一个线程频繁地调用此方法,那么同样需要频繁同步访问同一对象的其他线程通常会被阻塞。
一个线程经常响应另一个线程的动作。如果另一个线程的操作也是对另一个线程的操作的响应,那么就可能导致活锁。与死锁一样,被活动锁定的线程无法进行进一步的处理。然而,线程并没有被阻塞——它们只是忙于响应彼此而无法恢复工作。这就好比两个人在走廊里互相超车:阿尔方斯移到他的左边让加斯顿通过,而加斯顿移到右边让阿尔方斯通过。看到他们仍然互相阻挡,Alphone移到他的右边,而Gaston移到他的左边。他们还在互相阻挡,所以…
线程经常需要协调它们的动作。最常见的协调方式是保护块(guarded block)。这样的块首先轮询一个条件,该条件必须为真,然后块才能继续。为了正确地做到这一点,需要遵循一些步骤。
例如,假设guardedJoy是一个方法,它必须在另一个线程设置了共享变量joy之后才能进行。理论上,这样的方法可以简单地循环,直到满足条件为止,但是这种循环是浪费的,因为它在等待期间连续执行。
public void guardedJoy() {
// Simple loop guard. Wastes
// processor time. Don't do this!
while(!joy) {}
System.out.println("Joy has been achieved!");
}
更有效的守卫是调用 Object.wait 挂起当前线程。wait的调用不会返回,直到另一个线程发出某个特殊事件可能已经发生的通知——尽管不一定是这个线程正在等待的事件:
public synchronized void guardedJoy() {
// This guard only loops once for each special event, which may not
// be the event we're waiting for.
while(!joy) {
try {
wait();
} catch (InterruptedException e) {}
}
System.out.println("Joy and efficiency have been achieved!");
}
注意:总是在测试等待条件的循环中调用
wait。不要假设中断是针对您正在等待的特定条件,或者该条件仍然为真。
与许多挂起执行的方法一样,wait可以抛出InterruptedException。在这个例子中,我们可以忽略这个异常——我们只关心joy的值。
为什么这个版本的guardedJoy是同步的?假设d是我们用来调用wait的对象。当线程调用d.wait时,它必须拥有d的固有锁,否则会抛出错误。在同步方法中调用wait是获取这个内部锁的一种简单方法。
当调用wait时,线程释放锁并挂起执行。在将来的某个时间,另一个线程将获得相同的锁并调用 Object.notifyAll,通知所有等待该锁的线程发生了重要的事情:
public synchronized notifyJoy() {
joy = true;
notifyAll();
}
在第二个线程释放锁一段时间后,第一个线程重新获得锁,并通过调用wait返回继续执行。
注意:还有第二个通知方法
notify,它唤醒单个线程。因为notify不允许指定被唤醒的线程,所以它只在大规模并行应用程序中有用——也就是说,具有大量线程的程序,所有线程都做类似的工作。在这样的应用程序中,您不关心哪个线程被唤醒。
让我们使用保护块来创建一个生产者-消费者( Producer-Consumer )应用程序。这种类型的应用程序在两个线程之间共享数据:创建数据的生产者(producer)和处理数据的消费者(consumer)。这两个线程使用共享对象进行通信。协调是必不可少的:在生产者线程交付数据之前,消费者线程不能试图检索数据,如果消费者还没有检索到旧数据,生产者线程也不能试图交付新数据。
在本例中,数据是一系列文本消息,它们通过Drop类型的对象共享:
public class Drop {
// Message sent from producer
// to consumer.
private String message;
// True if consumer should wait
// for producer to send message,
// false if producer should wait for
// consumer to retrieve message.
private boolean empty = true;
public synchronized String take() {
// Wait until message is
// available.
while (empty) {
try {
wait();
} catch (InterruptedException e) {}
}
// Toggle status.
empty = true;
// Notify producer that
// status has changed.
notifyAll();
return message;
}
public synchronized void put(String message) {
// Wait until message has
// been retrieved.
while (!empty) {
try {
wait();
} catch (InterruptedException e) {}
}
// Toggle status.
empty = false;
// Store message.
this.message = message;
// Notify consumer that status
// has changed.
notifyAll();
}
}
在 Producer中定义的生产者线程发送一系列熟悉的消息。字符串“DONE”表示所有消息都已发送。为了模拟真实应用程序的不可预测性,生产者线程在消息之间随机暂停。
import java.util.Random;
public class Producer implements Runnable {
private Drop drop;
public Producer(Drop drop) {
this.drop = drop;
}
public void run() {
String importantInfo[] = {
"Mares eat oats",
"Does eat oats",
"Little lambs eat ivy",
"A kid will eat ivy too"
};
Random random = new Random();
for (int i = 0;
i < importantInfo.length;
i++) {
drop.put(importantInfo[i]);
try {
Thread.sleep(random.nextInt(5000));
} catch (InterruptedException e) {}
}
drop.put("DONE");
}
}
在 Consumer中定义的消费者线程只是检索消息并将其打印出来,直到它检索到“DONE”字符串。这个线程也会以随机的间隔暂停。
import java.util.Random;
public class Consumer implements Runnable {
private Drop drop;
public Consumer(Drop drop) {
this.drop = drop;
}
public void run() {
Random random = new Random();
for (String message = drop.take();
! message.equals("DONE");
message = drop.take()) {
System.out.format("MESSAGE RECEIVED: %s%n", message);
try {
Thread.sleep(random.nextInt(5000));
} catch (InterruptedException e) {}
}
}
}
最后,这是在ProducerConsumerExample中定义的主线程,它启动生产者和消费者线程。
public class ProducerConsumerExample {
public static void main(String[] args) {
Drop drop = new Drop();
(new Thread(new Producer(drop))).start();
(new Thread(new Consumer(drop))).start();
}
}
注:
Drop类是为了演示被保护的方块而编写的。为了避免重复发明轮子,在尝试编写自己的数据共享对象之前,请检查Java Collections Framework中现有的数据结构。有关更多信息,请参阅问题和练习部分。
如果对象的状态在构造之后不能改变,则认为对象是不可变的(immutable)。对不可变对象的最大依赖被广泛接受为创建简单、可靠代码的可靠策略。
不可变对象在并发应用程序中特别有用。因为它们不能改变状态,所以它们不会被线程干扰破坏,也不会处于不一致的状态。
程序员通常不愿意使用不可变对象,因为他们担心创建新对象的成本,而不是就地更新对象。对象创建的影响通常被高估了,并且可以通过与不可变对象相关的一些效率来抵消。这包括减少由于垃圾收集造成的开销,以及消除保护可变对象免受损坏所需的代码。
下面的小节接受一个实例是可变的类,并从中派生一个具有不可变实例的类。通过这样做,他们给出了这种转换的一般规则,并演示了不可变对象的一些优点。
SynchronizedRGB类定义了表示颜色的对象。每个对象将颜色表示为三个整数(代表原色值)和一个字符串(表示颜色名称)。
public class SynchronizedRGB {
// Values must be between 0 and 255.
private int red;
private int green;
private int blue;
private String name;
private void check(int red,
int green,
int blue) {
if (red < 0 || red > 255
|| green < 0 || green > 255
|| blue < 0 || blue > 255) {
throw new IllegalArgumentException();
}
}
public SynchronizedRGB(int red,
int green,
int blue,
String name) {
check(red, green, blue);
this.red = red;
this.green = green;
this.blue = blue;
this.name = name;
}
public void set(int red,
int green,
int blue,
String name) {
check(red, green, blue);
synchronized (this) {
this.red = red;
this.green = green;
this.blue = blue;
this.name = name;
}
}
public synchronized int getRGB() {
return ((red << 16) | (green << 8) | blue);
}
public synchronized String getName() {
return name;
}
public synchronized void invert() {
red = 255 - red;
green = 255 - green;
blue = 255 - blue;
name = "Inverse of " + name;
}
}
必须小心使用SynchronizedRGB,以避免被看到处于不一致状态。例如,假设一个线程执行以下代码:
SynchronizedRGB color =
new SynchronizedRGB(0, 0, 0, "Pitch Black");
...
int myColorInt = color.getRGB(); //Statement 1
String myColorName = color.getName(); //Statement 2
如果另一个线程调用color.set在语句1之后,但在语句2之前,myColorInt的值将不匹配myColorName的值。为了避免这种结果,这两个语句必须绑定在一起:
synchronized (color) {
int myColorInt = color.getRGB();
String myColorName = color.getName();
}
这种不一致只可能出现在可变对象中——对于SynchronizedRGB的不可变版本来说,这不会是一个问题
下面的规则定义了一个创建不可变对象的简单策略。并不是所有记录为“不可变”的类都遵循这些规则。这并不一定意味着这些类的创建者很草率——他们可能有充分的理由相信,类的实例在构造之后永远不会改变。然而,这种策略需要复杂的分析,不适合初学者。
final和private。final。更复杂的方法是使构造函数private 并在工厂方法中构造实例。将此策略应用于SynchronizedRGB将导致以下步骤:
set,它任意地转换对象,在类的不可变版本中没有位置。第二个是invert,它可以通过创建一个新对象而不是修改现有对象来进行调整。private;他们进一步被修饰为final。final。在这些改变之后,我们有了 ImmutableRGB:
final public class ImmutableRGB {
// Values must be between 0 and 255.
final private int red;
final private int green;
final private int blue;
final private String name;
private void check(int red,
int green,
int blue) {
if (red < 0 || red > 255
|| green < 0 || green > 255
|| blue < 0 || blue > 255) {
throw new IllegalArgumentException();
}
}
public ImmutableRGB(int red,
int green,
int blue,
String name) {
check(red, green, blue);
this.red = red;
this.green = green;
this.blue = blue;
this.name = name;
}
public int getRGB() {
return ((red << 16) | (green << 8) | blue);
}
public String getName() {
return name;
}
public ImmutableRGB invert() {
return new ImmutableRGB(255 - red,
255 - green,
255 - blue,
"Inverse of " + name);
}
}
同步代码依赖于一种简单的可重入锁(reentrant lock)。这种锁使用方便,但有很多限制。java.util.concurrent.locks包支持更复杂的锁定习惯用法。我们不会详细研究这个包,而是将重点放在它最基本的接口Lock上。
Lock 对象的工作方式与同步代码使用的隐式锁非常相似。与隐式锁一样,一次只能有一个线程拥有一个Lock对象。Lock对象还通过其关联的Condition对象支持wait/notify机制。
Lock对象相对于隐式锁的最大优点是它们能够退出获取锁的尝试。如果锁不能立即可用或在超时到期之前(如果指定),tryLock方法将退出。如果另一个线程在获取锁之前发送中断,lockInterruptibly 方法会退出。
让我们使用Lock对象来解决我们在Liveness中看到的死锁问题。阿尔方斯和加斯顿训练自己注意到朋友要鞠躬。我们通过要求Friend对象在继续执行bow之前必须为两个参与者获取锁来对这种改进进行建模。下面是改进模型Safelock的源代码。为了证明这个习语的多功能性,我们假设阿尔方斯和加斯顿对他们新发现的安全鞠躬能力如此着迷,以至于他们无法停止彼此鞠躬:
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.Random;
public class Safelock {
static class Friend {
private final String name;
private final Lock lock = new ReentrantLock();
public Friend(String name) {
this.name = name;
}
public String getName() {
return this.name;
}
public boolean impendingBow(Friend bower) {
Boolean myLock = false;
Boolean yourLock = false;
try {
myLock = lock.tryLock();
yourLock = bower.lock.tryLock();
} finally {
if (! (myLock && yourLock)) {
if (myLock) {
lock.unlock();
}
if (yourLock) {
bower.lock.unlock();
}
}
}
return myLock && yourLock;
}
public void bow(Friend bower) {
if (impendingBow(bower)) {
try {
System.out.format("%s: %s has"
+ " bowed to me!%n",
this.name, bower.getName());
bower.bowBack(this);
} finally {
lock.unlock();
bower.lock.unlock();
}
} else {
System.out.format("%s: %s started"
+ " to bow to me, but saw that"
+ " I was already bowing to"
+ " him.%n",
this.name, bower.getName());
}
}
public void bowBack(Friend bower) {
System.out.format("%s: %s has" +
" bowed back to me!%n",
this.name, bower.getName());
}
}
static class BowLoop implements Runnable {
private Friend bower;
private Friend bowee;
public BowLoop(Friend bower, Friend bowee) {
this.bower = bower;
this.bowee = bowee;
}
public void run() {
Random random = new Random();
for (;;) {
try {
Thread.sleep(random.nextInt(10));
} catch (InterruptedException e) {}
bowee.bow(bower);
}
}
}
public static void main(String[] args) {
final Friend alphonse =
new Friend("Alphonse");
final Friend gaston =
new Friend("Gaston");
new Thread(new BowLoop(alphonse, gaston)).start();
new Thread(new BowLoop(gaston, alphonse)).start();
}
}
// 如果锁可用,获取锁并立即返回值true。如果锁不可用,则此方法
// 将立即返回值false。
// 这个方法的典型用法参见文档
boolean tryLock();
在前面的所有示例中,新线程(由其Runnable对象定义)正在执行的任务与线程本身(由Thread对象定义)之间存在密切联系。这对于小型应用程序很有效,但是在大型应用程序中,将线程管理和创建与应用程序的其余部分分开是有意义的。封装这些函数的对象称为执行器(executors)。下面的小节详细描述了执行器。
java.util.concurrent包定义了三个执行器接口:
Executor: 支持启动新任务的简单接口。ExecutorService:Executor的子接口,它添加了有助于管理生命周期的特性,包括单个任务和Executor本身。ScheduledExecutorService:ExecutorService的子接口,支持未来和/或定期执行任务。Executor接口提供了一个方法execute,该方法被设计为通用线程创建习惯用法的临时替代品。如果r是一个Runnable 对象,而e是一个的Executor对象,你可以用e.execute(r);替换(new Thread(r)).start();
但是,execute的定义不太具体。低阶用法创建一个新线程并立即启动它。根据Executor实现的不同,execute可能会做同样的事情,但更有可能使用现有的工作线程来运行r,或者将r放在队列中以等待工作线程可用。(我们将在线程池一节中描述工作线程。)
java.util.concurrent中的执行器实现被设计为充分利用更高级的ExecutorService和ScheduledExecutorService接口,尽管它们也使用基本的Executor 接口。
ExecutorService 接口用一个类似但更通用的submit 方法补充了 execute 方法。与execute一样,submit 也接受 Runnable对象,但也接受Callable对象,它允许任务返回一个值。submit方法返回一个Future对象,该对象用于检索Callable返回值并管理Callable和Runnable任务的状态。
ExecutorService还提供了提交大型Callable对象集合的方法。最后,ExecutorService提供了许多方法来管理执行器的关闭。为了支持立即关机,任务应该正确地处理中断。
ScheduledExecutorService接口用schedule来补充其父ExecutorService的方法,schedule在指定的延迟后执行Runnable或Callable任务。此外,接口还定义了scheduleAtFixedRate和scheduleWithFixedDelay,它们以定义的间隔重复执行指定的任务。
java.util.concurrent中的大多数执行器实现都使用线程池(thread pools),线程池由工作线程(worker threads)组成。这种类型的线程独立于它执行的Runnable和Callable任务存在,并且通常用于执行多个任务。
使用工作线程可以最大限度地减少线程创建带来的开销。线程对象使用大量内存,在大规模应用程序中,分配和回收许多线程对象会产生大量内存管理开销。
一种常见的线程池类型是固定线程池( fixed thread pool)。这种类型的池总是有指定数量的线程在运行;如果线程在仍在使用时以某种方式终止,则会自动用新线程替换它。任务通过内部队列提交到池中,每当活动任务多于线程时,该队列就会保存额外的任务。
固定线程池的一个重要优点是,使用它的应用程序可以很好地降级(degrade gracefully)。要理解这一点,请考虑一个web服务器应用程序,其中每个HTTP请求都由一个单独的线程处理。如果应用程序只是为每个新的HTTP请求创建一个新线程,并且系统接收到的请求多于它可以立即处理的请求,那么当所有这些线程的开销超过系统的容量时,应用程序将突然停止响应所有请求。由于限制了可以创建的线程数量,应用程序将不能在HTTP请求到达时以最快的速度为它们提供服务,但它将在系统能够承受的速度内为它们提供服务。
创建一个使用固定线程池的执行器的简单方法是调用java.util.concurrent.Executors中的newFixedThreadPool工厂方法。这个类还提供了以下工厂方法:
ScheduledExecutorService版本。如果上述工厂方法提供的执行器都不能满足您的需求,那么构造java.util.concurrent.ThreadPoolExecutor或java.util.concurrent.ScheduledThreadPoolExecutor的实例将为您提供额外的选项。
// 返回 新创建的线程池
public static ExecutorService newFixedThreadPool(int nThreads)
fork/join框架是ExecutorService接口的实现,它可以帮助您利用多个处理器。它是为那些可以递归分解成小块的工作而设计的。目标是使用所有可用的处理能力来增强应用程序的性能。
与任何ExecutorService实现一样,fork/join框架将任务分配给线程池中的工作线程。fork/join框架是不同的,因为它使用了任务窃取(work-stealing)算法。无事可做的工作线程可以从仍然忙碌的其他线程窃取任务。
fork/join框架的中心是ForkJoinPool类,它是AbstractExecutorService类的扩展。ForkJoinPool实现了核心的任务窃取算法,可以执行ForkJoinTask进程。
使用fork/join框架的第一步是编写执行部分工作的代码。你的代码应该类似于下面的伪代码:
if (my portion of the work is small enough)
do the work directly
else
split my work into two pieces
invoke the two pieces and wait for the results
将此代码包装在ForkJoinTask子类中,通常使用其更专门的类型之一,即RecursiveTask(可以返回结果)或RecursiveAction。
在ForkJoinTask子类准备好之后,创建表示所有要完成的工作的对象,并将其传递给ForkJoinPool实例的invoke()方法。
为了帮助您理解fork/join框架是如何工作的,请考虑以下示例。假设您想要模糊图像。原始source 图像由整数数组表示,其中每个整数包含单个像素的颜色值。模糊的destination 图像也由与源图像大小相同的整数数组表示。
执行模糊是通过一次一个像素地处理源(source)数组来完成的。每个像素与其周围的像素平均(红、绿、蓝组件平均),并将结果放在目标数组中。由于图像是一个大数组,这个过程可能会花费很长时间。通过使用fork/join框架实现算法,您可以利用多处理器系统上的并发处理。这是一个可能的实现:
public class ForkBlur extends RecursiveAction {
private int[] mSource;
private int mStart;
private int mLength;
private int[] mDestination;
// Processing window size; should be odd.
private int mBlurWidth = 15;
public ForkBlur(int[] src, int start, int length, int[] dst) {
mSource = src;
mStart = start;
mLength = length;
mDestination = dst;
}
protected void computeDirectly() {
int sidePixels = (mBlurWidth - 1) / 2;
for (int index = mStart; index < mStart + mLength; index++) {
// Calculate average.
float rt = 0, gt = 0, bt = 0;
for (int mi = -sidePixels; mi <= sidePixels; mi++) {
int mindex = Math.min(Math.max(mi + index, 0),
mSource.length - 1);
int pixel = mSource[mindex];
rt += (float)((pixel & 0x00ff0000) >> 16)
/ mBlurWidth;
gt += (float)((pixel & 0x0000ff00) >> 8)
/ mBlurWidth;
bt += (float)((pixel & 0x000000ff) >> 0)
/ mBlurWidth;
}
// Reassemble destination pixel.
int dpixel = (0xff000000 ) |
(((int)rt) << 16) |
(((int)gt) << 8) |
(((int)bt) << 0);
mDestination[index] = dpixel;
}
}
...
现在实现抽象的compute()方法,该方法可以直接执行模糊,也可以将其分成两个较小的任务。一个简单的数组长度阈值有助于确定是执行工作还是分割工作。
protected static int sThreshold = 100000;
protected void compute() {
if (mLength < sThreshold) {
computeDirectly();
return;
}
int split = mLength / 2;
invokeAll(new ForkBlur(mSource, mStart, split, mDestination),
new ForkBlur(mSource, mStart + split, mLength - split,
mDestination));
}
如果前面的方法是在RecursiveAction类的子类中,那么设置任务在ForkJoinPool中运行是很简单的,包括以下步骤:
// source image pixels are in src
// destination image pixels are in dst
ForkBlur fb = new ForkBlur(src, 0, src.length, dst);
ForkJoinPool。ForkJoinPool pool = new ForkJoinPool();
pool.invoke(fb);
完整的源代码,包括一些创建目标映像文件的额外代码,请参阅ForkBlur示例。
除了使用fork/join框架为在多处理器系统上并发执行的任务实现自定义算法(如上一节中的ForkBlur.java示例)之外,Java SE中还有一些通常有用的特性已经使用fork/join框架实现了。Java SE 8中引入了一个这样的实现,由java.util.Arrays类用于其parallelSort()方法。这些方法类似于sort(),但通过fork/join框架利用并发性。在多处理器系统上运行时,大型数组的并行排序要比顺序排序快。但是,这些方法如何准确地利用fork/join框架超出了Java教程的范围。有关这些信息,请参阅Java API文档。
fork/join框架的另一个实现由java.util.streams包中的方法使用,该包是计划在Java SE 8发布的Project Lambda的一部分。有关更多信息,请参阅Lambda表达式部分。
java.util.concurrent包包含了许多对Java集合框架的补充。它们最容易通过提供的集合接口进行分类:
ConcurrentMap的标准通用实现是ConcurrentHashMap,它是HashMap的并发模拟。ConcurrentMap的子接口,支持近似匹配。ConcurrentNavigableMap的标准通用实现是ConcurrentSkipListMap,它是TreeMap的并发模拟。所有这些集合都通过在将对象添加到集合的操作与访问或删除该对象的后续操作之间定义happens-before关系来帮助避免内存一致性错误。
java.util.concurrent.atomic 包 定义了支持对单个变量进行原子操作的类。所有类都有get和set方法,就像对volatile变量进行读写一样。也就是说,set与同一变量上的任何后续get之间存在happens-before关系。原子compareAndSet 方法也具有这些内存一致性特性,就像应用于整数原子变量的简单原子算术方法一样。
为了了解如何使用这个包,让我们回到最初用于演示线程干扰的Counter类:
class Counter {
private int c = 0;
public void increment() {
c++;
}
public void decrement() {
c--;
}
public int value() {
return c;
}
}
使Counter免受线程干扰的一种方法是使其方法同步,如SynchronizedCounter:
class SynchronizedCounter {
private int c = 0;
public synchronized void increment() {
c++;
}
public synchronized void decrement() {
c--;
}
public synchronized int value() {
return c;
}
}
对于这个简单的类,同步是一个可接受的解决方案。但是对于更复杂的类,我们可能希望避免不必要的同步对活动性的影响。将int字段替换为AtomicInteger允许我们在不诉诸同步的情况下防止线程干扰,如AtomicCounter:
import java.util.concurrent.atomic.AtomicInteger;
class AtomicCounter {
private AtomicInteger c = new AtomicInteger(0);
public void increment() {
c.incrementAndGet();
}
public void decrement() {
c.decrementAndGet();
}
public int value() {
return c.get();
}
}
// 获取当前值。
public final int get()
// 设置为给定的值。
public final void set(int newValue)
// 原子地设置为给定值并返回旧值。
public final int getAndSet(int newValue)
// 如果当前值==期望值,则自动将值设置为给定的更新值。
// 如果成功,则为 true;False表示实际值不等于期望值。
public final boolean compareAndSet(int expect, int update)
// 将当前值自动加1,返回更新后的值
public final int incrementAndGet()
// 将当前值自动减1,返回:更新后的值
public final int decrementAndGet()
// 自动将给定值添加到当前值,返回:更新后的值
public final int addAndGet(int delta)
// 将当前值自动加1,返回前值
public final int getAndIncrement()
// 将当前值自动减1。返回前值
public final int getAndDecrement()
// 自动将给定值添加到当前值, 返回前值
public final int getAndAdd(int delta)
在JDK 7中,java.util.concurrent包含了一个方便的类ThreadLocalRandom,用于期望使用来自多个线程或ForkJoinTasks的随机数的应用程序。
对于并发访问,使用ThreadLocalRandom而不是Math.random()可以减少争用,最终获得更好的性能。
您所需要做的就是调用ThreadLocalRandom.current(),然后调用它的一个方法来检索一个随机数。这里有一个例子:
int r = ThreadLocalRandom.current() .nextInt(4, 77);
// 返回一个介于指定的起点(origin, 包括)和指定的边界(bound, 不包括)之间的
// 伪随机int值。
public int nextInt(int origin, int bound)
// 返回介于0(包含)和指定的边界(不包含)之间的伪随机int值。
public int nextInt(int bound)
// 返回一个伪随机int值。
public int nextInt()
Concurrent Programming in Java: Design Principles and Pattern (2nd Edition)),作者:Doug Lea。这是一位顶级专家的全面工作,他也是Java平台并发性框架的架构师。Java Concurrency in Practice )》。为新手设计的实用指南。Java Concurrent Animated):显示并发特性使用情况的动画。