我在上一篇文章Java - HashMap原理分析中讲解了HashMap中主要的几个知识点。但是我们知道在高并发的情况下,HashMap并不是线性安全的。因此也就产生了与之对应的线性安全的Map,也就是ConcurrentHashMap。
我之前讲过了HashMap的一个结构,ConcurrentHashMap 的架构和它一样,也是数组+链表+红黑树的结构。
HashMap 中:哈希桶中的对象是一个由Node节点组成的链表。或者是TreeNode节点组成的红黑树。ConcurrentHashMap 中:在Node类型的节点基础上做了扩展:ForwardingNode 和 ReservationNode。因为ConcurrentHashMap仅仅通过加锁的方式是无法实现线性安全的。因此也衍生出不同的节点,其作用如下:
ForwardingNode:用于解决当进行扩容的时候,进行查询的问题。ReservationNode:用于解决当进行计算时,计算的对象为空的问题。那么ConcurrentHashMap如何保证高并发的情况下的扩容和元素的插入呢?我们先从put函数开始看。
// 代表是ForwardingNode节点
static final int MOVED = -1;
// 红黑树节点
static final int TREEBIN = -2;
// ReservationNode节点
static final int RESERVED = -3;
// HASH_BITS为int最大值,最高位为0
static final int HASH_BITS = 0x7fffffff;
// 代表容器在初始化或者扩容时候的锁,必须拿到这个锁,才能够进行相关的操作。
// 如果是负数,代表现在容器在初始化或者扩容阶段。 默认是0
private transient volatile int sizeCtl;
再来看下Node节点,这里和HashMap中的Node节点有几个不同:
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
}
我们可以发现,关于val还有next指针,都是经过volatile修饰的,说明线程间可见。
public V put(K key, V value) {
return putVal(key, value, false);
}
final V putVal(K key, V value, boolean onlyIfAbsent) {
// 首先,ConcurrentHashMap不允许存储null,无论是key还是value
if (key == null || value == null) throw new NullPointerException();
// 获取当前key对应的哈希值
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
// 如果哈希槽数组为null或者长度为0,那么进行初始化
if (tab == null || (n = tab.length) == 0)
tab = initTable();
// 否则,根据哈希值计算出对应的哈希槽下标,如果这个哈希槽是null,那么就直接添加一个节点。
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// HashMap中是直接赋值,而ConcurrentHashMap中则通过CAS来进行值的替换
// 替换成功则跳出循环,否则进行下一次循环
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
// 如果遇到了ForwardingNode节点(代表此时正在进行扩容操作),那么帮助程序进行扩容。
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
// 否则正常的情况下,即槽位非空,且不在扩容状态下
else {
V oldVal = null;
// 那么对整个槽进行加锁
synchronized (f) {
// 在判断一次,避免槽中的数据发生变化
if (tabAt(tab, i) == f) {
// 如果是链表
if (fh >= 0) {
binCount = 1;
// 遍历链表
for (Node<K,V> e = f;; ++binCount) {
K ek;
// 如果发现某一个节点key重复,那么覆盖
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
// 若不存在相同的key,则链表的末尾加上新节点
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
// 如果是红黑树,添加一个树节点
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
// 添加元素完毕后
if (binCount != 0) {
// 如果槽中的元素数量 >= 8 ,那么转化为红黑树
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
// 添加一个成员个数,同时检查扩容操作
addCount(1L, binCount);
return null;
}
从源码上来看,我们能够发现,ConcurrentHashMap 和 HashMap 的元素插入操作流程是非常相似的。最大的几个不同点在于:
ConcurrentHashMap 是通过 CAS操作来完成的。HashMap则是直接赋值。对比图如下:
ConcurrentHashMap 有通过 synchronized 将整个Node节点给加锁(也就是所谓的分段锁)。HashMap则没有。
ConcurrentHashMap 中还有 tab = helpTransfer(tab, f);这个操作。那么接下来我们就来对几个细节做讲解。
我们看下代码:
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
// 如果 sizeCtl 为-1,代表此时容器处于初始化或者扩容阶段。
if ((sc = sizeCtl) < 0)
// 此时放弃CPU的使用权,让出时间片,线程进入就绪状态参与锁的竞争
Thread.yield();
// 通过原子操作来修改sizeCtl的值,即只允许某一个线程能够对该容器进行初始化,因为sizeCtl的默认值是0.此时需要将其改为-1
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
// 双重校验,避免容器被多次
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
// 初始化哈希槽数组
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
// 计算出下次扩容的阈值,相当于n - 0.25n = 0.75。和HashMap一样的阈值
sc = n - (n >>> 2);
}
} finally {
// 扩容完毕,将下一次扩容的阈值进行赋值。
sizeCtl = sc;
}
break;
}
}
return tab;
}
总结下在高并发的情况下,是如何避免重复初始化的:
sizeCtl这个变量。它是被volatile修饰的,因此能被多线程看到。防止容器被多次初始化或者扩容。sizeCtl的值是负数,代表容器处于初始化或者扩容阶段。那么此时走到这里的线程都会让出CPU,进入就绪状态。CAS操作,改变sizeCtl的值,默认是0,将其改为-1,那么其他线程看到的时候,就是第二步。扩容完毕,sizeCtl值成正数(当前容器的一个扩容阈值大小)。讲完这里,按照代码的顺序来看,就是要讲helpTransfer这个函数了。但是,根据我在代码中写的注释,只有识别到当前的哈希槽为ForwardingNode的时候,才会进行协助扩容。即:
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
那么根据在哪里?我们就应该先看看扩容操作:扩容和ForwardingNode有什么关联?
我们来回顾下HashMap中,在添加完元素之后的操作:

ConcurrentHashMap 中:却只有短短的addCount()函数。

addCount其实整合了元素的统计和扩容操作,再讲源码之前,我想先说两个属性:
private transient volatile long baseCount;
private transient volatile CounterCell[] counterCells;
baseCount:ConcurrentHashMap中元素个数,但返回的不一定是当前容器的真实元素个数。基于CAS无锁更新。CounterCell数组:一个计数盒子。一般在存在锁竞争的情况下,用它来统计元素个数。我们来看下它的源码:
// 外界传入的参数是 1 和 binCount。 binCount指的是哈希槽中的元素个数。
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
// (as = counterCells) != null 表示cells数组已经初始化,当前线程使用hash去寻址找到合适的cell,去累加数据
// 或者 如果修改basecount失败了,即与其它线程发生了冲突,当前线程尝试去创建cells数组
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
// true代表没有竞争
boolean uncontended = true;
// 三种情况满足任意一种:
// 1. cells数组未初始化,但是写basecount失败
// 2. cells已经初始化了,但是哈希寻址的cell为null
// 3.当前cell单元格不为null,去进行CAS累加操作,但是竞争失败
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
// 创建cells,进行数量的统计
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount();
}
// ...扩容部分
}
计数部分:
baseCount或者CounterCell来计算得出的。baseCount就可以解决。但是如果竞争激烈的话,比如10个线程去put数据,但是在计数的时候,由于只允许一个线程CAS操作成功,就会导致其他的9个线程自旋等待机会。CPU空转。baseCount进行拆分,拆成多个CounterCell,让每个线程绑定一个计数盒子,进行自身的元素统计。就可以进行分散的统计。如图:

