目录
yield()、wait()、sleep()、join()四个方法的介绍
ReentranLlock相对于synchronized的优势
LockSupport中park()和unpark()方法的实现原理
synchronized锁升级的过程
synchronized既保证了原子性,又保证了可见性。
注意上图两条主线:匿名偏向锁启动和没启动两种方式。
synchronized锁优化
锁粒度最好小一些;即 同步代码块中的语句越少越好重量级锁:需要经过操作系统帮忙管理的锁轻量级锁:JVM自己就能进行同步的锁synchronized锁消除
如果JVM通过逃逸分析,发现一个对象只能从一个线程被访问到,则访问这个对象时,可以不加同步锁。如果程序中使用了synchronized锁,则JVM会将synchronized锁消除。注意:这种情况针对的是synchronized锁,而对于Lock锁,则JVM并不能消除。synchronized锁粗化
假如方法中首尾相接,前后相邻的都是同一个锁对象,那 JIT 编译器就会把这几个synchronized块合并成一个大块,加粗加大范围,一次申请锁依用即可,避免次次的申请和释放锁,提升了性能。因为申请锁和释放锁是有操作系统用户态和内核态的切换开销的, 使用一次锁,包括申请,持有到释放,当前进程要进行四次用户态与内核态的切换。
synchronized (o) { System.out.println(1111); } synchronized (o) { System.out.println(222); }读写锁降级
降级是指当前把持住写锁,再获取到读锁,随后释放(先前拥有的)写锁的过程。锁降级过程中的读锁的获取是否有必要,答案是必要的。主要是为了保证数据的可见性,如果当前线程不获取读锁而直接释放写锁,假设此刻另一个线程获取的写锁,并修改了数据,那么当前线程就步伐感知到线程T的数据更新,如果当前线程遵循锁降级的步骤,那么线程T将会被阻塞,直到当前线程使数据并释放读锁之后,线程T才能获取写锁进行数据更新。对象头信息
无论锁现在是何状态,只要调用wait()方法都会升级为重量级锁。
yield()、wait()、sleep()、join()四个方法的介绍
sleep():
是Thread类的静态方法,当前线程将睡眠n毫秒,线程进入阻塞状态。当睡眠时间到了,会解除阻塞,进入可运行状态,等待CPU的到来。睡眠不释放锁(如果有的话)。
wait():
是Object的方法,必须与synchronized关键字一起使用,线程进入阻塞状态,当notify或者notifyall被调用后,会解除阻塞。但是,只有重新占用互斥锁之后才会进入可运行状态。睡眠时,会释放互斥锁。
注意:因为wait()方法使用在synchronized修饰的范围内,所以:无论锁现在是何状态,只要调用wait()方法都会升级为重量级锁。
yield():
yield方法可以暂停当前正在执行的线程对象,让其它有相同优先级的线程执行。它是一个静态方法而且只保证当前线程放弃CPU执行权而不能保证使其它线程一定能占用CPU,执行yield()的线程有可能在进入到就绪状态后马上又被执行。
join():
意思就是在自己当前线程加入调用Join的线程(),本线程等待。等调用的线程运行完了,自己再去执行。t1和t2两个线程,在t1的某个点上调用了t2.join,它会跑到t2去运行,t1等待t2运行完毕继续t1运行(自己join自己没有意义);通过join()方法的源码可知,它的底层还是wait()方法,一般join() 适用于需要控制线程执行顺序的场景中。新建 T1、T2、T3 三个线程,如何保证它们按顺序执行?用 join 方法。sleep()和wait()方法的异同:
- sleep 方法没有释放锁,而 wait 方法释放了锁 。
- sleep 通常被用于暂停执行;Wait 通常被用于线程间交互/通信
- sleep() 方法执行完成后,线程会自动苏醒。或者可以使用 wait(long timeout)超时后线程会自动苏醒。wait() 方法被调用后,线程不会自动苏醒,需要别的线程调用同一个对象上的 notify() 或者 notifyAll() 方法
- sleep 方法和 wait 方法都可以用来放弃 CPU 一定的时间,不同点在于如果线程持有某个对象的监视器,sleep 方法不会放弃这个对象的监视器,wait方法会放弃这个对象的监视器
为什么wait()的时候必须释放锁?
当线程A进入synchronized(obj1)中之后,也就是对obj1上了锁。此时,调用wait()进入阻塞状态,一直不能退出synchronized代码块;那么,线程B永远无法进入synchronized(obj1)
同步块里,永远没有机会调用notify(),岂不是死锁了!这就涉及一个关键的问题:在wait()的内部,会先释放锁obj1,然后进入阻塞状态,之后,它被另外一个线程用notify()唤醒, 去重新拿锁!其次,wait()调用完成后,执行后面的业务逻辑代码,然后退出synchronized同步块,再次释放锁。
Thread.sleep(0)的作用是什么?
由于Java采用抢占式的线程调度算法,因此可能会出现某条线程常常获取到CPU控制权的情况,为了让某些优先级比较低的线程也能获取到 CPU 控制权,可以使用Thread.sleep(0)手动触发一次操作系统分配时间片的操作,这也是平衡CPU控制权的一种操作。
异常锁
加了锁synchronized void m(),while(true)不断执行,线程启动,count++ 如果等于5的时候认为的产生异常。这时候如果产生任何异常,就会出现什么情况呢?就会被原来的那些个准备拿到这把锁的程序乱冲进来,程序乱入。这是异常的概念。
public class T { int count = 0; synchronized void m() { System.out.println(Thread.currentThread().getName() + " start"); while (true) { count++; System.out.println(Thread.currentThread().getName() + " count = " + count); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } if (count == 5) { int i = 1 / 0;//此处抛出异常,锁将被释放,要想不被释放,可以在这里进行catch,然后让循环继续 System.out.println(i); } } } public static void main(String[] args) { T t = new T(); Runnable r = new Runnable() { @Override public void run() { t.m(); } }; new Thread(r, "t1").start(); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } new Thread(r, "t2").start(); } }
volatile实现原理
volatile 并不能保证多个线程共同修改 running 变量时所带来的不一致问题,也就是说 volatile 不能替代synchronized1 :保证线程的可见性2 :禁止指令重新排序在 JVM 里面规定了八种原则(happen - before),除了这些之外其他的指令都可以有重排序,保证原子性只是保证这些操作必须要么都完成之后其他才能访问,但是保证了原子性和保证重排序是两回事。
无锁编程
提到多线程编程,就绕不开“锁”,在Java中就是指synchronized关键字和Lock。但锁又是性能杀手,所以很多的前辈大师们研究如何可以不用锁,也能实现线程安全。无锁编程是一个庞大而深入的话题,既涉及底层的CPU架构(例如前面讲的内存屏障),又涉及不同语言的具体实现。一写多读的无锁队列:volatile关键字
多写多读的无锁队列:CAS
CAS机制详解
由于某一些特别常见的操作,老是来回的加锁,加锁的情况特别多,所以干脆java就提供了这些常见的操作这么一些个类,这些类的内部就自动带了锁,当然这些锁的实现并不是synchronized重量级锁,而是CAS的操作来实现的(号称无锁);无锁的操作效率会更高。Atomic类都是“自旋”性质的锁CAS函数,其实是封装的Unsafe类中的一个native函数,如下所示。AtomicInteger封装过的compareAndSet有两个参数。第一个参数e xpect是指变量的旧值(是读出来的值,写回去的时候,希望没有被其 他线程修改,所以称为expect);第二个参数update是指变量的新值 (修改过的,希望写入的值)。当expect等于变量当前的值时,说明 在修改的期间,没有其他线程对此变量进行过修改,所以可以成功写 入,变量被更新为update,返回true;否则返回false。 Unsafe类是整个Concurrent包的基础,里面所有函数都是native 的。具体到compareAndSwapInt函数,如下所示。该函数有4个参数。在前两个参数中,第一个是对象(也就是 Ato micInteger 对象),第二个是对象的成员变量(也就是AtomictInteg er里面包的int变量value),后两个参数保持不变。要特别说明一下第二个参数,它是一个long型的整数,经常被称 为xxxOffset,意思是某个成员变量在对应的类中的内存偏移量(该变 量在内存中的位置),表示该成员变量本身。在Unsafe中专门有一个 函数,把成员变量转化成偏移量,如下所示所有调用CAS的地方,都会先通过这个函数把成员变量转换成一个 Offset。以AtomicInteger为例:从上面代码可以看到,无论是Unsafe还是valueOffset,都是静态 的,也就是类级别的,所有对象共用的。 在转化的时候,先通过反射(getDeclaredField)获取value成员 变量对应的Field对象,再通过objectFieldOffset函数转化成valueOf fset。此处的valueOffset就代表了value变量本身,后面执行CAS操作 的时候,不是直接操作value,而是操作valueOffset。Unsafe类
几乎每个使用 Java开发的工具、软件基础设施、高性能开发库都在底层使用了sun.misc.Unsafe,比如Netty、Cassandra、Hadoop、Kafka等。JDK8的ConcurrentHashMap源码时,发现里面大量用到了Unsafe类的API。
Unsafe类在提升Java运行效率,增强Java语言底层操作能力方面起了很大的作用。但Unsafe类在sun.misc包下,不属于Java标准。
深入了解之后才知道,Unsafe指:该类对于普通的程序员来说是”危险“的,一般应用开发者不会也不应该用到此类。
因为Unsafe类功能过于强大,提供了一些可以绕开JVM的更底层功能。它让Java拥有了像C语言的指针一样操作内存空间的能力,能够提升效率,但也带来了指针的问题。官方并不建议使用,也没提供文档支持,甚至计划在高版本中去掉该类。
但对于开发者来说,了解该类提供的功能更有助于我们学习CAS、并发编程等相关的知识,还是非常有必要学习和了解的。
自旋与阻塞
当一个线程拿不到锁的时候,有以下两种基本的等待策略。 策略1:放弃CPU,进入阻塞状态,等待后续被唤醒,再重新被操 作系统调度。策略2:不放弃CPU,空转,不断重试,也就是所谓的“自旋”。很显然,如果是单核的CPU,只能用策略1。因为如果不放弃CPU, 那么其他线程无法运行,也就无法释放锁。但对于多CPU或者多核,策 略2就很有用了,因为没有线程切换的开销。 AtomicInteger的实现就用的是“自旋”策略,如果拿不到锁,就 会一直重试。有一点要说明:这两种策略并不是互斥的,可以结合使用。如果 拿不到锁,先自旋几圈;如果自旋还拿不到锁,再阻塞,synchronize d关键字就是这样的实现策略。ABA问题与解决办法
CAS都是基于“值”来做比较的。但如果另外一个线 程把变量的值从A改为B,再从B改回到A,那么尽管修改过两次,可是 在当前线程做CAS操作的时候,却会因为值没变而认为数据没有被其他 线程修改过,这就是所谓的ABA问题。 要解决 ABA 问题,不仅要比较“值”,还要比较“版本号”,而 这正是 AtomicStamped-Reference做的事情,其对应的CAS函数如下:之前的 CAS只有两个参数,这里的 CAS有四个参数,后两个参数 就是版本号的旧值和新值。 当expectedReference!=对象当前的reference时,说明该数据肯 定被其他线程修改过; 当expectedReference==对象当前的reference时,再进一步比较e xpectedStamp是否等于对象当前的版本号,以此判断数据是否被其他 线程修改过。
Concurrent
因为在Concurrent包中的锁都是“可重入锁”,所以一般都命名为ReentrantX,因为所有的锁。Concurrent 包中的与互斥锁(ReentrantLock)相关类之间的继承层次:I表示界面(Interface),A表示抽象类(AbstractClass),C表示类(Class),$表示内部类。实线表示继承关系,虚线表示引用关系。Sync的父类AbstractQueuedSynchronizer经常被称作队列同步器 (AQS),这个类非常关键,下面会反复提到,该类的父类是Abstract OwnableSynchronizer。本章讲的锁将具备synchronized功能,也就是可以阻塞一个线程。为了实现一把具有 阻塞或唤醒功能的锁,需要几个核心要素:① 需要一个state变量,标记该锁的状态。state变量至少有两个 值:0、1。对state变量的操作,要确保线程安全,也就是会用到CA S。② 需要记录当前是哪个线程持有锁。③ 需要底层支持对一个线程进行阻塞或唤醒操作。④ 需要有一个队列维护所有阻塞的线程。这个队列也必须是线程 安全的无锁队列,也需要用到CAS。针对要素①②,在上面两个类中有对应的体现state取值不仅可以是0、1,还可以大于1,就是为了支持锁的可 重入性。例如,同样一个线程,调用5次lock,state会变成5;然后调 用5次unlock,state减为0。当state=0时,没有线程持有锁,exclusiveOwnerThread=null;当state=1时,有一个线程持有锁,exclusiveOwnerThread=该线程;当state>;1时,说明该线程重入了该锁。针对要素③,在Unsafe类中,提供了阻塞或唤醒线程的一对操作原语,也就是park/unpark。在当前线程中调用park(),该线程就会被阻塞;在另外一个线 程中,调用unpark(Thread t),传入一个被阻塞的线程,就可以唤 醒阻塞在park()地方的线程。尤其是 unpark(Thread t),它实现了一个线程对另外一个线程 的“精准唤醒”。前面讲到的wait()/notify(),notify也只是唤 醒某一个线程,但无法指定具体唤醒哪个线程。针对要素④,在AQS中利用双向链表和CAS实现了一个阻塞队列。如下所示。 阻塞队列是整个AQS核心中的核心
读写锁
和互斥锁相比,读写锁(ReentrantReadWriteLock)就是读线程和读线程之间可以不用互斥类继承层次
ReadWriteLock是一个接口,内部由两个Lock接口组成ReentrantReadWriteLock实现了该接口,使用方式如下:
当使用 ReadWriteLock 的时候,并不是直接使用,而是获得其内部的读锁和写锁,然后分别调用lock/unlock。
读写锁实现的基本原理
从表面来看,ReadLock和WriteLock是两把锁,实际上它只是同一把锁的两个视图而已。什么叫两个视图呢?可以理解为是一把锁,线程分成两类:读线程和写线程。读线程和读线程之间不互斥(可以同时拿到这把锁),读线程和写线程互斥,写线程和写线程也互斥。从下面的构造函数也可以看出,readerLock和writerLock实际共用同一个sync对象。sync对象同互斥锁一样,分为非公平和公平两种策略,并继承自AQS。读锁是共享锁,写锁是独占锁。
ReentranLlock相对于synchronized的优势
1. ReentrantLock有一些功能还是要比synchronized强大的,强大的地方,可以使用tryLock进行尝试锁定,不管锁定与否,方法都将继续执行,synchronized如果搞不定的话肯定就阻塞了,但是用ReentrantLock就可以自己决定到底要不要wait。
/*** * 使用tryLock进行尝试锁定,不管锁定与否,方法都将继续执行 * * 可以根据tryLock的返回值来判断是否锁定 * * 也可以指定tryLock的时间 */ void m2() { /* boolean locked = lock.tryLock(); System.out.println("m2 ..." + locked); if(locked) lock.unlock(); */ boolean locked = false; try { locked = lock.tryLock(5, TimeUnit.SECONDS); System.out.println("m2 ..." + locked); } catch (InterruptedException e) { e.printStackTrace(); } finally { if (locked) lock.unlock(); } } public static void main(String[] args) { T03_ReentrantLock3 rl = new T03_ReentrantLock3(); new Thread(rl::m1).start(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } new Thread(rl::m2).start(); }2. 除了这个之外,ReentrantLock还可以用lock.lockInterruptibly() 这个类,对interrupt()方法做出相应,可以被打断的加锁,如果以这种方式加锁的话我们可以调用一个 t2.interrupt(); 打断线程2的等待。 线程1 上来之后加锁,加锁之后开始睡,睡的没完没了的,被线程1拿到这把锁的话,线程2如果说在想拿到这把锁不太可能,拿不到锁他就会在哪儿等着,如果使用原来的这种lock.lock()是打断不了它的,那么就可以用另外一种方lock.lockInterruptibly() 这个类可以被打断的,当你要想停止线程2就可以用 interrupt() ,这也是ReentrantLock比synchronized好用的一个地方。3. ReentrantLock还可以指定为公平锁,ReentrantLock默认是非公平锁。
Condition
Condition本身也是一个接口,其功能和wait/notify类似:wait()/notify()必须和synchronized一起使用,Condition也是如此,必须和Lock一起使用。因此,在Lock的接口中,有一个与Condition相关的接口:
public interface Lock { void lock(); void lockInterruptibly() throws InterruptedException; boolean tryLock(); boolean tryLock(long time, TimeUnit unit) throws InterruptedException; void unlock(); Condition newCondition();//所有的Condition都是从Lock中构造出来的 }Condition实现原理
Condition的使用很简洁,避免了 wait/notify 的生产者通知生产者、消费者通知消费者的问题,这是如何做到的呢?下面进入Condition内部一探究竟。因为Condition必须和Lock一起使用,所以Condition的实现也是Lock的一部分。下面先分别看一下互斥锁和读写锁中Condition的构造。首先,读写锁中的 ReadLock 是不支持Condition 的,写锁和互斥锁都支持Condition。虽然它们各自调用的是自己的内部类Sync,但内部类Sync都继承自AQS。因此,上面的代码sync.newCondition最终都调用了AQS中的newCondition。
每一个Condition对象上面,都阻塞了多个线程。因此,在ConditionObject内部也有一个双向链表组成的队列await()实现分析
关于await()的关键点:
(1)线程调用 await()的时候,肯定已经先拿到了锁。所以,在 addConditionWaiter()内部,对这个双向链表的操作不需要执行CAS操作,线程天生是安全的,代码如下:(2)在线程执行await操作之前,必须先释放锁。也就是fullyRelease(node),否则会发生死锁。这个和wait/notify与synchronized的配合机制一样。
(3)线程从await中被唤醒后,必须用acquireQueued(node,savedState)函数重新拿锁。
(4)checkInterruptWhileWaiting(node)代码在park(this)代码之后,是为了检测在park期间是否收到过中断信号。当线程从park中醒来时,有两种可能:一种是其他线程调用了unpark,另一种是收到中断信号。这里的await()函数是可以响应中断的,所以当发现自己是被中断唤醒的,而不是被unpark唤醒的时,会直接退出while循环,await()函数也会返回。
(5)isOnSyncQueue(node)用于判断该Node是否在AQS的同步队列里面。初始的时候,Node只在Condition的队列里,而不在AQS的队列里。但执行notity操作的时候,会放进AQS的同步队列。
awaitUninterruptibly()实现分析
与await()不同,awaitUninterruptibly()不会响应中断,其函数的定义中不会有中断异常抛出,整体代码和 await()类似,区别在于收到异常后,不会抛出异常,而是继续执行while循环。signal()实现分析
同 await()一样,在调用 signal()的时候,必须先拿到锁 (否则就会抛出上面的异常),是因为前面执行await()的时候,把锁释放了。 然后,从队列中取出firstWait,唤醒它。在通过调用unpark唤醒它之前,先用enq(node)函数把这个Node放入AQS的锁对应的阻塞队中。也正因为如此,才有了await()函数里面的判断条件while(!isOnSyncQueue(node)),这个判断条件被满足,说明await线程不是被中断,而是被unpark唤醒的。
知道了notify()实现原理,notifyAll()与此类似。StampedLock
从ReentrantLock到StampedLock,并发度依次提高。因为ReentrantLock采用的是“悲观读”的策略,当第一个读线程拿到锁之后,第二个、第三个读线程还可以拿到锁,使得写线程一直拿不到锁,可能导致写线程“饿死”。虽然在其公平或非公平的实现中,都尽量避免这种情形,但还有可能发生。StampedLock引入了“乐观读”策略,读的时候不加读锁,读出来发现数据被修改了,再升级为“悲观读”,相当于降低了“读”的地位,把抢锁的天平往“写”的一方倾斜了一下,避免写线程被饿死。
关于StampedLock的详解看《Java并发实现原理:JDK源码剖析》
AQS机制
reentrantlock、CountDownLatch、CyclicBarrier、Phaser、ReadWriteLock、Semaphore、Exchanger都是用同一个队列,同一个类来实现的,这个类叫AQS。AQS最核心的是它的一个共享的int类型值叫做state,这个state主要是看他的子类是怎么实现的,比如ReentrantLock这个state是用来拿这个state来记录这个线程到底重入了多少次,比如有一个线程拿到state这个把锁了,state的值就从0变成了1,这个线程又重入了一次,state就变成2了,又重入一次就变成3等等,什么时候释放了呢?从3变成2变成1变成0就释放了,这个就是AQS核心的东西,一个数,这个数代表了什么要看子类怎么去实现它,那么在这个 state核心上还会有一堆的线程节点,当然这个节点是node,每个node里面包含一个线程,称为线程节点,这么多的线程节点去争用这个state,谁拿到了state,就表示谁得到了这把锁,AQS得核心就是一个共享的数据,一堆互相抢夺竞争的线程,这个就是AQS。
//JDK源码 final boolean nonfairTrytAcquire(int acquire) { //获取当前线程 final Thread current = Thread.currentThread(); //拿到AQS核心数值 state int c getState(); //如果数值为0说明没人上锁 if (c == 0) { //给当线程上锁 if (compareAndSetState(0, acquires)) { //设置当前线程为独一无二拥有这把锁的线程 setExclusiveOwnerThread(current); return true } } //判断当前线程是否拥有这个把锁 else if (current == getExclusiveOwnerThread) { //设置重入 int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count wxceeded"); setState(nextc); return true; } return false; }跟进到tryAcquire(arg)是拿到了这把锁以后的操作,如果拿不到呢?如果拿不到它实际上是调用了acquireQueued()方法,acquireQueued()方法里又调用了addWaiter(Node.EXCLUSIVE)然后后面写一个arg(数值1),方法结构是这样的acquireQueued(addWaiter(Node.EXCLUSIVE),arg)通过acquireQueued这个方法名字猜一下这是干什么的,如果得到这把锁,后面的acquireQueued是不用运行的,如果没有得到这把锁,后面的acquireQueued()才需要运行,那么没有得到这把锁的时候它会运行acquireQueued,Queued队列,acquire获得,跑到队列里去获得,那意思是什么?排队去,那排队的时候需要传递两个参数,第一个参数是某个方法的返回值addWaiter(Node.EXCLUSIVE),这个方法的名字addWaiter,Waiter等待者,addWaiter添加一个等待者,用什么样的方式呢?Node.EXCLUSIVE排他形式,意思就是把当线程作为排他形式扔到队列里边。
//JDK源码 public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { public final void acquire(int arg) { //判断是否得到锁 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)){ selfInterrupt(); } } }个 addWaiter() 方法,这个方法意思是说你添加等待者的时候,使用的是什么类型,如果这个线程是Node.EXCLUSIVE 那么就是排他锁, Node.SHARED 就是共享锁,首先是获得当前要加进等待者队列的线程的节点,然后是一个死循环,这意思就是说我不干成这件事我誓不罢休,那它干了一件什么事呢
//JDK源码 public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { public final void acquire(int arg) { //判断是否得到锁 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } private Node addWaiter(Node mode) { //获取当前要加进来的线程的node(节点) Node node = new Node(mode); for (; ; ) { //回想一下AQS数据结构图 Node oldTail = tail; if (oldTail != null) { //把我们这个新节点的前置节点设置在等待队列的末端 node.setPrevRelaved(oldTail);//CAS操作,把我们这个新节点设置为tail末端 if (compareAndAetTail(oldTail, node)) { oldTail.next = node; return node; } } else { initializeSuncQueue(); } } } }AQS数据结构图,就是他有一个int类型的数叫state,然后在state下面排了一个队列,这个队列是个双向的链表有一个head和一个tail,现在你要往这个队列中加一个节点上来, 要排队嘛,我们仔细想一下加节点的话,应该得加到这个队列的末端是不是?它是怎么做到的呢?首先把tail记录在oldTail里,oldTail指向这个tail了,如果oldTail不等于空,它会把我们这个新节点的前置节点设置在这个队列的末端,接下来再次用到CAS操作,把我们这个新的节点设置为tail,整段代码看似繁琐,其实很简单,就是要把当前要加进等待者队列的线程的节点加到等待队列的末端,这里提一点,加到末端为什么要用CAS操作呢?因为CAS效率高,这个问题关系到AQS的核心操作,理解了这一点,你就理解了AQS为什么效率高,我们接着讲源码,这个增加线程节点操作,如果没有成功,那么就会不断的试,一直试到我们的这个node节点被加到线程队列末端为止,意思就是说,其它的节点也加到线程
队列末端了,我无非就是等着你其它的线程都加到末端了,我加最后一个,不管怎么样我都要加到线程末端去为止。源码读总结得出,AQS(AbstractQueuedSynchronizer)的核心就是用CAS(compareAndSet)去操作head和tail,就是说用CAS操作代替了锁整条双向链表的操作同步队列和等待队列
在之前的AQS文章中,我们提到了同步队列,本节我们又提到了等待队列,那他们两者是如何协同工作的?
AbstractQueuedSynchronizer内部维护着一个同步队列(双向链表实现),多个条件队列(单向链表实现),条件队列由AbstractQueuedSynchronizer的内部类ConditionObject来维护,new一个ConditonObject ,则多一个条件队列,当一个线程执行await方法是,会把当线程包装成一个Node节点,放到执行await方法的ConditionObject的条件队列中,释放锁并被阻塞,当执行signal方式时,会把条件队列的第一个节点移除,并转移到同步队列中,获取到锁即可继续执行
ConditionObject 是AbstractQueuedSynchronizer的一个内部类,用来实现条件队列,属性如下:
public class ConditionObject implements Condition, java.io.Serializable { // 条件队列的头节点 private transient Node firstWaiter; // 条件队列的尾节点 private transient Node lastWaiter; public ConditionObject() { } // 阻塞过程中不响应中断,仅设置标志位,让之后的方法处理 private static final int REINTERRUPT = 1; // 阻塞过程中响应中断,并throw InterruptedException private static final int THROW_IE = -1; }假如在阻塞过程中发生了中断,REINTERRUPT标志了中断发生在 signalled之后,
THROW_IE标志了中断发生在 signalled之前,从而决定采用那种方式响应中断并发工具类:AQS有哪些作用?(一)_Java识堂的博客-CSDN博客
并发工具类:AQS有哪些作用?(二)_Java识堂的博客-CSDN博客
关于AQS源码解读看这个------《多线程与高并发-0》-马士兵教育
同步工具类
Semaphore
Semaphore也就是信号量,提供了资源数量的并发访问控制,其使用代码很简单,如下所示。假设有 n 个线程来获取Semaphore里面的资源(n>;10),n 个线程中只有10个线程能获取到,其他线程都会阻塞。直到有线程释放了资源,其他线程才能获取到。当初始的资源个数为1的时候,Semaphore退化为排他锁。正因为如此,Semaphone的实现原理和锁十分类似,是基于AQS,有公平和非公平之分。Semaphore相关类的继承体系如图4-2所示。CountDownLatch
使用场景:一个主线程要等待10个 Worker 线程工作完毕才退出,就能使用CountDownLatch来实现。
CountDownLatch原理和Semaphore原理类似,同样是基于AQS,不过没有公平和非公平之分。
使用场景:10个工程师一起来公司应聘,招聘方式分为笔试和面试。首先,要等人到齐后,开始笔试;笔试结束之后,再一起参加面试。把10个人看作10个线程,10个线程之间的同步过程如图4-5所示。在整个过程中,有2个同步点:第1个同步点,要等所有应聘者都到达公司,再一起开始笔试;第2个同步点,要等所有应聘者都结束笔试,之后一起进入面试环节。具体到每个线程的run()方法中,就是下面的伪代码:CyclicBarrier基于ReentrantLock+Condition实现
说明:
(1)CyclicBarrier是可以被重用的。以上一节的应聘场景为例,来了10个线程,这10个线程互相等待,到齐后一起被唤醒,各自执行接下来的逻辑;然后,这10个线程继续互相等待,到齐后再一起被唤醒。每一轮被称为一个Generation,就是一次同步点。(2)CyclicBarrier 会响应中断。10 个线程没有到齐,如果有线程收到了中断信号,所有阻塞的线程也会被唤醒,就是上面的breakBarrier()函数。然后count被重置为初始值(parties),重新开始。(3)上面的回调函数,barrierAction只会被第10个线程执行1次(在唤醒其他9个线程之前),而不是10个线程每个都执行1次Phaser
用Phaser替代CyclicBarrier和CountDownLatch;从JDK7开始,新增了一个同步工具类Phaser,其功能比CyclicBarrier和CountDownLatch更加强大。关于 Phaser 的详解看《Java并发实现原理:JDK源码剖析》Semaphore
别的工具类都是一次只能一个线程获得锁,而Semaphore一次可以多个线程获得锁。
信号量;可以往里面传一个数,permits是允许的数量。Semaphore的含义就是限流,比如说你在买票,Semaphore写5就是只能有5个人可以同时买票。 acquire的意思叫获得这把锁,线程如果想继续往下执行,必须得从Semaphore里面获得一个许可,他一共有5个许可用到0了你就得给我等着。默认Semaphore是非公平的,new Semaphore(2, true)第二个值传true才是设置公平。
- Semaphore(信号量)-允许多个线程同时访问: synchronized 和 ReentrantLock 都是一次只允许一个线程访问某个资源,Semaphore(信号量)可以指定多个线程同时访问某个资源。
- CountDownLatch (倒计时器): CountDownLatch是一个同步工具类,用来协调多个线程之间的同步。CountDownLatch 允许 count 个线程阻塞在一个地方,直至所有count个线程的任务都执行完毕,才会接着执行后面的逻辑。
- 使用场景:一个主线程要等待10个 Worker 线程工作完毕才退出,就能使用
- CyclicBarrier(循环栅栏): CyclicBarrier 和 CountDownLatch 非常类似,它也可以实现线程间的技术等待,但是它的功能比 CountDownLatch 更加复杂和强大。 CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是:让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。
- CyclicBarrier强大的地方:
(1) CyclicBarrier是可以被重用的(CountDownLatch不可以被重用) 。以上一节的应聘场景为例,来了10个线程,这10个线程互相等待,到齐后一起被唤醒,各自执行接下来的逻辑;然后,这10个线程继续互相等待,到齐后再一起被唤醒。每一轮被称为一个Generation,就是一次同步点。(2)CyclicBarrier 会响应中断。10 个线程没有到齐,如果有线程收到了中断信号,所有阻塞的线程也会被唤醒,就是上面的breakBarrier()函数。然后count被重置为初始值(parties),重新开始。重要
以后一般不用这些新的锁,多数都用synchronized。只有特别特别追求效率的时候才会用到这些新的锁。
LockSupport、淘宝面试题与源码阅读方法论
在以前要阻塞和唤醒某一个具体的线程有很多限制比如:1、因为wait()方法需要释放锁,所以必须在synchronized中使用,否则会抛出异常IllegalMonitorStateException2、notify()方法也必须在synchronized中使用,并且应该指定对象3、synchronized()、wait()、notify()对象必须一致,一个synchronized()代码块中只能有一个线程调用wait()或notify()。以上诸多限制,体现出了很多的不足,所以LockSupport的好处就体现出来了。在JDK1.6中的java.util.concurrent的子包locks中引了LockSupport这个API,LockSupport是一个比较底层的工具类,用来创建锁和其他同步工具类的基本线程阻塞原语。java锁和同步器框架的核心 AQS: AbstractQueuedSynchronizer,就是通过调用 LockSupport .park()和 LockSupport .unpark()的方法,来实现线程的阻塞和唤醒的。先来看一个小程序:
public class T13_TestLockSupport { public static void main(String[] args) { //使用lombda表达式创建一个线程t Thread t = new Thread(() -> { for (int i = 0; i < 10; i++) { System.out.println(i); if(i == 5) { //使用LockSupport的park()方法阻塞当前线程t LockSupport.park(); } try { //使当前线程t休眠1秒 TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } }); //启动当前线程t t.start(); } }从以上的小程序中,我们不难看出 LockSupport 使用起来的是比较灵灵活的,没有了所谓的限制。来分析一下代码的执行过程,首先使用lombda 表达式创建了线程对象 " t " ,然后通过 " t " 对象调用线程的启动方法start() ,然后再看线程的内容,在 for 循环中,当 i 的值等于 5 的时候,调用了LockSupport 的 .park() 方法使当前线程阻塞,注意看方法并没有加锁,就默认使当前线程阻塞了,由此可以看出LockSupprt.park() 方法并没有加锁的限制。再来看一个小程序:
public class T13_TestLockSupport { public static void main(String[] args) { //使用lombda表达式创建一个线程t Thread t = new Thread(() -> { for (int i = 0; i < 10; i++) { System.out.println(i); if (i == 5) { //使用LockSupport的park()方法阻塞当前线程t LockSupport.park(); } try { //使当前线程t休眠1秒 TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } }); //启动当前线程t t.start(); // 唤醒线程t LockSupport.unpark(t); } }我们来分析一下以上小程序,我们只需要在第一个小程序的主线程中,调用 LockSupport 的 unpark() 方法,就可以唤醒某个具体的线程,这里我们指定了线程 " t " ,代码运行以后结果显而易见,线程并没有被阻塞,我们成功唤醒了线程 " t " ,在这里还有一点,需要我们来分析一下,在主线程中线程 " t " 调用了 start() 方法以后,因为紧接着执行了 LockSupport 的 unpark() 方法,所以也就是说,在线程 " t " 还没有执行还没有被阻塞的时候,已经调用了 LockSupport 的 unpark() 方法来唤醒线程 " t " ,之后线程 "t " 才启动调用了 LockSupport 的 park() 来使线程 " t " 阻塞,但是线程 " t " 并没有被阻塞,由此可以看 出, LockSupport 的 unpark() 方法可以先于 LockSupport 的 park() 方法执行。再来看最后一个小程序:
public class T13_TestLockSupport { public static void main(String[] args) { //使用lombda表达式创建一个线程t Thread t = new Thread(() -> { for (int i = 0; i < 10; i++) { System.out.println(i); if (i == 5) { //调用LockSupport的park()方法阻塞当前线程t LockSupport.park(); } if (i == 8) { //调用LockSupport的park()方法阻塞当前线程t LockSupport.park(); } try { //使当前线程t休眠1秒 TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } }); //启动当前线程t t.start(); //唤醒线程t LockSupport.unpark(t); } }我们来分析一下以上小程序,在第二个小程序的基础上又添加了一个if判断,在i等于8的时候再次调用LockSupport的park()方法来使线程 " t " 阻塞,我们可以看到线程被阻塞了,原因是LockSupport的unpark()方法就像是获得了一个“令牌”,而LockSupport的park()方法就像是在识别“令牌”,当主线程调用了LockSupport.unpark(t)方法也就说明线程 " t " 已经获得了”令牌”,当线程 " t " 再调用LockSupport的park()方法时,线程 " t " 已经有令牌了,这样他就会马上再继续运行,也就不会被阻塞了,但是当i等于8的时候线程 " t " 再次调用了LockSupport的park()方法使线程再次进入阻塞状态,这个时候“令牌”已经被使用作废掉了,也就无法阻塞线程 " t " 了,而且如果主线程处于等待“令牌”状态时,线程 " t " 再次调用了LockSupport的park()方法,那么线程 " t "就会永远阻塞下去,即使调用unpark()方法也无法唤醒了。由以上三个小程序可以总结得出以下几点:
1、LockSupport不需要synchornized加锁就可以实现线程的阻塞和唤醒2、LockSupport.unpartk()可以先于LockSupport.park()执行,并且线程不会阻塞3、如果一个线程处于等待状态,连续调用了两次park()方法,就会使该线程永远无法被唤醒
并发容器
BlockingDeque的介绍在另一篇博客
CopyOnWrite
CopyOnWriteArrayList 相对于 ArrayList 线程安全,底层通过复制数组的方式来实现,其核心概念就是: 数据读取时直接读取,不需要锁,数据写入时,需要锁,且对副本进行操作。那么当数据的操作以读取为主时,我们便可以省去大量的读锁带来的消耗。同时为了能让多线程操作List时,一个线程的修改能被另一个线程立马发现,CopyOnWriteList采用了Volatile关键词来修饰整体数组同时在底层数组元素的add()方法中使用ReentrantLock的方式保证修改底层数组元素时是线程安全的,即每次数据读取不从缓存里面读取,而是直接从数据的内存地址中读取。1、 CopyOnWriteArrayList
CopyOnArrayList的几个“读”函数:既然这些“读”函数都没有加锁,那么是如何保证 "线程安全" 呢?答案在 "写" 函数里面
public boolean add(E e) { final ReentrantLock lock = this.lock; //加锁(读取频繁使用乐观锁,写入频繁使用悲观锁,注意这是add()欧(写入频繁)) //reentrantLock默认非公平锁,同时是悲观锁的设计思想 lock.lock(); try { //获取原有的数据数组 Object[] elements = getArray(); //数组的长度 int len = elements.length; //拷贝---长度为len + 1 Object[] newElements = Arrays.copyOf(elements, len + 1); //将新添加的数据设置到最后 newElements[len] = e; //将array指向新的newElements --内存可见(array变量被volatile修饰) setArray(newElements); return true; } finally { //释放锁 lock.unlock(); } }CopyOnWriteArrayList的线程安全保障
- 采用
ReentrantLock
保证了同一时刻只有一个写的线程在复制。- 数组变量array是核心的,因为用volatile修饰了写时复制完成后将旧的数组引用指向新的数组;根据
volatile
的happens-before
规则,线程对数组引用的修改对线程是可见的;所以,其他线程立马可以看到最新的数组。- 由于在写数据的时候,是在新的数组中插入数据的,从而保证读写实在两个不同的数据容器中进行操作。
读取频繁使用乐观锁,写入频繁使用悲观锁,由于CopyOnWriteArrayList是一个容器,写入的频率重要性高于读取;所以使用悲观锁。reentrantLock默认非公平锁,同时是悲观锁的设计思想。
注意:
由于当我们往一个容器添加元素的时候,不直接往当前容器添加,而是先将当前容器进行Copy,复制出一个新的容器,然后新的容器里添加元素,添加完元素之后,再将原容器的引用指向新的容器。所以在往CopyOnWriteArrayList添加数据的时候不要一个个添加,最好的方式是通过带参数的构造方法或者调用addAll的方式添加数据,减少底层的每次拷贝而占用内存和频繁的导致GC操作。
优点:
- 读写是分离的
- 读线程之间是互不阻塞的
- 牺牲数据的实时性而保证了最终数据的一致性。即读线程对数据的更新是延迟感知的,因此这种情况下读线程不存在等待的情况。
缺点:
- 内存占用的问题:因为CopyOnWrite是写的时候复制机制,所以在写操作的时候。内存中会存在两个对象的内存,旧的对象和新写入的对象。注意:在复制的时候只是复制容器里的引用,只是在写的时候会创建新对象添加到新容器里,而旧容器的对象还在使用,所以有两份对象内存)如果对象内存占用比较大有可能造成频繁的GC。
- CopyOnWrite容器保证最终的数据一致性而不能保证数据的实时一致性。
其实写时复制是一种很经典的解决线程冲突的方式,在Redis中也有使用:
Redis写时拷贝(COW)总结 - 掘金----很牛的一篇文章
ForkJoinPool
ForkJoinPool是JDK7提供的一种“分治算法”的多线程并行计算框架。Fork意为分叉,Join意为合并,一分一合,相互配合,形成分治算法。此外,也可以将ForkJoinPool看作一个单机版的Map/Reduce,只不过这里的并行不是多台机器并行计算,而是多个线程并行计算。相比于ThreadPoolExecutor,ForkJoinPool可以更好地实现计算的负载均衡,提高资源利用率。假设有5个任务,在ThreadPoolExecutor中有5个线程并行执行,其中一个任务的计算量很大,其余4个任务的计算量很小,这会导致1个线程很忙,其他4个线程则处于空闲状态。而利用ForkJoinPool,可以把大的任务拆分成很多小任务,然后这些小任务被所有的线程执行,从而实现任务计算的负载均衡。ForkJoinPool数据结构,不同于ThreadPoolExector,除一个全局的任务队列之外,每个线程还有一个自己的局部队列。工作窃取队列
关于上面的全局队列,有一个关键点需要说明:它并非使用BlockingQueue,而是基于一个普通的数组得以实现。 这个队列又名工作窃取队列,为 ForkJoinPool 的工作窃取算法提供服务。所谓工作窃取算法,是指一个Worker线程在执行完毕自己队列中的任务之后,可以窃取其他线程队列中的任务来执行,从而实现负载均衡,以防有的线程很空闲,有的线程很忙。这个过程要用到工作窃取队列。这个队列只有三个操作:(1)Worker线程自己,在队列头部,通过对queueTop指针执行加、减操作,实现入队或出队,这是单线程的。(2)其他Worker线程,在队列尾部,通过对queueBase进行累加,实现出队操作,也就是窃取,这是多线程的,需要通过CAS操作。正因为如此,在上面的数据结构定义中,queueTop 不是 volatile 的,queueBase 是 volatile类型。这个队列,在Dynamic Ci rcular Work-Stealing Deque这篇论文中被称为dynamic-cyclic-array。之所以这样命名,是因为有两个关键点:(1)整个队列是环形的,也就是一个数组实现的RingBuffer。并且queueBase会一直累加,不会减小;queueTop会累加、减小。最后,queueBase、queueTop的值都会大于整个数组的长度,只是计算数组下标的时候,会取queueTop&(queue.length-1),queueBase&(queue.length-1)。因为queue.length是2的整数次方,这里也就是对queue.length进行取模操作。当queueTop-queueBase=queue.length-1 的时候,队列为满,此时需要扩容;当queueTop=queueBase的时候,队列为空,Worker线程即将进入阻塞状态。(2)当队列满了之后会扩容,所以被称为是动态的。但这就涉及一个棘手的问题:多个线程同时在读写这个队列,如何实现在不加锁的情况下一边读写、一边扩容呢?通过分析工作窃取队列的特性,我们会发现:在 queueBase 一端,是多线程访问的,但它们只会使queueBase变大,也就是使队列中的元素变少。所以队列为满,一定发生在queueTop一端,对queueTop进行累加的时候,这一端却是单线程的!队列的扩容恰好利用了这个单线程的特性!即在扩容过程中,不可能有其他线程对queueTop进行修改,只有线程对queueBase进行修改!图7-4所示为工作窃取队列扩容示意图。扩容之后,数组长度变成之前的二倍,但queueTop、queueBase的值是不变的!通过queueTop、 queueBase对新的数组长度取模,仍然可以定位到元素在新数组中的位置。ForkJoinWorkThread状态与个数分析
在 ThreadPoolExecutor 中 , 有 corePoolSize 和 maxmiumPoolSize两个参数联合控制总的线程数,而在ForkJoinPool中只传入了一个parallelism参数,且这个参数并不是实际的线程数。那么,ForkJoinPool在实际的运行过程中,线程数究竟是由哪些因素决定的呢?要回答这个问题,先得明白ForkJoinPool中的线程都可能有哪几种状态?可能的状态有三种:(1)空闲状态(放在Treiber Stack里面)。(2)活跃状态(正在执行某个ForkJoinTask,未阻塞)。(3)阻塞状态(正在执行某个ForkJoinTask,但阻塞了,于是调用join,等待另外一个任务的结果返回)。ctl变量很好地反映出了三种状态:高32位:u=(int)(ctl>>>32),然后u又拆分成tc、ac 两个16位;低32位:e=(int)ctl。(1)e>0,说明Treiber Stack不为空,有空闲线程;e=0,说明没有空闲线程;(2)ac>0,说明有活跃线程;ac<=0,说明没有空闲线程,并且还未超出parallelism;(3)tc>0,说明总线程数 >parallelism。 tc与 ac的差值,也就是总线程数与活跃线程数的差异,在ForkJoinPool中有另外一个变量blockedCount记录,如下:所以,通过crl和blockedCount这两个变量,可以知道在整个ForkJoinPool中,所有空闲线程、活跃线程以及阻塞线程的数量。当一个新任务到来时,发现既没有空闲线程,也没有活跃线程,所有线程都阻塞着,在等待任务返回,此时便会开新线程来执行任务。Worker线程的阻塞—唤醒机制
ForkerJoinPool 没有使用 BlockingQueue,也就不曾利用其阻塞 —唤醒机制,而是利用了park/unpark原语,并自行实现了Treiber St ack。下面进行详细分析ForkerJoinPool,在阻塞和唤醒的时候,分别 是如何入栈的。ForkJoinTask的fork/join
如果局部队列、全局中的任务全部是相互独立的,就很简单了。 但问题是,对于分治算法来说,分解出来的一个个任务并不是独立的,而是相互依赖,一个任务的完成要依赖另一个前置任务的完成。这种依赖关系是通过ForkJoinTask中的join()来体现的。关于这部分知识参考------Java并发实现原理:JDK源码剖析ForkJoinPool的优雅关闭
同 ThreadPoolExecutor 一样,ForkJoinPool 的关闭也不可能是 “瞬时的”,而是需要一个平滑的过渡过程。线程池已经进入了关闭状态。但线程池进入关闭状态,不代表所有的线程都会立马关闭。为此,在ForkJoinWorkerThread里还有一个terminate变量,初始为false。当线程池要关闭的时候,会把相关线程的terminate变量置为true。这样,这些线程就会退出上面的while循环,也就会自动退出。