• JDK1.8的ConcurrentHashMap底层实现解析


    ConcurrentHashMap的特性

    1. concurrentHashMap跟Hashtable具有相同的功能方法。可以看作是Hashtable的升级版,HashMap的线程安全版。
    2. 跟Hashtable相同,它的键或值不允许是null。
    3. ConcurrentHashMap和HashMap一样都是采用拉链法处理哈希冲突,且都为了防止单链表过长影响查询效率,所以当链表长度超过某一个值时候将用红黑树代替链表进行存储,采用了数组+链表+红黑树的结构

     

    ConcurrentHashMap与Hashtable比较

    1. 线程安全的实现:Hashtable采用对象锁(synchronized修饰对象方法)来保证线程安全,也就是一个Hashtable对象只有一把锁,如果线程1拿了对象A的锁进行有synchronized修饰的put方法,其他线程是无法操作对象A中有synchronized修饰的方法的(如get方法、remove方法等),竞争激烈所以效率低下。而ConcurrentHashMap采用CAS + synchronized来保证并发安全性,且synchronized关键字不是用在方法上而是用在了具体的对象上,实现了更小粒度的锁。
    2. 数据结构的实现:Hashtable采用的是数组 + 链表,当链表过长会影响查询效率,而ConcurrentHashMap采用数组 + 链表 + 红黑树,当链表长度超过某一个值,则将链表转成红黑树,提高查询效率。

    ConcurrentHashMap

    我们先看一下ConcurrentHashMap实现了哪些接口、继承了哪些类,对ConcurrentHashMap有一个整体认知。

     可以发现,ConcurrentHashMapHashMap一样继承AbstractMap接口,

    然后实现了ConcurrentMap接口,这个和HashMap不一样,HashMap是直接实现的Map接口。

    部分关键参数

    1. //node数组最大容量:2^30=1073741824
    2. private static final int MAXIMUM_CAPACITY = 1 << 30;
    3. //默认初始表容量。必须是 2 的幂(即至少为 1)且最多为 MAXIMUM_CAPACITY
    4. private static final int DEFAULT_CAPACITY = 16;
    5. //最大可能(非二的幂)数组大小。 toArray 和相关方法需要
    6. static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
    7. //负载因子
    8. private static final float LOAD_FACTOR = 0.75f;
    9. // 链表转红黑树阀值,> 8 链表转换为红黑树
    10. static final int TREEIFY_THRESHOLD = 8;
    11. //树转链表阀值,小于等于6(tranfer时,lc、hc=0两个计数器分别++记录原bin、新binTreeNode数量,<=UNTREEIFY_THRESHOLD 则untreeify(lo))
    12. static final int UNTREEIFY_THRESHOLD = 6;
    13. //存放node的数组
    14. transient volatile Node[] table;
    15. /*控制标识符,用来控制table的初始化和扩容的操作,不同的值有不同的含义
    16. *当为负数时:-1 代表正在初始化,-N 代表有N-1 个线程正在 进行扩容
    17. *当为 0 时:代表当时的table还没有被初始化
    18. *当为正数时:表示初始化或者下一次进行扩容的大小
    19. */
    20. private transient volatile int sizeCtl;

    几个重要的成员变量table、nextTable、baseCount、sizeCtl、transferIndex、cellsBusy

    • table:数据类型是Node数组,这里的Node和HashMap的Node一样都是内部类且实现了Map.Entry接口
    • nextTable:哈希表扩容时生成的数据,数组为扩容前的2倍
    • sizeCtl:多个线程的共享变量,是操作的控制标识符,它的作用不仅包括threshold的作用,在不同的地方有不同的值也有不同的用途。
    1.   -1代表正在初始化
    2.   -N代表有N-1个线程正在进行扩容操作
    3.    0代表hash表还没有被初始化
    4.     正数表示下一次进行扩容的容量大小
    • ForwardingNode:一个特殊的Node节点,Hash地址为-1,存储着nextTable的引用,只有table发生扩用的时候,ForwardingNode才会发挥作用,作为一个占位符放在table中表示当前节点为null或者已被移动

    ConcurrentHashMap 针对 ForwardingNode、ReservationNode,以及树根结点都定义了特定的哈希值:

    1. /*
    2. * 节点哈希字段的编码. See above for explanation.
    3. */
    4. /** ForwardingNode 结点的 hash 值 */
    5. static final int MOVED = -1; // hash for forwarding nodes 转发节点的哈希
    6. static final int TREEBIN = -2; // hash for roots of trees 树根的哈希
    7. static final int RESERVED = -3; // hash for transient reservations 临时预订的哈希
    8. static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash

    构造方法


    它的构造方法一共有5个,从数量上看就和HashMap、Hashtable(4个)的不同,多出的那个构造函数是public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel),即除了传入容量大小、负载因子之外还多传入了一个整型的concurrencyLevel,这个整型是我们预先估计的并发量,比如我们估计并发是30,那么就可以传入30。

    其他的4个构造函数的参数和HashMap的一样,而具体的初始化过程却又不相同,HashMap和Hashtable传入的容量大小和负载因子都是为了计算出初始阈值(threshold),而ConcurrentHashMap传入的容量大小和负载因子是为了计算出sizeCtl用于初始化table,这个sizeCtl即table数组的大小,不同的构造函数计算sizeCtl方法都不一样。
     

    1. //1. 无参构造函数,什么也不做,此时sizeCtl参数为
    2. //table的初始化放在了第一次插入数据时,默认容量大小是16。
    3. //使用默认初始表大小 (16) 创建一个新的空映射
    4. public ConcurrentHashMap() {
    5. }
    6. //2. 传入容量大小的构造方法
    7. public ConcurrentHashMap(int initialCapacity) {
    8. if (initialCapacity < 0)
    9. throw new IllegalArgumentException();
    10. //如果传入的容量大小大于允许的最大容量值 则cap取允许的容量最大值 否则cap =
    11. //((传入的容量大小 + 传入的容量大小无符号右移1位 + 1)的结果向上取最近的2幂次方),
    12. //即如果传入的容量大小是12 则 cap = 32
    13. //(12 >>> 1)=1100>>>1=0110=7
    14. //=(12 + (12 >>> 1) + 1=19 并向上取2的幂次方即32)
    15. //,这里为啥一定要是2的幂次方,原因和HashMap的threshold一样,
    16. //都是为了让位运算和取模运算的结果一样。
    17. //MAXIMUM_CAPACITY即允许的最大容量值 为2^30。
    18. int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
    19. MAXIMUM_CAPACITY :
    20. tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
    21. //将上面计算出的cap 赋值给sizeCtl,注意此时sizeCtl为正数,代表进行扩容的容量大小。
    22. this.sizeCtl = cap;
    23. }
    24. //3.创建一个与给定Map具有相同映射Map
    25. public ConcurrentHashMap(Map m) {
    26. //置sizeCtl为默认容量大小 即16。
    27. this.sizeCtl = DEFAULT_CAPACITY;
    28. putAll(m);
    29. }
    30. //4.传入容量大小和负载因子的构造方法
    31. //默认并发数大小是1。
    32. public ConcurrentHashMap(int initialCapacity, float loadFactor) {
    33. this(initialCapacity, loadFactor, 1);
    34. }
    35. //5. 传入容量大小、负载因子和并发数大小的构造方法
    36. public ConcurrentHashMap(int initialCapacity,
    37. float loadFactor, int concurrencyLevel) {
    38. //如果负载因子>0,传入容量大小<0,并发数大小<=0一项不符合就
    39. //直接抛出IllegalArgumentException非法参数异常
    40. if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
    41. throw new IllegalArgumentException();
    42. //如果传入的容量大小小于并发数大小,咱们就把初始容量变成更大的并发数。这样做的原因是确保每一个Node只会分配给一个线程
    43. if (initialCapacity < concurrencyLevel) //
    44. initialCapacity = concurrencyLevel; //
    45. /**
    46. 下面就时计算sizeCtl的值了,用于下一次扩容的时候就是初始化的时候。
    47. */
    48. long size = (long)(1.0 + (long)initialCapacity / loadFactor);
    49. //如果size比允许的最大容量值还大,那直接sizeCtl= 允许的最大容量值。
    50. // 否则对size进行tableSizeFor操作就是 size向上取2的幂次方
    51. //比如 size = 6 tableSizeFor((int)size)=9
    52. // size = 9 tableSizeFor((int)size)=16
    53. // size = 12 tableSizeFor((int)size)=32
    54. int cap = (size >= (long)MAXIMUM_CAPACITY) ?
    55. MAXIMUM_CAPACITY : tableSizeFor((int)size)=9;
    56. this.sizeCtl = cap; //计算好之后便赋值给sizeCtl
    57. }

    构造方法简单总结:

    都是为了算出sizeCtl值,也就是初始化table数组大小。

    一共有5个构造方法。

    • 无参构造函数,什么也不做,此时sizeCtl参数为0,初始化操作在put操作中实现
    • 传入容量大小的构造方法,对参数校验后(>0),传入的容量大小大于MAXIMUM_CAPACITY即允许的最大容量 2^30, 则cap取允许的容量最大值。比他小咱们就做tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1))操作,对initialCapacity + (initialCapacity >>> 1) + 1 向上取最近的2的幂次方,最后将值给sizeCtl。
    • 创建一个与给定Map具有相同映射Map。
    • 传入容量大小和负载因子的构造方法,它调用第5个方法,并发数大小=1。
    • 传入容量大小、负载因子和预期并发数大小的构造方法。具体看上面。

    put操作

         put

    1. public V put(K key, V value) {
    2. return putVal(key, value, false);//调用了putVal方法
    3. }

       putVal

    1. final V putVal(K key, V value, boolean onlyIfAbsent) {
    2. if (key == null || value == null) throw new NullPointerException();
    3. //如果有一个为null,那么直接报错。所以可以得出concurrentHashMap中Key,value不能为空。
    4. int hash = spread(key.hashCode());//计算key的hash值,参与插入位置的计算
    5. int binCount = 0;
    6. for (Node[] tab = table;;) {
    7. //循环尝试,for循环内它是if..。else if...else格式
    8. Node f; int n, i, fh;
    9. //如果table数组为null,说明还没有table数组呢,那咱们就开始初始化数组。
    10. if (tab == null || (n = tab.length) == 0)
    11. //初始化之后,会直接进行下次循环。
    12. tab = initTable();
    13. 如果i位置没有数据(即该下标上的链表为空),就直接CAS无锁插入
    14. else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
    15. /* 获取对应位置Node节点(链表头节点)(Volatile的),casTabAt方法CAS无锁的添加到
    16. 空箱, 如果插入成功了则跳出for循环,插入失败(其他线程抢先插入了),
    17. 那么继续下次循环。*/
    18. if (casTabAt(tab, i, null,
    19. new Node(hash, key, value, null)))
    20. break;
    21. }
    22. //如果该下标上的节点的哈希地址为-1(即链表的头节点为ForwardingNode节点),则表示
    23. //table正在进行扩容(transfer),那咱们先等着吧。
    24. //注意的::ConcurrentHashMap初始化和扩容不是用同一个方法,而
    25. //HashMap和Hashtable都是用同一个方法,并且当前线程会去协助扩容,扩容过程后面介绍。
    26. else if ((fh = f.hash) == MOVED)//-1
    27. tab = helpTransfer(tab, f);//返回扩容完成后的table。
    28. //上面情况都没有,那么我们就将进入到链表中,将新节点插入或者覆盖旧值。
    29. else {
    30. V oldVal = null;
    31. // only针对首个节点进行加锁操作,一次锁的资源小了,减少了锁的粒度。也正是因为
    32. // 这个提高了ConcurrentHashMap的效率,提高了并发度。
    33. synchronized (f) {
    34. if (tabAt(tab, i) == f) { //做一次校验看看头节点一样不
    35. if (fh >= 0) {//哦~不知道fh怎么来的?看26行。节点的哈希地址大于
    36. //等于0,则表示这是个链表
    37. binCount = 1;
    38. //遍历链表,大家应该非常熟悉了吧。
    39. for (Node e = f;; ++binCount) {
    40. K ek;
    41. if (e.hash == hash &&
    42. ((ek = e.key) == key ||
    43. (ek != null && key.equals(ek)))) {
    44. //如果发现有key一样的,那咱们就把value值替换掉。
    45. oldVal = e.val;
    46. if (!onlyIfAbsent)
    47. e.val = value;
    48. break;//结束推出链表循环
    49. }
    50. // 如果没有找到值为key的节点,直接新建Node并加入链表尾部即可。
    51. Node pred = e;
    52. if ((e = e.next) == null) {//到链表尾部了。
    53. //将节点插入链表尾部
    54. pred.next = new Node(hash, key,
    55. value, null);
    56. break;//结束推出链表循环
    57. }
    58. }
    59. }
    60. else if (f instanceof TreeBin) {//如果首节点为TreeBin类型,
    61. // 说明为红黑树结构,执行putTreeVal操作
    62. Node p;
    63. binCount = 2;
    64. if ((p = ((TreeBin)f).putTreeVal(hash, key,
    65. value)) != null) {
    66. oldVal = p.val;
    67. if (!onlyIfAbsent)
    68. p.val = value;
    69. }
    70. }
    71. }
    72. }
    73. //这时候同步代码块已经结束,元素插入完毕,释放了锁。
    74. if (binCount != 0) {
    75. if (binCount >= TREEIFY_THRESHOLD)//如果链表的长度大于树形阈值 8 时
    76. //就会进行红黑树的转换
    77. treeifyBin(tab, i);
    78. if (oldVal != null)
    79. return oldVal;//如果key对应的以前有值,那么返回oldVal
    80. break;
    81. }
    82. }
    83. }
    84. addCount(1L, binCount);//统计size,可能会扩容transfer操作。
    85. return null;
    86. }

    总结一下put流程:

    1. 校验Key,value是否为空。如果有一个为null,那么直接报NullPointerException异常。所以可以得出concurrentHashMap中Key,value不能为空。
    2. 循环尝试插入。进入循环。
    3. case1:如果没有初始化就先调用initTable()方法来进行初始化过程
    4. case2:根据Hash值计算插入位置(n - 1) & hash=i。如果没有hash冲突,也就是说插入位置上面没有数据,就直接casTabAt()方法将数据插入。
    5. case3:插入位置上有数据。数据的头节点的哈希地址为-1(即链表的头节点为ForwardingNode节点),则表示其他线程正在对table进行扩容(transfer),就先等着,等其他线程扩容完了咱们再尝试插入。
    6. case4:上面情况都没有。就对首节点加synchronized锁来保证线程安全,两种情况,一种是链表形式就直接遍历到尾端插入,一种是红黑树就按照红黑树结构插入,结束加锁。
    7. 如果Hash冲突时会形成Node链表,在链表长度超过8,Node数组超过64 时会将链表结构转换为红黑树的结构。
    8. break退出循环。
    9. 调用addCount()方法统计Map已存储的键值对数量size++,检查是否需要扩容,需要扩容就扩容。

    了解了它的基本流程之后,我们来看其中的一些调用方法。

    1. initTable 初始化数组
    2. tabAt 获取对应位置Node节点
    3. casTabAt CAS添加节点
    4. helpTransfer 帮助扩容,如果线程进入到这边说明已经有其他线程正在做扩容操作,这个是一个辅助方法
    5. transfer 扩容操作(重点)
    6. addCount ConcurrentHashMap的键值对数量size+1,并判断是否需要扩容

    initTable
    第一次进入putVal循环发现table数组是null的时候调用。

    initTable()方法初始化一个合适大小的table数组,然后设置sizeCtl值(下一次扩容阈值=数组长度*0.75)。

    我们知道ConcurrentHashMap是线程安全的,即支持多线程的,那么一开始很多个线程同时执行put()方法,而table又没初始化,那么就会很多个线程会去执行initTable()方法尝试初始化table,而put方法和initTable方法都是没有加锁的(synchronize),那怎么保证线程安全的呢?
     

    1. /**
    2. * 初始化表table数组 大小为sizeCtl值。
    3. */
    4. private final Node[] initTable() {
    5. Node[] tab; int sc;
    6. //先看看table是否被初始化,如果已经初始化了,结束while循环
    7. //如果table为null那么一直while循环尝试初始化,直到完成
    8. while ((tab = table) == null || tab.length == 0) {\
    9. if ((sc = sizeCtl) < 0) // sizeCtl< 0说明,其他线程抢先的正在初始化或者扩容,
    10. Thread.yield(); // 初始化竞争失败;让出cpu,等着就完了。运行态转成就绪态
    11. //sizeCtl >= 0
    12. else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { //Unsafed.compareAndSwapInt以CAS操作CASsizeCtl=-1,表示初始化状态。
    13. try {
    14. if ((tab = table) == null || tab.length == 0) {//再次确认当前table为null即还未初始化,这个判断不能少。
    15. //如果sc(sizeCtl)大于0,则n=sc,否则n=默认的容量大
    16. 16
    17. //这里的sc=sizeCtl=0,即如果在构造函数没有指定容量
    18. 大小,
    19. //否则使用了有参数的构造函数,sc=sizeCtl=指定的容量大小。
    20. int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
    21. @SuppressWarnings("unchecked")
    22. Node[] nt = (Node[])new Node[n]; //new Node数组(table)
    23. table = tab = nt;
    24. sc = n - (n >>> 2); //计算阈值,n - (n >>> 2) = 0.75*n,大于这个阈值,就会发生扩容。
    25. }
    26. } finally {
    27. sizeCtl = sc;
    28. }
    29. break;
    30. }
    31. }
    32. return tab;
    33. }

    initTable()初始化方法总结
           initTable()用于里面table数组的初始化,值得一提的是table初始化是没有加锁的,那么如何处理并发呢?
    由上面代码可以看到,当要初始化时会通过CAS操作将sizeCtl置为-1,而sizeCtl由volatile修饰,保证修改对后面线程可见。
    这之后如果再有线程执行到此方法时检测到sizeCtl为负数,说明已经有线程在给扩容了,这个线程就会调用Thread.yield()让出一次CPU执行时间。

    initTable流程总结:

    1. 如果table==null,进入循环。

    2. case1: sizeCtl< 0 说明其他线程抢先对table初始化或者扩容,就调用Thread.yield(); 让出一次cpu,等下次抢到cpu再循环判断。

    3. case2: 以CAS操作CASsizeCtl=-1,表示当前线程正在初始化。下面就开始初始化。

    4. 判断sizeCtl的值。 sc(sizeCtl)大于0,则 容量大小=sc,

    5. sc(sizeCtl)<=0,即如果在使用了有参数的构造函数,sc=sizeCtl=指定的容量大小,否则n=默认的容量大小16。

    6. 用上面求出的容量大小new出table数组。

    7. 计算阈值,sizeCtl = n - (n >>> 2) = 0.75*n。

    tabAt AND casTabAt

        

    1. @SuppressWarnings("unchecked")
    2. static final Node tabAt(Node[] tab, int i) {
    3. return (Node)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
    4. }
    5. static final boolean casTabAt(Node[] tab, int i,
    6. Node c, Node v) {
    7. return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
    8. }

    helpTransfer 和 transfer

       

    1. /**
    2. * Helps transfer if a resize is in progress.
    3. */
    4. //协助扩容方法
    5. final Node[] helpTransfer(Node[] tab, Node f) {
    6. Node[] nextTab; int sc;
    7. //如果当前table不为null 且 f为ForwardingNode节点 且新的table即nextTable存在的情况下才能协助扩容,该方法的作用是让线程参与扩容的复制。
    8. if (tab != null && (f instanceof ForwardingNode) &&
    9. (nextTab = ((ForwardingNode)f).nextTable) != null) {
    10. int rs = resizeStamp(tab.length);
    11. while (nextTab == nextTable && table == tab &&
    12. (sc = sizeCtl) < 0) { //如果小于0说明已经有线程在进行扩容操作了
    13. if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
    14. sc == rs + MAX_RESIZERS || transferIndex <= 0)
    15. break;
    16. if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) { //更新sizeCtl的值,+1,代表新增一个线程参与扩容
    17. transfer(tab, nextTab);
    18. break;
    19. }
    20. }
    21. return nextTab;
    22. }
    23. return table;
    24. }

    transfer()方法为ConcurrentHashMap扩容操作的核心方法。JDK1.8中,ConcurrentHashMap最复杂的部分就是扩容/数据迁移,涉及多线程的合作和rehash。我们先来考虑下一般情况下,如何对一个Hash表进行扩容。

    Hash表的扩容,一般都包含两个步骤:

    ①table数组的扩容
    table数组的扩容,一般就是新建一个2倍大小的槽数组,这个过程通过由一个单线程完成,且不允许出现并发。

    ②数据迁移
    所谓数据迁移,就是把旧table中的各个槽中的结点重新分配到新table中*。

    这一过程通常涉及到槽中key的rehash(重新Hash),因为key映射到桶的位置与table的大小有关,新table的大小变了,key映射的位置一般也会变化。

    ConcurrentHashMap在处理rehash的时候,并不会重新计算每个key的hash值,而是利用了一种很巧妙的方法。我们在上一篇说过,ConcurrentHashMap内部的table数组的大小必须为2的幂次,原因是让key均匀分布,减少冲突,这只是其中一个原因。另一个原因就是:

    当table数组的大小为2的幂次时,通过key.hash & table.length-1这种方式计算出的索引i,当table扩容后(2倍),新的索引要么在原来的位置i,要么是i+n

    而且还有一个特点,扩容后key对应的索引如果发生了变化,那么其变化后的索引最高位一定是1 

    什么时候会扩容?

    当往hashMap中成功插入一个key/value节点时,有两种情况可能触发扩容动作:
    1、如果新增节点之后,所在链表的元素个数达到了阈值 8,则会调用treeifyBin方法把链表转换成红黑树,不过在结构转换之前,会对数组长度进行判断,实现如下:如果数组长度n小于阈值MIN_TREEIFY_CAPACITY,默认是64,则会调用tryPresize方法把数组长度扩大到原来的两倍,并触发transfer方法,重新调整节点的位置。

    2、调用put方法新增节点时,在结尾会调用addCount方法记录元素个数,并检查是否需要进行扩容,当数组元素个数达到阈值时,会触发transfer方法,重新调整节点的位置。

    扩容状态下其他线程对集合进行插入、修改、删除、合并、compute 等操作时遇到 ForwardingNode 节点会触发扩容 。

    putAll 批量插入或者插入节点后发现存在链表长度达到 8 个或以上,但数组长度为 64 以下时会触发扩容 。

    注意:桶上链表长度达到 8 个或者以上,并且数组长度为 64 以下时只会触发扩容而不会将链表转为红黑树 。


    扩容原理 

    1. /**
    2. * 数据转移和扩容.
    3. * 每个调用tranfer的线程会对当前旧table中[transferIndex-stride, transferIndex-1]位置的结点进行迁移
    4. *
    5. * @param tab 旧table数组
    6. * @param nextTab 新table数组
    7. */
    8. private final void transfer(Node[] tab, Node[] nextTab) {
    9. int n = tab.length, stride;
    10. //根据服务器CPU数量来决定每个线程负责的bucket桶数量,避免因为扩容的线程过多反而影响性能。
    11. //如果CPU数量为1,则stride=1,否则将需要迁移的bucket数量(table大小)除以CPU数量,平分给
    12. //各个线程,但是如果每个线程负责的bucket数量小于限制的最小是(16)的话,则强制给每个线程
    13. //分配16个bucket数。
    14. if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
    15. stride = MIN_TRANSFER_STRIDE; // subdivide range
    16. //如果nextTable还未初始化,则初始化nextTable,这个初始化和iniTable初始化一样,只能由
    17. //一个线程完成。
    18. if (nextTab == null) { // nextTab为null,那么就初始化它
    19. try {
    20. @SuppressWarnings("unchecked")
    21. Node[] nt = (Node[])new Node[n << 1];
    22. nextTab = nt;
    23. } catch (Throwable ex) { // try to cope with OOME
    24. sizeCtl = Integer.MAX_VALUE;
    25. return;
    26. }
    27. nextTable = nextTab;
    28. transferIndex = n; //[transferIndex-stride, transferIndex-1]表示当前线程要进行数据迁移的桶区间
    29. }
    30. int nextn = nextTab.length;
    31. // ForwardingNode结点,当旧table的某个桶中的所有结点都迁移完后,用该结点占据这个桶
    32. ForwardingNode fwd = new ForwardingNode(nextTab);
    33. // 标识一个桶的迁移工作是否完成,advance == true 表示可以进行下一个位置的迁移
    34. boolean advance = true;
    35. //循环的关键变量,最后一个数据迁移的线程将该值置为true,并进行本轮扩容的收尾工作
    36. boolean finishing = false;
    37. //下个循环是分配任务和控制当前线程的任务进度,这部分是transfer()的核心逻辑,描述了如何与其他线程协同工作。
    38. for (int i = 0, bound = 0;;) { //进入循环
    39. Node f; int fh;
    40. while (advance) {
    41. int nextIndex, nextBound;
    42. if (--i >= bound || finishing)
    43. advance = false;
    44. else if ((nextIndex = transferIndex) <= 0) {
    45. i = -1;
    46. advance = false;
    47. }
    48. else if (U.compareAndSwapInt
    49. (this, TRANSFERINDEX, nextIndex,
    50. nextBound = (nextIndex > stride ?
    51. nextIndex - stride : 0))) {
    52. bound = nextBound;
    53. i = nextIndex - 1;
    54. advance = false;
    55. }
    56. }
    57. //i<0说明已经遍历完旧的数组tab;i>=n什么时候有可能呢?在下面看到i=n,所以目前i最大应该是n吧。
    58. //i+n>=nextn,nextn=nextTab.length,所以如果满足i+n>=nextn说明已经扩容完成
    59. if (i < 0 || i >= n || i + n >= nextn) {
    60. int sc;
    61. if (finishing) {
    62. nextTable = null;
    63. table = nextTab;
    64. sizeCtl = (n << 1) - (n >>> 1);
    65. return;
    66. }
    67. //利用CAS方法更新这个扩容阈值,在这里面sizectl值减一,说明新加入一个线程参与到扩容操作,参考sizeCtl的注释
    68. if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
    69. if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
    70. return;
    71. finishing = advance = true;//finishing和advance保证线程已经扩容完成了可以退出循环
    72. i = n; //先退出,重新检查遍
    73. }
    74. }
    75. else if ((f = tabAt(tab, i)) == null)//如果tab[i]为null,那么就把fwd插入到tab[i],表明这个节点已经处理过了
    76. advance = casTabAt(tab, i, null, fwd);
    77. else if ((fh = f.hash) == MOVED)//那么如果f.hash=-1的话说明该节点为ForwardingNode,说明该节点已经处理过了
    78. advance = true; // already processed
    79. //迁移过程(对当前指向的bucket),这部分的逻辑与HashMap类似,拿旧数组的容量当做一
    80. //个掩码,然后与节点的hash进行与&操作,可以得出该节点的新增有效位,如果新增有效位为
    81. //0就放入一个链表A,如果为1就放入另一个链表B,链表A在新数组中的位置不变(跟在旧数
    82. //组的索引一致),链表B在新数组中的位置为原索引加上旧数组容量。
    83. else {
    84. synchronized (f) {
    85. if (tabAt(tab, i) == f) {
    86. Node ln, hn;
    87. if (fh >= 0) {// 桶的hash>=0,说明是链表迁移
    88. /**
    89. * 下面的过程会将旧桶中的链表分成两部分:ln链和hn链
    90. * ln链会插入到新table的槽i中,hn链会插入到新table的槽i+n中
    91. */
    92. int runBit = fh & n;
    93. Node lastRun = f;
    94. for (Node p = f.next; p != null; p = p.next) {
    95. int b = p.hash & n;
    96. if (b != runBit) {
    97. runBit = b;
    98. lastRun = p;
    99. }
    100. }
    101. if (runBit == 0) {
    102. ln = lastRun;
    103. hn = null;
    104. }
    105. else {
    106. hn = lastRun;
    107. ln = null;
    108. }
    109. for (Node p = f; p != lastRun; p = p.next) {
    110. int ph = p.hash; K pk = p.key; V pv = p.val;
    111. if ((ph & n) == 0)
    112. ln = new Node(ph, pk, pv, ln);
    113. else
    114. hn = new Node(ph, pk, pv, hn);
    115. }
    116. //
    117. setTabAt(nextTab, i, ln);
    118. setTabAt(nextTab, i + n, hn);
    119. setTabAt(tab, i, fwd);//把已经替换的节点的旧tab的i的位置用fwd结点替换,fwd包含nextTab
    120. advance = true;
    121. }
    122. //不是链表,那就是红黑树。下面红黑树基本和链表差不多
    123. else if (f instanceof TreeBin) {
    124. TreeBin t = (TreeBin)f;
    125. TreeNode lo = null, loTail = null;
    126. TreeNode hi = null, hiTail = null;
    127. int lc = 0, hc = 0;
    128. for (Node e = t.first; e != null; e = e.next) {
    129. int h = e.hash;
    130. TreeNode p = new TreeNode
    131. (h, e.key, e.val, null, null);
    132. if ((h & n) == 0) {
    133. if ((p.prev = loTail) == null)
    134. lo = p;
    135. else
    136. loTail.next = p;
    137. loTail = p;
    138. ++lc;
    139. }
    140. else {
    141. if ((p.prev = hiTail) == null)
    142. hi = p;
    143. else
    144. hiTail.next = p;
    145. hiTail = p;
    146. ++hc;
    147. }
    148. }
    149. //判断扩容后是否还需要红黑树结构
    150. ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
    151. (hc != 0) ? new TreeBin(lo) : t;
    152. hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
    153. (lc != 0) ? new TreeBin(hi) : t;
    154. setTabAt(nextTab, i, ln);
    155. setTabAt(nextTab, i + n, hn);
    156. setTabAt(tab, i, fwd); // 设置ForwardingNode占位
    157. advance = true; // 表示当前旧桶的结点已迁移完毕
    158. }
    159. }
    160. }
    161. }
    162. }
    163. }

    tranfer方法的开头,会计算出一个stride变量的值,这个stride其实就是每个线程处理的桶区间,也就是步长:

    1. // stride可理解成“步长”,即数据迁移时,每个线程要负责旧table中的多少个桶
    2. if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
    3.     stride = MIN_TRANSFER_STRIDE;

    首次扩容时,会将table数组变成原来的2倍。
    注意上面的transferIndex变量,这是一个字段,table[transferIndex-stride, transferIndex-1]就是当前线程要进行数据迁移的桶区间:

    1. /**
    2. * 扩容时需要用到的一个下标变量.
    3. */
    4. private transient volatile int transferIndex;

    整个transfer方法几乎都在一个自旋操作中完成,从右往左开始进行数据迁移,transfer的退出点是当某个线程处理完最后的table区段——table[0,stride-1]

    transfer方法主要包含4个分支,即对4种不同情况进行处理:  

    首先根据运算得到需要遍历的次数i,然后利用tabAt方法获得i位置的元素:

    1. 如果这个位置为空,就在原table中的i位置放入forwardNode节点,这个也是触发并发扩容的关键点;
    2. 如果这个位置是Node节点(fh>=0),如果它是一个链表的头节点,就构造一个反序链表,把他们分别放在nextTable的i和i+n的位置上
    3. 如果这个位置是TreeBin节点(fh<0),也做一个反序处理,并且判断是否需要untreefi,把处理的结果分别放在nextTable的i和i+n的位置上
    4. 遍历过所有的节点以后就完成了复制工作,这时让nextTable作为新的table,并且更新sizeCtl为新容量的0.75倍,完成扩容。

    多线程遍历节点,处理了一个节点,就把对应点的值set为forward,另一个线程看到forward,就向后继续遍历,再加上给节点上锁的机制,就完成了多线程的控制。这样交叉就完成了复制工作。而且还很好的解决了线程安全的问题。

    总结一下扩容条件:  

    (1) 元素个数达到扩容阈值。

    (2) 调用 putAll 方法,但目前容量不足以存放所有元素时。

    (3) 某条链表长度达到8,但数组长度却小于64时。

    CPU核数与迁移任务hash桶数量分配(步长)的关系

    单线程下线程的任务分配与迁移操作 

       

     多线程如何分配任务?

     get操作

    get方法无疑是在HahsMap还是在ConcurrentHashMap中最好理解的方法了。
    返回指定键映射到的值,如果此映射不包含该键的映射,则返回null 。

    和HashMap一样用equals来判断是否相等,也就是说必须Hash索引和equals一样,才算相同。

    1. public V get(Object key) {
    2. Node[] tab; Node e, p; int n, eh; K ek;
    3. int h = spread(key.hashCode());//运用key的hashCode()计算出哈希地址
    4. //table不为空 且 table长度大于0 且 计算出的下标上用tabAt CAS取值不为空,也就是这个位置上有人,那么就进去找。
    5. if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) {
    6. //如果哈希地址、键key相同则表示查找到,返回value,这里查找到的是头节点,就不用往下找了。
    7. if ((eh = e.hash) == h) {
    8. if ((ek = e.key) == key || (ek != null && key.equals(ek)))
    9. return e.val;
    10. }
    11. //eh=-2ash值<0 可能是Tree树形,可能e节点为ForwardingNode结点。
    12. //如果eh=-1就说明e节点为ForwardingNode,这说明什么,说明这个节点已经不存在了,被另一个线程正则扩容所以要查找key对应的值的话,直接到新newtable找
    13. //如果是eh=-2 说明e节点为Tree根结点,那么后面就以红黑树的方式查找。
    14. else if (eh < 0)
    15. return (p = e.find(h, key)) != null ? p.val : null;//eh=-1这里调用的find其实就是ForwardingNode中的find方法了。
    16. //排除完上面情况,咱就正常找
    17. while ((e = e.next) != null) {
    18. if (e.hash == h &&
    19. ((ek = e.key) == key || (ek != null && key.equals(ek))))
    20. return e.val;
    21. }
    22. }
    23. return null;
    24. }

    总结流程:

    1. 调用spread()方法计算key的hashCode()获得哈希地址。
    2. 计算出键key所在的下标,算法是(n - 1) & h,如果table不为空,且下标上的bucket不为空,则到bucket中查找。
    3. 如果bucket的头节点的哈希地址小于0,有可能e结点是树的根节点,那么就按照红黑树的方式查找。还有可能说明e节点可能为ForwardingNode,这说明什么,说明这个节点已经不存在了,被另一个线程正则扩容所以要查找key对应的值的话,直接到新newtable找。
    4. 找到则返回该键key的值,找不到则返回null。

    这里有个疑问,问什么put的时候加锁了,get读的时候不加锁

    这要归功于使用的tabAt中的了Unsafe的getObjectVolatile,因为table是volatile类型,所以对tab[i]的原子请求也是可见的。因为如果同步正确的情况下,根据happens-before原则,对volatile域的写入操作happens-before于每一个后续对同一域的读操作。所以不管其他线程对table链表或树的修改,都对get读取可见。

    总结与思考

    其实可以看出JDK1.8版本的ConcurrentHashMap的数据结构已经接近HashMap,相对而言,ConcurrentHashMap只是增加了同步的操作来控制并发,从JDK1.7版本的ReentrantLock+Segment+HashEntry,到JDK1.8版本中synchronized+CAS+HashEntry+红黑树,相对而言,总结如下思考

    • JDK1.8的实现降低锁的粒度,JDK1.7版本锁的粒度是基于Segment的,包含多个HashEntry,而JDK1.8锁的粒度就是HashEntry(首节点)
    • JDK1.8版本的数据结构变得更加简单,使得操作也更加清晰流畅,因为已经使用synchronized来进行同步,所以不需要分段锁的概念,也就不需要Segment这种数据结构了,由于粒度的降低,实现的复杂度也增加了
    • JDK1.8使用红黑树来优化链表,基于长度很长的链表的遍历是一个很漫长的过程,而红黑树的遍历效率是很快的,代替一定阈值的链表,这样形成一个最佳拍档

    还有以下几个可能遇到的问题,进行回答:

     1.扩容期间在未迁移到的hash桶插入数据会发生什么?
    答:只要插入的位置扩容线程还未迁移到,就可以插入,当迁移到该插入的位置时,就会阻塞等待插入操作完成再继续迁移 。

    2.正在迁移的hash桶遇到 get 操作会发生什么?
    答:在扩容过程期间形成的 hn 和 ln链 是使用的类似于复制引用的方式,也就是说 ln 和 hn 链是复制出来的,而非原来的链表迁移过去的,所以原来 hash 桶上的链表并没有受到影响,因此如果当前节点有数据,还没迁移完成,此时不影响读,能够正常进行。如果当前链表已经迁移完成,那么头节点会被设置成fwd节点,此时get线程会帮助扩容。

    3.正在迁移的hash桶遇到 put/remove 操作会发生什么?
    如果当前链表已经迁移完成,那么头节点会被设置成fwd节点,此时写线程会帮助扩容,如果扩容没有完成,当前链表的头节点会被锁住,所以写线程会被阻塞,直到扩容完成。

    4.并发情况下,各线程中的数据可能不是最新的,那为什么 get 方法不需要加锁?

    答:get操作全程不需要加锁是因为Node的成员val是用volatile修饰的,在多线程环境下线程A修改结点的val或者新增节点的时候是对线程B可见的。

  • 相关阅读:
    whereIn 遇到了最大限制,临时表处理方式
    软件项目管理 4.2.传统需求建模方法
    git项目如何打patch以及打patch的注意事项
    namonamo Daimayuan Online Judge
    【好书分享第十三期】AI数据处理实战108招:ChatGPT+Excel+VBA
    Mybatis-Plus之使用LocalDateTime等java8新日期时间类型报错
    概率论 第三章习题课
    剑指 Offer 27. 二叉树的镜像
    Django 做migrations时出错,解决方案
    swift UI 和UIKIT 如何配合使用
  • 原文地址:https://blog.csdn.net/qq_51901495/article/details/126320698