在并发编程中,有两大核心问题,一是互斥(即同一时刻只允许一个线程访问共享资源);另一个是同步(即线程之间如何通信协作)。而这两大问题,可以通过管程来进行解决。
Monitor,也称为监视器) :是一种程序结构,结构内的多个子程序(对象或模块)形成的多个工作线程互斥访问共享资源。这些共享资源一般是硬件设备或一群变量。Java 中 synchronized 关键字及 wait() 、notify() 、notifyAll() 这三个方法都是管程的组成部分。管程和信号量是等价的,所谓等价指的是用管程能够实现信号量,也能用信号量实现管程。@Slf4j
public class Sample {
private static int num = 0;
public static void main(String[] args) throws InterruptedException {
// t1线程完成对静态成员变量自增操作。
Thread t1 = new Thread(() -> {
for (int i = 0; i < 10000; i++) {
num++;
}
}, "t1");
// t2线程完成对静态成员变量自减操作。
Thread t2 = new Thread(() -> {
for (int i = 0; i < 10000; i++) {
num--;
}
}, "t2");
t1.start();
t2.start();
t1.join();
t2.join();
log.debug("num={}", num);
// 执行第一次:num=-6821
// 执行第二次:num=1953
}
}



一个程序运行多个线程本身是没有问题的,问题出在多个线程对共享资源读写操作时发生指令交错,就会出现问题。
synchronized,Lock。synchronized 关键字:即俗称的对象锁,它采用互斥的方式让同一时刻至多只有一个线程能持有对象锁,其它线程再想获取这个对象锁时就会阻塞住。这样就能保证拥有锁的线程可以安全的执行临界区内的代码,不用担心线程上下文切换。
虽然 java 中互斥和同步都可以采用 synchronized 关键字来完成,但它们还是有区别的:
语法示例:
synchronized(Object){ // 线程1进入执行,线程2阻塞等待。
// 临界区
}
@Slf4j
public class Sample {
private static int num = 0;
/**
* 两个线程对同一对象加锁。
*/
private static final Object obj = new Object();
public static void main(String[] args) throws InterruptedException {
// t1线程完成对成员变量自增操作。
Thread t1 = new Thread(() -> {
for (int i = 0; i < 10000; i++) {
synchronized (obj) {
num++;
}
}
}, "t1");
// t2线程完成对成员变量自减操作。
Thread t2 = new Thread(() -> {
for (int i = 0; i < 10000; i++) {
synchronized (obj) {
num--;
}
}
}, "t2");
t1.start();
t2.start();
t1.join();
t2.join();
log.debug("num={}", num);
// 执行第一次:num=0
// 执行第二次:num=0
}
}

