| 类型 | @since | 关键字段 | 父类 |
|---|---|---|---|
| AtomicBoolean | 1.5 | private volatile int value; private static final long valueOffset; | |
| AtomicInteger | 1.5 | private volatile int value; private static final long valueOffset; | abstract class Number |
| AtomicLong | 1.5 | private volatile long value; private static final long valueOffset; static final boolean VM_SUPPORTS_LONG_CAS = VMSupportsCS8(); //底层JVM是否支持CAS | abstract class Number |
| 类型 | @since | 关键字段 | 父类 |
|---|---|---|---|
| AtomicIntegerArray | 1.5 | private final int[] array; private static final int base = unsafe.arrayBaseOffset(int[].class); private static final int shift = 31 - Integer.numberOfLeadingZeros(unsafe.arrayIndexScale(int[].class)); | |
| AtomicLongArray | 1.5 | private final long[] array; private static final int base = unsafe.arrayBaseOffset(long[].class); private static final int shift = 31 - Integer.numberOfLeadingZeros(unsafe.arrayIndexScale(long[].class)); |
| 类型 | @since | 关键字段 | 备注 |
|---|---|---|---|
| AtomicReference | 1.5 | private volatile V value; private static final long valueOffset; | |
| AtomicReferenceArray | 1.5 | private final Object[] array; private static final long arrayFieldOffset = unsafe.objectFieldOffset(AtomicReferenceArray.class.getDeclaredField(“array”)); private static final int base = unsafe.arrayBaseOffset(Object[].class); private static final int shift= 31 - Integer.numberOfLeadingZeros(unsafe.arrayIndexScale(Object[].class)); | |
| AtomicMarkableReference | 1.5 | private volatile Pair pair; private static final long pairOffset = objectFieldOffset(UNSAFE, “pair”, AtomicMarkableReference.class); | 使用boolean mark来标记reference是否被修改过 |
| AtomicStampedReference | 1.5 | private volatile Pair pair; private static final long pairOffset = objectFieldOffset(UNSAFE, “pair”, AtomicStampedReference.class); | 使用int stamp来标记reference是否被修改过 |
| 类型 | @since | 关键字段 | 备注 |
|---|---|---|---|
| AtomicIntegerFieldUpdater | 1.5 | private final long offset; private final Class<?> cclass; private final Class tclass; | 内部实现类AtomicIntegerFieldUpdaterImpl |
| AtomicLongFieldUpdater | 1.5 | private final long offset; private final Class<?> cclass; private final Class tclass; | 内部实现类CASUpdater(底层CAS)、LockedUpdater |
| AtomicReferenceFieldUpdater | 1.5 | private final long offset; private final Class<?> cclass; private final Class tclass; /** class holding the field / /* field value type */ private final Class vclass; | 内部实现类AtomicReferenceFieldUpdaterImpl |
| 类型 | @since | 关键字段 | 父类 |
|---|---|---|---|
| Striped64 | // size is a power of 2. transient volatile Cell[] cells; transient volatile long base; // Spinlock (locked via CAS) used when resizing and/or creating Cells. transient volatile int cellsBusy; | Number | |
| LongAccumulator | 1.8 | private final LongBinaryOperator function; private final long identity; | Striped64 |
| LongAdder | 1.8 | Striped64 | |
| DoubleAccumulator | 1.8 | private final DoubleBinaryOperator function; private final long identity; | Striped64 |
| DoubleAdder | 1.8 | Striped64 |
Striped64中两个重要方法:
// 处理涉及初始化、扩容、cell初始化、竞争的情况
final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended)
final void doubleAccumulate(double x, DoubleBinaryOperator fn, boolean wasUncontended)
Adder是Accumulater的特殊情况–累加运算,Accumulater可自定义运算。
DoubleXXX基于LongXXX的区别在于,内部会将Double转换成Long,其余一样。
相较于AtomicXXX,使用了“热点分离”的思想,用空间换时间,减少了并发冲突。
| 类/接口 | 版本 | 主要字段 | 主要方法 | 父类 | 备注 |
|---|---|---|---|---|---|
| AbstractOwnableSynchronizer | 1.6 | /** The current owner of exclusive mode synchronization. */ private transient Thread exclusiveOwnerThread; | get/setExclusiveOwnerThread | ||
| AbstractQueuedSynchronizer | 1.5 | private transient volatile Node head; private transient volatile Node tail; /** The synchronization state. */ private volatile int state; | // 独占式获取同步状态 boolean tryAcquire(int arg) // 独占式释放同步状态 boolean tryRelease(int arg) // 共享式获取同步状态 int tryAcquireShared(int arg) // 共享式私房同步状态 boolean tryReleaseShared(int arg) // 检测当前线程是否获取独占锁 boolean isHeldExclusively() // ---------------------------- acquireXXX/releaseXXX | AbstractOwnableSynchronizer | 模板方法,子类根据需要实现独占/共享获取和释放状态。入口方法是acquireXXX和releaseXXX方法。 |
| AbstractQueuedLongSynchronizer | 1.6 | private transient volatile Node head; private transient volatile Node tail; private volatile long state; | 同上 | AbstractOwnableSynchronizer | |
| 类/接口 | 版本 | 主要字段 | 主要方法 | 父类 | 备注 |
|---|---|---|---|---|---|
| Condition | 1.5 | await XXX signal/signalAll | |||
| ConditionObject | private transient Node firstWaiter; private transient Node lastWaiter; | Condition | AbstractQueuedSynchronizer、AbstractQueuedLongSynchronizer内部类。 |
| 类/接口 | 版本 | 主要字段 | 主要方法 | 父类 | 备注 |
|---|---|---|---|---|---|
| LockSupport | public static void unpark(Thread thread) public static void park XXX | 内部调用unsafe.park和unsafe.unpark方法 |
| 类/接口 | 版本 | 主要字段 | 主要方法 | 父类 | 备注 |
|---|---|---|---|---|---|
| Lock | 1.5 | lock XXX void unlock(); Condition newCondition(); | |||
| ReadWriteLock | 1.5 | Lock readLock(); Lock writeLock(); | |||
| ReentrantLock | 1.5 | private final Sync sync; | Lock | 内部自定义AQS实现类,分为公平锁和非公平锁。实现AQS的tryAcquire/tryRelease方法组 | |
| ReentrantReadWriteLock | 1.5 | private final ReentrantReadWriteLock.ReadLock readerLock; private final ReentrantReadWriteLock.WriteLock writerLock; final Sync sync; | public int getReadLockCount() public boolean isWriteLocked() public boolean isWriteLockedByCurrentThread() public int getWriteHoldCount() public int getReadHoldCount() | ReadWriteLock | 采用读写分离的策略,允许多个线程可以同时获取读锁,满足写少读多的场景。Sync实现AQS的独占、共享模板方法。通过readLock和writeLock获取不同的锁调用对应方法。其中state标识分为两个short,高16位位读锁计数,低16位为写锁计数 https://cloud.tencent.com/developer/article/1469555 |
| StampedLock | 1.8 | /** Head of CLH queue / private transient volatile WNode whead; /* Tail (last) of CLH queue / private transient volatile WNode wtail; // views transient ReadLockView readLockView; transient WriteLockView writeLockView; transient ReadWriteLockView readWriteLockView; /* Lock sequence/state / private transient volatile long state; /* extra reader count when state read count saturated */ private transient int readerOverflow; | public long writeLock() public long tryWriteLock() public long readLock() public long tryReadLock() public long tryOptimisticRead() public boolean validate(long stamp) public void unlockWrite(long stamp) public void unlockRead(long stamp) public void unlock(long stamp) public long tryConvertToWriteLock(long stamp) public long tryConvertToReadLock(long stamp) public long tryConvertToOptimisticRead(long stamp) public boolean tryUnlockRead() public boolean tryUnlockWrite() private long cancelWaiter(WNode node, WNode group, boolean interrupted) private long acquireRead(boolean interruptible, long deadline) private long acquireWrite(boolean interruptible, long deadline) | 对读写锁ReentrantReadWriteLock的增强,该类提供了一些功能,优化了读锁、写锁的访问,同时使读写锁之间可以互相转换,更细粒度控制并发。 StampedLockd的内部实现是基于CLH锁的(锁维护着一个等待线程队列,所有申请锁且失败的线程都记录在队列。一个节点代表一个线程)。 写锁被占用的标志是第8位为1,读锁使用0-7位,正常情况下读锁数目为1-126,超过126时,使用一个名为 readerOverflow的int整型保存超出数。同时高56为用于版本校验(validate方法)https://cloud.tencent.com/developer/article/1470988 |

