• JUC源码笔记


    Atomic

    Java魔法类:Unsafe应用解析

    • 包装类型
    类型@since关键字段父类
    AtomicBoolean1.5private volatile int value;
    private static final long valueOffset;
    AtomicInteger1.5private volatile int value;
    private static final long valueOffset;
    abstract class Number
    AtomicLong1.5private volatile long value;
    private static final long valueOffset;
    static final boolean VM_SUPPORTS_LONG_CAS = VMSupportsCS8(); //底层JVM是否支持CAS
    abstract class Number
    • 包装类型数组
    类型@since关键字段父类
    AtomicIntegerArray1.5private final int[] array;
    private static final int base = unsafe.arrayBaseOffset(int[].class);
    private static final int shift = 31 - Integer.numberOfLeadingZeros(unsafe.arrayIndexScale(int[].class));
    AtomicLongArray1.5private final long[] array;
    private static final int base = unsafe.arrayBaseOffset(long[].class);
    private static final int shift = 31 - Integer.numberOfLeadingZeros(unsafe.arrayIndexScale(long[].class));
    • 引用类型
    类型@since关键字段备注
    AtomicReference1.5private volatile V value;
    private static final long valueOffset;
    AtomicReferenceArray1.5private final Object[] array;
    private static final long arrayFieldOffset = unsafe.objectFieldOffset(AtomicReferenceArray.class.getDeclaredField(“array”));
    private static final int base = unsafe.arrayBaseOffset(Object[].class);
    private static final int shift= 31 - Integer.numberOfLeadingZeros(unsafe.arrayIndexScale(Object[].class));
    AtomicMarkableReference1.5private volatile Pair pair;
    private static final long pairOffset = objectFieldOffset(UNSAFE, “pair”, AtomicMarkableReference.class);
    使用boolean mark来标记reference是否被修改过
    AtomicStampedReference1.5private volatile Pair pair;
    private static final long pairOffset = objectFieldOffset(UNSAFE, “pair”, AtomicStampedReference.class);
    使用int stamp来标记reference是否被修改过
    • field updater
    类型@since关键字段备注
    AtomicIntegerFieldUpdater1.5private final long offset;
    private final Class<?> cclass;
    private final Class tclass;
    内部实现类AtomicIntegerFieldUpdaterImpl
    AtomicLongFieldUpdater1.5private final long offset;
    private final Class<?> cclass;
    private final Class tclass;
    内部实现类CASUpdater(底层CAS)、LockedUpdater
    AtomicReferenceFieldUpdater1.5private final long offset;
    private final Class<?> cclass;
    private final Class tclass;
    /** class holding the field /
    /
    * field value type */
    private final Class vclass;
    内部实现类AtomicReferenceFieldUpdaterImpl
    • 累加器
    类型@since关键字段父类
    Striped64// size is a power of 2.
    transient volatile Cell[] cells;
    transient volatile long base;
    // Spinlock (locked via CAS) used when resizing and/or creating Cells.
    transient volatile int cellsBusy;
    Number
    LongAccumulator1.8private final LongBinaryOperator function;
    private final long identity;
    Striped64
    LongAdder1.8Striped64
    DoubleAccumulator1.8private final DoubleBinaryOperator function;
    private final long identity;
    Striped64
    DoubleAdder1.8Striped64

    Striped64中两个重要方法:

    // 处理涉及初始化、扩容、cell初始化、竞争的情况
    final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended)
    final void doubleAccumulate(double x, DoubleBinaryOperator fn, boolean wasUncontended)
    
    • 1
    • 2
    • 3

    Adder是Accumulater的特殊情况–累加运算,Accumulater可自定义运算。

    DoubleXXX基于LongXXX的区别在于,内部会将Double转换成Long,其余一样。

    相较于AtomicXXX,使用了“热点分离”的思想,用空间换时间,减少了并发冲突。

    locks

    • 同步器Synchronizer
    类/接口版本主要字段主要方法父类备注
    AbstractOwnableSynchronizer1.6/** The current owner of exclusive mode synchronization. */
    private transient Thread exclusiveOwnerThread;
    get/setExclusiveOwnerThread
    AbstractQueuedSynchronizer1.5private transient volatile Node head;
    private transient volatile Node tail;
    /** The synchronization state. */
    private volatile int state;
    // 独占式获取同步状态
    boolean tryAcquire(int arg)
    // 独占式释放同步状态
    boolean tryRelease(int arg)
    // 共享式获取同步状态
    int tryAcquireShared(int arg)
    // 共享式私房同步状态
    boolean tryReleaseShared(int arg)
    // 检测当前线程是否获取独占锁
    boolean isHeldExclusively()
    // ----------------------------
    acquireXXX/releaseXXX
    AbstractOwnableSynchronizer模板方法,子类根据需要实现独占/共享获取和释放状态。入口方法是acquireXXX和releaseXXX方法。
    AbstractQueuedLongSynchronizer1.6private transient volatile Node head;
    private transient volatile Node tail;
    private volatile long state;
    同上AbstractOwnableSynchronizer
    • Condition监听器
    类/接口版本主要字段主要方法父类备注
    Condition1.5await XXX
    signal/signalAll
    ConditionObjectprivate transient Node firstWaiter;
    private transient Node lastWaiter;
    ConditionAbstractQueuedSynchronizer、AbstractQueuedLongSynchronizer内部类。
    • LockSupport
    类/接口版本主要字段主要方法父类备注
    LockSupportpublic static void unpark(Thread thread)
    public static void park XXX
    内部调用unsafe.park和unsafe.unpark方法
    • Lock
    类/接口版本主要字段主要方法父类备注
    Lock1.5lock XXX
    void unlock();
    Condition newCondition();
    ReadWriteLock1.5Lock readLock();
    Lock writeLock();
    ReentrantLock1.5private final Sync sync;Lock内部自定义AQS实现类,分为公平锁和非公平锁。实现AQS的tryAcquire/tryRelease方法组
    ReentrantReadWriteLock1.5private final ReentrantReadWriteLock.ReadLock readerLock;
    private final ReentrantReadWriteLock.WriteLock writerLock;
    final Sync sync;
    public int getReadLockCount()
    public boolean isWriteLocked()
    public boolean isWriteLockedByCurrentThread()
    public int getWriteHoldCount()
    public int getReadHoldCount()
    ReadWriteLock采用读写分离的策略,允许多个线程可以同时获取读锁,满足写少读多的场景。Sync实现AQS的独占、共享模板方法。通过readLock和writeLock获取不同的锁调用对应方法。其中state标识分为两个short,高16位位读锁计数,低16位为写锁计数
    https://cloud.tencent.com/developer/article/1469555
    StampedLock1.8/** Head of CLH queue / private transient volatile WNode whead; /* Tail (last) of CLH queue / private transient volatile WNode wtail; // views transient ReadLockView readLockView; transient WriteLockView writeLockView; transient ReadWriteLockView readWriteLockView; /* Lock sequence/state / private transient volatile long state; /* extra reader count when state read count saturated */ private transient int readerOverflow;public long writeLock()
    public long tryWriteLock()
    public long readLock()
    public long tryReadLock()
    public long tryOptimisticRead()
    public boolean validate(long stamp)
    public void unlockWrite(long stamp)
    public void unlockRead(long stamp)
    public void unlock(long stamp)
    public long tryConvertToWriteLock(long stamp)
    public long tryConvertToReadLock(long stamp)
    public long tryConvertToOptimisticRead(long stamp)
    public boolean tryUnlockRead()
    public boolean tryUnlockWrite()
    private long cancelWaiter(WNode node, WNode group, boolean interrupted)
    private long acquireRead(boolean interruptible, long deadline)
    private long acquireWrite(boolean interruptible, long deadline)
    对读写锁ReentrantReadWriteLock的增强,该类提供了一些功能,优化了读锁、写锁的访问,同时使读写锁之间可以互相转换,更细粒度控制并发。
    StampedLockd的内部实现是基于CLH锁的(锁维护着一个等待线程队列,所有申请锁且失败的线程都记录在队列。一个节点代表一个线程)。
    写锁被占用的标志是第8位为1,读锁使用0-7位,正常情况下读锁数目为1-126,超过126时,使用一个名为readerOverflow的int整型保存超出数。同时高56为用于版本校验(validate方法)
    https://cloud.tencent.com/developer/article/1470988

    阻塞队列

    在这里插入图片描述

    类/接口版本主要字段主要方法父类备注
    Queue1.5// true或抛出异常
    boolean add(E e)
    // true或false
    boolean offer(E e)
    // 空队列,抛出异常
    E remove()
    // 空队列,返回null
    E poll()
    // 检索队列头,空队列,抛出异常
    E element()
    // 检索队列头,空队列返回null
    E peek()
    Collectioninterface
    java.util.Queue
    BlockingQueue1.5// 等待插入
    void put(E e) throws InterruptedException;
    // 等待获取
    E take() throws InterruptedException;
    int remainingCapacity();
    int drainTo(Collection<? super E> c, int maxElements);
    Queueinterface
    Deque1.6XXXFirst/XXXLast
    // 检索数据,空队列抛出异常
    E getFirst();/E getLast();
    // 删除指定元素
    boolean removeFirstOccurrence(Object o);
    // 同addFirst
    void push(E e);
    // 同removeFirst
    E pop();
    Iterator iterator();
    Iterator descendingIterator();
    Queueinterface
    java.util.Queue
    BlockingDeque1.6// 等待获取
    E takeFirst() throws InterruptedException;
    E takeLast() throws InterruptedException;
    BlockingQueue、Dequeinterface
    AbstractQueue1.5AbstractCollection
    Queue
    abstract
    add内部调用offer
    remove内部调用poll
    element内部调用peek
    ArrayBlockingQueue1.5/** The queued items /
    final Object[] items;
    /
    * items index for next take, poll, peek or remove /
    int takeIndex;
    /
    * items index for next put, offer, or add /
    int putIndex;
    /
    * Number of elements in the queue /
    int count;
    /
    * Main lock guarding all access /
    final ReentrantLock lock;
    /
    * Condition for waiting takes /
    private final Condition notEmpty;
    /
    * Condition for waiting puts */
    private final Condition notFull;
    private static void checkNotNull(Object v)
    private void enqueue(E x)
    private E dequeue()
    AbstractQueue
    BlockingQueue
    Itrs迭代器管理器将所有的迭代器都用链表的形式进行保存,每一次有新的迭代器被注册时都将其加到该迭代器链表的头节点
    IteratorSpliterator的迭代实现forEachRemaining是直接采用传统的Iterator迭代根数据源ArrayBlockingQueue(就是利用的ArrayBlockingQueue本身的Iterator实现),所以在迭代的过程中,队列如果发生更新,迭代器也会同步更新状态。但是拆分后的迭代器ArraySpliterator是在拆分的时候将顶层迭代器的数据的一半直接拷贝到一个新的数组中,在迭代的时候直接通过数组下标访问该数组,所以这已经脱离了源数据ArrayBlockingQueue,其内部元素并不会随着队列的变化而被更新。
    ArrayBlockingQueue内部的几乎每一个操作方法都需要先获取同一个ReentrantLock独占锁才能进行,这极大的降低了吞吐量,几乎每个操作都会阻塞其它操作,最主要是插入操作和取出操作相互之间互斥。所以ArrayBlockingQueue不适用于需要高吞吐量的高效率数据生成与消费场景。
    LinkedBlockingQueue1.5/** The capacity bound, or Integer.MAX_VALUE if none /
    private final int capacity;
    /
    * Current number of elements /
    private final AtomicInteger count = new AtomicInteger();
    /
    * Head of linked list.Invariant: head.item == null /
    transient Node head;
    /
    * Tail of linked list. Invariant: last.next == null /
    private transient Node last;
    /
    * Lock held by take, poll, etc /
    private final ReentrantLock takeLock = new ReentrantLock();
    /
    * Wait queue for waiting takes /
    private final Condition notEmpty = takeLock.newCondition();
    /
    * Lock held by put, offer, etc /
    private final ReentrantLock putLock = new ReentrantLock();
    /
    * Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();
    AbstractQueue
    BlockingQueue
    迭代器保证了其弱一致性,除了首个有效节点在创建迭代器实例的时候就已经被保留下来之外(所以在获取迭代器实例之后,就算移除了头节点it.next也会返回该节点),队列中其它节点的变更都能被迭代器同步更新。LinkedBlockimgQueue的迭代器少了ArrayBlockingQueue那样很多精密的实现例如对于GC的友好性,所以使用多个迭代器实例可能内存性能有不可预测性。
    LBQSpliterator,第一次只拆一个元素,第二次拆2个,第三次拆三个,依次内推,拆分的次数越多,后面的迭代器分的得元素越多,直到一个很大的数MAX_BATCH(33554432) ,后面的迭代器每次都分到这么多的元素。tryAdvance、forEachRemaining、trySplit方法都是需要获取fullLock的,所以注意对吞吐量的影响
    有一个专门的head节点,所有的有效节点都移除挂载到head节点之后,采用两个独立的可重入锁分别对出入队进行加锁,所以吞吐量有了很大的提高,但是它还是有很多方法(remove ,clear ,toArray, toString以及迭代器相关的方法)需要同时获取两个锁才能操作,这无疑会影响吞吐量,所以要合理使用。另外它在实现的时候创建了额外的Node节点实例来绑定真实数据,所以对内存的消耗稍微要多一些。最后它通过将从头部出队的节点的下一个节点指向自身的方法不但保障了迭代器的弱一致性而且解决了GC的跨代引用无法回收的问题,但是remove方法依然可能会导致GC无法释放,所以需要慎重使用。
    LinkedBlockingDeque1.6/** Pointer to first node. Invariant: (first == null && last == null) or (first.prev == null && first.item != null) /
    transient Node first;
    /
    * Pointer to last node. Invariant: (first == null && last == null) or (last.next == null && last.item != null) /
    transient Node last;
    /
    * Number of items in the deque /
    private transient int count;
    /
    * Maximum number of items in the deque /
    private final int capacity;
    /
    * Main lock guarding all access /
    final ReentrantLock lock = new ReentrantLock();
    /
    * Condition for waiting takes /
    private final Condition notEmpty = lock.newCondition();
    /
    * Condition for waiting puts */
    private final Condition notFull = lock.newCondition();
    AbstractQueue
    BlockingDeque
    以单个锁实现的基于链表的可选容量的双端队列,可以从对头、对尾两端插入和移除元素,这主要是通过节点同时持有前驱节点和后继节点的引用来达到同时支持正向反向的操作,它的迭代器支持正向和反向两种迭代方式。但是由于它的所有方法(几乎100%)都需要获取同一把独占锁,因此在竞争激烈的多线程并发环境下吞吐量非常底下。
    DelayQueue1.5private final transient ReentrantLock lock = new ReentrantLock();
    private final PriorityQueue q = new PriorityQueue();
    private Thread leader = null;
    private final Condition available = lock.newCondition();
    AbstractQueue
    BlockingQueue
    无界阻塞队列,它的队列元素只能在该元素的延迟已经结束或者说过期才能被出队。|
    DelayQueue队列元素必须是实现了Delayed接口的实例,该接口有一个getDelay方法需要实现,延迟队列就是通过实时的调用元素的该方法来判断当前元素是否延迟已经结束。
    DelayQueue是基于优先级队列来实现的,那肯定元素也要实现Comparable接口,所以实现Delayed的队列元素也必须要实现Comparable的compareTo方法。
    PriorityBlockingQueue1.5private transient Object[] queue;
    private transient int size;
    private transient Comparator<? super E> comparator;
    private final ReentrantLock lock;
    private final Condition notEmpty;
    private transient volatile int allocationSpinLock;
    private PriorityQueue q;
    AbstractQueue
    BlockingQueue
    无限容量的阻塞队列,容量是随着放入的元素空间不够来逐步扩容的,只支持放入可比较的对象即直接或间接实现了Comparator接口的元素,但是尝试添加元素时还是可能因为资源耗尽而抛出OutOfMemoryError。
    虽然PriorityBlockingQueue叫优先级队列,但是并不是说元素一入队就会按照排序规则被排好序,而是只有通过调用take、poll方法出队或者drainTo转移出的队列顺序才是被优先级队列排过序的。所以通过调用 iterator() 以及可拆分迭代器 spliterator() 方法返回的迭代器迭代的元素顺序都没有被排序。如果需要有序遍历可以通过 Arrays.sort(pq.toArray()) 方法来排序。
    使用了
    基于数组的平衡二叉堆
    (小的在上,大的在下方,即最小堆),使用单个ReentrantLock锁来保护公共操作
    SynchronousQueue1.5private transient volatile Transferer transferer;AbstractQueue
    BlockingQueue
    SynchronousQueue是一种特殊的阻塞队列,我们可以把它看作是一种容量为0的集合,因为所有Collection的相关方法(包括迭代器)都将把SynchronousQueue当成一个空队列处理,SynchronousQueue的主要特点就是当一个操作试图放入数据或取走数据的时候,只有出现相应的互补操作才能成功返回,SynchronousQueue可以看作是一种数据交接的工具
    SynchronousQueue的实现分别针对公平/非公平性的支持采用双队列/双堆栈来实现数据存储,虽然采用的数据结构不同,但是中心思想是一致的:当相同类型的线程(都是数据提供方或者数据接收者)到来时都将它们排队进入队列/堆栈,当与上一次入队的操作互补的线程(一方是数据提供者,另一方是数据接收者)到来时,就完成数据传递,然后它们双双出队返回。
    TransferQueue1.7boolean tryTransfer(E e);
    void transfer(E e) throws InterruptedException;
    boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException;
    boolean hasWaitingConsumer();
    int getWaitingConsumerCount();
    BlockingQueueinterface
    LinkedTransferQueue1.7/** head of the queue; null until first enqueue /
    transient volatile Node head;
    /
    * tail of the queue; null until first append /
    private transient volatile Node tail;
    /
    * The number of apparent failures to unsplice removed nodes */
    private transient volatile int sweepVotes;
    private E xfer(E e, boolean haveData, int how, long nanos)
    private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos)
    AbstractQueue
    TransferQueue
    LinkedTransferQueue是Java并发包中最强大的基于链表的无界FIFO阻塞传输队列。LinkedTransferQueue是ConcurrentLinkedQueue、SynchronousQueue (公平模式下)、无界的LinkedBlockingQueues等的超集。
    是一种双队列,即节点存在数据节点,请求节点这种互补模式,这两种模式代表可以完成数据交换,但大多时候其实现的集合方法在没有特别强调的情况下都是对有效数据节点的数据进行处理而非请求节点,因为请求节点携带的数据为null
    ConcurrentLinkedQueue1.5AbstractQueue
    Queue
    CAS代替加锁来实现的高效的非阻塞队列。虽然也有head、tail节点的概念,但是不同于LinkedBlockingQueue,它的head并不是总是指向第一个节点,tail也不一定总是指向最后一个节点,只有当当前指针距离第一个/最后一个节点有两个或更多步时,才将更新head/tail,这种减少CAS次数的设计是一种优化
    ConcurrentLinkedDeque1.7AbstractCollection
    Deque
    基于链表的无界的同时支持FIFO、LIFO的非阻塞并发双端队列
    双端队列家族中对LinkedBlockingDeque的一种高并发优化,ConcurrentLinkedDeque采用了CAS的方法来处理所以的竞争问题,保留了双端队列的所有特性,可以从对头、对尾两端插入和移除元素,它的内部实现非常精妙,既采用了ConcurrentLinkedQueue实现中用到过松弛阈值处理(即并不每一次都更新head/tail指针),又独特的针对队列中被逻辑删除节点(CAS将数据item置为null)的进行了淤积阀值合并处理和分三个阶段的节点删除步骤,同时还针对多次volatile写、普通写,多次连续的CAS操作单次生效等一系列的措施减少volatile写和CAS的次数,提高ConcurrentLinkedDeque的运行效率。
    • 注意ConcurrentLinkedDeque,ConcurrentLinkedQueue都没有继承BlockingDeque、BlockingQueue,所以它们没有阻塞等待的相关方法。

    同步工具类

    名称/@since作用使用AQSfair可重用关键字段源码解析
    Semaphore
    信号量
    1.5
    提供了资源数量的并发访问控制
    初始一个最大资源数,每次请求资源数-1
    Y
    内部实现AQS
    bothY
    acquire/release
    CountDownLatch
    计数门闩
    1.5
    允许一个或多个线程一直等待,直到其他线程的操作执行完后再执行Y
    内部实现AQS
    shardN
    CyclicBarrier
    循环栅栏
    1.5
    所有线程都等待完成后才会继续下一步行动Y
    ReentrantLock
    exclusiveY
    reset
    Exchanger
    1.5
    线程之间交换数据NNASHIFT=7 // 避免为共享
    bound,记录最大有效的arena索引,动态变化,竞争激烈时(槽位全满)增加, 槽位空旷时减小。bound + SEQ +/- 1,其高位+ 1(SEQ,oxff + 1)确定其版本唯一性
    https://www.cnblogs.com/aniao/p/aniao_exchanger.html
    Phaser
    阶段器
    1.7
    可复用的同步屏障,与CyclicBarrier和CountDownLatch类似,但更强大Nstate
    unarrived – 还没有抵达屏障的参与者的个数 (bits 0-15)
    parties – 需要等待的参与者的个数 (bits 16-31)
    phase – 屏障所处的阶段 (bits 32-62)
    terminated – 屏障的结束标记 (bit 63 / sign)
    初始时,state的值为1,称为EMPTY,也即是unarrived = 1,其余都为0,这是一个标记,表示此phaser还没有线程来注册过
    https://www.cnblogs.com/aniao/p/aniao_phaser.html
    https://juejin.cn/post/6982588567428005901#heading-1

    在这里插入图片描述

    数据结构

    类/接口版本主要字段主要方法父类备注
    CopyOnWriteArrayList1.5/** The lock protecting all mutators /
    final transient ReentrantLock lock = new ReentrantLock();
    /
    * The array, accessed only via getArray/setArray. */
    private transient volatile Object[] array;
    List
    RandomAccess
    Cloneable
    数据结构改变时加锁,通过Arrays.copyOf复制原始数据变更后重置
    CopyOnWriteArraySet1.5private final CopyOnWriteArrayList al;AbstractSet底层为CopyOnWriteArrayList,添加元素时调用CopyOnWriteArrayList的XXX Absent方法
    ConcurrentMap1.5default V merge(K key, V value, BiFunction<? super V, ? super V, ? extends V> remappingFunction)
    default V compute(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction)
    default V computeIfPresent(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction)
    default V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction)
    Map
    ConcurrentHashMap1.5/** The array of bins. Lazily initialized upon first insertion. Size is always a power of two.*/
    transient volatile Node<K,V>[] table;
    /** The next table to use; non-null only while resizing. /
    private transient volatile Node<K,V>[] nextTable;
    /
    * Base counter value, used mainly when there is no contention, but also as a fallback during table initialization races. Updated via CAS. /
    private transient volatile long baseCount;
    /
    * Table initialization and resizing control. /
    private transient volatile int sizeCtl;
    /
    * The next table index (plus one) to split while resizing. /
    private transient volatile int transferIndex;
    /
    * Spinlock (locked via CAS) used when resizing and/or creating CounterCells. /
    private transient volatile int cellsBusy;
    /
    * Table of counter cells. When non-null, size is a power of 2. */
    private transient volatile CounterCell[] counterCells;
    AbstractMap
    ConcurrentMap
    通过synchronized+cas来实现了。在JDK8中只有一个数组,就是Node数组,Node就是key,value,hashcode封装出来的对象,和HashMap中的Entry一样,在JDK8中通过对Node数组的某个index位置的元素进行同步,达到该index位置的并发安全。同时内部也利用了CAS对数组的某个位置进行并发安全的赋值。
    https://cloud.tencent.com/developer/article/1873182
    ConcurrentNavigableMap1.6ConcurrentNavigableMap<K,V> subMap(K fromKey, boolean fromInclusive, K toKey, boolean toInclusive);
    ConcurrentNavigableMap<K,V> headMap(K toKey, boolean inclusive);
    ConcurrentNavigableMap<K,V> tailMap(K fromKey, boolean inclusive);
    ConcurrentNavigableMap<K,V> subMap(K fromKey, K toKey);
    ConcurrentNavigableMap<K,V> headMap(K toKey);
    ConcurrentNavigableMap<K,V> tailMap(K fromKey);
    ConcurrentNavigableMap<K,V> descendingMap();
    public NavigableSet navigableKeySet();
    NavigableSet keySet();
    public NavigableSet descendingKeySet();
    ConcurrentMap
    NavigableMap
    ConcurrentSkipListMap1.6/** The topmost head index of the skiplist. /
    private transient volatile HeadIndex<K,V> head;
    /
    * The comparator used to maintain order in this map, or null if using natural ordering. (Non-private to simplify access in nested classes.) /
    final Comparator<? super K> comparator;
    /
    * Lazily initialized key set /
    private transient KeySet keySet;
    /
    * Lazily initialized entry set /
    private transient EntrySet<K,V> entrySet;
    /
    * Lazily initialized values collection /
    private transient Values values;
    /
    * Lazily initialized descending key set */
    private transient ConcurrentNavigableMap<K,V> descendingMap;
    public Map.Entry<K,V> ceilingEntry(K key)
    public K ceilingKey(K key)
    public Map.Entry<K,V> floorEntry(K key)
    public K floorKey(K key)
    public Map.Entry<K,V> lowerEntry(K key)
    public K lowerKey(K key)
    public K firstKey()
    public K lastKey()
    public Map.Entry<K,V> pollFirstEntry()
    public Map.Entry<K,V> pollLastEntry()
    AbstractMap
    ConcurrentNavigableMap
    Cloneable
    通过跳表(以空间换时间)实现的
    key是有序
    ConcurrentSkipListSet1.6private final ConcurrentNavigableMap<E,Object> m;AbstractSet
    NavigableSet
    Cloneable
    调用ConcurrentSkipListMap中对应方法

    Future

    在这里插入图片描述

    类/接口版本主要字段主要方法父类备注
    Callable1.5V call() throws Exception;@FunctionalInterface
    interface
    Future1.5boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
    interface
    Runnable1.0public abstract void run();@FunctionalInterface
    interface
    RunnableFuture1.6Runnable
    Future
    interface
    FutureTask1.5/** The run state of this task /
    private volatile int state;
    /
    * The underlying callable; nulled out after running /
    private Callable callable;
    /
    * The result to return or exception to throw from get() /
    private Object outcome; // non-volatile, protected by state reads/writes
    /
    * The thread running the callable; CASed during run() /
    private volatile Thread runner;
    /
    * Treiber stack of waiting threads */
    private volatile WaitNode waiters;
    private V report(int s) throws ExecutionException
    protected void set(V v)
    protected void setException(Throwable t)
    protected boolean runAndReset()
    private void handlePossibleCancellationInterrupt(int s)
    private void finishCompletion()
    private int awaitDone(boolean timed, long nanos) throws InterruptedException
    private void removeWaiter(WaitNode node)
    RunnableFuturePossible state transitions:
    * NEW -> COMPLETING -> NORMAL
    * NEW -> COMPLETING -> EXCEPTIONAL
    * NEW -> CANCELLED
    * NEW -> INTERRUPTING -> INTERRUPTED
    内部比较中要的是awaitDone方法。https://www.jianshu.com/p/55221d045f39
    [Doug Lea在J.U.C包里面写的BUG又被网友发现了
    Delayed1.5long getDelay(TimeUnit unit);Comparableinterface
    ScheduledFuture1.5Delayed
    Future
    interface
    RunnableScheduledFuture1.6boolean isPeriodic();RunnableFuture
    ScheduledFuture
    interface
    ScheduledFutureTask/** Sequence number to break ties FIFO /
    private final long sequenceNumber;
    /
    * The time the task is enabled to execute in nanoTime units /
    private long time;
    /
    * Period in nanoseconds for repeating tasks. ·*/
    private final long period;
    /** The actual task to be re-enqueued by reExecutePeriodic /
    RunnableScheduledFuture outerTask = this;
    /
    * Index into delay queue, to support faster cancellation. */
    int heapIndex;
    private void setNextRunTime()FutureTask
    RunnableScheduledFuture
    ScheduledThreadPoolExecutor内部类
    ForkJoinTask1.7volatile int status; // accessed directly by pool and workers
    private static final ExceptionNode[] exceptionTable;
    private static final ReentrantLock exceptionTableLock;
    private static final ReferenceQueue exceptionTableRefQueue;
    private int setCompletion(int completion)
    final int doExec()
    final void internalWait(long timeout)
    private int externalAwaitDone()
    private int externalInterruptibleAwaitDone() throws InterruptedException
    private int doJoin()
    private int doInvoke()
    final int recordExceptionalCompletion(Throwable ex)
    private int setExceptionalCompletion(Throwable ex)
    void internalPropagateException(Throwable ex)
    static final void cancelIgnoringExceptions(ForkJoinTask t)
    private void clearExceptionalCompletion()
    private Throwable getThrowableException()
    private static void expungeStaleExceptions()
    static final void helpExpungeStaleExceptions()
    static void rethrow(Throwable ex)
    private void reportException(int s)
    public final ForkJoinTask fork()
    public final V join()
    public final V invoke()
    public static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<> t2)
    public static void invokeAll(ForkJoinTask<>… tasks)
    public static <T extends ForkJoinTask<>> Collection invokeAll(Collection tasks)
    public abstract V getRawResult();
    protected abstract void setRawResult(V value);
    protected abstract boolean exec();
    Future内部封装类:
    static final class AdaptedRunnable extends ForkJoinTask implements RunnableFuture
    static final class AdaptedRunnableAction extends ForkJoinTask implements RunnableFuture
    static final class RunnableExecuteAction extends ForkJoinTask
    static final class AdaptedCallable extends ForkJoinTask implements RunnableFuture

    [ForkJoin框架之ForkJoinTask]
    RecursiveAction1.7protected abstract void compute();ForkJoinTaskgetResult结果为null
    RecursiveTask1.7V result;protected abstract V compute();ForkJoinTask
    CountedCompleter1.8/** This task’s completer, or null if none /
    final CountedCompleter<?> completer;
    /
    * The number of pending tasks until completion */
    volatile int pending;
    public abstract void compute();
    public void onCompletion(CountedCompleter<> caller)
    public boolean onExceptionalCompletion(Throwable ex, CountedCompleter<> caller)
    public final CountedCompleter<> getRoot()
    public final void tryComplete()
    public final void propagateCompletion()
    public void complete(T rawResult)
    public final CountedCompleter<> firstComplete()
    public final CountedCompleter<?> nextComplete()
    public final void quietlyCompleteRoot()
    public final void helpComplete(int maxTasks)
    ForkJoinTaskForkJoin框架之CountedCompleter,工作线程及并行流
    CountedCompleter使用普通树的结构存放动作,但是它又是另类的树,因为子节点能找到父节点,父节点却找不到子节点,而只知道子节点代表的动作未执行的数量,因此或许从访问方式的角度来看还是用栈来理解更好.在这里树既是数据结构,也是一个另类的操作栈
    CompletionStage1.8public CompletionStage thenAcceptAsync(Consumer<? super T> action, Executor executor);
    public CompletionStage thenRunAsync(Runnable action, Executor executor);
    public CompletionStage thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action, Executor executor);
    public CompletionStage acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor);
    public CompletionStage runAfterBothAsync(CompletionStage<?> other, Runnable action, Executor executor);
    public CompletionStage runAfterEitherAsync(CompletionStage<?> other, Runnable action, Executor executor);
    public CompletionStage thenApplyAsync (Function<? super T,? extends U> fn, Executor executor);
    public CompletionStage applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn, Executor executor);
    public CompletionStage thenComposeAsync(Function<? super T, ? extends CompletionStage> fn, Executor executor);
    public CompletionStage handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor);
    public <U,V> CompletionStage thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor);
    public CompletionStage whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor);
    public CompletionStage exceptionally(Function<Throwable, ? extends T> fn);
    public CompletableFuture toCompletableFuture();
    interface
    CompletableFuture1.8// Either the result or boxed AltResult
    volatile Object result;
    // Top of Treiber stack of dependent actions
    volatile Completion stack;
    private static final boolean useCommonPool = (ForkJoinPool.getCommonPoolParallelism() > 1);
    private static final Executor asyncPool = useCommonPool ? ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
    final boolean internalComplete(Object r)
    final boolean tryPushStack(Completion c)
    final Object encodeXXX
    final boolean completeXXX
    private static T reportGet(Object r) throws InterruptedException, ExecutionException
    private static T reportJoin(Object r)
    static Executor screenExecutor(Executor e)
    final void postComplete()
    final void cleanStack()
    final void push(UniCompletion<?,?> c)
    final CompletableFuture postFire(CompletableFuture<?> a, int mode)

    public static CompletableFuture supplyAsync(Supplier supplier, Executor executor)
    public static CompletableFuture runAsync(Runnable runnable, Executor executor)
    public static CompletableFuture completedFuture(U value)
    public static CompletableFuture allOf(CompletableFuture<?>… cfs)
    public static CompletableFuture anyOf(CompletableFuture<?>… cfs)
    Future
    CompletionStage
    ForkJoin框架之CompletableFuture

    Executor

    在这里插入图片描述

    类/接口版本主要字段主要方法父类备注
    Executor1.5void execute(Runnable command);interface
    ExecutorService1.5void shutdown();
    List shutdownNow();
    boolean isShutdown();
    boolean isTerminated();
    boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
    Future submit(Callable task);
    Future submit(Runnable task, T result);
    Future<?> submit(Runnable task);
    List<Future> invokeAll(Collection<? extends Callable> tasks) throws InterruptedException;
    List<Future> invokeAll(Collection<? extends Callable> tasks, long timeout, TimeUnit unit) throws InterruptedException;
    T invokeAny(Collection<? extends Callable> tasks) throws InterruptedException, ExecutionException;
    T invokeAny(Collection<? extends Callable> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
    Executorinterface
    AbstractExecutorService1.5protected RunnableFuture newTaskFor(Runnable runnable, T value)
    protected RunnableFuture newTaskFor(Callable callable)
    private T doInvokeAny(Collection<? extends Callable> tasks, boolean timed, long nanos) throws InterruptedException, ExecutionException, TimeoutException
    ExecutorServiceabstract class
    ThreadPoolExecutor1.5/** runState is stored in the high-order bits /
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    /
    * The queue used for holding tasks and handing off to worker threads. */
    private final BlockingQueue workQueue;
    private final ReentrantLock mainLock = new ReentrantLock();
    private final HashSet workers = new HashSet();
    private final Condition termination = mainLock.newCondition();
    private int largestPoolSize;
    private long completedTaskCount;
    private volatile ThreadFactory threadFactory;
    private volatile RejectedExecutionHandler handler;
    private volatile long keepAliveTime;
    private volatile boolean allowCoreThreadTimeOut;
    private volatile int corePoolSize;
    private volatile int maximumPoolSize;
    private void advanceRunState(int targetState)
    final void tryTerminate()
    private void checkShutdownAccess()
    private void interruptWorkers()
    private void interruptIdleWorkers(boolean onlyOne)
    private void interruptIdleWorkers()
    final void reject(Runnable command)
    void onShutdown()
    final boolean isRunningOrShutdown(boolean shutdownOK)
    private List drainQueue()
    private boolean addWorker(Runnable firstTask, boolean core)
    private void addWorkerFailed(Worker w)
    private void processWorkerExit(Worker w, boolean completedAbruptly)
    private Runnable getTask()
    final void runWorker(Worker w)
    public void execute(Runnable command)
    public void shutdown()
    public List shutdownNow()
    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException
    protected void finalize()
    public boolean prestartCoreThread()
    void ensurePrestart()
    public int prestartAllCoreThreads()
    public boolean remove(Runnable task)
    public void purge()
    protected void beforeExecute(Thread t, Runnable r)
    protected void afterExecute(Runnable r, Throwable t)
    protected void terminated()
    AbstractExecutorServiceThreadPoolExecutor源码解析
    核心线程数和最大线程数与阻塞队列相关联。
    根据提供的getter/setter方法,可以动态调整线程池
    内置4个拒绝策略,可实现RejectedExecutionHandler接口自定义拒绝策略
    线程池大小可根据CPU密集/IO密集、任务平均执行时长决定。最佳线程数目 = ((线程等待时间+线程CPU时间)/线程CPU时间 )* CPU数目
    ScheduledExecutorService1.5public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
    public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit);
    /** initialDelay -> initialDelay+period -> initialDelay + 2 * period */
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
    /** initialDelay -> over+period*/
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
    ExecutorServiceinterface
    ScheduledThreadPoolExecutor1.5/** False if should cancel/suppress periodic tasks on shutdown. /
    private volatile boolean continueExistingPeriodicTasksAfterShutdown;
    /
    * False if should cancel non-periodic tasks on shutdown. /
    private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
    /
    * True if ScheduledFutureTask.cancel should remove from queue /
    private volatile boolean removeOnCancel = false;
    /
    * Sequence number to break scheduling ties, and in turn to guarantee FIFO order among tied entries. */
    private static final AtomicLong sequencer = new AtomicLong();
    boolean canRunInCurrentRunState(boolean periodic)
    private void delayedExecute(RunnableScheduledFuture<?> task)
    void reExecutePeriodic(RunnableScheduledFuture<?> task)
    protected RunnableScheduledFuture decorateTask(Runnable runnable, RunnableScheduledFuture task)
    protected RunnableScheduledFuture decorateTask(Callable callable, RunnableScheduledFuture task)
    private long triggerTime(long delay, TimeUnit unit)
    long triggerTime(long delay)
    private long overflowFree(long delay)
    ThreadPoolExecutor
    ScheduledExecutorService
    将任务封装成ScheduledFutureTask对象,ScheduledFutureTask基于相对时间,不受系统时间的改变所影响
    内部定义了一个DelayedWorkQueue,它是一个有序队列,会通过每个任务按照距离下次执行时间间隔的大小来排序
    https://www.jianshu.com/p/925dba9f5969
    ForkJoinPool1.7public static final ForkJoinWorkerThreadFactory defaultForkJoinWorkerThreadFactory;
    static final ForkJoinPool common;
    static final int commonParallelism;
    private static int commonMaxSpares;
    private static int poolNumberSequence;
    // main pool control
    volatile long ctl;
    // lockable status
    volatile int runState;
    // parallelism, mode
    final int config;
    // to generate worker index
    int indexSeed;
    // main registry
    volatile WorkQueue[] workQueues;
    final ForkJoinWorkerThreadFactory factory;
    // per-worker UEH
    final UncaughtExceptionHandler ueh;
    // to create worker name string
    final String workerNamePrefix;
    // also used as sync monitor
    volatile AtomicLong stealCounter;
    private int lockRunState()
    private int awaitRunStateLock()
    private void unlockRunState(int oldRunState, int newRunState)
    private boolean createWorker()
    private void tryAddWorker(long c)
    final WorkQueue registerWorker(ForkJoinWorkerThread wt)
    final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex)
    final void signalWork(WorkQueue[] ws, WorkQueue q)
    private boolean tryRelease(long c, WorkQueue v, long inc)
    final void runWorker(WorkQueue w)
    private ForkJoinTask<?> scan(WorkQueue w, int r)
    private boolean awaitWork(WorkQueue w, int r)
    final int helpComplete(WorkQueue w, CountedCompleter<?> task, int maxTasks)
    private void helpStealer(WorkQueue w, ForkJoinTask<?> task)
    private boolean tryCompensate(WorkQueue w)
    final int awaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline)
    private WorkQueue findNonEmptyStealQueue()
    final void helpQuiescePool(WorkQueue w)
    final ForkJoinTask<?> nextTaskFor(WorkQueue w)
    static int getSurplusQueuedTaskCount()
    private boolean tryTerminate(boolean now, boolean enable)
    private void externalSubmit(ForkJoinTask<?> task)
    final void externalPush(ForkJoinTask<?> task)
    static WorkQueue commonSubmitterQueue()
    final boolean tryExternalUnpush(ForkJoinTask<?> task)
    final int externalHelpComplete(CountedCompleter<?> task, int maxTasks)
    public T invoke(ForkJoinTask task)
    public void execute(ForkJoinTask<?> task)
    public long getStealCount()
    public long getQueuedTaskCount()
    AbstractExecutorServiceForkJoin框架之ForkJoinPool
    不同于其他线程池,它的构建不需要提供核心线程数,最大线程数,阻塞队列等,还增加了未捕获异常处理器,而该处理器会交给工作线程,由该线程处理,这样的好处在于当一个线程的工作队列上的某个任务出现异常时,不至于结束掉线程,而是让它继续运行队列上的其他任务。
    依托于并行度(或默认根据核数计算)来决定最大线程数,它内部维护了WorkQueue数组ws取代了阻塞队列,ws中下标为奇数的为工作线程的所属队列,偶数的为共享队列,虽然名称有所区分,但重要的区别只有一点:共享队列不存在工作线程。
    维护了一个ctl控制信号,前16位表示活跃worker数,33至48位表示worker总数,后32位可以粗略理解用于表示worker等待队列的栈顶
    对全局全状的修改需要加锁进行,这些操作如修改ctl(改变栈顶,增删活跃数或总数等),处理ws中的元素,扩容ws,关闭线程池,初始化(包含ws的初始化),注册线程入池等。而这个锁就是runState,它除了当锁,也间接表示了运行状态,相应的线程池的SHUTDOWN,STOP,TERMINATED等状态均与其相应的位有关
    关于工作窃取,线程池外的提交者在join一个任务或get结果时,如果发现没有完成,它不会干等着工作线程,而是尝试自行执行,当执行方法结束,任务还没有完成的情况,它可以帮助工作线程做一些其他工作
    CompletionServiceFuture submit(Callable task);
    Future submit(Runnable task, V result);
    Future take() throws InterruptedException;
    Future poll();
    Future poll(long timeout, TimeUnit unit) throws InterruptedException;
    一种将新异步任务的生产与已完成任务的结果消耗相分离的服务。 生产者submit执行任务。 消费者take完成任务并按照完成的顺序处理其结果。
    ExecutorCompletionServiceprivate final Executor executor;
    private final AbstractExecutorService aes;
    private final BlockingQueue<Future> completionQueue;

    Executors

    方法名入参实际调用
    public static ExecutorService newFixedThreadPoolint nThreadsThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue())
    public static ExecutorService newFixedThreadPoolint nThreads, ThreadFactory threadFactoryThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), threadFactory)
    public static ExecutorService newSingleThreadExecutorFinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()))
    public static ExecutorService newSingleThreadExecutorThreadFactory threadFactoryFinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), threadFactory))
    public static ExecutorService newCachedThreadPoolThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue())
    public static ExecutorService newCachedThreadPoolThreadFactory threadFactoryThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue(), threadFactory)
    public static ExecutorService unconfigurableExecutorServiceExecutorService executorDelegatedExecutorService(executor)
    public static ScheduledExecutorService newSingleThreadScheduledExecutorDelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1))
    public static ScheduledExecutorService newSingleThreadScheduledExecutorThreadFactory threadFactoryDelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1, threadFactory))
    public static ScheduledExecutorService newScheduledThreadPoolint corePoolSizeScheduledThreadPoolExecutor(corePoolSize)
    public static ScheduledExecutorService newScheduledThreadPoolint corePoolSize, ThreadFactory threadFactoryScheduledThreadPoolExecutor(corePoolSize, threadFactory)
    public static ScheduledExecutorService unconfigurableScheduledExecutorServiceScheduledExecutorService executorDelegatedScheduledExecutorService(executor)
    public static ExecutorService newWorkStealingPoolint parallelismForkJoinPool(parallelism, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true)
    public static ExecutorService newWorkStealingPoolForkJoinPool(Runtime.getRuntime().availableProcessors(), ForkJoinPool.defaultForkJoinWorkerThreadFactory,vnull, true)
    public static ThreadFactory defaultThreadFactoryDefaultThreadFactory()
    public static ThreadFactory privilegedThreadFactoryPrivilegedThreadFactory()
    public static Callable callableRunnable task, T resultRunnableAdapter(task, result)
    public static Callable callableRunnable taskRunnableAdapter(task, null)
    public static Callable callablefinal PrivilegedAction<?> actionCallable() { public Object call() { return action.run(); }}
    public static Callable callablefinal PrivilegedExceptionAction<?> actionCallable() { public Object call() throws Exception { return action.run(); }}
    public static Callable privilegedCallableCallable callablePrivilegedCallable(callable)
    public static Callable privilegedCallableUsingCurrentClassLoaderCallable callablePrivilegedCallableUsingCurrentClassLoader(callable)
  • 相关阅读:
    【导入】spice In 导入网表-device map的设置
    【Redis】第1讲 互联网架构的演变历程
    Python Project Getting started with Django
    【JVM技术专题】「原理专题」让我们一起探索一下Netty(Java)底层的“零拷贝Zero-Copy”技术(上)
    网工内推 | 运维专场,厂商、软考证书优先,五险一金,节日福利
    四. node小工具(nodemon/supervisor)
    使用Specification与Example方式实现动态条件查询案例
    生命周期,vue中的axios,ref引用($refs),$nextTick
    小波神经网络的基本原理,小波神经网络算法原理
    Java多线程(5)----浅谈读写锁
  • 原文地址:https://blog.csdn.net/love__guo/article/details/125448211