翻译:分支合并
ForkJoin在JDk1.7,并行执行任务!提高效率,数据量大!
大数据中的一个概念: Map Reduce,把大任务拆分为小任务.
ForkJoin的 特点: 工作窃取
这个里面维护的都是双端队列
b线程执行完了 去帮a执行一个任务
具体使用:
在用CompletableFuture编写多线程时,如果需要处理异常,可以用exceptionally,它的作用相当于catch。
exceptionally的特点:
当出现异常时,会触发回调方法exceptionally
exceptionally中可指定默认返回结果,如果出现异常,则返回默认的返回结果
public class Demo02 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// completableFuture.get(); // 获取阻塞执行结果
// 有返回值的 supplyAsync 异步回调
// ajax,成功和失败的回调
// 失败返回的是错误信息;
//给一个生产型的接口实现
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "supplyAsync=>Integer");
// int i = 10 / 0;
return 200;
});
//给个消费类接口 当CompletableFuture的计算结果完成,或者抛出异常的时候,可以执行特定的Action
Integer integer = completableFuture.whenComplete((t, u) -> {
System.out.println(Thread.currentThread().getName() + "whenComplete");
System.out.println("t=>" + t);// 正常的返回结果 200
System.out.println("u=>" + u);// 错误信息 输出:java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
//需要这样链式编程
}).exceptionally(new Function<Throwable, Integer>() {
@Override
public Integer apply(Throwable e) {
System.out.println(Thread.currentThread().getName() + "exceptionally");
System.out.println(e.getMessage());
return 500;// 可以获取到错误的返回结果
}
}).get();
System.out.println(integer);
}
/**
* ForkJoinPool.commonPool-worker-19supplyAsync=>Integer
* ForkJoinPool.commonPool-worker-19whenComplete
* t=>200
* u=>null
* 200
*
*
*
*
* int i = 10 / 0;
* ForkJoinPool.commonPool-worker-19supplyAsync=>Integer
* ForkJoinPool.commonPool-worker-19whenComplete
* t=>null
* u=>java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
* ForkJoinPool.commonPool-worker-19exceptionally
* java.lang.ArithmeticException: / by zero
* 500
*
* Process finished with exit code 0
*/
}
线程的工作内存对应虚拟机栈中的栈帧的局部变量表,主存相当于堆
volatile的JVM提供的轻量级的同步机制。
可见性。(可见性和JMM挂钩)
不保证原子性。
禁止指令重排。
什么是JMM?
Java的内存模型,是一个概念,不存在。
JMM同步约定:
线程解锁前,必须把共享变量立即刷回主内存。
线程枷锁前,必须读取主内存中最新值到工作内存中。
加锁和解锁是同一把锁。
Java内存模型定义了8种操作来完成,虚拟机实现必须保证每一种操作都是原子的、不可再拆分的(double和long类型例外)。
lock(锁定):作用于主内存的变量,它把一个变量标识为一条线程独占的状态。
unlock(解锁):作用于主内存的变量,它把一个处于锁定状态的变量释放出来,释放后的变量才可以被其他线程锁定。
read(读取):作用于主内存的变量,它把一个变量的值从主内存传输到线程的工作内存中,以便随后的load动作使用。
load(载入):作用于工作内存的变量,它把read操作从主内存中得到的变量值放入工作内存的变量副本中。
use(使用):作用于工作内存的变量,它把工作内存中一个变量的值传递给执行引擎,每当虚拟机遇到一个需要使用到变量的值的字节码指令时将会执行这个操作。
assign(赋值):作用于工作内存的变量,它把一个从执行引擎接收到的值赋给工作内存的变量,每当虚拟机遇到一个给变量赋值的字节码指令时执行这个操作。
store(存储):作用于工作内存的变量,它把工作内存中一个变量的值传送到主内存中,以便随后的write操作使用。
write(写入):作用于主内存的变量,它把store操作从工作内存中得到的变量的值放入主内存的变量中。
8种操作必须满足的规则:
不允许read和load、store和write操作之一单独出现。即不允许一个变量从主内存读取了但工作内存不接受;或者从工作内存发起回写了但主内存不接受的情况出现。
不允许一个线程丢弃它的最近的assign操作。即变量在工作内存中改变了之后必须把该变化同步回主内存。
不允许一个线程无原因地(没有发生过任何assign操作)把数据从线程的工作内存同步回主内存。
一个新的变量只能在主内存中“诞生”,不允许在工作内存中直接使用一个未被初始化(load或assign)的变量,换句话说,就是对一个变量实施use、store操作之前,必须先执行过了assign和load操作。
一个变量在同一时刻只允许一条线程对其进行lock操作,但lock操作可以被同一条线程重复执行多次,多次执行lock后,只有执行相同次数的unlock操作,变量才会被解锁。
如果对一个变量执行lock操作,那将会清空工作内存中此变量的值,在执行引擎使用这个变量前,需要重新执行load或assign操作初始化变量的值。
如果一个变量事先没有被lock操作锁定,那就不允许对它执行unlock操作,也不允许去unlock一个被其他线程锁定住的变量。
对一个变量执行unlock操作之前,必须先把此变量同步回主内存中(执行store、write操作)。
指令重排
你写的程序,计算机并不一定你写的去执行。
源代码->编译器优化的重排->指令并行也可能会重排->内存系统也会重排->执行
处理器在进行指令重排时,考虑数据之间的依赖性。
volatile避免指令重排:
内存屏障(CPU的指令)。
保证特定操作的执行顺序。
可以保证某些变量的内存可见性。
public class JMMDemo {
private static int num=0;
public static void main(String[] args) throws InterruptedException {
new Thread(()->{
while (num==0){
//不能加sout,因为 printIn() 方法使用了 synchronized 同步代码块,可以保证原子性与可见性,它是 PrintStream 类的方法
}
}).start();
TimeUnit.SECONDS.sleep(1);
num=1;
System.out.println(num);
}
}
// 发现JVM 不会停止
private volatile static int num=0;
改了这一行后 程序就退出了 volatile保证了可见性(可见性和JMM挂钩)
拓展:ACID:原子性、一致性、隔离性、持久性
原子性:不可分割性
线程A在执行任务的时候,不能被打扰,也不能被分割。要么同时成功,要么同时失败
public class volatileDemo2 {
//下面加了volatile 没效果 不保证原子性
private volatile static int num =0;
//下面加一个synchronized 肯定能解决
public static void add(){
num++;
}
public static void main(String[] args) {
//理论值为2w
for (int i = 0; i < 20; i++) {
new Thread(()->{
for (int j = 0; j < 1000; j++) {
add();
}
}).start();
}
while (Thread.activeCount()>2){
//其他线程没完 主线程先等等 礼让
Thread.yield();
}
System.out.println(num);
}
}
上面例子体现的实际就是可见性,它保证的是在多个线程之间,一个线程对volatile 变量的修改对另一个线程可见, 不能保证原子性,仅用在一个写线程,多个读线程的情况。
如果不加lock和synchronized , 怎么样保证原子性
线程1加一值为1但是线程二拿到的时候是0,加一还是1。所以两次操作值还是1,肯定就少了
使用原子类解决
AtomicInteger的value 是volatile的 同时保证了原子性和可见性
public class volatileDemo2 {
//这里volatile不用加了
private static AtomicInteger num =new AtomicInteger();
public static void add(){
num.getAndIncrement(); //CAS 调用底层操作系统的 在内存中修改值 比 加锁效率高
}
public static void main(String[] args) {
//理论值为2w
for (int i = 0; i < 20; i++) {
new Thread(()->{
for (int j = 0; j < 1000; j++) {
add();
}
}).start();
}
while (Thread.activeCount()>2){
//其他线程没完 主线程先等等 礼让
Thread.yield();
}
System.out.println(num);
}
}
什么是指令重排
指令重排简单来说可以,在程序结果不受影响的前提下,可以调整指令语句执行顺序。多线程下指令重排会影响正确性。
指令重排序是指编译器或CPU为了优化程序的执行性能而对指令进行重新排序的一种手段,重排序会带来可见性问题,所以在多线程开发中必须要关注并规避重排序。
从源代码到最终运行的指令,会经过如下两个阶段的重排序。
第一阶段,编译器重排序,就是在编译过程中,编译器根据上下文分析对指令进行重排序,目的是减少CPU和内存的交互,重排序之后尽可能保证CPU从寄存器或缓存行中读取数据。
在前面分析JIT优化中提到的循环表达式外提(Loop Expression Hoisting)就是编译器层面的重排序,从CPU层面来说,避免了处理器每次都去内存中加载stop,减少了处理器和内存的交互开销。
第二阶段,处理器重排序,处理器重排序分为两个部分。
- 并行指令集重排序,这是处理器优化的一种,处理器可以改变指令的执行顺序。
- 内存系统重排序,这是处理器引入Store Buffer缓冲区延时写入产生的指令执行顺序不一致的问题。
例子:
int num = 0;
// volatile 修饰的变量,可以禁用指令重排 volatile boolean ready = false; 可以防止变量之前的代码被重排序
boolean ready = false; //加volatile
// 线程1 执行此方法
public void actor1(I_Result r) {
//volatile 保证了读ready时后面的读的num 都是主内存里最新的
if(ready) {
r.r1 = num + num;
}
else {
r.r1 = 1;
}
}
// 线程2 执行此方法
public void actor2(I_Result r) {
num = 2;
ready = true; //volatile保证了ready写时 前面的已经写了到主内存中
}
在多线程环境下,以上的代码 r1 的值有三种情况:
第一种:线程 2 先执行,然后线程 1 后执行,r1 的结果为 4
第二种:线程 1 先执行,然后线程 2 后执行,r1 的结果为 1
第三种:线程 2 先执行,但是发送了指令重排,num = 2 与 ready = true 这两行代码语序发生装换,
原文链接:https://blog.csdn.net/weixin_50280576/article/details/113532093
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-WycGJmBD-1667810599492)(http://bijioss.donggei.top/image-20220929124853911.png)]
经过重排序之后,如果t1和t2线程同时运行,这个结果从人的视角来看,就有点类似于t1线程中a=1的修改结果对t2线程不可见,同样t2线程中b=1的执行结果对t1线程不可见。
volatile 的底层实现原理是内存屏障,Memory Barrier(Memory Fence)
对 volatile 变量的写指令后会加入写屏障
对 volatile 变量的读指令前会加入读屏障
写屏障(sfence)保证在该屏障之前的,对共享变量的改动,都同步到主存当中
而读屏障(lfence)保证在该屏障之后,对共享变量的读取,加载的是主存中最新数据
// 懒汉式: 用的时候创建
public class LazyMan {
private LazyMan() {
}
// volatile保证lazyMan创建过程中不进行指令重排
private volatile static LazyMan lazyMan;
// 双重检验锁 DCL懒汉(Double Check Lock)
public static LazyMan getLazyMan() {
但是因为用到了synchronized,会导致很大的性能开销,并且加锁其实只需要在第一次初始化的时候用到,之后的调用都没必要再进行加锁。先判断对象是否已经被初始化,再决定要不要加锁。第二个lazyMan == null: 如果都进到了拿锁那一行,拿到锁再判断一下是不是null
if (lazyMan == null) {
// 获取LazyMan这个类的锁
synchronized (LazyMan.class) {
if (lazyMan == null) {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
lazyMan = new LazyMan();
}
}
}
return lazyMan;
}
// 模拟多线程
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "lazyMan : " + getLazyMan());
}, "" + i).start();
}
}
}
实例化对象的那行代码
实际上可以分解成以下三个步骤:
但是有些编译器为了性能的原因,可能会将第二步和第三步进行重排序,顺序就成了:
扩展:玩转单例类
/** * 饿汉式单例模式,一加载就创建单例对象 */ public class Hungry { private Hungry(){ } private static final Hungry hungry = new Hungry(); public static Hungry getInstance(){ return hungry; } } /** * 懒汉式单例模式 */ public class LazyMan { private LazyMan() { System.out.println(Thread.currentThread().getName()); } private static LazyMan lazyMan; public static LazyMan getInstance(){ if (lazyMan==null){ lazyMan = new LazyMan(); } return lazyMan; } // 多线程并发创建对象存在问题。 public static void main(String[] args) { for (int i = 0; i < 10; i++) { new Thread(()->{ LazyMan.getInstance(); }).start(); } } } /** * 懒汉式单例模式 */ public class LazyMan01 { private LazyMan01(){ System.out.println(Thread.currentThread().getName()); } // 加上volatile关键字禁止指令重排 private volatile static LazyMan01 lazyMan01; /** * 双重检测锁模式,DCL懒汉式 * @return */ public static LazyMan01 getInstance(){ if (lazyMan01==null){ synchronized(LazyMan01.class){ if (lazyMan01==null){ lazyMan01 = new LazyMan01(); } } } return lazyMan01; } public static void main(String[] args) { for (int i = 0; i < 10; i++) { new Thread(()->{ LazyMan01.getInstance(); }).start(); } } } 对于上面的两种单例模式,使用反射可以简单破解单例 private Hungry(){ } private static final Hungry hungry = new Hungry(); public static Hungry getInstance(){ return hungry; } public static void main(String[] args) throws InvocationTargetException, InstantiationException, IllegalAccessException { Constructor<?> declaredConstructor = Hungry.class.getDeclaredConstructors()[0]; declaredConstructor.setAccessible(true); Object o = declaredConstructor.newInstance(); Hungry instance = Hungry.getInstance(); System.out.println(o); System.out.println(instance); } //解决一:把构造函数改成这样 private Hungry(){ synchronized (Hungry.class){ if (hungry!=null){ throw new RuntimeException("不要试图使用反射破坏"); } } } // 还能破坏: public static void main(String[] args) throws InvocationTargetException, InstantiationException, IllegalAccessException { // LazyMan01 instance = LazyMan01.getInstance(); // System.out.println(instance); Constructor<?> declaredConstructor = LazyMan01.class.getDeclaredConstructors()[0]; declaredConstructor.setAccessible(true); Object o1 = declaredConstructor.newInstance(); Object o2 = declaredConstructor.newInstance(); System.out.println(o1); System.out.println(o2); }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
- 82
- 83
- 84
- 85
- 86
- 87
- 88
- 89
- 90
- 91
- 92
- 93
- 94
- 95
- 96
- 97
- 98
- 99
- 100
- 101
- 102
- 103
- 104
- 105
- 106
这样又解决了:但是如果知道你这个属性的名字,知道你使用的这种方法, 也可以通过反射破解
在反射的源码里:定义了反射不能创建枚举 可以使用枚举解决[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-BOn9K5XR-1667810599498)(http://bijioss.donggei.top/image-20220929221656346.png)]
在上面的cas中底层调用的getAndAddInt就是自旋锁
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}
自己写的自旋锁
public class SpinlockDemo {
原子引用想 锁的是一个线程
// 默认值:
// int 0
//thread null
AtomicReference<Thread> atomicReference=new AtomicReference<>();
//加锁
public void myLock(){
Thread thread = Thread.currentThread();
System.out.println(thread.currentThread().getName()+"===> mylock");
//自旋锁 如果是null就把这个atomicReference赋值这个锁,如果没锁就加锁
//如果不是null 就一直等着 (其他线程进来这个方法之后,就一直等着)
while (!atomicReference.compareAndSet(null,thread)){
System.out.println(Thread.currentThread().getName()+" ==> 自旋中~");
}
}
//解锁
public void myUnlock(){
Thread thread=Thread.currentThread();
System.out.println(thread.currentThread().getName()+"===> myUnlock");
//如果是我期望的这个线程那就把这个线程改成null
atomicReference.compareAndSet(thread,null);
}
}
public class TestSpinLock {
public static void main(String[] args) throws InterruptedException {
//ReentrantLock reentrantLock = new ReentrantLock();
//reentrantLock.lock();
//reentrantLock.unlock();
//使用CAS实现自旋锁
SpinlockDemo spinlockDemo=new SpinlockDemo();
new Thread(()->{
spinlockDemo.myLock();
try {
TimeUnit.SECONDS.sleep(3);
} catch (Exception e) {
e.printStackTrace();
} finally {
spinlockDemo.myUnlock();
}
},"t1").start();
TimeUnit.SECONDS.sleep(1);
new Thread(()->{
spinlockDemo.myLock();
try {
TimeUnit.SECONDS.sleep(3);
} catch (Exception e) {
e.printStackTrace();
} finally {
spinlockDemo.myUnlock();
}
},"t2").start();
}
}
t2进程必须等待t1进程Unlock后,才能Unlock,在这之前进行自旋等待
产生死锁必须同时满足以下四个条件,只要其中任一条件不成立,死锁就不会发生。
(1)互斥条件:进程要求对所分配的资源(如打印机)进行排他性控制,即在一段时间内某资源仅为一个进程所占有。此时若有其他进程请求该资源,则请求进程只能等待。
(2)不剥夺条件:进程所获得的资源在未使用完毕之前,不能被其他进程强行夺走,即只能由获得该资源的进程自己来释放(只能是主动释放)。
(3)请求和保持条件:进程已经保持了至少一个资源,但又提出了新的资源请求,而该资源已被其他进程占有,此时请求进程被阻塞,但对自己已获得的资源保持不放。
(4)循环等待条件:存在一种进程资源的循环等待链,链中每一个进程已获得的资源同时被链中下一个进程所请求。即存在一个处于等待状态的进程集合{Pl, P2, …, pn},其中Pi等 待的资源被P(i+1)占有(i=0, 1, …, n-1),Pn等待的资源被P0占有,
// 例子不好理解,其实是对"lockA"和"lockB" 这两个常量进行加锁和解锁
这里的锁对象是上面的定义的两个字符串,T1线程的A锁是字符串A,T2线程的A锁是字符串B,这样两个线程获取字符串AB的锁就卡死了
public class DeadLock {
public static void main(String[] args) {
String lockA= "lockA";
String lockB= "lockB";
new Thread(new MyThread(lockA,lockB),"t1").start();
new Thread(new MyThread(lockB,lockA),"t2").start();
}
}
class MyThread implements Runnable{
private String lockA;
private String lockB;
public MyThread(String lockA, String lockB) {
this.lockA = lockA;
this.lockB = lockB;
}
@Override
public void run() {
synchronized (lockA){
System.out.println(Thread.currentThread().getName()+" lock"+lockA+"===>get"+lockB);
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (lockB){
System.out.println(Thread.currentThread().getName()+" lock"+lockB+"===>get"+lockA);
}
}
}
}
解决死锁
1 看日志
2 jstack 堆栈信息
java bin 目录下有很多工具 有一个工具叫jps