• Java - ConcurrentHashMap原理分析


    前言

    我在上一篇文章Java - HashMap原理分析中讲解了HashMap中主要的几个知识点。但是我们知道在高并发的情况下,HashMap并不是线性安全的。因此也就产生了与之对应的线性安全的Map,也就是ConcurrentHashMap

    一. ConcurrentHashMap 原理分析

    我之前讲过了HashMap的一个结构,ConcurrentHashMap 的架构和它一样,也是数组+链表+红黑树的结构。

    • HashMap 中:哈希桶中的对象是一个由Node节点组成的链表。或者是TreeNode节点组成的红黑树。
    • ConcurrentHashMap 中:在Node类型的节点基础上做了扩展:ForwardingNodeReservationNode

    因为ConcurrentHashMap仅仅通过加锁的方式是无法实现线性安全的。因此也衍生出不同的节点,其作用如下:

    • ForwardingNode:用于解决当进行扩容的时候,进行查询的问题。
    • ReservationNode:用于解决当进行计算时,计算的对象为空的问题。

    那么ConcurrentHashMap如何保证高并发的情况下的扩容和元素的插入呢?我们先从put函数开始看。

    1.1 几个重要的成员变量

    // 代表是ForwardingNode节点
    static final int MOVED     = -1; 
    // 红黑树节点
    static final int TREEBIN   = -2; 
    // ReservationNode节点
    static final int RESERVED  = -3; 
    // HASH_BITS为int最大值,最高位为0
    static final int HASH_BITS = 0x7fffffff;
    // 代表容器在初始化或者扩容时候的锁,必须拿到这个锁,才能够进行相关的操作。
    // 如果是负数,代表现在容器在初始化或者扩容阶段。 默认是0
    private transient volatile int sizeCtl;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    再来看下Node节点,这里和HashMap中的Node节点有几个不同:

    static class Node<K,V> implements Map.Entry<K,V> {
        final int hash;
        final K key;
        volatile V val;
        volatile Node<K,V> next;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    我们可以发现,关于val还有next指针,都是经过volatile修饰的,说明线程间可见。

    1.2 put 方法的原理

    public V put(K key, V value) {
        return putVal(key, value, false);
    }
    
    final V putVal(K key, V value, boolean onlyIfAbsent) {
    	// 首先,ConcurrentHashMap不允许存储null,无论是key还是value
        if (key == null || value == null) throw new NullPointerException();
        // 获取当前key对应的哈希值
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            // 如果哈希槽数组为null或者长度为0,那么进行初始化
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            // 否则,根据哈希值计算出对应的哈希槽下标,如果这个哈希槽是null,那么就直接添加一个节点。
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            	// HashMap中是直接赋值,而ConcurrentHashMap中则通过CAS来进行值的替换
            	// 替换成功则跳出循环,否则进行下一次循环
                if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            // 如果遇到了ForwardingNode节点(代表此时正在进行扩容操作),那么帮助程序进行扩容。
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            // 否则正常的情况下,即槽位非空,且不在扩容状态下
            else {
                V oldVal = null;
                // 那么对整个槽进行加锁
                synchronized (f) {
                	// 在判断一次,避免槽中的数据发生变化
                    if (tabAt(tab, i) == f) {
                    	// 如果是链表
                        if (fh >= 0) {
                            binCount = 1;
                            // 遍历链表
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                // 如果发现某一个节点key重复,那么覆盖
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                // 若不存在相同的key,则链表的末尾加上新节点
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        // 如果是红黑树,添加一个树节点
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                // 添加元素完毕后
                if (binCount != 0) {
                	// 如果槽中的元素数量 >= 8 ,那么转化为红黑树
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        // 添加一个成员个数,同时检查扩容操作
        addCount(1L, binCount);
        return null;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84

    从源码上来看,我们能够发现,ConcurrentHashMapHashMap 的元素插入操作流程是非常相似的。最大的几个不同点在于:

    • 如果槽是空的,那么添加元素的时候,ConcurrentHashMap 是通过 CAS操作来完成的。HashMap则是直接赋值。对比图如下:
      在这里插入图片描述
    1. 如果槽不是空的,在对槽中元素(链表/红黑树)进行遍历的时候。ConcurrentHashMap 有通过 synchronized 将整个Node节点给加锁(也就是所谓的分段锁)。HashMap则没有。
      在这里插入图片描述
    • 除此之外,ConcurrentHashMap 中还有 tab = helpTransfer(tab, f);这个操作。

    那么接下来我们就来对几个细节做讲解。

    1.2.1 初始化操作 initTable

    我们看下代码:

    private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        while ((tab = table) == null || tab.length == 0) {
        	// 如果 sizeCtl 为-1,代表此时容器处于初始化或者扩容阶段。
            if ((sc = sizeCtl) < 0)
            	// 此时放弃CPU的使用权,让出时间片,线程进入就绪状态参与锁的竞争
                Thread.yield(); 
            // 通过原子操作来修改sizeCtl的值,即只允许某一个线程能够对该容器进行初始化,因为sizeCtl的默认值是0.此时需要将其改为-1
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                	// 双重校验,避免容器被多次
                    if ((tab = table) == null || tab.length == 0) {
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        // 初始化哈希槽数组
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;
                        // 计算出下次扩容的阈值,相当于n - 0.25n = 0.75。和HashMap一样的阈值
                        sc = n - (n >>> 2);
                    }
                } finally {
                	// 扩容完毕,将下一次扩容的阈值进行赋值。
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    总结下在高并发的情况下,是如何避免重复初始化的:

    1. 首先使用了sizeCtl这个变量。它是被volatile修饰的,因此能被多线程看到。防止容器被多次初始化或者扩容。
    2. 如果sizeCtl的值是负数,代表容器处于初始化或者扩容阶段。那么此时走到这里的线程都会让出CPU,进入就绪状态。
    3. 只有一个线程能够通过CAS操作,改变sizeCtl的值,默认是0,将其改为-1,那么其他线程看到的时候,就是第二步。扩容完毕,sizeCtl值成正数(当前容器的一个扩容阈值大小)。

    讲完这里,按照代码的顺序来看,就是要讲helpTransfer这个函数了。但是,根据我在代码中写的注释,只有识别到当前的哈希槽为ForwardingNode的时候,才会进行协助扩容。即:

    else if ((fh = f.hash) == MOVED)
       tab = helpTransfer(tab, f);
    
    • 1
    • 2

    那么根据在哪里?我们就应该先看看扩容操作:扩容和ForwardingNode有什么关联?

    1.2.2 计数操作 addCount

    我们来回顾下HashMap中,在添加完元素之后的操作:
    在这里插入图片描述
    ConcurrentHashMap 中:却只有短短的addCount()函数。
    在这里插入图片描述
    addCount其实整合了元素的统计和扩容操作,再讲源码之前,我想先说两个属性:

    private transient volatile long baseCount;
    private transient volatile CounterCell[] counterCells;
    
    • 1
    • 2
    • baseCount:ConcurrentHashMap中元素个数,但返回的不一定是当前容器的真实元素个数。基于CAS无锁更新。
    • CounterCell数组:一个计数盒子。一般在存在锁竞争的情况下,用它来统计元素个数。

    我们来看下它的源码:

    // 外界传入的参数是 1 和 binCount。 binCount指的是哈希槽中的元素个数。
    private final void addCount(long x, int check) {
        CounterCell[] as; long b, s;
        // (as = counterCells) != null 表示cells数组已经初始化,当前线程使用hash去寻址找到合适的cell,去累加数据
        // 或者 如果修改basecount失败了,即与其它线程发生了冲突,当前线程尝试去创建cells数组
        if ((as = counterCells) != null ||
            !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
            CounterCell a; long v; int m;
            // true代表没有竞争
            boolean uncontended = true;
            // 三种情况满足任意一种:
            // 1. cells数组未初始化,但是写basecount失败
            // 2. cells已经初始化了,但是哈希寻址的cell为null
            // 3.当前cell单元格不为null,去进行CAS累加操作,但是竞争失败
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
                !(uncontended =
                  U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
                // 创建cells,进行数量的统计
                fullAddCount(x, uncontended);
                return;
            }
            if (check <= 1)
                return;
            s = sumCount();
        }
        // ...扩容部分
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    计数部分:

    • 哈希表中元素总个数+1。只不过这个值可能是通过baseCount或者CounterCell来计算得出的。
    • 那么为什么要用到两个变量呢?如果竞争不激烈的话,通过一个baseCount就可以解决。但是如果竞争激烈的话,比如10个线程去put数据,但是在计数的时候,由于只允许一个线程CAS操作成功,就会导致其他的9个线程自旋等待机会。CPU空转。
    • 因此如果将baseCount进行拆分,拆成多个CounterCell让每个线程绑定一个计数盒子,进行自身的元素统计。就可以进行分散的统计。

    如图:
    在这里插入图片描述

    1.2.3 扩容操作 transfer

    我们继续看addCount这个函数的第二部分:

    // 外界传入的参数是 1 和 binCount。 binCount指的是哈希槽中的元素个数。
    private final void addCount(long x, int check) {
        CounterCell[] as; long b, s;
        // ...计数部分
        // 扩容部分
        if (check >= 0) {
            Node<K,V>[] tab, nt; int n, sc;
            while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
                   (n = tab.length) < MAXIMUM_CAPACITY) {
                int rs = resizeStamp(n);
                // 如果sizeCtl小于0,即代表当前哈希槽处于扩容阶段。那么需要协助Map扩容
                if (sc < 0) {
                	// 判断当前线程是否可以协助扩容,不满足则直接跳出
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                        transferIndex <= 0)
                        break;
                    // 若满足,再通过CAS进行竞争。协助扩容
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        transfer(tab, nt);
                }
                else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                             (rs << RESIZE_STAMP_SHIFT) + 2))
                    transfer(tab, null);
                s = sumCount();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    扩容部分:

    1. 扩容的条件:元素个数超过了阈值(sizeCtl),并且数组长度小于最大限制。
    2. 判断当前容器是否在扩容阶段。即sizeCtl是否小于0。
    3. 如果正在扩容,那么还需要校验数据是否发生变化。若校验不通过,break
    4. 如果校验数据通过。那么sizeCtl + 1,表示多一个线程帮助容器扩容。

    校验逻辑:

    • (sc >>> RESIZE_STAMP_SHIFT) != rssc的低16 位不等于标识符,代表sizeCtl发生了变化。
    • sc == rs + 1代表扩容结束。因为第一个线程进行扩容的时候,sc ==rs 左移 16 位 + 2。在它结束扩容之后,sc会减1,那么此时sc = rs +1
    • sc == rs + MAX_RESIZERS代表帮助线程数已经达到了上限。
    • (nt = nextTable) == null扩容结束。
    • transferIndex <= 0转移状态发生了变化。 transferIndex 相当于扩容过程中的一个任务执行索引,代表扩容操作执行到哪里了。 这里也是扩容结束的意思。

    如果校验通过,才会进行扩容操作,我们看下核心的扩容函数transfer

    private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
        int n = tab.length, stride;
        // 计算当前线程在当下扩容过程中,分配到的槽位个数。即步长,一般是16
        //目的是为了让每个CPU处理的哈希桶个数一样多,避免出现转移任务不均匀的现象。默认一个线程处理16个桶
        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
            stride = MIN_TRANSFER_STRIDE; 
        // 如果新容器是空,那么创建一个新的容器
        if (nextTab == null) {            
            try {
            	// 扩容两倍
                @SuppressWarnings("unchecked")
                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
                nextTab = nt;
            } catch (Throwable ex) {      
            	// 扩容失败
                sizeCtl = Integer.MAX_VALUE;
                return;
            }
            nextTable = nextTab;
            // 设置数据转移的开始下标,因为是从后往前进行处理的,因此这里的开始下标就是老表的长度
            transferIndex = n;
        }
        // 新数组长度
        int nextn = nextTab.length;
        // 创建一个ForwardingNode节点,如果别的线程发现这个槽位是ForwardingNode类型的,就会跳过这个节点,该对象在后文会被用到
        ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
        // 一个初始值,如果为true,当前线程就会不断的推进任务的下标,即做扩容操作
        // 因为这个时候可能只有当前线程在做扩容操作,那么就需要进行单干,不断地截取固定步长的槽位,进行数据迁移
        boolean advance = true;
        boolean finishing = false; // 完成状态,如果为true,代表当前任务完成
        // 遍历每个哈希槽,从后往前遍历,bound指的是当前线程负责迁移的哈希桶的最小下标
        for (int i = 0, bound = 0;;) {
            Node<K,V> f; int fh;
            // 即当前线程负责的哈希桶是否都处理完毕,如果完毕了,跳出循环
            while (advance) {
                int nextIndex, nextBound;
                // --i表示下一个待处理的bucket,如果它不在自己负责的段内,或者是自己的扩容任务完成了,那么下次循环的时候,就结束操作。
                if (--i >= bound || finishing)
                    advance = false;
                // 槽位被其它线程选择完了,transferIndex这个下标之后的哈希桶都是已经被处理完毕的。
                // 新的线程进来的时候,需要截取transferIndex之前的哈希桶来重新分配数据。 
                else if ((nextIndex = transferIndex) <= 0) {
                	// 赋值i = -1 代表 旧表中的桶都已经转移完毕
                    i = -1;
                    advance = false;
                }
                // 尝试获取槽位的操作权,CAS操作
                else if (U.compareAndSwapInt
                         (this, TRANSFERINDEX, nextIndex,
                          nextBound = (nextIndex > stride ?
                                       nextIndex - stride : 0))) {
                    // CAS成功后,更新当前线程负责的哈希桶的边界下标索引。
                    bound = nextBound;
                    i = nextIndex - 1;
                    advance = false;
                }
            }
            // 通过遍历,设置结束条件
            if (i < 0 || i >= n || i + n >= nextn) {
                int sc;
                // 如果扩容完毕
                if (finishing) {
                    nextTable = null;// 清空扩容的过程中创建出来的临时表。
                    table = nextTab;// 将当前表指向临时表。
                    sizeCtl = (n << 1) - (n >>> 1);//
                    return;
                }
                // 使用CAS操作,对 sizeCtl 的低16位进行减 1,代表当前线程做完了属于自己的任务
                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                	// 说明此时没有线程在扩容操作了,扩容结束。
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                        return;
                    finishing = advance = true;
                    i = n; // recheck before commit
                }
            }
            // 如果当前位置没有任何节点,那么放入之前初始化的ForwardingNode节点
            else if ((f = tabAt(tab, i)) == null)
                advance = casTabAt(tab, i, null, fwd);
            // 表示该位置已经完成了迁移,ForwardingNode类型节点的hash值就是MOVED
            else if ((fh = f.hash) == MOVED)
                advance = true; // already processed
            else {
                synchronized (f) {
                    // 扩容操作,元素的重新分配
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89

    从上述代码,我们可以得知这么几个重点:

    1.首先,当前线程在处理哈希槽的时候,如果发现当前的哈希槽没有被处理,那么就会塞一个ForwardingNode对象。我们看下它的构造函数:

    ForwardingNode(Node<K,V>[] tab) {
    	// MOVED 的赋值对象就是hash,就是-1
        super(MOVED, null, null, null);
        this.nextTable = tab;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    那么其他线程在遍历的时候,遍历到对应槽的时候,就通过以下代码判断当前槽已经被处理过了:

    else if ((fh = f.hash) == MOVED)
    
    • 1

    第二点:每个线程,在分段处理哈希槽的时候,默认的步长是16(如果计算出的步长小于最小值,那么就用最小值作为步长):

    private static final int MIN_TRANSFER_STRIDE = 16;
    
    • 1

    整个transfer步骤可以分为几个大步骤:

    1. 计算一个线程在扩容时需要处理的槽位数量。
    2. 每个线程获取到自己对应的哈希槽段,即while (advance)循环。
    3. 然后遍历处理自己负责的哈希槽段中的节点。

    我们再看下最后一个步骤:对哈希槽中元素的处理:这里只说链表元素的处理。

    // 给当前的哈希桶加锁
    synchronized (f) {
    	// 双重检索下。因为当前哈希桶f,在未加锁之前,额能有别的线程调用了put函数,更新了桶内的数据
    	// 若不做这层校验,可能导致新加入到桶的元素没有被处理而丢失。
        if (tabAt(tab, i) == f) {
            Node<K,V> ln, hn;
            // 如果哈希桶对应的结构是链表
            if (fh >= 0) {
            	// 由于我们put一个元素的时候,哈希桶下标的计算是hash&(n-1),同时表容量是2的幂。因此这里计算出的结果即:
            	// hash&n的结果只有两个。要么是n要么是0。
                int runBit = fh & n;
                Node<K,V> lastRun = f;
                // 老链表由于需要被拆分到新链表中。即rehash。这里的目的是为了计算最长的不需要改变next指针的最长链。
                // 遍历出的最长链就可以直接放到新表中,
                for (Node<K,V> p = f.next; p != null; p = p.next) {
                    int b = p.hash & n;
                    if (b != runBit) {
                        runBit = b;
                        lastRun = p;
                    }
                }
                if (runBit == 0) {
                    ln = lastRun;
                    hn = null;
                }
                else {
                    hn = lastRun;
                    ln = null;
                }
                // 逐个遍历节点
                for (Node<K,V> p = f; p != lastRun; p = p.next) {
                    int ph = p.hash; K pk = p.key; V pv = p.val;
                    // 0表示仍然放到下标为i的桶内的链表
                    if ((ph & n) == 0)
                        ln = new Node<K,V>(ph, pk, pv, ln);
                    else
                    	// 否则就是放到下标为i+n的桶内的链表
                        hn = new Node<K,V>(ph, pk, pv, hn);
                }
                setTabAt(nextTab, i, ln);// 设置新表的下标为i的桶内数据
                setTabAt(nextTab, i + n, hn);// 设置新表的下标为i+n的桶内数据
                setTabAt(tab, i, fwd);// 将老表下标为i的桶内放上ForwardingNode对象,用来标识当前处于扩容过程
                advance = true;
            }
            else if (f instanceof TreeBin) {
                // 如果是红黑树的处理
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49

    1.2.3 协助扩容操作 helpTransfer

    我们来看下相关的源码:

    final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
        Node<K,V>[] nextTab; int sc;
        if (tab != null && (f instanceof ForwardingNode) &&
            (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
            int rs = resizeStamp(tab.length);
                // 如果相关数组没有被并发修改,并且当前容器处于扩容阶段。
            while (nextTab == nextTable && table == tab &&
                   (sc = sizeCtl) < 0) {
                // 如果扩容结束了,或者超过了最大帮助线程数数量,跳出循环
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || transferIndex <= 0)
                    break;
                // 否则将sizeCtl+1,代表多一个线程帮助扩容。
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                    transfer(tab, nextTab);
                    break;
                }
            }
            return nextTab;
        }
        return table;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    1.3 get 方法的原理

    废话不多说,上代码:

    public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        // 拿到对应的哈希值
        int h = spread(key.hashCode());
        // 只有哈希槽数组不是null的时候,才可以获取。不然都没数据,肯定返回null
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) {
            // 如果哈希槽上的Node头节点就是要找的,那么直接返回
            if ((eh = e.hash) == h) {
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;
            }
            // 否则,此时有两种情况,1:该节点是TreeBin节点,hash值固定为-2.
            // 2.此时容器处于扩容阶段,同时该Hash槽已经迁移完毕,并且对应类型是ForwardingNode,对应hash值为-1
            // Node和TreeBin都有对应的find函数。具体函数具体分析。
            else if (eh < 0)
                return (p = e.find(h, key)) != null ? p.val : null;
            // 否则,对应槽上的是普通的链表结构,通过while循环遍历,寻找该节点
            while ((e = e.next) != null) {
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    二. 总结

    2.1 元素插入部分

    2.1.1 扩容的时候为什么对链表要拆分成高低链?

    本质:取模运算在 n 是2的次幂时 &(n-1) 等价于 % n

    1. Map扩容的时候,我们知道,扩容的大小是原本的2倍。而每个哈希槽中的数据要重新计算它的hash值。因为数组长度发生了改变。
    2. 一个元素在扩容前和扩容后,都是通过hash & (n-1) 的方式来获取哈希桶的下标的。

    我们举个例子,一个元素的哈希值是20(十进制)。当前Map数组长度是16,扩容后32。那么两次的哈希结果:

    • 扩容前:20&15=4
    • 扩容后:20&31=20

    两次结果相差16,正好是扩容之前的数组长度。那么我们如果将链表分为高低位,我们只需要将高位的节点,将其哈希值加上数组长度,就能定位到新Map中哈希槽数组的下标了。无需重复计算hash值。

    我们再来看下代码中的片段:高位的链表,直接插到 i+n 的位置。
    在这里插入图片描述


    2.1.2 sizeCtl 的值有哪些?

    变量sizeCtl几乎出现在源码的各个地方,而其值又运用于多种不同的运用场景:

    • sizeCtl = 0:表示没有指定初始容量。
    • sizeCtl > 0:表示初始容量(可以使用阶段)。
    • sizeCtl = -1:标记作用,告知其他线程,正在初始化或者扩容。(不能使用阶段)
    • sizeCtl = 0.75n :初始化完毕。
    • sizeCtl = (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2 :表示此时只有一个线程在执行扩容。

    2.1.3 其他线程如何感知到正在扩容?

    通过感知节点ForwardingNode

    1. 第一个线程,在扩容的时候,会对正在处理的哈希槽中插入一个ForwardingNode节点。
    2. ForwardingNode节点的hash值是-1.
    3. 后续其他线程在进行put操作的时候,会看当前节点的hash值,如果对应值是-1,就代表遇到了ForwardingNode节点。代表容器目前处于扩容或者初始化阶段。那么帮助程序进行扩容。

    2.1.4 扩容时如何保证线性安全?

    1. 对哈希槽数组进行扩容的时候,通过一个索引transferIndex来控制每个线程的一个执行区间。transferIndexvolatile修饰,因此线程之间可见。
    2. 每个线程通过CAS操作,获得新数组中,自己负责的那一块区间。一般默认的区间步长是16。从数组的末尾向前处理。
    3. transferIndex代表已经分配过的区域下标。在这个值之后的所有哈希槽都代表:已经被对应的线程处理过。 同时对应的哈希槽的节点为ForwardingNode类型。
      在这里插入图片描述

    同时,在ConcurrentHashMap中可以这么总结:

    • 关于赋值操作:基本上都是通过CAS原子操作来完成。
    • 关于加锁:加在整个哈希槽中。

    2.1.5 put 操作中几个重要的变量

    • advance:推进标记,如果为true,那么会不断的给当前线程分配任务。同时控制着整个容器的扩容进度。
    • finishing:扩容完成标记。(指当前线程分配到的扩容任务)。
    • transferIndex:转换的索引,标识着扩容到哪一个哈希槽了,值从数组长度-1到0逐渐变小。
    • sizeCtl:控制着ConcurrentHashMaptable数组的初始化和扩容操作。

    2.1.6 为什么要先插入元素,再统计数量?

    因为put操作,有可能是值的覆盖或者是插入操作,如果是覆盖操作。此时容器中的元素总数并没有改变。

    2.1.7 put 操作的流程图

    在这里插入图片描述

    2.2 元素获取部分

    2.2.1 为什么get操作不需要加锁?

    因为Node节点中的val变量通过volatile修饰。所以在多线程的环境下,即便value的值被修改了,在线程之间也是可见的。

    2.2.2 迁移过程中的哈希桶遇到get操作会发生什么?

    我们知道,以链表为例,在扩容的时候,会将链表分为高低位链表:ln和hn。那么原本的链表能正常访问吗?

    回答:可以。

    1. 因为ln和hn是在原本链表的基础上复制出来的,复制引用
    2. 原链表并不会受到影响,可以正常的访问。
    3. 如果数据迁移结束,会在对应的哈希槽上放一个fwd,那么之后的get请求,就会将请求转发到扩容后的数组中。
  • 相关阅读:
    OVS与Linux Bridge的区别整理
    这才是增加收录的核心操作方向
    人机组队概念的战场应用
    2023-9-12 多重背包问题(一)
    SSM框架速成3:SpringMVC
    欧拉函数公式证明
    【Unity】填坑,Unity接入Epic Online Service上架Epic游戏商城
    【无标题】关于vtk在vs2010中的配置
    安装Git
    大数据学习笔记1.2 登录配置虚拟机
  • 原文地址:https://blog.csdn.net/Zong_0915/article/details/126867354