我们继续看addCount这个函数的第二部分:
// 外界传入的参数是 1 和 binCount。 binCount指的是哈希槽中的元素个数。
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
// ...计数部分
// 扩容部分
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
// 如果sizeCtl小于0,即代表当前哈希槽处于扩容阶段。那么需要协助Map扩容
if (sc < 0) {
// 判断当前线程是否可以协助扩容,不满足则直接跳出
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
// 若满足,再通过CAS进行竞争。协助扩容
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}
扩容部分:
sizeCtl),并且数组长度小于最大限制。sizeCtl是否小于0。break。sizeCtl + 1,表示多一个线程帮助容器扩容。校验逻辑:
(sc >>> RESIZE_STAMP_SHIFT) != rs:sc的低16 位不等于标识符,代表sizeCtl发生了变化。sc == rs + 1:代表扩容结束。因为第一个线程进行扩容的时候,sc ==rs 左移 16 位 + 2。在它结束扩容之后,sc会减1,那么此时sc = rs +1。sc == rs + MAX_RESIZERS:代表帮助线程数已经达到了上限。(nt = nextTable) == null:扩容结束。transferIndex <= 0:转移状态发生了变化。 transferIndex 相当于扩容过程中的一个任务执行索引,代表扩容操作执行到哪里了。 这里也是扩容结束的意思。如果校验通过,才会进行扩容操作,我们看下核心的扩容函数transfer:
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
// 计算当前线程在当下扩容过程中,分配到的槽位个数。即步长,一般是16
//目的是为了让每个CPU处理的哈希桶个数一样多,避免出现转移任务不均匀的现象。默认一个线程处理16个桶
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE;
// 如果新容器是空,那么创建一个新的容器
if (nextTab == null) {
try {
// 扩容两倍
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) {
// 扩容失败
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
// 设置数据转移的开始下标,因为是从后往前进行处理的,因此这里的开始下标就是老表的长度
transferIndex = n;
}
// 新数组长度
int nextn = nextTab.length;
// 创建一个ForwardingNode节点,如果别的线程发现这个槽位是ForwardingNode类型的,就会跳过这个节点,该对象在后文会被用到
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
// 一个初始值,如果为true,当前线程就会不断的推进任务的下标,即做扩容操作
// 因为这个时候可能只有当前线程在做扩容操作,那么就需要进行单干,不断地截取固定步长的槽位,进行数据迁移
boolean advance = true;
boolean finishing = false; // 完成状态,如果为true,代表当前任务完成
// 遍历每个哈希槽,从后往前遍历,bound指的是当前线程负责迁移的哈希桶的最小下标
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
// 即当前线程负责的哈希桶是否都处理完毕,如果完毕了,跳出循环
while (advance) {
int nextIndex, nextBound;
// --i表示下一个待处理的bucket,如果它不在自己负责的段内,或者是自己的扩容任务完成了,那么下次循环的时候,就结束操作。
if (--i >= bound || finishing)
advance = false;
// 槽位被其它线程选择完了,transferIndex这个下标之后的哈希桶都是已经被处理完毕的。
// 新的线程进来的时候,需要截取transferIndex之前的哈希桶来重新分配数据。
else if ((nextIndex = transferIndex) <= 0) {
// 赋值i = -1 代表 旧表中的桶都已经转移完毕
i = -1;
advance = false;
}
// 尝试获取槽位的操作权,CAS操作
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
// CAS成功后,更新当前线程负责的哈希桶的边界下标索引。
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
// 通过遍历,设置结束条件
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
// 如果扩容完毕
if (finishing) {
nextTable = null;// 清空扩容的过程中创建出来的临时表。
table = nextTab;// 将当前表指向临时表。
sizeCtl = (n << 1) - (n >>> 1);//
return;
}
// 使用CAS操作,对 sizeCtl 的低16位进行减 1,代表当前线程做完了属于自己的任务
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
// 说明此时没有线程在扩容操作了,扩容结束。
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
// 如果当前位置没有任何节点,那么放入之前初始化的ForwardingNode节点
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
// 表示该位置已经完成了迁移,ForwardingNode类型节点的hash值就是MOVED
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
synchronized (f) {
// 扩容操作,元素的重新分配
}
}
}
}
从上述代码,我们可以得知这么几个重点:
1.首先,当前线程在处理哈希槽的时候,如果发现当前的哈希槽没有被处理,那么就会塞一个ForwardingNode对象。我们看下它的构造函数:
ForwardingNode(Node<K,V>[] tab) {
// MOVED 的赋值对象就是hash,就是-1
super(MOVED, null, null, null);
this.nextTable = tab;
}
那么其他线程在遍历的时候,遍历到对应槽的时候,就通过以下代码判断当前槽已经被处理过了:
else if ((fh = f.hash) == MOVED)
第二点:每个线程,在分段处理哈希槽的时候,默认的步长是16(如果计算出的步长小于最小值,那么就用最小值作为步长):
private static final int MIN_TRANSFER_STRIDE = 16;
整个transfer步骤可以分为几个大步骤:
while (advance)循环。我们再看下最后一个步骤:对哈希槽中元素的处理:这里只说链表元素的处理。
// 给当前的哈希桶加锁
synchronized (f) {
// 双重检索下。因为当前哈希桶f,在未加锁之前,额能有别的线程调用了put函数,更新了桶内的数据
// 若不做这层校验,可能导致新加入到桶的元素没有被处理而丢失。
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
// 如果哈希桶对应的结构是链表
if (fh >= 0) {
// 由于我们put一个元素的时候,哈希桶下标的计算是hash&(n-1),同时表容量是2的幂。因此这里计算出的结果即:
// hash&n的结果只有两个。要么是n要么是0。
int runBit = fh & n;
Node<K,V> lastRun = f;
// 老链表由于需要被拆分到新链表中。即rehash。这里的目的是为了计算最长的不需要改变next指针的最长链。
// 遍历出的最长链就可以直接放到新表中,
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
// 逐个遍历节点
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
// 0表示仍然放到下标为i的桶内的链表
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
// 否则就是放到下标为i+n的桶内的链表
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln);// 设置新表的下标为i的桶内数据
setTabAt(nextTab, i + n, hn);// 设置新表的下标为i+n的桶内数据
setTabAt(tab, i, fwd);// 将老表下标为i的桶内放上ForwardingNode对象,用来标识当前处于扩容过程
advance = true;
}
else if (f instanceof TreeBin) {
// 如果是红黑树的处理
}
}
}
我们来看下相关的源码:
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
int rs = resizeStamp(tab.length);
// 如果相关数组没有被并发修改,并且当前容器处于扩容阶段。
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
// 如果扩容结束了,或者超过了最大帮助线程数数量,跳出循环
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
// 否则将sizeCtl+1,代表多一个线程帮助扩容。
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}
废话不多说,上代码:
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
// 拿到对应的哈希值
int h = spread(key.hashCode());
// 只有哈希槽数组不是null的时候,才可以获取。不然都没数据,肯定返回null
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
// 如果哈希槽上的Node头节点就是要找的,那么直接返回
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
// 否则,此时有两种情况,1:该节点是TreeBin节点,hash值固定为-2.
// 2.此时容器处于扩容阶段,同时该Hash槽已经迁移完毕,并且对应类型是ForwardingNode,对应hash值为-1
// Node和TreeBin都有对应的find函数。具体函数具体分析。
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
// 否则,对应槽上的是普通的链表结构,通过while循环遍历,寻找该节点
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
本质:取模运算在 n 是2的次幂时 &(n-1) 等价于 % n。
Map扩容的时候,我们知道,扩容的大小是原本的2倍。而每个哈希槽中的数据要重新计算它的hash值。因为数组长度发生了改变。hash & (n-1) 的方式来获取哈希桶的下标的。我们举个例子,一个元素的哈希值是20(十进制)。当前Map数组长度是16,扩容后32。那么两次的哈希结果:
20&15=4。20&31=20。两次结果相差16,正好是扩容之前的数组长度。那么我们如果将链表分为高低位,我们只需要将高位的节点,将其哈希值加上数组长度,就能定位到新Map中哈希槽数组的下标了。无需重复计算hash值。
我们再来看下代码中的片段:高位的链表,直接插到 i+n 的位置。