| 类/接口 | 版本 | 主要字段 | 主要方法 | 父类 | 备注 |
|---|---|---|---|---|---|
| Queue | 1.5 | // true或抛出异常 boolean add(E e) // true或false boolean offer(E e) // 空队列,抛出异常 E remove() // 空队列,返回null E poll() // 检索队列头,空队列,抛出异常 E element() // 检索队列头,空队列返回null E peek() | Collection | interface java.util.Queue | |
| BlockingQueue | 1.5 | // 等待插入 void put(E e) throws InterruptedException; // 等待获取 E take() throws InterruptedException; int remainingCapacity(); int drainTo(Collection<? super E> c, int maxElements); | Queue | interface | |
| Deque | 1.6 | XXXFirst/XXXLast // 检索数据,空队列抛出异常 E getFirst();/E getLast(); // 删除指定元素 boolean removeFirstOccurrence(Object o); // 同addFirst void push(E e); // 同removeFirst E pop(); Iterator iterator(); Iterator descendingIterator(); | Queue | interface java.util.Queue | |
| BlockingDeque | 1.6 | // 等待获取 E takeFirst() throws InterruptedException; E takeLast() throws InterruptedException; | BlockingQueue、Deque | interface | |
| AbstractQueue | 1.5 | AbstractCollection Queue | abstract add内部调用offer remove内部调用poll element内部调用peek | ||
| ArrayBlockingQueue | 1.5 | /** The queued items / final Object[] items; /* items index for next take, poll, peek or remove / int takeIndex; /* items index for next put, offer, or add / int putIndex; /* Number of elements in the queue / int count; /* Main lock guarding all access / final ReentrantLock lock; /* Condition for waiting takes / private final Condition notEmpty; /* Condition for waiting puts */ private final Condition notFull; | private static void checkNotNull(Object v) private void enqueue(E x) private E dequeue() | AbstractQueue BlockingQueue | Itrs迭代器管理器将所有的迭代器都用链表的形式进行保存,每一次有新的迭代器被注册时都将其加到该迭代器链表的头节点 IteratorSpliterator的迭代实现forEachRemaining是直接采用传统的Iterator迭代根数据源ArrayBlockingQueue(就是利用的ArrayBlockingQueue本身的Iterator实现),所以在迭代的过程中,队列如果发生更新,迭代器也会同步更新状态。但是拆分后的迭代器ArraySpliterator是在拆分的时候将顶层迭代器的数据的一半直接拷贝到一个新的数组中,在迭代的时候直接通过数组下标访问该数组,所以这已经脱离了源数据ArrayBlockingQueue,其内部元素并不会随着队列的变化而被更新。 ArrayBlockingQueue内部的几乎每一个操作方法都需要先获取同一个ReentrantLock独占锁才能进行,这极大的降低了吞吐量,几乎每个操作都会阻塞其它操作,最主要是插入操作和取出操作相互之间互斥。所以ArrayBlockingQueue不适用于需要高吞吐量的高效率数据生成与消费场景。 |
| LinkedBlockingQueue | 1.5 | /** The capacity bound, or Integer.MAX_VALUE if none / private final int capacity; /* Current number of elements / private final AtomicInteger count = new AtomicInteger(); /* Head of linked list.Invariant: head.item == null / transient Node head; /* Tail of linked list. Invariant: last.next == null / private transient Node last; /* Lock held by take, poll, etc / private final ReentrantLock takeLock = new ReentrantLock(); /* Wait queue for waiting takes / private final Condition notEmpty = takeLock.newCondition(); /* Lock held by put, offer, etc / private final ReentrantLock putLock = new ReentrantLock(); /* Wait queue for waiting puts */ private final Condition notFull = putLock.newCondition(); | AbstractQueue BlockingQueue | 迭代器保证了其弱一致性,除了首个有效节点在创建迭代器实例的时候就已经被保留下来之外(所以在获取迭代器实例之后,就算移除了头节点it.next也会返回该节点),队列中其它节点的变更都能被迭代器同步更新。LinkedBlockimgQueue的迭代器少了ArrayBlockingQueue那样很多精密的实现例如对于GC的友好性,所以使用多个迭代器实例可能内存性能有不可预测性。 LBQSpliterator,第一次只拆一个元素,第二次拆2个,第三次拆三个,依次内推,拆分的次数越多,后面的迭代器分的得元素越多,直到一个很大的数MAX_BATCH(33554432) ,后面的迭代器每次都分到这么多的元素。tryAdvance、forEachRemaining、trySplit方法都是需要获取fullLock的,所以注意对吞吐量的影响 有一个专门的head节点,所有的有效节点都移除挂载到head节点之后,采用两个独立的可重入锁分别对出入队进行加锁,所以吞吐量有了很大的提高,但是它还是有很多方法(remove ,clear ,toArray, toString以及迭代器相关的方法)需要同时获取两个锁才能操作,这无疑会影响吞吐量,所以要合理使用。另外它在实现的时候创建了额外的Node节点实例来绑定真实数据,所以对内存的消耗稍微要多一些。最后它通过将从头部出队的节点的下一个节点指向自身的方法不但保障了迭代器的弱一致性而且解决了GC的跨代引用无法回收的问题,但是remove方法依然可能会导致GC无法释放,所以需要慎重使用。 | |
| LinkedBlockingDeque | 1.6 | /** Pointer to first node. Invariant: (first == null && last == null) or (first.prev == null && first.item != null) / transient Node first; /* Pointer to last node. Invariant: (first == null && last == null) or (last.next == null && last.item != null) / transient Node last; /* Number of items in the deque / private transient int count; /* Maximum number of items in the deque / private final int capacity; /* Main lock guarding all access / final ReentrantLock lock = new ReentrantLock(); /* Condition for waiting takes / private final Condition notEmpty = lock.newCondition(); /* Condition for waiting puts */ private final Condition notFull = lock.newCondition(); | AbstractQueue BlockingDeque | 以单个锁实现的基于链表的可选容量的双端队列,可以从对头、对尾两端插入和移除元素,这主要是通过节点同时持有前驱节点和后继节点的引用来达到同时支持正向反向的操作,它的迭代器支持正向和反向两种迭代方式。但是由于它的所有方法(几乎100%)都需要获取同一把独占锁,因此在竞争激烈的多线程并发环境下吞吐量非常底下。 | |
| DelayQueue | 1.5 | private final transient ReentrantLock lock = new ReentrantLock(); private final PriorityQueue q = new PriorityQueue(); private Thread leader = null; private final Condition available = lock.newCondition(); | AbstractQueue BlockingQueue | 无界阻塞队列,它的队列元素只能在该元素的延迟已经结束或者说过期才能被出队。| DelayQueue队列元素必须是实现了Delayed接口的实例,该接口有一个getDelay方法需要实现,延迟队列就是通过实时的调用元素的该方法来判断当前元素是否延迟已经结束。 DelayQueue是基于优先级队列来实现的,那肯定元素也要实现Comparable接口,所以实现Delayed的队列元素也必须要实现Comparable的compareTo方法。 | |
| PriorityBlockingQueue | 1.5 | private transient Object[] queue; private transient int size; private transient Comparator<? super E> comparator; private final ReentrantLock lock; private final Condition notEmpty; private transient volatile int allocationSpinLock; private PriorityQueue q; | AbstractQueue BlockingQueue | 无限容量的阻塞队列,容量是随着放入的元素空间不够来逐步扩容的,只支持放入可比较的对象即直接或间接实现了Comparator接口的元素,但是尝试添加元素时还是可能因为资源耗尽而抛出OutOfMemoryError。 虽然PriorityBlockingQueue叫优先级队列,但是并不是说元素一入队就会按照排序规则被排好序,而是只有通过调用take、poll方法出队或者drainTo转移出的队列顺序才是被优先级队列排过序的。所以通过调用 iterator() 以及可拆分迭代器 spliterator() 方法返回的迭代器迭代的元素顺序都没有被排序。如果需要有序遍历可以通过 Arrays.sort(pq.toArray()) 方法来排序。 使用了基于数组的平衡二叉堆(小的在上,大的在下方,即最小堆),使用单个ReentrantLock锁来保护公共操作 | |
| SynchronousQueue | 1.5 | private transient volatile Transferer transferer; | AbstractQueue BlockingQueue | SynchronousQueue是一种特殊的阻塞队列,我们可以把它看作是一种容量为0的集合,因为所有Collection的相关方法(包括迭代器)都将把SynchronousQueue当成一个空队列处理,SynchronousQueue的主要特点就是当一个操作试图放入数据或取走数据的时候,只有出现相应的互补操作才能成功返回,SynchronousQueue可以看作是一种数据交接的工具 SynchronousQueue的实现分别针对公平/非公平性的支持采用双队列/双堆栈来实现数据存储,虽然采用的数据结构不同,但是中心思想是一致的:当相同类型的线程(都是数据提供方或者数据接收者)到来时都将它们排队进入队列/堆栈,当与上一次入队的操作互补的线程(一方是数据提供者,另一方是数据接收者)到来时,就完成数据传递,然后它们双双出队返回。 | |
| TransferQueue | 1.7 | boolean tryTransfer(E e); void transfer(E e) throws InterruptedException; boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException; boolean hasWaitingConsumer(); int getWaitingConsumerCount(); | BlockingQueue | interface | |
| LinkedTransferQueue | 1.7 | /** head of the queue; null until first enqueue / transient volatile Node head; /* tail of the queue; null until first append / private transient volatile Node tail; /* The number of apparent failures to unsplice removed nodes */ private transient volatile int sweepVotes; | private E xfer(E e, boolean haveData, int how, long nanos) private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) | AbstractQueue TransferQueue | LinkedTransferQueue是Java并发包中最强大的基于链表的无界FIFO阻塞传输队列。LinkedTransferQueue是ConcurrentLinkedQueue、SynchronousQueue (公平模式下)、无界的LinkedBlockingQueues等的超集。 是一种双队列,即节点存在数据节点,请求节点这种互补模式,这两种模式代表可以完成数据交换,但大多时候其实现的集合方法在没有特别强调的情况下都是对有效数据节点的数据进行处理而非请求节点,因为请求节点携带的数据为null |
| ConcurrentLinkedQueue | 1.5 | AbstractQueue Queue | CAS代替加锁来实现的高效的非阻塞队列。虽然也有head、tail节点的概念,但是不同于LinkedBlockingQueue,它的head并不是总是指向第一个节点,tail也不一定总是指向最后一个节点,只有当当前指针距离第一个/最后一个节点有两个或更多步时,才将更新head/tail,这种减少CAS次数的设计是一种优化 | ||
| ConcurrentLinkedDeque | 1.7 | AbstractCollection Deque | 基于链表的无界的同时支持FIFO、LIFO的非阻塞并发双端队列 双端队列家族中对LinkedBlockingDeque的一种高并发优化,ConcurrentLinkedDeque采用了CAS的方法来处理所以的竞争问题,保留了双端队列的所有特性,可以从对头、对尾两端插入和移除元素,它的内部实现非常精妙,既采用了ConcurrentLinkedQueue实现中用到过松弛阈值处理(即并不每一次都更新head/tail指针),又独特的针对队列中被逻辑删除节点(CAS将数据item置为null)的进行了淤积阀值合并处理和分三个阶段的节点删除步骤,同时还针对多次volatile写、普通写,多次连续的CAS操作单次生效等一系列的措施减少volatile写和CAS的次数,提高ConcurrentLinkedDeque的运行效率。 |
| 名称/@since | 作用 | 使用AQS | fair | 可重用 | 关键字段 | 源码解析 |
|---|---|---|---|---|---|---|
| Semaphore 信号量 1.5 | 提供了资源数量的并发访问控制 初始一个最大资源数,每次请求资源数-1 | Y 内部实现AQS | both | Y acquire/release | ||
| CountDownLatch 计数门闩 1.5 | 允许一个或多个线程一直等待,直到其他线程的操作执行完后再执行 | Y 内部实现AQS | shard | N | ||
| CyclicBarrier 循环栅栏 1.5 | 所有线程都等待完成后才会继续下一步行动 | Y ReentrantLock | exclusive | Y reset | ||
| Exchanger 1.5 | 线程之间交换数据 | N | N | ASHIFT=7 // 避免为共享 bound,记录最大有效的arena索引,动态变化,竞争激烈时(槽位全满)增加, 槽位空旷时减小。bound + SEQ +/- 1,其高位+ 1(SEQ,oxff + 1)确定其版本唯一性 | https://www.cnblogs.com/aniao/p/aniao_exchanger.html | |
| Phaser 阶段器 1.7 | 可复用的同步屏障,与CyclicBarrier和CountDownLatch类似,但更强大 | N | state unarrived – 还没有抵达屏障的参与者的个数 (bits 0-15) parties – 需要等待的参与者的个数 (bits 16-31) phase – 屏障所处的阶段 (bits 32-62) terminated – 屏障的结束标记 (bit 63 / sign) 初始时,state的值为1,称为EMPTY,也即是unarrived = 1,其余都为0,这是一个标记,表示此phaser还没有线程来注册过 | https://www.cnblogs.com/aniao/p/aniao_phaser.html https://juejin.cn/post/6982588567428005901#heading-1 |

