• java并发编程 ConcurrentLinkedQueue详解



    java 并发编程系列文章目录

    1 ConcurrentLinkedQueue是什么

    ConcurrentLinkedQueue是一个无界的并发队列,和LinkedBlockingQueue相比,它是通过完全的cas实现的,是非阻塞的。LinkedBlockingQueue是通过ReentrantLock实现的,提供了一些阻塞方法,如take() put()。

    2 核心属性详解

    	//链表的头和尾节点
        private transient volatile Node<E> head;
        private transient volatile Node<E> tail;
        //Node的数据结构
        private static class Node<E> {
        	//保存的元素
            volatile E item;
            //单向链表的当前Node的next节点
            volatile Node<E> next;
            Node(E item) {
                UNSAFE.putObject(this, itemOffset, item);
            }
    		//cas设置当前item值
            boolean casItem(E cmp, E val) {
                return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
            }
    		//设置next节点的值
            void lazySetNext(Node<E> val) {
                UNSAFE.putOrderedObject(this, nextOffset, val);
            }
    		//cas设置next节点的值
            boolean casNext(Node<E> cmp, Node<E> val) {
                return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
            }
            //...
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    3 核心方法详解

    3.1 add(E e)

    调用offer方法。在

    public boolean add(E e) {
    	return offer(e);
    }
    
    • 1
    • 2
    • 3

    3.2 offer(E e)

    	首先在看这个方法之前,先了解一个掌握逻辑的方法。因为下面代码是无锁自旋(cas)代码,所以有很多触发条件,如果直接看是很难懂,
    所以这里的小技巧是先不管多线程,去看逻辑。如下面的for循环,你先按照单线程调用了3~4次看看数据变化。先掌握它正常逻辑下的数
    据结构的变化。因为是单向链表,看看节点之间是怎么变化的。
    
    • 1
    • 2
    • 3

    看下面流程再回头看这段代码

        public boolean offer(E e) {
            checkNotNull(e);
            final Node<E> newNode = new Node<E>(e);
            Node<E> t = tail;
            Node<E> p = t;
            for (;;) {
                Node<E> q = p.next;
                if (q == null) {
                    // 追加节点 原子性操作,会有失败的情况
                    if (p.casNext(null, newNode)) {
                        // 跃过第一次设置tail
                        if (p != t) // hop two nodes at a time
                        	//设置尾节点
                            casTail(t, newNode);  // Failure is OK.
                        return true;
                    }
                    // Lost CAS race to another thread; re-read next
                }
                //poll情况,即存和取同时发生
                else if (p == q)
                    // We have fallen off list.  If tail is unchanged, it
                    // will also be off-list, in which case we need to
                    // jump to head, from which all live nodes are always
                    // reachable.  Else the new tail is a better bet.
                    p = (t != (t = tail)) ? t : head;
                else
                	// 第二次设置的时候q!=null的情况重新设置p节点往后移
                    // Check for tail updates after two hops.
                    p = (p != t && t != (t = tail)) ? t : q;
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    1.如果当前链表中无元素,此时根据构造器可知 head = tail = new Node<>(null); 此时添加一个元素。如图所示
    在这里插入图片描述
    此时 p.next == null 成立,所以会进入 casNext语句。此时成功了 p == t 是true, 所以返回true结束,此时数据结构变成下图
    在这里插入图片描述
    此时我再添加一个元素,p.next != null了,p == q也不成立, 所以走到最后一个else:p = (p != t && t != (t = tail)) ? t : q;
    这段逻辑相当于 t =tail; 因为p == t 所以 p 变成了q。再次循环。
    此时p就是NODE1了 q 是null了 走p.casNext设置NODE2 称为NODE1的next节点。注意!! 此时tail.next还是NODE1。如下图
    在这里插入图片描述
    此时再添加一个元素呢
    此时流程中会命中p != t 重新设置tail, 此时node3就是tail
    在这里插入图片描述

    3.3 poll()

    在这里插入图片描述
    首先现在的数据结构是这样。

    1. 执行刚开始的时候 p 指向的是head 此时p的item == null。执行到 else p = q; 注意因为执行到else if ((q = p.next) 此时q = p.next,即p的下标到了head.next。此时在判断item是否是null 此时不是null了,去除n1 然后cas设置为null 此时p != h 因为往后移了一下,又因为 node1.next !=null 所以更新head为未node2。
      此时如下图
      在这里插入图片描述
    2. 如果再次poll
      此时 p指向的是node2, 此时p.item != null 所以直接cas 设置成null 此时p == h成立 直接return,此时如下图
      在这里插入图片描述
      其实head是没动的。下次呢 此时item是空,那么又会向第一步一样。至此正常流程已经分析完
    	public E poll() {
            restartFromHead:
            for (;;) {
            	Node<E> h = head;
            	Node<E> p = h;
            	Node<E> q = null;
                for (;;) {
                    E item = p.item;
    
                    if (item != null && p.casItem(item, null)) {
                        // Successful CAS is the linearization point
                        // for item to be removed from this queue.
                        if (p != h) // hop two nodes at a time
                            updateHead(h, ((q = p.next) != null) ? q : p);
                        return item;
                    }
                    else if ((q = p.next) == null) {
                        updateHead(h, p);
                        return null;
                    }
                    else if (p == q)
                        continue restartFromHead;
                    else
                        p = q;
                }
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    3.4 size()

    注意,因为他没有维护count字段,所以他计算数量是遍历计算的。不维护是因为上面是通过cas方式+循环保证原子性的,如果在加一个count字段,那失败重试的概率将大大增加

    int count = 0;
    for (Node<E> p = first(); p != null; p = succ(p))
        if (p.item != null)
            // Collection.size() spec says to max out
            if (++count == Integer.MAX_VALUE)
                break;
    return count;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    3.5 并发情况分析

    上面已经分析了核心的入队列和出队列的两个方法,他不是实时更新head和tail节点,而是通过一次循环之后更新head和tail节点.
    此时并发情况下,cas保证了原子性的设置。

    1. offer方法
      **p.casNext(null, newNode)**保证了原子性的追加链表元素。成功了设置tail 此时第一步成功不代表第二步(casTail(t, newNode))一定成功,因为此时可能别的线程已经改了tail。失败了怎么办呢? 失败了其实就是其他线程在offer的时候多循环几次,但是总有一个线程可以把第二步成功,也就是tail最后会回到尾部的。
      p == q的情况,即p = p.next 出现这种情况就是此时p已经被移除

    2. poll方法
      (q = p.next) == null的情况是p从链表中删除,此时重新循环链表

    4 总结

    相对于LinkedBlockingQueue, 它实现了无锁化的方式。因为cas+for这种方式的逻辑很难梳理。所以大致了解思路吧。

  • 相关阅读:
    工作杂记-YUV的dump和read
    在 Docker 容器内集成 Crontab 定时任务
    基于Springboot+MYSQL+Maven实现的宠物医院管理系统(源码+数据库+运行指导文档+项目运行指导视频)
    vue+elementUI el-table实现单选
    运用谷歌浏览器的开发者工具,模拟搜索引擎蜘蛛抓取网页
    用户运营体系中,用户精细化运营闭环是怎样的
    【好文翻译】Difference Between Next.js vs. Nuxt.js vs. Nest.js
    Python 图片处理
    MySQL事务管理
    Vue路由简介
  • 原文地址:https://blog.csdn.net/weixin_46082526/article/details/132656078