变量sizeCtl几乎出现在源码的各个地方,而其值又运用于多种不同的运用场景:
sizeCtl = 0:表示没有指定初始容量。sizeCtl > 0:表示初始容量(可以使用阶段)。sizeCtl = -1:标记作用,告知其他线程,正在初始化或者扩容。(不能使用阶段)sizeCtl = 0.75n :初始化完毕。sizeCtl = (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2 :表示此时只有一个线程在执行扩容。通过感知节点ForwardingNode:
ForwardingNode节点。ForwardingNode节点的hash值是-1.put操作的时候,会看当前节点的hash值,如果对应值是-1,就代表遇到了ForwardingNode节点。代表容器目前处于扩容或者初始化阶段。那么帮助程序进行扩容。transferIndex来控制每个线程的一个执行区间。transferIndex 由 volatile修饰,因此线程之间可见。CAS操作,获得新数组中,自己负责的那一块区间。一般默认的区间步长是16。从数组的末尾向前处理。transferIndex代表已经分配过的区域下标。在这个值之后的所有哈希槽都代表:已经被对应的线程处理过。 同时对应的哈希槽的节点为ForwardingNode类型。
同时,在ConcurrentHashMap中可以这么总结:
CAS原子操作来完成。advance:推进标记,如果为true,那么会不断的给当前线程分配任务。同时控制着整个容器的扩容进度。finishing:扩容完成标记。(指当前线程分配到的扩容任务)。transferIndex:转换的索引,标识着扩容到哪一个哈希槽了,值从数组长度-1到0逐渐变小。sizeCtl:控制着ConcurrentHashMap中table数组的初始化和扩容操作。因为put操作,有可能是值的覆盖或者是插入操作,如果是覆盖操作。此时容器中的元素总数并没有改变。

因为Node节点中的val变量通过volatile修饰。所以在多线程的环境下,即便value的值被修改了,在线程之间也是可见的。
我们知道,以链表为例,在扩容的时候,会将链表分为高低位链表:ln和hn。那么原本的链表能正常访问吗?
回答:可以。
fwd,那么之后的get请求,就会将请求转发到扩容后的数组中。