• 多线程之JUC队列与数组


    1 LinkedBlockingQueue

    1 入队出队

    public class LinkedBlockingQueue<E> extends AbstractQueue<E>
     implements BlockingQueue<E>, java.io.Serializable {
     static class Node<E> {
     E item;
     /**
     * 下列三种情况之一
     * - 真正的后继节点
     * - 自己, 发生在出队时
     * - null, 表示是没有后继节点, 是最后了
     */
     Node<E> next;
     Node(E x) { item = x; }
     }
    }
    

    初始化链表 last = head = new Node(null); Dummy 节点用来占位,item 为 null.

    在这里插入图片描述

    当一个节点入队 last = last.next = node;

    在这里插入图片描述

    再来一个节点入队 last = last.next = node;

    在这里插入图片描述

    出队

    Node<E> h = head;
    Node<E> first = h.next;
    h.next = h; // help GC
    head = first;
    E x = first.item;
    first.item = null;
    return x;
    

    在这里插入图片描述

    在这里插入图片描述
    在这里插入图片描述

    在这里插入图片描述

    E x = first.item;
    first.item = null;
    return x;
    

    在这里插入图片描述

    2 加锁

    用了两把锁和 dummy 节点

    • 用一把锁,同一时刻,最多只允许有一个线程(生产者或消费者,二选一)执行
    • 用一把锁,同一时刻,最多只允许有一个线程(生产者或消费者,二选一)执行
      • 消费者与消费者线程仍然串行
      • 生产者与生产者线程仍然串行

    说明:

    • 当节点总数大于 2 时(包括 dummy 节点),putLock 保证的是 last 节点的线程安全,takeLock 保证的是 head 节点的线程安全。两把锁保证了入队和出队没有竞争
    • 当节点总数等于 2 时(即一个 dummy 节点,一个正常节点)这时候,仍然是两把锁锁两个对象,不会竞争
    • 当节点总数等于 1 时(就一个 dummy 节点)这时 take 线程会被 notEmpty 条件阻塞,有竞争,会阻塞
    // 用于 put(阻塞) offer(非阻塞)
    private final ReentrantLock putLock = new ReentrantLock();
    // 用户 take(阻塞) poll(非阻塞)
    private final ReentrantLock takeLock = new ReentrantLock();
    

    put操作

    public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
     int c = -1;
     Node<E> node = new Node<E>(e);
     final ReentrantLock putLock = this.putLock;
     // count 用来维护元素计数
     final AtomicInteger count = this.count;
     putLock.lockInterruptibly();
     try {
     // 满了等待
     while (count.get() == capacity) {
     // 倒过来读就好: 等待 notFull
     notFull.await();
     }
     // 有空位, 入队且计数加一
     enqueue(node);
     c = count.getAndIncrement(); 
     // 除了自己 put 以外, 队列还有空位, 由自己叫醒其他 put 线程
     if (c + 1 < capacity)
     notFull.signal();
     } finally {
     putLock.unlock();
     }
     // 如果队列中有一个元素, 叫醒 take 线程
     if (c == 0)
     // 这里调用的是 notEmpty.signal() 而不是 notEmpty.signalAll() 是为了减少竞争
     signalNotEmpty();
    }
    

    take操作

    public E take() throws InterruptedException {
     E x;
     int c = -1;
     final AtomicInteger count = this.count;
     final ReentrantLock takeLock = this.takeLock;
     takeLock.lockInterruptibly();
     try {
     while (count.get() == 0) {
     notEmpty.await();
     }
     x = dequeue();
     c = count.getAndDecrement();
     if (c > 1)
     notEmpty.signal();
     } finally {
     takeLock.unlock();
     }
     // 如果队列中只有一个空位时, 叫醒 put 线程
     // 如果有多个线程进行出队, 第一个线程满足 c == capacity, 但后续线程 c < capacity
     if (c == capacity)
     // 这里调用的是 notFull.signal() 而不是 notFull.signalAll() 是为了减少竞争
     signalNotFull()
     return x;
    }
    

    由 put 唤醒 put 是为了避免信号不足

    3 性能

    LinkedBlockingQueue 与 ArrayBlockingQueue 的性能比较:

    • Linked 支持有界,Array 强制有界
    • Linked 实现是链表,Array 实现是数组
    • Linked 是懒惰的,而 Array 需要提前初始化 Node 数组
    • Linked 每次入队会生成新 Node,而 Array 的 Node 是提前创建好的
    • Linked 两把锁,Array 一把锁

    2 ConcurrentLinkedQueue

    ConcurrentLinkedQueue 的设计与 LinkedBlockingQueue类似:

    • 两把【锁】,同一时刻,可以允许两个线程同时(一个生产者与一个消费者)执行
    • dummy 节点的引入让两把【锁】将来锁住的是不同对象,避免竞争

    不同点:

    • 只是这【锁】使用了 cas 来实现

    Tomcat 的 Connector 结构时,Acceptor 作为生产者向 Poller 消费者传递事件信息时,正是采用了 ConcurrentLinkedQueue 将 SocketChannel 给 Poller 使用.

    案例

    package cf..concurrent.thirdpart.test;
    import java.util.Collection;
    import java.util.Iterator;
    import java.util.Queue;
    import java.util.concurrent.atomic.AtomicReference;
    public class Test3 {
     public static void main(String[] args) {
     MyQueue<String> queue = new MyQueue<>();
     queue.offer("1");
     queue.offer("2");
     queue.offer("3");
     System.out.println(queue);
     }
    }
    
    class MyQueue<E> implements Queue<E> {
     @Override
     public String toString() {
     StringBuilder sb = new StringBuilder();
     for (Node<E> p = head; p != null; p = p.next.get()) {
     E item = p.item;
     if (item != null) {
     sb.append(item).append("->");
     }
     }
     sb.append("null");
     return sb.toString();
     }
     @Override
     public int size() {
     return 0;
     }
     @Override
     public boolean isEmpty() {
     return false;
     }
     @Override
     public boolean contains(Object o) {
     return false;
     }
     @Override
     public Iterator<E> iterator() {
     return null;
     }
     @Override
     public Object[] toArray() {
     return new Object[0];
     }
     @Override
     public <T> T[] toArray(T[] a) {
     return null;
     }
     @Override
     public boolean add(E e) {
     return false;
     }
     @Override
     public boolean remove(Object o) {
     return false;
     }
     @Override
     public boolean containsAll(Collection<?> c) {
     return false;
     }
     @Override
     public boolean addAll(Collection<? extends E> c) {
     return false;
     }
     @Override
     public boolean removeAll(Collection<?> c) {
     return false;
     }
     @Override
     public boolean retainAll(Collection<?> c) {
     return false;
     }
     @Override
     public void clear() {
     }
     @Override
     public E remove() {
     return null;
     }
     @Override
     public E element() {
     return null;
     }
     @Override
     public E peek() {
     return null;
     }
     public MyQueue() {
     head = last = new Node<>(null, null);
     }
     private volatile Node<E> last;
     private volatile Node<E> head;
     private E dequeue() {
     /*Node h = head;
     Node first = h.next;
     h.next = h;
     head = first;
     E x = first.item;
     first.item = null;
     return x;*/
     return null;
     }
     @Override
     public E poll() {
     return null;
     }
     @Override
     public boolean offer(E e) {
     return true;
     }
     static class Node<E> {
     volatile E item;
     public Node(E item, Node<E> next) {
     this.item = item;
     this.next = new AtomicReference<>(next);
     }
     AtomicReference<Node<E>> next;
     }
    }
    
    public boolean offer(E e) {
     Node<E> n = new Node<>(e, null);
     while(true) {
     // 获取尾节点
     AtomicReference<Node<E>> next = last.next;
     // S1: 真正尾节点的 next 是 null, cas 从 null 到新节点
     if(next.compareAndSet(null, n)) {
     // 这时的 last 已经是倒数第二, next 不为空了, 其它线程的 cas 肯定失败
     // S2: 更新 last 为倒数第一的节点
     last = n;
     return true;
     }
     }
    }
    

    3 CopyOnWriteArrayList

    CopyOnWriteArraySet 是它的马甲 底层实现采用了 写入时拷贝 的思想,增删改操作会将底层数组拷贝一份,更 改操作在新数组上执行,这时不影响其它线程的并发读,读写分离。

    新增

    public boolean add(E e) {
     synchronized (lock) {
     // 获取旧的数组
     Object[] es = getArray();
     int len = es.length;
     // 拷贝新的数组(这里是比较耗时的操作,但不影响其它读线程)
     es = Arrays.copyOf(es, len + 1);
     // 添加新元素
     es[len] = e;
     // 替换旧的数组
     setArray(es);
     return true;
     }
    }
    

    上面为Java11版本源码, Java8中是可重入锁.

    读取

    未加锁

    public void forEach(Consumer<? super E> action) {
     Objects.requireNonNull(action);
     for (Object x : getArray()) {
     @SuppressWarnings("unchecked") E e = (E) x;
     action.accept(e);
     }
    }
    

    适合读多写少场景

    存在问题

    在这里插入图片描述

    在线程0读取数组, 线程1读取数组, 然后线程1去删除数组元素,线程0任然可以读取数据.

    迭代器弱一致性

    CopyOnWriteArrayList<Integer> list = new CopyOnWriteArrayList<>();
    list.add(1);
    list.add(2);
    list.add(3);
    Iterator<Integer> iter = list.iterator();
    new Thread(() -> {
     list.remove(0);
     System.out.println(list);
    }).start();
    sleep1s();
    while (iter.hasNext()) {
     System.out.println(iter.next());
    }
    
    • 数据库的 MVCC 都是弱一致性的表现

    • 并发高和一致性是矛盾的,需要权衡

  • 相关阅读:
    【JVM内存区域及创建对象的过程】
    数据统计-EXCEL中常用函数及操作
    设计模式面试知识点总结
    C++中嵌入汇编语言的方法(这个方法被证明在64位电脑上使用visual studio没有用)
    Vue-router的动态路由:获取传递的值
    使用kafka的几种场景
    1796. 字符串中第二大的数字
    力扣第463题 岛屿的周长 C++ 深度优先搜索 + 思维判断的边界
    【java】【重构一】分模块开发设计实战
    Salesforce业务分析师(BA)认证—备考指南
  • 原文地址:https://blog.csdn.net/ABestRookie/article/details/127034662