| 类/接口 | 版本 | 主要字段 | 主要方法 | 父类 | 备注 |
|---|---|---|---|---|---|
| CopyOnWriteArrayList | 1.5 | /** The lock protecting all mutators / final transient ReentrantLock lock = new ReentrantLock(); /* The array, accessed only via getArray/setArray. */ private transient volatile Object[] array; | List RandomAccess Cloneable | 数据结构改变时加锁,通过Arrays.copyOf复制原始数据变更后重置 | |
| CopyOnWriteArraySet | 1.5 | private final CopyOnWriteArrayList al; | AbstractSet | 底层为CopyOnWriteArrayList,添加元素时调用CopyOnWriteArrayList的XXX Absent方法 | |
| ConcurrentMap | 1.5 | default V merge(K key, V value, BiFunction<? super V, ? super V, ? extends V> remappingFunction) default V compute(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction) default V computeIfPresent(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction) default V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction) | Map | ||
| ConcurrentHashMap | 1.5 | /** The array of bins. Lazily initialized upon first insertion. Size is always a power of two.*/ transient volatile Node<K,V>[] table; /** The next table to use; non-null only while resizing. / private transient volatile Node<K,V>[] nextTable; /* Base counter value, used mainly when there is no contention, but also as a fallback during table initialization races. Updated via CAS. / private transient volatile long baseCount; /* Table initialization and resizing control. / private transient volatile int sizeCtl; /* The next table index (plus one) to split while resizing. / private transient volatile int transferIndex; /* Spinlock (locked via CAS) used when resizing and/or creating CounterCells. / private transient volatile int cellsBusy; /* Table of counter cells. When non-null, size is a power of 2. */ private transient volatile CounterCell[] counterCells; | AbstractMap ConcurrentMap | 通过synchronized+cas来实现了。在JDK8中只有一个数组,就是Node数组,Node就是key,value,hashcode封装出来的对象,和HashMap中的Entry一样,在JDK8中通过对Node数组的某个index位置的元素进行同步,达到该index位置的并发安全。同时内部也利用了CAS对数组的某个位置进行并发安全的赋值。 https://cloud.tencent.com/developer/article/1873182 | |
| ConcurrentNavigableMap | 1.6 | ConcurrentNavigableMap<K,V> subMap(K fromKey, boolean fromInclusive, K toKey, boolean toInclusive); ConcurrentNavigableMap<K,V> headMap(K toKey, boolean inclusive); ConcurrentNavigableMap<K,V> tailMap(K fromKey, boolean inclusive); ConcurrentNavigableMap<K,V> subMap(K fromKey, K toKey); ConcurrentNavigableMap<K,V> headMap(K toKey); ConcurrentNavigableMap<K,V> tailMap(K fromKey); ConcurrentNavigableMap<K,V> descendingMap(); public NavigableSet navigableKeySet(); NavigableSet keySet(); public NavigableSet descendingKeySet(); | ConcurrentMap NavigableMap | ||
| ConcurrentSkipListMap | 1.6 | /** The topmost head index of the skiplist. / private transient volatile HeadIndex<K,V> head; /* The comparator used to maintain order in this map, or null if using natural ordering. (Non-private to simplify access in nested classes.) / final Comparator<? super K> comparator; /* Lazily initialized key set / private transient KeySet keySet; /* Lazily initialized entry set / private transient EntrySet<K,V> entrySet; /* Lazily initialized values collection / private transient Values values; /* Lazily initialized descending key set */ private transient ConcurrentNavigableMap<K,V> descendingMap; | public Map.Entry<K,V> ceilingEntry(K key) public K ceilingKey(K key) public Map.Entry<K,V> floorEntry(K key) public K floorKey(K key) public Map.Entry<K,V> lowerEntry(K key) public K lowerKey(K key) public K firstKey() public K lastKey() public Map.Entry<K,V> pollFirstEntry() public Map.Entry<K,V> pollLastEntry() | AbstractMap ConcurrentNavigableMap Cloneable | 通过跳表(以空间换时间)实现的 key是有序的 |
| ConcurrentSkipListSet | 1.6 | private final ConcurrentNavigableMap<E,Object> m; | AbstractSet NavigableSet Cloneable | 调用ConcurrentSkipListMap中对应方法 |

