• 9张图深入剖析ConcurrentHashMap


    前言

    在日常的开发中,我们经常使用key-value键值对的HashMap,其使用哈希表实现,用空间换取时间,提升查询性能

    但在多线程的并发场景中,HashMap并不是线程安全的

    如果想使用线程安全的,可以使用ConcurrentHashMap、HashTable、Collections.synchronizedMap等

    但由于后面二者使用synchronized的粒度太大,因此一般不使用,而使用并发包中的ConcurrentHashMap

    在ConcurrentHashMap中,使用volatile保证内存可见性,使得读场景下不需要“加锁”保证原子性

    在写场景下使用CAS+synchronized,synchronized只锁哈希表某个索引位置上的首节点,相当于细粒度加锁,增大并发性能

    本篇文章将从ConcurrentHashMap的使用,读、写、扩容的实现原理,设计思想等方面进行剖析

    查看本文前需要了解哈希表、volatile、CAS、synchronized等知识

    volatile可以查看这篇文章:5个案例和流程图让你从0到1搞懂volatile关键字

    CAS、synchronized可以查看这篇文章:15000字、6个代码案例、5个原理图让你彻底搞懂Synchronized

    使用ConcurrentHashMap

    ConcurrentHashMap是并发场景下线程安全的Map,可以在并发场景下查询存储K、V键值对

    不可变对象是绝对线程安全的,无论外界如何使用,都线程安全

    ConcurrentHashMap并不是绝对线程安全的,只提供方法的线程安全,如果在外层使用错误依旧会导致线程不安全

    来看下面的案例,使用value存储自增调用次数,开启10个线程每个执行100次,最终结果应该是1000次,但错误的使用导致不足1000

    
        public void test() {
    //        Map map = new HashMap(16);
            Map map = new ConcurrentHashMap(16);
    
            String key = "key";
            CountDownLatch countDownLatch = new CountDownLatch(10);
    
    
            for (int i = 0; i < 10; i++) {
                new Thread(() -> {
                    for (int j = 0; j < 100; j++) {
                        incr(map, key);
    //                    incrCompute(map, key);
                    }
                    countDownLatch.countDown();
                }).start();
            }
    
            try {
                //阻塞到线程跑完
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //1000不到
            System.out.println(map.get(key));
        }
    
        private void incr(Map map, String key) {
            map.put(key, map.getOrDefault(key, 0) + 1);
        }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    在自增方法incr中,先进行读操作,再计算,最后进行写操作,这种复合操作没有保证原子性,导致最终所有结果累加一定不为1000

    正确的使用方式是使用JDK8提供的默认方法compute

    ConcurrentHashMap实现compute的原理是在put中使用同步手段后再进行计算

        private void incrCompute(Map map, String key) {
            map.compute(key, (k, v) -> Objects.isNull(v) ? 1 : v + 1);
        }
    • 1
    • 2

    数据结构

    与HashMap类似,使用哈希表+链表/红黑树实现

    哈希表

    哈希表的实现由数组构成,当发生哈希冲突(哈希算法得到同一索引)时使用链地址法构建成链表

    image.png

    当链表上的节点太长,遍历寻找开销大,超过阈值时(链表节点超过8个、哈希表长度大于64),树化成红黑树减少遍历寻找开销,时间复杂度从O(n)优化为(log n)

    image.png

    ConcurrentHashMap由Node数组构成,在扩容时会存在新旧两个哈希表:table、nextTable

    public class ConcurrentHashMap extends AbstractMap
        implements ConcurrentMap, Serializable {    
        //哈希表 node数组
        transient volatile Node[] table;
    
        //扩容时为了兼容读写,会存在两个哈希表,这个是新哈希表
        private transient volatile Node[] nextTable;
    
        // 默认为 0
        // 当初始化时, 为 -1
        // 当扩容时, 为 -(1 + 扩容线程数)
        // 当初始化或扩容完成后,为 下一次的扩容的阈值大小
        private transient volatile int sizeCtl;
    
        //扩容时 用于指定迁移区间的下标
        private transient volatile int transferIndex;
    
        //统计每个哈希槽中的元素数量
        private transient volatile CounterCell[] counterCells;
    }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    节点

    Node用于实现哈希表数组的节点和发生哈希冲突时,构建成链表的节点

    //实现哈希表的节点,数组和链表时使用
    static class Node implements Map.Entry {
        //节点哈希值
        final int hash;
        final K key;
        volatile V val;
        //作为链表时的 后续指针 
        volatile Node next;        
    }
    
    // 扩容时如果某个 bin 迁移完毕, 用 ForwardingNode 作为旧 table bin 的头结点
    static final class ForwardingNode extends Node {}
    
    // 用在 compute 以及 computeIfAbsent 时, 用来占位, 计算完成后替换为普通 Node
    static final class ReservationNode extends Node {}
    
    // 作为 treebin 的头节点, 存储 root 和 first
    static final class TreeBin extends Node {}
    
    // 作为 treebin 的节点, 存储 parent, left, right
    static final class TreeNode extends Node {}
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    节点哈希值

    //转发节点
    static final int MOVED     = -1;
    //红黑树在数组中的节点
    static final int TREEBIN   = -2;
    //占位节点
    static final int RESERVED  = -3;
    • 1
    • 2
    • 3
    • 4
    • 5

    转发节点:继承Node,用于扩容时设置在旧哈希表某索引的首节点,遇到转发节点要去新哈希表中寻找

    static final class ForwardingNode extends Node {
            //新哈希表
            final Node[] nextTable;
    
            ForwardingNode(Node[] tab) {
                //哈希值设置为-1
                super(MOVED, null, null, null);
                this.nextTable = tab;
            }
    }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    红黑树在数组中的节点 TreeBin:继承Node,first指向红黑树的首节点

    static final class TreeBin extends Node {
            TreeNode root;
            //红黑树首节点
            volatile TreeNode first;
    }    
    • 1
    • 2
    • 3
    • 4

    image.png

    红黑树节点TreeNode

    static final class TreeNode extends Node {
            TreeNode parent;  
            TreeNode left;
            TreeNode right;
            TreeNode prev; 
            boolean red;
    }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    占位节点:继承Node,需要计算时(使用computer方法),先使用占位节点占位,计算完再构建节点取代占位节点

        static final class ReservationNode extends Node {
            ReservationNode() {
                super(RESERVED, null, null, null);
            }
    
            Node find(int h, Object k) {
                return null;
            }
        }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    实现原理

    构造

    在构造时会检查入参,然后根据需要存储的数据容量、负载因子计算哈希表容量,最后将哈希表容量调整成2次幂

    构造时并不会初始化,而是等到使用再进行创建(懒加载)

        public ConcurrentHashMap(int initialCapacity,
                                 float loadFactor, int concurrencyLevel) {
            //检查负载因子、初始容量
            if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
                throw new IllegalArgumentException();
    
            //concurrencyLevel:1
            if (initialCapacity < concurrencyLevel)   // Use at least as many bins
                initialCapacity = concurrencyLevel;   // as estimated threads
            //计算大小 = 容量/负载因子 向上取整
            long size = (long)(1.0 + (long)initialCapacity / loadFactor);
            //如果超过最大值就使用最大值 
            //tableSizeFor 将大小调整为2次幂
            int cap = (size >= (long)MAXIMUM_CAPACITY) ?
                MAXIMUM_CAPACITY : tableSizeFor((int)size);
    
            //设置容量
            this.sizeCtl = cap;
        }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    读-get

    读场景使用volatile保证可见性,即使其他线程修改也是可见的,不用使用其他手段保证同步

    读操作需要在哈希表中寻找元素,经过扰动算法打乱哈希值,再使用哈希值通过哈希算法得到索引,根据索引上的首节点分为多种情况处理

    1. 扰动算法将哈希值充分打乱(避免造成太多的哈希冲突),符号位&0保证结果正数

      int h = spread(key.hashCode())

      扰动算法:哈希值高低16位异或运算

      经过扰动算法后,&HASH_BITS = 0x7fffffff (011111...),符号位为0保证结果为正数

      负数的哈希值表示特殊的作用,比如:转发节点、树的首节点、占位节点等

          static final int spread(int h) {
              return (h ^ (h >>> 16)) & HASH_BITS;
          }
      • 1
      • 2
    2. 使用打乱的哈希值经过哈希算法得到数组中的索引(下标)

      n 为哈希表长度:(n = tab.length)

      (e = tabAt(tab, (n - 1) & h)

      h为计算后的哈希值,哈希值%(哈希表长度-1) 就能求出索引位置

      为了性能提升,规定哈希表长度为2的n次幂,哈希表长度二进制一定是1000....,而(n-1)的二进制一定是0111...

      因此(n - 1) & h计算索引,进行与运算的结果一定在0~n-1之间 使用位运算提升性能

    3. 得到数组上的节点后,需要进行比较

      找到哈希表上的首个节点后,进行比较key 查看是否是当前节点

      比较规则:先对哈希值进行比较,如果对象哈希值相同,那么可能是同一个对象,还需要比较key(==与equals),如果哈希值都不相同,那么肯定不是同一个对象

      先比较哈希值的好处就是提升查找性能,如果直接使用equals 可能时间复杂度会上升(比如String的equals)

    4. 使用链地址法解决哈希冲突,因此找到节点后可能遍历链表或树;由于哈希表存在扩容,也可能要去新节点上寻找

      4.1 首节点比较成功,直接返回

      4.2 首节点哈希值为负,说明该节点是特殊情况的:转发节点、树的首节点 、计算的预订占位节点

      • 如果是转发节点,正在扩容则去新数组上找
      • 如果是TreeBin则去红黑树中寻找
      • 如果是占位节点 直接返回空

      4.3 遍历该链表依次比较

    get代码

    public V get(Object key) {
        Node[] tab; Node e, p; int n, eh; K ek;
        //1.spread:扰动算法 + 让key的哈希值不能为负数,因为负数哈希值代表红黑树或ForwardingNode
        int h = spread(key.hashCode());
        //2.(n - 1) & h:下标、索引 实际上就是数组长度模哈希值 位运算效率更高
        //e:哈希表中对应索引位置上的节点
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) {
            //3.如果哈希值相等,说明可能找到,再比较key
            if ((eh = e.hash) == h) {
                //4.1 key相等说明找到 返回
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;
            }
            //4.2 首节点哈希值为负,说明该节点是转发节点,当前正在扩容则去新数组上找
            else if (eh < 0)
                return (p = e.find(h, key)) != null ? p.val : null;
    
            //4.3 遍历该链表,能找到就返回值,不能返回null
            while ((e = e.next) != null) {
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    写-put

    添加元素时,使用CAS+synchronized(只锁住哈希表中某个首节点)的同步方式保证原子性

    1. 获取哈希值:扰动算法+确保哈希值为正数
    2. 哈希表为空,CAS保证一个线程初始化
        private final Node[] initTable() {
            Node[] tab; int sc;
            while ((tab = table) == null || tab.length == 0) {
                //小于0 说明其他线程在初始化 让出CPU时间片 后续初始化完退出
                if ((sc = sizeCtl) < 0)
                    Thread.yield(); 
                else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                    //CAS将SIZECTL设置成-1 (表示有线程在初始化)成功后 初始化
                    try {
                        if ((tab = table) == null || tab.length == 0) {
                            int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                            @SuppressWarnings("unchecked")
                            Node[] nt = (Node[])new Node[n];
                            table = tab = nt;
                            sc = n - (n >>> 2);
                        }
                    } finally {
                        sizeCtl = sc;
                    }
                    break;
                }
            }
            return tab;
        }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    1. 将哈希值通过哈希算法获取索引上的节点 f = tabAt(tab, i = (n - 1) & hash)

    2. 根据不同情况进行处理

      • 4.1 首节点为空时,直接CAS往索引位置添加节点 casTabAt(tab, i, null,new Node(hash, key, value, null))

        image.png

      • 4.2 首节点hash为MOVED -1时,表示该节点是转发节点,说明正在扩容,帮助扩容

      • 4.3 首节点加锁

        • 4.3.1 遍历链表寻找并添加/覆盖

          image.png

        • 4.3.2 遍历树寻找并添加/覆盖

    3. addCount统计每个节点上的数据,并检查扩容

    put代码

    //onlyIfAbsent为true时,如果原来有k,v则这次不会覆盖
    final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        //1.获取哈希值:扰动算法+确保哈希值为正数
        int hash = spread(key.hashCode());
        int binCount = 0;
        //乐观锁思想 CSA+失败重试
        for (Node[] tab = table;;) {
            Node f; int n, i, fh;
            //2.哈希表为空 CAS保证只有一个线程初始化
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            //3. 哈希算法求得索引找到索引上的首节点
            //4.1 节点为空时,直接CAS构建节点
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                if (casTabAt(tab, i, null,
                             new Node(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            //4.2 索引首节点hash 为MOVED 说明该节点是转发节点,当前正在扩容,去帮助扩容
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                //4.3 首节点 加锁
                synchronized (f) {
                    //首节点没变
                    if (tabAt(tab, i) == f) {
                        //首节点哈希值大于等于0 说明节点是链表上的节点  
                        //4.3.1 遍历链表寻找然后添加/覆盖
                        if (fh >= 0) {
                            //记录链表上有几个节点
                            binCount = 1;
                            //遍历链表找到则替换,如果遍历完了还没找到就添加(尾插)
                            for (Node e = f;; ++binCount) {
                                K ek;
                                //替换
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    //onlyIfAbsent为false允许覆盖(使用xxIfAbsent方法时,有值就不覆盖)
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node pred = e;
                                //添加
                                if ((e = e.next) == null) {
                                    pred.next = new Node(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        //如果是红黑树首节点,则找到对应节点再覆盖
                        //4.3.2 遍历树寻找然后添加/覆盖
                        else if (f instanceof TreeBin) {
                            Node p;
                            binCount = 2;
                            //如果是添加返回null,返回不是null则出来添加
                            if ((p = ((TreeBin)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                //覆盖
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
    
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        //链表上的节点超过TREEIFY_THRESHOLD 8个(不算首节点) 并且 数组长度超过64才树化,否则扩容
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        //5.添加计数,用于统计元素(添加节点的情况)
        addCount(1L, binCount);
        return null;
    }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    扩容

    为了避免频繁发生哈希冲突,当哈希表中的元素数量 / 哈希表长度 超过负载因子时,进行扩容(增大哈希表的长度)

    一般来说扩容都是增大哈希表长度的2倍,比如从32到64保证长度是2次幂;如果扩容长度达到整型上限则使用整型最大值

    当发生扩容时,需要将数组中每个槽里的链表或树迁移到新数组中

    如果处理器是多核,那么这个迁移的操作并不是一个线程单独完成的,而是会让其他线程也来帮助迁移

    在迁移时让每个线程从右往左的每次迁移多个槽,迁移完再判断是否全部迁移完,如果没迁移完则继续循环迁移

    扩容操作主要在transfer方法中,扩容主要在三个场景下:

    1. addCount:添加完节点增加计数检查扩容
    2. helpTransfer:线程put时发现正在迁移,来帮忙扩容
    3. tryPresize:尝试调整容量(批量添加putAll,树化数组长度没超过64时treeifyBin调用)

    分为以下3个步骤

    1. 根据CPU核数、哈希表总长度计算每次迁移多少个槽,最小16个

    2. 新哈希表为空,说明是初始化

    3. 循环迁移

      • 3.1 分配负责迁移的区间 [bround,i](可能存在多线程同时迁移)

        image.png

      • 3.2 迁移:分为链表迁移、树迁移

        链表迁移

        1. 将链表上的节点充分散列到新哈希表的index、index+旧哈希表长度的两个下标中(与HashMap类似)

        2. 将index位置链表中的节点 (hash & 哈希表长度),结果为0的放到新数组的index位置,结果为1放到新数组index+旧哈希表长度的位置

          image.png

          比如旧哈希表长度为16,在索引3的位置上,16的二进制是10000,hash&16 => hash& 10000 ,也就是说节点哈希值第5位是0就放到新哈希表的3位置上,是1就放到新哈希表的3+16下标

        3. 使用头插法将计算结果为0构建成ln链表,为1构建成hn链表,为方便构建链表,会先寻找lastRun节点:lastRun节点及后续节点都为同一链表上的节点,方便迁移

          构建链表前先构建lastRun,比如图中lastRun e->f ,先将lastRun放到ln链表上,在遍历原始链表,遍历到a :a->e->f,遍历到b:b->a->e->f

        image.png

        1. 每迁移完一个索引位置就将转发节点设置到原哈希表对应位置,当其他线程进行读get操作时,根据转发节点来新哈希表中寻找,进行写put操作时,来帮助扩容(其他区间进行迁移)

          image.png

    扩容代码

    //tab 旧哈希表
    //nextTab 新哈希表
    private final void transfer(Node[] tab, Node[] nextTab) {
            //1.计算每次迁移多少个槽
            //n:哈希表长度(多少个槽)
            int n = tab.length, stride;
            //stride:每次负责迁移多少个槽
            //NCPU: CPU核数
            //如果是多核,每次迁移槽数 = 总槽数无符号右移3位(n/8)再除CPU核数  
            //每次最小迁移槽数 = MIN_TRANSFER_STRIDE = 16
            if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
                stride = MIN_TRANSFER_STRIDE; // subdivide range
    
            //2.如果新哈希表为空,说明是初始化
            if (nextTab == null) {            // initiating
                try {
                    @SuppressWarnings("unchecked")
                    Node[] nt = (Node[])new Node[n << 1];
                    nextTab = nt;
                } catch (Throwable ex) {      // try to cope with OOME
                    sizeCtl = Integer.MAX_VALUE;
                    return;
                }
                nextTable = nextTab;
                //transferIndex用于记录 每次负责迁移的槽右区间下标,从右往左分配,起始为最右
                transferIndex = n;
            }
            //新哈希表长度
            int nextn = nextTab.length;
            //创建转发节点,转发节点一般设置在旧哈希表首节点,通过转发节点可以找到新哈希表
            ForwardingNode fwd = new ForwardingNode(nextTab);
            //advance:是否继续循环迁移
            boolean advance = true;
            // 
            boolean finishing = false; // to ensure sweep before committing nextTab
            //3.循环迁移
            for (int i = 0, bound = 0;;) {
                Node f; int fh;
                //3.1 分配负责迁移的区间
                //bound为左区间 i为右区间
                while (advance) {
                    int nextIndex, nextBound;
                    //处理完一个槽 右区间 自减
                    if (--i >= bound || finishing)
                        advance = false;
                    //transferIndex<=0说明 要迁移的区间全分配完
                    else if ((nextIndex = transferIndex) <= 0) {
                        i = -1;
                        advance = false;
                    }
                    //CAS设置本次迁移的区间,防止多线程分到相同区间
                    else if (U.compareAndSwapInt
                             (this, TRANSFERINDEX, nextIndex,
                              nextBound = (nextIndex > stride ?
                                           nextIndex - stride : 0))) {
                        bound = nextBound;
                        i = nextIndex - 1;
                        advance = false;
                    }
                }
    
                //3.2 迁移
    
                //3.2.1 如果右区间i不再范围,说明迁移完
                if (i < 0 || i >= n || i + n >= nextn) {
                    int sc;
                    //如果完成迁移,设置哈希表、数量
                    if (finishing) {
                        nextTable = null;
                        table = nextTab;
                        sizeCtl = (n << 1) - (n >>> 1);
                        return;
                    }
                    //CAS 将sizeCtl数量-1 表示 一个线程迁移完成 
                    if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                        //如果不是最后一条线程直接返回
                        if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                            return;
                        //是最后一条线程设置finishing为true  后面再循环 去设置哈希表、数量等操作
                        finishing = advance = true;
                        i = n; // recheck before commit
                    }
                }
                //3.2.2 如果旧哈希表i位置节点为空就CAS设置成转发节点
                else if ((f = tabAt(tab, i)) == null)
                    advance = casTabAt(tab, i, null, fwd);
                //3.2.3 如果旧哈希表该位置首节点是转发节点,说明其他线程已处理,重新循环
                else if ((fh = f.hash) == MOVED)
                    advance = true; // already processed
                else {
                    //3.2.4 对首节点加锁 迁移
                    synchronized (f) {
                        if (tabAt(tab, i) == f) {
                            Node ln, hn;
                            //3.2.4.1 链表迁移
                            //首节点哈希值大于等于0 说明 是链表节点
                            if (fh >= 0) {
                                int runBit = fh & n;
                                Node lastRun = f;
                                //寻找lastRun节点 
                                for (Node p = f.next; p != null; p = p.next) {
                                    int b = p.hash & n;
                                    if (b != runBit) {
                                        runBit = b;
                                        lastRun = p;
                                    }
                                }
                                //如果最后一次计算值是0
                                //lastRun节点以及后续节点计算值都是0构建成ln链表 否则 都是1构建成hn链表
                                if (runBit == 0) {
                                    ln = lastRun;
                                    hn = null;
                                }
                                else {
                                    hn = lastRun;
                                    ln = null;
                                }
    
                                //遍历构建ln、hn链表 (头插)
                                for (Node p = f; p != lastRun; p = p.next) {
                                    int ph = p.hash; K pk = p.key; V pv = p.val;
                                    //头插:Node构造第四个参数是后继节点
                                    if ((ph & n) == 0)
                                        ln = new Node(ph, pk, pv, ln);
                                    else
                                        hn = new Node(ph, pk, pv, hn);
                                }
                                //设置ln链表到i位置
                                setTabAt(nextTab, i, ln);
                                //设置hn链表到i+n位置
                                setTabAt(nextTab, i + n, hn);
                                //设置转发节点
                                setTabAt(tab, i, fwd);
                                advance = true;
                            }
                            //3.2.4.2 树迁移
                            else if (f instanceof TreeBin) {
                                TreeBin t = (TreeBin)f;
                                TreeNode lo = null, loTail = null;
                                TreeNode hi = null, hiTail = null;
                                int lc = 0, hc = 0;
                                for (Node e = t.first; e != null; e = e.next) {
                                    int h = e.hash;
                                    TreeNode p = new TreeNode
                                        (h, e.key, e.val, null, null);
                                    if ((h & n) == 0) {
                                        if ((p.prev = loTail) == null)
                                            lo = p;
                                        else
                                            loTail.next = p;
                                        loTail = p;
                                        ++lc;
                                    }
                                    else {
                                        if ((p.prev = hiTail) == null)
                                            hi = p;
                                        else
                                            hiTail.next = p;
                                        hiTail = p;
                                        ++hc;
                                    }
                                }
                                ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                    (hc != 0) ? new TreeBin(lo) : t;
                                hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                    (lc != 0) ? new TreeBin(hi) : t;
                                setTabAt(nextTab, i, ln);
                                setTabAt(nextTab, i + n, hn);
                                setTabAt(tab, i, fwd);
                                advance = true;
                            }
                        }
                    }
                }
            }
        }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175

    实现原理并没有对红黑树进行太多描述,一方面是红黑树的概念太多,另一方面是我忘的差不多了(已经老了,不像大学那样可以手写红黑树了)

    还有一方面是:我认为只需要知道使用红黑树的好处就足够,而且工作中也不常用,就算死扣红黑树要怎么变色、左旋、右旋去满足红黑树的条件也没什么意义,感兴趣的同学去学习就好了

    迭代器

    ConcurrentHashMap中的迭代器是弱一致性,在获取时使用记录的哈希表重新构建新对象

    Entry迭代器:

    public Iterator> iterator() {
        ConcurrentHashMap m = map;
        Node[] t;
        int f = (t = m.table) == null ? 0 : t.length;
        return new EntryIterator(t, f, 0, f, m);
    }
    • 1
    • 2
    • 3
    • 4
    • 5

    key迭代器

    public Enumeration keys() {
        Node[] t;
        int f = (t = table) == null ? 0 : t.length;
        return new KeyIterator(t, f, 0, f, this);
    }
    • 1
    • 2
    • 3
    • 4

    value迭代器

    public Enumeration elements() {
        Node[] t;
        int f = (t = table) == null ? 0 : t.length;
        return new ValueIterator(t, f, 0, f, this);
    }
    • 1
    • 2
    • 3
    • 4

    总结

    ConcurrentHashMap使用哈希表的数据结构,当发生哈希冲突时,使用链地址法解决,将哈希到同一索引的节点构建成链表,当数据量达到一定阈值,会将链表转化为红黑树

    ConcurrentHashMap使用volatile修饰存储数据,使得在读场景下对其他线程的修改可见,不需要使用同步机制,使用CAS与synchronzied保证写场景下的原子性

    在get查询数据时,先将key的哈希值通过扰动算法(高低16位异或)并保证结果为正数(与上符号位0),再与上哈希表长度-1求出索引值,找到索引后再根据不同情况查找(比较先判断哈希值,相等再判断key)

    在put添加/覆盖数据时,也是先通过扰动算法和哈希求出索引位置,在根据不同情况查找,找到则覆盖,找不到则替换

    在需要扩容时,会为线程安排需要迁移的槽区间,当其他线程进行put时也会来帮忙迁移,每次线程迁移完一个槽,会设置转发节点到原哈希表中,这样有线程查询就可以通过转发节点来新哈希表中查找,当迁移完所有槽时留一个线程来设置哈希表、数量等

    迭代器使用的是弱一致性,在获取迭代器时通过哈希表去构建新的对象

    ConcurrentHashMap 只保证相对线程安全,不能保证绝对线程安全,如果需要进行一系列操作时,要正确的去使用

    本文由博客一文多发平台 OpenWrite 发布!

  • 相关阅读:
    Bootstrap框架
    使用Python进行页面开发——模板层
    21.4 Python 使用GeoIP2地图定位
    python requests.post请求404问题
    洛谷P1607 Fair Shuttle G
    面试官:为什么 Redis 要有哨兵?我该怎么回答?
    支持对协议和会话分享动作进行授权,新增API Key白名单功能,JumpServer堡垒机v3.9.0发布
    LeetCode | 一探环形链表的奥秘【快慢双指针妙解BAT等大厂经典算法题】
    默认为4G网络
    android 解决AVC问题
  • 原文地址:https://blog.csdn.net/Tc_lccc/article/details/133590623