源码 + 官方文档
java.util.concurrent
普通的线程代码 Thread
Runnable没有返回值,效率相比Callable相对较低!
进程、线程
进程:一个程序,QQ.exe
一个进程往往可以包含多个线程,至少包含一个;
Java默认有几个线程?-- 2个 mian线程,GC线程
对于java而言:Thread/Runnable/Callable
java真的可以开启线程吗? – 不可以
Thread部分源码
public synchronized void start() {
/**
* This method is not invoked for the main method thread or "system"
* group threads created/set up by the VM. Any new functionality added
* to this method in the future may have to also be added to the VM.
*
* A zero status value corresponds to state "NEW".
*/
if (threadStatus != 0)
throw new IllegalThreadStateException();
/* Notify the group that this thread is about to be started
* so that it can be added to the group's list of threads
* and the group's unstarted count can be decremented. */
group.add(this);
boolean started = false;
try {
start0();
started = true;
} finally {
try {
if (!started) {
group.threadStartFailed(this);
}
} catch (Throwable ignore) {
/* do nothing. If start0 threw a Throwable then
it will be passed up the call stack */
}
}
}
//本地方法,底层的C++ java无法直接操作硬件
private native void start0();
并发、并行
并发(多线程操作同一个资源)
并行(多个人一次行走)
查询cpu核数
public class Test1 {
public static void main(String[] args) {
//查询cpu核数
//CPU 密集型,IO密集型
System.out.println(Runtime.getRuntime().availableProcessors());
}
}
并发编程的本质:充分利用CPU的资源
线程有几个状态 – 6种
public enum State {
/**
* 新建
*/
NEW,
/**
* 运行
*/
RUNNABLE,
/**
* 阻塞
*/
BLOCKED,
/**
* 等待,死死的等
*/
WAITING,
/**
* 超时等待
*/
TIMED_WAITING,
/**
* 停止
*/
TERMINATED;
}
wait/sleep区别
wait -> Object
sleep -> Thread
wait会释放锁
sleep睡觉了,抱着锁睡觉,不会释放!
wait必须在同步代码块中
sleep可以在任何地方睡
wait不需要捕获异常
sleep必须捕获异常
public class SaleTickerDemo01 {
public static void main(String[] args) {
Ticket ticket = new Ticket();
new Thread(() -> {
for (int i = 0; i < 50; i++) {
ticket.sale();
}
}, "A").start();
new Thread(() -> {
for (int i = 0; i < 50; i++) {
ticket.sale();
}
}, "B").start();
new Thread(() -> {
for (int i = 0; i < 50; i++) {
ticket.sale();
}
}, "C").start();
}
}
class Ticket {
private int number = 50;
public synchronized void sale() {
if (number > 0) {
System.out.println(Thread.currentThread().getName() + "买了第" + (number--) + "张票");
}
}
}
输出
A买了第50张票
A买了第49张票
A买了第48张票
A买了第47张票
A买了第46张票
A买了第45张票
B买了第44张票
B买了第43张票
B买了第42张票
B买了第41张票
B买了第40张票
B买了第39张票
B买了第38张票
B买了第37张票
B买了第36张票
B买了第35张票
B买了第34张票
B买了第33张票
B买了第32张票
B买了第31张票
B买了第30张票
B买了第29张票
B买了第28张票
B买了第27张票
B买了第26张票
B买了第25张票
B买了第24张票
B买了第23张票
A买了第22张票
A买了第21张票
A买了第20张票
A买了第19张票
A买了第18张票
A买了第17张票
A买了第16张票
A买了第15张票
A买了第14张票
A买了第13张票
A买了第12张票
A买了第11张票
A买了第10张票
A买了第9张票
A买了第8张票
A买了第7张票
A买了第6张票
A买了第5张票
A买了第4张票
A买了第3张票
A买了第2张票
A买了第1张票
加锁,释放锁
实现类(可重入锁,读锁,写锁)
可重入锁构造方法
公平锁:十分公平,可以先来后到
非公平锁:十分不公平,可以插队(默认)
public class SaleTickerDemo02 {
public static void main(String[] args) {
Ticket2 ticket = new Ticket2();
new Thread(() -> {
for (int i = 0; i < 50; i++) {
ticket.sale();
}
}, "A").start();
new Thread(() -> {
for (int i = 0; i < 50; i++) {
ticket.sale();
}
}, "B").start();
new Thread(() -> {
for (int i = 0; i < 50; i++) {
ticket.sale();
}
}, "C").start();
}
}
/**
* 1.创建锁 Lock lock = new ReentrantLock();
* 2.加锁 lock.lock();
* 3.释放锁 lock.unlock();
*/
class Ticket2 {
private int number = 50;
/**
* 创建锁
*/
Lock lock = new ReentrantLock();
public void sale() {
//加锁
lock.lock();
try {
if (number > 0) {
System.out.println(Thread.currentThread().getName() + "买了第" + (number--) + "张票");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
//释放锁
lock.unlock();
}
}
}
锁是什么?如何判断锁的是谁?
生产者和消费者问题 Synchronized 版本
public class A {
public static void main(String[] args) {
Data data = new Data();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "A").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "B").start();
}
}
class Data {
private int number = 0;
public synchronized void increment() throws InterruptedException {
if (number != 0) {
//等待
this.wait();
}
number++;
System.out.println(Thread.currentThread().getName() + "==>" + number);
//通知其他线程,我+1完毕了
this.notifyAll();
}
public synchronized void decrement() throws InterruptedException {
if (number == 0) {
//等待
this.wait();
}
number--;
System.out.println(Thread.currentThread().getName() + "==>" + number);
//通知其他线程,我-1完毕了
this.notifyAll();
}
}
输出
A==>1
B==>0
A==>1
B==>0
A==>1
B==>0
A==>1
B==>0
A==>1
B==>0
A==>1
B==>0
A==>1
B==>0
A==>1
B==>0
A==>1
B==>0
A==>1
B==>0
问题存在,如果是A B C D 四个线程(可能不会出现上面预期的结果,可能是有2,3出现) -》 虚假唤醒
解决:把if 改成while
用if判断的话,唤醒后线程会从wait之后的代码开始运行,但是不会重新判断if条件,直接继续运行if代码块之后的代码,而如果使用while的话,也会从wait之后的代码运行,但是唤醒后会重新判断循环条件,如果不成立再执行while代码块之后的代码块,成立的话继续wait。
拿两个加法线程A、C来说,比如A先执行,执行时调用了wait方法,那它会等待,此时会释放锁,那么线程C获得锁并且也会执行wait方法,两个加线程一起等待被唤醒。此时减线程中的某一个线程执行完毕并且唤醒了这俩加线程,那么这俩加线程不会一起执行,其中A获取了锁并且加1,执行完毕之后C再执行。如果是if的话,那么A修改完num后,C不会再去判断num的值,直接会给num+1。如果是while的话,A执行完之后,C还会去判断num的值,因此就不会执行。
public class A {
public static void main(String[] args) {
Data data = new Data();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "A").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "B").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "C").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "D").start();
}
}
class Data {
private int number = 0;
public synchronized void increment() throws InterruptedException {
while (number != 0) {
//等待
this.wait();
}
number++;
System.out.println(Thread.currentThread().getName() + "==>" + number);
//通知其他线程,我+1完毕了
this.notifyAll();
}
public synchronized void decrement() throws InterruptedException {
while (number == 0) {
//等待
this.wait();
}
number--;
System.out.println(Thread.currentThread().getName() + "==>" + number);
//通知其他线程,我-1完毕了
this.notifyAll();
}
}
输出
A==>1
B==>0
A==>1
B==>0
A==>1
B==>0
A==>1
B==>0
A==>1
B==>0
A==>1
B==>0
A==>1
B==>0
A==>1
B==>0
C==>1
B==>0
A==>1
B==>0
C==>1
D==>0
A==>1
D==>0
C==>1
D==>0
C==>1
D==>0
C==>1
D==>0
C==>1
D==>0
C==>1
D==>0
C==>1
D==>0
C==>1
D==>0
C==>1
D==>0
生产者和消费者问题 JUC版本
通过Lock找到Condition
代码实现
public class B {
public static void main(String[] args) {
Data2 data = new Data2();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
data.increment();
}
}, "A").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
data.decrement();
}
}, "B").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
data.increment();
}
}, "C").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
data.decrement();
}
}, "D").start();
}
}
class Data2 {
private int number = 0;
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
public void increment() {
lock.lock();
try {
while (number != 0) {
//等待
condition.await();
}
number++;
System.out.println(Thread.currentThread().getName() + "==>" + number);
//通知其他线程,我+1完毕了
condition.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void decrement() {
lock.lock();
try {
while (number == 0) {
//等待
condition.await();
}
number--;
System.out.println(Thread.currentThread().getName() + "==>" + number);
//通知其他线程,我-1完毕了
condition.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
输出
A==>1
B==>0
A==>1
B==>0
A==>1
B==>0
A==>1
B==>0
C==>1
B==>0
C==>1
D==>0
C==>1
D==>0
C==>1
D==>0
C==>1
D==>0
C==>1
D==>0
C==>1
D==>0
C==>1
D==>0
C==>1
D==>0
C==>1
D==>0
A==>1
D==>0
A==>1
B==>0
A==>1
B==>0
A==>1
B==>0
A==>1
B==>0
A==>1
B==>0
任何一个新的技术,绝对不是仅仅只是覆盖了原来的技术,优化和补充!
Condition 精准的通知和唤醒线程
代码
public class C {
public static void main(String[] args) {
Data3 data3 = new Data3();
//A执行完,调用B,B执行完,调用C,C执行完,调用A
new Thread(() -> {
for (int i = 0; i < 10; i++) {
data3.printA();
}
}, "A").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
data3.printB();
}
}, "B").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
data3.printC();
}
}, "C").start();
}
}
class Data3 {
private Lock lock = new ReentrantLock();
Condition conditionA = lock.newCondition();
Condition conditionB = lock.newCondition();
Condition conditionC = lock.newCondition();
private char ch = 'A';
public void printA() {
lock.lock();
try {
while (ch != 'A') {
//等待
conditionA.await();
}
System.out.println(Thread.currentThread().getName() + "--->A");
//唤醒
ch = 'B';
conditionB.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void printB() {
lock.lock();
try {
while (ch != 'B') {
//等待
conditionB.await();
}
System.out.println(Thread.currentThread().getName() + "--->B");
//唤醒
ch = 'C';
conditionC.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void printC() {
lock.lock();
try {
while (ch != 'C') {
//等待
conditionC.await();
}
System.out.println(Thread.currentThread().getName() + "--->C");
//唤醒
ch = 'A';
conditionA.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
输出
A--->A
B--->B
C--->C
A--->A
B--->B
C--->C
A--->A
B--->B
C--->C
A--->A
B--->B
C--->C
A--->A
B--->B
C--->C
A--->A
B--->B
C--->C
A--->A
B--->B
C--->C
A--->A
B--->B
C--->C
A--->A
B--->B
C--->C
A--->A
B--->B
C--->C
如何判断锁的是谁?什么是锁?锁到底锁的是谁?
对象、Class
1,2.
/**
* 8锁,就是关于锁的8个问题
* 1.两个普通的同步方法,一个对象,两个线程先打印发短信还是打电话? -- 1.发短信 2.打电话
* 2.两个普通的同步方法,一个对象,发短信休眠4s,两个线程先打印发短信还是打电话? -- 1.发短信 2.打电话
*/
public class Test1 {
public static void main(String[] args) {
Phone phone = new Phone();
new Thread(() -> {
phone.sendSms();
}, "A").start();
//休眠1秒
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(() -> {
phone.call();
}, "B").start();
}
}
class Phone {
//synchronized 锁的对象是方法的调用者
//两个方法用的同一个锁,谁先拿到锁谁就先执行
public synchronized void sendSms() {
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("发短信");
}
public synchronized void call() {
System.out.println("打电话");
}
}
/**
* 3。一个普通方法(非同步),一个普通的同步方法,一个对象,两个线程先打印发短信还是hello? -- 1.hello 2.发短信
*/
public class Test2 {
public static void main(String[] args) {
Phone2 phone = new Phone2();
new Thread(() -> {
phone.sendSms();
}, "A").start();
//休眠1秒
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(() -> {
phone.hello();
}, "B").start();
}
}
class Phone2 {
//synchronized 锁的对象是方法的调用者
//两个方法用的同一个锁,谁先拿到锁谁就先执行
public synchronized void sendSms() {
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("发短信");
}
//这里没有锁!不是同步方法,不受锁的影响
public void hello() {
System.out.println("hello");
}
}
/**
* 4。两个同步方法,两个对象, 两个线程先打印发短信还是打电话 -- 1.打电话 2.发短信
*/
public class Test2 {
public static void main(String[] args) {
//2个对象,2个调用者,两把锁
Phone2 phone = new Phone2();
Phone2 phone2 = new Phone2();
new Thread(() -> {
phone.sendSms();
}, "A").start();
//休眠1秒
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(() -> {
phone2.call();
}, "B").start();
}
}
class Phone2 {
//synchronized 锁的对象是方法的调用者
//两个方法用的同一个锁,谁先拿到锁谁就先执行
public synchronized void sendSms() {
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("发短信");
}
public synchronized void call() {
System.out.println("打电话");
}
}
/**
* 5.两个静态的同步方法,一个对象,两个线程先打印发短信还是打电话 -- 1.发短信 2.打电话
*/
public class Test3 {
public static void main(String[] args) {
Phone3 phone3 = new Phone3();
new Thread(() -> {
phone3.sendSms();
}, "A").start();
//休眠1秒
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(() -> {
phone3.call();
}, "B").start();
}
}
class Phone3 {
//synchronized 锁的对象是方法的调用者
//static 静态方法
//类一加载就有了,锁的是Class
public static synchronized void sendSms() {
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("发短信");
}
public static synchronized void call() {
System.out.println("打电话");
}
}
/**
* 6.两个静态的同步方法,两个对象,两个线程先打印发短信还是打电话 -- 1.发短信 2.打电话
*/
public class Test3 {
public static void main(String[] args) {
//连个对象的Class类模板只有一个,static,锁的是Class
Phone3 phone3 = new Phone3();
Phone3 phone4 = new Phone3();
new Thread(() -> {
phone3.sendSms();
}, "A").start();
//休眠1秒
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(() -> {
phone4.call();
}, "B").start();
}
}
//Phone3 唯一的一个Class对象
class Phone3 {
//synchronized 锁的对象是方法的调用者
//static 静态方法
//类一加载就有了,锁的是Class
public static synchronized void sendSms() {
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("发短信");
}
public static synchronized void call() {
System.out.println("打电话");
}
}
/**
* 7. 一个静态同步方法,一个普通的同步方法,一个对象,两个线程先打印发短信还是打电话--1.打电话 2.发短信
*/
public class Test4 {
public static void main(String[] args) {
//连个对象的Class类模板只有一个,static,锁的是Class
Phone4 phone4 = new Phone4();
new Thread(() -> {
phone4.sendSms();
}, "A").start();
//休眠1秒
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(() -> {
phone4.call();
}, "B").start();
}
}
//Phone3 唯一的一个Class对象
class Phone4 {
//静态的同步方法,锁的是Class
public static synchronized void sendSms() {
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("发短信");
}
//普通的同步方法,锁的是调用者
public synchronized void call() {
System.out.println("打电话");
}
}
/**
* 8. 一个静态同步方法,一个普通的同步方法,两个对象,两个线程先打印发短信还是打电话--1.打电话 2.发短信
*/
public class Test4 {
public static void main(String[] args) {
//连个对象的Class类模板只有一个,static,锁的是Class
Phone4 phone1 = new Phone4();
Phone4 phone2 = new Phone4();
new Thread(() -> {
phone1.sendSms();
}, "A").start();
//休眠1秒
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(() -> {
phone2.call();
}, "B").start();
}
}
//Phone3 唯一的一个Class对象
class Phone4 {
//静态的同步方法,锁的是Class
public static synchronized void sendSms() {
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("发短信");
}
//普通的同步方法,锁的是调用者
public synchronized void call() {
System.out.println("打电话");
}
}
小结
new this 具体的一个手机
static Class 唯一的一个模板
public class ListTest {
public static void main(String[] args) {
//并发下ArrayList不安全
/**
* 解决方法:
* 1. List list = new Vector<>();
* 2. List list = Collections.synchronizedList(new ArrayList<>());
* 3. List list = new CopyOnWriteArrayList<>()
*/
//CopyOnWrite 写入时复制 COW 计算机程序设计领域的一种优化策略
//多个线程调用的时候,list,读取的时候,固定的,写入(覆盖)
//在写入的时候避免覆盖,造成数据问题
//读写分离
//CopyOnWriteArrayList 比 Vector 的优势
List<Integer> list = new CopyOnWriteArrayList<>();
for (int i = 1; i <= 1000; i++) {
final int finalI = i;
new Thread(() -> {
list.add(finalI);
}, String.valueOf(i)).start();
}
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(list.size());
}
}
源码比较
CopyOnWriteArrayList的add源码
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}
Vector的add源码
public synchronized boolean add(E e) {
modCount++;
ensureCapacityHelper(elementCount + 1);
elementData[elementCount++] = e;
return true;
}
学习方法推荐:1.先会用,2.寻找其他解决方案,3.分析源码
public class SetTest {
public static void main(String[] args) {
/**
* Set set = new HashSet<>(); 不安全
* 解决方案:
* 1. Set set = Collections.synchronizedSet(new HashSet<>());
* 2. Set set = new CopyOnWriteArraySet<>();
*/
Set<Integer> set = new CopyOnWriteArraySet<>();
for (int i = 1; i <= 1000; i++) {
final int finalI = i;
new Thread(() -> {
set.add(finalI);
}).start();
}
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(set.size());
}
}
HashSet底层是什么?入参构造方法和add方法如下(源码)
public HashSet() {
map = new HashMap<>();
}
// 常量
private static final Object PRESENT = new Object();
//add set本质就是map key是无法重复的!
public boolean add(E e) {
return map.put(e, PRESENT)==null;
}
public class MapTest {
public static void main(String[] args) {
//new HashMap<>() 等价于 new HashMap<>(16, 0.75f)
/**
* Map map = new HashMap<>(); 不安全
* 解决方案:
* 1. Map map = Collections.synchronizedMap(new HashMap<>());
* 2. Map map = new ConcurrentHashMap<>();
*/
//看看官方api文档
Map<Integer, String> map = new ConcurrentHashMap<>();
for (int i = 1; i <= 1000; i++) {
final int finalI = i;
new Thread(() -> {
map.put(finalI, Thread.currentThread().getName());
}, String.valueOf(i)).start();
}
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(map.size());
}
}
Runnable的Api文档
FutureTask的Api文档
public class CallableTest {
public static void main(String[] args) {
MyThread thread = new MyThread();
//Callable --- Runnable 中间转换(适配类)
FutureTask<String> futureTask = new FutureTask<>(thread);
//结果会被缓存,提高效率
new Thread(futureTask, "A").start();
new Thread(futureTask, "B").start();
//获取返回值
try {
//这个get方法可能会产生阻塞,把它放到最后
String s = futureTask.get();
//或者使用异步通信来处理
System.out.println(s);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
class MyThread implements Callable<String> {
@Override
public String call() {
System.out.println("call()");
//可能是耗时的操作
return "123";
}
}
输出
call()
123
细节;
减法计数器
public class CountDownLatchDemo {
public static void main(String[] args) {
//总数是6 必须要执行的任务的时候再使用
CountDownLatch countDownLatch = new CountDownLatch(6);
for (int i = 0; i < 6; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName()+" go out");
// 数量-1
countDownLatch.countDown();
}).start();
}
try {
//等待计数器归零,然后再往下执行
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("close door");
}
}
原理:
countDownLatch.countDown(); //数量-1
countDownLatch.await();//等待计数器归零,然后再往下执行
每次线程调用countDown()数量-1,假设计数器变成0,countDownLatch.await()就会被唤醒,继续执行!
加法计算器
public class CyclicBarrierDemo {
public static void main(String[] args) {
/**
* 集齐七颗龙珠,召唤神龙
*/
CyclicBarrier cyclicBarrier = new CyclicBarrier(7, ()->{
System.out.println("召唤神龙成功!");
});
for (int i = 1; i <= 7; i++) {
final int temp = i;
new Thread(() -> {
System.out.println("收集"+temp+"星龙珠");
try {
//阻塞
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
}
}
}
输出
收集1星龙珠
收集2星龙珠
收集3星龙珠
收集5星龙珠
收集4星龙珠
收集6星龙珠
收集7星龙珠
召唤神龙成功!
Semaphore:信号量
抢车位
5辆车,3个停车位
public class SemaphoreDemo {
public static void main(String[] args) {
//3个停车位 限流
Semaphore semaphore = new Semaphore(3);
for (int i = 1; i <= 5; i++) {
new Thread(() -> {
try {
//获得
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + "获得停车位");
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//释放
semaphore.release();
System.out.println(Thread.currentThread().getName() + "离开停车位");
}
}, String.valueOf(i)).start();
}
}
}
输出
2获得停车位
3获得停车位
1获得停车位
3离开停车位
4获得停车位
2离开停车位
1离开停车位
5获得停车位
4离开停车位
5离开停车位
原理:
semaphore.acquire(); //获得,假设已经满了,等待,等待被释放为止!
semaphore.release();//释放,会将当前的信号量释放,然后唤醒等待线程!
作用:
多个共享资源互斥使用!并发限流,控制最大线程数!
读写锁
/**
* 独占锁(写锁)一次只能被一个线程占有
* 共享锁(读锁)多个线程可以同时占有
* ReadWriteLock
* 读-读 可以共存!
* 读-写 不能共存!
* 写-写 不能共存!
*
*/
public class ReadWriteLockDemo {
public static void main(String[] args) {
MyCacheLock myCache = new MyCacheLock();
for (int i = 1; i <= 5; i++) {
final String temp = String.valueOf(i);
new Thread(() -> {
myCache.put(temp, temp);
}, temp).start();
}
for (int i = 1; i <= 5; i++) {
final String temp = String.valueOf(i);
new Thread(() -> {
myCache.get(temp);
}, temp).start();
}
}
}
class MyCacheLock {
private volatile Map<String, Object> map = new HashMap<>();
private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
//存,写
public void put(String key, Object value) {
readWriteLock.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName() + "写入" + key);
map.put(key, value);
System.out.println(Thread.currentThread().getName() + "写入ok");
} catch (Exception e) {
e.printStackTrace();
} finally {
readWriteLock.writeLock().unlock();
}
}
//取,读
public void get(String key) {
readWriteLock.readLock().lock();
try {
System.out.println(Thread.currentThread().getName() + "读取" + key);
map.get(key);
System.out.println(Thread.currentThread().getName() + "读取ok");
} catch (Exception e) {
e.printStackTrace();
} finally {
readWriteLock.readLock().unlock();
}
}
}
class MyCache {
private volatile Map<String, Object> map = new HashMap<>();
//存,写
public void put(String key, Object value) {
System.out.println(Thread.currentThread().getName() + "写入" + key);
map.put(key, value);
System.out.println(Thread.currentThread().getName() + "写入ok");
}
//取,读
public void get(String key) {
System.out.println(Thread.currentThread().getName() + "读取" + key);
map.get(key);
System.out.println(Thread.currentThread().getName() + "读取ok");
}
}
输出
1写入1
1写入ok
3写入3
3写入ok
4写入4
4写入ok
2写入2
2写入ok
2读取2
2读取ok
1读取1
1读取ok
5写入5
5写入ok
4读取4
4读取ok
5读取5
3读取3
5读取ok
3读取ok
阻塞队列
阻塞队列
非阻塞队列
双端队列
什么情况下我们会使用阻塞队列? – 多线程并发处理,线程池!
使用队列
四组API
方式 | 有返回值,抛出异常 | 有返回值,不抛出异常 | 阻塞等待 | 超时等待 |
---|---|---|---|---|
添加 | add() | offer() | put() | offer(,) |
移除 | remove() | poll() | take() | poll(,) |
查看队列首元素 | element() | peek() | - | - |
/**
* 有返回值,抛出异常
*/
public static void test1() {
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue(3);
System.out.println(blockingQueue.add("a"));
System.out.println(blockingQueue.add("b"));
System.out.println(blockingQueue.add("c"));
//抛出异常 java.lang.IllegalStateException: Queue full
// System.out.println(blockingQueue.add("d"));
//查看队列首元素
System.out.println(blockingQueue.element());
System.out.println("===================");
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
//抛出异常 java.util.NoSuchElementException
// System.out.println(blockingQueue.remove());
}
/**
* 有返回值,不抛出异常
*/
public static void test2() {
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue(3);
System.out.println(blockingQueue.offer("a"));
System.out.println(blockingQueue.offer("b"));
System.out.println(blockingQueue.offer("c"));
//不抛出异常,返回false
System.out.println(blockingQueue.offer("d"));
//查看队列首元素
System.out.println(blockingQueue.peek());
System.out.println("===================");
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
//不抛出异常,返回null
System.out.println(blockingQueue.poll());
}
/**
* 阻塞等待,一直等
*/
public static void test3() {
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue(3);
try {
blockingQueue.put("a");
blockingQueue.put("b");
blockingQueue.put("c");
//队列没有位置了,一直阻塞
// blockingQueue.put("d");
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
//队列没有元素了,一直阻塞
// System.out.println(blockingQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 超时等待
*/
public static void test4() {
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue(3);
try {
System.out.println(blockingQueue.offer("a", 2, TimeUnit.SECONDS));
System.out.println(blockingQueue.offer("b", 2, TimeUnit.SECONDS));
System.out.println(blockingQueue.offer("c", 2, TimeUnit.SECONDS));
//等待超过2秒 返回false
System.out.println(blockingQueue.offer("d", 2, TimeUnit.SECONDS));
System.out.println("================");
System.out.println(blockingQueue.poll(2, TimeUnit.SECONDS));
System.out.println(blockingQueue.poll(2, TimeUnit.SECONDS));
System.out.println(blockingQueue.poll(2, TimeUnit.SECONDS));
//等待超过2秒 返回null
System.out.println(blockingQueue.poll(2, TimeUnit.SECONDS));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
没有容量
进去一个元素,必须等待取出来之后,才能再往里面放一个元素!
put、take
public class SynchronousQueueDemo {
public static void main(String[] args) {
BlockingQueue<String> blockingQueue = new SynchronousQueue<>();
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + " put a");
blockingQueue.put("a");
System.out.println(Thread.currentThread().getName() + " put b");
blockingQueue.put("b");
System.out.println(Thread.currentThread().getName() + " put c");
blockingQueue.put("c");
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "T1").start();
new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName() + " ==> " + blockingQueue.take());
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName() + " ==> " + blockingQueue.take());
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName() + " ==> " + blockingQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "T2").start();
}
}
输出
T1 put a
T2 ==> a
T1 put b
T2 ==> b
T1 put c
T2 ==> c
占用系统的资源
优化资源的使用 --》池化技术
线程池、连接池、内存池、对象池。。。(创建和销毁十分浪费资源)
池化技术:事先贮备好一些资源,要用,就来这里拿,用完之后还回来
线程池的好处:
线程复用,可以控制最大并发数、管理线程
线程池:3大方法、7大参数、4种拒绝策略
3大方法
public class Demo01 {
public static void main(String[] args) {
//第1大方法:单个线程
// ExecutorService threadExecutor = Executors.newSingleThreadExecutor();
//第2大方法:创建一个固定的线程池大小
// ExecutorService threadExecutor = Executors.newFixedThreadPool(5);
//第3大方法:可伸缩的,遇强则强,遇弱则弱
ExecutorService threadExecutor = Executors.newCachedThreadPool();
try {
for (int i = 0; i < 10; i++) {
//使用了线程池之后,使用线程池来创建线程
threadExecutor.execute(() -> {
System.out.println(Thread.currentThread().getName() + " ok");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
//线程池用完,程序结束,关闭线程池
threadExecutor.shutdown();
}
}
}
输出
第1大方法
pool-1-thread-1 ok
pool-1-thread-1 ok
pool-1-thread-1 ok
pool-1-thread-1 ok
pool-1-thread-1 ok
pool-1-thread-1 ok
pool-1-thread-1 ok
pool-1-thread-1 ok
pool-1-thread-1 ok
pool-1-thread-1 ok
第2大方法
pool-1-thread-1 ok
pool-1-thread-5 ok
pool-1-thread-5 ok
pool-1-thread-5 ok
pool-1-thread-5 ok
pool-1-thread-3 ok
pool-1-thread-2 ok
pool-1-thread-5 ok
pool-1-thread-4 ok
pool-1-thread-1 ok
第3大方法
pool-1-thread-2 ok
pool-1-thread-1 ok
pool-1-thread-4 ok
pool-1-thread-3 ok
pool-1-thread-6 ok
pool-1-thread-5 ok
pool-1-thread-7 ok
pool-1-thread-9 ok
pool-1-thread-10 ok
pool-1-thread-8 ok
源码分析(3大方法)
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
本质 ThreadPoolExecutor
public ThreadPoolExecutor(int corePoolSize, //核心线程池大小
int maximumPoolSize, //最大小线程池大小
long keepAliveTime, //存活时间,超时了没有调用就会释放
TimeUnit unit,//超时单位
BlockingQueue<Runnable> workQueue,//阻塞队列
ThreadFactory threadFactory,//线程工厂,创建线程的,一般不用动
RejectedExecutionHandler handler //拒绝策略
) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
手动创建线程池
public class Demo01 {
public static void main(String[] args) {
//自定义线程池
ExecutorService threadExecutor = new ThreadPoolExecutor(
2,
5,
3,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(3),
Executors.defaultThreadFactory(),
//队列满了,线程数达到最大线程数,还有线程过来,不处理这个线程,抛出异常
// new ThreadPoolExecutor.AbortPolicy()
//哪里来的就去哪里
// new ThreadPoolExecutor.CallerRunsPolicy()
//队列满了,丢掉任务,不会抛出异常
// new ThreadPoolExecutor.DiscardPolicy()
//队列满了,尝试和最早的竞争,竞争失败丢掉任务,也不会抛出异常
new ThreadPoolExecutor.DiscardOldestPolicy()
);
try {
//最大承载:Deque + Max 超过,RejectedExecutionException
for (int i = 0; i < 9; i++) {
//使用了线程池之后,使用线程池来创建线程
threadExecutor.execute(() -> {
System.out.println(Thread.currentThread().getName() + " ok");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
//线程池用完,程序结束,关闭线程池
threadExecutor.shutdown();
}
}
}
new ThreadPoolExecutor.AbortPolicy() 输出
pool-1-thread-2 ok
pool-1-thread-3 ok
pool-1-thread-1 ok
pool-1-thread-3 ok
pool-1-thread-2 ok
pool-1-thread-4 ok
pool-1-thread-5 ok
pool-1-thread-1 ok
java.util.concurrent.RejectedExecutionException: Task com.zyy.pool.Demo01$$Lambda$1/1096979270@7ba4f24f rejected from java.util.concurrent.ThreadPoolExecutor@3b9a45b3[Running, pool size = 5, active threads = 0, queued tasks = 0, completed tasks = 8]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at com.zyy.pool.Demo01.main(Demo01.java:40)
new ThreadPoolExecutor.CallerRunsPolicy() 输出
pool-1-thread-2 ok
main ok
pool-1-thread-4 ok
pool-1-thread-3 ok
pool-1-thread-1 ok
pool-1-thread-3 ok
pool-1-thread-4 ok
pool-1-thread-5 ok
pool-1-thread-2 ok
new ThreadPoolExecutor.DiscardPolicy() 输出
pool-1-thread-2 ok
pool-1-thread-1 ok
pool-1-thread-2 ok
pool-1-thread-1 ok
pool-1-thread-2 ok
pool-1-thread-3 ok
pool-1-thread-4 ok
pool-1-thread-5 ok
new ThreadPoolExecutor.DiscardOldestPolicy() 输出
pool-1-thread-2 ok
pool-1-thread-3 ok
pool-1-thread-4 ok
pool-1-thread-1 ok
pool-1-thread-4 ok
pool-1-thread-2 ok
pool-1-thread-3 ok
pool-1-thread-5 ok
四种拒绝策略
1. 队列满了,线程数达到最大线程数,还有线程过来,不处理这个线程,抛出异常
new ThreadPoolExecutor.AbortPolicy()
2. 哪里来的就去哪里
new ThreadPoolExecutor.CallerRunsPolicy()
3. 队列满了,丢掉任务,不会抛出异常
new ThreadPoolExecutor.DiscardPolicy()
4. 队列满了,尝试和最早的竞争,竞争失败丢掉任务,也不会抛出异常
new ThreadPoolExecutor.DiscardOldestPolicy()
最大线程数应该如何设置?
CPU密集型,几核,就是几,可以保证CPU效率最高
IO密集型 (判断你程序中十分耗IO的线程)
如程序中有15个大型任务,IO十分消耗资源,一般设置为2倍,为30
获取CPU核数
//获取CPU核数
System.out.println(Runtime.getRuntime().availableProcessors());
代码优化
public class Demo01 {
public static void main(String[] args) {
//自定义线程池
ExecutorService threadExecutor = new ThreadPoolExecutor(
4,
Runtime.getRuntime().availableProcessors(),
3,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(3),
Executors.defaultThreadFactory(),
//队列满了,丢掉任务,不会抛出异常
new ThreadPoolExecutor.DiscardPolicy()
);
try {
//最大承载:Deque + Max 超过,RejectedExecutionException
for (int i = 0; i < 10; i++) {
//使用了线程池之后,使用线程池来创建线程
threadExecutor.execute(() -> {
System.out.println(Thread.currentThread().getName() + " ok");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
//线程池用完,程序结束,关闭线程池
threadExecutor.shutdown();
}
}
}
新时代的程序员:lambda表达式,链式编程,函数式接口,Stream流式计算
函数式接口:只有一个方法的接口(简化编程模型,在新版本框架底层中大量应用!)
@FunctionalInterface
public interface Runnable {
public abstract void run();
}
四大函数式接口
/**
* Function 函数式接口,有一个输入参数,有一个输出
* 只要是函数式,可以用lambda表达式简化
*/
public class Demo01 {
public static void main(String[] args) {
//匿名内部类
/*Function function = new Function() {
@Override
public String apply(String str) {
return str;
}
};*/
//用lambda表达式简化
Function function = (str) -> {return str;};
System.out.println(function.apply("11"));
}
}
/**
* 断定型接口:有一个输入参数,返回值只能是布尔值
*/
public class Demo02 {
public static void main(String[] args) {
//判断字符串是否为空
/*Predicate predicate = new Predicate() {
@Override
public boolean test(String o) {
if (o == null || o.length() == 0) {
return true;
}
return false;
}
};*/
//用lambda表达式简化
Predicate<String> predicate = (o) -> {
if (o == null || o.length() == 0) {
return true;
}
return false;
};
System.out.println(predicate.test(""));
}
}
/**
* Consumer 消费型接口:只有输入,没有返回值
*/
public class Demo03 {
public static void main(String[] args) {
/*Consumer consumer = new Consumer() {
@Override
public void accept(String o) {
System.out.println(o);
}
};*/
//用lambda表达式简化
Consumer<String> consumer = (o) -> System.out.println(o);
consumer.accept("123");
}
}
/**
* Supplier 供给型接口 没有参数,只有返回值
*/
public class Demo04 {
public static void main(String[] args) {
/*Supplier supplier = new Supplier() {
@Override
public Integer get() {
return 1024;
}
};*/
//用lambda表达式简化
Supplier<Integer> supplier = () -> 1024;
System.out.println(supplier.get());
}
}
什么是Stream流式计算
大数据:存储+计算
集合、MySQL本质就是存储东西的
计算都应该交给流来操作
/**
* 题目要求:一分钟内完成此题,只能用一行代码实现
* 现在有5个用户!筛选
* 1.ID必须是偶数
* 2.年龄必须大于23岁
* 3.用户转为大写
* 4.用户名字母倒着排序
* 5.只输出一个用户
*/
public class Test {
public static void main(String[] args) {
User u1 = new User(1, "a", 21);
User u2 = new User(2, "b", 22);
User u3 = new User(3, "c", 23);
User u4 = new User(4, "d", 24);
User u5 = new User(6, "e", 25);
//集合就是储存
List<User> list = Arrays.asList(u1, u2, u3, u4, u5);
//计算交给Stream流
//链式编程
list.stream()
.filter(u -> u.getId() % 2 == 0)
.filter(u -> u.getAge() > 23)
.map(u -> u.getName().toUpperCase())
.sorted((user1, user2) -> user2.compareTo(user1))
.limit(1)
.forEach(u -> System.out.println(u));
}
}
class User {
private int id;
private String name;
private int age;
public User() {
}
public User(int id, String name, int age) {
this.id = id;
this.name = name;
this.age = age;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
@Override
public String toString() {
return "User{" +
"id=" + id +
", name='" + name + '\'' +
", age=" + age +
'}';
}
}
分支合并
什么是ForkJoin
ForkJoin在JDK1.7,并行执行任务!提高效率,大数据量!
大数据:Map Reduce(把大任务拆分为小任务)
ForkJoin特点:工作窃取
这个里面维护的都是双端队列
ForkJoin操作
ForkJoinPool的方法
/**
* 求和计算的任务
* 如何使用ForkJoin
* 1. ForkJoinPool 通过它来执行
* 2. 计算任务:forkJoinPool.execute(ForkJoinTask> task)
* 3. 计算类要继承RecursiveTask
*/
public class ForkJoinDemo extends RecursiveTask<Long> {
private Long start;
private Long end;
//临界值
private Long temp = 10000L;
public ForkJoinDemo(Long start, Long end) {
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
if ((end - start) < temp) {
Long sum = 0L;
for (Long i = start; i <= end; i++) {
sum += i;
}
return sum;
} else {
//分支合并计算 forkjoin
//中间值
long middle = (start + end) /2;
ForkJoinDemo task1 = new ForkJoinDemo(start, middle);
//拆分任务,把任务压入队列
task1.fork();
ForkJoinDemo task2 = new ForkJoinDemo(middle + 1, end);
//拆分任务,把任务压入队列
task2.fork();
return task1.join() + task2.join();
}
}
}
public class Test {
public static void main(String[] args) {
//耗时:6295
// test1();
//耗时:4401
// test2();
//耗时:294
test3();
}
//普通方式
public static void test1() {
Long sum = 0L;
long start = System.currentTimeMillis();
for (Long i = 1L; i <= 10_0000_0000L; i++) {
sum += i;
}
long end = System.currentTimeMillis();
System.out.println("sum:" + sum + " 时间:" + (end - start));
}
public static void test2() {
long start = System.currentTimeMillis();
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinTask<Long> forkJoinTask = new ForkJoinDemo(0L, 10_0000_0000L);
ForkJoinTask<Long> submit = forkJoinPool.submit(forkJoinTask);
Long sum = null;
try {
sum = submit.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
long end = System.currentTimeMillis();
System.out.println("sum:" + sum + " 时间:" + (end - start));
}
public static void test3() {
long start = System.currentTimeMillis();
//Stream 并行流
long sum = LongStream.range(0L, 10_0000_0001L).parallel().reduce(0, Long::sum);
long end = System.currentTimeMillis();
System.out.println("sum:" + sum + " 时间:" + (end - start));
}
}
Future设计的初衷:对将来的某个事件的结果进行建模
没有返回值的runAsync异步回调
/**
* 异步调用
* -- 成功回调
* -- 失败回调
*/
public class Demo01 {
public static void main(String[] args) {
//没有返回值的runAsync异步回调
CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "runAsync=>Void");
});
System.out.println("main");
try {
//获取阻塞直接结果
completableFuture.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
有返回值的supplyAsync异步回调
public class Demo01 {
public static void main(String[] args) {
//有返回值的异步回调
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName()+"supplyAsync=>");
//这里会抛出异常
int i = 10/0;
return 200;
});
try {
System.out.println(completableFuture.whenComplete((Integer integer, Throwable throwable) -> {
//返回值
System.out.println(integer);
//错误信息
System.out.println(throwable);
}).exceptionally((Throwable throwable) -> {
System.out.println(throwable);
//可以获取到错误的返回结果
return 501;
}).get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
请你谈谈你的Volatile的理解
Volatile是java虚拟机提供轻量级的同步机制
什么是JMM
JMM java内存模型,不存在的东西,概念!约定!
关于JMM的一些同步的约定
线程 工作内存 主内存
8中操作
内存交互操作有8种,虚拟机实现必须保证每一个操作都是原子的,不可在分的(对于double和long类型的变量来说,load、store、read和write操作在某些平台上允许例外)
JMM对这八种指令的使用,制定了如下规则:
上次这里有个问题,线程B修改了flag的值,但是线程A不能及时可见
public class JMMDemo {
private static int number = 0;
public static void main(String[] args) {
//main
new Thread(() -> {
//子线程 循环
while (number == 0) {
}
}).start();
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
//我们想number = 1,之后子线程会停止循环,然而结果是子线程的循环并没有停止
//这里有个问题就是主内存的值已经被修改了,但是并没有通知子线程
number = 1;
System.out.println("number "+number);
}
}
1.保证可见性
public class JMMDemo {
//加了volatile保证可见性
private volatile static int number = 0;
public static void main(String[] args) {
//main
new Thread(() -> {
//子线程
while (number == 0) {
}
}).start();
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
//我们想number = 1,之后子线程会停止循环,然而结果是子程序的循环并没有停止
//这里有个问题就是主内存的值已经被修改了
number = 1;
System.out.println("number "+number);
}
}
- 不保证原子性
原子性:不可分割
线程A在执行任务的时候,不能被打扰的,也不能被分割,要么同时成功,要么同时失败。
/**
* 不保证原子性
*/
public class VDemo02 {
//这里加了volatile是不能保证原子性的
private volatile static int number = 0;
public static void add() {
//不是一个原子性操作
number ++;
}
public static void main(String[] args) {
//理论上num结果应该为2万
for (int i = 1; i <= 20; i++) {
new Thread(()->{
for (int j = 0; j < 1000; j++) {
add();
}
}).start();
}
while (Thread.activeCount() > 2) {
//main gc
Thread.yield();
}
System.out.println(Thread.currentThread().getName()+"-->"+number);
}
}
输出(每次结果不固定)
main–>18795
如果不加lock和synchronized,怎么样保证原子性?
使用原子类,解决原子性问题
原子类为啥可以做到原子性?
public class VDemo02 {
//这里加了volatile是不能保证原子性的
private volatile static AtomicInteger number = new AtomicInteger(0);
public static void add() {
//不是一个原子性的操作
// number ++;
//CAS
number.getAndIncrement();
}
public static void main(String[] args) {
//理论上num结果应该为2万
for (int i = 1; i <= 20; i++) {
new Thread(()->{
for (int j = 0; j < 1000; j++) {
add();
}
}).start();
}
while (Thread.activeCount() > 2) {
//main gc
Thread.yield();
}
System.out.println(Thread.currentThread().getName()+"-->"+number);
}
}
这些类的底层都直接和操作系统挂钩,在内存中修改值!Unsafe是一个很特殊的存在
指令重排
什么是指令重排?-- 你写的程序,计算机并不是按照你写那样去执行的。
源代码 --》 编译器优化的重排 --》指令并行也可能会重排 --》内存系统也会重排 --》执行
处理器在进行指令重排的时候,考虑:数据之间的依赖性!
int x = 1;// 1
int y = 1;// 2
x = x + 5;// 3
y = x * x;// 4
//我们所期望的执行顺序:1234 但是可能执行的时候会变成2134 1324
//不可能是 4123
可能造成影响的结果
a b x y 这四个值默认值都是0
线程A | 线程B |
---|---|
x=a | y=b |
b=1 | a=2 |
正常的结果:x=1;y=0;但是可能由于指令重排
线程A | 线程B |
---|---|
b=1 | a=2 |
x=a | y=b |
指令重排导致的异常结果:x=2;y=1;
volatile可以避免指令重排:
内存屏障。cpu指令,作用:
volatile可以保证可见性,不能保证原子性,由于内存屏障,可以保证避免指令重排的现象产生!
饿汉式 DCL懒汉式
饿汉式
/**
* 饿汉式单例
*/
public class Hungry {
//可能会浪费空间
private byte[] data1 = new byte[1024*1024];
/**
* 构造方法私有化
*/
private Hungry() {
}
private static final Hungry HUNGRY = new Hungry();
public static Hungry getInstance() {
return HUNGRY;
}
}
DCL懒汉式
public class Lazy {
private Lazy() {
//理论上,这里只会打印一次
System.out.println(Thread.currentThread().getName() + " ok");
}
//一定要加上volatile,防止指令重排造成的问题
private volatile static Lazy LAZY;
//双核从检测锁模式的懒汉式单例 DCL懒汉式
public static Lazy getInstance () {
if (LAZY == null) {
synchronized (Lazy.class) {
if (LAZY == null) {
/**
* 不是一个原子性操作
* 1. 分配内存空间
* 2. 执行构造方法,初始化对象
* 3. 把这个对象指向这个空间
*
* 正常情况123
* 也可能出现132 这个时候还没有完成构造,所以需要避免指令重排,用volatile
*/
LAZY = new Lazy();
}
}
}
return LAZY;
}
//多线程并发
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
new Thread(() -> {
Lazy.getInstance();
}).start();
}
}
}
静态内部类的方式
public class Holder {
private Holder() {
System.out.println(Thread.currentThread().getName()+" ok");
}
public static Holder getInstance() {
return InnerClass.HOLDER;
}
private static class InnerClass {
private static final Holder HOLDER = new Holder();
}
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
new Thread(()->{
Holder.getInstance();
}).start();
}
}
}
单例不安全,因为有反射
枚举的方式
枚举不能被反射(看反射的源码)
验证
public enum EnumSingle {
INSTANCE;
public EnumSingle getInstance() {
return INSTANCE;
}
}
class Test {
public static void main(String[] args) {
//枚举不能被反射
EnumSingle enumSingle = EnumSingle.INSTANCE;
EnumSingle enumSingle1 = null;
try {
//Constructor singleConstructor = EnumSingle.class.getDeclaredConstructor() 报java.lang.NoSuchMethodException: com.zyy.single.EnumSingle.()
//java.lang.IllegalArgumentException: Cannot reflectively create enum objects
Constructor<EnumSingle> singleConstructor = EnumSingle.class.getDeclaredConstructor(String.class, int.class);
singleConstructor.setAccessible(true);
enumSingle1 = singleConstructor.newInstance();
} catch (NoSuchMethodException e) {
e.printStackTrace();
} catch (IllegalAccessException e) {
e.printStackTrace();
} catch (InstantiationException e) {
e.printStackTrace();
} catch (InvocationTargetException e) {
e.printStackTrace();
}
System.out.println(enumSingle);
System.out.println(enumSingle1);
}
}
反编译
拓展:利用反编译小工具反编译
jad -sjava EnumSingle.class
什么是CAS
/**
* CAS compareAndSet 比较并交换
*/
public class SACDemo {
public static void main(String[] args) {
AtomicInteger atomicInteger = new AtomicInteger(2020);
//public final boolean compareAndSet(int expect, int update)
//如果我期望的值达到了,那么就更新,否则,就不更新, CAS是CPU的并发原语
System.out.println(atomicInteger.compareAndSet(2020, 2021));
System.out.println(atomicInteger.get());
System.out.println(atomicInteger.compareAndSet(2020, 2021));
System.out.println(atomicInteger.get());
}
}
Unsafe
源码结合下图看
public final int getAndIncrement() {
return unsafe.getAndAddInt(this, valueOffset, 1);
}
CAS:比较当前工作内存中的值和主内存中的,如果这个值是期望的,那么则执行操作!如果不是就一直循环!
缺点:
ABA问题(狸猫换太子)
public class SACDemo {
public static void main(String[] args) {
AtomicInteger atomicInteger = new AtomicInteger(2020);
//======捣乱的线程=========
System.out.println(atomicInteger.compareAndSet(2020, 2021));
System.out.println(atomicInteger.get());
System.out.println(atomicInteger.compareAndSet(2021, 2020));
System.out.println(atomicInteger.get());
//======期望的线程=========
System.out.println(atomicInteger.compareAndSet(2020, 1993));
System.out.println(atomicInteger.get());
}
}
解决ABA问题,引入原子引用!对应的思想:乐观锁
带版本号的原子操作!
注意
Integer使用了对象缓存机制,默认范围是-128~127,推荐使用静态工厂方法valueOf获取对象实例,而不是new,因为valueOf使用缓存,而new一定会创建新的对象分配新的内存空间。
public class SACDemo {
public static void main(String[] args) {
Integer int_1993 = 1993;
Integer int_2020 = 2020;
Integer int_2021 = 2021;
//AtomicStampedReference 注意,如果泛型是一个包装类,注意对象的引用问题
AtomicStampedReference<Integer> reference = new AtomicStampedReference<>(int_2020, 1);
new Thread(() -> {
System.out.println("A1->" + reference.getStamp());
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("A 2020->2021 " + reference.compareAndSet(int_2020, int_2021, reference.getStamp(), reference.getStamp() + 1));
System.out.println("A2->" + reference.getStamp());
System.out.println("A 2021->2020 " + reference.compareAndSet(int_2021, int_2020, reference.getStamp(), reference.getStamp() + 1));
System.out.println("A3->" + reference.getStamp());
}, "A").start();
new Thread(() -> {
System.out.println("B1->" + reference.getStamp());
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("B 2020->1993 " + reference.compareAndSet(int_2020, int_1993, reference.getStamp(), reference.getStamp() + 1));
System.out.println("B2->" + reference.getStamp());
}, "B").start();
}
}
输出
A1->1
B1->1
A 2020->2021 true
A2->2
B 2020->1993 false
B2->3
A 2021->2020 true
A3->3
公平锁:不能插队,必须先来后到!
非公平锁:可以插队(默认都是非公平的)
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
可重入锁(递归锁)
Synchronized版
public class Demo01 {
public static void main(String[] args) {
Phone phone = new Phone();
new Thread(() -> {
phone.sms();
}, "A").start();
new Thread(() -> {
phone.sms();
}, "B").start();
}
}
class Phone {
public synchronized void sms() {
System.out.println(Thread.currentThread().getName() + " sms");
//这里也有锁
call();
}
public synchronized void call() {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " call");
}
}
Lock版
public class Demo02 {
public static void main(String[] args) {
Phone2 phone = new Phone2();
new Thread(() -> {
phone.sms();
}, "A").start();
new Thread(() -> {
phone.sms();
}, "B").start();
}
}
class Phone2 {
private Lock lock = new ReentrantLock();
public void sms() {
//细节问题:lock.lock(); lock.unlock();
//lock锁必须配对,否则会死在里面
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + " sms");
//这里也有锁
call();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void call() {
lock.lock();
try {
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName() + " call");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
spinlock
/**
* 自旋锁
*/
public class SpinlockDemo {
AtomicReference<Thread> atomicReference = new AtomicReference<>();
//加锁
public void myLock(){
Thread thread = Thread.currentThread();
System.out.println(Thread.currentThread().getName()+ " lock");
//自旋锁
while (atomicReference.compareAndSet(null, thread)) {
}
}
//解锁
public void myUnLock() {
Thread thread = Thread.currentThread();
System.out.println(Thread.currentThread().getName()+ " unLock");
atomicReference.compareAndSet(thread, null);
}
}
验证
public class TestSpinLock {
public static void main(String[] args) {
// ReentrantLock reentrantLock = new ReentrantLock();
// reentrantLock.lock();
// reentrantLock.unlock();
//底层使用的自旋锁CAS
SpinlockDemo spinlockDemo = new SpinlockDemo();
new Thread(()-> {
spinlockDemo.myLock();
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
spinlockDemo.myUnLock();
}
},"T1").start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(()-> {
spinlockDemo.myLock();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
spinlockDemo.myUnLock();
}
},"T2").start();
}
}
输出
T1 lock
T2 lock
T2 unLock
T1 unLock
死锁是什么?
死锁测试,怎么排除死锁?
public class DeadLockDemo {
public static void main(String[] args) {
String lockA = "lockA";
String lockB = "lockB";
new Thread(new MyThread(lockA, lockB), "T1").start();
new Thread(new MyThread(lockB, lockA), "T2").start();
}
}
class MyThread implements Runnable {
private String lockA;
private String lockB;
public MyThread(String lockA, String lockB) {
this.lockA = lockA;
this.lockB = lockB;
}
@Override
public void run() {
synchronized (lockA) {
System.out.println(Thread.currentThread().getName() + " lock:" + lockA + " want to get " + lockB);
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (lockB) {
System.out.println(Thread.currentThread().getName() + " lock:" + lockB + " want to get " + lockA);
}
}
}
}
输出(下面输出卡住,因为死锁了)
T1 lock:lockA want to get lockB
T2 lock:lockB want to get lockA
解决问题
使用jps定位进程号jps -l
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-XLldOhLp-1664064165748)(https://typora-picture1234.oss-cn-shenzhen.aliyuncs.com/typora/img/%E5%BE%AE%E4%BF%A1%E6%88%AA%E5%9B%BE_20210426221031.png)]
使用jstack 端口号
找到死锁问题
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-tjlpYxyB-1664064165748)(https://typora-picture1234.oss-cn-shenzhen.aliyuncs.com/typora/img/%E5%BE%AE%E4%BF%A1%E6%88%AA%E5%9B%BE_20210426221205.png)]
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-o1LtJrip-1664064165749)(https://typora-picture1234.oss-cn-shenzhen.aliyuncs.com/typora/img/%E5%BE%AE%E4%BF%A1%E6%88%AA%E5%9B%BE_20210426221448.png)]