| 类/接口 | 版本 | 主要字段 | 主要方法 | 父类 | 备注 |
|---|---|---|---|---|---|
| Callable | 1.5 | V call() throws Exception; | @FunctionalInterface interface | ||
| Future | 1.5 | boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; | interface | ||
| Runnable | 1.0 | public abstract void run(); | @FunctionalInterface interface | ||
| RunnableFuture | 1.6 | Runnable Future | interface | ||
| FutureTask | 1.5 | /** The run state of this task / private volatile int state; /* The underlying callable; nulled out after running / private Callable callable; /* The result to return or exception to throw from get() / private Object outcome; // non-volatile, protected by state reads/writes /* The thread running the callable; CASed during run() / private volatile Thread runner; /* Treiber stack of waiting threads */ private volatile WaitNode waiters; | private V report(int s) throws ExecutionException protected void set(V v) protected void setException(Throwable t) protected boolean runAndReset() private void handlePossibleCancellationInterrupt(int s) private void finishCompletion() private int awaitDone(boolean timed, long nanos) throws InterruptedException private void removeWaiter(WaitNode node) | RunnableFuture | Possible state transitions: * NEW -> COMPLETING -> NORMAL * NEW -> COMPLETING -> EXCEPTIONAL * NEW -> CANCELLED * NEW -> INTERRUPTING -> INTERRUPTED 内部比较中要的是awaitDone方法。https://www.jianshu.com/p/55221d045f39 [Doug Lea在J.U.C包里面写的BUG又被网友发现了 |
| Delayed | 1.5 | long getDelay(TimeUnit unit); | Comparable | interface | |
| ScheduledFuture | 1.5 | Delayed Future | interface | ||
| RunnableScheduledFuture | 1.6 | boolean isPeriodic(); | RunnableFuture ScheduledFuture | interface | |
| ScheduledFutureTask | /** Sequence number to break ties FIFO / private final long sequenceNumber; /* The time the task is enabled to execute in nanoTime units / private long time; /* Period in nanoseconds for repeating tasks. ·*/ private final long period; /** The actual task to be re-enqueued by reExecutePeriodic / RunnableScheduledFuture outerTask = this; /* Index into delay queue, to support faster cancellation. */ int heapIndex; | private void setNextRunTime() | FutureTask RunnableScheduledFuture | ScheduledThreadPoolExecutor内部类 | |
| ForkJoinTask | 1.7 | volatile int status; // accessed directly by pool and workers private static final ExceptionNode[] exceptionTable; private static final ReentrantLock exceptionTableLock; private static final ReferenceQueue exceptionTableRefQueue; | private int setCompletion(int completion) final int doExec() final void internalWait(long timeout) private int externalAwaitDone() private int externalInterruptibleAwaitDone() throws InterruptedException private int doJoin() private int doInvoke() final int recordExceptionalCompletion(Throwable ex) private int setExceptionalCompletion(Throwable ex) void internalPropagateException(Throwable ex) static final void cancelIgnoringExceptions(ForkJoinTask t) private void clearExceptionalCompletion() private Throwable getThrowableException() private static void expungeStaleExceptions() static final void helpExpungeStaleExceptions() static void rethrow(Throwable ex) private void reportException(int s) public final ForkJoinTask fork() public final V join() public final V invoke() public static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<> t2) public static void invokeAll(ForkJoinTask<>… tasks) public static <T extends ForkJoinTask<>> Collection invokeAll(Collection tasks) public abstract V getRawResult(); protected abstract void setRawResult(V value); protected abstract boolean exec(); | Future | 内部封装类: static final class AdaptedRunnable extends ForkJoinTask implements RunnableFuture static final class AdaptedRunnableAction extends ForkJoinTask implements RunnableFuture static final class RunnableExecuteAction extends ForkJoinTask static final class AdaptedCallable extends ForkJoinTask implements RunnableFuture [ForkJoin框架之ForkJoinTask] |
| RecursiveAction | 1.7 | protected abstract void compute(); | ForkJoinTask | getResult结果为null | |
| RecursiveTask | 1.7 | V result; | protected abstract V compute(); | ForkJoinTask | |
| CountedCompleter | 1.8 | /** This task’s completer, or null if none / final CountedCompleter<?> completer; /* The number of pending tasks until completion */ volatile int pending; | public abstract void compute(); public void onCompletion(CountedCompleter<> caller) public boolean onExceptionalCompletion(Throwable ex, CountedCompleter<> caller) public final CountedCompleter<> getRoot() public final void tryComplete() public final void propagateCompletion() public void complete(T rawResult) public final CountedCompleter<> firstComplete() public final CountedCompleter<?> nextComplete() public final void quietlyCompleteRoot() public final void helpComplete(int maxTasks) | ForkJoinTask | ForkJoin框架之CountedCompleter,工作线程及并行流 CountedCompleter使用普通树的结构存放动作,但是它又是另类的树,因为子节点能找到父节点,父节点却找不到子节点,而只知道子节点代表的动作未执行的数量,因此或许从访问方式的角度来看还是用栈来理解更好.在这里树既是数据结构,也是一个另类的操作栈 |
| CompletionStage | 1.8 | public CompletionStage thenAcceptAsync(Consumer<? super T> action, Executor executor); public CompletionStage thenRunAsync(Runnable action, Executor executor); public CompletionStage thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action, Executor executor); public CompletionStage acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor); public CompletionStage runAfterBothAsync(CompletionStage<?> other, Runnable action, Executor executor); public CompletionStage runAfterEitherAsync(CompletionStage<?> other, Runnable action, Executor executor); public CompletionStage thenApplyAsync (Function<? super T,? extends U> fn, Executor executor); public CompletionStage applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn, Executor executor); public CompletionStage thenComposeAsync(Function<? super T, ? extends CompletionStage> fn, Executor executor); public CompletionStage handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor); public <U,V> CompletionStage thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor); public CompletionStage whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor); public CompletionStage exceptionally(Function<Throwable, ? extends T> fn); public CompletableFuture toCompletableFuture(); | interface | ||
| CompletableFuture | 1.8 | // Either the result or boxed AltResult volatile Object result; // Top of Treiber stack of dependent actions volatile Completion stack; private static final boolean useCommonPool = (ForkJoinPool.getCommonPoolParallelism() > 1); private static final Executor asyncPool = useCommonPool ? ForkJoinPool.commonPool() : new ThreadPerTaskExecutor(); | final boolean internalComplete(Object r) final boolean tryPushStack(Completion c) final Object encodeXXX final boolean completeXXX private static T reportGet(Object r) throws InterruptedException, ExecutionException private static T reportJoin(Object r) static Executor screenExecutor(Executor e) final void postComplete() final void cleanStack() final void push(UniCompletion<?,?> c) final CompletableFuture postFire(CompletableFuture<?> a, int mode) public static CompletableFuture supplyAsync(Supplier supplier, Executor executor) public static CompletableFuture runAsync(Runnable runnable, Executor executor) public static CompletableFuture completedFuture(U value) public static CompletableFuture allOf(CompletableFuture<?>… cfs) public static CompletableFuture anyOf(CompletableFuture<?>… cfs) | Future CompletionStage | ForkJoin框架之CompletableFuture |

