• 并发知识点复习


    一、并发编程基础

    1.共享资源

    堆和方法区都是各个线程共享的内存区域,堆存放对象实例,方法区存放已经被虚拟机加载的类型信息、常量、静态变量、即时编译器编译后产生的代码缓存

    2.核心问题

    2.1 原子性问题:操作系统做任务切换,可以发生在任何一条CPU指令执行完;CPU能保证的原子操作是CPU指令级别的,而不是高级语言的操作符

    2.2 可见性问题:指的是一个线程对共享变量的修改,另外一个线程可以立刻看见;可见性问题是由CPU的缓存导致的,多核CPU均有各自的缓存,各自的缓存均要和内存进行同步

    2.3 有序性问题:为了提高性能,编译器和处理器为了优化性能,常常会对程序指令做重排序;这种优化不会导致单线程运行的结果,但在并发环境下会导致bug

    三个问题的解决方案

    • 可见性:

      volatile

    • 有序性:

      hanpens-before

    • 原子性:

      ​ 监视器

      ​ 加锁

      ​ 死锁:

      ​ 产生条件

      ​ 破坏条件

      ​ 实现

      ​ 信号量

      ​ semaphore

    3.JMM 内存模型

    3.1 并发编程要解决的两个问题:

    ​ 通信问题:线程之间通过那种机制来交换信息

    ​ 同步问题:程序中用于控制不同线程之间操作发生的相对顺序的机制

    3.2 并发编程的内存模型:

    ​ 共享内存模型和消息传递模型,java用的共享内存模型

    ​ 在共享内存模型下,线程之间共享程序的公共状态,通过读-写内存中的公共状态进行隐式通信;同步是显示进行的,程序员必须显示指定某段代码需要在线程之间互斥执行

    3.3 (java memory model) java线程之间的通信由JMM控制,即JMM决定一个线程对共享变量的写入何时对另一个线程可见。JMM定义了线程与主内存之间的抽象关系,通过控制主内存与每个本地内存之间的交互,JMM为程序员提供了内存可见性的保证,也就是说解决了内存可见性问题

    3.4源代码与指令间的重排序

    源代码 – 编译器优化重排序 – 指令级并发重排序 – 内存系统重排序 – 最终执行的指令序列

    编译器优化重排序:编译器在不改变单线程程序语义的前提下可以重新安排语句的执行顺序

    指令级并发重排序:现代处理器采用了指令级并发技术来将多条指令重叠执行,如果不存在数据依赖性,处理器可以改变语句对应机器指令的执行顺序

    内存系统重排序:由于处理器采用缓存和读/写缓冲区,这使得加载和存储操作看上去是乱序执行

    3.5重排序对可见性的影响

    由于现代处理器都会使用写缓冲区,因此他们都会对读-写操作执行重排序

    3.6如何解决重排序带来的问题

    JMM解决了内存的可见性问题,另外也解决了编译器的指令重排序问题(三个重排序问题的第一个,通过happens-before规则来解决的),后面两个是CPU级别的重排序:指令级并行重排序和内存系统重排序 这两个是会要求编译器在生成指令序列时,插入内存屏障指令来禁止处理器重排序从而解决重排序带来的问题

    happens-before:

    JMM使用happens-before规则来阐述操作之间的可见性,以及什么时候不能重排序。

    如果一个操作的执行结果需要对另外一个操作可见,那么这两个操作之间必然要存在happens-before关系;

    如果A happens-before B,那么意味着A的执行结果必须对B可见,也就保证了跨线程之间的内存可见性。

    happens-before的6条规则

    1.程序顺序规则:一个线程中的每个操作,happens-before于该线程中的任意后续操作;(操作顺序执行)

    2.volatile变量规则:对一个volatile域的写,happens-before于任意后续对这个volatile域的读;(先写,后读)

    3.synchronized规则:对一个锁的解锁,happens-before于随后对这个锁的加锁;(先解锁,后加锁)

    4.传递性:若A happens-before B,且B happens-before C,则A happens-before C;

    5.start()规则:若线程A 执行ThreadB.start(),则线程A的操作happens-before与线程B中的任意操作;

    6.join()规则:若线程A 执行ThreadB.join()并成功返回,那么线程B中的任意操作happens-before于线程A从ThreadB.join()的成功返回

    在这里插入图片描述

    CPU级别的重排序带来的问题通过插入内存屏障指令解决:

    CPU层面的内存屏障:

    1.loadload禁止读和读的重排序

    2.storestore禁止写和写的重排序

    3.loadstore 禁止读和写的重排序

    4.storeload 禁止写和读的重排序

    java层面具体实现是通过unsafe类(sun.misc.unsafe)中的三个方法来实现

        public native void loadFence(); //禁止读-读;读-写 的重排序
    
        public native void storeFence(); //禁止写-写;读-写 的重排序
    
        public native void fullFence(); //禁止读-读 写-写 读-写 写-读 (全部禁止)
    
    • 1
    • 2
    • 3
    • 4
    • 5

    4.volatile

    4.1基本特性

    • 可见性:对一个volatile变量的读,总是可以看见对这个变量最后的写入
    • 原子性:对任意单个volatile变量的读/写具有原子性,但是像volatile++这种复合操作不具有原子性

    4.2内存语义

    • 写内存语义:当写一个volatile变量时,JMM会把该线程本地内存中的共享变量的值刷新到主内存中

    • 读内存语义:当读一个volatile变量时,JMM会把该线程本地内存中的共享变量置为无效,使其从主内存中读取共享变量

    4.3实现机制

    编译器在生成字节码时,会在指令序列中插入内存屏障来禁止特定类型的处理器重排序(后两种)

    • 在每个volatile写操作的前面插入一个StoreStore屏障;
    • 在每个volatile写操作的后面插入一个StoreLoad屏障;
    • 在每个volatile读操作的后面插入一个LoadLoad屏障;
    • 在每个volatile读操作的后面插入一个LoadStore屏障

    4.4volatile与锁对比:

    volatile仅仅保证对单个volatile变量的读写具有原子性(eg:int age 如果是get那就是 i load,如果是set那就是 i store),而锁的互斥执行的特性可以保证对整个临界区代码的执行具有原子性;功能上锁比volatile强大,可伸缩性和执行性能上volatile更有优势

    5.锁

    内存语义:

    • 当线程释放锁时,JMM会把该线程对应的本地内存中的共享变量刷新到主内存中;
    • 当线程获取锁时,JMM会把该线程对应的本地内存置为无效

    实现机制:

    • synchronized:采用CAS+ Mark Word 实现,存在锁升级的情况
    • lock:采用CAS + volatile实现,存在锁降级的过程,核心是AQS

    lock的优点:

    1、支持响应中断 2、支持超时机制 3、支持以非阻塞的方式获取锁 4、支持多个条件变量(阻塞队列)

    JDK9开始不支持32位操作系统…

    二、原子类

    所有的原子类都是基于unsafe类中提供的三个CAS方法来实现的:

        public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);//(对象,成员变量/值在内存中的偏移量,期望的值,要更新的值)
    
        public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
    
        public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);
    
    • 1
    • 2
    • 3
    • 4
    • 5

    1、原子更新基本类型

    包含三个类:AtomicInteger、AtomicLong、AtomicBoolean

    其他基本类型的变量如char、float、double、可以先转为整型,再进行原子操作;比如AtomicBoolean类是先把Boolean类型先转为整型,再使用compareAndSwapInt进行CAS操作

    AtomicInteger类 常用方法:

    getAndAdd(); addAndGet(); getAndIncrement(); incrementAndGet(); compareAndSet();

    上述方法底层均使用unsafe类来实现,涉及主要方法:getIntVolatile(); compareAndSwapInt(); 他们都是通过操作系统本地API实现原子操作

    2、原子更新引用类型

    包含三个类:

    AtomicReference 原子更新引用类型

    AtomicStampedReference:原子更新带有标记位的引用类型 (标记位为整数类型)

    AtomicMarkableReference:原子更新带有标记位的引用类型 (标记位为布尔类型)

    后两个类用于解决ABA问题,标记位的类型不同 打个标记看到底变没变

    3、原子更新属性

    如果是已经有的一个类,在不能更改其源代码的情况下,想要实现对其成员变量的原子操作,则需要使用这三个类

    AtomicIntegerFiledUpdater、AtomicLongFieldUpdater、AtomicReferenceUpdater

    4、原子更新数组

    AtmoicIntegerArray、AtomicLongArray、AtmicReferenceArray

    通过原子的方式更改数组中的元素

    5、Striped64类 *

    包含四个子类:LongAdder、LongAccumulator、DoubleAdder、DoubleAccumulator

    为了解决在高并发场景下多个线程对同一个变量进行CAS操作的性能问题

    LongAdder类 :是对并发场景下多个线程操作数字,采用对多个线程进行累加操作(原子的)最后求和(分治)

    LongAccumulator类:也是操作两个数值,但是不一定是累加,而是一种更加通用的操作,里面有一个接口可以自己实现其中的逻辑,是自定义的一个二元运算符

    6、原子操作三个问题

    ABA问题:

    如果一个值原来是A,变成了B、然后又变回了A,那么使用CAS操作进行检查的时候会发现它的值没有发生变化,但实际上却发生了改变。ABA问题的解决思路就是加版本号,在变量前面加版本号,每次变量更新就在前面+1,比如A - B - A ==> 1A - 2B - 3A

    atomic包中提供了两个类AtomicStampedReference和AtmoicMarkableReference来解决ABA问题

    自旋开销大的问题:

    CAS操作往往伴随着自旋,而自旋CAS如果长时间不成功,会给CPU造成很大的开销(怎么造成?),这个时候可以先自旋几次,如果依然拿不到锁再进行阻塞,以这样的方式来降低开销,synchronized就是这样的实现策略

    只能保证一个变量的原子操作

    对多个共享变量进行操作时,循环CAS就无法保证操作的原子性;这个时候可以把多个共享变量合并成一个共享变量,然后采用原子更新引用类型的方式来操作,也可以采用加锁的方式来解决

    三、锁

    1.Synchronized

    1.1锁的粒度(从小到大)

    • 对于同步代码块,锁是synchronized括号里配置的对象(同步监视器)
    • 对于同步方法,锁是当前实例对象this
    • 对于静态同步方法,锁是当前类的Class对象

    1.2实现机制

    synchronized用的锁信息存放在Java对象头里。如果对象是数组类型,则虚拟机用3个字宽存储对象头;如果不是数组类型,则用2字宽存储对象头。其中Mark word里存储了锁的信息,包括锁的标志位和状态。

    对象头分三段:

    1、Mark word 中存放的是锁的信息

    锁标志位:01表示无锁&&偏向锁,不偏向为0

    2、指针指向的是方法区中的地址3、

    在这里插入图片描述

    当有线程在访问临界区的时候,对象的Mark word中就有了线程ID,就变成为偏向锁;

    当有第二个线程访问临界区的时候,此时偏向锁升级为轻量级锁,此时轻量级锁存放在了线程的栈帧里,锁标志位变为00;此时其他线程就以CAS自旋的方式来获取锁,如果自旋次数过多,轻量级锁就升级为重量级锁,锁标志位11;其他线程就阻塞

    无锁的时候存放对象的hashcode和对象分带年龄;

    偏向锁的时候存放对象的线程ID+epoch+对象分带年龄

    轻量级锁的时候存放指向栈中锁记录的指针

    重量级锁的时候存放指向重量级锁(互斥量)的指针

    当没有竞争的时候当然没有锁,

    当有一个线程竞争锁的时候,此时是偏向锁

    当有两个或者多个线程和第一个线程竞争的时候,此时竞争比较小,线程1撤销偏向锁为升级到轻量级锁做好准备,

    此时多个线程竞争轻量级锁,是先把对象头中的信息Mark word复制到各自的栈帧中,同时各自以CAS的方式去抢着往对象头里写入自己的栈帧指针,谁先替换成功谁就升级为轻量级锁,此时没有抢到的线程会以自旋的方式来重新获取锁,自旋到一定次数,轻量级锁膨胀为重量级锁,其他线程阻塞

    1.3锁升级过程

    在这里插入图片描述

    偏向锁:

    java1.6之后引入了锁升级的过程,目的是为了减少加锁和释放锁的性能消耗

    轻量级锁:

    加锁时,JVM先在 ==> 当前线程的栈帧中 ==>创建一个空间(用于存储锁记录) ==>复制对象头中的Mark word到锁记录 ;

    然后线程尝试用CAS的方式来将对象头中的Mark word替换为指向锁记录的指针;此时的线程都是并发的动作,如果成功则获取锁;如果失败则其他某个线程获取锁,失败的线程通过自旋的方式来重新获取锁;

    解锁时,也就是某个轻量级锁线程同步块执行完毕后,会CAS将锁记录替换回对象头 。如果成功,说明当前没有锁竞争;如果失败说明有锁竞争,锁膨胀为重量级锁。

    在这里插入图片描述

    2.AQS

    在这里插入图片描述

    维护了一个volatile int state共享资源和一个FIFO线程等待队列,state访问的三种方式:

    protected final void setState(int newState){state = newState;}
    protected final int getState(){return state;}
    protected final boolean compareAndSetState(int expect,int update){
        return unsafe.compareAndSwapInt(this,stateOffset,expect,update);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    lock的实现基于AQS

    AbstractQueuedSynchronizer

    是用来构建锁或者其他同步组件的基础框架;它使用了一个int成员变量表示同步状态;通过内置FIFO队列来完成资源获取线程的排队工作。同步器的设计是基于模板方法模式,使用者需要继承同步器并重写指定方法,随后将同步器组合在自定义同步组件的实现中,并调用同步器提供的模板方法,而这些模板方法会调用使用者重写的方法。

    模板方法模式:给出一个解决特定问题的一个模板,流程固话,但中间有些局部的方法现在不能确定,因为具体的业务场景不同,所以子类需要做一个单独的实现 eg:

    void foo{
     a();
     b();
     c();
    }
    //B方法必须在一个具体的业务场景里才已知,可以把B方法定义为抽象方法
    //写一个子类把B实现了就可以
    void a(){};
    void c(){};
    abstract void b(){};
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    2.1核心代码

    //独占线程 存在于父类中
    private transient Thread exclusiveOwnerThread;
    
    //同步队列
    static final class Node(){}
    private transient volatile Node head;
    private transient volatile Node tail;
    
    //同步状态
    private volatile int state;
    protected final int getState(){return state;}
    protected final int setState(int newState){state = newState;}
    protected final boolean compareAndSetState(int expect,int update){
        return unsafe.compareAndSwapInt(this,stateOffset,expect,update);
    }
    
    //重写方法
    protected boolean tryAquire(int arg){throw new UnsupportedOperationException();}
    protected boolean tryRelease(int arg){throw new UnsupportedOperationEception();}
    protected int tryAquireShared(int arg){throw new unsupportedOperationException();}
    protected boolean tryReleaseShared(int arg){throw new unsupportedOperationException();}
    
    //模板方法
    public final void acquire(int arg){} //独占式获取同步状态
    public final boolean release(int arg){} //独占式释放同步状态
    public final void acquiredShared(int arg){} //共享式获取同步状态
    public final boolean releaseShared(int arg){} //共享式释放同步状态
    public final Collections<Thread> getQueuedThreads(){} 
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    2.2同步队列

    AQS双向链表 同步器head指向头 tail指向尾,修改尾部要CAS原子操作,修改头部不用原子操作;

    在这里插入图片描述

    2.3源码阅读:

    独占:只有一个线程能执行,如ReentrantLock

    共享:多个线程可同时执行,如Semaphore、CountDownLatch

    1.独占式获取同步状态:acquire(); addWaiter(); enq(); acquireQueued();

    2.独占式释放同步状态:release();

    3.共享式获取同步状态:acquireShared(); doAcquireShared(); setHeadAndPropagate()

    4.共享式释放同步状态:releaseShared(); doReleaseShared();

    //acquire() 流程:(也是ReentrantLock.lock()的流程)

    1、调用自定义同步器tryAcquire方法尝试直接去获取资源,如果成功直接返回;

    2、没成功,则addWriter()将该线程加入到队列尾,并标记位独占模式

    3、acquireQueued()使线程在等待队列中休息,有机会时(轮到自己,被unpark())会去尝试资源,获取到资源才返回;如果在这个过程中被中断过,则返回true,没有被中断过返回false

    4、如果线程在等待过程中被中断过,它是不响应的;只有再获取到资源后再进行自我中断selfInterrupt(),将中断补上

    在这里插入图片描述

    /*独占模式下获取锁,忽略中断;至少调用一次tryAcquire来实现,成功时返回;
      否则,线程将排队,可能重复阻塞和解除阻塞,直到tryAcquire成功;
      tryAcquire方法是我们需要用的时候再重写它,否则抛异常
    */
    public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
    //将线程加入等待队列的尾部,并标记位独占模式,返回当前线程所在的节点
    private Node addWaiter(Node mode) {
            Node node = new Node(Thread.currentThread(), mode);
            // 尝试以快速的方式直接放在队尾
            Node pred = tail;
            if (pred != null) {
                node.prev = pred;
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
        	//上一步失败则直接以enq()入队
            enq(node);
            return node;
        }
    //将节点插入队列,必要时初始化
    private Node enq(final Node node) {
        //CAS自旋,直到成功加入队尾
            for (;;) {
                Node t = tail;
                //如果队列为空,创建一个空的标志节点作为head节点,并将tail指向它
                if (t == null) { // Must initialize
                    if (compareAndSetHead(new Node()))
                        tail = head;
                } else {//队列不为空,加入队尾
                    node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43

    //acquireQueued()流程

    1、节点进入队尾后,检查状态,找到安全休息点

    2、调用park()进入waiting状态,等待unpark()或interrput()唤醒自己

    3、被唤醒后,查看自己时候有资格拿到号。如果拿到,head指向当前节点,并返回从入队到拿到号过程中是否被中断过,如果没拿到,继续流程1

    //通过tryAcquire()和addWriter()方法执行完,该线程获取资源失败,进入等待队列;下一步:
    //等待其他线程释放资源后唤醒自己,
    //使线程阻塞在等待队列中获取资源,一直到获取到资源才返回;
    final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;//标记是否成功拿到资源
            try {
                boolean interrupted = false;//标记等待过程中是否被中断过
                //CAS自旋
                for (;;) {
                    final Node p = node.predecessor(); //拿到前驱节点
                    //如果前驱是head,则说明当前节点是第二个节点,则有资格去获取资源(可能是首节点释放资源唤醒自己,或者是被中断);
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);//拿到资源后,将head指向该节点,head所指的节点就是当前获取到资源的那个节点或者是null
                        p.next = null; //set(head)时已经将node.prev置为null,这里再将head.next置为空,是为了帮助GC回收head节点,意味着之前拿完资源的节点出队了
                        failed = false; //成功获取资源
                        return interrupted;//返回等待过程中是否被中断过
                    }
                    //如果自己可以休息了,就通过park()进入waiting状态,直到被unpark();如果不可中断的情况下被中断了,那么会从park()中醒过来,发现拿不到资源则会继续进入park()状态
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;//如果等待过程中被中断过,哪怕只有一次也将interrupt置为true
                }
            } finally {
                if (failed)//如果等待过程中没有成功获取到资源(如timeout或者可中断的情况下被中断了),那么取消节点在队列中的等待
                    cancelAcquire(node);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    //shouldParkAfterFailedAcquire()

    //检查自己是否真的可以去休息了(线程进入waiting状态)

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
            int ws = pred.waitStatus;//拿到前驱的状态
            if (ws == Node.SIGNAL)
    			//如果已经告诉前驱拿完好后通知自己一下,则return true 进入waiting状态
                return true;
            if (ws > 0) {
    			//如果前驱放弃了,那就一直往前找,直到找到一个正常等待的状态,并排在它后面;
                //那么被自己加塞的节点,相当于形成了一个无引用链,会被GC回收
                do {
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                pred.next = node;
            } else {
    			//如果前驱是正常等待状态,那就把前驱的状态设置为SIGNAL,告诉它拿完号通知一下自己(有可能失败(刚释放完))
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
            }
            return false;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    //parkAndCheckInterrupt()

    //让线程去休息,进入真正等待状态

        private final boolean parkAndCheckInterrupt() {
            LockSupport.park(this);//调用park()使线程进入waiting状态
            return Thread.interrupted();//如果被唤醒,查看自己是不是被中断的
        }
    
    • 1
    • 2
    • 3
    • 4

    //release() (也是unlock的语义) 释放定量的资源,如果彻底释放了即(state = 0),它会唤醒等待队列中的其他线程来获取资源

    //调用tryRelease()来释放资源;根据tryRelease()的返回值来判断该线程是否已经完成释放资源了

    public final boolean release(int arg) {
            if (tryRelease(arg)) {
                Node h = head;//找到头节点
                if (h != null && h.waitStatus != 0)
                    unparkSuccessor(h);//唤醒等待队列里的下一个线程
                return true;
            }
            return false;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    //unparkSuccessor() 唤醒等待队列中的下一个线程

    1、unpark唤醒等待队列中最前面的那个未放弃线程

    private void unparkSuccessor(Node node) {
    		
            int ws = node.waitStatus;//当前线程所在节点
            if (ws < 0)
                //将当前线程状态置0,可以失败
                compareAndSetWaitStatus(node, ws, 0);
    
            Node s = node.next;//找到下一个要唤醒的节点
            if (s == null || s.waitStatus > 0) {//如果为空或已取消
                s = null;
                for (Node t = tail; t != null && t != node; t = t.prev)//从后往前找
                    if (t.waitStatus <= 0) // <= 0的节点都是有效的节点
                        s = t;
            }
            if (s != null)
                LockSupport.unpark(s.thread);//唤醒
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    2、s被唤醒后,进入if(p == head && tryAcquire(arg))判断;这里即使p != head 也没有关系,会进入shouldParkAfterFailedAcquire()寻找一个安全点,这里既然s已经是等待队列中最前面的那个未放弃线程,那么通过shouldParkAfterFailedAcquire()的调整,s也必然会跑道head.next节点,下一次p == head就成立了,然后s把自己设置为head标杆节点,表示自己已经获取到资源了,acquire()返回

    //acquireShared() 共享模式下获取资源,忽略中断;和acquire相比多了自己拿到资源还要去唤醒后继节点(共享)

    负数表示获取失败;0表示获取成功,但没有剩余资源;正数表示获取成功,但还有剩余资源

    1、tryAcquireShared尝试获取资源,成功直接返回

    2、失败则通过doAcquireShared()进入等待队列,直到获取到资源才返回

    public final void acquireShared(int arg) {
            if (tryAcquireShared(arg) < 0)
                doAcquireShared(arg);
        }
    
    • 1
    • 2
    • 3
    • 4

    //doAcquireShared() 将当前线程加入到等待队列,直到其他线程释放资源唤醒自己,自己成功拿到资源才返回

    private void doAcquireShared(int arg) {
            final Node node = addWaiter(Node.SHARED); //加入队列尾部
            boolean failed = true;//是否成功标志
            try {
                boolean interrupted = false;//等待过程中是否被中断标志
                //CAS自旋
                for (;;) {
                    final Node p = node.predecessor();//前驱
                    if (p == head) {//如果到head的下一个,因为head是拿到资源的线程,此时node被唤醒,很可能是head用完资源来唤醒自己的
                        int r = tryAcquireShared(arg);
                        if (r >= 0) {//成功
                            setHeadAndPropagate(node, r);//将head指向自己,还有剩余资源可以再唤醒之后的线程
                            p.next = null; // help GC
                            if (interrupted) //如果等待过程中被打断过,此时将中断补上
                                selfInterrupt();
                            failed = false;
                            return;
                        }
                    }
                    //判断状态,寻找安全点,进入waiting状态,等待被unpark()或interrupt()
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    //setHeadAndPropagate()

    自己苏醒时,如果还有剩余资源就去唤醒后继节点(共享模式)

    private void setHeadAndPropagate(Node node, int propagate) {
            Node h = head; // Record old head for check below
            setHead(node);//head指向自己
         	//如果还有剩余量,继续唤醒下一个邻居线程
            if (propagate > 0 || h == null || h.waitStatus < 0 ||
                (h = head) == null || h.waitStatus < 0) {
                Node s = node.next;
                if (s == null || s.isShared())
                    doReleaseShared();
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    //releaseShared() 共享模式释放资源

    释放掉资源后,唤醒后继。跟独占模式下的release()相似,但有一点稍微需要注意:独占模式下的tryRelease()在完全释放掉资源(state=0)后,才会返回true去唤醒其他线程,这主要是基于独占下可重入的考量;而共享模式下的releaseShared()则没有这种要求,共享模式实质就是控制一定量的线程并发执行,那么拥有资源的线程在释放掉部分资源时就可以唤醒后继等待结点。例如,资源总量是13,A(5)和B(7)分别获取到资源并发运行,C(4)来时只剩1个资源就需要等待。A在运行过程中释放掉2个资源量,然后tryReleaseShared(2)返回true唤醒C,C一看只有3个仍不够继续等待;随后B又释放2个,tryReleaseShared(2)返回true唤醒C,C一看有5个够自己用了,然后C就可以跟A和B一起运行。而ReentrantReadWriteLock读锁的tryReleaseShared()只有在完全释放掉资源(state=0)才返回true,所以自定义同步器可以根据需要决定tryReleaseShared()的返回值。

        public final boolean releaseShared(int arg) {
            if (tryReleaseShared(arg)) { //尝试释放资源
                doReleaseShared(); //唤醒后继节点
                return true;
            }
            return false;
        }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    //doReleaseShraed() 唤醒后继

    private void doReleaseShared() {
    		
            for (;;) {
                Node h = head;
                if (h != null && h != tail) {
                    int ws = h.waitStatus;
                    if (ws == Node.SIGNAL) {
                        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                            continue;            // loop to recheck cases
                        unparkSuccessor(h);//唤醒后继
                    }
                    else if (ws == 0 &&
                             !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                        continue;                // loop on failed CAS
                }
                if (h == head)                   // loop if head changed
                    break;
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    3.Lock

    3.1 lock接口

    //加锁
    void lock();
    //解锁
    void unlock();
    //加锁(可中断) 当获取到锁的线程被中断时,中断异常会被抛出,同时锁会被释放
    void lockInterruptibly() throws InterruptedException;
    //加锁(非阻塞) 尝试获取锁,若此时锁没有被其他线程获取到,则成功获取锁
    boolean trylock();
    //加锁(可超时) 在指定截止时间之前获取锁,若时间到仍无法获取锁,则返回
    boolean tryLock(long time,TimeUnit unit) throws InterruptedException;
    //获取等待通知组件,该组件与当前的锁绑定,当前线程只有获取了锁,才能调用该组件
    Condition newCondion();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    3.2ReentrantLock实现

    ReentrantLock基于类内部的Sync类实现的, 而 Sync extends AbstractQueuedSynchronizer;

    所以ReentrantLock基于AQS实现

    //锁
    abstract static class Sync extends AbstractQueuedSynchronizer{}
    //非公平锁
    static final class NonfairSync extends Sync{}
    //公平锁
    static final class FairSync extend Sync{}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    3.3源码阅读

    • 锁的构造 ReentrantLock
    • 锁的实现 Sync NonfairSync FairSync

    4.ReadWriterLock

    4.1ReadWriterLock接口

    Lock readLock();
    Lock writeLock();
    
    • 1
    • 2

    4.2ReentrantReadWriterLock实现

    //锁
    abstract static class Sync extends AbstractQueuedSynchronizer{}
    //非公平锁
    static final class NonfairSync extends Sync{}
    //公平锁
    static final class FairSync extend Sync{}
    //读锁
    public static class ReadLock implements Lock,java.io.serializable{}
    //写锁
    public static class WriteLock implements Lock,java.io.serializable{}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    4.3源码阅读

    • 锁的构造 ReentrantReadWriteLock
    • 锁的实现 Sync NonfairSync FairSync ReadLock WriteLock

    4.4锁降级

    锁降级是指把持住当前拥有的写锁,再获取到读锁,随后释放先前拥有的写锁的过程。

    5.Condition

    condition对象由lock对象创建,condition里面有等待/通知两种类型的方法

    当前线程调用Condition.await() ==> 当前线程释放锁并等待

    其他线程调用Condition.signal() ==> 通知当前线程 ==>当前线程拿到锁并结束Condition.await()方法

    5.1线程通信

    Monitor(Synchronized)Conditon(Lock)
    等待wait()await()
    通知一个notify()signal()
    通知所有notifyAll()signalAll()

    5.2通信机制

    调用等待方法,当前线程进入等待队列;

    调用通知方法,将等待队列中的线程转移到同步队列

    Synchronized:只能有一个Monitor,所以只有一个等待队列

    Lock:可以创建多个Condition,所以有多个等待队列

    5.3ArrayBlockingQueue源码

    生产者要有一个等待队列,消费者也得有一个等待队列

    • 目标:生产者通知消费者;消费者通知生产者
    • 避免:生产者通知生产者;消费者通知消费者

    5.4ConditionObject源码

    在这里插入图片描述

    四、线程池

    1.ThreadPoolExcutor

    1.1实现机制

    在这里插入图片描述

    1.2核心参数

    public ThreadPoolExcutor(
    	int corePoolSize, //核心线程数
        int maximumPoolSize, //最大线程数
        long keepAliveTime, //空闲线程存活时间
        TimeUnit unit, //存活时间单位
        BlockingQueue<Runnable> workQueue, //线程阻塞队列
        ThreadFactory threadFactory, //线程工厂
        RejectedExecutionHandler handler //拒绝策略 
    ){......}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    1.3生命周期

    在这里插入图片描述

    1、线程的五种状态,只能由小到大迁移,-1 > 0 > 1 > 2 > 3

    2、shutdown() 不会清空任务队列,会等队列中的线程执行完;showdownNow()会清空队列,不等他们完成。

    ​ shutdown() 只会中断空闲的线程,showdownNow() 会中断所有的线程

    3、TIDYING 和 TERMINATED 二者之间执行了一个钩子函数terminated(),是一个空的实现

    1.4源码阅读

    • 实现机制:execute()
    • 核心参数:实例变量
    • 生命周期:静态常量
    • 拒绝策略:CallerRunsPolicy, AbortPolicy, DiscardPolicy, DiscardOldestPolicy
    • 提交方法:execute(), submit(), newTaskFor(), FutureTask

    2.ScheduledThreadPoolExcutor

    2.1核心功能

    //延迟执行任务
    public ScheduledFuture<?> schedule(Runnable command,long delay,TimeUnit unit){}
    
    //延迟执行任务
    public ScheduledFuture<?> schedule(Callable<V> callable,long delay,TimeUnit unit){}
    
    //以固定频率执行任务
    public ScheduledFuture<?> scheduleAtFixedRate(
    	Runnable command,long initialDelay,long period,TimeUnit unit){}
    
    //以固定间隔执行任务(从前一个任务结束开始)
    public ScheduledFuture<?> scheduleWithFixedDelay(
    	Runnable command,long initialDelay,long period,TimeUnit unit){}
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    2.2源码阅读

    • 延迟队列任务:DelayedWorkQueue,ScheduledThreadPoolExcutor(),schedule()
    • 周期执行任务:scheduleAtFixedRate(),scheduleWithFixedDelay(),ScheduledFutureTask

    3.Excutors

    Excutors采用无界队列,容易造成OOM,不建议使用

    public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    五、并发工具

    1.Semaphore

    假设初始化Semaphore时指定的资源数量是10,那么如果有N个线程来获取semaphore里面的资源;N个线程中只有10个线程可以获取到,其他线程都会阻塞,直到有线程释放了资源,其他线程才能获取到

    当初始化资源为1的时候,semaphore退化为排它锁;semaphore的实现原理和锁十分相似,也是基于AQS,也有公平和不公平之分

    2.CountDownLatch

    主线程要等待10个worker线程工作完毕才退出,就可以使用contdownlatch实现;

    基于AQS,但没有公平和不公平之分

    3.CycleBarrier

    ​ CountDownLatch的计数器是一次性的,也就是等到计数器值变为0后,再调用CountDownLatch的await()和countDown()方法都会立刻返回,此时的CountDownLatch就起不到线程同步的效果了。
    ​ 为了满足计数器可以重置的需要,JDK开发组提供了CyclicBarrier类,并且CyclicBarrier类的功能并
    不限于CountDownLatch的功能。从字面意思理解,CyclicBarrier是回环屏障的意思,它可以让一组线程全部达到一个状态后再全部同时执行。
    之所以叫作回环是因为当所有等待线程执行完毕,并重置CyclicBarrier的状态后它可以被重用。
    之所以叫作屏障是因为线程调用await()方法后就会被阻塞,这个阻塞点就称为屏障点,等所有线程都
    调用了await()方法后,线程们就会冲破屏障,继续向下运行。

  • 相关阅读:
    《设计模式》适配器模式
    模板模式【Java设计模式】
    Spring 框架下如何调用kafka
    create® 3入门教程-创建Create3 Docker映像
    Prometheus监控PHP应用
    Nginx的X-Accel-Redirect实现大文件下载
    高等数学(第七版)同济大学 习题4-3 个人解答
    如何用IDEA创建SpringBoot项目
    【011】C++选择控制语句 if 和 switch 详解
    Android 12 源码分析 —— 应用层 四(SystemUI的基本布局设计及其基本概念)
  • 原文地址:https://blog.csdn.net/weixin_44015158/article/details/126366673