在 jdk1.7
版本及其以下版本中, ConcurrentHashMap
的数据结构是由 Segments
数组 + HashEntry
数组 + 链表实现的
不同的是, ConcurrentHashMap
中的数组被分为大数组和小数组,大数组是 Segment
,小数组是 HashEntry
, Segment
本身是基于 ReentrantLock
可重入锁来实现加锁和释放锁,这样就能保证多线程同时访问 ConcurrentHashMap
的时候,同一时间只能有一个线程操作对应的节点,以此来保证线程安全。
在 jdk1.8
中,选择了和 HashMap
相同的 Node
数组 + 链表 + 红黑树结构,采用 CAS + Synchronized
来保证并发安全性。抛弃了原有 Segment
分段锁,直接用 table
数组存储键值对;当 HashEntry
对象组成的链表长度超过 TREEIFY_THRESHOLD
时,链表转换为红黑树,提升性能。
jdk1.7
遍历链表 O(n)
,jdk1.8
变成遍历红黑树 O(logN)
。
jdk1.7
是对需要进行数据操作的 Segment
加锁,jdk1.8
调整为对每个数组元素的头节点加锁。
1、 HashTable
实现并发安全的原理是通过 synchronized
关键字,如果我们查看 put()
方法源码,可以看出 put()
方法是被 synchronized
关键字所修饰的,而其他的方法,比如 clear()、get()、size()
等方法也同样是被 synchronized
关键字修饰的。之所以 Hashtable
是线程安全的,是因为几乎每个方法都被 synchronized
关键字所修饰了,这也就保证了线程安全。
2、ConcurrentHashMap
实现并发安全在 jdk1.7
中采用 分段锁的方式,jdk1.8
中直接采用了 CAS + synchronized + Node
节点的方式,这和 Hashtable
的完全利用 synchronized
的方式有很大的不同。
正因为它们在线程安全的实现方式上的不同,导致它们在性能方面也有很大的不同。
1、ConcurrentHashMap:
在 jdk1.7
的时候,ConcurrentHashMap(分段锁)
对整个桶数组进行了分割分段 (Segment)
,容器中有多把锁,每一把锁锁一段数据,多线程访问容器里不同数据段的数据,就不会存在锁竞争,提高并发访问率。 到了 jdk1.8
的时候已经抛弃了 Segment
的概念,而是直接用Node 数组+链表+红黑树
的数据结构来实现,并发控制使用 synchronized
和 CAS
来操作。(jdk1.6
以后 对 synchronized
锁做了很多优化) 整个看起来就像是优化过且线程安全的 HashMap
,虽然现在 jdk1.8 中还能看到 Segment 的数据结构,但是已经简化了属性,只是为了兼容旧版本。
2、Hashtable:
使用 synchronized
来保证线程安全,效率非常低下。当一个线程访问同步方法时,其他线程也访问同步方法,而其他线程在此期间是不能操作的,如使用 put()
方法添加元素,另一个线程不能使用 put()
添加元素,也不能使用 get()
,竞争会越来越激烈效率越低,不仅如此,还会带来额外的上下文切换等开销,所以此时它的吞吐量甚至还不如单线程的情况。
除了加锁,原理上没有太大区别。
但是 ConcurrentHashMap
集合中的 key
或者 value
插入 null(空)
值时,会报空指针异常,但是单线程操作的 HashMap
允许 key
或者 value
插入null(空)
。
这里我们查看源码:
从源码来看,在 key
或者 value
为 null(空)
时,直接抛出空指针异常。
那么为什么 ConcurrentHashMap
的 key
或者 value
不允许插入 null(空)
,而 HashMap
允许插入?
有人就问过 ConcurrentHashMap
的作者 Doug Lea
,以下是他的回答的邮件内容:
The main reason that nulls aren't allowed in ConcurrentMaps (ConcurrentHashMaps, ConcurrentSkipListMaps) is that ambiguities that may be just barely tolerable in non-concurrent maps can't be accommodated. The main one is that if map.get(key) returns null, you can't detect whether the key explicitly maps to null vs the key isn't mapped. In a non-concurrent map, you can check this via map.contains(key),but in a concurrent one, the map might have changed between calls. Further digressing: I personally think that allowing nulls in Maps (also Sets) is an open invitation for programs to contain errors that remain undetected until they break at just the wrong time. (Whether to allow nulls even in non-concurrent Maps/Sets is one of the few design issues surrounding Collections that Josh Bloch and I have long disagreed about.)
It is very difficult to check for null keys and values in my entire application .
Would it be easier to declare somewhere static final Object NULL = new Object(); and replace all use of nulls in uses of maps with NULL? -Doug
Doug Lea
如此设计最主要的原因是,不容忍在并发场景下出现歧义!
假设 ConcurrentHashMap
允许存储 null(空)
那么,我们取值的时候会出现两种结果:
null(空)
null(空)
,所以返回的结果就是原本的 null(空)
值这种情况下就会出现歧义,那为什么 HashMap
就不怕出现歧义问题呢?
这是因为 HashMap
的设计是给单线程使用的,所以如果查询到了 null(空)
值,我们可以通过 hashMap.containsKey(key)
的方法来区分这个 null(空)
值到底是存入的 null(空)
?还是压根不存在的 null(空)
?这样歧义问题就得到了解决,所以 HashMap
不怕歧义问题。
那么有人就会有疑问了,ConcurrentHashMap
里面也有 containsKey()
,是不是可以区分?
在单线程的情况下这个是没有问题的,但是 ConcurrentHashMap
是针对多线程的,多线程的情况下比较复杂的。我们假设 ConcurrentHashMap
可以存入 null(空)
值,有这样一个场景,现在有一个线程 A
调用了 concurrentHashMap.containsKey(key)
,我们期望返回的结果是 false
,但在我们调用 concurrentHashMap.containsKey(key)
之后,未返回结果之前,线程 B
又调用了 concurrentHashMap.put(key,null)
存入了 null(空) 值
,那么线程 A
最终返回的结果就是 true
了,这个结果和我们之前预想的 false
完全不一样。
也就是说,多线程的状况非常复杂,我们没办法判断某一个时刻返回的 null(空)
值,到底是值为 null(空)
,还是压根就不存在,也就是歧义问题不可被证伪,所以 ConcurrentHashMap
才会在源码中这样设计,直接杜绝 key
或 value
为 null(空)
的歧义问题。
final V putVal(K key, V value, boolean onlyIfAbsent) {
// key 和 value 都不允许为空
if (key == null || value == null) throw new NullPointerException();
//根据 key 计算出 hashCode
int hash = spread(key.hashCode());
// 用来计算在这个节点总共有多少个元素,用来控制扩容或者转换为树
int binCount = 0;
// 数组的遍历,自旋插入节点,直到成功
for (ConcurrentHashMap.Node<K,V>[] tab = table;;) {
ConcurrentHashMap.Node<K,V> f; int n, i, fh;
//当 Node 数组为 null 或 长度为0,则进行初始化
if (tab == null || (n = tab.length) == 0)
// 如何保证初始化数组是线程安全的?下面讲
tab = initTable();
// Unsafe 类 volatile 的方式取出 hashCode 散列后通过与运算得出的 Node[] 数组下标值对应的 Node 对象
// 此时 Node 位置若为 null,则表示还没有线程在此 Node 位置进行插入操作,说明本次操作是第一次
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// 如果这个位置没有元素的话,则通过 CAS 的方式插入数据,CAS 失败则继续下一次循环
if (casTabAt(tab, i, null,
new ConcurrentHashMap.Node<K,V>(hash, key, value, null)))
//插入成功,退出循环
break; // no lock when adding to empty bin
}
// 如果执行到这一步,说明该位置已经存在值
// 上面的判断中,把这个位置的值赋给了 f ,所以 f.hash 也就是该位置元素的 hash 值
// 现在这个判断也将 f.hash 赋给了 fh 变量
// 如果当前节点的 hash 值为 MOVED,即有其他线程正在扩容,当前线程会帮助它扩容
else if ((fh = f.hash) == MOVED)
// 帮助扩容
tab = helpTransfer(tab, f);
// 执行到这里的时候,说明当前 ConcurrentHashMap 没有在扩容,并且要添加的位置已经有元素了,后面的插入很有可能会发生 Hash 冲突
//但是并不知道是链表结构还是红黑树结构,所以要执行下面的操作
else {
//如果都不满足,则利用 synchronized 锁写入数据
V oldVal = null;
// 对数组该节点位置加锁(而不是对整个数组加锁,开始处理数组该位置的迁移工作),确保线程安全的将节点插入到链表或红黑树中
synchronized (f) {
// 确认在加锁过程中没有其他线程更改了节点
if (tabAt(tab, i) == f) {
// 当前节点的 hash 值大于 0,表示是链表上的节点
if (fh >= 0) {
// 用来记录链表长度,来决定是否转化为红黑树
binCount = 1;
// 遍历链表,将节点插入到链表中
for (Node<K,V> e = f;; ++binCount) {
K ek;
// 如果当前元素的 hash 值和 key 跟要存储的位置的节点的相同的时候,替换掉该节点的 value 即可
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
// 如果 onlyIfAbsent 为 false 对 value 值进行替换
if (!onlyIfAbsent)
e.val = value;
break;
}
// 到了链表的最尾端,将新值插入到链表的最尾端
Node<K,V> pred = e;
// 如果不是同样的 hash,同样的 key 的时候,则判断该节点的下一个节点是否为空
if ((e = e.next) == null) {
//尾插法插入新节点
pred.next = new Node<K,V>(hash, key,
value, null);
// 插入完成,终止循环
break;
}
}
}
// 如果 f 的类型是TreeBin(红黑树节点),TreeBin 是 ConcurrentHashMap 的一个内部类,包装了ThreeNode,将节点插入到红黑树中,如果找到相同 key 的节点,则根据 onlyIfAbsent 判断是否替换 value 值
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
// 调用 putTreeVal 方法,将该元素添加到树中去
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
// 链表转红黑树的操作
if (binCount != 0) {
// 判断是否要将链表转换为红黑树,临界值和 HashMap 一样,也都是 8
if (binCount >= TREEIFY_THRESHOLD)
// 链表 -> 红黑树 转换
treeifyBin(tab, i);
// 如果存在旧的 value 值,则将旧的 value 值返回
if (oldVal != null)
return oldVal;
break;
}
}
}
// 增加元素的个数,也就是 size,并且在这个方法中判断是否需要扩容
addCount(1L, binCount);
return null;
}
值得关注的是 tabAt(tab, i)
方法,其使用 Unsafe
类 volatile
的操作 volatile
方式地查看值,保证每次获取到的值都是最新的。
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
虽然上面的 table
变量加了 volatile
,但也只能保证其引用的可见性,并不能确保其数组中的对象是否是最新的,所以需要 Unsafe
类 volatile
拿到最新的 Node
。
在 jdk1.8
中,初始化 ConcurrentHashMap
的时候这个 Node[]
数组是还未初始化的,会等到第一次 put
方法调用时才初始化,那么问题来了,此时是会有并发问题的,如果多个线程同时调用 initTable
初始化 Node
数组怎么办?看看 Doug Lea
大师是如何处理的:
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
// 每次循环都获取最新的 Node 数组引用
while ((tab = table) == null || tab.length == 0) {
// sizeCtl 是一个标记位,若为 -1 也就是小于 0,代表有线程在进行初始化工作了
if ((sc = sizeCtl) < 0)
// 让出 CPU 时间片
Thread.yield(); // lost initialization race; just spin
// CAS 操作,将本实例的 sizeCtl 变量设置为 -1
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
// 如果 CAS 操作成功了,代表本线程将负责初始化工作
try {
// 再检查一遍数组是否为空
if ((tab = table) == null || tab.length == 0) {
// 在初始化 Map 时,sizeCtl 代表数组大小,默认 16
// 所以此时 n 默认为 16
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
// Node 数组
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
// 将其赋值给 table 变量
table = tab = nt;
// 通过位运算,n 减去 n 二进制右移 2 位,相当于乘以 0.75
// 例如 16 经过运算为 12,与 乘0.75 一样,只不过位运算更快
sc = n - (n >>> 2);
}
} finally {
// 将计算后的sc(12)直接赋值给 sizeCtl,表示达到 12 长度就扩容
//由于这里只会有一个线程在执行,直接赋值即可,没有线程安全问题
//只需要保证可见性
sizeCtl = sc;
}
break;
}
}
return tab;
}
table
变量使用了 volatile
来保证每次获取到的都是最新写入的值
transient volatile Node<K,V>[] table;
这里总结一下:
就算有多个线程同时进行 put
操作,在初始化 Node[]
数组时,使用了 CAS
操作来决定到底是哪个线程有资格进行初始化,其他线程只能等待。用到的并发技巧如下:
volatile
修饰 sizeCtl
变量:它是一个标记位,用来告诉其他线程这个坑位有没有线程在进行初始化工作,其线程间的可见性由 volatile
保证。CAS
操作:CAS
操作保证了设置 sizeCtl
标记位的原子性,保证了在多线程同时进行初始化 Node[]
数组时,只有一个线程能成功。CAS
(比较并交换):是原子操作的一种。在多线程没有锁的状态下,可以保证多个线程对同一个值的更新。
CAS
可用于在多线程编程中实现不被打断的数据交换操作,从而避免多线程同时改写某一数据时由于执行顺序不确定性以及中断的不可预知性,产生的数据不一致问题。该操作通过将内存中的值与指定数据进行比较,当数值一样时将内存中的数据替换为新的值。
线程在读取数据时不进行加锁,在准备写回数据时,比较原值是否修改,若未被其他线程修改则写回,若已被修改,则重新执行读取流程。
自旋,字面意思就是自己旋转,也就是循环,一般是用一个无限循环实现。
这样一来,一个无限循环中,执行一个 CAS
操作,当操作成功,返回 true
时,循环结束。当返回 false
时,接着执行循环,继续尝试 CAS
操作,直到返回 true
。
并不是,比如很经典的 ABA
问题,CAS
就无法判断了。
因为 CAS
需要在操作值的时候检查下值有没有发生变化,如果没有发生变化则更新,但是如果一个值原来是 A
,变成了 B
,又变成了 A
,那么使用 CAS
进行检查时会发现它的值没有发生变化,但是实际上却变化了,这就是 ABA
问题。
用版本号去保证就好了,就比如说,我在修改前去查询他原来的值的时候再带一个版本号,每次判断就连值和版本号一起判断,判断成功就给版本号加 1
。
添加时间戳也可以,查询的时候把时间戳一起查出来,对的上才修改并且更新值的时候一起修改更新时间,这样也能保证,方法很多但是跟版本号都是异曲同工之妙,看场景大家想怎么设计吧。
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
//计算 hash 值
int h = spread(key.hashCode());
//首先通过计算 key 的 hash 值来确定该元素放在table[i]数组的哪个位置
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
//判断头部数据的 hash 和 key 和传入的数据是否一致
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
//一致直接返回该值
return e.val;
}
//如果 eh 小于 0,可能是正在扩容,或者是红黑树,执行相应的查找方法
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
//如果是链表,遍历链表,找到 key 对应的 value 值
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
//如果不存在的话返回null
return null;
}
进行 get
操作大致可以分为以下步骤:
hash
值key
的 hash
值来确定该元素放在 table[i]
数组的哪个位置hash
和 key
和传入的数据是否一致,一致直接返回该值eh
小于 0
,可能是正在扩容,或者是红黑树,执行相应的查找方法 key
对应的 value
值null
get 操作可以无锁是由于 Node 的元素 val 和指针 next 是用 volatile 修饰的,volatile 关键字来保证可见性、有序性。但不保证原子性。
从源码中我们看到,Node
节点里面的 val
节点值,和下一个节点 next
都是加了 volatitle
关键字的,也就是说,我们在修改节点值的时候和插入节点值的时候都是线程之间可见的,这样我们在使用 get
方法的时候 就能在不加任何锁的情况下得到最新的值,也就完成了多线程下的同步,保证了多线程下的安全。
而数组用 volatile
修饰主要是保证在数组扩容的时候保证可见性。
在每次 put
值之后,有可能触发扩容动作,其中包括两种情况:
put
新的元素,链表的长度达到了阈值 8
,会调用 treeifyBin(tab, i)
方法将链表转为红黑树,不过在 treeifyBin(tab, i)
方法中会先对数组长度进行判断,如果数组长度小于 64
,则会调用 tryPresize(n << 1)
方法对数组进行扩容,进而通过 transfer()
方法重新调整节点位置。 private final void treeifyBin(Node<K,V>[] tab, int index) {
Node<K,V> b; int n, sc;
if (tab != null) {
// MIN_TREEIFY_CAPACITY = 64,若数组长度小于 64,则先扩容
if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
//数组扩容两倍,n << 1 相当于 2n
tryPresize(n << 1);
//如果数组长度大于等于 64,则进行红黑树的操作
else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
synchronized (b) {
}
}
}
}
}
put
元素之后,会调用 addCount()
方法将元素个数 +1
,并检查是否需要进行扩容,当数组元素个数达到扩容阈值 sizeCtl
时,会触发 transfer()
方法,对数组进行两倍扩容,进而重新调整节点的位置。 /*
* 当数组中的容量大于扩容阈值时:sizeCtl,将进行扩容操作
* */
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
// 统计元素个数的操作
.........
s = sumCount();
}
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
// 如果元素个数大于等于了阈值或 -1 就自旋扩容
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
// resizeStamp 这个方法返回一个数字,比如 n=16,rs 则 = 32795,rs 后面会被用来将 sizeCtl 设置为负数
int rs = resizeStamp(n);
// 如果 sc 小于 0,表示已经有其他线程在进行扩容了
if (sc < 0) {
// 如果全部元素已经转移完了,或者已经达到了最大并发扩容数限制则 breack
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
// 如果没有,则 sizeCt l加 1,然后进行转移元素
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
//如果没有在扩容,但是需要扩容。那么就将 sizeCtl 更新,赋值为标识符左移 16 位,一个负数,然后加 2, 表示已经有一个线程开始扩容了。然后进行扩容
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
// 更新 sizeCtl 为负数后,开始扩容
transfer(tab, null);
// 更新 count,看看是否还需要扩容
s = sumCount();
}
}
}
jdk1.8
版本的 ConcurrentHashMap
支持并发扩容,当第一个线程开始对 ConCurrentHashMap
进行扩容时,直接进入到 transfer(tab, null)
方法进行扩容,后面的线程全部进入到 transfer(tab, nextTab)
方法加入扩容线程大军,协助其他线程进行扩容操作。
那么到底是怎么样扩容的呢,下面我们根据源码来理解多线程是怎么实现并发扩容的:
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
// stride 为每个 cpu 所需要处理的桶个数
int n = tab.length, stride;
// 如果是多 cpu,那么每个线程划分任务,最小任务量是 16 个桶位的迁移
// 这里让每个 CPU 处理的桶一样多,避免出现转移任务不均匀的现象,如果桶较少的话,默认一个 CPU(一个线程)处理 16 个桶
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
// 本线程分到的迁移量,假设为16(默认也为16)
stride = MIN_TRANSFER_STRIDE;
//如果新的 table 没有初始化,那么就初始化一个大小为原来 2 倍的数组为新数组,被 nextTab 引用
if (nextTab == null) {
try {
@SuppressWarnings("unchecked")
// 创建一个两倍长度的新数组
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
//nextTab 引用新数组
nextTab = nt;
} catch (Throwable ex) {
//如果扩容出现错误,则 sizeCtl 赋值为 int 最大值返回
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
//记录线程开始迁移的桶位,从后往前迁移。
transferIndex = n;
}
// 新扩容数组的长度
int nextn = nextTab.length;
// ForwardingNode 是继承了 Node 的,但是他所有节点的 hash 值都是设置成了 MOVED,标识着当前节点在迁移当中,在 get 或者 put 时若遇到此 Node,则可以知道当前 Node 正在迁移
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
// 数组一层层推进的标识符,标识一个桶的迁移工作是否完成,advance == true 表示可以进行下一个位置的迁移
// advance 变量指的是是否继续进行下一个位置的迁移,如果为 true,表示可以继续向后推进,反之,说明还没有处理好当前桶,不能推进
boolean advance = true;
//标识所有桶是否都已迁移完成,最后一个数据迁移的线程将该值置为 true,并进行本轮扩容的收尾工作
boolean finishing = false;
// 死循环,i 表示下标,bound 表示当前线程可以处理的当前桶区间最小下标
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
// while 的作用就是确定每个线程需要处理的桶的下标,并且通过i--来遍历hash表中的每个桶,
// 假如旧数组有 32 个桶模板 transferferIndex 就是 32,当第一个线程进来的时候,nextIndex = transferIndex = 32,nextBound = nextIndex - stride = 16,而 transferIndex 也通过 CAS 被调整为 16,所以第一个线程处理桶的范围是从 16 到 31 号桶
//所以第二个线程进来,nextIndex = transferIndex = 16,nextBound = 0
//所以第二个线程处理桶的范围是从 0 到第 15 号桶
while (advance) {
int nextIndex, nextBound;
// 过了第一次之后 --i >= bound,在这里就跳出循环了,这里的 finishing 是为了二次循环检查用的,让最后一个线程遍历整个数组,但是如果多线程的时候 i-- 可能会使 i 跳出这个线程对应的范围,所以用 finishing 保证 i 可以一直--
if (--i >= bound || finishing)
advance = false;
// 当所有的线程都分配完对应需要转移的桶的范围后,transferindex 就为 0,所以当某一个线程完成了自己的任务后,nextIndex = transferIndex = 0,所以 i 被设置为-1,跳出这个 whlle
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
// 这里设置 false,是为了防止在没有成功处理一个桶的情况下却进行了推进
advance = false;
}
// 第一次进入这个扩容方法时,是到这里来的,因为前面 for 循环 i = 0,所以前面的两个判断都跳过,直接来到这里,这里就是在分配当前线程所需要转移的范围
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
// 确定当前线程每次分配的待迁移桶的范围为[bound, nextIndex)
// 这个值就是当前线程可以处理的最小当前区间最小下标
bound = nextBound;
// 初次对i 赋值,这个就是当前线程可以处理的当前区间的最大下标
i = nextIndex - 1;
// 这里设置 false,是为了防止在没有成功处理一个桶的情况下却进行了推进,这样对导致漏掉某个桶。下面的 if (tabAt(tab, i) == f) 判断会出现这样的情况
advance = false;
}
}
// 当前线程自己的活已经做完或所有线程的活都已做完,就进入该if
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
// 所有线程已干完活,最后才走这里。
// 扩容结束后,保存新数组,并重新计算扩容阈值,赋值给 sizeCtl
if (finishing) {
nextTable = null;// 删除成员变量
table = nextTab;// 更新 table
// 更新阈值,就是将阈值变为旧数组的 1.5 倍,因为旧阈值是旧数组的 0.75 倍,旧数组扩容 2 倍对应的阈值也就是扩容两倍,即 1.5 倍。
sizeCtl = (n << 1) - (n >>> 1);
// 结束方法
return;
}
// 当前线程已结束扩容,sizeCtl-1 表示参与扩容线程数 -1。
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
// 判断当前所有扩容任务线程是否都执行完成
// 如果 sc - 2 不等于标识符左移 16 位。如果他们相等了,说明没有线程在帮助他们扩容了。也就是说,扩容结束了
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
// 不相等,说明没结束,当前线程结束方法。
return;
// 如果相等,扩容结束了,置 finishing = advance = true
finishing = advance = true;
// 最后一个数据迁移线程要重新检查一次旧 table 中的所有桶,看是否都被正确迁移到新table了
// 正常情况下,重新检查时,旧 table 的所有桶都应该是 ForwardingNode
// 特殊情况下,比如扩容冲突(多个线程申请到了同一个 transfer 任务),此时当前线程领取的任务会作废,那么最后检查时
// 还要处理因为作废而没有被迁移的桶,把它们正确迁移到新 table 中
i = n;
}
}
// 当前迁移的桶位没有元素,那就不用迁移了,直接在该位置通过 CAS 添加一个 fwd 占位
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
// 如果不是 null 且 hash 值是 MOVED。
else if ((fh = f.hash) == MOVED)
// 说明别的线程已经处理过了,再次推进一个下标
advance = true; // already processed
// 该旧桶未迁移完成,进行数据迁移
else {
// 当前节点需要迁移,加锁迁移,保证多线程安全
// 加锁防止在这个桶位迁移的时候,别的线程对这个桶位进行 putVal 的时候向链表插入数据
synchronized (f) {
// 判断 i 下标处的桶节点是否和 f 相同
if (tabAt(tab, i) == f) {
// 一个低位节点,一个高位节点
//ln 表示低位,hn 表示高位,接下来这段代码的作用是把链表拆分成两部分,0 在低位,1 在高位
Node<K, V> ln, hn;
Node<K,V> ln, hn;
// fh >=0 说明是链表迁移
if (fh >= 0) {
//由于 n 是 2 的幂次方(所有二进制位中只有一个 1),如 n=16(0001 0000),第 4位为 1,那么 hash&n 后的值第 4 位只能为 0 或 1,所以可以根据 hash&n 的结果将所有结点分为两部分。
// 如果是结果是 0 ,将其放在低位,反之放在高位,目的是将链表重新 hash,放到对应的位置上,让新的取于算法能够击中它
int runBit = fh & n;
// lastRun 是最终要处理的节点,这里先让指向根节点,后面还会寻找真正的lastRun
Node<K,V> lastRun = f;
// 这个 for 循环找到 lastRun,从 lastRun 开始后面的要在低位都在低位,要在高位都在高位
for (Node<K,V> p = f.next; p != null; p = p.next) {
// 取于桶中每个节点的 hash 值
int b = p.hash & n;
// 如果节点的 hash 值和首节点的 hash 值取于结果不同
if (b != runBit) {
// 更新 runBit,用于下面判断 lastRun 该赋值给 ln 还是 hn。
runBit = b;
// 这个 lastRun 保证后面的节点与自己的取于值相同,避免后面没有必要的循环
lastRun = p;
}
}
// 如果最后更新的 runBit 是 0 ,设置低位节点
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
// 如果最后更新的 runBit 是 1, 设置高位节点
hn = lastRun;
ln = null;
}
// 遍历链表,把 hash&n 为 0 的放在低位链表中
// 不为 0 的放在高位链表中
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
// 将低位的链表放在 i 位置也就是不动,低位链不需要变
setTabAt(nextTab, i, ln);
// 将高位链表放在 i+n 位置,n 是数组的长度
setTabAt(nextTab, i + n, hn);
// 在原 table 中设置 ForwardingNode 节点以提示该桶扩容完成
setTabAt(tab, i, fwd);
// advance 设置为 true 表示当前 hash 桶已处理完,可以继续处理下一个 hash 桶
advance = true;
}
// 如果当前节点是红黑树,则按照红黑树的处理逻辑进行迁移
else if (f instanceof TreeBin) {
// lo 为低位链表头节点,loTail 为低位链表尾节点,hi 和 hiTail 为高位链表头尾节点
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
// 同样也是使用高位和低位两条链表进行迁移
//使用 for 循环以链表方式遍历整棵红黑树,使用尾插法拼接 ln 和 hn 链表
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
// 这里面形成的是以 TreeNode 为节点的链表
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
// 和链表相同的判断,与运算 == 0 的放在低位
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
// 不是 0 的放在高位
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
// 如果长度小于等于 6,则将红黑树转换成链表
// (hc != 0) ? new TreeBin(lo) : t 这行代码的用意在于,如果原来的红黑树没有被拆分成两份,那么迁移后它依旧是红黑树,可以直接使用原来的 TreeBin 对象
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
// setTabAt 方法调用的是 Unsafe 类的 putObjectVolatile 方法
// 使用 volatile 方式的 putObjectVolatile 方法,能够将数据直接更新回主内存,并使得其他线程工作内存的对应变量失效,达到各线程数据及时同步的效果
// 使用 volatile 的方式将 ln 链设置到新数组下标为 i 的位置上
setTabAt(nextTab, i, ln);
// 使用 volatile 的方式将 hn 链设置到新数组下标为 i + n (n为原数组长度) 的位置上
setTabAt(nextTab, i + n, hn);
// 在原 table 中设置 ForwardingNode 节点以提示该桶扩容完成
setTabAt(tab, i, fwd);
// advance 设置为 true 表示当前 hash 桶已处理完,可以继续处理下一个 hash 桶
advance = true;
}
}
}
}
}
}
通过源码简单总结一下逻辑:
CPU
核心数和 Map
数组的长度得到每个线程(CPU)
要帮助处理多少个桶,并且这里每个线程处理都是平均的。默认每个线程处理 16
个桶。因此,如果长度是 16
的时候,扩容的时候只会有一个线程扩容。nextTable
。将其在原有基础上扩容两倍。死循环开始转移。多线程并发转移就是在这个死循环中,根据一个 finishing
变量来判断,该变量为 true
表示扩容结束,否则继续扩容。while
循环,分配数组中一个桶的区间给线程,默认是 16
.,从大到小进行分配。当拿到分配值后,进行 i--
递减。这个 i
就是数组下标。(其中有一个 bound
参数,这个参数指的是该线程此次可以处理的区间的最小下标,超过这个下标,就需要重新领取区间或者结束扩容,还有一个 advance
参数,该参数指的是是否继续递减转移下一个桶,如果为 true
,表示可以继续向后推进,反之,说明还没有处理好当前桶,不能推进)。while
循环,进 if
判断,判断扩容是否结束,如果扩容结束,清空临时变量,更新 table
变量,更新库容阈值。如果没完成,但已经无法领取区间(也就是没有了),该线程退出该方法,并将 sizeCtl
减 1
,表示扩容的线程少一个了。如果减完这个数以后,sizeCtl
回归了初始状态,表示没有线程再扩容了,该方法所有的线程扩容结束了。(这里主要是判断扩容任务是否结束,如果结束了就让线程退出该方法,并更新相关变量)。然后检查所有的桶,防止遗漏。i
对应的槽位是空,尝试 CAS
插入占位符,让 putVal
方法的线程感知。 如果 i
对应的槽位不是空,且有了占位符,那么该线程跳过这个槽位,处理下一个槽位。如果以上都是不是,说明这个槽位有一个实际的值。开始同步处理这个桶。到这里,都还没有对桶内数据进行转移,只是计算了下标和处理区间,然后一些完成状态判断。同时,如果对应下标内没有数据或已经被占位了,就跳过了。putVal
的时候向链表插入数据。如果这个桶是链表,那么就将这个链表根据 length
取于拆成两份,取于结果是 0
的放在新表的低位,取于结果是 1
放在新表的高位。如果这个桶是红黑数,那么也拆成
2 份,方式和链表的方式一样,然后,判断拆分过的树的节点数量,如果数量小于等于 6
,改造成链表。反之,继续使用红黑树结构。 /**
* The maximum number of threads that can help resize.
* Must fit in 32 - RESIZE_STAMP_BITS bits.
*/
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
这里的注释解释达到最大帮助线程的数量为:65535
进入到 transfer(tab, null)
方法中,有如下一段代码:
上面代码的意思是每条线程至少处理16个桶,如果计算出来的结果少于 16
,则一条线程处理 16
个桶
在扩容过程期间形成的 hn
和 ln
链 使用的是复制引用的方式,也就是说 ln
和 hn
链是复制出来的,而非直接在旧数组的链表 / 红黑树上操作;所以旧数组 hash
桶上的链表 / 红黑树并没有受到影响。
因此从迁移开始 到 结束 这段时间都是可以正常访问旧数组 hash
桶上面的链表 / 红黑树;迁移结束后放旧数组位置上赋值为 fwd
,往后的访问请求会直接转发到扩容后的数组中了。
如果当前链表已经迁移完成,那么头节点会被设置成 fwd
节点,此时写线程会帮助扩容,如果扩容没有完成,当前链表的头节点会被锁住,所以写线程会被阻塞,直到扩容完成。
HashMap
的扩容是创建一个新数组,将值直接放入新数组中,jdk1.7
采用头插入法,会出现死循环,jdk1.8
采用尾插入法,不会造成死循环,但是可能会造成数据覆盖。ConcurrentHashMap
扩容是从数组队尾开始迁移,迁移节点时会锁住节点,迁移完成后将节点设置为转移节点。所以节点迁移完成后将新数组赋值给容器。源码 putVal
方法:
key
的 hash
值是否为 -1
来确定 table
数组是否正在扩容中,如果 hash
值为 -1
表示 table
数组正在被其他线程扩容,当前线程会加入到扩容当中帮助一起扩容。 sizeCtl
为负值时,表示正有 n
个线程正在进行扩容。
只要插入的位置扩容线程还未迁移到,就可以插入。如果当前要插入的位置正在迁移,会帮助其他扩容线程一起扩容,直到扩容结束,然后再加 synchronized
锁执行插入操作。
我们知道,当一个桶位形成了红黑树时,此时桶位中存的是一个 TreeBin
节点,其内部存在两个引用,分别是指向双向链表的 first
和指向红黑树根节点的 root
。这里我们看一下 TreeBin
类的属性,然后介绍 读 和 写实现流程是怎样的。
static final class TreeBin<K,V> extends Node<K,V> {
// 红黑树 根节点
TreeNode<K,V> root;
// 双向链表的头结点
volatile TreeNode<K,V> first;
// 等待者线程(当前lockState是读锁状态)
volatile Thread waiter;
// 锁的状态 :
// 写锁是独占状态,写的时候其他线程不能读或写,lockState = 1;
// 读锁是共享状态,其他线程可以读不能写,把 lockState + 4;
// 等待者状态 写线程要等其他线程读完才能写,把 lockState 最后两位设为2
volatile int lockState;
// 锁处于写状态
static final int WRITER = 1; // set while holding write lock
// 锁处于等待获取写锁状态
static final int WAITER = 2; // set when waiting for write lock
// 锁处于读状态
static final int READER = 4; // increment value for setting read lock
下面是 读 和 写的操作流程:
如果红黑树上有读请求访问,写线程会被阻塞,因为这时候如果写会导致红黑树失衡,会触发自平衡操作,导致数据结构产生变化,会影响读线程的读取结果。
我们知道 ConcurrentHashMap
中的红黑树由 TreeBin
来代理,TreeBin
内部有一个 Int
类型的 lockState
字段。
CAS
的方式将 state
值加 4
(表示加了读锁),读取数据完毕后,再使用 CAS
的方式将 state
值减 4
。state
值是否等于 0
,如果是 0
,则说明没有读线程在查询数据,这时候可以直接写入数据,写线程也会通过 CAS
的方式将 state
字段值设置为 1
(表示加了写锁)。state
值不是 0
,这时候就会 park()
方法挂起当前线程,使其等待被唤醒。挂起写线程时,写线程会先将 state
值的第 2
个 bit 位设置为 1
(二进制为 10
),转换成十进制就是 2
,表示有写线程等待被唤醒。那么反过来,当红黑树上有写线程正在执行写入操作,那么如果有新的读线程请求该怎么处理?
Treebin
对象保留两个数据结构,一个红黑树,一个链表。就是为了这种情况而设计的, 当这个 state
表示写锁状态时,读请求,就不能在到红黑树上面查询。但是也不能因为写操作导致,读操作不能用。TreeBing
保留的链表结构就用上,读请求直接到链表上面去访问,而不经过红黑树。
在读线程结束的时候都会让 state
值减 4
, 减完之后读线程会再检查下 state
的值是不是 2
, 如果是 2
,就说明有写线程在挂起等待。因为写线程挂起之前让 state
的 bit
位第二位设置位 1
,转换成十进制就是 2
,如果等于 2
,说明当前读线程最后一个读线程。并且有一个写线程在等待资源可用,那么这个读线程就使用 LockSupport.unpark()
将这个等待线程唤醒。
synchronized
之前一直都是重量级的锁,但是后来 Java
官方是对他进行过升级的,他现在采用的是锁升级的方式去做的。
针对 synchronized
获取锁的方式,JVM
使用了锁升级的优化方式,就是先使用偏向锁优先同一线程然后再次获取锁,如果失败,就升级为 CAS
轻量级锁,如果失败就会短暂自旋,防止线程被系统挂起。最后如果以上都失败就升级为重量级锁。
所以是一步步升级上去的,最初也是通过很多轻量级的方式锁定的
关于 synchronized 更多的可以看我这篇文章:深入分析 Synchronized 原理
synchronized
性能提升,在 jdk1.6
中对 synchronized
锁的实现引入了大量的优化,会从无锁 》偏向锁》轻量级锁》重量级锁一步步转换,就是锁膨胀优化。以及有锁的粗化、锁消除和自适应自旋等优化。CAS + synchronized
方式时,加锁的对象是每个链条的头节点,相对 Segment
再次提高了并发度。如果使用可重入锁达到同样的效果,则需要大量继承自 ReentrantLock
的对象,造成巨大的内存浪费。这里的并发度可以理解为程序运行时能够同时更新且不产生锁竞争的最大线程数。
jdk1.7
中,实际上就是 ConcurrentHashMap
中的分段锁个数,即 Segment[]
的数组长度,默认是 16
,这个值可以在构造函数中设置。如果自己设置了并发度,ConcurrentHashMap
会使用大于等于该值的最小的 2
的幂指数作为实际并发度。如果并发度设置得太小,会带来严重得锁竞争问题;如果并发度设置得过大,原本位于同一个 Segment
内的访问会扩散到不同的 Segment
中,从而引起程序性能下降。jdk1.8
中,已经抛弃了 Segment
的概念,选择了 Node
数组 + 链表 + 红黑树结构,并发度大小依赖于数组的大小。
我们还可以使用 Collections.synchronizedMap()
方法加同步锁,可以看到 synchronizedMap
内部所有的方法其实都是执行我们传入的 Map
对象的方法,只不过都用 synchronized
进行上锁来保证对 Map
的操作是线程安全的。
我们都知道, HashMap
本身非线程安全的,如果传入的是 HashMap
对象,其实也是对 HashMap
做了一层封装,里面使用对象锁来保证多线程场景下,线程安全,本质也是对 HashMap
进行全表锁,在竞争激烈的多线程环境下性能依然非常差,不推荐使用。
该类中对Map接口中的方法使用synchronized 同步关键字来保证对Map的操作是线程安全的。
不能
这里我们说一下 ConcurrentHashMap
和 HashTable
的异同:
1、线程安全:
ConcurrentHashMap
和 HashTable
都是线程安全的。2、底层数据结构:
ConcurrentHashMap
的底层数据结构: jdk1.7
使用分段数组+链表 实现,jdk1.8
使用数组+链表+红黑树 实现,红黑树可以保证查找效率。HashTable
底层数据结构是用 数组+链表实现。3、保证线程安全的方式:
ConcurrentHashMap jdk1.7
使用分段锁的方式保证线程安全,jdk1.8
使用 synchronized
和 CAS
保证线程安全。HashTable
是用全表锁来保证线程安全(既一个 HashTable
只用一把锁),这种方式的效率非常低。从上面所说的异同,ConcurrentHashMap
比 HashTable
要好,为什么不能完全替代 HashTable
呢?
原因就在于一致性问题:
HashTable
虽然效率上不如 ConcurrentHashMap
,但并不能完全被取代,两者的迭代器的一致性不同的,HashTable
的迭代器是强一致性的,而 ConcurrentHashMap
的迭起器是弱一致性的。 ConcurrentHashMap
的 get,clear,iterator
都是弱一致性的, Doug Lea
也将这个判断留给用户自己决定是否使用 ConcurrentHashMap
。
那么什么是强一致性和弱一致性?
弱一致性 :简单来说就比如我 put
了一个数据进去,本来应该立刻就可以 get
到,但是却可能在一段时间内 get
不到,一致性比较弱。
强一致性:加入数据后,马上就能 get
到。
其实要变成强一致性,就要处处用锁,甚至是用全局锁,HashTable
就是全局锁,但是这样的效率会很低。
而 ConcurrentHashMap
为了提升效率,一致性自然会变弱。
ConcurrentHashMap
的弱一致性主要是为了提升效率,是一致性与效率之间的一种权衡。要成为强一致性,就得到处使用锁,甚至是全局锁,这就与 HashTable
和同步的 HashMap
一样了。
final
:final
域确保变量的初始化安全性,初始化安全性让不可变对象不需要同步就可以被自由的访问和共享。volatile
:来保证某个变量内存的改变对其他线程即时可见,再配合 CAS
可以实现不加锁对并发操作的支持。比如:get
操作可以无锁是由于 Node
的元素 val
和指针 next
是用 volatile
修饰的,在多线程环境下线程 A
修改节点的 val
或者新增节点的时候是对线程 B
可见的。链表长度大于 8
。数组长度大于 64
。从 putVal
方法源码可以看出,并非一开始就创建红黑树结构,如果当前 Node
数组长度小于阈值 MIN_TREEIFY_CAPACITY
,默认为 64
,先通过扩大数组容量为原来的两倍,以缓解单个链表元素过大的性能问题。