synchronized 实际是用对象锁保证了临界区内代码的原子性,临界区内的代码对外是不可分割的,不会被线程切换所打断。synchronized 放在循环体外? — 只是锁粒度不同,依然能锁住对象,num=0。 // t1线程完成对成员变量自增操作。
Thread t1 = new Thread(() -> {
// 将 synchronized 放在循环体外,原子性。
synchronized (obj) {
for (int i = 0; i < 10000; i++) {
num++;
}
}
}, "t1");
private static final Object obj1 = new Object();
private static final Object obj2 = new Object();
// t1线程完成对成员变量自增操作。
Thread t1 = new Thread(() -> {
// 锁 obj1
synchronized (obj1) {
for (int i = 0; i < 10000; i++) {
num++;
}
}
}, "t1");
// t2线程完成对成员变量自减操作。
Thread t2 = new Thread(() -> {
// 锁 obj2
synchronized (obj2) {
for (int i = 0; i < 10000; i++) {
num--;
}
}
}, "t2");
此处通过 8 种情况的案例,明白锁住的对象究竟是谁。
@Slf4j
class Sample1 {
/**
* 普通同步方法 a。
*/
public synchronized void a() {
log.debug("a");
}
/**
* 普通同步方法 b。
*/
public synchronized void b() {
log.debug("b");
}
public static void main(String[] args) {
// 通过一个对象去调用普通同步方法。
Sample1 sample1 = new Sample1();
new Thread(sample1::a).start();
new Thread(sample1::b).start();
// a b
}
}
a() 中加入睡眠。 — 因为都是锁同一个对象,即便加了睡眠也是同步执行。@Slf4j
class Sample2 {
/**
* 普通同步方法 a。
*/
public synchronized void a() {
try {
// 增加1s睡眠。
TimeUnit.SECONDS.sleep(1);
log.debug("a");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 普通同步方法 b。
*/
public synchronized void b() {
log.debug("b");
}
public static void main(String[] args) {
Sample2 sample2 = new Sample2();
new Thread(sample2::a).start();
new Thread(sample2::b).start();
// a b
}
}
c() — 方法 a() 、b() 共享一把对象锁仍是同步执行,非同步方法 c() 无锁则并行执行。@Slf4j
class Sample3 {
/**
* 普通同步方法 a。
*/
public synchronized void a() {
try {
TimeUnit.SECONDS.sleep(1);
log.debug("a");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 普通同步方法 b。
*/
public synchronized void b() {
log.debug("b");
}
/**
* 新增:普通非同步方法 c。
*/
public void c() {
log.debug("c");
}
public static void main(String[] args) {
Sample3 sample3 = new Sample3();
new Thread(sample3::a).start();
new Thread(sample3::b).start();
new Thread(sample3::c).start();
// c a b
}
}
@Slf4j
class Sample4 {
/**
* 普通同步方法 a。
*/
public synchronized void a() {
log.debug("a");
}
/**
* 普通同步方法 b。
*/
public synchronized void b() {
log.debug("b");
}
public static void main(String[] args) {
Sample4 sample = new Sample4();
Sample4 sample2 = new Sample4();
// 创建不同对象调用不同的同步方法。
new Thread(sample::a).start();
new Thread(sample2::b).start();
// 执行第一次:b a
// 执行第二次:a b
}
}
a() 修改为静态同步方法。 — 静态同步方法 a() 锁住的是 Sample5.class ,普通同步方法 b() 锁住的是实例对象。锁的对象不同,则并行执行。@Slf4j
class Sample5 {
/**
* 静态同步方法 a。
*/
public static synchronized void a() {
try {
// 睡眠1s。
TimeUnit.SECONDS.sleep(1);
log.debug("a");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 普通同步方法 b。
*/
public synchronized void b() {
log.debug("b");
}
public static void main(String[] args) {
Sample5 sample5 = new Sample5();
new Thread(() -> {sample5.a();}).start();
new Thread(() -> {sample5.b();}).start();
// b a
}
}
a() 、 b() 均为静态同步方法。 — 共享一把 Sample6.class 对象锁,同步执行。@Slf4j
class Sample6 {
/**
* 静态同步方法 a。
*/
public static synchronized void a() {
try {
// 睡眠1s。
TimeUnit.SECONDS.sleep(1);
log.debug("a");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 静态同步方法 b。
*/
public static synchronized void b() {
log.debug("b");
}
public static void main(String[] args) {
new Thread(Sample6::a).start();
new Thread(Sample6::b).start();
// a b
}
}
a() 及普通同步方法 b() — 锁对象不同,并行执行。@Slf4j
class Sample7 {
/**
* 静态同步方法 a。
*/
public static synchronized void a() {
try {
// 睡眠1s。
TimeUnit.SECONDS.sleep(1);
log.debug("a");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 普通同步方法 b。
*/
public synchronized void b() {
log.debug("b");
}
public static void main(String[] args) {
Sample7 sample1 = new Sample7();
Sample7 sample2 = new Sample7();
new Thread(() -> {sample1.a();}).start();
new Thread(() -> {sample2.b();}).start();
// b a
}
}
a() 和 b() — 同时锁住 Sample8.class 对象,同步执行。@Slf4j
class Sample8 {
/**
* 静态同步方法 a。
*/
public static synchronized void a() {
try {
// 睡眠1s。
TimeUnit.SECONDS.sleep(1);
log.debug("a");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 静态同步方法 b。
*/
public static synchronized void b() {
log.debug("b");
}
public static void main(String[] args) {
Sample8 sample1 = new Sample8();
Sample8 sample2 = new Sample8();
new Thread(() -> {sample1.a();}).start();
new Thread(() -> {sample2.b();}).start();
// a b
}
}
如果它们没有共享,则线程安全。
如果它们被共享了,根据它们的状态是否能够改变,又分两种情况:
@Slf4j
class ThreadUnsafe {
/**
* 成员变量列表。
*/
private ArrayList<String> list = new ArrayList<>();
/**
* 添加元素方法。
*/
private void add() {
this.list.add("element");
}
/**
* 删除列表中第一个元素方法。
*/
private void removeFirst() {
this.list.remove(0);
}
/**
* 使添加与删除操作互相竞争的方法。
*/
public void contest() {
for (int i = 0; i < 3000; i++) {
// 临界区。
this.add();
this.removeFirst();
}
}
public static void main(String[] args) {
ThreadUnsafe unsafe = new ThreadUnsafe();
// 创建两个线程执行。
for (int i = 0; i < 2; i++) {
new Thread(() -> {
log.debug(Thread.currentThread().getName());
unsafe.contest();
// Exception in thread "t_0" java.lang.IndexOutOfBoundsException
}, "t_" + i).start();
}
}
}

removeFirst() 与 add() 引用的都是同一个 list 成员变量。add() 还没来得及添加上元素,removeFirst() 去通过索引移除元素就会抛 IndexOutOfBoundsException 异常。局部变量是线程安全的。
但局部变量引用的对象则未必:
将上述例子进行调整,去掉成员变量
list,并在contest()方法中创建成员变量,add()、removeFirst()方法接收传入的list进行操作。
@Slf4j
class ThreadSafe {
/**
* 添加元素方法。
*
* @param list 列表
*/
private void add(ArrayList<String> list) {
list.add("element");
}
/**
* 删除列表中第一个元素方法。
*
* @param list 列表
*/
private void removeFirst(ArrayList<String> list) {
list.remove(0);
}
/**
* 使添加与删除操作互相竞争的方法。
*/
public void contest() {
// 创建成员变量列表。
ArrayList<String> list = new ArrayList<>();
for (int i = 0; i < 3000; i++) {
// 临界区。
add(list);
removeFirst(list);
}
}
public static void main(String[] args) {
ThreadSafe safe = new ThreadSafe();
// 创建两个线程执行。
for (int i = 0; i < 2; i++) {
new Thread(() -> {
log.debug(Thread.currentThread().getName());
safe.contest();
}, "t_" + i).start();
}
}
}

list 是局部变量,每个线程调用时会创建其不同实例,没有共享。add()、removeFirst() 方法接收的对象都是通过 contest() 方法传递过来的,也就是说操作的都是同一个对象。将上述示例中
ThreadSafe类中的removeFirst()方法修改为public修饰符。
@Slf4j
class SubClass extends ThreadSafe {
/**
* 【覆盖】删除列表第一个元素的方法。
*
* @param list 列表
*/
@Override
public void removeFirst(ArrayList<String> list) {
// 【创建了新的线程访问到了 list 对象,此时的 list 相当于成了共享资源。】
new Thread(() -> list.remove(0)).start();
}
public static void main(String[] args) {
// 创建子类实例。
SubClass sc = new SubClass();
// 创建两个线程执行。
for (int i = 0; i < 2; i++) {
new Thread(() -> {
log.debug(Thread.currentThread().getName());
// 调用父类方法。
sc.contest();
// java.lang.IndexOutOfBoundsException
}, "t_" + i).start();
}
}
private 修饰,将公共访问方法加上 final 修饰。): private void add(ArrayList<String> list) {
list.add("element");
}
private void removeFirst(ArrayList<String> list) {
list.remove(0);
}
public final void contest() {
// 创建成员变量列表。
ArrayList<String> list = new ArrayList<>();
for (int i = 0; i < 3000; i++) {
// 临界区。
add(list);
removeFirst(list);
}
}
private 或 final 修饰符对多线程下的变量安全问题是有意义的。这里说它们是线程安全的是指,多个线程调用它们同一个实例的某个方法时,是线程安全的。即它们的每个方法都是原子的,但是需要注意的是它们多个方法组合不是原子的。
StringIntegerStringBufffferRandomVectorHashtablejava.util.concurrent 包下的类Monitor 被翻译为监视器或管程。Java 对象都可以关联一个 Monitor 对象,如果使用 synchronized 给对象上锁(重量级)之后,该对象头的 Mark Word 中就被设置指向 Monitor 对象的指针。由于
Java面向对象的思想,在JVM中需要大量存储对象,存储时为了实现一些额外的功能,需要在对象中添加一些标记字段用于增强对象功能,这些标记字段组成了对象头。
在 HotSpot 虚拟机中,对象在内存中存储的布局可以分为3块区域:对象头(Header),实例数据(Instance Data)和对齐填充(Padding)。即 JAVA 对象 = 对象头 + 实例数据 + 对象填充。
而对象头又由两部分组成:一部分用于存储自身的运行时数据,称之为 Mark Word,另外一部分是类型指针,及对象指向它的类元数据的指针。
64 位虚拟机 Mark Word:
|-----------------------------------------------------------------------------------------------------------------|
| Object Header(128bits) |
|-----------------------------------------------------------------------------------------------------------------|
| Mark Word(64bits) | Klass Word(64bits) | State |
|-----------------------------------------------------------------------------------------------------------------|
| unused:25|identity_hashcode:31|unused:1|age:4|biase_lock:0| 01 | OOP to metadata object | Nomal |
|-----------------------------------------------------------------------------------------------------------------|
| thread:54| epoch:2 |unused:1|age:4|biase_lock:1| 01 | OOP to metadata object | Biased |
|-----------------------------------------------------------------------------------------------------------------|
| ptr_to_lock_record:62 | 00 | OOP to metadata object | Lightweight Locked |
|-----------------------------------------------------------------------------------------------------------------|
| ptr_to_heavyweight_monitor:62 | 10 | OOP to metadata object | Heavyweight Locked |
|-----------------------------------------------------------------------------------------------------------------|
| | 11 | OOP to metadata object | Marked for GC |
|-----------------------------------------------------------------------------------------------------------------|
| 标记字 | 状态 |
|---|---|
| unused(25bit)| 对象的hashcode值(31bit)| unused(1bit)| 分代年龄(4bit) | 是否偏向锁(0)| 锁标志位 (01) | 无锁态 |
| 线程ID(54bit) | Epoch(2bit) | unused(1bit)| 分代年龄(4bit) | 是否偏向锁(1)| 锁标志位 (01) | 偏向锁 |
| 指向栈中锁记录的指针(ptr_to_lock_record 62bit) | 锁标志位 (00) | 轻量级锁 |
指向管程 Monitor 的指针(ptr_to_heavyweight_monitor 62bit) | 锁标志位 (10) | 重量级锁 |
| 空 | 锁标志位 (11) | GC标记 |
identity_hashcode):运行期间调用 System.identityHashCode() 来延迟计算,并把结果赋值到这里。当对象加锁后,计算的结果31位不够表示,在偏向锁,轻量锁,重量锁,hashcode 会被转移到 Monitor 中。age):表示对象被 GC 的次数,当该次数到达阈值的时候,对象就会转移到老年代。biased_lock):由于无锁和偏向锁的锁标识都是 01,没办法区分,这里引入一位的偏向锁标识位。lock):区分锁状态,11时表示对象待 GC 回收状态,只有最后2位锁标识(11)有效。thread):偏向模式的时候,当某个线程持有对象的时候,对象这里就会被置为该线程的 ID。 在后面的操作中,就无需再进行尝试获取锁的动作。epoch):偏向锁在 CAS 锁操作过程中,表示对象更偏向哪个锁。ptr_to_lock_record):当锁获取是无竞争的时,JVM 使用原子操作而不是 OS 互斥。这种技术称为轻量级锁定。在轻量级锁定的情况下,JVM 通过 CAS 操作在对象的标题字中设置指向锁记录的指针。ptr_to_heavyweight_monitor):如果两个不同的线程同时在同一个对象上竞争,则必须将轻量级锁定升级到 Monitor 以管理等待的线程。在重量级锁定的情况下,JVM 在对象的 ptr_to_heavyweight_monitor 设置指向 Monitor 的指针。
分析说明:
Monitor 中 Owner 为 null ;synchronized(obj) 就会将 Monitor 的所有者 Owner 置为线程-1,Monitor 中只能有一个 Owner;synchronized(obj) 则会进入 EntryList 此时为阻塞状态;EntryList 中等待的线程来竞争锁,竞争的时是非公平的。WaitSet 是用于存放已经获得了锁的线程(因为缺少某些外部条件,而无法继续进行下去)。注意:
synchronized 必须是进入同一个对象的 monitor 才有上述的效果。synchronized 的对象不会关联监视器,不遵从以上规则。注意:方法级别的
synchronized不会在字节码指令中有所体现。
public class SynchronizedSample {
private static final Object obj = new Object();
private static int counter = 0;
public static void main(String[] args) {
synchronized (obj) {
counter++;
}
}
}
0 getstatic #2 // <- lock引用 (synchronized开始)
3 dup
4 astore_1 // lock引用 -> slot 1
5 monitorenter // 将 lock对象 MarkWord 置为 Monitor 指针
6 getstatic #3 // <- i
9 iconst_1 // 准备常数 1
10 iadd // +1
11 putstatic #3 // -> i
14 aload_1 // <- lock引用
15 monitorexit // 将 lock对象 MarkWord 重置, 唤醒 EntryList
16 goto 24 (+8)
19 astore_2 // e -> slot 2
20 aload_1 // <- lock引用
21 monitorexit // 将 lock对象 MarkWord 重置, 唤醒 EntryList
22 aload_2 // <- slot 2 (e)
23 athrow // throw e
24 return
synchronized。public class SynchronizedSample {
private static final Object obj = new Object();
public static void method1() {
synchronized (obj) {
// 同步块 A
method2();
}
}
public static void method2() {
synchronized (obj) {
// 同步块 B
}
}
}
Lock Record)对象,每个线程的栈帧都会包含一个锁记录的结构,内部可以存储锁定对象的 Mark Word;Object reference 指向锁对象,并尝试用 cas 替换 Object 的 Mark Word,将 Mark Word 的值存入锁记录;cas 替换成功,对象头中存储了锁记录地址和状态 00 ,表示由该线程给对象加锁。
cas 可能会替换失败,这时候又有两种情况:
Object 的轻量级锁,这时表明有竞争,进入锁膨胀过程。synchronized 锁重入,那么再添加一条 Lock Record 作为重入的计数。
synchronized 代码块(解锁时)如果有取值为 null 的锁记录,表示有重入,这时重置锁记录,表示重入计数减一。
synchronized 代码块(解锁时)锁记录的值不为 null,这时使用 cas 将 Mark Word 的值恢复给对象头。
如果在尝试加轻量级锁的过程中,cas 操作无法成功,这时一种情况就是有其它线程为此对象加上了轻量级锁(有竞争),这时需要进行锁膨胀,将轻量级锁变为重量级锁。
代码示例:
public class SynchronizedSample {
private static final Object obj = new Object();
public static void method1() {
synchronized (obj) {
// 同步块
}
}
}

Object 对象申请 Monitor 锁,让 Object 指向重量级锁地址。Monitor 的 EntryList BLOCKED。
cas 将 Mark Word 的值恢复给对象头,失败。这时会进入重量级解锁流程,即按照 Monitor 地址找到 Monitor 对象,设置 Owner 为 null,唤醒 EntryList 中 BLOCKED 线程。cpu 时间,单核 cpu 自旋就是浪费,多核 cpu 自旋才能发挥优势。Java 6 之后自旋锁是自适应的,比如对象刚刚的一次自旋操作成功过,那么认为这次自旋成功的可能性会高,就多自旋几次;反之,就少自旋甚至不自旋,总之,比较智能。Java 7 之后不能控制是否开启自旋功能。轻量级锁在没有竞争时(就自己这个线程),每次重入仍然需要执行 cas 操作。
Java 6 中引入了偏向锁来做进一步优化:只有第一次使用 cas 将线程 ID 设置到对象的 Mark Word 头,之后发现这个线程 ID 是自己的就表示没有竞争,不用重新 cas ,以后只要不发生竞争,这个对象就归该线程所有。
代码示例:
public class SynchronizedSample {
private static final Object obj = new Object();
public static void method1() {
synchronized (obj) {
// 同步块 A
method2();
}
}
public static void method2() {
synchronized (obj) {
// 同步块 B
method3();
}
}
public static void method3() {
synchronized (obj) {
// 同步块 C
}
}
}

markword 值为 0x05 即最后 3 位为 101,这时它的 thread 、epoch 、age 都为 0。VM 参数 -XX:BiasedLockingStartupDelay=0 来禁用延迟。markword 值为 0x01 即最后 3 位为 001,这时它的 hashcode、age 都为 0,第一次用到 hashcode 时才会赋值。| 方法 | 功能 |
|---|---|
wait() | 释放对象的锁,进入 WaitSet 等待区,从而让其他线程就机会获取对象的锁。无时限等待,直到 notify 为止。 |
wait(long n) | 有时限的等待, 到 n 毫秒后结束等待,或是被 notify。 |
notify() | 在 object 上正在 waitSet 等待的线程中挑一个唤醒。 |
notifyAll() | 让 object 上正在 waitSet 等待的线程全部唤醒。 |
@Slf4j
public class WaitNotifySample {
private static final Object obj = new Object();
public static void main(String[] args) throws InterruptedException {
new Thread(() -> {
// 对象锁。
synchronized (obj) {
log.debug("t1 run...");
try {
// 线程等待。
obj.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("t1 wake,do other things...");
}
}, "t1").start();
new Thread(() -> {
synchronized (obj) {
log.debug("t2 run...");
try {
obj.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("t2 wake,do other things...");
}
}, "t2").start();
// 主线程等待2s后执行。
TimeUnit.SECONDS.sleep(2);
log.debug("wake obj thread");
// 通过同一把对象锁去唤醒。
synchronized (obj) {
// 若此处调用 notifyAll() 则唤醒当前对象锁的所有线程。
obj.notify();
// 执行结果如下:
// t1 run...
// t2 run...
// wake obj thread
// t1 wake,do other things...
}
}
}

Owner 线程发现条件不满足,调用 wait 方法,即可进入 WaitSet 变为 WAITING 状态。BLOCKED 和 WAITING 的线程都处于阻塞状态,不占用 cpu 时间片。BLOCKED 线程会在 Owner 线程释放锁时唤醒。WAITING 线程会在 Owner 线程调用 notify 或 notifyAll 时唤醒,但唤醒后并不意味着立刻获得锁,仍需进入 EntryList 重新竞争。sleep() 是 Thread 方法,而 wait() 是 Object 的方法。sleep() 不需要强制和 synchronized 配合使用,但 wait() 需要和 synchronized 一起用。sleep() 在睡眠的同时,不会释放对象锁的,但 wait() 在等待的时候会释放对象锁。TIMED_WAITING。解决某些场景下,使用 sleep() 一直阻塞导致执行效率太低的场景,此时就可以采用 wait-notify 机制。
而 notify() 只能随机唤醒一个 WaitSet 中的线程,这时如果有其它线程也在等待,那么就可能唤醒不了正确的线程,这种情况称之为“虚假唤醒”。故需要 notifyAll() 进行唤醒。
用 notifyAll() 仅解决某个线程的唤醒问题,但使用 if + wait 判断仅有一次机会,一旦条件不成立,就没有重新判断的机会了,故需要 while 进行循环判断。
最终模板如下:
synchronized(lock){
while(条件不成立){
lock.wait();
}
// 干活
}
// 另一个线程
synchronized(lock){
lock.notifyAll();
}
Guarded Suspension,用在一个线程等待另一个线程的执行结果,它属于同步模式。GuardedObject。jdk 中,join 的实现、Future 的实现,采用的就是此模式。 +- - - - - - - - - +
' GuardedObject: '
' '
+----+ wait ' +--------------+ ' task finish notify t1 +----+
| t1 | ------> ' | response | ' <----------------------- | t2 |
+----+ ' +--------------+ ' +----+
' '
+- - - - - - - - - +
@Slf4j
public class GuardedObject {
private Object response;
/**
* 获取响应对象。
*
* @return {@link Object}
*/
public Object getResponseObj() {
synchronized (this) {
// 不满足条件,一直等待,避免虚假唤醒。
while (null == response) {
try {
log.debug("{}: waiting...", Thread.currentThread().getName());
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return response;
}
}
/**
* 设置响应对象后进行通知。
*
* @param response 响应
*/
public void setResponseObjAndNotify(Object response) {
synchronized (this) {
this.response = response;
log.debug("{}: notify all thread", Thread.currentThread().getName());
this.notifyAll();
}
}
/**
* 模拟业务下载。
*
* @return {@link List}<{@link String}>
*/
public List<String> download() {
try {
// 模拟执行耗时 1s。
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 返回空集合。
return Collections.emptyList();
}
public static void main(String[] args) throws InterruptedException {
GuardedObject gObj = new GuardedObject();
new Thread(() -> {
// t1 线程 waiting。
Object response = gObj.getResponseObj();
log.debug("t1: get response {}", response);
}, "t1").start();
new Thread(() -> {
// t2 线程执行完任务后为对象赋值,再唤醒 t1 线程。
List<String> response = gObj.download();
log.debug("t2: download finish");
gObj.setResponseObjAndNotify(response);
}, "t2").start();
// t1: waiting...
// t2: download finish
// t2: notify all thread
// t1: get response []
}
}
但是上述代码示例有个问题,执行下载的业务如果卡主,t2 线程一直未给 response 进行赋值那么就会导致 t1 线程一直阻塞。
接下来,我们加入一个超时机制,为 getResponseObj() 方法添加一个超时时间,如果超过了超时时间,就算是还没有结果,也进行返回,不再阻塞。
改造后代码示例如下:
@Slf4j
public class GuardedObject {
private Object response;
/**
* 获取响应对象。
*
* @param timeout 超时时间。
* @return {@link Object}
*/
public Object getResponseObj(long timeout) {
synchronized (this) {
// 记录最初时间。
long begin = System.currentTimeMillis();
// 已经经过的时间。
long passTime = 0;
while (null == response) {
// 计算等待了多长时间。(假设 timeout 是 1000,结果在 400 时唤醒了,那么还有 600 要等)
long waitTime = timeout - passTime;
if (waitTime <= 0) {
log.debug("waitTime <= 0,break");
break;
}
try {
log.debug("{}: waiting...", Thread.currentThread().getName());
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
// 求出已经经历的时间。
passTime = System.currentTimeMillis() - begin;
boolean flag = response == null;
log.debug("timePassed: {}ms, get object is null ={}", passTime, flag);
}
return response;
}
}
/**
* 设置响应对象后进行通知。
*
* @param response 响应
*/
public void setResponseObjAndNotify(Object response) {
synchronized (this) {
this.response = response;
log.debug("{}: notify all thread", Thread.currentThread().getName());
this.notifyAll();
}
}
/**
* 模拟业务下载。
*
* @return {@link List}<{@link String}>
*/
public List<String> download() {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return Collections.emptyList();
}
public static void main(String[] args) throws InterruptedException {
GuardedObject gObj = new GuardedObject();
new Thread(() -> {
// 设置等待时限 2s。
Object response = gObj.getResponseObj(2000L);
if (null != response) {
log.debug("t1: get response {}", response);
} else {
log.debug("can't get response");
}
}, "t1").start();
new Thread(() -> {
try {
// 假设此时任务线程卡主,一直未赋值 response。
TimeUnit.MILLISECONDS.sleep(3000L);
gObj.setResponseObjAndNotify(null);
// 卡了3s后才开始执行下载逻辑。
List<String> response = gObj.download();
log.debug("t2: download finish");
gObj.setResponseObjAndNotify(response);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "t2").start();
// t1: waiting...
// t2: notify all thread
// timePassed: 3001ms, get object is null =true
// waitTime <= 0,break
// can't get response
// 2: download finish
// t2: notify all thread
}
}
GuardObject 不同,不需要产生结果和消费结果的线程一一对应。JDK 中各种阻塞队列,采用的就是这种模式。
public class ProducerAndConsumerSample {
@AllArgsConstructor
@Getter
static class Message {
private int id;
private Object message;
}
@Slf4j
static final class MessageQueue {
/**
* 消息队列。
*/
private final LinkedList<Message> queue;
/**
* 容量。
*/
private final int capacity;
public MessageQueue(int capacity) {
this.capacity = capacity;
queue = new LinkedList<>();
}
public Message take() {
synchronized (queue) {
// 没消息等待。
while (queue.isEmpty()) {
log.debug("No messages in the queue,wait...");
try {
queue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 有消息就出列,并通知其他线程。
Message message = queue.removeFirst();
queue.notifyAll();
return message;
}
}
public void put(Message message) {
synchronized (queue) {
// 消息个数如果等于指定容量就等待。
while (queue.size() == capacity) {
log.debug("there are too many message,wait...");
try {
queue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 添加消息,并通知。
queue.addLast(message);
queue.notifyAll();
}
}
public static void main(String[] args) {
// 创建指定容量为 1 的 mq 对象。
MessageQueue mq = new MessageQueue(1);
// 创建3个生产者线程。
for (int i = 0; i < 3; i++) {
int id = i + 1;
String threadId = "thread-" + id;
new Thread(() -> {
log.debug("({}) ,try put message", threadId);
mq.put(new Message(id, threadId + " message info"));
log.debug("({}) ,put message success", threadId);
}, "producer_" + threadId).start();
}
// 消费者线程。
new Thread(() -> {
while (true) {
Message msg = mq.take();
log.debug("(Thread-4) get message:" + msg.getMessage());
}
}, "consumer_thread-4").start();
// 某次运行结果如下:
// No messages in the queue,wait...
// (thread-2) ,try put message
// (thread-3) ,try put message
// (thread-1) ,try put message
// (thread-2) ,put message success
// (thread-1) ,put message success
// there are too many message,wait...
// (Thread-4) get message:thread-2 message info
// (Thread-4) get message:thread-1 message info
// (thread-3) ,put message success
// (Thread-4) get message:thread-3 message info
// No messages in the queue,wait...
}
}
}
LockSupport 类中的方法。// 暂停当前线程。
LockSupport.park();
// 恢复某个线程的运行。(暂停线程对象)
LockSupport.unpark(Thread thread);
@Slf4j
public class ParkAndUnpark {
/**
* 测试先 park 再 unpark 场景。
*
* @throws InterruptedException 中断异常
*/
@Test
public void testParkThenUnpark() throws InterruptedException {
Thread t1 = new Thread(() -> {
log.debug("park...");
LockSupport.park();
log.debug("resume...");
}, "t1");
t1.start();
TimeUnit.MILLISECONDS.sleep(5);
log.debug("unpark...");
LockSupport.unpark(t1);
t1.join();
// park...
// unpark...
// resume...
}
/**
* 测试先 unpark 再 park 场景。
*
* @throws InterruptedException 中断异常
*/
@Test
public void testUnparkThenPark() throws InterruptedException {
Thread t2 = new Thread(() -> {
try {
TimeUnit.MILLISECONDS.sleep(3);
log.debug("park...");
LockSupport.park();
log.debug("resume...");
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "t2");
t2.start();
log.debug("unpark...");
LockSupport.unpark(t2);
t2.join();
// unpark...
// park...
// resume...
}
}
park()&unpark()与Object的wait()¬ify()相比较。
wait(),notify() 和 notifyAll() 必须配合 Object Monitor 一起使用,而 park(),unpark() 则不用。park() & unpark() 是以线程为单位来阻塞和唤醒线程,而 notify() 只能随机唤醒一个等待线程,notifyAll() 是唤醒所有等待线程,就不那么精确。park() & unpark() 可以先 unpark(),而 wait() & notify() 不能先 notify()。每个线程都有自己的一个 Parker 对象,由三部分组成 _counter , _cond 和 _mutex 。
情况一(先调用 park(),再调用 unpark()):

park():
Unsafe.park() 方法;_counter ,本情况为 0,这时,获得 _mutex 互斥锁;_cond 条件变量阻塞;_counter = 0 。unpark(Thread thread):
Unsafe.unpark(Thread_0) 方法,设置 _counter 为 1;_cond 条件变量中的 Thread_0;_counter 为 0 。情况二(先调用 unpark(),再调用 park()):

Unsafe.unpark(Thread_0) 方法,设置 _counter 为 1;Unsafe.park() 方法;_counter ,本情况为 1,这时线程无需阻塞,继续运行;_counter 为 0 。假设我们有两个互不相干的任务,而它们都使用同一把对象锁,那么并发度就很低。
static class SameLock {
private void task1() throws InterruptedException {
synchronized (this) {
TimeUnit.SECONDS.sleep(2);
}
}
private void task2() throws InterruptedException {
synchronized (this) {
TimeUnit.SECONDS.sleep(3);
}
}
public static void main(String[] args) throws InterruptedException {
SameLock lock = new SameLock();
Thread t1 = new Thread(() -> {
try {
lock.task1();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "t1");
Thread t2 = new Thread(() -> {
try {
lock.task2();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "t2");
Instant start = Instant.now();
t1.start();
t2.start();
t1.join();
t2.join();
Instant end = Instant.now();
log.debug("task1 and task2 spend time:{}s", Duration.between(start, end).toSeconds());
// task1 and task2 spend time:5s
}
}
static class DifferentLock {
private final Object lock1 = new Object();
private final Object lock2 = new Object();
private void task1() throws InterruptedException {
synchronized (lock1) {
TimeUnit.SECONDS.sleep(2);
}
}
private void task2() throws InterruptedException {
synchronized (lock2) {
TimeUnit.SECONDS.sleep(3);
}
}
public static void main(String[] args) throws InterruptedException {
DifferentLock lock = new DifferentLock();
Thread t1 = new Thread(() -> {
try {
lock.task1();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "t1");
Thread t2 = new Thread(() -> {
try {
lock.task2();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "t2");
Instant start = Instant.now();
t1.start();
t2.start();
t1.join();
t2.join();
Instant end = Instant.now();
log.debug("task1 and task2 spend time:{}s", Duration.between(start, end).toSeconds());
// task1 and task2 spend time:3s
}
}
有这样的情况:一个线程需要同时获取多把锁,这时就容易发生死锁。
@Slf4j
public class DeadLockSample {
public static void main(String[] args) {
Object lockA = new Object();
Object lockB = new Object();
new Thread(()->{
synchronized (lockA){
try {
log.debug("t1 lock A");
TimeUnit.SECONDS.sleep(1);
synchronized (lockB){
log.debug("t1 lock B");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"t1").start();
new Thread(()->{
synchronized (lockB){
try {
log.debug("t2 lock lockB");
TimeUnit.SECONDS.sleep(1);
synchronized (lockA){
log.debug("t2 lock lockA");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"t2").start();
// t1 lock A
// t2 lock lockB
// 程序一直处于运行中状态...
}
}
检测死锁可以使用
jconsole工具,或者使用jps定位进程 id,再用jstack定位死锁。
# jvm进程查看
$ jps
# 线程快照分析
$ jstack id

linux 下可以通过 top 先定位到 cpu 占用高的 Java 进程,再利用 top -Hp Pid 来定位是哪个线程,最后再用 jstack 排查。活锁出现在两个线程互相改变对方的结束条件,最后谁也无法结束。
@Slf4j
public class LiveLockSample {
private static volatile int count = 10;
public static void main(String[] args) {
new Thread(() -> {
// 期望减到 0 退出循环
while (count > 0) {
try {
TimeUnit.MILLISECONDS.sleep(1);
count--;
log.debug("count: {}", count);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "t1").start();
new Thread(() -> {
// 期望超过 20 退出循环
while (count < 20) {
try {
TimeUnit.MILLISECONDS.sleep(1);
count++;
log.debug("count: {}", count);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "t2").start();
// 一直运行....
}
}
如果线程优先级”不均“,在
cpu繁忙的情况下,优先级低的线程得到执行的机会很小,就可能发生线程“饥饿”;持有锁的线程,如果执行的时间过长,也可能导致“饥饿”问题。
它与 synchronized 一样,都支持可重入。但是相对于 synchronized 它还具备如下特点:
基本语法:
// 获取锁
reentrantLock.lock();
try {
// 临界区
} finally {
// 释放锁
reentrantLock.unlock();
}
可重入是指同一个线程如果首次获得了这把锁,那么因为它是这把锁的拥有者,因此有权利再次获取这把锁如果是不可重入锁,那么第二次获得锁时,自己也会被锁挡住。
@Slf4j
public class ReentrantLockSample {
static ReentrantLock lock = new ReentrantLock();
public static void method1() {
lock.lock();
try {
log.debug("execute method1");
method2();
} finally {
lock.unlock();
}
}
public static void method2() {
lock.lock();
try {
log.debug("execute method2");
method3();
} finally {
lock.unlock();
}
}
public static void method3() {
lock.lock();
try {
log.debug("execute method3");
} finally {
lock.unlock();
}
}
public static void main(String[] args) {
method1();
// execute method1
// execute method2
// execute method3
}
}
@Slf4j
public class ReentrantLockSample {
public static void main(String[] args) {
ReentrantLock lock = new ReentrantLock();
Thread t1 = new Thread(() -> {
log.debug("start...");
try {
// 中断。
lock.lockInterruptibly();
} catch (InterruptedException e) {
e.printStackTrace();
log.debug("is interrupted!");
return;
}
try {
log.debug("get lock");
} finally {
lock.unlock();
}
}, "t1");
lock.lock();
log.debug("get lock");
t1.start();
try {
TimeUnit.SECONDS.sleep(1);
t1.interrupt();
log.debug("run interrupt");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
// [main] get lock
// [t1] start...
// [main] run interrupt
// [t1] is interrupted!
// java.lang.InterruptedException
}
}
@Slf4j
public class ReentrantLockSample {
public static void main(String[] args) {
ReentrantLock lock = new ReentrantLock();
Thread t1 = new Thread(() -> {
try {
log.debug("start...");
lock.lock();
log.debug("get lock");
} finally {
lock.unlock();
}
}, "t1");
lock.lock();
log.debug("get lock");
t1.start();
try {
TimeUnit.SECONDS.sleep(1);
t1.interrupt();
log.debug("run interrupt");
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
log.debug("unlock");
lock.unlock();
}
// [main] get lock
// [t1] start...
// [main] run interrupt
// [main] unlock
// [t1] get lock
}
}
@Slf4j
public class ReentrantLockSample {
public static void main(String[] args) {
ReentrantLock lock = new ReentrantLock();
Thread t1 = new Thread(() -> {
log.debug("start...");
if (!lock.tryLock()) {
log.debug("get lock fail, return now!");
return;
}
try {
log.debug("get lock");
} finally {
lock.unlock();
}
}, "t1");
lock.lock();
log.debug("get lock");
t1.start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
// [main] get lock
// [t1] start...
// [t1] get lock fail, return now!
}
}
@Slf4j
public class ReentrantLockSample {
public static void main(String[] args) {
ReentrantLock lock = new ReentrantLock();
Thread t1 = new Thread(() -> {
log.debug("start...");
try {
if (!lock.tryLock(1, TimeUnit.SECONDS)) {
log.debug("wait 1s ,get fail return");
return;
}
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
log.debug("get lock");
} finally {
lock.unlock();
}
}, "t1");
lock.lock();
log.debug("get lock");
t1.start();
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
// [main] get lock
// [t1] start...
// -- 1s --
// [t1] wait 1s ,get fail return
}
}
ReentrantLock默认是不公平的。
ReentrantLock lock = new ReentrantLock(true);
synchronized 中也有条件变量(当条件不满足时进入 waitSet 等待)。
ReentrantLock 的条件变量比 synchronized 强大之处在于,它是支持多个条件变量的。
使用要点:
await() 前需要获得锁。await() 执行后,会释放锁,进入 conditionObject 等待。await() 的线程被唤醒(或打断、或超时)取重新竞争 lock 锁。lock 锁成功后,从 await() 后继续执行。代码示例:
@Slf4j
public class ReentrantLockSample {
private static final ReentrantLock LOCK = new ReentrantLock();
private static Condition condition1 = LOCK.newCondition();
private static Condition condition2 = LOCK.newCondition();
private static volatile boolean task1Finish = false;
private static volatile boolean task2Finish = false;
private static void doTask1() {
LOCK.lock();
try {
log.debug("do task1...");
task1Finish = true;
condition1.signal();
} finally {
LOCK.unlock();
}
}
private static void doTask2() {
LOCK.lock();
try {
log.debug("do task2...");
task2Finish = true;
condition2.signal();
} finally {
LOCK.unlock();
}
}
public static void main(String[] args) throws InterruptedException {
new Thread(() -> {
try {
LOCK.lock();
// 任务1未完成则等待。
while (!task1Finish) {
try {
log.debug("t1 wait...");
condition1.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("task1 finish!");
} finally {
LOCK.unlock();
}
}, "t1").start();
new Thread(() -> {
try {
LOCK.lock();
// 任务2未完成则等待。
while (!task2Finish) {
try {
log.debug("t2 wait...");
condition2.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("task2 finish!");
} finally {
LOCK.unlock();
}
}, "t2").start();
TimeUnit.SECONDS.sleep(1);
doTask1();
TimeUnit.SECONDS.sleep(1);
doTask2();
// [t1] t1 wait...
// [t2] t2 wait...
// [main] do task1...
// [t1] task1 finish!
// [main] do task2...
// [t2] task2 finish!
}
}
需求:必须先 2 后 1 打印。
此处使用:使用 LockSupport 类的 park() 和 unpark() 来进行实现。
park() 和 unpark() 方法比较灵活,它俩谁先调用,谁后调用无所谓。并且是以线程为单位进行暂停和恢复,不需要同步对象和运行标记。
代码示例:
@Slf4j
public class SequenceControlSample {
public static void main(String[] args) {
Thread t1 = new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 当没有『许可』时,当前线程暂停运行;有『许可』时,用掉这个『许可』,当前线程恢复运行
LockSupport.park();
log.debug("1");
}, "t1");
Thread t2 = new Thread(() -> {
log.debug("2");
// 给线程 t1 发放『许可』(多次连续调用 unpark 只会发放一个『许可』)
LockSupport.unpark(t1);
}, "t2");
t1.start();
t2.start();
// [t2] 2
// [t1] 1
}
}
需求:线程 t1 输出 a 5 次,线程 t2 输出 b 5 次,线程 t3 输出 c 5 次,现在要求输出 abcabcabcabcabc 。
@Slf4j
public class SyncWaitNotify {
private int flag;
private final int loopNumber;
public SyncWaitNotify(int flag, int loopNumber) {
this.flag = flag;
this.loopNumber = loopNumber;
}
public void print(int waitFlag, int nextFlag, String str) {
for (int i = 0; i < loopNumber; i++) {
synchronized (this) {
while (this.flag != waitFlag) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.print(str);
flag = nextFlag;
this.notifyAll();
}
}
}
public static void main(String[] args) {
SyncWaitNotify syncWaitNotify = new SyncWaitNotify(1, 5);
new Thread(() -> syncWaitNotify.print(1, 2, "a")).start();
new Thread(() -> syncWaitNotify.print(2, 3, "b")).start();
new Thread(() -> syncWaitNotify.print(3, 1, "c")).start();
// abcabcabcabcabc
}
}
“-------怕什么真理无穷,进一寸有一寸的欢喜。”
微信公众号搜索:饺子泡牛奶。