在日常开发中使用的HashMap是线程不安全的,而线程安全类Hashtable只是简单的在方法上加锁实现线程安全,效率低下,所以在线程安全的环境下通常会使用ConcurrentHashMap
ConcurrentHashMap中的节点类型
1)Node节点
- static final class TreeNode
extends Node { - TreeNode
parent; // red-black tree links - TreeNode
left; - TreeNode
right; - TreeNode
prev; // needed to unlink next upon deletion - boolean red;
- }
TreeNode就是红黑树的结点,TreeNode不会直接链接到table[i]——桶上面,而是由TreeBin链接,TreeBin会指向红黑树的根结点。
2)TreeBin节点
- static final class TreeBin
extends Node { -
- TreeNode
root; -
- volatile TreeNode
first; -
- volatile Thread waiter;
-
- volatile int lockState;
-
- // values for lockState
-
- static final int WRITER = 1; // set while holding write lock
-
- static final int WAITER = 2; // set when waiting for write lock
-
- static final int READER = 4; // increment value for setting read lock
- }
TreeBin会直接链接到table[i]——桶上面,该结点提供了一系列红黑树相关的操作,以及加锁、解锁操作。另外TreeBin提供了一系列的操作
- TreeBin(TreeNode
- lockRoot(),对红黑树的根结点加写锁
- unlockRoot(),释放写锁
- find(int h, Object k),从根结点开始遍历查找,找到相等的结点就返回它,没找到就返回null,
当存在写锁时,以链表方式进行查找,不阻塞读锁
3)ForwardingNode
- static final class ForwardingNode
extends Node { - final Node
[] nextTable; - }
ForwardingNode在table扩容时使用,内部记录了扩容后的table,即nextTable
- 当table需要扩容时,依次遍历table中的每个槽,如果不为null,把所有元素根据hash值放入扩容后的nextTable中,在原table的槽内放置一个ForwardingNode
- ForwardingNode是一种临时结点,在扩容进行中才会出现,hash值固定为-1,且不存储实际数据
- 如果旧table数组的一个hash桶中全部的结点都迁移到了新table中,则在这个桶中放置一个ForwardingNode
- 读操作碰到ForwardingNode时,将操作转发到扩容后的新table数组上去执行;写操作碰见它时,则尝试帮助扩容,扩容是支持多线程一起扩容
4)ReservationNode保留结点
- static final class ReservationNode
extends Node { - ReservationNode() {
- super(RESERVED, null, null); //-3
- }
-
- Node
find(int h, Object k) { - return null;
- }
- }
- 在并发场景下、在从Key不存在到插入的时间间隔内,为了防止哈希槽被其他线程抢占,当前线程会使用一个reservationNode节点放到槽中并加锁,从而保证线程安全
- hash值固定为-3,不保存实际数据。只在computeIfAbsent和compute这两个函数式API中充当占位符加锁使用
构造器
- public ConcurrentHashMap() {
- }
-
- public ConcurrentHashMap(int initialCapacity) { 初始化容积--数组长度
- this(initialCapacity, LOAD_FACTOR, 1);
- }
-
- public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
- if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
- throw new IllegalArgumentException();
- if (initialCapacity < concurrencyLevel) // Use at least as many bins
- initialCapacity = concurrencyLevel; // as estimated threads
- long size = (long)(1.0 + (long)initialCapacity / loadFactor);
- int cap = (size >= (long)MAXIMUM_CAPACITY) ?
- MAXIMUM_CAPACITY : tableSizeFor((int)size);
- this.sizeCtl = cap;
- }
初始化ConcurrentHashMap的时候这个`Node[]`数组是还未初始化的,会等到第一次put方法调用时才初始化
- //不允许空的key和value,否则异常
-
- final V putVal(K key, V value, boolean onlyIfAbsent) {
- if (key == null || value == null) throw new NullPointerException();
- ... ...
- //判断Node数组为空
- if (tab == null || (n = tab.length) == 0)
- //初始化Node数组
- tab = initTable();
- }
此时是会有并发问题的
- private final Node
[] initTable() { - Node
[] tab; int sc; - //每次循环都获取最新的Node数组引用
- while ((tab = table) == null || tab.length == 0) {
- //sizeCtl是一个标记位,若为-1也就是小于0,代表有线程在进行初始化工作了
- if ((sc = sizeCtl) < 0)
- //让出CPU时间片
- Thread.yield(); // lost initialization race; just spin
- //CAS操作,将本实例的sizeCtl变量设置为-1
- else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
- //如果CAS操作成功了,代表本线程将负责初始化工作
- try {
- //再检查一遍数组是否为空
- if ((tab = table) == null || tab.length == 0) {
- //在初始化Map时,sizeCtl代表数组大小,默认16
- //所以此时n默认为16
- int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
- @SuppressWarnings("unchecked")
- //Node数组
- Node
[] nt = (Node[])new Node,?>[n]; - //将其赋值给table变量
- table = tab = nt;
- //通过位运算,n减去n二进制右移2位,相当于乘以0.75
- //例如16经过运算为12,与乘0.75一样,只不过位运算更快
- sc = n - (n >>> 2);
- }
- } finally {
- //将计算后的sc(12)直接赋值给sizeCtl,表示达到12长度就扩容
- //由于这里只会有一个线程在执行,直接赋值即可,没有线程安全问题
- //只需要保证可见性
- sizeCtl = sc;
- }
- break;
- }
- }
- return tab
- }
-
table变量使用了volatile来保证每次获取到的都是最新写入的值:
transient volatile Node[] table;
就算有多个线程同时进行put操作,在初始化数组时使用了乐观锁CAS操作来决定到底是哪个线程有资格进行初始化,其他线程均只能等待。
用到的并发技巧:
- volatile变量(sizeCtl):它是一个标记位,用来告诉其他线程这个坑位有没有人在,其线程间的可见性由volatile保证。
- CAS操作:CAS操作保证了设置sizeCtl标记位的原子性,保证了只有一个线程能设置成功
put操作的线程安全
- //put操作的线程安全
-
- final V putVal(K key, V value, boolean onlyIfAbsent) {
- if (key == null || value == null)
- throw new NullPointerException();
- //对key的hashCode进行散列
- int hash = spread(key.hashCode());
- int binCount = 0;
- //一个无限循环,直到put操作完成后退出循环
- for (Node
[] tab = table;;) { - Node
f; int n, i, fh; - //当Node数组为空时进行初始化
- if (tab == null || (n = tab.length) == 0)
- tab = initTable();
- //Unsafe类volatile的方式取出hashCode散列后通过与运算得出的Node数组下标值对应的Node对象
- //此时的Node对象若为空,则代表还未有线程对此Node进行插入操作
- else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
- //直接CAS方式插入数据
- if (casTabAt(tab, i, null, new Node
(hash, key, value, null))) - //插入成功,退出循环
- break; // no lock when adding to empty bin
- }
- //查看是否在扩容,先不看,扩容再介绍
- else if ((fh = f.hash) == MOVED)
- //帮助扩容
- tab = helpTransfer(tab, f);
- else {
- V oldVal = null;
- //对Node对象进行加锁
- synchronized (f) {
- //二次确认此Node对象还是原来的那一个
- if (tabAt(tab, i) == f) {
- if (fh >= 0) {
- binCount = 1;
- //无限循环,直到完成put
- for (Node
e = f;; ++binCount) { - K ek;
- //和HashMap一样,先比较hash,再比较equals
- if (e.hash == hash &&
- ((ek = e.key) == key || (ek != null && key.equals(ek)))) {
- oldVal = e.val;
- if (!onlyIfAbsent)
- e.val = value;
- break;
- }
- Node
pred = e; - if ((e = e.next) == null) {
- //和链表头Node节点不冲突,就将其初始化为新Node作为上一个Node节点的next
- //形成链表结构
- pred.next = new Node
(hash, key,value, null); - break;
- }
- }
- }
- }
值得关注的是tabAt(tab, i)方法,其使用Unsafe类volatile的操作volatile式地查看值,保证每次获取到的值都是最新的:
- static final
Node tabAt(Node[] tab, int i) { - return (Node
)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE); - }
由于其减小了锁的粒度,若Hash完美不冲突的情况下,可同时支持n个线程同时put操作,n为Node数组大小,在默认大小16下,可以支持最大同时16个线程无竞争同时操作且线程安全。hash冲突严重时,Node链表越来越长,将导致严重的锁竞争,此时会进行扩容,将Node进行再散列。
用到的并发技巧:
- 减小锁粒度:将Node链表的头节点作为锁,若在默认大小16情况下,将有16把锁,大大减小了锁竞争(上下文切换),就像开头所说,将串行的部分最大化缩小,在理想情况下线程的put操作都为并行操作。同时直接锁住头节点,保证了线程安全
- Unsafe的getObjectVolatile方法:此方法确保获取到的值为最新。
扩容操作的线程安全
在扩容时,ConcurrentHashMap支持多线程并发扩容,在扩容过程中同时支持get查数据,若有线程put数据,还会帮助一起扩容,这种无阻塞算法将并行最大化
在put值的时候,首先会计算hash值,再散列到指定的Node数组下标中
//根据key的hashCode再散列int hash = spread(key.hashCode());
//使用(n - 1) & hash 运算,定位Node数组中下标值,实际上就是执行 hash % n(f = tabAt(tab, i = (n - 1) & hash);
扩容时的get操作
- public V get(Object key) {
- Node
[] tab; Node e, p; int n, eh; K ek; - int h = spread(key.hashCode());
- if ((tab = table) != null && (n = tab.length) > 0 &&
- (e = tabAt(tab, (n - 1) & h)) != null) {
- if ((eh = e.hash) == h) {
- if ((ek = e.key) == key || (ek != null && key.equals(ek)))
- return e.val;
- }
- //假如Node节点的hash值小于0,则有可能是fwd节点
- else if (eh < 0)
- //调用节点对象的find方法查找值
- return (p = e.find(h, key)) != null ? p.val : null;
- while ((e = e.next) != null) {
- if (e.hash == h &&
- ((ek = e.key) == key || (ek != null && key.equals(ek))))
- return e.val;
- }
- }
- return null;
- }
在get操作的源码中,会判断Node中的hash是否小于0,是否还记得我们的占位Node,其hash为MOVED,为常量值-1,所以此时判断线程正在迁移,委托给fwd占位Node去查找值。