• 多线程难点


    目录

    synchronized锁升级的过程

    yield()、wait()、sleep()、join()四个方法的介绍 

    sleep()和wait()方法的异同:

    为什么wait()的时候必须释放锁?

    Thread.sleep(0)的作用是什么?

    异常锁

    volatile实现原理

    无锁编程  

    CAS机制详解 

    自旋与阻塞

    ABA问题与解决办法

    Concurrent

            锁的公平性与非公平性

            公平与非公平的lock()的实现差异

    读写锁  

    ReentranLlock相对于synchronized的优势

    Condition 

    await()实现分析

     signal()实现分析

    StampedLock

    AQS机制

    同步队列和等待队列

    同步工具类 

    Semaphore

    ​CountDownLatch

    CyclicBarrier  

    Phaser  

    Semaphore

    LockSupport

    LockSupport中park()和unpark()方法的实现原理

    并发容器  

    CopyOnWrite

    1、 CopyOnWriteArrayList

    ForkJoinPool

    ​编辑工作窃取队列

    ForkJoinWorkThread状态与个数分析

    Worker线程的阻塞—唤醒机制

    ForkJoinTask的fork/join

    ForkJoinPool的优雅关闭

    shutdown()与shutdownNow()的区别


    synchronized锁升级的过程

    synchronized既保证了原子性,又保证了可见性。

    深入并发-Synchronized 

    注意上图两条主线:匿名偏向锁启动和没启动两种方式。

    synchronized锁优化

    锁粒度最好小一些;即 同步代码块中的语句越少越好
    重量级锁:需要经过操作系统帮忙管理的锁
    轻量级锁:JVM自己就能进行同步的锁

    synchronized锁消除

    如果JVM通过逃逸分析,发现一个对象只能从一个线程被访问到,则访问这个对象时,可以不加同步锁。如果程序中使用了synchronized锁,则JVM会将synchronized锁消除。
    注意:这种情况针对的是synchronized锁,而对于Lock锁,则JVM并不能消除。

    synchronized锁粗化

    假如方法中首尾相接,前后相邻的都是同一个锁对象,那 JIT 编译器就会把这几个synchronized块合并成一个大块,加粗加大范围,一次申请锁依用即可,避免次次的申请和释放锁,提升了性能。因为申请锁和释放锁是有操作系统用户态和内核态的切换开销的, 使用一次锁,包括申请,持有到释放,当前进程要进行四次用户态与内核态的切换。
    1. synchronized (o) { 
    2.     System.out.println(1111);
    3. synchronized (o) { 
    4.     System.out.println(222);
    5. }

    读写锁降级

    降级是指当前把持住写锁,再获取到读锁,随后释放(先前拥有的)写锁的过程。
    锁降级过程中的读锁的获取是否有必要,答案是必要的。主要是为了保证数据的可见性,如果当前线程不获取读锁而直接释放写锁,假设此刻另一个线程获取的写锁,并修改了数据,那么当前线程就步伐感知到线程T的数据更新,如果当前线程遵循锁降级的步骤,那么线程T将会被阻塞,直到当前线程使数据并释放读锁之后,线程T才能获取写锁进行数据更新。

    对象头信息

    无论锁现在是何状态,只要调用wait()方法都会升级为重量级锁。

    yield()、wait()、sleep()、join()四个方法的介绍 

    sleep():

    是Thread类的静态方法,当前线程将睡眠n毫秒,线程进入阻塞状态。当睡眠时间到了,会解除阻塞,进入可运行状态,等待CPU的到来。睡眠不释放锁(如果有的话)。

    wait():

    是Object的方法,必须与synchronized关键字一起使用,线程进入阻塞状态,当notify或者notifyall被调用后,会解除阻塞。但是,只有重新占用互斥锁之后才会进入可运行状态。睡眠时,会释放互斥锁。

    注意:因为wait()方法使用在synchronized修饰的范围内,所以:无论锁现在是何状态,只要调用wait()方法都会升级为重量级锁。

    yield():

    yield方法可以暂停当前正在执行的线程对象,让其它有相同优先级的线程执行。它是一个静态方法而且只保证当前线程放弃CPU执行权而不能保证使其它线程一定能占用CPU,执行yield()的线程有可能在进入到就绪状态后马上又被执行。

    join():

    意思就是在自己当前线程加入调用Join的线程(),本线程等待。等调用的线程运行完了,自己再去执行。t1t2两个线程,在t1的某个点上调用了t2.join,它会跑到t2去运行,t1等待t2运行完毕继续t1运行(自己join自己没有意义)通过join()方法的源码可知,它的底层还是wait()方法,一般join() 适用于需要控制线程执行顺序的场景中。 
    新建 T1、T2、T3 三个线程,如何保证它们按顺序执行?
    join 方法。

    sleep()和wait()方法的异同:

    • sleep 方法没有释放锁,而 wait 方法释放了锁 。
    • sleep 通常被用于暂停执行;Wait 通常被用于线程间交互/通信
    • sleep() 方法执行完成后,线程会自动苏醒。或者可以使用 wait(long timeout)超时后线程会自动苏醒。wait() 方法被调用后,线程不会自动苏醒,需要别的线程调用同一个对象上的 notify() 或者 notifyAll() 方法
    • sleep 方法和 wait 方法都可以用来放弃 CPU 一定的时间,不同点在于如果线程持有某个对象的监视器,sleep 方法不会放弃这个对象的监视器,wait方法会放弃这个对象的监视器

    为什么wait()的时候必须释放锁?

    当线程A进入synchronized(obj1)中之后,也就是对obj1上了锁。此时,调用wait()进入阻塞状态,一直不能退出synchronized代码块;那么,线程B永远无法进入synchronized(obj1)

    同步块里,永远没有机会调用notify(),岂不是死锁了!这就涉及一个关键的问题:在wait()的内部,会先释放锁obj1,然后进入阻塞状态,之后,它被另外一个线程用notify()唤醒, 去重新拿锁!其次,wait()调用完成后,执行后面的业务逻辑代码,然后退出synchronized同步块,再次释放锁。

    Thread.sleep(0)的作用是什么?

    由于Java采用抢占式的线程调度算法,因此可能会出现某条线程常常获取到CPU控制权的情况,为了让某些优先级比较低的线程也能获取到 CPU 控制权,可以使用Thread.sleep(0)手动触发一次操作系统分配时间片的操作,这也是平衡CPU控制权的一种操作。

    异常锁

    加了锁synchronized void m()while(true)不断执行,线程启动,count++ 如果等于5的时候认为的产生异常。这时候如果产生任何异常,就会出现什么情况呢?
    就会被原来的那些个准备拿到这把锁的程序乱冲进来,程序乱入。这是异常的概念。
    1. public class T {
    2. int count = 0;
    3. synchronized void m() {
    4. System.out.println(Thread.currentThread().getName() + " start");
    5. while (true) {
    6. count++;
    7. System.out.println(Thread.currentThread().getName() + " count = " + count);
    8. try {
    9. TimeUnit.SECONDS.sleep(1);
    10. } catch (InterruptedException e) {
    11. e.printStackTrace();
    12. }
    13. if (count == 5) {
    14. int i = 1 / 0;//此处抛出异常,锁将被释放,要想不被释放,可以在这里进行catch,然后让循环继续
    15. System.out.println(i);
    16. }
    17. }
    18. }
    19. public static void main(String[] args) {
    20. T t = new T();
    21. Runnable r = new Runnable() {
    22. @Override public void run() {
    23. t.m();
    24. }
    25. };
    26. new Thread(r, "t1").start();
    27. try {
    28. TimeUnit.SECONDS.sleep(3);
    29. } catch (InterruptedException e) {
    30. e.printStackTrace();
    31. }
    32. new Thread(r, "t2").start();
    33. }
    34. }

    volatile实现原理

    volatile 并不能保证多个线程共同修改 running 变量时所带来的不一致问题,也就是说 volatile 不能替代synchronized
    1 :保证线程的可见性
    2 :禁止指令重新排序
    JVM 里面规定了八种原则(happen - before),除了这些之外其他的指令都可以有重排序,保证原子性只是保证这些操作必须要么都完成之后其他才能访问,但是保证了原子性和保证重排序是两回事。

    无锁编程  

    提到多线程编程,就绕不开“锁”,在Java中就是指synchronized关键字和Lock。但锁又是性能杀手,所以很多的前辈大师们研究如何可以不用锁,也能实现线程安全。无锁编程是一个庞大而深入的话题,既涉及底层的CPU架构(例如前面讲的内存屏障),又涉及不同语言的具体实现。

    一写多读的无锁队列:volatile关键字

    多写多读的无锁队列:CAS

    CAS机制详解 

    由于某一些特别常见的操作,老是来回的加锁,加锁的情况特别多,所以干脆java就提供了这些常见的操作这么一些个类,这些类的内部就自动带了锁,当然这些锁的实现并不是synchronized重量级锁,而是CAS的操作来实现的(号称无锁);无锁的操作效率会更高。
    Atomic类都是“自旋”性质的锁
    CAS函数,其实是封装的Unsafe类中的一个native函数,如下所示。
    AtomicInteger封装过的compareAndSet有两个参数。第一个参数e xpect是指变量的旧值(是读出来的值,写回去的时候,希望没有被其 他线程修改,所以称为expect);第二个参数update是指变量的新值 (修改过的,希望写入的值)。当expect等于变量当前的值时,说明 在修改的期间,没有其他线程对此变量进行过修改,所以可以成功写 入,变量被更新为update,返回true;否则返回false。 Unsafe类是整个Concurrent包的基础,里面所有函数都是native 的。具体到compareAndSwapInt函数,如下所示。
    该函数有4个参数。在前两个参数中,第一个是对象(也就是 Ato micInteger 对象),第二个是对象的成员变量(也就是AtomictInteg er里面包的int变量value),后两个参数保持不变。
    要特别说明一下第二个参数,它是一个long型的整数,经常被称 为xxxOffset,意思是某个成员变量在对应的类中的内存偏移量(该变 量在内存中的位置),表示该成员变量本身。在Unsafe中专门有一个 函数,把成员变量转化成偏移量,如下所示
    所有调用CAS的地方,都会先通过这个函数把成员变量转换成一个 Offset。以AtomicInteger为例:

    从上面代码可以看到,无论是Unsafe还是valueOffset,都是静态 的,也就是类级别的,所有对象共用的。 在转化的时候,先通过反射(getDeclaredField)获取value成员 变量对应的Field对象,再通过objectFieldOffset函数转化成valueOf fset。此处的valueOffset就代表了value变量本身,后面执行CAS操作 的时候,不是直接操作value,而是操作valueOffset。

    Unsafe类

    几乎每个使用 Java开发的工具、软件基础设施、高性能开发库都在底层使用了sun.misc.Unsafe,比如Netty、Cassandra、Hadoop、Kafka等。JDK8的ConcurrentHashMap源码时,发现里面大量用到了Unsafe类的API。

    Unsafe类在提升Java运行效率,增强Java语言底层操作能力方面起了很大的作用。但Unsafe类在sun.misc包下,不属于Java标准。

    深入了解之后才知道,Unsafe指:该类对于普通的程序员来说是”危险“的,一般应用开发者不会也不应该用到此类。

    因为Unsafe类功能过于强大,提供了一些可以绕开JVM的更底层功能。它让Java拥有了像C语言的指针一样操作内存空间的能力,能够提升效率,但也带来了指针的问题。官方并不建议使用,也没提供文档支持,甚至计划在高版本中去掉该类。

    但对于开发者来说,了解该类提供的功能更有助于我们学习CAS、并发编程等相关的知识,还是非常有必要学习和了解的。

    自旋与阻塞

    当一个线程拿不到锁的时候,有以下两种基本的等待策略。 策略1:放弃CPU,进入阻塞状态,等待后续被唤醒,再重新被操 作系统调度。
    策略2:不放弃CPU,空转,不断重试,也就是所谓的“自旋”。
    很显然,如果是单核的CPU,只能用策略1。因为如果不放弃CPU, 那么其他线程无法运行,也就无法释放锁。但对于多CPU或者多核,策 略2就很有用了,因为没有线程切换的开销。 AtomicInteger的实现就用的是“自旋”策略,如果拿不到锁,就 会一直重试。
    有一点要说明:这两种策略并不是互斥的,可以结合使用。如果 拿不到锁,先自旋几圈;如果自旋还拿不到锁,再阻塞,synchronize d关键字就是这样的实现策略。

    ABA问题与解决办法

    CAS都是基于“值”来做比较的。但如果另外一个线 程把变量的值从A改为B,再从B改回到A,那么尽管修改过两次,可是 在当前线程做CAS操作的时候,却会因为值没变而认为数据没有被其他 线程修改过,这就是所谓的ABA问题。 要解决 ABA 问题,不仅要比较“值”,还要比较“版本号”,而 这正是 AtomicStamped-Reference做的事情,其对应的CAS函数如下:
    之前的 CAS只有两个参数,这里的 CAS有四个参数,后两个参数 就是版本号的旧值和新值。 当expectedReference!=对象当前的reference时,说明该数据肯 定被其他线程修改过; 当expectedReference==对象当前的reference时,再进一步比较e xpectedStamp是否等于对象当前的版本号,以此判断数据是否被其他 线程修改过。

    Concurrent

    因为在Concurrent包中的锁都是“可重入锁”,所以一般都命名为ReentrantX,因为所有的锁。
    Concurrent 包中的与互斥锁(ReentrantLock)相关类之间的继承层次:
    I表示界面(Interface),A表示抽象类(AbstractClass),C表示类(Class),$表示内部类。实线表示继承关系,虚线表示引用关系。
    Lock是一个接口,其定义如下:
    常用的方法是lock()/unlock()。lock()不能被中断,对应 的lockInterruptibly()可以被中断。 ReentrantLock本身没有代码逻辑,实现都在其内部类Sync中。

    锁的公平性与非公平性

    Sync是一个抽象类,它有两个子类FairSync与NonfairSync,分别 对应公平锁和非公平锁。从下面的ReentrantLock构造函数可以看出, 会传入一个布尔类型的变量fair指定锁是公平的还是非公平的,默认 为非公平的。

    锁实现的基本原理  

    Sync的父类AbstractQueuedSynchronizer经常被称作队列同步器 (AQS),这个类非常关键,下面会反复提到,该类的父类是Abstract OwnableSynchronizer。
    本章讲的锁将具备synchronized功能,也就是可以阻塞一个线程。为了实现一把具有 阻塞或唤醒功能的锁,需要几个核心要素:
    ① 需要一个state变量,标记该锁的状态。state变量至少有两个 值:0、1。对state变量的操作,要确保线程安全,也就是会用到CA S。
    ② 需要记录当前是哪个线程持有锁。
    ③ 需要底层支持对一个线程进行阻塞或唤醒操作。
    ④ 需要有一个队列维护所有阻塞的线程。这个队列也必须是线程 安全的无锁队列,也需要用到CAS。
    针对要素①②,在上面两个类中有对应的体现
    state取值不仅可以是0、1,还可以大于1,就是为了支持锁的可 重入性。例如,同样一个线程,调用5次lock,state会变成5;然后调 用5次unlock,state减为0。
            当state=0时,没有线程持有锁,exclusiveOwnerThread=null;
            当state=1时,有一个线程持有锁,exclusiveOwnerThread=该线程;
            当state>;1时,说明该线程重入了该锁。
    针对要素③,在Unsafe类中,提供了阻塞或唤醒线程的一对操作原语,也就是park/unpark。
    在当前线程中调用park(),该线程就会被阻塞;在另外一个线 程中,调用unpark(Thread t),传入一个被阻塞的线程,就可以唤 醒阻塞在park()地方的线程。
    尤其是 unpark(Thread t),它实现了一个线程对另外一个线程 的“精准唤醒”。前面讲到的wait()/notify(),notify也只是唤 醒某一个线程,但无法指定具体唤醒哪个线程。
    针对要素④,在AQS中利用双向链表和CAS实现了一个阻塞队列。
    如下所示。 阻塞队列是整个AQS核心中的核心

    公平与非公平的lock()的实现差异

    分析基于AQS,ReentrantLock在公平性和非公平性上的实现差异

    unlock()实现分析

    unlock不区分公平还是非公平.

    读写锁  

    和互斥锁相比,读写锁(ReentrantReadWriteLock)就是读线程和读线程之间可以不用互斥

    类继承层次

    ReadWriteLock是一个接口,内部由两个Lock接口组成

     ReentrantReadWriteLock实现了该接口,使用方式如下:

    当使用 ReadWriteLock 的时候,并不是直接使用,而是获得其内部的读锁和写锁,然后分别调用lock/unlock。

    读写锁实现的基本原理

    从表面来看,ReadLock和WriteLock是两把锁,实际上它只是同一把锁的两个视图而已。什么叫两个视图呢?可以理解为是一把锁,线程分成两类:读线程和写线程。读线程和读线程之间不互斥(可以同时拿到这把锁),读线程和写线程互斥,写线程和写线程也互斥。从下面的构造函数也可以看出,readerLock和writerLock实际共用同一个sync对象。sync对象同互斥锁一样,分为非公平和公平两种策略,并继承自AQS。

    读锁是共享锁,写锁是独占锁。 

    ReentranLlock相对于synchronized的优势

    1. ReentrantLock有一些功能还是要比synchronized强大的,强大的地方,可以使用tryLock进行尝试锁定,不管锁定与否,方法都将继续执行,synchronized如果搞不定的话肯定就阻塞了,但是用ReentrantLock就可以自己决定到底要不要wait。
    1. /***
    2. * 使用tryLock进行尝试锁定,不管锁定与否,方法都将继续执行
    3. * * 可以根据tryLock的返回值来判断是否锁定
    4. * * 也可以指定tryLock的时间 */
    5. void m2() {
    6. /*
    7. boolean locked = lock.tryLock();
    8. System.out.println("m2 ..." + locked);
    9. if(locked) lock.unlock();
    10. */
    11. boolean locked = false;
    12. try {
    13. locked = lock.tryLock(5, TimeUnit.SECONDS);
    14. System.out.println("m2 ..." + locked);
    15. } catch (InterruptedException e) {
    16. e.printStackTrace();
    17. } finally {
    18. if (locked) lock.unlock();
    19. }
    20. }
    21. public static void main(String[] args) {
    22. T03_ReentrantLock3 rl = new T03_ReentrantLock3();
    23. new Thread(rl::m1).start();
    24. try {
    25. TimeUnit.SECONDS.sleep(1);
    26. } catch (InterruptedException e) {
    27. e.printStackTrace();
    28. }
    29. new Thread(rl::m2).start();
    30. }
    2. 除了这个之外,ReentrantLock还可以用lock.lockInterruptibly() 这个类,对interrupt()方法做出相应,可以被打断的加锁,如果以这种方式加锁的话我们可以调用一个 t2.interrupt(); 打断线程2的等待。 线程1 上来之后加锁,加锁之后开始睡,睡的没完没了的,被线程1拿到这把锁的话,线程2如果说在想拿到这把锁不太可能,拿不到锁他就会在哪儿等着,如果使用原来的这种lock.lock()是打断不了它的,那么就可以用另外一种方lock.lockInterruptibly() 这个类可以被打断的,当你要想停止线程2就可以用 interrupt() ,这也是ReentrantLocksynchronized好用的一个地方。
    3. ReentrantLock还可以指定为公平锁,ReentrantLock默认是非公平锁。

    Condition 

    Condition本身也是一个接口,其功能和wait/notify类似:
    wait()/notify()必须和synchronized一起使用,Condition也是如此,必须和Lock一起使用。因此,在Lock的接口中,有一个与Condition相关的接口:
    1. public interface Lock {
    2. void lock();
    3. void lockInterruptibly() throws InterruptedException;
    4. boolean tryLock();
    5. boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
    6. void unlock();
    7. Condition newCondition();//所有的Condition都是从Lock中构造出来的
    8. }

    Condition实现原理

    Condition的使用很简洁,避免了 wait/notify 的生产者通知生产者、消费者通知消费者的问题,这是如何做到的呢?下面进入Condition内部一探究竟。因为Condition必须和Lock一起使用,所以Condition的实现也是Lock的一部分。下面先分别看一下互斥锁和读写锁中Condition的构造。
    首先,读写锁中的 ReadLock 是不支持Condition 的,写锁和互斥锁都支持Condition。虽然它们各自调用的是自己的内部类Sync,但内部类Sync都继承自AQS。因此,上面的代码sync.newCondition最终都调用了AQS中的newCondition。

     

    每一个Condition对象上面,都阻塞了多个线程。因此,在ConditionObject内部也有一个双向链表组成的队列

    await()实现分析

    关于await()的关键点:

    (1)线程调用 await()的时候,肯定已经先拿到了锁。所以,在 addConditionWaiter()内部,对这个双向链表的操作不需要执行CAS操作,线程天生是安全的,代码如下:

    (2)在线程执行await操作之前,必须先释放锁。也就是fullyRelease(node),否则会发生死锁。这个和wait/notify与synchronized的配合机制一样。

    (3)线程从await中被唤醒后,必须用acquireQueued(node,savedState)函数重新拿锁。

    (4)checkInterruptWhileWaiting(node)代码在park(this)代码之后,是为了检测在park期间是否收到过中断信号。当线程从park中醒来时,有两种可能:一种是其他线程调用了unpark,另一种是收到中断信号。这里的await()函数是可以响应中断的,所以当发现自己是被中断唤醒的,而不是被unpark唤醒的时,会直接退出while循环,await()函数也会返回。

    (5)isOnSyncQueue(node)用于判断该Node是否在AQS的同步队列里面。初始的时候,Node只在Condition的队列里,而不在AQS的队列里。但执行notity操作的时候,会放进AQS的同步队列。

    awaitUninterruptibly()实现分析

    与await()不同,awaitUninterruptibly()不会响应中断,其函数的定义中不会有中断异常抛出,整体代码和 await()类似,区别在于收到异常后,不会抛出异常,而是继续执行while循环。

     signal()实现分析

    同 await()一样,在调用 signal()的时候,必须先拿到锁 (否则就会抛出上面的异常),是因为前面执行await()的时候,把锁释放了。 然后,从队列中取出firstWait,唤醒它。在通过调用unpark唤醒它之前,先用enq(node)函数把这个Node放入AQS的锁对应的阻塞队中。也正因为如此,才有了await()函数里面的判断条件while(!isOnSyncQueue(node)),这个判断条件被满足,说明await线程不是被中断,而是被unpark唤醒的。

    知道了notify()实现原理,notifyAll()与此类似。

    StampedLock

    从ReentrantLock到StampedLock,并发度依次提高。因为ReentrantLock采用的是“悲观读”的策略,当第一个读线程拿到锁之后,第二个、第三个读线程还可以拿到锁,使得写线程一直拿不到锁,可能导致写线程“饿死”。虽然在其公平或非公平的实现中,都尽量避免这种情形,但还有可能发生。StampedLock引入了“乐观读”策略,读的时候不加读锁,读出来发现数据被修改了,再升级为“悲观读”,相当于降低了“读”的地位,把抢锁的天平往“写”的一方倾斜了一下,避免写线程被饿死。

    关于StampedLock的详解看《Java并发实现原理:JDK源码剖析》

    AQS机制

    reentrantlock、CountDownLatch、CyclicBarrier、Phaser、ReadWriteLock、Semaphore、Exchanger都是用同一个队列,同一个类来实现的,这个类叫AQS。
    AQS最核心的是它的一个共享的int类型值叫做state,这个state主要是看他的子类是怎么实现的,比如ReentrantLock这个state是用来拿这个state来记录这个线程到底重入了多少次,比如有一个线程拿到state这个把锁了,state的值就从0变成了1,这个线程又重入了一次,state就变成2了,又重入一次就变成3等等,什么时候释放了呢?从3变成2变成1变成0就释放了,这个就是AQS核心的东西,一个数,这个数代表了什么要看子类怎么去实现它,那么在这个 state核心上还会有一堆的线程节点,当然这个节点是node,每个node里面包含一个线程,称为线程节点,这么多的线程节点去争用这个state,谁拿到了state,就表示谁得到了这把锁,AQS得核心就是一个共享的数据,一堆互相抢夺竞争的线程,这个就是AQS
    1. //JDK源码
    2. final boolean nonfairTrytAcquire(int acquire) {
    3. //获取当前线程
    4. final Thread current = Thread.currentThread();
    5. //拿到AQS核心数值
    6. state int c getState();
    7. //如果数值为0说明没人上锁
    8. if (c == 0) {
    9. //给当线程上锁
    10. if (compareAndSetState(0, acquires)) {
    11. //设置当前线程为独一无二拥有这把锁的线程
    12. setExclusiveOwnerThread(current);
    13. return true
    14. }
    15. }
    16. //判断当前线程是否拥有这个把锁
    17. else if (current == getExclusiveOwnerThread) {
    18. //设置重入
    19. int nextc = c + acquires;
    20. if (nextc < 0) throw new Error("Maximum lock count wxceeded");
    21. setState(nextc);
    22. return true;
    23. }
    24. return false;
    25. }
    跟进到tryAcquire(arg)是拿到了这把锁以后的操作,如果拿不到呢?如果拿不到它实际上是调用了acquireQueued()方法,acquireQueued()方法里又调用了addWaiter(Node.EXCLUSIVE)然后后面写一个arg(数值1),方法结构是这样的acquireQueued(addWaiter(Node.EXCLUSIVE),arg)通过acquireQueued这个方法名字猜一下这是干什么的,如果得到这把锁,后面的acquireQueued是不用运行的,如果没有得到这把锁,后面的acquireQueued()才需要运行,那么没有得到这把锁的时候它会运行acquireQueuedQueued队列,acquire获得,跑到队列里去获得,那意思是什么?排队去,那排队的时候需要传递两个参数,第一个参数是某个方法的返回值addWaiter(Node.EXCLUSIVE),这个方法的名字addWaiterWaiter等待者,addWaiter添加一个等待者,用什么样的方式呢?Node.EXCLUSIVE排他形式,意思就是把当线程作为排他形式扔到队列里边。
    1. //JDK源码
    2. public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
    3. public final void acquire(int arg) {
    4. //判断是否得到锁
    5. if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)){
    6. selfInterrupt();
    7. }
    8. }
    9. }
    addWaiter() 方法,这个方法意思是说你添加等待者的时候,使用的是什么类型,如果这个线程是Node.EXCLUSIVE 那么就是排他锁, Node.SHARED 就是共享锁,首先是获得当前要加进等待者队列的线程的节点,然后是一个死循环,这意思就是说我不干成这件事我誓不罢休,那它干了一件什么事呢
    1. //JDK源码
    2. public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
    3. public final void acquire(int arg) { //判断是否得到锁
    4. if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt();
    5. }
    6. private Node addWaiter(Node mode) {
    7. //获取当前要加进来的线程的node(节点)
    8. Node node = new Node(mode);
    9. for (; ; ) {
    10. //回想一下AQS数据结构图
    11. Node oldTail = tail;
    12. if (oldTail != null) {
    13. //把我们这个新节点的前置节点设置在等待队列的末端
    14. node.setPrevRelaved(oldTail);//CAS操作,把我们这个新节点设置为tail末端
    15. if (compareAndAetTail(oldTail, node)) {
    16. oldTail.next = node;
    17. return node;
    18. }
    19. } else {
    20. initializeSuncQueue();
    21. }
    22. }
    23. }
    24. }

     AQS数据结构图,就是他有一个int类型的数叫state,然后在state下面排了一个队列,这个队列是个双向的链表有一个head和一个tail,现在你要往这个队列中加一个节点上来, 要排队嘛,我们仔细想一下加节点的话,应该得加到这个队列的末端是不是?它是怎么做到的呢?首先把tail记录在oldTail里,oldTail指向这个tail了,如果oldTail不等于空,它会把我们这个新节点的前置节点设置在这个队列的末端,接下来再次用到CAS操作,把我们这个新的节点设置为tail,整段代码看似繁琐,其实很简单,就是要把当前要加进等待者队列的线程的节点加到等待队列的末端,这里提一点,加到末端为什么要用CAS操作呢?因为CAS效率高,这个问题关系到AQS的核心操作,理解了这一点,你就理解了AQS为什么效率高,我们接着讲源码,这个增加线程节点操作,如果没有成功,那么就会不断的试,一直试到我们的这个node节点被加到线程队列末端为止,意思就是说,其它的节点也加到线程

    队列末端了,我无非就是等着你其它的线程都加到末端了,我加最后一个,不管怎么样我都要加到线程末端去为止。
    源码读总结得出,AQS(AbstractQueuedSynchronizer)的核心就是用
    CAS(compareAndSet)去操作head和tail,就是说用CAS操作代替了锁整条双向链表的操作

    同步队列和等待队列

    在之前的AQS文章中,我们提到了同步队列,本节我们又提到了等待队列,那他们两者是如何协同工作的?

    AbstractQueuedSynchronizer内部维护着一个同步队列(双向链表实现),多个条件队列(单向链表实现),条件队列由AbstractQueuedSynchronizer的内部类ConditionObject来维护,new一个ConditonObject ,则多一个条件队列,当一个线程执行await方法是,会把当线程包装成一个Node节点,放到执行await方法的ConditionObject的条件队列中,释放锁并被阻塞,当执行signal方式时,会把条件队列的第一个节点移除,并转移到同步队列中,获取到锁即可继续执行

    ConditionObject 是AbstractQueuedSynchronizer的一个内部类,用来实现条件队列,属性如下:

    1. public class ConditionObject implements Condition, java.io.Serializable {
    2.     // 条件队列的头节点
    3.     private transient Node firstWaiter;
    4.     // 条件队列的尾节点
    5.     private transient Node lastWaiter;
    6.     public ConditionObject() { }
    7.     // 阻塞过程中不响应中断,仅设置标志位,让之后的方法处理
    8.     private static final int REINTERRUPT =  1;
    9.     // 阻塞过程中响应中断,并throw InterruptedException
    10.     private static final int THROW_IE    = -1;
    11. }

    假如在阻塞过程中发生了中断,REINTERRUPT标志了中断发生在 signalled之后,
    THROW_IE标志了中断发生在 signalled之前,从而决定采用那种方式响应中断

    并发工具类:AQS有哪些作用?(一)_Java识堂的博客-CSDN博客

    并发工具类:AQS有哪些作用?(二)_Java识堂的博客-CSDN博客
    关于AQS源码解读看这个------《多线程与高并发-0》-马士兵教育

    同步工具类 

    Semaphore

    Semaphore也就是信号量,提供了资源数量的并发访问控制,其使用代码很简单,如下所示。

    假设有 n 个线程来获取Semaphore里面的资源(n>;10),n 个线程中只有10个线程能获取到,其他线程都会阻塞。直到有线程释放了资源,其他线程才能获取到。

    当初始的资源个数为1的时候,Semaphore退化为排他锁。正因为如此,Semaphone的实现原理和锁十分类似,是基于AQS,有公平和非公平之分。Semaphore相关类的继承体系如图4-2所示。

     CountDownLatch

     使用场景:一个主线程要等待10个 Worker 线程工作完毕才退出,就能使用CountDownLatch来实现。

    CountDownLatch原理和Semaphore原理类似,同样是基于AQS,不过没有公平和非公平
    之分。

    使用场景:10个工程师一起来公司应聘,招聘方式分为笔试和面试。首先,要等人到齐后,开始笔试;笔试结束之后,再一起参加面试。把10个人看作10个线程,10个线程之间的同步过程如图4-5所示。

    在整个过程中,有2个同步点:第1个同步点,要等所有应聘者都到达公司,再一起开始笔试;第2个同步点,要等所有应聘者都结束笔试,之后一起进入面试环节。具体到每个线程的run()方法中,就是下面的伪代码:

    CyclicBarrier基于ReentrantLock+Condition实现 

     

    说明:

    (1)CyclicBarrier是可以被重用的。以上一节的应聘场景为例,来了10个线程,这10个线程互相等待,到齐后一起被唤醒,各自执行接下来的逻辑;然后,这10个线程继续互相等待,到齐后再一起被唤醒。每一轮被称为一个Generation,就是一次同步点。
    (2)CyclicBarrier 会响应中断。10 个线程没有到齐,如果有线程收到了中断信号,所有阻塞的线程也会被唤醒,就是上面的breakBarrier()函数。然后count被重置为初始值(parties),重新开始。
    (3)上面的回调函数,barrierAction只会被第10个线程执行1次(在唤醒其他9个线程之前),而不是10个线程每个都执行1次

    Phaser  

    用Phaser替代CyclicBarrier和CountDownLatch;从JDK7开始,新增了一个同步工具类Phaser,其功能比CyclicBarrier和CountDownLatch更加强大。
    关于 Phaser   的详解看《Java并发实现原理:JDK源码剖析》

    Semaphore

    别的工具类都是一次只能一个线程获得锁,而Semaphore一次可以多个线程获得锁。

    信号量;可以往里面传一个数,permits是允许的数量。
    Semaphore的含义就是限流,比如说你在买票,Semaphore5就是只能有5个人可以同时买票。 acquire的意思叫获得这把锁,线程如果想继续往下执行,必须得从Semaphore里面获得一个许可,他一共有5个许可用到0了你就得给我等着。
    默认Semaphore是非公平的,new Semaphore(2, true)第二个值传true才是设置公平。
    • Semaphore(信号量)-允许多个线程同时访问: synchronized 和 ReentrantLock 都是一次只允许一个线程访问某个资源,Semaphore(信号量)可以指定多个线程同时访问某个资源。
    • CountDownLatch (倒计时器): CountDownLatch是一个同步工具类,用来协调多个线程之间的同步。CountDownLatch 允许 count 个线程阻塞在一个地方,直至所有count个线程的任务都执行完毕,才会接着执行后面的逻辑。
    • 使用场景:一个主线程要等待10个 Worker 线程工作完毕才退出,就能使用
    • CyclicBarrier(循环栅栏): CyclicBarrier 和 CountDownLatch 非常类似,它也可以实现线程间的技术等待,但是它的功能比 CountDownLatch 更加复杂和强大。 CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是:让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。
    • CyclicBarrier强大的地方:
    • (1) CyclicBarrier是可以被重用的(CountDownLatch不可以被重用) 。以上一节的应聘场景为例,来了10个线程,这10个线程互相等待,到齐后一起被唤醒,各自执行接下来的逻辑;然后,这10个线程继续互相等待,到齐后再一起被唤醒。每一轮被称为一个Generation,就是一次同步点。
      (2)CyclicBarrier 会响应中断。10 个线程没有到齐,如果有线程收到了中断信号,所有阻塞的线程也会被唤醒,就是上面的breakBarrier()函数。然后count被重置为初始值(parties),重新开始。

    重要

    以后一般不用这些新的锁,多数都用synchronized。只有特别特别追求效率的时候才会用到这些新的锁。

    LockSupport、淘宝面试题与源码阅读方法论

    在以前要阻塞和唤醒某一个具体的线程有很多限制比如:
    1、因为wait()方法需要释放锁,所以必须在synchronized中使用,否则会抛出异常
    IllegalMonitorStateException
    2notify()方法也必须在synchronized中使用,并且应该指定对象
    3synchronized()wait()notify()对象必须一致,一个synchronized()代码块中只能有一个线程调用wait()notify()。
    以上诸多限制,体现出了很多的不足,所以LockSupport的好处就体现出来了。
    JDK1.6中的java.util.concurrent的子包locks中引了LockSupport这个APILockSupport是一个比较底层的工具类,用来创建锁和其他同步工具类的基本线程阻塞原语。java锁和同步器框架的核心 AQS: AbstractQueuedSynchronizer,就是通过调用 LockSupport .park() LockSupport .unpark()的方法,来实现线程的阻塞和唤醒的。先来看一个小程序:
    1. public class T13_TestLockSupport {
    2. public static void main(String[] args) {
    3. //使用lombda表达式创建一个线程t
    4. Thread t = new Thread(() -> {
    5. for (int i = 0; i < 10; i++) {
    6. System.out.println(i);
    7. if(i == 5) {
    8. //使用LockSupport的park()方法阻塞当前线程t
    9. LockSupport.park();
    10. }
    11. try {
    12. //使当前线程t休眠1秒
    13. TimeUnit.SECONDS.sleep(1);
    14. } catch (InterruptedException e) {
    15. e.printStackTrace();
    16. }
    17. }
    18. });
    19. //启动当前线程t
    20. t.start();
    21. }
    22. }
    从以上的小程序中,我们不难看出 LockSupport 使用起来的是比较灵灵活的,没有了所谓的限制。来分析一下代码的执行过程,首先使用lombda 表达式创建了线程对象 " t " ,然后通过 " t " 对象调用线程的启动方法start() ,然后再看线程的内容,在 for 循环中,当 i 的值等于 5 的时候,调用了LockSupport .park() 方法使当前线程阻塞,注意看方法并没有加锁,就默认使当前线程阻塞了,由此可以看出LockSupprt.park() 方法并没有加锁的限制。
    再来看一个小程序:

    1. public class T13_TestLockSupport {
    2. public static void main(String[] args) {
    3. //使用lombda表达式创建一个线程t
    4. Thread t = new Thread(() -> {
    5. for (int i = 0; i < 10; i++) {
    6. System.out.println(i);
    7. if (i == 5) {
    8. //使用LockSupport的park()方法阻塞当前线程t
    9. LockSupport.park();
    10. }
    11. try {
    12. //使当前线程t休眠1秒
    13. TimeUnit.SECONDS.sleep(1);
    14. } catch (InterruptedException e) {
    15. e.printStackTrace();
    16. }
    17. }
    18. });
    19. //启动当前线程t
    20. t.start();
    21. // 唤醒线程t
    22. LockSupport.unpark(t);
    23. }
    24. }
    我们来分析一下以上小程序,我们只需要在第一个小程序的主线程中,调用 LockSupport unpark()
    法,就可以唤醒某个具体的线程,这里我们指定了线程 " t " ,代码运行以后结果显而易见,线程并没有
    被阻塞,我们成功唤醒了线程 " t " ,在这里还有一点,需要我们来分析一下,在主线程中线程 " t "
    用了 start() 方法以后,因为紧接着执行了 LockSupport unpark() 方法,所以也就是说,在线程 " t "
    没有执行还没有被阻塞的时候,已经调用了 LockSupport unpark() 方法来唤醒线程 " t " ,之后线程 "t " 才启动调用了 LockSupport park() 来使线程 " t " 阻塞,但是线程 " t " 并没有被阻塞,由此可以看 出, LockSupport unpark() 方法可以先于 LockSupport park() 方法执行。
    再来看最后一个小程序:
    1. public class T13_TestLockSupport {
    2. public static void main(String[] args) {
    3. //使用lombda表达式创建一个线程t
    4. Thread t = new Thread(() -> {
    5. for (int i = 0; i < 10; i++) {
    6. System.out.println(i);
    7. if (i == 5) {
    8. //调用LockSupport的park()方法阻塞当前线程t
    9. LockSupport.park();
    10. }
    11. if (i == 8) {
    12. //调用LockSupport的park()方法阻塞当前线程t
    13. LockSupport.park();
    14. }
    15. try {
    16. //使当前线程t休眠1秒
    17. TimeUnit.SECONDS.sleep(1);
    18. } catch (InterruptedException e) {
    19. e.printStackTrace();
    20. }
    21. }
    22. });
    23. //启动当前线程t
    24. t.start();
    25. //唤醒线程t
    26. LockSupport.unpark(t);
    27. }
    28. }
    我们来分析一下以上小程序,在第二个小程序的基础上又添加了一个if判断,在i等于8的时候再次调用LockSupport的park()方法来使线程 " t " 阻塞,我们可以看到线程被阻塞了,原因是LockSupport的unpark()方法就像是获得了一个令牌,而LockSupportpark()方法就像是在识别令牌,当主线程调用了LockSupport.unpark(t)方法也就说明线程 " t " 已经获得了令牌,当线程 " t " 再调用LockSupport的park()方法时,线程 " t " 已经有令牌了,这样他就会马上再继续运行,也就不会被阻塞了,但是当i等于8的时候线程 " t " 再次调用了LockSupportpark()方法使线程再次进入阻塞状态,这个时候“令牌已经被使用作废掉了,也就无法阻塞线程 " t " 了,而且如果主线程处于等待令牌状态时,线程 " t " 再次调用了LockSupportpark()方法,那么线程 " t "就会永远阻塞下去,即使调用unpark()方法也无法唤醒了。

    由以上三个小程序可以总结得出以下几点:

    1LockSupport不需要synchornized加锁就可以实现线程的阻塞和唤醒
    2LockSupport.unpartk()可以先于LockSupport.park()执行,并且线程不会阻塞
    3、如果一个线程处于等待状态,连续调用了两次park()方法,就会使该线程永远无法被唤醒

    LockSupportpark()unpark()方法的实现原理

    park()unpark()方法的实现是由Unsefa类提供的,而Unsefa类是由CC++语言完成的,其实原理也是比较好理解的,它主要通过一个变量作为一个标识,变量值在01之间来回切换,当这个变量大于0的时候线程就获得了“令牌,从这一点我们不难知道,其实park()unpark()方法就是在改变这个变量的值,来达到线程的阻塞和唤醒的,具体实现不做赘述了。

    并发容器  

    BlockingDeque的介绍在另一篇博客

    CopyOnWrite

    CopyOnWriteArrayList 相对于 ArrayList 线程安全,底层通过复制数组的方式来实现,其核心概念就是: 数据读取时直接读取,不需要锁,数据写入时,需要锁,且对副本进行操作。那么当数据的操作以读取为主时,我们便可以省去大量的读锁带来的消耗。同时为了能让多线程操作List时,一个线程的修改能被另一个线程立马发现,CopyOnWriteList采用了Volatile关键词来修饰整体数组同时在底层数组元素的add()方法中使用ReentrantLock的方式保证修改底层数组元素时是线程安全的,即每次数据读取不从缓存里面读取,而是直接从数据的内存地址中读取。

    1、 CopyOnWriteArrayList

    CopyOnArrayList的几个“读”函数:
    既然这些“读”函数都没有加锁,那么是如何保证 "线程安全" 呢?答案在 "写" 函数里面
    1. public boolean add(E e) {
    2. final ReentrantLock lock = this.lock;
    3. //加锁(读取频繁使用乐观锁,写入频繁使用悲观锁,注意这是add()欧(写入频繁))
    4. //reentrantLock默认非公平锁,同时是悲观锁的设计思想
    5. lock.lock();
    6. try {
    7. //获取原有的数据数组
    8. Object[] elements = getArray();
    9. //数组的长度
    10. int len = elements.length;
    11. //拷贝---长度为len + 1
    12. Object[] newElements = Arrays.copyOf(elements, len + 1);
    13. //将新添加的数据设置到最后
    14. newElements[len] = e;
    15. //将array指向新的newElements --内存可见(array变量被volatile修饰)
    16. setArray(newElements);
    17. return true;
    18. } finally {
    19. //释放锁
    20. lock.unlock();
    21. }
    22. }

    CopyOnWriteArrayList的线程安全保障

    • 采用ReentrantLock保证了同一时刻只有一个写的线程在复制。
    • 数组变量array是核心的,因为用volatile修饰了写时复制完成后将旧的数组引用指向新的数组;根据volatilehappens-before规则,线程对数组引用的修改对线程是可见的;所以,其他线程立马可以看到最新的数组。
    • 由于在写数据的时候,是在新的数组中插入数据的,从而保证读写实在两个不同的数据容器中进行操作。

    读取频繁使用乐观锁,写入频繁使用悲观锁,由于CopyOnWriteArrayList是一个容器,写入的频率重要性高于读取;所以使用悲观锁。reentrantLock默认非公平锁,同时是悲观锁的设计思想。

    注意

    由于当我们往一个容器添加元素的时候,不直接往当前容器添加,而是先将当前容器进行Copy,复制出一个新的容器,然后新的容器里添加元素,添加完元素之后,再将原容器的引用指向新的容器。所以在往CopyOnWriteArrayList添加数据的时候不要一个个添加,最好的方式是通过带参数的构造方法或者调用addAll的方式添加数据,减少底层的每次拷贝而占用内存和频繁的导致GC操作

    优点:

    • 读写是分离的
    • 读线程之间是互不阻塞的
    • 牺牲数据的实时性而保证了最终数据的一致性。即读线程对数据的更新是延迟感知的,因此这种情况下读线程不存在等待的情况。

    缺点:

    • 内存占用的问题:因为CopyOnWrite是写的时候复制机制,所以在写操作的时候。内存中会存在两个对象的内存,旧的对象和新写入的对象。注意:在复制的时候只是复制容器里的引用,只是在写的时候会创建新对象添加到新容器里,而旧容器的对象还在使用,所以有两份对象内存)如果对象内存占用比较大有可能造成频繁的GC。
    • CopyOnWrite容器保证最终的数据一致性而不能保证数据的实时一致性。

    其实写时复制是一种很经典的解决线程冲突的方式,在Redis中也有使用:

    Redis写时拷贝(COW)总结 - 掘金----很牛的一篇文章

    ForkJoinPool

    ForkJoinPool是JDK7提供的一种“分治算法”的多线程并行计算框架。Fork意为分叉,Join意为合并,一分一合,相互配合,形成分治算法。此外,也可以将ForkJoinPool看作一个单机版的Map/Reduce,只不过这里的并行不是多台机器并行计算,而是多个线程并行计算。
    相比于ThreadPoolExecutor,ForkJoinPool可以更好地实现计算的负载均衡,提高资源利用率。假设有5个任务,在ThreadPoolExecutor中有5个线程并行执行,其中一个任务的计算量很大,其余4个任务的计算量很小,这会导致1个线程很忙,其他4个线程则处于空闲状态。而利用ForkJoinPool,可以把大的任务拆分成很多小任务,然后这些小任务被所有的线程执行,从而实现任务计算的负载均衡。

    核心数据结构

    ForkJoinPool数据结构,不同于ThreadPoolExector,除一个全局的任务队列之外,每个线程还有一个自己的局部队列。

    工作窃取队列

    关于上面的全局队列,有一个关键点需要说明:它并非使用BlockingQueue,而是基于一个普通的数组得以实现。 这个队列又名工作窃取队列,为 ForkJoinPool 的工作窃取算法提供服务。所谓工作窃取算法,是指一个Worker线程在执行完毕自己队列中的任务之后,可以窃取其他线程队列中的任务来执行,从而实现负载均衡,以防有的线程很空闲,有的线程很忙。这个过程要用到工作窃取队列。
    这个队列只有三个操作:
    (1)Worker线程自己,在队列头部,通过对queueTop指针执行加、减操作,实现入队或出队,这是单线程的。
    (2)其他Worker线程,在队列尾部,通过对queueBase进行累加,实现出队操作,也就是窃取,这是多线程的,需要通过CAS操作。
    正因为如此,在上面的数据结构定义中,queueTop 不是 volatile 的,queueBase 是 volatile类型。
    这个队列,在Dynamic Ci rcular Work-Stealing Deque这篇论文中被称为dynamic-cyclic-array。之所以这样命名,是因为有两个关键点:
    (1)整个队列是环形的,也就是一个数组实现的RingBuffer。并且queueBase会一直累加,不会减小;queueTop会累加、减小。最后,queueBase、queueTop的值都会大于整个数组的长度,只是计算数组下标的时候,会取queueTop&(queue.length-1),queueBase&(queue.length-1)。因为queue.length是2的整数次方,这里也就是对queue.length进行取模操作。
    当queueTop-queueBase=queue.length-1 的时候,队列为满,此时需要扩容;当queueTop=queueBase的时候,队列为空,Worker线程即将进入阻塞状态。
    (2)当队列满了之后会扩容,所以被称为是动态的。但这就涉及一个棘手的问题:多个线程同时在读写这个队列,如何实现在不加锁的情况下一边读写、一边扩容呢?
    通过分析工作窃取队列的特性,我们会发现:在 queueBase 一端,是多线程访问的,但它们只会使queueBase变大,也就是使队列中的元素变少。所以队列为满,一定发生在queueTop一端,对queueTop进行累加的时候,这一端却是单线程的!队列的扩容恰好利用了这个
    单线程的特性!即在扩容过程中,不可能有其他线程对queueTop进行修改,只有线程对queueBase进行修改!
    图7-4所示为工作窃取队列扩容示意图。扩容之后,数组长度变成之前的二倍,但queueTop、queueBase的值是不变的!通过queueTop、 queueBase对新的数组长度取模,仍然可以定位到元素在新数组中的位置。

    ForkJoinWorkThread状态与个数分析

    在 ThreadPoolExecutor 中 , 有 corePoolSize 和 maxmiumPoolSize两个参数联合控制总的线程数,而在ForkJoinPool中只传入了一个parallelism参数,且这个参数并不是实际的线程数。那么,ForkJoinPool在实际的运行过程中,线程数究竟是由哪些因素决定的呢?
    要回答这个问题,先得明白ForkJoinPool中的线程都可能有哪几种状态?可能的状态有三种:
    (1)空闲状态(放在Treiber Stack里面)。
    (2)活跃状态(正在执行某个ForkJoinTask,未阻塞)。
    (3)阻塞状态(正在执行某个ForkJoinTask,但阻塞了,于是调用join,等待另外一个任务的结果返回)。
    ctl变量很好地反映出了三种状态:
    高32位:u=(int)(ctl>>>32),然后u又拆分成tc、ac 两个16位;
    低32位:e=(int)ctl。
    (1)e>0,说明Treiber Stack不为空,有空闲线程;e=0,说明没有空闲线程;
    (2)ac>0,说明有活跃线程;ac<=0,说明没有空闲线程,并且还未超出parallelism;
    (3)tc>0,说明总线程数 >parallelism。 tc与 ac的差值,也就是总线程数与活跃线程数的差异,在ForkJoinPool中有另外一个变量blockedCount记录,如下:
    所以,通过crl和blockedCount这两个变量,可以知道在整个ForkJoinPool中,所有空闲线程、活跃线程以及阻塞线程的数量。
    当一个新任务到来时,发现既没有空闲线程,也没有活跃线程,所有线程都阻塞着,在等待任务返回,此时便会开新线程来执行任务。

    Worker线程的阻塞—唤醒机制

    ForkerJoinPool 没有使用 BlockingQueue,也就不曾利用其阻塞 —唤醒机制,而是利用了park/unpark原语,并自行实现了Treiber St ack。下面进行详细分析ForkerJoinPool,在阻塞和唤醒的时候,分别 是如何入栈的。

    ForkJoinTask的fork/join

    如果局部队列、全局中的任务全部是相互独立的,就很简单了。 但问题是,对于分治算法来说,分解出来的一个个任务并不是独立的,而是相互依赖,一个任务的完成要依赖另一个前置任务的完成。这种依赖关系是通过ForkJoinTask中的join()来体现的。
    关于这部分知识参考------Java并发实现原理:JDK源码剖析

    ForkJoinPool的优雅关闭

    同 ThreadPoolExecutor 一样,ForkJoinPool 的关闭也不可能是 “瞬时的”,而是需要一个平滑的过渡过程。
    线程池已经进入了关闭状态。但线程池进入关闭状态,不代表所有的线程都会立马关闭。
    为此,在ForkJoinWorkerThread里还有一个terminate变量,初始为false。当线程池要关闭的时候,会把相关线程的terminate变量置为true。这样,这些线程就会退出上面的while循环,也就会自动退出。
    当线程池关闭的时候,什么样的线程可以立马关闭,什么样的线程不能立马关闭呢?这就涉及下面两个函数的区别。

    shutdown()与shutdownNow()的区别

    二者的代码基本相同,都是调用tryTerminate(boolean)函数,其中一个传入的是false,另一个传入的是true。tryTerminate意为试图关闭ForkJoinPool,并不保证一定可以关闭成功。
    最后总结一下:shutdown()只拒绝新提交的任务;shutdownNow()会取消现有的全局队列和局部队列中的任务,同时唤醒所有空闲的线程,让这些线程自动退出。

  • 相关阅读:
    小DEMO:在vue中自定义range组件
    【JS】JavaScript入门笔记第四弹之函数、作用域~
    运算符及正则
    天空卫士C++ 一面(技术面、61min)
    【Prometheus】什么是prometheus?prometheus简介
    “的修“报修工单管理系统有哪些功能和作用?
    如何运营好技术相关的自媒体?
    Leetcode—137.只出现一次的数字II【中等】
    4G通信电子标签
    Golang内存对齐
  • 原文地址:https://blog.csdn.net/weixin_49329785/article/details/126374568