• Java 并发编程之ConcurrentHashMap源码详解


    Java 并发编程之ConcurrentHashMap原理详解

    原理剖析

    HashMap通常的实现方式是“数组+链表”,这种方式被称为“拉链法”。 ConcurrentHashMap在这个基本原理之上进行了各种优化。

    ⾸先是所有数据都放在⼀个⼤的HashMap中,其次是引入了红黑树。

    其原理如下图所示:

    在这里插入图片描述

    如果头节点是Node类型,则尾随它的就是⼀个普通的链表。如果头节点是TreeNode类型,它的后面就是⼀颗红黑树, TreeNode是Node的子类。

    链表和红黑树之间可以相互转换:初始的时候是链表,当链表中的元素超过某个阈值时,把链表转换成红黑树。反之,当红黑树中的元素个数小于某个阈值时,再转换为链表。

    那为什么要做这种设计呢?

    1. 使⽤红黑树,当⼀个槽里有很多元素时,其查询和更新速度会比链表快很多, Hash冲突的问题由此得到较好的解决。
    2. 加锁的粒度,并非整个ConcurrentHashMap,而是对每个头节点分别加锁,即并发度,就是Node数组的⻓度,初始长度为16。
    3. 并发扩容,这是难度最大的。当⼀个线程要扩容Node数组的时候,其他线程还要读写,因此处理过程很复杂,后面会详细分析。

    总结

    由上述对比可以总结出来:这种设计⼀方面降低了Hash冲突,另⼀方面也提升了并发度。

    下面从构造方法开始,一步步深入分析其实现过程

    源码剖析

    一、构造方法分析

    /**
         * Creates a new, empty map with an initial table size based on
         * the given number of elements ({@code initialCapacity}), table
         * density ({@code loadFactor}), and number of concurrently
         * updating threads ({@code concurrencyLevel}).
         *
         * @param initialCapacity the initial capacity. The implementation
         * performs internal sizing to accommodate this many elements,
         * given the specified load factor.
         * @param loadFactor the load factor (table density) for
         * establishing the initial table size
         * @param concurrencyLevel the estimated number of concurrently
         * updating threads. The implementation may use this value as
         * a sizing hint.
         * @throws IllegalArgumentException if the initial capacity is
         * negative or the load factor or concurrencyLevel are
         * nonpositive
         */
        public ConcurrentHashMap(int initialCapacity,
                                 float loadFactor, int concurrencyLevel) {
            if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
                throw new IllegalArgumentException();
            if (initialCapacity < concurrencyLevel)   // Use at least as many bins
                // 如果设置的初始化容量小于预估的并发更新线程数,则初始容量设为该预估的并发线程数
                initialCapacity = concurrencyLevel;   // as estimated threads
            long size = (long)(1.0 + (long)initialCapacity / loadFactor);
            int cap = (size >= (long)MAXIMUM_CAPACITY) ?
                MAXIMUM_CAPACITY : tableSizeFor((int)size);
            // 表初始化和调整大小控件
            // 在表初始化方法中,用于CAS操作控制并发线程的初始化过程:如果为-1,则表正在初始化或调整大小
            this.sizeCtl = cap;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    在上面的代码中,变量cap就是Node数组的长度,保持为2的整数次方。 tableSizeFor()方法是根据传入的初始容量,计算出⼀个合适的数组长度。具体而言: 1.5倍的初始容量+1,再往上取最接近的2的整数次方,作为数组长度cap的初始值。

    这里的 sizeCtl,其含义是用于控制在初始化或者并发扩容时候的线程数,只不过其初始值设置成cap。

    二、初始化

    在上面的构造方法里只计算了数组的初始⼤小,并没有对数组进行初始化。当多个线程都往里面放入元素的时候,再进行初始化。这就存在⼀个问题:多个线程重复初始化。下面看⼀下是如何处理的。

    /**
         * Initializes table, using the size recorded in sizeCtl.
         */
        private final Node<K,V>[] initTable() {
            Node<K,V>[] tab; int sc;
            while ((tab = table) == null || tab.length == 0) {
                // 跟下面else中的cas操作对应
                // 如果sizeCtl被修改为-1,说明此时有线程在初始化操作,因此当前线程自悬等待
                if ((sc = sizeCtl) < 0)
                    Thread.yield(); // lost initialization race; just spin
                // 利用CAS操作来解决多线程下的初始化操作
                // CAS操作成功的线程会把sizeCtl的值置为-1,从而使其他线程yield等待
                else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                    try {
                        if ((tab = table) == null || tab.length == 0) {
                            int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                            @SuppressWarnings("unchecked")
                            // Node数组初始化,初始容量为n
                            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                            table = tab = nt;
                            // 初始化成功后,sizeCtl=n-(n>>>2)=0.75n
                            // 表示下一次扩容的阈值:n-n/4
                            sc = n - (n >>> 2);
                        }
                    } finally {
                        // sizeCtl赋新值,为上面计算出的下一次扩容的阈值
                        sizeCtl = sc;
                    }
                    break;
                }
            }
            return tab;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    通过上面的代码可以看到,多个线程的竞争是通过对sizeCtl进行CAS操作实现的。如果某个线程成功地把 sizeCtl设置为-1,它就拥有了初始化的权利,进⼊初始化的代码模块,等到初始化完成,再把sizeCtl设置回去。其他线程则⼀直执行while循环,自旋等待,直到数组不为null,即当初始化结束时,退出整个方法。

    因为初始化的⼯作量很小,所以此处选择的策略是让其他线程⼀直等待,而没有帮助其初始化。

    三、put()实现分析

    /** Implementation for put and putIfAbsent */
        final V putVal(K key, V value, boolean onlyIfAbsent) {
            if (key == null || value == null) throw new NullPointerException();
            int hash = spread(key.hashCode());
            int binCount = 0;
            for (Node<K,V>[] tab = table;;) {
                Node<K,V> f; int n, i, fh;
                // 分支1,整个数组初始化
                if (tab == null || (n = tab.length) == 0)
                    tab = initTable();
                // 分支2:第i个元素初始化
                // 这里调用tabAt方法获取到Node数组第i个位置下的值,如果为空,则初始化当前插入元素插入到数组第i个索引位置
                else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                    // 利用CAS方法尝试插入,如果CAS操作成功,说明当前元素已经插入到第i个索引位置,则break退出。
                    if (casTabAt(tab, i, null,
                                 new Node<K,V>(hash, key, value, null)))
                        break;                   // no lock when adding to empty bin
                }
                // 分支3:尝试扩容
                else if ((fh = f.hash) == MOVED)
                    tab = helpTransfer(tab, f);
                // 分支4:插入元素到链尾,或红黑树中
                else {
                    V oldVal = null;
                    // 重点,利用synchronized关键字加锁。
                    // 同时,可以看到,锁的粒度是Node数组中节点的粒度,也就是只锁数组中每个Node节点,其他Node节点仍然可以访问
                    synchronized (f) {
                        if (tabAt(tab, i) == f) {
                            if (fh >= 0) {
                                binCount = 1;
                                for (Node<K,V> e = f;; ++binCount) {
                                    K ek;
                                    // 这里是更新的旧值的功能
                                    // 在插入的过程中,如果发现插入元素与已有元素相同,则用新的value值替换旧的value值
                                    // 这里的相同指 当且仅当两个元素的hashcode相同,且key值相同
                                    if (e.hash == hash &&
                                        ((ek = e.key) == key ||
                                         (ek != null && key.equals(ek)))) {
                                        oldVal = e.val;
                                        if (!onlyIfAbsent)
                                            // 将插入的新value替换已有元素的旧value
                                            e.val = value;
                                        break;
                                    }
                                    Node<K,V> pred = e;
                                    // 这里e=e.next来遍历某一Node节点下的链表/红黑树
                                    if ((e = e.next) == null) {
                                        // 如果走到链尾,则将新元素插入到链尾
                                        pred.next = new Node<K,V>(hash, key,
                                                                  value, null);
                                        break;
                                    }
                                }
                            }
                            // 当fh<0时,表示当前节点下时红黑树
                            else if (f instanceof TreeBin) {
                                Node<K,V> p;
                                binCount = 2;
                                // 将元素插入红黑树中,同样的,如果树中已有当前元素,则返回对应TreeNode元素对象,否则,返回null
                                if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                               value)) != null) {
                                    oldVal = p.val;
                                    if (!onlyIfAbsent)
                                        // 如果已有元素,则更新旧值
                                        p.val = value;
                                }
                            }
                        }
                    }
                    // binCount保存了节点是链表时的元素个数
                    if (binCount != 0) {
                        // 如果链表的元素个数大于设置的阈值=8,则会转成红黑树的操作
                        if (binCount >= TREEIFY_THRESHOLD)
                            treeifyBin(tab, i);
                        if (oldVal != null)
                            // 如果put过程是更新操作,会返回旧值
                            return oldVal;
                        break;
                    }
                }
            }
            // 容器中总元素个数+1(sizeCtl+1)
            addCount(1L, binCount);
            return null;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85

    其中调用的几个方法为:

    	// 其中h是key的hashCode()得到的
    	// h ^ (h >>> 16)就是让高16位与低16位进行异或,目的是使得计算出来的哈希值更加散列。
    	// & HASH_BITS(二进制是31个1)就是为了让最终得到的哈希值始终是一个正数。
        static final int spread(int h) {
            return (h ^ (h >>> 16)) & HASH_BITS;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    关键步骤分析:

    • synchronized (f):通过源码分析知道,ConcurrentHashMap并发的同步锁是加在Node数组中每个索引位置的节点上的,意味着每个数组元素有一把锁,并发度等于数组的长度

    • f = tabAt(tab, i = (n - 1) & hash这里为了获取插入元素在Node数组中的索引位置,不是用的hash%n,而是采用了(n-1) & hash的方式。上面在对原理进行剖析时分析过,hashmap类的容器大小都是2的整数倍,因此(n-1)相当于除了高位为0外,其余位都为1。由于n=2^m,hashcode / n 相当于右移m位,而根据二进制求余的方法可知,这右移的m位就是余数。那么问题就在于如何获取这m位。由于n是2的整数幂,则n-1用二进制表示的位数就等于m,比如:n=8=2^3, 则n-1=7=0111n=16=2^4,则n-1=15=0 1111。可以发现1的位数即等于m的幂数。这里的原因就在n是2的整数幂,所以n的二进制表示的1的位置,在m+1的位置,因此用n-1=m就可以获取到该取余所需的位数。

    • put()方法过程中,如果插入的元素是ConcurrentHashMap中已有的,则会更新现在的值。这里判断相同的方法是:当且仅当两个元素的hashcode相同 且 key相同。

    • binCount字段保存了节点是链表时的元素个数。在插入的过程中,binCount字段会对链表的长度计数,如果链表的元素个数大于设置的阈值=8,则会转成红黑树的操作

    四、扩容

    扩容的实现是最复杂的,下面从treeifyBin()的源码方法讲起

    /**
         * Replaces all linked nodes in bin at given index unless table is
         * too small, in which case resizes instead.
         */
        private final void treeifyBin(Node<K,V>[] tab, int index) {
            Node<K,V> b; int n, sc;
            if (tab != null) {
                if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
                    // 数组长度小于阈值64,不做红黑树转换,直接扩容
                    tryPresize(n << 1);
                else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
                    // 链表转换为红黑树
                    synchronized (b) {
                        if (tabAt(tab, index) == b) {
                            TreeNode<K,V> hd = null, tl = null;
                            // 遍历链表,初始化红黑树
                            for (Node<K,V> e = b; e != null; e = e.next) {
                                TreeNode<K,V> p =
                                    new TreeNode<K,V>(e.hash, e.key, e.val,
                                                      null, null);
                                if ((p.prev = tl) == null)
                                    hd = p;
                                else
                                    tl.next = p;
                                tl = p;
                            }
                            setTabAt(tab, index, new TreeBin<K,V>(hd));
                        }
                    }
                }
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    在上面的代码中, MIN_TREEIFY_CAPACITY=64,意味着当数组的长度没有超过64的时候,数组的每个节点里都是链表,只会扩容,不会转换成红⿊树。只有当数组长度大于或等于64时(static final int MIN_TREEIFY_CAPACITY = 64;),才考虑把链表转换成红黑树。

    上面方法在扩容时,调用了tryPresize方法,来进行扩容,接下来看一下这个方法内部实现:

    private final void tryPresize(int size) {
            int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
                tableSizeFor(size + (size >>> 1) + 1);
            int sc;
            while ((sc = sizeCtl) >= 0) {
                Node<K,V>[] tab = table; int n;
                if (tab == null || (n = tab.length) == 0) {
                    n = (sc > c) ? sc : c;
                    if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                        try {
                            if (table == tab) {
                                @SuppressWarnings("unchecked")
                                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                                table = nt;
                                sc = n - (n >>> 2);
                            }
                        } finally {
                            sizeCtl = sc;
                        }
                    }
                }
                else if (c <= sc || n >= MAXIMUM_CAPACITY)
                    break;
                else if (tab == table) {
                    int rs = resizeStamp(n);
                    if (sc < 0) {
                        Node<K,V>[] nt;
                        if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                            sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                            transferIndex <= 0)
                            break;
                        if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                            transfer(tab, nt);
                    }
                    else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                                 (rs << RESIZE_STAMP_SHIFT) + 2))
                        transfer(tab, null);
                }
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40

    tryPresize方法中最重要的一个方法就是transfer方法,也是扩容的主要方法:

    private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
            int n = tab.length, stride;
            if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
                // 计算步长
                stride = MIN_TRANSFER_STRIDE; // subdivide range
        	// 初始化新的HashMap
            if (nextTab == null) {            // initiating
                try {
                    @SuppressWarnings("unchecked")
                    // 扩容两倍
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
                    nextTab = nt;
                } catch (Throwable ex) {      // try to cope with OOME
                    sizeCtl = Integer.MAX_VALUE;
                    return;
                }
                nextTable = nextTab;
                // 初始的transferIndex为旧的HashMap的数组长度
                transferIndex = n;
            }
            int nextn = nextTab.length;
            ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
            boolean advance = true;
            boolean finishing = false; // to ensure sweep before committing nextTab
            // 此处,i为遍历下标,bound为边界。
        	// 如果成功获取一个任务,则i=nextIndex-1
        	// bound = nextIndex-stride
        	// 如果获取不到,则i=0, bound=0
            for (int i = 0, bound = 0;;) {
                Node<K,V> f; int fh;
                // advance表示在从i=transferIndex-1遍历到bound位置的过程中,是否一直继续
                while (advance) {
                    int nextIndex, nextBound;
                    // 以下是哪个分支中的advance都是false,表示如果三个分支都不执行,才可以一直while循环
                    // 目的在于当对transferIndex执行CAS操作不成功的时候,需要自旋,以期获取一个stride的迁移任务
                    if (--i >= bound || finishing)
                        // 对数组遍历,通过这里的--i进行。如果成功执行了--i,就不需要继续while循环了,因为advance只能进一步。
                        advance = false;
                    else if ((nextIndex = transferIndex) <= 0) {
                        // transferIndex <= 0,整个HashMap完成
                        i = -1;
                        advance = false;
                    }
                    // 对transferIndex执行CAS操作,即为当前线程分配1个stride
                    // CAS操作成功,线程成功获取到一个stride的迁移任务
                    // CAS操作不成功,线程没有抢到任务,会继续执行while循环,自旋。
                    else if (U.compareAndSwapInt
                             (this, TRANSFERINDEX, nextIndex,
                              nextBound = (nextIndex > stride ?
                                           nextIndex - stride : 0))) {
                        bound = nextBound;
                        i = nextIndex - 1;
                        advance = false;
                    }
                }
                // i越界,整个HashMap遍历完成
                if (i < 0 || i >= n || i + n >= nextn) {
                    int sc;
                    // finishing表示整个hashMap扩容完成
                    if (finishing) {
                        nextTable = null;
                        // 将nextTab赋值给当前table
                        table = nextTab;
                        sizeCtl = (n << 1) - (n >>> 1);
                        return;
                    }
                    if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                        if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                            return;
                        finishing = advance = true;
                        i = n; // recheck before commit
                    }
                }
                // tab[i]迁移完毕,赋值一个ForwardingNode
                else if ((f = tabAt(tab, i)) == null)
                    advance = casTabAt(tab, i, null, fwd);
                // tab[i]的位置已经在迁移过程中
                else if ((fh = f.hash) == MOVED)
                    advance = true; // already processed
                else {
                    // 对tab[i]进⾏迁移操作, tab[i]可能是⼀个链表或者红⿊树
                    synchronized (f) {
                        if (tabAt(tab, i) == f) {
                            Node<K,V> ln, hn;
                            // 链表
                            if (fh >= 0) {
                                int runBit = fh & n;
                                Node<K,V> lastRun = f;
                                for (Node<K,V> p = f.next; p != null; p = p.next) {
                                    int b = p.hash & n;
                                    if (b != runBit) {
                                        runBit = b;
                                        // 表示lastRun之后的所有元素, hash值都是⼀样的
                                        // 记录下这个最后的位置
                                        lastRun = p;
                                    }
                                }
                                if (runBit == 0) {
                                    // 链表迁移的优化做法
                                    ln = lastRun;
                                    hn = null;
                                }
                                else {
                                    hn = lastRun;
                                    ln = null;
                                }
                                for (Node<K,V> p = f; p != lastRun; p = p.next) {
                                    int ph = p.hash; K pk = p.key; V pv = p.val;
                                    if ((ph & n) == 0)
                                        ln = new Node<K,V>(ph, pk, pv, ln);
                                    else
                                        hn = new Node<K,V>(ph, pk, pv, hn);
                                }
                                setTabAt(nextTab, i, ln);
                                setTabAt(nextTab, i + n, hn);
                                setTabAt(tab, i, fwd);
                                advance = true;
                            }
                            // 红⿊树,迁移做法和链表类似
                            else if (f instanceof TreeBin) {
                                TreeBin<K,V> t = (TreeBin<K,V>)f;
                                TreeNode<K,V> lo = null, loTail = null;
                                TreeNode<K,V> hi = null, hiTail = null;
                                int lc = 0, hc = 0;
                                for (Node<K,V> e = t.first; e != null; e = e.next) {
                                    int h = e.hash;
                                    TreeNode<K,V> p = new TreeNode<K,V>
                                        (h, e.key, e.val, null, null);
                                    if ((h & n) == 0) {
                                        if ((p.prev = loTail) == null)
                                            lo = p;
                                        else
                                            loTail.next = p;
                                        loTail = p;
                                        ++lc;
                                    }
                                    else {
                                        if ((p.prev = hiTail) == null)
                                            hi = p;
                                        else
                                            hiTail.next = p;
                                        hiTail = p;
                                        ++hc;
                                    }
                                }
                                ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                    (hc != 0) ? new TreeBin<K,V>(lo) : t;
                                hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                    (lc != 0) ? new TreeBin<K,V>(hi) : t;
                                setTabAt(nextTab, i, ln);
                                setTabAt(nextTab, i + n, hn);
                                setTabAt(tab, i, fwd);
                                advance = true;
                            }
                        }
                    }
                }
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159

    该⽅法非常复杂,下面⼀步步分析:

    1. 扩容的基本原理如下图,⾸先建⼀个新的HashMap,其数组长度是旧数组长度的2倍,然后把旧的元素逐个迁移过来。所以,上面的方法参数有2个,第1个参数tab是扩容之前的HashMap,第2个参数nextTab是扩容之后的HashMap。当nextTab=null的时候,⽅法最初会对nextTab进行初始化。这里有⼀个关键点要说明:该⽅法会被多个线程调⽤,所以每个线程只是扩容旧的HashMap部分,这就涉及如何划分任务的问题

    在这里插入图片描述

    1. 上图为多个线程并行扩容-任务划分示意图。旧数组的⻓度是N,每个线程扩容⼀段,⼀段的长度用变量stride(步长)来表示, transferIndex表示了整个数组扩容的进度。

      stride的计算公式如上面的代码所示,即:在单核模式下直接等于n,因为在单核模式下没有办法多个线程并⾏扩容,只需要1个线程来扩容整个数组;在多核模式下为 (n>>> 3) /NCPU,并且保证步⻓的最⼩值是16。显然,需要的线程个数约为n/stride。

      if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
                  stride = MIN_TRANSFER_STRIDE; // subdivide range
      
      • 1
      • 2

      transferIndex是ConcurrentHashMap的⼀个成员变量,记录了扩容的进度。初始值为n,从大到小扩容,每次减stride个位置,最终减至n< =0,表示整个扩容完成。因此,从[0,transferIndex-1]的位置表示还没有分配到线程扩容的部分,从[transfexIndex, n-1]的位置表示已经分配给某个线程进行扩容,当前正在扩容中,或者已经扩容成功。因为transferIndex会被多个线程并发修改,每次减stride,所以需要通过CAS进⾏操作,如下面代码所示 :

      else if (U.compareAndSwapInt
               (this, TRANSFERINDEX, nextIndex,
                nextBound = (nextIndex > stride ?
                nextIndex - stride : 0))) {
      
      • 1
      • 2
      • 3
      • 4

    在这里插入图片描述

    1. 在扩容未完成之前,有的数组下标对应的槽已经迁移到了新的HashMap里面,有的还在旧的 HashMap里面。这个时候,所有调用get(k, v)的线程还是会访问旧 HashMap,怎么处理呢?

      下图为扩容过程中的转发示意图:当Node[0]已经迁移成功,而其他Node还在迁移过程中时,如果有线程要读取Node[0]的数据,就会访问失败。为此,新建⼀个ForwardingNode,即转发节点,在这个节点⾥⾯记录的是新的 ConcurrentHashMap 的引⽤。这样,当线程访问到ForwardingNode之后,会去查询新的ConcurrentHashMap。

    2. 因为数组的⻓度 tab.length 是2的整数次方,每次扩容又是2倍。而Hash 函数是hashCode%tab.length,等价于hashCode&(tab.length-1)。这意味着:处于第i个位置的元素,在新的Hash表的数组中⼀定处于
      第i个或者第i+n个位置,如下图所示。举个简单的例⼦:假设数组长度是8,扩容之后是16:
      若hashCode=5, 5%8=0,扩容后, 5%16=0,位置保持不变;
      若hashCode=24, 24%8=0,扩容后, 24%16=8,后移8个位置;
      若hashCode=25, 25%8=1,扩容后, 25%16=9,后移8个位置;
      若hashCode=39, 39%8=7,扩容后, 39%8=7,位置保持不变;

    在这里插入图片描述

    正因为有这样的规律,所以如下有代码:

    	setTabAt(nextTab, i, ln);
    	setTabAt(nextTab, i + n, hn);
    	setTabAt(tab, i, fwd);
    	advance = true;
    
    • 1
    • 2
    • 3
    • 4

    也就是把tab[i]位置的链表或红黑树重新组装成两部分,⼀部分链接到nextTab[i]的位置,⼀部分链接到nextTab[i+n]的位置,如上图所示。然后把tab[i]的位置指向⼀个ForwardingNode节点。

    同时,当tab[i]后面是链表时,使⽤类似于JDK 8中在扩容时的优化方法,从lastRun往后的所有节点,不需依次拷贝,而是直接链接到新的链表头部。从lastRun往前的所有节点,需要依次拷贝。

    了解了核心的迁移函数transfer(tab, nextTab),再回头看tryPresize(int size)函数。这个函数的输入是整个Hash表的元素个数,在函数里面,根据需要对整个Hash表进行扩容。想要看明白这个函数,需要透彻地理解sizeCtl变量,下面这段注释摘自源码。

    /**
         * Table initialization and resizing control.  When negative, the
         * table is being initialized or resized: -1 for initialization,
         * else -(1 + the number of active resizing threads).  Otherwise,
         * when table is null, holds the initial table size to use upon
         * creation, or 0 for default. After initialization, holds the
         * next element count value upon which to resize the table.
         */
        private transient volatile int sizeCtl;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    当sizeCtl=-1时,表示整个HashMap正在初始化;
    当sizeCtl=某个其他负数时,表示多个线程在对HashMap做并发扩容;
    当sizeCtl=cap时, tab=null,表示未初始之前的初始容量(如上面的构造函数所示);
    扩容成功之后, sizeCtl存储的是下⼀次要扩容的阈值,即上面初始化代码中的n-(n>>> 2) =0.75n。
    所以, sizeCtl变量在Hash表处于不同状态时,表达不同的含义。明白了这个道理,再来看上面的tryPresize(int size)函数

    tryPresize(int size)是根据期望的元素个数对整个Hash表进行扩容,核心是调用transfer函数。在第⼀次扩容的时候, sizeCtl会被设置成⼀个很大的负数U.compareAndSwapInt(this, SIZECTL, sc,(rs << RESIZE_STAMP_SHIFT) +2)。之后每⼀个线程扩容的时候, sizeCtl 就加 1, U.compareAndSwapInt(this, SIZECTL, sc, sc+1),待扩容完成之后, sizeCtl减1

  • 相关阅读:
    art-template系列之前端页面使用
    C++——虚函数、虚析构函数、纯虚函数、抽象类
    element-ui使用el-date-picker日期组件常见场景
    【SQL屠夫系列】leetcode-176. 第二高的薪水
    电脑合并盘符
    【Linux网络编程】日志与守护进程
    ctfshow 萌新赛 给她
    2023年03月 Scratch(一级)真题解析#中国电子学会#全国青少年软件编程等级考试
    zdppy_amauth如何测试给用户批量绑定角色接口
    小白也能看懂的缓存雪崩、穿透、击穿
  • 原文地址:https://blog.csdn.net/Urbanears/article/details/128121012