• ConcurrentHashMap 面试题 30 问


    1、请你说说 ConcurrentHashMap 的数据结构有什么区别?

    ConcurrentHashMap 在 jdk1.7 中的结构:

    在这里插入图片描述
    jdk1.7 版本及其以下版本中, ConcurrentHashMap 的数据结构是由 Segments 数组 + HashEntry 数组 + 链表实现的

    不同的是, ConcurrentHashMap 中的数组被分为大数组和小数组,大数组是 Segment ,小数组是 HashEntry Segment 本身是基于 ReentrantLock 可重入锁来实现加锁和释放锁,这样就能保证多线程同时访问 ConcurrentHashMap 的时候,同一时间只能有一个线程操作对应的节点,以此来保证线程安全。

    ConcurrentHashMap 在 jdk1.8 中的结构:

    在这里插入图片描述
    jdk1.8 中,选择了和 HashMap 相同的 Node 数组 + 链表 + 红黑树结构,采用 CAS + Synchronized 来保证并发安全性。抛弃了原有 Segment 分段锁,直接用 table 数组存储键值对;当 HashEntry 对象组成的链表长度超过 TREEIFY_THRESHOLD 时,链表转换为红黑树,提升性能。

    数据查询复杂度

    jdk1.7 遍历链表 O(n)jdk1.8 变成遍历红黑树 O(logN)

    锁粒度

    jdk1.7 是对需要进行数据操作的 Segment 加锁,jdk1.8 调整为对每个数组元素的头节点加锁。

    2、同样是线程安全,ConcurrentHashMap 与 HashTable 在线程同步上有什么不同

    1、 HashTable 实现并发安全的原理是通过 synchronized 关键字,如果我们查看 put() 方法源码,可以看出 put() 方法是被 synchronized 关键字所修饰的,而其他的方法,比如 clear()、get()、size() 等方法也同样是被 synchronized 关键字修饰的。之所以 Hashtable 是线程安全的,是因为几乎每个方法都被 synchronized 关键字所修饰了,这也就保证了线程安全。

    2、ConcurrentHashMap 实现并发安全在 jdk1.7 中采用 分段锁的方式,jdk1.8 中直接采用了 CAS + synchronized + Node 节点的方式,这和 Hashtable 的完全利用 synchronized 的方式有很大的不同。

    3、为什么 ConcurrentHashMap 比 HashTable 效率要高?

    正因为它们在线程安全的实现方式上的不同,导致它们在性能方面也有很大的不同。

    1、ConcurrentHashMap:jdk1.7 的时候,ConcurrentHashMap(分段锁) 对整个桶数组进行了分割分段 (Segment),容器中有多把锁,每一把锁锁一段数据,多线程访问容器里不同数据段的数据,就不会存在锁竞争,提高并发访问率。 到了 jdk1.8 的时候已经抛弃了 Segment 的概念,而是直接用Node 数组+链表+红黑树的数据结构来实现,并发控制使用 synchronizedCAS 来操作。(jdk1.6 以后 对 synchronized 锁做了很多优化) 整个看起来就像是优化过且线程安全的 HashMap,虽然现在 jdk1.8 中还能看到 Segment 的数据结构,但是已经简化了属性,只是为了兼容旧版本。

    2、Hashtable:使用 synchronized 来保证线程安全,效率非常低下。当一个线程访问同步方法时,其他线程也访问同步方法,而其他线程在此期间是不能操作的,如使用 put() 方法添加元素,另一个线程不能使用 put() 添加元素,也不能使用 get(),竞争会越来越激烈效率越低,不仅如此,还会带来额外的上下文切换等开销,所以此时它的吞吐量甚至还不如单线程的情况。

    4、HashMap 和 ConcurrentHashMap 有什么区别?

    除了加锁,原理上没有太大区别。

    但是 ConcurrentHashMap 集合中的 key 或者 value 插入 null(空) 值时,会报空指针异常,但是单线程操作的 HashMap 允许 key 或者 value 插入null(空)

    这里我们查看源码:
    在这里插入图片描述
    从源码来看,在 key 或者 valuenull(空) 时,直接抛出空指针异常。

    那么为什么 ConcurrentHashMap key 或者 value 不允许插入 null(空),而 HashMap 允许插入?

    有人就问过 ConcurrentHashMap 的作者 Doug Lea,以下是他的回答的邮件内容:

    The main reason that nulls aren't allowed in ConcurrentMaps (ConcurrentHashMaps, ConcurrentSkipListMaps) is that ambiguities that may be just barely tolerable in non-concurrent maps can't be accommodated. The main one is that if map.get(key) returns null, you can't detect whether the key explicitly maps to null vs the key isn't mapped. In a non-concurrent map, you can check this via map.contains(key),but in a concurrent one, the map might have changed between calls. Further digressing: I personally think that allowing nulls in Maps (also Sets) is an open invitation for programs to contain errors that remain undetected until they break at just the wrong time. (Whether to allow nulls even in non-concurrent Maps/Sets is one of the few design issues surrounding Collections that Josh Bloch and I have long disagreed about.)
    
    It is very difficult to check for null keys and values in my entire application .
    
    Would it be easier to declare somewhere     static final Object NULL = new Object(); and replace all use of nulls in uses of maps with NULL? -Doug
    
    • 1
    • 2
    • 3
    • 4
    • 5

    Doug Lea 如此设计最主要的原因是,不容忍在并发场景下出现歧义!

    假设 ConcurrentHashMap 允许存储 null(空) 那么,我们取值的时候会出现两种结果:

    • 值没有在集合中,取出的是 null(空)
    • 值就是 null(空),所以返回的结果就是原本的 null(空)

    这种情况下就会出现歧义,那为什么 HashMap 就不怕出现歧义问题呢?

    这是因为 HashMap 的设计是给单线程使用的,所以如果查询到了 null(空) 值,我们可以通过 hashMap.containsKey(key) 的方法来区分这个 null(空) 值到底是存入的 null(空) ?还是压根不存在的 null(空)?这样歧义问题就得到了解决,所以 HashMap 不怕歧义问题。

    那么有人就会有疑问了,ConcurrentHashMap 里面也有 containsKey() ,是不是可以区分?

    在单线程的情况下这个是没有问题的,但是 ConcurrentHashMap 是针对多线程的,多线程的情况下比较复杂的。我们假设 ConcurrentHashMap 可以存入 null(空) 值,有这样一个场景,现在有一个线程 A 调用了 concurrentHashMap.containsKey(key),我们期望返回的结果是 false,但在我们调用 concurrentHashMap.containsKey(key) 之后,未返回结果之前,线程 B 又调用了 concurrentHashMap.put(key,null) 存入了 null(空) 值,那么线程 A 最终返回的结果就是 true 了,这个结果和我们之前预想的 false 完全不一样。

    也就是说,多线程的状况非常复杂,我们没办法判断某一个时刻返回的 null(空) 值,到底是值为 null(空),还是压根就不存在,也就是歧义问题不可被证伪,所以 ConcurrentHashMap 才会在源码中这样设计,直接杜绝 key value null(空) 的歧义问题。

    5、聊聊 jdk1.8 中 ConcurrentHashMap 值的存操作么?以及是怎么保证线程安全的?

    put 方法源码:

    final V putVal(K key, V value, boolean onlyIfAbsent) {
            // key 和 value 都不允许为空
    		if (key == null || value == null) throw new NullPointerException();
    		//根据 key 计算出 hashCode
    		int hash = spread(key.hashCode());
    		// 用来计算在这个节点总共有多少个元素,用来控制扩容或者转换为树
    		int binCount = 0;
    		// 数组的遍历,自旋插入节点,直到成功
    		for (ConcurrentHashMap.Node<K,V>[] tab = table;;) {
    			ConcurrentHashMap.Node<K,V> f; int n, i, fh;
    			//当 Node 数组为 null 或 长度为0,则进行初始化
    			if (tab == null || (n = tab.length) == 0)
    			     // 如何保证初始化数组是线程安全的?下面讲
    				tab = initTable();
    				// Unsafe 类 volatile 的方式取出 hashCode 散列后通过与运算得出的 Node[] 数组下标值对应的 Node 对象
        	        // 此时 Node 位置若为 null,则表示还没有线程在此 Node 位置进行插入操作,说明本次操作是第一次
    			else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
    					// 如果这个位置没有元素的话,则通过 CAS 的方式插入数据,CAS 失败则继续下一次循环
    				if (casTabAt(tab, i, null,
    					new ConcurrentHashMap.Node<K,V>(hash, key, value, null)))
    					
                 //插入成功,退出循环
    					break;                   // no lock when adding to empty bin
    			}
                // 如果执行到这一步,说明该位置已经存在值
                // 上面的判断中,把这个位置的值赋给了 f ,所以 f.hash 也就是该位置元素的 hash 值
                // 现在这个判断也将 f.hash 赋给了 fh 变量
    			// 如果当前节点的 hash 值为 MOVED,即有其他线程正在扩容,当前线程会帮助它扩容
    			else if ((fh = f.hash) == MOVED)
    			// 帮助扩容
    				tab = helpTransfer(tab, f);
    				// 执行到这里的时候,说明当前 ConcurrentHashMap 没有在扩容,并且要添加的位置已经有元素了,后面的插入很有可能会发生 Hash 冲突
    				//但是并不知道是链表结构还是红黑树结构,所以要执行下面的操作
    			else {
    				//如果都不满足,则利用 synchronized 锁写入数据
    				V oldVal = null;
    				  // 对数组该节点位置加锁(而不是对整个数组加锁,开始处理数组该位置的迁移工作),确保线程安全的将节点插入到链表或红黑树中
    				synchronized (f) {
    				// 确认在加锁过程中没有其他线程更改了节点
    				if (tabAt(tab, i) == f) {
    				 // 当前节点的 hash 值大于 0,表示是链表上的节点
                            if (fh >= 0) {
                               // 用来记录链表长度,来决定是否转化为红黑树
                                binCount = 1;
                                // 遍历链表,将节点插入到链表中
                                for (Node<K,V> e = f;; ++binCount) {
                                    K ek;
                                    // 如果当前元素的 hash 值和 key 跟要存储的位置的节点的相同的时候,替换掉该节点的 value 即可
                                    if (e.hash == hash &&
                                        ((ek = e.key) == key ||
                                         (ek != null && key.equals(ek)))) {
                                        oldVal = e.val;
                                          // 如果 onlyIfAbsent 为 false 对 value 值进行替换
                                        if (!onlyIfAbsent)
                                            e.val = value;
                                        break;
                                    }
                                    // 到了链表的最尾端,将新值插入到链表的最尾端
                                    Node<K,V> pred = e;
                                    // 如果不是同样的 hash,同样的 key 的时候,则判断该节点的下一个节点是否为空
                                    if ((e = e.next) == null) {
                                    //尾插法插入新节点
                                        pred.next = new Node<K,V>(hash, key,
                                                                  value, null);
                                    // 插入完成,终止循环
                                        break;
                                    }
                                }
                            }
                               // 如果 f 的类型是TreeBin(红黑树节点),TreeBin 是 ConcurrentHashMap 的一个内部类,包装了ThreeNode,将节点插入到红黑树中,如果找到相同 key 的节点,则根据 onlyIfAbsent 判断是否替换 value 值
                            else if (f instanceof TreeBin) {
                                Node<K,V> p;
                                binCount = 2;
                                // 调用 putTreeVal 方法,将该元素添加到树中去
                                if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                               value)) != null) {
                                    oldVal = p.val;
                                    if (!onlyIfAbsent)
                                        p.val = value;
                                }
                            }
                        }
    				}
    				// 链表转红黑树的操作
    				if (binCount != 0) {
    					// 判断是否要将链表转换为红黑树,临界值和 HashMap 一样,也都是 8 
    					if (binCount >= TREEIFY_THRESHOLD)
    						// 链表 -> 红黑树 转换
    						treeifyBin(tab, i);
    						// 如果存在旧的 value 值,则将旧的 value 值返回
    					if (oldVal != null)
    						return oldVal;
    					break;
    				}
    			}
    		}
          // 增加元素的个数,也就是 size,并且在这个方法中判断是否需要扩容
    		addCount(1L, binCount);
    		return null;
    	}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100

    值得关注的是 tabAt(tab, i) 方法,其使用 Unsafevolatile 的操作 volatile 方式地查看值,保证每次获取到的值都是最新的。

    static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
    	return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
    }
    
    • 1
    • 2
    • 3

    虽然上面的 table 变量加了 volatile,但也只能保证其引用的可见性,并不能确保其数组中的对象是否是最新的,所以需要 Unsafe volatile 拿到最新的 Node

    jdk1.8 中,初始化 ConcurrentHashMap 的时候这个 Node[] 数组是还未初始化的,会等到第一次 put 方法调用时才初始化,那么问题来了,此时是会有并发问题的,如果多个线程同时调用 initTable 初始化 Node 数组怎么办?看看 Doug Lea 大师是如何处理的:

    private final Node<K,V>[] initTable() {
      Node<K,V>[] tab; int sc;
      // 每次循环都获取最新的 Node 数组引用
      while ((tab = table) == null || tab.length == 0) {
        // sizeCtl 是一个标记位,若为 -1 也就是小于 0,代表有线程在进行初始化工作了
        if ((sc = sizeCtl) < 0)
          // 让出 CPU 时间片
          Thread.yield(); // lost initialization race; just spin
        // CAS 操作,将本实例的 sizeCtl 变量设置为 -1
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
          // 如果 CAS 操作成功了,代表本线程将负责初始化工作
          try {
            // 再检查一遍数组是否为空
            if ((tab = table) == null || tab.length == 0) {
              // 在初始化 Map 时,sizeCtl 代表数组大小,默认 16
              // 所以此时 n 默认为 16
              int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
              @SuppressWarnings("unchecked")
              // Node 数组
              Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
              // 将其赋值给 table 变量
              table = tab = nt;
              // 通过位运算,n 减去 n 二进制右移 2 位,相当于乘以 0.75
              // 例如 16 经过运算为 12,与 乘0.75 一样,只不过位运算更快
              sc = n - (n >>> 2);
            }
          } finally {
            // 将计算后的sc(12)直接赋值给 sizeCtl,表示达到 12 长度就扩容
            //由于这里只会有一个线程在执行,直接赋值即可,没有线程安全问题
            //只需要保证可见性
            sizeCtl = sc;
          }
          break;
        }
      }
      return tab;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    table 变量使用了 volatile 来保证每次获取到的都是最新写入的值

    transient volatile Node<K,V>[] table;
    
    • 1

    这里总结一下:

    就算有多个线程同时进行 put 操作,在初始化 Node[] 数组时,使用了 CAS 操作来决定到底是哪个线程有资格进行初始化,其他线程只能等待。用到的并发技巧如下:

    • volatile 修饰 sizeCtl 变量:它是一个标记位,用来告诉其他线程这个坑位有没有线程在进行初始化工作,其线程间的可见性由 volatile 保证。
    • CAS 操作:CAS 操作保证了设置 sizeCtl 标记位的原子性,保证了在多线程同时进行初始化 Node[] 数组时,只有一个线程能成功。

    6、put 方法中提到的 CAS 是什么?自旋又是什么?

    CAS(比较并交换):是原子操作的一种。在多线程没有锁的状态下,可以保证多个线程对同一个值的更新。

    CAS 可用于在多线程编程中实现不被打断的数据交换操作,从而避免多线程同时改写某一数据时由于执行顺序不确定性以及中断的不可预知性,产生的数据不一致问题。该操作通过将内存中的值与指定数据进行比较,当数值一样时将内存中的数据替换为新的值。

    CAS 操作的流程如下图所示:

    线程在读取数据时不进行加锁,在准备写回数据时,比较原值是否修改,若未被其他线程修改则写回,若已被修改,则重新执行读取流程。

    在这里插入图片描述

    自旋,字面意思就是自己旋转,也就是循环,一般是用一个无限循环实现。

    这样一来,一个无限循环中,执行一个 CAS 操作,当操作成功,返回 true 时,循环结束。当返回 false 时,接着执行循环,继续尝试 CAS 操作,直到返回 true

    7、CAS 就一定能保证数据没被别的线程修改过么?

    并不是,比如很经典的 ABA 问题,CAS 就无法判断了。

    8、什么是 ABA 问题?

    因为 CAS 需要在操作值的时候检查下值有没有发生变化,如果没有发生变化则更新,但是如果一个值原来是 A,变成了 B,又变成了 A,那么使用 CAS 进行检查时会发现它的值没有发生变化,但是实际上却变化了,这就是 ABA 问题。

    9、那么 ABA 问题怎么解决?

    添加版本号

    用版本号去保证就好了,就比如说,我在修改前去查询他原来的值的时候再带一个版本号,每次判断就连值和版本号一起判断,判断成功就给版本号加 1

    添加时间戳

    添加时间戳也可以,查询的时候把时间戳一起查出来,对的上才修改并且更新值的时候一起修改更新时间,这样也能保证,方法很多但是跟版本号都是异曲同工之妙,看场景大家想怎么设计吧。

    10、ConcurrentHashMap 的 get 操作又是怎么样子的呢?

    public V get(Object key) {
            Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
            //计算 hash 值
            int h = spread(key.hashCode());
            //首先通过计算 key 的 hash 值来确定该元素放在table[i]数组的哪个位置
            if ((tab = table) != null && (n = tab.length) > 0 &&
                (e = tabAt(tab, (n - 1) & h)) != null) {
                //判断头部数据的 hash 和 key 和传入的数据是否一致
                if ((eh = e.hash) == h) {
                    if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    	//一致直接返回该值
                        return e.val;
                }
                //如果 eh 小于 0,可能是正在扩容,或者是红黑树,执行相应的查找方法
                else if (eh < 0)
                    return (p = e.find(h, key)) != null ? p.val : null;
                //如果是链表,遍历链表,找到 key 对应的 value 值
                while ((e = e.next) != null) {
                    if (e.hash == h &&
                        ((ek = e.key) == key || (ek != null && key.equals(ek))))
                        return e.val;
                }
            }
            //如果不存在的话返回null
            return null;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    进行 get操作大致可以分为以下步骤:

    1. 计算 hash
    2. 首先通过计算 key hash 值来确定该元素放在 table[i] 数组的哪个位置
    3. 判断头部数据的 hash key 和传入的数据是否一致,一致直接返回该值
    4. 如果 eh 小于 0,可能是正在扩容,或者是红黑树,执行相应的查找方法
    5. 如果是链表,遍历链表,找到 key 对应的 value
    6. 如果不存在的话返回 null

    11、get 没有加锁的话,是如何保证读到的数据不是脏数据的呢?

    get 操作可以无锁是由于 Node 的元素 val 和指针 next 是用 volatile 修饰的,volatile 关键字来保证可见性、有序性。但不保证原子性。
    在这里插入图片描述
    从源码中我们看到,Node 节点里面的 val 节点值,和下一个节点 next 都是加了 volatitle 关键字的,也就是说,我们在修改节点值的时候和插入节点值的时候都是线程之间可见的,这样我们在使用 get 方法的时候 就能在不加任何锁的情况下得到最新的值,也就完成了多线程下的同步,保证了多线程下的安全。

    而数组用 volatile 修饰主要是保证在数组扩容的时候保证可见性。
    在这里插入图片描述

    12、哪些情况下,会触发扩容⾏为?

    在每次 put 值之后,有可能触发扩容动作,其中包括两种情况:

    • 如果 put 新的元素,链表的长度达到了阈值 8,会调用 treeifyBin(tab, i) 方法将链表转为红黑树,不过在 treeifyBin(tab, i) 方法中会先对数组长度进行判断,如果数组长度小于 64,则会调用 tryPresize(n << 1) 方法对数组进行扩容,进而通过 transfer() 方法重新调整节点位置。
        private final void treeifyBin(Node<K,V>[] tab, int index) {
            Node<K,V> b; int n, sc;
            if (tab != null) {
            // MIN_TREEIFY_CAPACITY = 64,若数组长度小于 64,则先扩容
                if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
                //数组扩容两倍,n << 1 相当于 2n
                    tryPresize(n << 1);
                    //如果数组长度大于等于 64,则进行红黑树的操作
                else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
                    synchronized (b) {
                 
                        }
                    }
                }
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • put 元素之后,会调用 addCount() 方法将元素个数 +1,并检查是否需要进行扩容,当数组元素个数达到扩容阈值 sizeCtl 时,会触发 transfer() 方法,对数组进行两倍扩容,进而重新调整节点的位置。
    • /*
      * 当数组中的容量大于扩容阈值时:sizeCtl,将进行扩容操作
      * */
      private final void addCount(long x, int check) {
      	CounterCell[] as; long b, s;
      	if ((as = counterCells) != null ||
          	!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
          	// 统计元素个数的操作
          	.........
      		s = sumCount();
      	}
      	if (check >= 0) {
      		Node<K,V>[] tab, nt; int n, sc;
      		  // 如果元素个数大于等于了阈值或 -1 就自旋扩容
             	while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
              	(n = tab.length) < MAXIMUM_CAPACITY) {
              	  // resizeStamp 这个方法返回一个数字,比如 n=16,rs 则 = 32795,rs 后面会被用来将 sizeCtl 设置为负数
              	int rs = resizeStamp(n);
                  // 如果 sc 小于 0,表示已经有其他线程在进行扩容了
                  if (sc < 0) {
                  // 如果全部元素已经转移完了,或者已经达到了最大并发扩容数限制则 breack
                  	if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                      	sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                          transferIndex <= 0)
                          break;
                       // 如果没有,则 sizeCt l加 1,然后进行转移元素
      				if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                      	transfer(tab, nt);
      			}
                 //如果没有在扩容,但是需要扩容。那么就将 sizeCtl 更新,赋值为标识符左移 16 位,一个负数,然后加 2, 表示已经有一个线程开始扩容了。然后进行扩容
      			else if (U.compareAndSwapInt(this, SIZECTL, sc,
                   							(rs << RESIZE_STAMP_SHIFT) + 2))
                   	// 更新 sizeCtl 为负数后,开始扩容
      				transfer(tab, null);
      			// 更新 count,看看是否还需要扩容
      			s = sumCount();
      		}
      	}
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39

      13、ConcurrentHashMap 支持多线程并发扩容,那么它是如何进⾏扩容的?

      jdk1.8 版本的 ConcurrentHashMap 支持并发扩容,当第一个线程开始对 ConCurrentHashMap 进行扩容时,直接进入到 transfer(tab, null) 方法进行扩容,后面的线程全部进入到 transfer(tab, nextTab) 方法加入扩容线程大军,协助其他线程进行扩容操作。

      那么到底是怎么样扩容的呢,下面我们根据源码来理解多线程是怎么实现并发扩容的:

      private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
              // stride 为每个 cpu 所需要处理的桶个数
              int n = tab.length, stride;
              // 如果是多 cpu,那么每个线程划分任务,最小任务量是 16 个桶位的迁移
              // 这里让每个 CPU 处理的桶一样多,避免出现转移任务不均匀的现象,如果桶较少的话,默认一个 CPU(一个线程)处理 16 个桶
              if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
              // 本线程分到的迁移量,假设为16(默认也为16)
                  stride = MIN_TRANSFER_STRIDE;
              //如果新的 table 没有初始化,那么就初始化一个大小为原来 2 倍的数组为新数组,被 nextTab 引用
              if (nextTab == null) {
                  try {
                      @SuppressWarnings("unchecked")
                      // 创建一个两倍长度的新数组
                      Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
                     //nextTab 引用新数组
                      nextTab = nt;
                  } catch (Throwable ex) {
                    //如果扩容出现错误,则 sizeCtl 赋值为 int 最大值返回
                      sizeCtl = Integer.MAX_VALUE;
                      return;
                  }
                  nextTable = nextTab;
                  //记录线程开始迁移的桶位,从后往前迁移。
                  transferIndex = n;
              }
             // 新扩容数组的长度
              int nextn = nextTab.length;
          // ForwardingNode 是继承了 Node 的,但是他所有节点的 hash 值都是设置成了 MOVED,标识着当前节点在迁移当中,在 get 或者 put 时若遇到此 Node,则可以知道当前 Node 正在迁移
              ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
              // 数组一层层推进的标识符,标识一个桶的迁移工作是否完成,advance == true 表示可以进行下一个位置的迁移
              // advance 变量指的是是否继续进行下一个位置的迁移,如果为 true,表示可以继续向后推进,反之,说明还没有处理好当前桶,不能推进
              boolean advance = true;
              //标识所有桶是否都已迁移完成,最后一个数据迁移的线程将该值置为 true,并进行本轮扩容的收尾工作
              boolean finishing = false;
              // 死循环,i 表示下标,bound 表示当前线程可以处理的当前桶区间最小下标
              for (int i = 0, bound = 0;;) {
                  Node<K,V> f; int fh;
                
             // while 的作用就是确定每个线程需要处理的桶的下标,并且通过i--来遍历hash表中的每个桶,
             // 假如旧数组有 32 个桶模板 transferferIndex 就是 32,当第一个线程进来的时候,nextIndex = transferIndex = 32,nextBound = nextIndex - stride = 16,而 transferIndex 也通过 CAS 被调整为 16,所以第一个线程处理桶的范围是从 16 到 31 号桶
      //所以第二个线程进来,nextIndex = transferIndex = 16,nextBound = 0
      //所以第二个线程处理桶的范围是从 0 到第 15 号桶
                  while (advance) {
                      int nextIndex, nextBound;
        // 过了第一次之后 --i >= bound,在这里就跳出循环了,这里的 finishing 是为了二次循环检查用的,让最后一个线程遍历整个数组,但是如果多线程的时候 i-- 可能会使 i 跳出这个线程对应的范围,所以用 finishing 保证 i 可以一直--
                      if (--i >= bound || finishing)
                          advance = false;
      // 当所有的线程都分配完对应需要转移的桶的范围后,transferindex 就为 0,所以当某一个线程完成了自己的任务后,nextIndex = transferIndex = 0,所以 i 被设置为-1,跳出这个 whlle
                      else if ((nextIndex = transferIndex) <= 0) {
                          i = -1;
                          // 这里设置 false,是为了防止在没有成功处理一个桶的情况下却进行了推进
                          advance = false;
                      }
      // 第一次进入这个扩容方法时,是到这里来的,因为前面 for 循环 i = 0,所以前面的两个判断都跳过,直接来到这里,这里就是在分配当前线程所需要转移的范围
                      else if (U.compareAndSwapInt
                               (this, TRANSFERINDEX, nextIndex,
                                nextBound = (nextIndex > stride ?
                                             nextIndex - stride : 0))) {
                         // 确定当前线程每次分配的待迁移桶的范围为[bound, nextIndex)
                         // 这个值就是当前线程可以处理的最小当前区间最小下标
                          bound = nextBound;
                         // 初次对i 赋值,这个就是当前线程可以处理的当前区间的最大下标
                          i = nextIndex - 1;
                          // 这里设置 false,是为了防止在没有成功处理一个桶的情况下却进行了推进,这样对导致漏掉某个桶。下面的 if (tabAt(tab, i) == f) 判断会出现这样的情况
                          advance = false;
                      }
                  }
            // 当前线程自己的活已经做完或所有线程的活都已做完,就进入该if
                  if (i < 0 || i >= n || i + n >= nextn) {
                      int sc;
                      // 所有线程已干完活,最后才走这里。
                      // 扩容结束后,保存新数组,并重新计算扩容阈值,赋值给 sizeCtl
                      if (finishing) {
                          nextTable = null;// 删除成员变量
                          table = nextTab;// 更新 table
                          // 更新阈值,就是将阈值变为旧数组的 1.5 倍,因为旧阈值是旧数组的 0.75 倍,旧数组扩容 2 倍对应的阈值也就是扩容两倍,即 1.5 倍。
                          sizeCtl = (n << 1) - (n >>> 1);
                          // 结束方法
                          return;
                      }
                // 当前线程已结束扩容,sizeCtl-1 表示参与扩容线程数 -1。
                      if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                       // 判断当前所有扩容任务线程是否都执行完成
                      // 如果 sc - 2 不等于标识符左移 16 位。如果他们相等了,说明没有线程在帮助他们扩容了。也就是说,扩容结束了
                          if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                          // 不相等,说明没结束,当前线程结束方法。
                              return;
                          // 如果相等,扩容结束了,置 finishing = advance = true
                          finishing = advance = true;
                         // 最后一个数据迁移线程要重新检查一次旧 table 中的所有桶,看是否都被正确迁移到新table了
                       // 正常情况下,重新检查时,旧 table 的所有桶都应该是 ForwardingNode
                       // 特殊情况下,比如扩容冲突(多个线程申请到了同一个 transfer 任务),此时当前线程领取的任务会作废,那么最后检查时
                       // 还要处理因为作废而没有被迁移的桶,把它们正确迁移到新 table 中
                       
                          i = n;
                      }
                  }
            // 当前迁移的桶位没有元素,那就不用迁移了,直接在该位置通过 CAS 添加一个 fwd 占位
                  else if ((f = tabAt(tab, i)) == null)
                      advance = casTabAt(tab, i, null, fwd);
                  // 如果不是 null 且 hash 值是 MOVED。
                  else if ((fh = f.hash) == MOVED)
                  // 说明别的线程已经处理过了,再次推进一个下标
                      advance = true; // already processed
                  // 该旧桶未迁移完成,进行数据迁移
                  else {
                   // 当前节点需要迁移,加锁迁移,保证多线程安全
                   // 加锁防止在这个桶位迁移的时候,别的线程对这个桶位进行 putVal 的时候向链表插入数据
                      synchronized (f) {
                      // 判断 i 下标处的桶节点是否和 f 相同
                          if (tabAt(tab, i) == f) {
                               // 一个低位节点,一个高位节点
                               //ln 表示低位,hn 表示高位,接下来这段代码的作用是把链表拆分成两部分,0 在低位,1 在高位
                              Node<K, V> ln, hn;
                              Node<K,V> ln, hn;
                              // fh >=0 说明是链表迁移
                              if (fh >= 0) {
                               //由于 n 是 2 的幂次方(所有二进制位中只有一个 1),如 n=16(0001 0000),第 4位为 1,那么 hash&n 后的值第 4 位只能为 0 或 1,所以可以根据 hash&n 的结果将所有结点分为两部分。
                                 //  如果是结果是 0 ,将其放在低位,反之放在高位,目的是将链表重新 hash,放到对应的位置上,让新的取于算法能够击中它
                     int runBit = fh & n;
                     // lastRun 是最终要处理的节点,这里先让指向根节点,后面还会寻找真正的lastRun
                                  Node<K,V> lastRun = f;
                               // 这个 for 循环找到 lastRun,从 lastRun 开始后面的要在低位都在低位,要在高位都在高位
                                  for (Node<K,V> p = f.next; p != null; p = p.next) {
                                    // 取于桶中每个节点的 hash 值
                                      int b = p.hash & n;
                       // 如果节点的 hash 值和首节点的 hash 值取于结果不同
                                      if (b != runBit) {
                                          // 更新 runBit,用于下面判断 lastRun 该赋值给 ln 还是 hn。
                                          runBit = b;
                                          // 这个 lastRun 保证后面的节点与自己的取于值相同,避免后面没有必要的循环
                                          lastRun = p;
                                      }
                                  }
                                  // 如果最后更新的 runBit 是 0 ,设置低位节点
                                  if (runBit == 0) {
                                      ln = lastRun;
                                      hn = null;
                                  }
                                  else {
                                  // 如果最后更新的 runBit 是 1, 设置高位节点
                                      hn = lastRun;
                                      ln = null;
                                  }
                          // 遍历链表,把 hash&n 为 0 的放在低位链表中
                                   // 不为 0 的放在高位链表中
                              for (Node<K,V> p = f; p != lastRun; p = p.next) {
                              	int ph = p.hash; K pk = p.key; V pv = p.val;
                                  if ((ph & n) == 0)
                                  	ln = new Node<K,V>(ph, pk, pv, ln);
                                  else
                                      hn = new Node<K,V>(ph, pk, pv, hn);
                              }
                                // 将低位的链表放在 i 位置也就是不动,低位链不需要变
                                  setTabAt(nextTab, i, ln);
                                 // 将高位链表放在 i+n 位置,n 是数组的长度
                                  setTabAt(nextTab, i + n, hn);
                                  // 在原 table 中设置 ForwardingNode 节点以提示该桶扩容完成
                                  setTabAt(tab, i, fwd);
                                   // advance 设置为 true 表示当前 hash 桶已处理完,可以继续处理下一个 hash 桶
                                  advance = true;
                              }
                              // 如果当前节点是红黑树,则按照红黑树的处理逻辑进行迁移
                              else if (f instanceof TreeBin) {
                               // lo 为低位链表头节点,loTail 为低位链表尾节点,hi 和 hiTail 为高位链表头尾节点
                                  TreeBin<K,V> t = (TreeBin<K,V>)f;
                                  TreeNode<K,V> lo = null, loTail = null;
                                  TreeNode<K,V> hi = null, hiTail = null;
                                  int lc = 0, hc = 0;
                                    // 同样也是使用高位和低位两条链表进行迁移
                                    //使用 for 循环以链表方式遍历整棵红黑树,使用尾插法拼接 ln 和 hn 链表
                                  for (Node<K,V> e = t.first; e != null; e = e.next) {
                                      int h = e.hash;
                                       // 这里面形成的是以 TreeNode 为节点的链表
                                      TreeNode<K,V> p = new TreeNode<K,V>
                                          (h, e.key, e.val, null, null);
                                       // 和链表相同的判断,与运算 == 0 的放在低位
                                      if ((h & n) == 0) {
                                          if ((p.prev = loTail) == null)
                                              lo = p;
                                          else
                                              loTail.next = p;
                                          loTail = p;
                                          ++lc;
                                      }
                                      // 不是 0 的放在高位
                                      else {
                                          if ((p.prev = hiTail) == null)
                                              hi = p;
                                          else
                                              hiTail.next = p;
                                          hiTail = p;
                                          ++hc;
                                      }
                                  }
                                  // 如果长度小于等于 6,则将红黑树转换成链表
                                  // (hc != 0) ? new TreeBin(lo) : t 这行代码的用意在于,如果原来的红黑树没有被拆分成两份,那么迁移后它依旧是红黑树,可以直接使用原来的 TreeBin 对象
                                  ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                      (hc != 0) ? new TreeBin<K,V>(lo) : t;
                                  hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                      (lc != 0) ? new TreeBin<K,V>(hi) : t;
                                   // setTabAt 方法调用的是 Unsafe 类的 putObjectVolatile 方法
            // 使用 volatile 方式的 putObjectVolatile 方法,能够将数据直接更新回主内存,并使得其他线程工作内存的对应变量失效,达到各线程数据及时同步的效果
                                   // 使用 volatile 的方式将 ln 链设置到新数组下标为 i 的位置上
                                  setTabAt(nextTab, i, ln);
                                   // 使用 volatile 的方式将 hn 链设置到新数组下标为 i + n (n为原数组长度) 的位置上 
                                  setTabAt(nextTab, i + n, hn);
                                  // 在原 table 中设置 ForwardingNode 节点以提示该桶扩容完成
                                  setTabAt(tab, i, fwd);
                                  // advance 设置为 true 表示当前 hash 桶已处理完,可以继续处理下一个 hash 桶
                                  advance = true;
                              }
                          }
                      }
                  }
              }
          }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50
      • 51
      • 52
      • 53
      • 54
      • 55
      • 56
      • 57
      • 58
      • 59
      • 60
      • 61
      • 62
      • 63
      • 64
      • 65
      • 66
      • 67
      • 68
      • 69
      • 70
      • 71
      • 72
      • 73
      • 74
      • 75
      • 76
      • 77
      • 78
      • 79
      • 80
      • 81
      • 82
      • 83
      • 84
      • 85
      • 86
      • 87
      • 88
      • 89
      • 90
      • 91
      • 92
      • 93
      • 94
      • 95
      • 96
      • 97
      • 98
      • 99
      • 100
      • 101
      • 102
      • 103
      • 104
      • 105
      • 106
      • 107
      • 108
      • 109
      • 110
      • 111
      • 112
      • 113
      • 114
      • 115
      • 116
      • 117
      • 118
      • 119
      • 120
      • 121
      • 122
      • 123
      • 124
      • 125
      • 126
      • 127
      • 128
      • 129
      • 130
      • 131
      • 132
      • 133
      • 134
      • 135
      • 136
      • 137
      • 138
      • 139
      • 140
      • 141
      • 142
      • 143
      • 144
      • 145
      • 146
      • 147
      • 148
      • 149
      • 150
      • 151
      • 152
      • 153
      • 154
      • 155
      • 156
      • 157
      • 158
      • 159
      • 160
      • 161
      • 162
      • 163
      • 164
      • 165
      • 166
      • 167
      • 168
      • 169
      • 170
      • 171
      • 172
      • 173
      • 174
      • 175
      • 176
      • 177
      • 178
      • 179
      • 180
      • 181
      • 182
      • 183
      • 184
      • 185
      • 186
      • 187
      • 188
      • 189
      • 190
      • 191
      • 192
      • 193
      • 194
      • 195
      • 196
      • 197
      • 198
      • 199
      • 200
      • 201
      • 202
      • 203
      • 204
      • 205
      • 206
      • 207
      • 208
      • 209
      • 210
      • 211
      • 212
      • 213
      • 214
      • 215
      • 216
      • 217

      通过源码简单总结一下逻辑:

      1. 通过计算 CPU 核心数和 Map 数组的长度得到每个线程(CPU)要帮助处理多少个桶,并且这里每个线程处理都是平均的。默认每个线程处理 16 个桶。因此,如果长度是 16 的时候,扩容的时候只会有一个线程扩容。
      2. 初始化临时变量 nextTable。将其在原有基础上扩容两倍。死循环开始转移。多线程并发转移就是在这个死循环中,根据一个 finishing 变量来判断,该变量为 true 表示扩容结束,否则继续扩容。
      3. 进入一个 while 循环,分配数组中一个桶的区间给线程,默认是 16.,从大到小进行分配。当拿到分配值后,进行 i-- 递减。这个 i 就是数组下标。(其中有一个 bound 参数,这个参数指的是该线程此次可以处理的区间的最小下标,超过这个下标,就需要重新领取区间或者结束扩容,还有一个 advance 参数,该参数指的是是否继续递减转移下一个桶,如果为 true,表示可以继续向后推进,反之,说明还没有处理好当前桶,不能推进)。
      4. while 循环,进 if 判断,判断扩容是否结束,如果扩容结束,清空临时变量,更新 table 变量,更新库容阈值。如果没完成,但已经无法领取区间(也就是没有了),该线程退出该方法,并将 sizeCtl 1,表示扩容的线程少一个了。如果减完这个数以后,sizeCtl 回归了初始状态,表示没有线程再扩容了,该方法所有的线程扩容结束了。(这里主要是判断扩容任务是否结束,如果结束了就让线程退出该方法,并更新相关变量)。然后检查所有的桶,防止遗漏。
      5. 如果没有完成任务,且 i 对应的槽位是空,尝试 CAS 插入占位符,让 putVal 方法的线程感知。 如果 i 对应的槽位不是空,且有了占位符,那么该线程跳过这个槽位,处理下一个槽位。如果以上都是不是,说明这个槽位有一个实际的值。开始同步处理这个桶。到这里,都还没有对桶内数据进行转移,只是计算了下标和处理区间,然后一些完成状态判断。同时,如果对应下标内没有数据或已经被占位了,就跳过了。
      6. 处理每个桶的行为都是同步的。防止 putVal 的时候向链表插入数据。如果这个桶是链表,那么就将这个链表根据 length 取于拆成两份,取于结果是 0 的放在新表的低位,取于结果是 1 放在新表的高位。如果这个桶是红黑数,那么也拆成 2 份,方式和链表的方式一样,然后,判断拆分过的树的节点数量,如果数量小于等于 6,改造成链表。反之,继续使用红黑树结构。

      14、ConcurrentHashMap 最多允许多少线程同时参与扩容?

      在这里插入图片描述

          /**
           * The maximum number of threads that can help resize.
           * Must fit in 32 - RESIZE_STAMP_BITS bits.
           */
          private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
      
      • 1
      • 2
      • 3
      • 4
      • 5

      这里的注释解释达到最大帮助线程的数量为:65535

      15、ConcurrentHashMap 参与扩容的线程至少负责多少个哈希桶的迁移?

      进入到 transfer(tab, null) 方法中,有如下一段代码:
      在这里插入图片描述
      上面代码的意思是每条线程至少处理16个桶,如果计算出来的结果少于 16 ,则一条线程处理 16 个桶

      16、正在迁移的 hash 桶遇到 get 操作会发生什么?

      在扩容过程期间形成的 hn ln 链 使用的是复制引用的方式,也就是说 ln hn 链是复制出来的,而非直接在旧数组的链表 / 红黑树上操作;所以旧数组 hash 桶上的链表 / 红黑树并没有受到影响。

      因此从迁移开始 到 结束 这段时间都是可以正常访问旧数组 hash 桶上面的链表 / 红黑树;迁移结束后放旧数组位置上赋值为 fwd ,往后的访问请求会直接转发到扩容后的数组中了。

      17、正在迁移的hash桶遇到 put/remove 操作会发生什么?

      如果当前链表已经迁移完成,那么头节点会被设置成 fwd 节点,此时写线程会帮助扩容,如果扩容没有完成,当前链表的头节点会被锁住,所以写线程会被阻塞,直到扩容完成。

      18、ConcurrentHashMap 和 HashMap 的扩容有什么不同?

      1. HashMap 的扩容是创建一个新数组,将值直接放入新数组中,jdk1.7 采用头插入法,会出现死循环,jdk1.8 采用尾插入法,不会造成死循环,但是可能会造成数据覆盖。
      2. ConcurrentHashMap 扩容是从数组队尾开始迁移,迁移节点时会锁住节点,迁移完成后将节点设置为转移节点。所以节点迁移完成后将新数组赋值给容器。

      19、ConcurrentHashMap 是如何发现当前节点正在扩容的?

      源码 putVal 方法:

      • 通过判断数组位置节点或红黑树节点 key hash 值是否为 -1 来确定 table 数组是否正在扩容中,如果 hash 值为 -1 表示 table 数组正在被其他线程扩容,当前线程会加入到扩容当中帮助一起扩容。

      在这里插入图片描述

    • sizeCtl 为负值时,表示正有 n 个线程正在进行扩容。
    • 在这里插入图片描述
      在这里插入图片描述

      20、ConcurrentHashMap 扩容期间在未迁移到的 hash 桶插入数据会发生什么?

      只要插入的位置扩容线程还未迁移到,就可以插入。如果当前要插入的位置正在迁移,会帮助其他扩容线程一起扩容,直到扩容结束,然后再加 synchronized 锁执行插入操作。

      21、ConcurrentHashMap 如果某个桶位升级为红黑树,并且当前红黑树上面有读线程访问,再来写请求怎么办?

      我们知道,当一个桶位形成了红黑树时,此时桶位中存的是一个 TreeBin 节点,其内部存在两个引用,分别是指向双向链表的 first 和指向红黑树根节点的 root。这里我们看一下 TreeBin 类的属性,然后介绍 读 和 写实现流程是怎样的。

          static final class TreeBin<K,V> extends Node<K,V> {
              // 红黑树 根节点
              TreeNode<K,V> root;
              // 双向链表的头结点
              volatile TreeNode<K,V> first;
              // 等待者线程(当前lockState是读锁状态)
              volatile Thread waiter;
              // 锁的状态 :
              // 写锁是独占状态,写的时候其他线程不能读或写,lockState = 1;
              // 读锁是共享状态,其他线程可以读不能写,把 lockState + 4;
              // 等待者状态 写线程要等其他线程读完才能写,把 lockState 最后两位设为2
              volatile int lockState;
              // 锁处于写状态
              static final int WRITER = 1; // set while holding write lock
              // 锁处于等待获取写锁状态
              static final int WAITER = 2; // set when waiting for write lock
              // 锁处于读状态
              static final int READER = 4; // increment value for setting read lock
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18

      下面是 读 和 写的操作流程:

      如果红黑树上有读请求访问,写线程会被阻塞,因为这时候如果写会导致红黑树失衡,会触发自平衡操作,导致数据结构产生变化,会影响读线程的读取结果。

      我们知道 ConcurrentHashMap 中的红黑树由 TreeBin 来代理,TreeBin 内部有一个 Int 类型的 lockState字段。

      1. 当读线程在读取数据时,会使用 CAS 的方式将 state 值加 4(表示加了读锁),读取数据完毕后,再使用 CAS 的方式将 state 值减 4
      2. 如果写线程去向红黑树中写入数据时,会先检查 state 值是否等于 0,如果是 0,则说明没有读线程在查询数据,这时候可以直接写入数据,写线程也会通过 CAS 的方式将 state 字段值设置为 1(表示加了写锁)。
      3. 如果写线程检查 state 值不是 0,这时候就会 park() 方法挂起当前线程,使其等待被唤醒。挂起写线程时,写线程会先将 state 值的第 2 个 bit 位设置为 1(二进制为 10),转换成十进制就是 2,表示有写线程等待被唤醒。

      那么反过来,当红黑树上有写线程正在执行写入操作,那么如果有新的读线程请求该怎么处理?

      Treebin 对象保留两个数据结构,一个红黑树,一个链表。就是为了这种情况而设计的, 当这个 state 表示写锁状态时,读请求,就不能在到红黑树上面查询。但是也不能因为写操作导致,读操作不能用。TreeBing 保留的链表结构就用上,读请求直接到链表上面去访问,而不经过红黑树。

      22、ConcurrentHashMap 挂起等待的写线程,什么时候将其唤醒再继续执行写操作?

      在读线程结束的时候都会让 state 值减 4, 减完之后读线程会再检查下 state 的值是不是 2, 如果是 2 ,就说明有写线程在挂起等待。因为写线程挂起之前让 statebit 位第二位设置位 1,转换成十进制就是 2,如果等于 2,说明当前读线程最后一个读线程。并且有一个写线程在等待资源可用,那么这个读线程就使用 LockSupport.unpark()将这个等待线程唤醒。

      23、CAS性能很高,但是 synchronized 性能不是很好,为什么 jdk1.8 之后使用 synchronized?

      synchronized 之前一直都是重量级的锁,但是后来 Java 官方是对他进行过升级的,他现在采用的是锁升级的方式去做的。

      针对 synchronized 获取锁的方式,JVM 使用了锁升级的优化方式,就是先使用偏向锁优先同一线程然后再次获取锁,如果失败,就升级为 CAS 轻量级锁,如果失败就会短暂自旋,防止线程被系统挂起。最后如果以上都失败就升级为重量级锁。

      所以是一步步升级上去的,最初也是通过很多轻量级的方式锁定的

      关于 synchronized 更多的可以看我这篇文章:深入分析 Synchronized 原理

      24、jdk1.8 的 ConcurrentHashMap 为什么要使用内置锁 synchronized 来替换ReentractLock 重入锁?

      1. synchronized 性能提升,在 jdk1.6 中对 synchronized 锁的实现引入了大量的优化,会从无锁 》偏向锁》轻量级锁》重量级锁一步步转换,就是锁膨胀优化。以及有锁的粗化、锁消除和自适应自旋等优化。
      2. 提升并发度和减少内存开销, CAS + synchronized 方式时,加锁的对象是每个链条的头节点,相对 Segment 再次提高了并发度。如果使用可重入锁达到同样的效果,则需要大量继承自 ReentrantLock 的对象,造成巨大的内存浪费。

      25、ConcurrentHashMap 的并发度是如何设计的?

      这里的并发度可以理解为程序运行时能够同时更新且不产生锁竞争的最大线程数。

      1. jdk1.7 中,实际上就是 ConcurrentHashMap 中的分段锁个数,即 Segment[] 的数组长度,默认是 16,这个值可以在构造函数中设置。如果自己设置了并发度,ConcurrentHashMap 会使用大于等于该值的最小的 2 的幂指数作为实际并发度。如果并发度设置得太小,会带来严重得锁竞争问题;如果并发度设置得过大,原本位于同一个 Segment 内的访问会扩散到不同的 Segment 中,从而引起程序性能下降。
      2. jdk1.8 中,已经抛弃了 Segment 的概念,选择了 Node 数组 + 链表 + 红黑树结构,并发度大小依赖于数组的大小。

      26、那么除了 ConcurrentHashMap,线程下安全的操作 Map 还有其他方法吗?

      在这里插入图片描述
      我们还可以使用 Collections.synchronizedMap() 方法加同步锁,可以看到 synchronizedMap 内部所有的方法其实都是执行我们传入的 Map 对象的方法,只不过都用 synchronized 进行上锁来保证对 Map 的操作是线程安全的。

      我们都知道, HashMap 本身非线程安全的,如果传入的是 HashMap 对象,其实也是对 HashMap 做了一层封装,里面使用对象锁来保证多线程场景下,线程安全,本质也是对 HashMap 进行全表锁,在竞争激烈的多线程环境下性能依然非常差,不推荐使用。

      该类中对Map接口中的方法使用synchronized 同步关键字来保证对Map的操作是线程安全的。

      27、ConcurrentHashMap 能完全替代 Hashtable 吗?

      不能

      这里我们说一下 ConcurrentHashMap HashTable 的异同:

      1、线程安全:

      • ConcurrentHashMap HashTable 都是线程安全的。

      2、底层数据结构:

      • ConcurrentHashMap 的底层数据结构: jdk1.7使用分段数组+链表 实现,jdk1.8 使用数组+链表+红黑树 实现,红黑树可以保证查找效率。
      • HashTable 底层数据结构是用 数组+链表实现。

      3、保证线程安全的方式:

      • ConcurrentHashMap jdk1.7 使用分段锁的方式保证线程安全,jdk1.8 使用 synchronized CAS 保证线程安全。
      • HashTable 是用全表锁来保证线程安全(既一个 HashTable 只用一把锁),这种方式的效率非常低。

      从上面所说的异同,ConcurrentHashMap HashTable 要好,为什么不能完全替代 HashTable 呢?

      原因就在于一致性问题:

      HashTable 虽然效率上不如 ConcurrentHashMap,但并不能完全被取代,两者的迭代器的一致性不同的,HashTable 的迭代器是强一致性的,而 ConcurrentHashMap 的迭起器是弱一致性的。 ConcurrentHashMapget,clear,iterator 都是弱一致性的, Doug Lea 也将这个判断留给用户自己决定是否使用 ConcurrentHashMap

      那么什么是强一致性和弱一致性?

      弱一致性 :简单来说就比如我 put 了一个数据进去,本来应该立刻就可以 get 到,但是却可能在一段时间内 get 不到,一致性比较弱。

      强一致性:加入数据后,马上就能 get 到。

      其实要变成强一致性,就要处处用锁,甚至是用全局锁,HashTable 就是全局锁,但是这样的效率会很低。

      ConcurrentHashMap 为了提升效率,一致性自然会变弱。

      ConcurrentHashMap 的弱一致性主要是为了提升效率,是一致性与效率之间的一种权衡。要成为强一致性,就得到处使用锁,甚至是全局锁,这就与 HashTable 和同步的 HashMap 一样了。

      28、ConcurrentHashMap jdk1.8 中变量使用 final 和 volatile 修饰有什么用?

      1. final final域确保变量的初始化安全性,初始化安全性让不可变对象不需要同步就可以被自由的访问和共享。
      2. volatile:来保证某个变量内存的改变对其他线程即时可见,再配合 CAS 可以实现不加锁对并发操作的支持。比如:get 操作可以无锁是由于 Node 的元素 val 和指针 next 是用 volatile 修饰的,在多线程环境下线程 A 修改节点的 val 或者新增节点的时候是对线程 B 可见的。

      30、ConcurrentHashMap jdk1.8中,什么情况下链表才会转换成红黑树?

      链表长度大于 8。数组长度大于 64。从 putVal 方法源码可以看出,并非一开始就创建红黑树结构,如果当前 Node 数组长度小于阈值 MIN_TREEIFY_CAPACITY,默认为 64,先通过扩大数组容量为原来的两倍,以缓解单个链表元素过大的性能问题。

      未完待续

  • 相关阅读:
    深入学习 Redis Cluster - 基于 Docker、DockerCompose 搭建 Redis 集群,处理故障、扩容方案
    前端面试题目(二十三)
    数据结构:反射
    本地GPT-window平台 搭建ChatGLM3-6B
    Archlinux Gnome上解决N卡驱动安装和IBUS输入法两个小问题记录
    Android studio新版本多渠道打包配置
    windbg使用教程
    Blazor实战——Known框架多表增删改查
    MySQL 索引简介
    【机器学习】向量化计算 -- 机器学习路上必经路
  • 原文地址:https://blog.csdn.net/wuhuayangs/article/details/126049472