• ConcurrentHashMap原理详解(太细了)


    一、什么是ConcurrentHashMap

    ConcurrentHashMapHashMap一样,是一个存放键值对的容器。使用hash算法来获取值的地址,因此时间复杂度是O(1)。查询非常快。
    同时,ConcurrentHashMap是线程安全的HashMap。专门用于多线程环境。

    二、ConcurrentHashMap和HashMap以及Hashtable的区别

    2.1 HashMap

    HashMap是线程不安全的,因为HashMap中操作都没有加锁,因此在多线程环境下会导致数据覆盖之类的问题,所以,在多线程中使用HashMap是会抛出异常的。

    2.2 HashTable

    HashTable是线程安全的,但是HashTable只是单纯的在put()方法上加上synchronized。保证插入时阻塞其他线程的插入操作。虽然安全,但因为设计简单,所以性能低下。

    2.3 ConcurrentHashMap

    ConcurrentHashMap是线程安全的,ConcurrentHashMap并非锁住整个方法,而是通过原子操作和局部加锁的方法保证了多线程的线程安全,且尽可能减少了性能损耗。

    由此可见,HashTable可真是一无是处…

    三、ConcurrentHashMap原理

    这一节专门介绍ConcurrentHashMap是如何保证线程安全的。如果想详细了解ConcurrentHashMap的数据结构,请参考HashMap

    3.1 volatile修饰的节点数组

    请看源码

    //ConcurrentHashMap使用volatile修饰节点数组,保证其可见性,禁止指令重排。
    transient volatile Node<K,V>[] table;
    
    • 1
    • 2

    再看看HashMap是怎么做的

    //HashMap没有用volatile修饰节点数组。
    transient Node<K,V>[] table;
    
    • 1
    • 2

    显然,HashMap并不是为多线程环境设计的。

    3.2 ConcurrentHashMap的put()方法

    //put()方法直接调用putVal()方法
    public V put(K key, V value) {
     return putVal(key, value, false);
    }
    //所以直接看putVal()方法。
    final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        if (fh >= 0) {
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount);
        return null;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68

    我来给大家讲解一下步骤把。

    public V put(K key, V value) {
    
    • 1

    首先,put()方法是没有用synchronized修饰的。

    for (Node<K,V>[] tab = table;;)
    
    • 1

    新插入一个节点时,首先会进入一个死循环
    情商高的就会说,这是一个乐观锁
    进入乐观锁后,

    if (tab == null || (n = tab.length) == 0)
        tab = initTable();
    
    • 1
    • 2

    如果tab未被初始化,则先将tab初始化。此时,这轮循环结束,因为被乐观锁锁住,开始下一轮循环。
    第二轮循环,此时tab已经被初始化了,所以跳过。

    else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
        if (casTabAt(tab, i, null,
                     new Node<K,V>(hash, key, value, null)))
            break;                   // no lock when adding to empty bin
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    接下来通过keyhash值来判断table中是否存在相同的key,如果不存在,执行casTabAt()方法。
    注意,这个操作时不加锁的,看到里面的那行注释了么// no lock when adding to empty bin。位置为空时不加锁。
    这里其实是利用了一个CAS操作。

    CAS(Compare-And-Swap):比较并交换

    这里就插播一个小知识,CAS就是通过一个原子操作,用预期值去和实际值做对比,如果实际值和预期相同,则做更新操作。
    如果预期值和实际不同,我们就认为,其他线程更新了这个值,此时不做更新操作。
    而且这整个流程是原子性的,所以只要实际值和预期值相同,就能保证这次更新不会被其他线程影响。

    好了,我们继续。
    既然这里用了CAS操作去更新值,那么就存在两者情况。

    1. 实际值和预期值相同
      相同时,直接将值插入,因为此时是线程安全的。好了,这时插入操作完成。使用break;跳出了乐观锁。循环结束。
    2. 实际值和预期值不同
      不同时,不进行操作,因为此时这个值已经被其他线程修改过了,此时这轮操作就结束了,因为还被乐观锁锁住,进入第三轮循环。

    第三轮循环中,前面的判断又会重新执行一次,我就跳过不说了,进入后面的判断。

     else if ((fh = f.hash) == MOVED)
        tab = helpTransfer(tab, f);
    
    • 1
    • 2

    这里判断的是tab的状态,MOVED表示在扩容中,如果在扩容中,帮助其扩容。帮助完了后就会进行第四轮循环。
    终于,来到了最后一轮循环。

    else {
        V oldVal = null;
        synchronized (f) {
            if (tabAt(tab, i) == f) {
                if (fh >= 0) {
                    binCount = 1;
                    for (Node<K,V> e = f;; ++binCount) {
                        K ek;
                        if (e.hash == hash &&
                            ((ek = e.key) == key ||
                             (ek != null && key.equals(ek)))) {
                            oldVal = e.val;
                            if (!onlyIfAbsent)
                                e.val = value;
                            break;
                        }
                        Node<K,V> pred = e;
                        if ((e = e.next) == null) {
                            pred.next = new Node<K,V>(hash, key,
                                                      value, null);
                            break;
                        }
                    }
                }
                else if (f instanceof TreeBin) {
                    Node<K,V> p;
                    binCount = 2;
                    if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                   value)) != null) {
                        oldVal = p.val;
                        if (!onlyIfAbsent)
                            p.val = value;
                    }
                }
            }
        }
        if (binCount != 0) {
            if (binCount >= TREEIFY_THRESHOLD)
                treeifyBin(tab, i);
            if (oldVal != null)
                return oldVal;
            break;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44

    上面的判断都不满足时,就会进入最后的分支,这条分支表示,keyhash值位置不为null,表示发生了hash冲突,此时节点就要通过链表的形式存储这个插入的新值。Node类是有next字段的,用来指向链表的下一个位置,新节点就往这插。

    synchronized (f) {
    
    • 1

    看,终于加排它锁了,只有在发生hash冲突的时候才加了排它锁。

    if (tabAt(tab, i) == f) {
    	if (fh >= 0) {
    
    • 1
    • 2

    重新判断当前节点是不是第二轮判断过的节点,如果不是,表示节点被其他线程改过了,进入下一轮循环,
    如果是,再次判断是否在扩容中,如果是,进入下一轮循环,
    如果不是,其他线程没改过,继续走,

    for (Node<K,V> e = f;; ++binCount) {
    
    • 1

    for循环,循环遍历这个节点上的链表,

    if (e.hash == hash &&
        ((ek = e.key) == key ||
         (ek != null && key.equals(ek)))) {
        oldVal = e.val;
        if (!onlyIfAbsent)
            e.val = value;
        break;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    找到一个hash值相同,且key也完全相同的节点,更新这个节点。
    如果找不到

    if ((e = e.next) == null) {
    	pred.next = new Node<K,V>(hash, key,
                                  value, null);
        break;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    往链表最后插入这个新节点。因为在排他锁中,这些操作都可以直接操作。终于到这插入就基本完成了。

    总结

    做插入操作时,首先进入乐观锁,
    然后,在乐观锁中判断容器是否初始化,
    如果没初始化则初始化容器,
    如果已经初始化,则判断该hash位置的节点是否为空,如果为空,则通过CAS操作进行插入。
    如果该节点不为空,再判断容器是否在扩容中,如果在扩容,则帮助其扩容。
    如果没有扩容,则进行最后一步,先加锁,然后找到hash值相同的那个节点(hash冲突),
    循环判断这个节点上的链表,决定做覆盖操作还是插入操作。
    循环结束,插入完毕。

    3.3 ConcurrentHashMap的get()方法

    //ConcurrentHashMap的get()方法是不加锁的,方法内部也没加锁。
    public V get(Object key)
    
    • 1
    • 2

    看上面这代码,ConcurrentHashMapget()方法是不加锁的,为什么可以不加锁?因为tablevolatile关键字修饰,保证每次获取值都是最新的。

    //Hashtable的get()是加锁的,所以性能差。
    public synchronized V get(Object key) 
    
    • 1
    • 2

    再看看Hashtable,差距啊。

    四、使用场景

    嗯,多线程环境下,更新少,查询多时使用的话,性能比较高。
    乐观锁嘛,认为更新操作时不会被其他线程影响。所以时候再更新少的情况下性能高。

    对你有帮助吗?点个赞吧~

  • 相关阅读:
    MySql死锁
    网络原理---拿捏应用层:HTTP协议
    Linux 之文件查找
    【DELL】戴尔笔记本PE下没有硬盘解决方法
    Java版分布式微服务云开发架构 Spring Cloud+Spring Boot+Mybatis 电子招标采购系统功能清单
    32.Python面向对象(五)【描述符、运算符底层、装饰器:闭包-闭包参数-内置装饰器-类装饰器】
    如何看待Java上层技术与JVM
    serve error code=5011(RtcRtpMuxer)(Failed to mux RTP packet for RTC)
    图的存储结构
    C++参考好资源网址推荐
  • 原文地址:https://blog.csdn.net/qq_42068856/article/details/126091526