| 类/接口 | 版本 | 主要字段 | 主要方法 | 父类 | 备注 |
|---|---|---|---|---|---|
| Executor | 1.5 | void execute(Runnable command); | interface | ||
| ExecutorService | 1.5 | void shutdown(); List shutdownNow(); boolean isShutdown(); boolean isTerminated(); boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; Future submit(Callable task); Future submit(Runnable task, T result); Future<?> submit(Runnable task); List<Future> invokeAll(Collection<? extends Callable> tasks) throws InterruptedException; List<Future> invokeAll(Collection<? extends Callable> tasks, long timeout, TimeUnit unit) throws InterruptedException; T invokeAny(Collection<? extends Callable> tasks) throws InterruptedException, ExecutionException; T invokeAny(Collection<? extends Callable> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; | Executor | interface | |
| AbstractExecutorService | 1.5 | protected RunnableFuture newTaskFor(Runnable runnable, T value) protected RunnableFuture newTaskFor(Callable callable) private T doInvokeAny(Collection<? extends Callable> tasks, boolean timed, long nanos) throws InterruptedException, ExecutionException, TimeoutException | ExecutorService | abstract class | |
| ThreadPoolExecutor | 1.5 | /** runState is stored in the high-order bits / private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); /* The queue used for holding tasks and handing off to worker threads. */ private final BlockingQueue workQueue; private final ReentrantLock mainLock = new ReentrantLock(); private final HashSet workers = new HashSet(); private final Condition termination = mainLock.newCondition(); private int largestPoolSize; private long completedTaskCount; private volatile ThreadFactory threadFactory; private volatile RejectedExecutionHandler handler; private volatile long keepAliveTime; private volatile boolean allowCoreThreadTimeOut; private volatile int corePoolSize; private volatile int maximumPoolSize; | private void advanceRunState(int targetState) final void tryTerminate() private void checkShutdownAccess() private void interruptWorkers() private void interruptIdleWorkers(boolean onlyOne) private void interruptIdleWorkers() final void reject(Runnable command) void onShutdown() final boolean isRunningOrShutdown(boolean shutdownOK) private List drainQueue() private boolean addWorker(Runnable firstTask, boolean core) private void addWorkerFailed(Worker w) private void processWorkerExit(Worker w, boolean completedAbruptly) private Runnable getTask() final void runWorker(Worker w) public void execute(Runnable command) public void shutdown() public List shutdownNow() public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException protected void finalize() public boolean prestartCoreThread() void ensurePrestart() public int prestartAllCoreThreads() public boolean remove(Runnable task) public void purge() protected void beforeExecute(Thread t, Runnable r) protected void afterExecute(Runnable r, Throwable t) protected void terminated() | AbstractExecutorService | ThreadPoolExecutor源码解析 核心线程数和最大线程数与阻塞队列相关联。 根据提供的getter/setter方法,可以动态调整线程池 内置4个拒绝策略,可实现RejectedExecutionHandler接口自定义拒绝策略 线程池大小可根据CPU密集/IO密集、任务平均执行时长决定。最佳线程数目 = ((线程等待时间+线程CPU时间)/线程CPU时间 )* CPU数目 |
| ScheduledExecutorService | 1.5 | public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit); public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit); /** initialDelay -> initialDelay+period -> initialDelay + 2 * period */ public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit); /** initialDelay -> over+period*/ public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit); | ExecutorService | interface | |
| ScheduledThreadPoolExecutor | 1.5 | /** False if should cancel/suppress periodic tasks on shutdown. / private volatile boolean continueExistingPeriodicTasksAfterShutdown; /* False if should cancel non-periodic tasks on shutdown. / private volatile boolean executeExistingDelayedTasksAfterShutdown = true; /* True if ScheduledFutureTask.cancel should remove from queue / private volatile boolean removeOnCancel = false; /* Sequence number to break scheduling ties, and in turn to guarantee FIFO order among tied entries. */ private static final AtomicLong sequencer = new AtomicLong(); | boolean canRunInCurrentRunState(boolean periodic) private void delayedExecute(RunnableScheduledFuture<?> task) void reExecutePeriodic(RunnableScheduledFuture<?> task) protected RunnableScheduledFuture decorateTask(Runnable runnable, RunnableScheduledFuture task) protected RunnableScheduledFuture decorateTask(Callable callable, RunnableScheduledFuture task) private long triggerTime(long delay, TimeUnit unit) long triggerTime(long delay) private long overflowFree(long delay) | ThreadPoolExecutor ScheduledExecutorService | 将任务封装成ScheduledFutureTask对象,ScheduledFutureTask基于相对时间,不受系统时间的改变所影响 内部定义了一个DelayedWorkQueue,它是一个有序队列,会通过每个任务按照距离下次执行时间间隔的大小来排序 https://www.jianshu.com/p/925dba9f5969 |
| ForkJoinPool | 1.7 | public static final ForkJoinWorkerThreadFactory defaultForkJoinWorkerThreadFactory; static final ForkJoinPool common; static final int commonParallelism; private static int commonMaxSpares; private static int poolNumberSequence; // main pool control volatile long ctl; // lockable status volatile int runState; // parallelism, mode final int config; // to generate worker index int indexSeed; // main registry volatile WorkQueue[] workQueues; final ForkJoinWorkerThreadFactory factory; // per-worker UEH final UncaughtExceptionHandler ueh; // to create worker name string final String workerNamePrefix; // also used as sync monitor volatile AtomicLong stealCounter; | private int lockRunState() private int awaitRunStateLock() private void unlockRunState(int oldRunState, int newRunState) private boolean createWorker() private void tryAddWorker(long c) final WorkQueue registerWorker(ForkJoinWorkerThread wt) final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) final void signalWork(WorkQueue[] ws, WorkQueue q) private boolean tryRelease(long c, WorkQueue v, long inc) final void runWorker(WorkQueue w) private ForkJoinTask<?> scan(WorkQueue w, int r) private boolean awaitWork(WorkQueue w, int r) final int helpComplete(WorkQueue w, CountedCompleter<?> task, int maxTasks) private void helpStealer(WorkQueue w, ForkJoinTask<?> task) private boolean tryCompensate(WorkQueue w) final int awaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) private WorkQueue findNonEmptyStealQueue() final void helpQuiescePool(WorkQueue w) final ForkJoinTask<?> nextTaskFor(WorkQueue w) static int getSurplusQueuedTaskCount() private boolean tryTerminate(boolean now, boolean enable) private void externalSubmit(ForkJoinTask<?> task) final void externalPush(ForkJoinTask<?> task) static WorkQueue commonSubmitterQueue() final boolean tryExternalUnpush(ForkJoinTask<?> task) final int externalHelpComplete(CountedCompleter<?> task, int maxTasks) public T invoke(ForkJoinTask task) public void execute(ForkJoinTask<?> task) public long getStealCount() public long getQueuedTaskCount() | AbstractExecutorService | ForkJoin框架之ForkJoinPool 不同于其他线程池,它的构建不需要提供核心线程数,最大线程数,阻塞队列等,还增加了未捕获异常处理器,而该处理器会交给工作线程,由该线程处理,这样的好处在于当一个线程的工作队列上的某个任务出现异常时,不至于结束掉线程,而是让它继续运行队列上的其他任务。 依托于并行度(或默认根据核数计算)来决定最大线程数,它内部维护了WorkQueue数组ws取代了阻塞队列,ws中下标为奇数的为工作线程的所属队列,偶数的为共享队列,虽然名称有所区分,但重要的区别只有一点:共享队列不存在工作线程。 维护了一个ctl控制信号,前16位表示活跃worker数,33至48位表示worker总数,后32位可以粗略理解用于表示worker等待队列的栈顶 对全局全状的修改需要加锁进行,这些操作如修改ctl(改变栈顶,增删活跃数或总数等),处理ws中的元素,扩容ws,关闭线程池,初始化(包含ws的初始化),注册线程入池等。而这个锁就是runState,它除了当锁,也间接表示了运行状态,相应的线程池的SHUTDOWN,STOP,TERMINATED等状态均与其相应的位有关 关于工作窃取,线程池外的提交者在join一个任务或get结果时,如果发现没有完成,它不会干等着工作线程,而是尝试自行执行,当执行方法结束,任务还没有完成的情况,它可以帮助工作线程做一些其他工作 |
| CompletionService | Future submit(Callable task); Future submit(Runnable task, V result); Future take() throws InterruptedException; Future poll(); Future poll(long timeout, TimeUnit unit) throws InterruptedException; | 一种将新异步任务的生产与已完成任务的结果消耗相分离的服务。 生产者submit执行任务。 消费者take完成任务并按照完成的顺序处理其结果。 | |||
| ExecutorCompletionService | private final Executor executor; private final AbstractExecutorService aes; private final BlockingQueue<Future> completionQueue; |
| 方法名 | 入参 | 实际调用 |
|---|---|---|
| public static ExecutorService newFixedThreadPool | int nThreads | ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()) |
| public static ExecutorService newFixedThreadPool | int nThreads, ThreadFactory threadFactory | ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), threadFactory) |
| public static ExecutorService newSingleThreadExecutor | FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue())) | |
| public static ExecutorService newSingleThreadExecutor | ThreadFactory threadFactory | FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), threadFactory)) |
| public static ExecutorService newCachedThreadPool | ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue()) | |
| public static ExecutorService newCachedThreadPool | ThreadFactory threadFactory | ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue(), threadFactory) |
| public static ExecutorService unconfigurableExecutorService | ExecutorService executor | DelegatedExecutorService(executor) |
| public static ScheduledExecutorService newSingleThreadScheduledExecutor | DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1)) | |
| public static ScheduledExecutorService newSingleThreadScheduledExecutor | ThreadFactory threadFactory | DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1, threadFactory)) |
| public static ScheduledExecutorService newScheduledThreadPool | int corePoolSize | ScheduledThreadPoolExecutor(corePoolSize) |
| public static ScheduledExecutorService newScheduledThreadPool | int corePoolSize, ThreadFactory threadFactory | ScheduledThreadPoolExecutor(corePoolSize, threadFactory) |
| public static ScheduledExecutorService unconfigurableScheduledExecutorService | ScheduledExecutorService executor | DelegatedScheduledExecutorService(executor) |
| public static ExecutorService newWorkStealingPool | int parallelism | ForkJoinPool(parallelism, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true) |
| public static ExecutorService newWorkStealingPool | ForkJoinPool(Runtime.getRuntime().availableProcessors(), ForkJoinPool.defaultForkJoinWorkerThreadFactory,vnull, true) | |
| public static ThreadFactory defaultThreadFactory | DefaultThreadFactory() | |
| public static ThreadFactory privilegedThreadFactory | PrivilegedThreadFactory() | |
| public static Callable callable | Runnable task, T result | RunnableAdapter(task, result) |
| public static Callable callable | Runnable task | RunnableAdapter(task, null) |
| public static Callable callable | final PrivilegedAction<?> action | Callable() { public Object call() { return action.run(); }} |
| public static Callable callable | final PrivilegedExceptionAction<?> action | Callable() { public Object call() throws Exception { return action.run(); }} |
| public static Callable privilegedCallable | Callable callable | PrivilegedCallable(callable) |
| public static Callable privilegedCallableUsingCurrentClassLoader | Callable callable | PrivilegedCallableUsingCurrentClassLoader(callable) |