在介绍线程 Thread 之前,我们必须先搞清楚程序 Program 和进程 Process 这两个概念。
main)开始,到目前为止所有函数的调用路径,以及这些调用路径上所使用的局部变量。当然,除了在主存储器中记录程序的执行状态之外,CPU 内部的寄存器(如程序计数器、堆栈指针、程序状态字等)也需要一起记录。因此线程又由以下两项组成:
根据上述描述,我们总结出线程的重点如下:
Java 使用java.lang.Thread类来表示线程。Thread 类有两个构造函数:
第一个构造函数没有参数,第二个构造函数需要一个Runnable对象作为参数。Runnable是一个接口,在java.lang中定义,其声明为:
public interface Runnable {
public void run();
}
使用Thread()生成的线程,其入口是Thread类中的run()方法;使用Thread(Runnable)生成的线程,其入口是Runnable对象中的run()方法。当run()方法执行完毕时,该线程也就结束了,与main()方法结束具有相同的效果。使用示例如下:
public class ThreadExample1 extends Thread {
@Override
public void run() { // 重写 Thread 的 run()
System.out.println("这是线程的起点。");
for (; ; ) { // 无限循环打印消息
System.out.println("用户创建的线程");
}
}
public static void main(String[] argv) {
Thread t = new ThreadExample1(); // 生成线程对象
t.start(); // 开始执行 t.run()
for (; ; ) {
System.out.println("主线程");
}
}
}
运行上述程序后,屏幕上会不断打印出"用户创建的线程"或"主线程"。使用Runnable的写法如下:
public class ThreadExample2 implements Runnable {
@Override
public void run() { // 实现 Runnable 的 run()
System.out.println("这是线程的起点。");
for (; ; ) { // 无限循环打印消息
System.out.println("用户创建的线程");
}
}
public static void main(String[] argv) {
Thread t = new Thread(new ThreadExample2()); // 生成线程对象
t.start(); // 开始执行 Runnable.run();
for (; ; ) {
System.out.println("主线程");
}
}
}
Thread.setPriority(int)可以设置Thread的优先级,数字越大优先级越高。Thread定义了三个相关的常量:
public static final int MAX_PRIORITY 10
public static final int MIN_PRIORITY 1
public static final int NORM_PRIORITY 5
要提醒的是,优先级高的 Thread 其占有 CPU 的机会比较高,但优先级低的也都会有机会被执行到。其他关于 Thread 执行的方法有:
yield():先让给别的 Thread 执行sleep(int time):休息 time 毫秒(1/1000 秒)join():调用ThreadA.join()的线程会等到 ThreadA 结束后,才能继续执行你可以执行下面的程序,看看yield()的效果。
public class ThreadExample1 extends Thread {
@Override
public void run() { // overwrite Thread's run()
System.out.println("Here is the starting point of Thread.");
for (; ; ) { // infinite loop to print message
System.out.println("用户创建的线程");
yield();
}
}
public static void main(String[] argv) {
Thread t = new ThreadExample1(); // 生成 Thread 对象
t.start(); // 开始执行 t.run()
for (; ; ) {
System.out.println("主线程");
yield();
}
}
}
观看join()的效果
public class JoinExample extends Thread {
String myId;
public JoinExample(String id) {
myId = id;
}
@Override
public void run() { // overwrite Thread's run()
for (int i = 0; i < 500; i++)
System.out.println(myId + " Thread");
}
public static void main(String[] argv) {
Thread t1 = new JoinExample("T1"); // 生成 Thread 对象
Thread t2 = new JoinExample("T2"); // 生成 Thread 对象
t1.start(); // 开始执行t 1.run()
t2.start();
try {
t1.join(); // 等待t1结束
t2.join(); // 等待t2结束
} catch (InterruptedException e) {
}
for (int i = 0; i < 5; i++)
System.out.println("主线程");
}
}
观看sleep()的效果。
public class SleepExample extends Thread {
String myId;
public SleepExample(String id) {
myId = id;
}
@Override
public void run() { // overwrite Thread's run()
for (int i = 0; i < 500; i++) {
System.out.println(myId + " Thread");
try {
sleep(100);
} catch (InterruptedException e) {
}
}
}
public static void main(String[] argv) {
Thread t1 = new SleepExample("T1"); // 生成 Thread 对象
Thread t2 = new SleepExample("T2"); // 生成 Thread 对象
t1.start(); // 开始执行 t1.run()
t2.start();
}
}
如果设计者没有提供相应的保护机制的话,那么将会由操作系统来决定 Thread 对 CPU 的控制权,也就是说 Thread 可能在其执行任何一条机器指令的时候,被操作系统取走 CPU 的控制权,并交给另一个 Thread。然而,真实世界中的某些动作是不可分割的,例如银行转帐 X 元由 A 帐户到 B 帐户,转帐前后这两个帐户的总金额必须相同,但以程序来实作时却无法用一条指令就完成,如转帐可能要写成下面的这一段程序代码
if (A >= X) {
A = A - X; // 翻译成3个机器指令 LOAD A, SUB X, STORE A
B = B + X;
}
如果两个线程同时要存取 A、B 两账户进行转账,假设当 Thread 1 执行到SUBX后被中断,Thread 2 接手执行完成另一个转账要求,然后 Thread 1 继续执行未完成的动作,请问这两个转账动作正确吗?我们以A=1000, B=0分别转账 100、200圆来说明此结果。
LOAD A // Thread 1,现在 A 还是 1000
SUB 100 // Thread 1
LOAD A // 假设此时 Thread 1 被中断,Thread 2 接管,因为 Thread 1 还没有执行 STORE A,所以变量 A 还是 1000
SUB 200 // Thread 2
STORE A // Thread 2,A=800
LOAD B // Thread 2,B 现在是 0
ADD 200 // Thread 2
STORE B // B=200
STORE A // Thread 1 拿回控制权,A=900
LOAD B // Thread 1,B=200
ADD 100 // Thread 1
STORE B // B=300
你会发现执行完成后A=900,B=300,也就是说银行平白损失了 200 圆。当然另外的执行顺序可能造成其他不正确的结果。我们把这问题再整理一下:
因此在编写多线程的程序时候必须考虑这种特别的情况(又称为 Race Condition)。Java 解决的办法是,JVM 会在每个对象上放一把锁 Lock,然后程序设计者可以声明执行某一段程序(通常是用来访问共享数据结构的代码,又称为 Critical Section)时,必须拿到某对象的锁才行,这把锁同时最多只有一个线程可以拥有它。
其实觉得"锁",获取"锁"的说法似乎不太符合中国人的思维,这个"锁"其实就是一个"令牌",有令牌就能进入,无令牌就不能进入。这么说来这个"锁"其实应该叫"钥匙"。当然这里应该还有一个概念,叫房间,这个钥匙是对应什么房间的(锁住什么东西,即锁住什么范围)。所以光叫"钥匙",似乎也不太准确。所以西方人的这个"锁",应该有两个概念: “锁对象”,就是我们说的"钥匙";“锁了什么范围”,就是这个锁锁了什么房间。
public class Transfer extends Thread {
public static Object lock = new Object();
public static int A = 1000;
public static int B = 0;
private final int amount;
public Transfer(int x) {
amount = x;
}
@Override
public void run() {
synchronized (lock) { // 获取 lock,如果别的 thread A已获取,则当前这个 thread 会等到 thread A 释放该 lock
if (A >= amount) {
A = A - amount;
B = B + amount;
}
} // 离开 synchronized 区块后,此 thread会 自动释放 lock
}
public static void main(String[] argv) {
Thread t1 = new Transfer(100);
Thread t2 = new Transfer(200);
t1.start();
t2.start();
}
}
除了synchronized(ref)的语法可以锁定ref指向的对象外,synchronized也可以用在方法前面,表示要锁定this对象才能执行该方法。以下是Queue结构的范例。
public class Queue {
private final Object[] data;
private int size;
private int head;
private int tail;
public Queue(int maxLen) {
data = new Object[maxLen];
}
public synchronized Object deQueue() {
Object tmp = data[head];
data[head] = null;
head = (head + 1) % data.length;
size--;
return tmp;
}
public synchronized void enQueue(Object c) {
data[tail++] = c;
tail %= data.length;
size++;
}
}
虽然上面的程序正确无误,但并未考虑资源不足时该如何处理。例如Queue已经没有数据了,却还想拿出来;或是Queue里已经塞满了数据,使用者却还要放进去?我们当然可以使用Exception Handling的机制:
public class Queue {
private final Object[] data;
private int size;
private int head;
private int tail;
public Queue(int maxLen) {
data = new Object[maxLen];
}
public synchronized Object deQueue() throws Exception {
if (size == 0)
throw new Exception();
Object tmp = data[head];
data[head] = null;
head = (head + 1) % data.length;
size--;
return tmp;
}
public synchronized void enQueue(Object c) throws Exception {
if (size >= maxLen)
throw new Exception();
data[tail++] = c;
tail %= data.length;
size++;
}
}
但假设我们的执行环境是,某些 Thread 专门负责读取用户的需求,并把工作放到 Queue 里面,某些 Thread 则专门由 Queue 里抓取工作需求做进一步处理。这种架构的好处是,可以把慢速或不定速的输入(如通过网络读数据,连接速度可能差很多),和快速的处理分开,可使系统的反应速度更快,更节省资源。那么以 Exceptoin 来处理 Queue 空掉或爆掉的情况并不合适,因为使用 Queue 的人必须处理异常状况,并不断的消耗 CPU 资源:
public class Getter extends Thread {
Queue q;
public Getter(Queue q) {
this.q = q;
}
public void run() {
for (; ; ) {
try {
Object data = q.deQueue();
// processing
} catch (Exception e) {
// 如果在这里进行 sleep,就会卡住了;但如果不 sleep,却会浪费 CPU 资源
}
}
}
}
public class Putter extends Thread {
Queue q;
public Putter(Queue q) {
this.q = q;
}
public void run() {
for (; ; ) {
try {
Object data = null;// get user request
q.enQueue(data);
} catch (Exception e) {
// 如果在这里进行 sleep,就会卡住了;但如果不 sleep,却会浪费 CPU 资源
}
}
}
}
public class Main {
public static void main(String[] argv) {
Queue q = new Queue(10);
Getter r1 = new Getter(q);
Getter r2 = new Getter(q);
Putter w1 = new Putter(q);
Putter w2 = new Putter(q);
r1.start();
r2.start();
w1.start();
w2.start();
}
}
为了解决这类资源分配的问题,Java 对象提供了以下三个方法:
wait():使调用此方法的 Thread 进入 Blocking Mode,并设为等待该对象,调用wait()时,该 Thread 必须拥有该对象的 lock。Blocking Mode 下的 Thread 必须释放所有手中的 lock,并且无法使用 CPU。notifyAll():让等待该对象的所有 Thread 进入Runnable Mode。notify():让等待该对象的某一个 Thread 进入Runnable Mode。所谓 Runnable Mode 是指该 Thread 随时可由操作系统分配 CPU 资源。Blocking Mode 表示该 Thread 正在等待某个事件发生,操作系统不会让这种 Thread 取得 CPU 资源。前一个 Queue 的范例就可以写成:
public class Queue {
private final Object[] data;
private int size;
private int head;
private int tail;
public Queue(int maxLen) {
data = new Object[maxLen];
}
public synchronized Object deQueue() {
while (size == 0) { // 当执行到这里时,线程必须已经获取到锁并处于运行状态
// 让当前线程等待该对象(进入睡眠模式)
try {
wait(); // 进入睡眠模式,并释放所有锁
} catch (Exception ex) {
}
}
Object tmp = data[head];
data[head] = null;
head = (head + 1) % data.length;
if (size == data.length) {
// 唤醒所有等待该对象的线程
notifyAll();
}
size--;
return tmp;
} // 释放锁
public synchronized void enQueue(Object c) {
while (size == data.length) { // 当执行到这里时,线程必须已经获取到锁并处于运行状态
// 让当前线程等待该对象(进入睡眠模式)
try {
wait(); // 进入睡眠模式,并释放所有锁
} catch (Exception ex) {
}
}
data[tail++] = c;
tail %= data.length;
size++;
if (size == 1)
// 唤醒所有等待该对象的线程
notifyAll();
}
}
public class ReaderWriter extends Thread {
public static final int READER = 1;
public static final int WRITER = 2;
private Queue q;
private int mode;
@Override
public void run() {
for (int i = 0; i < 1000; i++) {
if (mode == READER) q.deQueue();
else if (mode == WRITER) q.enQueue(i);
}
}
public ReaderWriter(Queue q, int mode) {
this.q = q;
this.mode = mode;
}
public static void main(String[] args) {
Queue q = new Queue(5);
ReaderWriter r1, r2, w1, w2;
(w1 = new ReaderWriter(q, WRITER)).start();
(w2 = new ReaderWriter(q, WRITER)).start();
(r1 = new ReaderWriter(q, READER)).start();
(r2 = new ReaderWriter(q, READER)).start();
try {
w1.join(); // 等待w1线程完成
w2.join(); // 等待w2线程完成
r1.join(); // 等待r1线程完成
r2.join(); // 等待r2线程完成
} catch (InterruptedException epp) {
}
}
}
上一节的队列数据结构,无论是enQueue()还是deQueue()都会修改队列的内容。而在许多应用里,数据结构可以允许多个读取者和一个写入者同时操作。本节举出几个不同的例子,说明多个读者-写者(Reader-Writer)时的可能调度方法。
单个读者-写者(Single Reader-Writer,)只允许一个线程同时访问。
public class SingleReaderWriter {
int n; // number of reader and write, 0 or 1
public synchronized void startReading() throws InterruptedException {
while (n != 0)
wait();
n = 1;
}
public synchronized void stopReading() {
n = 0;
notify();
}
public synchronized void startWriting() throws InterruptedException {
while (n != 0)
wait();
n = 1;
}
public synchronized void stopWriting() {
n = 0;
notify();
}
}
这是一个使用示例,程序能否正确执行取决于调用正确的start和stop。
public class WriterThread extends Thread {
SingleReaderWriter srw;
public WriterThread(SingleReaderWriter srw) {
this.srw = srw;
}
public void run() {
startWring();
// 实际的逻辑……
stopWriting();
}
}
public class ReaderThread extends Thread {
SingleReaderWriter srw;
public ReaderThread(SingleReaderWriter srw) {
this.srw = srw;
}
public void run() {
startReading();
// 实际的逻辑……
stopReading();
}
}
public class Test {
public static void main(String[] argv) {
SingleReaderWriter srw = new SingleReaderWriter;
// 创建四个线程
(new WriterThread(srw)).start();
(new WriterThread(srw)).start();
(new ReaderThread(srw)).start();
(new ReaderThread(srw)).start();
}
}
其他可能的策略实现如下。
Reader优先:
public class ReadersPreferredMonitor {
int nr; // 当前正在读取的线程数量, nr >= 0
int nw; // 当前正在写入的线程数量, 只能为0或1
int nrtotal; // 正在读取或等待读取的线程数量, nrtotal >= nr
int nwtotal; // 正在写入或等待写入的线程数量
public synchronized void startReading() throws InterruptedException {
nrtotal++; // 有一个想要读取的线程加入
while (nw != 0) // 有线程正在写入
wait();
nr++; // 正在读取的线程数量加一
}
public synchronized void startWriting() throws InterruptedException {
nwtotal++; // 有一个想要写入的线程加入
while (nrtotal+nw != 0) // 只要有线程想要读取或有线程正在写入,就等待
wait();
nw = 1;
}
public synchronized void stopReading() {
nr--; // 正在读取的线程数量减一
nrtotal--; // 有想要读取的线程数量减一
if (nrtotal == 0) // 如果没有线程需要读取,则唤醒想要写入的线程
notify();
}
public synchronized void stopWriting() {
nw = 0; // 没有线程正在写入
nwtotal--; // 有想要写入的线程数量减一
notifyAll(); // 唤醒所有想要读取或写入的线程
}
}
Writer优先:
public class WritersPreferredMonitor {
int nr; // 当前正在读取的线程数量, nr >= 0
int nw; // 当前正在写入的线程数量, 只能为0或1
int nrtotal; // 正在读取或等待读取的线程数量, nrtotal >= nr
int nwtotal; // 正在写入或等待写入的线程数量
public synchronized void startReading() throws InterruptedException {
nrtotal++; // 有一个想要读取的线程加入
while (nwtotal != 0) // 还有线程想要写入
wait();
nr++; // 正在读取的线程数量加一
}
public synchronized void startWriting() throws InterruptedException {
nwtotal++; // 有一个想要写入的线程加入
while (nr+nw != 0) // 有线程正在读取或有线程正在写入
wait();
nw = 1;
}
public synchronized void stopReading() {
nr--; // 正在读取的线程数量减一
nrtotal--; // 有想要读取的线程数量减一
if (nr == 0) // 如果没有正在读取的线程,则唤醒所有线程(包括想要写入的)
notifyAll();
}
public synchronized void stopWriting() {
nw = 0; // 没有线程正在写入
nwtotal--; // 有想要写入的线程数量减一
notifyAll(); // 唤醒所有想要读取或写入的线程
}
}
Reader和Writer交互执行:
public class AlternatingReadersWritersMonitor {
int[] nr = new int[2]; // 当前正在读取的线程数量
int thisBatch; // 当前正在读取的批次号(0或1)
int nextBatch = 1; // 等待读取的批次号(始终为1-thisBatch)
int nw; // 当前正在写入的线程数量(0或1)
int nwtotal; // 正在写入或等待写入的线程数量
public synchronized void startReading() throws InterruptedException {
if (nwtotal == 0) // 没有线程要写入,将所有的 reader 放到当前处理的批次
nr[thisBatch]++;
else {
nr[nextBatch]++;
int myBatch = nextBatch;
while (thisBatch != myBatch)
wait();
}
}
public synchronized void stopReading() {
nr[thisBatch]--;
if (nr[thisBatch] == 0) // 当前批次的 reader 都读完了,找下一个 writer
notifyAll();
}
public synchronized void startWriting() throws InterruptedException {
nwtotal++;
while (nr[thisBatch]+nw != 0) // 当前批次还没读完,或有线程正在写入
wait();
nw = 1;
}
public synchronized void stopWriting() {
nw = 0;
nwtotal--;
int tmp = thisBatch; // 交换下一个要读取的批次
thisBatch = nextBatch;
nextBatch = tmp;
notifyAll();
}
}