• 【ConcurrentHashMap】JDK1.7版本源码解读与分析


    如果对文章中提到的与 HashMap 相关的部分有任何疑问, 请移步HashMap源码详解

    简介

    在这里插入图片描述

    • 底层是一个 Segment[] 数组, 每个 Segment对象 内部又有一个 Entry[ ] 数组, 一个 Entry[] 数组就相当于一个HashMap

    • Entry[ ]采用拉链法解决冲突, 但是没有红黑树, 红黑树是1.8才引入的;

    • 一个 Segment对象就是一个段; 当往容器中添加元素调用 put 方法时, 锁的粒度就是一个段;

    • 调用 put 方法时, 先计算应该放到 Segment[ ] 中的哪个段, 然后调用 segment.put() 方法将 Entry 插入到 segment.Entry[ ],可以看出, 一次插入有两次散列,一次负责选择在 Segment[] 中的位置, 一次负责选择 Segment 内 Entry[] 的位置;

    • 两次散列使用的 hash 值不同;

    • Segment数组的长度是固定的, 构造以后就不会再扩容, 内部的Entry[ ] 是一个个独立的Map, 会各自扩容, 互相之间的大小没有必然关系;

    • ConcurrentHashMap拉链时采用的是头插法;

    构造方法

    • 默认的 initialCapacity = 16, loadFactor = 0.75, concurrentcyLevel = 16;

    • concurrencyLevel 经一些计算, 得到 Segment数组 的长度, 并且不再改变.

      具体逻辑为: 取 >= concurrencyLevel 的, 最小的, 2的整数次幂作为 Segment 数组的长度; 原理是将 1 不断左移, 直到 >= concurrencyLevel参数;

    • initialCapacity 和 concurrencyLevel 经计算得到 segment[0] 中Entry数组的长度;

      具体逻辑为: (initialCapacity / concurrentcyLevel) 向上取整得tmp, 如果tmp <= 2, 直接取 capacity = 2; 如果tmp > 2, 再取 >= temp 的, 最小的, 2的整数次幂;

    • 在默认情况下, 最终 Segment 数组长度为 16, 一个 Segment 内部的 Entry 数组长度为 2;

    • 根据计算出的尺寸, 创建一个 Segment 对象 s0;

    • 使用 UNSAFE.putOrderedObject(ss, SBASE, s0) 系统调用将 s0 直接放到 segment数组 下标为0的位置;

    • 经过构造后, 只有segment[0] 的位置有 非null segment对象, 其余位置将在各自首次添加元素的时候复制 segment[0] 的参数, 进行初始化; s0 就是一个原型;

    public ConcurrentHashMap(int initialCapacity,
                                 float loadFactor, int concurrencyLevel) {
            if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
                throw new IllegalArgumentException();
            if (concurrencyLevel > MAX_SEGMENTS)
                concurrencyLevel = MAX_SEGMENTS;
            // Find power-of-two sizes best matching arguments
            int sshift = 0;
            int ssize = 1; // 用于记录segment数组的长度
            while (ssize < concurrencyLevel) {
                ++sshift;
                // 通过不断左移的方式得到 >= concurrencyLevel 的2的整数次幂, 时间复杂度为32
                ssize <<= 1;
            }
            this.segmentShift = 32 - sshift; // 散列到segements和enntries时要使用不同的hash值, 通过hash值移位实现
        	// 用于取&, 代替取模操作, 因为segment数组长度不会再变化, 所以这里记录下来, 以后直接用
            this.segmentMask = ssize - 1; 
            if (initialCapacity > MAXIMUM_CAPACITY)
                initialCapacity = MAXIMUM_CAPACITY;
            int c = initialCapacity / ssize; // 用于记录Entry数组长度
            if (c * ssize < initialCapacity)
                ++c; // 相当于向上取整 
            int cap = MIN_SEGMENT_TABLE_CAPACITY; // 保证最小为2
            while (cap < c)
                cap <<= 1; // 取2的整数次幂
        
        	// new 出一个segment对象, 作为 ss[0]
            Segment<K,V> s0 =
                new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                                 (HashEntry<K,V>[])new HashEntry[cap]);
            Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
        	// 使用系统调用将 s0 放到 ss[0]
            UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
            this.segments = ss;
        }
    

    hash方法

    • 通过一个 seed 和 key.hashCode() 异或, 再进行移位相加, 异或 等操作得到Hash值
    private int hash(Object k) {
            int h = hashSeed;
    
            if ((0 != h) && (k instanceof String)) {
                return sun.misc.Hashing.stringHash32((String) k);
            }
    
            h ^= k.hashCode();
    
            // Spread bits to regularize both segment and index locations,
            // using variant of single-word Wang/Jenkins hash.
            h += (h <<  15) ^ 0xffffcd7d;
            h ^= (h >>> 10);
            h += (h <<   3);
            h ^= (h >>>  6);
            h += (h <<   2) + (h << 14);
            return h ^ (h >>> 16);
    }
    

    put方法

    • 不允许以 null 作为 key;
    • ConcurrentHashMap不允许以null作为value, 这是为了防止歧义; 在HashMap中, 串行执行的情况下, 如果 get(key0) == null, 可以使用containsKey方法确定是否存在key0; 而在多线程环境下, 如果使用相同的方式, 即使后面调用containsKey发现存在key0, 那你也不能确定key0是不是在你get之后由别的线程插入的; 也就是说无法确定在你调用get方法时得到 null 的瞬间, 究竟是不存在key0? 还是存在key0但是value == null
    • JDK1.8中, ConcurrentHashMap判断两个键值对是否重复的逻辑是 key 的hash值 == 且 (key == 或 equals), 而ConcurrentHashMap的判断逻辑是 key== 或 ( hash== 且 key.equals )
     public V put(K key, V value) {
    	Segment<K,V> s;
    	if (value == null)
    	    throw new NullPointerException();
    	int hash = hash(key);
    	// 通过 hash >>> segmentShift取一个新的hash值进行散列, 确定要放在哪个段;
    	int j = (hash >>> segmentShift) & segmentMask; 
    	// 如果要放入的位置 segment 引用为 null 的话, 调用ensureSegment生成 segment 对象放入对应位置
    	if ((s = (Segment<K,V>)UNSAFE.getObject        
    	     (segments, (j << SSHIFT) + SBASE)) == null) 
    	    s = ensureSegment(j);
    	// 拿到segment对象, 调用其 put 方法, 将键值对放到其内部的entry中
    	// 直接用原hash值, 这样两次散列使用的hash值就不一样
    	return s.put(key, hash, value, false); 
    }
    
    • ensureSegment方法中, 通过不断判断 ss[k] 上是否已经有 非null 引用和 CAS 操作来保证线程安全;
    • 最终结果就是以当前 segments[0] 中保存的大小参数, 来 new 一个 segment 对象, 放入 ss 对应位置, 并将该对象返回
    private Segment<K,V> ensureSegment(int k) {
            final Segment<K,V>[] ss = this.segments;
            long u = (k << SSHIFT) + SBASE; // raw offset
            Segment<K,V> seg;
            if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
                Segment<K,V> proto = ss[0]; // use segment 0 as prototype
                int cap = proto.table.length;
                float lf = proto.loadFactor;
                int threshold = (int)(cap * lf);
                HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
                if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                    == null) { // recheck
                    Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
                    while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                           == null) {
                        if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
                            break;
                    }
                }
            }
            return seg;
        }
    
    • Segment::put方法中, 使用锁机制保证线程安全;
    • static final class Segment extends ReentrantLock implements Serializable; Segment类继承了ReentrantLock, 也就是说segment对象本身就可以充当一个锁
    • segment内部的put方法, 加锁成功后的逻辑和 HashMap 基本上是一样的, 重点看scanAndLockForPut方法
    final V put(K key, int hash, V value, boolean onlyIfAbsent) {
        // 在这里尝试上锁, 如果加锁失败, 调用scanAndLockForPut, scanAndLockForPut成功加锁后才会返回;
    	HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value);
    	V oldValue;
    	try {
    	    HashEntry<K,V>[] tab = table;
    	    int index = (tab.length - 1) & hash;
    	    HashEntry<K,V> first = entryAt(tab, index);
    	    for (HashEntry<K,V> e = first;;) {
    	        if (e != null) {
    	            K k;
    	            if ((k = e.key) == key ||
    	                (e.hash == hash && key.equals(k))) {
    	                // ... 和HashMap逻辑一样
                        // rehash扩容方法也被包裹在内
    	} finally {
    	    unlock();
    	}
    	return oldValue;
    }
    
    • 在内部尝试提前创建Node, 创建完成后
    private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
    	HashEntry<K,V> first = entryForHash(this, hash); // 要散列到的位置的第一个Entry
    	HashEntry<K,V> e = first;
    	HashEntry<K,V> node = null;
    	int retries = -1; // negative while locating node
    	while (!tryLock()) {
    	    HashEntry<K,V> f; 
            // 1.尝试创建Node的分支
    	    if (retries < 0) { 
    	        if (e == null) {
    	            if (node == null) // 已经找到要插入的位置, 提前构建Node
    	                node = new HashEntry<K,V>(hash, key, value, null);
    	            retries = 0; // 已经创建Node, 不再进入此分支
    	        }
                // 有重复结点, 直接替换value即可, retries = 0, 不再进入尝试创建Node的分支
    	        else if (key.equals(e.key)) 
    	            retries = 0;
    	        else
    	            e = e.next;
    	    }
            // 3.循环MAX_SCAN_RETRIES之后, 不再while占用CPU资源了, 直接阻塞Lock
    	    else if (++retries > MAX_SCAN_RETRIES) { // MAX_SCAN_RETRIES = 64
    	        lock();
    	        break;
    	    }
            // 2.不再进入尝试创建Node的分支后, 在这里先循环 MAX_SCAN_RETRIES/2 次
            // 在奇数次进入的时候, 判断当前的链表头结点是否已经改变, 如果改变, 说明有其它线程put了Entry
            // 那么就要令 retries 再= -1 , 重新进入尝试创建的分支, 再检查是否有key重复的结点, 因为新添加的Entry可能重复
    	    else if ((retries & 1) == 0 &&
    	             (f = entryForHash(this, hash)) != first) {
    	        e = first = f; // re-traverse if entry changed
    	        retries = -1;
    	    }
    	}
    	return node;
    }
    

    扩容机制

    • 扩容函数名字是 rehash() ;
    • 扩容操作是被包裹在 segment.put 方法内部的锁的作用范围之内的; 所以必然是线程安全的
    • 其余的扩容机制和 HashMap 一致

    重点 和1.8的区别

    1. JDK1.8的时候, 不再采用分段的模式, 而是在 HashMap 的基础上, 采用 CAS 操作和 synchronized 实现线程安全;

    2. JDK1.8 的 ConcurrentHashMap 大体上和 HashMap 是一样的; 区别是:

      1. 插入的时候, 如果对应位置为null, 使用 CAS 的方式插入; 不是 null 则 synchronized 内完成插入
      2. 删除的时候, synchronized 内删除;
      3. 扩容的时候, CAS 的方式, 多线程扩容;
    3. CHM 为什么换成 JDK1.8 的实现?

      1. 代码和 HashMap 基本一致, 更清晰简单;
      2. 采用 CAS 和 synchronized 一起实现线程安全, 插入操作锁的是 Entry 数组的一个下标, 并发度更高了; 并且实现了安全的多线程扩容;
  • 相关阅读:
    Java服务频繁挂掉,内存溢出
    【OCR】基于RCNN-CTC的不定长文本识别
    艾美捷高纯度 Cholesterol胆固醇相关介绍
    二叉树的具体原理及实现
    使用.NET简单实现一个Redis的高性能克隆版(二)
    java-php-net-python-东软健身会员网站计算机毕业设计程序
    Elasticsearch7教程(3) 查询文档 term terms terms_set
    04 Springboot 格式化LocalDateTime
    企业AI机器人,客户服务的好帮手!
    在Sprinng Boot中使用Redis充当缓存
  • 原文地址:https://blog.csdn.net/wdx7770/article/details/141070902