• JDK8中ConcurrentHashMap底层源码解析-扩容机制



    前言

    在面试中ConcurrentHashMap的扩容机制也是经常会被问到的问题。ConcurrentHashMap的扩容机制比较复杂,在JDK8中加入了多线程扩容来改进效率,源码也是非常的长。本篇文章就来简单的解析一下ConcurrentHashMap扩容的底层源码,供大家参考学习。


    一、transfer方法

    我们知道如果添加完一个元素,集合的长度超过了阈值,就会触发扩容,在JDK8的ConcurrentHashMap中,扩容的方法是transfer,transfer方法源码非常的长,我们不妨先设想一下这个方法要做什么事情,下面用一张流程图展示transfer方法的整个流程:
    在这里插入图片描述

    下面是transfer方法的源码解析,非常的长,但是已经尽可能的把注释给写清楚了,如果这里看不懂可以先放一放,先继续往下看。

    private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
        int n = tab.length, stride;
        //如果是多cpu,那么每个线程划分任务,最小任务量是16个桶位的迁移
        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
            stride = MIN_TRANSFER_STRIDE; // subdivide range
        //如果是扩容线程,此时新数组为null
        if (nextTab == null) {            // initiating
            try {
                @SuppressWarnings("unchecked")
                //两倍扩容创建新数组
                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
                nextTab = nt;
            } catch (Throwable ex) {      // try to cope with OOME
                sizeCtl = Integer.MAX_VALUE;
                return;
            }
            nextTable = nextTab;
            //记录线程开始迁移的桶位,从后往前迁移
            transferIndex = n;
        }
        //记录新数组的末尾
        int nextn = nextTab.length;
        //已经迁移的桶位,会用这个节点占位(这个节点的hash值为-1--MOVED)
        ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
        // 推进标记
        boolean advance = true;
        // 完成标记,在提交下一个选项卡之前确保扫描
        boolean finishing = false; // to ensure sweep before committing nextTab
        // i 表示当前线程的任务已经干到哪里了,bound 表示分配给当前线程任务的下界限制
        // 自旋,非常长的一个for循环
        for (int i = 0, bound = 0;;) {
           // f 桶位的头结点   fh 头结点的hash
            Node<K,V> f; int fh;
            /**
             * while循环的任务如下:(循环的条件为推进标记advance为true)
             * 1.给当前线程分配任务区间
             * 2.维护当前线程任务进度(i 表示当前处理的桶位)
             * 3.维护ConcurrentHashMap全局扩容范围内的进度
             */
            while (advance) {
                int nextIndex, nextBound;
                //i记录当前正在迁移桶位的索引值
                //bound记录下一次任务迁移的开始桶位
                //--i >= bound 成立表示当前线程分配的迁移任务还没有完成
                if (--i >= bound || finishing)
                    advance = false;
                //没有元素需要迁移 -- 后续会去将扩容线程数减1,并判断扩容是否完成
                else if ((nextIndex = transferIndex) <= 0) {
                    i = -1;
                    advance = false;
                }
                //计算下一次任务迁移的开始桶位,并将这个值赋值给transferIndex
                else if (U.compareAndSwapInt
                         (this, TRANSFERINDEX, nextIndex,
                          nextBound = (nextIndex > stride ?
                                       nextIndex - stride : 0))) {
                    bound = nextBound;
                    i = nextIndex - 1;
                    advance = false;
                }
            }
            //如果没有更多的需要迁移的桶位,就进入该if
            if (i < 0 || i >= n || i + n >= nextn) {
                int sc;
                //扩容结束后,保存新数组,并重新计算扩容阈值,赋值给sizeCtl
                if (finishing) {
                    nextTable = null;
                    table = nextTab;
                    sizeCtl = (n << 1) - (n >>> 1);
                    return;
                }
    		   //扩容任务线程数减1
                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                    //判断当前所有扩容任务线程是否都执行完成
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                        return;
                    //所有扩容线程都执行完,标识结束
                    finishing = advance = true;
                    i = n; // recheck before commit
                }
            }
            //当前迁移的桶位没有元素,直接在该位置添加一个fwd节点
            else if ((f = tabAt(tab, i)) == null)
                //casTabAt通过CAS的方式将tab数组下标为i的结点设置为FWD结点
                advance = casTabAt(tab, i, null, fwd);
            //当前节点已经被迁移
            else if ((fh = f.hash) == MOVED)
                advance = true; // already processed
            else {
                //锁定该桶并迁移元素:(synchronized 锁加在当前桶位的头结点),线程安全
                synchronized (f) {
                 // 再次判断当前桶第一个元素是否有修改,(可能出现其它线程抢先一步迁移/修改了元素)
                // 防止在你加锁头对象之前,当前桶位的头对象被其它写线程修改过,导致你目前加锁对象错误
                    if (tabAt(tab, i) == f) {
                //把一个链表拆分成两个链表(高位和低位),逻辑跟JDK8中跟hashmap的逻辑差不多,参考hashmap拆分链表
                        Node<K,V> ln, hn;
                        if (fh >= 0) {
                            int runBit = fh & n;
                            Node<K,V> lastRun = f;
                            for (Node<K,V> p = f.next; p != null; p = p.next) {
                                int b = p.hash & n;
                                if (b != runBit) {
                                    runBit = b;
                                    lastRun = p;
                                }
                            }
                            if (runBit == 0) {
                                ln = lastRun;
                                hn = null;
                            }
                            else {
                                hn = lastRun;
                                ln = null;
                            }
                            for (Node<K,V> p = f; p != lastRun; p = p.next) {
                                int ph = p.hash; K pk = p.key; V pv = p.val;
                                if ((ph & n) == 0)
                                    ln = new Node<K,V>(ph, pk, pv, ln);
                                else
                                    hn = new Node<K,V>(ph, pk, pv, hn);
                            }
                             // 低位链表的位置不变
                            setTabAt(nextTab, i, ln);
                             // 高位链表的位置是:原位置 + n
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                       // 表示当前桶位是红黑树,转移的逻辑跟JDK8中hashmap差不多
                        else if (f instanceof TreeBin) {
                            TreeBin<K,V> t = (TreeBin<K,V>)f;
                            TreeNode<K,V> lo = null, loTail = null;
                            TreeNode<K,V> hi = null, hiTail = null;
                            int lc = 0, hc = 0;
                        //遍历红黑树(实质上也是在遍历链表,红⿊树是由链表改造⽽成),判断结点在高位还是低位,这样就能统计出哪些结点在低位,哪些结点在高位。
                            for (Node<K,V> e = t.first; e != null; e = e.next) {
                                int h = e.hash;
                                TreeNode<K,V> p = new TreeNode<K,V>
                                    (h, e.key, e.val, null, null);
                                if ((h & n) == 0) {
                                    if ((p.prev = loTail) == null)
                                        lo = p;
                                    else
                                        loTail.next = p;
                                    loTail = p;
                                    ++lc;
                                }
                                else {
                                    if ((p.prev = hiTail) == null)
                                        hi = p;
                                    else
                                        hiTail.next = p;
                                    hiTail = p;
                                    ++hc;
                                }
                            }
                            // 如果分化的树中元素个数小于等于6,则退化成链表
                            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                (hc != 0) ? new TreeBin<K,V>(lo) : t;
                            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                (lc != 0) ? new TreeBin<K,V>(hi) : t;
                            //低位树的位置不变
                            setTabAt(nextTab, i, ln);
                            // 高位树的位置是原位置加n
                            setTabAt(nextTab, i + n, hn);
                            // 标记该桶已迁移
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                    }
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174

    我们可以看到ConcurrentHashMap扩容的逻辑跟hashmap扩容的逻辑有很多相似的地方,比如链表的转移和红黑树的转移大体都差不多,但是不同的是ConcurrentHashMap在转移元素的过程中是会对当前位置的第一个结点加锁的(synchronized ),这样能保证扩容时的安全。

    下面用一张图来演示一下数据迁移的过程:
    在这里插入图片描述
    小结一下transfer方法的过程:
    (1)新桶数组大小是旧桶数组的两倍;
    (2)迁移元素先从末尾开始;
    (3)迁移完成的桶在里面放置一ForwardingNode类型的元素,标记该桶迁移完成;
    (4)迁移时根据hash&n把桶中元素分化成两个链表或树;
    (5)低位链表(树)存储在原来的位置;
    (6)高们链表(树)存储在原来的位置加n的位置;
    (7)迁移元素时会锁住当前桶的第一个结点,线程安全;


    二、JDK8多线程扩容效率改进

    ConcurrentHashMap是⽀持多个线程同时扩容的,这可以提高扩容效率。多线程协助扩容的操作会在两个地方被触发(这非常重要,这是理解多线程扩容的前提):

    当添加元素时,发现添加的元素对用的桶位为fwd节点,就会先去协助扩容,然后再添加元素

    当添加完元素后,判断当前元素个数达到了扩容阈值,此时发现sizeCtl的值小于0,并且新数组不为空,这个时候,会去协助扩容

    下面来看一下源码解析,第一种情况:当添加元素时,发现添加的元素对用的桶位为fwd节点,就会先去协助扩容,然后再添加元素

    final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            //发现此处为fwd节点,协助扩容,扩容结束后,再循环回来添加元素
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            
            //省略代码
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    transfer:协助扩容(迁移元素),当线程添加元素时发现table正在扩容,且当前元素所在的桶元素已经迁移完成了,则协助迁移其它桶的元素。

    final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
        Node<K,V>[] nextTab; int sc;
        if (tab != null && (f instanceof ForwardingNode) &&
            (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
            int rs = resizeStamp(tab.length);
            while (nextTab == nextTable && table == tab &&
                   (sc = sizeCtl) < 0) {
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || transferIndex <= 0)
                    break;
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                    //扩容,传递一个不是null的nextTab
                    transfer(tab, nextTab);
                    break;
                }
            }
            return nextTab;
        }
        return table;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    第二种情况:当添加完元素后,判断当前元素个数达到了扩容阈值,此时发现sizeCtl的值小于0,并且新数组不为空,这个时候,会去协助扩容

    private final void addCount(long x, int check) {
        //省略代码
        if (check >= 0) {
            Node<K,V>[] tab, nt; int n, sc;
      	    //元素个数达到扩容阈值
            while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
                   (n = tab.length) < MAXIMUM_CAPACITY) {
                int rs = resizeStamp(n);
                //sizeCtl小于0,说明正在执行扩容,那么协助扩容
                if (sc < 0) {
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                        transferIndex <= 0)
                        break;
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        transfer(tab, nt);
                }
                else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                             (rs << RESIZE_STAMP_SHIFT) + 2))
                    transfer(tab, null);
                s = sumCount();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    下面来用一张图演示一下多线程协助扩容的操作:
    在这里插入图片描述


    总结

    本章的难度较大,ConcurrentHashMap的源码难就难在它不仅长度长而且逻辑比较复杂,第一次看看不懂是很正常的事情。个人认为看源码还是很有必要的,这会让你对这个知识点有一个全新的认识,而不是局限在背八股文。


  • 相关阅读:
    Learned Index on GPU(ICDE2022)
    小程序vant-tabbar使用示例,及报错处理
    【OpenCV】-边缘检测汇总示例
    「PAT甲级真题解析」Advanced Level 1006 Sign In and Sign Out
    编译器-最优寄存器分配
    几个事件的问题
    金融行业安全事件频发,数据共享与安全如何平衡?
    STM32F103ZET6_ADC
    【从零开始游戏开发】Unity 前后端网络通信该如何搭建?注释解答 | 全面总结 |建议收藏
    CentOS 7 安装RabbitMQ
  • 原文地址:https://blog.csdn.net/qq_52173163/article/details/126109804