• ConcurrentHashMap-1


    ConcurrentHashMap

            在日常开发中使用的HashMap是线程不安全的,而线程安全类Hashtable只是简单的在方法上加锁实现线程安全,效率低下,所以在线程安全的环境下通常会使用ConcurrentHashMap

    ConcurrentHashMap中的节点类型

    1)Node节点

    1. static final class TreeNode extends Node {
    2. TreeNode parent; // red-black tree links
    3. TreeNode left;
    4. TreeNode right;
    5. TreeNode prev; // needed to unlink next upon deletion
    6. boolean red;
    7. }

     TreeNode就是红黑树的结点,TreeNode不会直接链接到table[i]——桶上面,而是由TreeBin链接,TreeBin会指向红黑树的根结点。
    2)TreeBin节点

    1. static final class TreeBin extends Node {
    2. TreeNode root;
    3. volatile TreeNode first;
    4. volatile Thread waiter;
    5. volatile int lockState;
    6. // values for lockState
    7. static final int WRITER = 1; // set while holding write lock
    8. static final int WAITER = 2; // set when waiting for write lock
    9. static final int READER = 4; // increment value for setting read lock
    10. }

            TreeBin会直接链接到table[i]——桶上面,该结点提供了一系列红黑树相关的操作,以及加锁、解锁操作。另外TreeBin提供了一系列的操作

    - TreeBin(TreeNode b),将以b为头结点的链表转换为红黑树

    - lockRoot(),对红黑树的根结点加写锁

    - unlockRoot(),释放写锁

    - find(int h, Object k),从根结点开始遍历查找,找到相等的结点就返回它,没找到就返回null,

    当存在写锁时,以链表方式进行查找,不阻塞读锁

    3)ForwardingNode

    1. static final class ForwardingNode extends Node {
    2. final Node[] nextTable;
    3. }

            ForwardingNode在table扩容时使用,内部记录了扩容后的table,即nextTable

    - 当table需要扩容时,依次遍历table中的每个槽,如果不为null,把所有元素根据hash值放入扩容后的nextTable中,在原table的槽内放置一个ForwardingNode

    - ForwardingNode是一种临时结点,在扩容进行中才会出现,hash值固定为-1,且不存储实际数据

    - 如果旧table数组的一个hash桶中全部的结点都迁移到了新table中,则在这个桶中放置一个ForwardingNode

    - 读操作碰到ForwardingNode时,将操作转发到扩容后的新table数组上去执行;写操作碰见它时,则尝试帮助扩容,扩容是支持多线程一起扩容

    4)ReservationNode保留结点

    1. static final class ReservationNode extends Node {
    2. ReservationNode() {
    3. super(RESERVED, null, null); //-3
    4. }
    5. Node find(int h, Object k) {
    6. return null;
    7. }
    8. }

    - 在并发场景下、在从Key不存在到插入的时间间隔内,为了防止哈希槽被其他线程抢占,当前线程会使用一个reservationNode节点放到槽中并加锁,从而保证线程安全

    - hash值固定为-3,不保存实际数据。只在computeIfAbsent和compute这两个函数式API中充当占位符加锁使用

    构造器

    1. public ConcurrentHashMap() {
    2. }
    3. public ConcurrentHashMap(int initialCapacity) { 初始化容积--数组长度
    4. this(initialCapacity, LOAD_FACTOR, 1);
    5. }
    6. public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
    7. if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
    8. throw new IllegalArgumentException();
    9. if (initialCapacity < concurrencyLevel) // Use at least as many bins
    10. initialCapacity = concurrencyLevel; // as estimated threads
    11. long size = (long)(1.0 + (long)initialCapacity / loadFactor);
    12. int cap = (size >= (long)MAXIMUM_CAPACITY) ?
    13. MAXIMUM_CAPACITY : tableSizeFor((int)size);
    14. this.sizeCtl = cap;
    15. }

            初始化ConcurrentHashMap的时候这个`Node[]`数组是还未初始化的,会等到第一次put方法调用时才初始化

    1. //不允许空的key和value,否则异常
    2. final V putVal(K key, V value, boolean onlyIfAbsent) {
    3. if (key == null || value == null) throw new NullPointerException();
    4. ... ...
    5. //判断Node数组为空
    6. if (tab == null || (n = tab.length) == 0)
    7. //初始化Node数组
    8. tab = initTable();
    9. }

     此时是会有并发问题的

    1. private final Node[] initTable() {
    2. Node[] tab; int sc;
    3. //每次循环都获取最新的Node数组引用
    4. while ((tab = table) == null || tab.length == 0) {
    5. //sizeCtl是一个标记位,若为-1也就是小于0,代表有线程在进行初始化工作了
    6. if ((sc = sizeCtl) < 0)
    7. //让出CPU时间片
    8. Thread.yield(); // lost initialization race; just spin
    9. //CAS操作,将本实例的sizeCtl变量设置为-1
    10. else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
    11. //如果CAS操作成功了,代表本线程将负责初始化工作
    12. try {
    13. //再检查一遍数组是否为空
    14. if ((tab = table) == null || tab.length == 0) {
    15. //在初始化Map时,sizeCtl代表数组大小,默认16
    16. //所以此时n默认为16
    17. int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
    18. @SuppressWarnings("unchecked")
    19. //Node数组
    20. Node[] nt = (Node[])new Node[n];
    21. //将其赋值给table变量
    22. table = tab = nt;
    23. //通过位运算,n减去n二进制右移2位,相当于乘以0.75
    24. //例如16经过运算为12,与乘0.75一样,只不过位运算更快
    25. sc = n - (n >>> 2);
    26. }
    27. } finally {
    28. //将计算后的sc(12)直接赋值给sizeCtl,表示达到12长度就扩容
    29. //由于这里只会有一个线程在执行,直接赋值即可,没有线程安全问题
    30. //只需要保证可见性
    31. sizeCtl = sc;
    32. }
    33. break;
    34. }
    35. }
    36. return tab
    37. }

       table变量使用了volatile来保证每次获取到的都是最新写入的值:
            transient volatile Node[] table;

            就算有多个线程同时进行put操作,在初始化数组时使用了乐观锁CAS操作来决定到底是哪个线程有资格进行初始化,其他线程均只能等待。

    用到的并发技巧:

    - volatile变量(sizeCtl):它是一个标记位,用来告诉其他线程这个坑位有没有人在,其线程间的可见性由volatile保证。

    - CAS操作:CAS操作保证了设置sizeCtl标记位的原子性,保证了只有一个线程能设置成功

    put操作的线程安全

    1. //put操作的线程安全
    2. final V putVal(K key, V value, boolean onlyIfAbsent) {
    3. if (key == null || value == null)
    4. throw new NullPointerException();
    5. //对key的hashCode进行散列
    6. int hash = spread(key.hashCode());
    7. int binCount = 0;
    8. //一个无限循环,直到put操作完成后退出循环
    9. for (Node[] tab = table;;) {
    10. Node f; int n, i, fh;
    11. //当Node数组为空时进行初始化
    12. if (tab == null || (n = tab.length) == 0)
    13. tab = initTable();
    14. //Unsafe类volatile的方式取出hashCode散列后通过与运算得出的Node数组下标值对应的Node对象
    15. //此时的Node对象若为空,则代表还未有线程对此Node进行插入操作
    16. else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
    17. //直接CAS方式插入数据
    18. if (casTabAt(tab, i, null, new Node(hash, key, value, null)))
    19. //插入成功,退出循环
    20. break; // no lock when adding to empty bin
    21. }
    22. //查看是否在扩容,先不看,扩容再介绍
    23. else if ((fh = f.hash) == MOVED)
    24. //帮助扩容
    25. tab = helpTransfer(tab, f);
    26. else {
    27. V oldVal = null;
    28. //对Node对象进行加锁
    29. synchronized (f) {
    30. //二次确认此Node对象还是原来的那一个
    31. if (tabAt(tab, i) == f) {
    32. if (fh >= 0) {
    33. binCount = 1;
    34. //无限循环,直到完成put
    35. for (Node e = f;; ++binCount) {
    36. K ek;
    37. //和HashMap一样,先比较hash,再比较equals
    38. if (e.hash == hash &&
    39. ((ek = e.key) == key || (ek != null && key.equals(ek)))) {
    40. oldVal = e.val;
    41. if (!onlyIfAbsent)
    42. e.val = value;
    43. break;
    44. }
    45. Node pred = e;
    46. if ((e = e.next) == null) {
    47. //和链表头Node节点不冲突,就将其初始化为新Node作为上一个Node节点的next
    48. //形成链表结构
    49. pred.next = new Node(hash, key,value, null);
    50. break;
    51. }
    52. }
    53. }
    54. }

            值得关注的是tabAt(tab, i)方法,其使用Unsafe类volatile的操作volatile式地查看值,保证每次获取到的值都是最新的:

    1. static final Node tabAt(Node[] tab, int i) {
    2. return (Node)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
    3. }

            由于其减小了锁的粒度,若Hash完美不冲突的情况下,可同时支持n个线程同时put操作,n为Node数组大小,在默认大小16下,可以支持最大同时16个线程无竞争同时操作且线程安全。hash冲突严重时,Node链表越来越长,将导致严重的锁竞争,此时会进行扩容,将Node进行再散列。

    用到的并发技巧:

    - 减小锁粒度:将Node链表的头节点作为锁,若在默认大小16情况下,将有16把锁,大大减小了锁竞争(上下文切换),就像开头所说,将串行的部分最大化缩小,在理想情况下线程的put操作都为并行操作。同时直接锁住头节点,保证了线程安全

    - Unsafe的getObjectVolatile方法:此方法确保获取到的值为最新。

    扩容操作的线程安全

    在扩容时,ConcurrentHashMap支持多线程并发扩容,在扩容过程中同时支持get查数据,若有线程put数据,还会帮助一起扩容,这种无阻塞算法将并行最大化

    在put值的时候,首先会计算hash值,再散列到指定的Node数组下标中

    //根据key的hashCode再散列int hash = spread(key.hashCode());

    //使用(n - 1) & hash 运算,定位Node数组中下标值,实际上就是执行 hash % n(f = tabAt(tab, i = (n - 1) & hash);

    扩容时的get操作

    1. public V get(Object key) {
    2. Node[] tab; Node e, p; int n, eh; K ek;
    3. int h = spread(key.hashCode());
    4. if ((tab = table) != null && (n = tab.length) > 0 &&
    5. (e = tabAt(tab, (n - 1) & h)) != null) {
    6. if ((eh = e.hash) == h) {
    7. if ((ek = e.key) == key || (ek != null && key.equals(ek)))
    8. return e.val;
    9. }
    10. //假如Node节点的hash值小于0,则有可能是fwd节点
    11. else if (eh < 0)
    12. //调用节点对象的find方法查找值
    13. return (p = e.find(h, key)) != null ? p.val : null;
    14. while ((e = e.next) != null) {
    15. if (e.hash == h &&
    16. ((ek = e.key) == key || (ek != null && key.equals(ek))))
    17. return e.val;
    18. }
    19. }
    20. return null;
    21. }

     在get操作的源码中,会判断Node中的hash是否小于0,是否还记得我们的占位Node,其hash为MOVED,为常量值-1,所以此时判断线程正在迁移,委托给fwd占位Node去查找值。

  • 相关阅读:
    强静态类型,真的无敌
    C语言判断一个数转二进制的某位是1或者0
    互联网应用主流框架整合之Spring Boot运维体系
    springboot大学生拼车管理系统毕业设计源码201507
    linux日常记录1
    ACA-PEG-NH2,丙烯酰胺PEG氨基
    局部变量,全局变量与内存
    Vue 条件渲染 与 列表渲染
    IDEA插件开发(3)---捆绑插件 API 源
    SwiftUI 中使用 SpriteKit 创建雨动画效果(教程含源码)
  • 原文地址:https://blog.csdn.net/m0_56627229/article/details/126732317