• 【JUC系列-11】深入理解LinkedBlockingQueue的底层实现


    JUC系列整体栏目


    内容链接地址
    【一】深入理解JMM内存模型的底层实现原理https://zhenghuisheng.blog.csdn.net/article/details/132400429
    【二】深入理解CAS底层原理和基本使用https://blog.csdn.net/zhenghuishengq/article/details/132478786
    【三】熟练掌握Atomic原子系列基本使用https://blog.csdn.net/zhenghuishengq/article/details/132543379
    【四】精通Synchronized底层的实现原理https://blog.csdn.net/zhenghuishengq/article/details/132740980
    【五】通过源码分析AQS和ReentrantLock的底层原理https://blog.csdn.net/zhenghuishengq/article/details/132857564
    【六】深入理解Semaphore底层原理和基本使用https://blog.csdn.net/zhenghuishengq/article/details/132908068
    【七】深入理解CountDownLatch底层原理和基本使用https://blog.csdn.net/zhenghuishengq/article/details/133343440
    【八】深入理解CyclicBarrier底层原理和基本使用https://blog.csdn.net/zhenghuishengq/article/details/133378623
    【九】深入理解ReentrantReadWriteLock 读写锁的底层实现https://blog.csdn.net/zhenghuishengq/article/details/133629550
    【十】深入理解ArrayBlockingQueue的底层实现https://blog.csdn.net/zhenghuishengq/article/details/133692023
    【十一】深入理解LinkedBlockingQueue的底层实现https://blog.csdn.net/zhenghuishengq/article/details/133723652

    一,深入理解LinkedBlockingQueue的底层原理

    在上一篇中,了解了阻塞队列的基本api以及使用,同时也了解了通过数组的方式这个BlockingQueue,即ArrayBlockingQueue的底层原理和基本使用,接下来这篇的重点将是通过链表的方式实现这个BlockingQueue,即本文的主角 LinkedBlockingQueue

    链表阻塞队列,又被成为无界阻塞队列,虽然说是无界,但是其最大值是整数的最大值,也可以在构造方法中传入具体的值,这样就不会对机器造成大量的负载

    // 设置容量为10的阻塞队列
    LinkedBlockingQueue queue = new LinkedBlockingQueue(10);
    
    • 1
    • 2

    通过链表的方式实现的阻塞队列依旧是适用于生产者消费者模型,相对于数组的实现,链表的实现在吞吐量方面效率会更高,因为链表内部用了两把互斥锁,即生产者和消费者各司其职,而数组内部只有一把互斥锁,需要不断地阻塞和等待。

    1,LinkedBlockingQueue的基本使用

    和上一篇的ArrayBlockingQueue一样,依旧实现选择使用生产者和消费者模型,来对这个LinkedBlockingQueue的使用做一个基本的展示

    首先定义一个全局的线程池的根据类ThreadPoolUtil,通过线程池来创建和管理线程

    /**
     * 线程池工具
     * @author zhenghuisheng
     * @date : 2023/3/22
     */
    public class ThreadPoolUtil {
    
        /**
         * io密集型:最大核心线程数为2N,可以给cpu更好的轮换,
         *           核心线程数不超过2N即可,可以适当留点空间
         * cpu密集型:最大核心线程数为N或者N+1,N可以充分利用cpu资源,N加1是为了防止缺页造成cpu空闲,
         *           核心线程数不超过N+1即可
         * 使用线程池的时机:1,单个任务处理时间比较短 2,需要处理的任务数量很大
         */
    
        public static synchronized ThreadPoolExecutor getThreadPool() {
            if (pool == null) {
                //获取当前机器的cpu
                int cpuNum = Runtime.getRuntime().availableProcessors();
                log.info("当前机器的cpu的个数为:" + cpuNum);
                int maximumPoolSize = cpuNum * 2 ;
                pool = new ThreadPoolExecutor(
                        maximumPoolSize - 2,
                        maximumPoolSize,
                        5L,   //5s
                        TimeUnit.SECONDS,
                        new LinkedBlockingQueue<>(),  //数组有界队列
                        Executors.defaultThreadFactory(), //默认的线程工厂
                        new ThreadPoolExecutor.AbortPolicy());  //直接抛异常,默认异常
            }
            return pool;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    定义一个实体类Product,定义产品的一些信息

    /**
     * @Author: zhenghuisheng
     * @Date: 2023/10/9 20:24
     */
    @Data
    public class Product {
        private Integer id;
        private String productName;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    随后创建一个生产者的线程任务Producer类,用于生产产品

    /**
     * 生产者线程
     * @Author: zhenghuisheng
     * @Date: 2023/10/8 20:21
     */
    @Data
    public class Producer implements Runnable {
    
        private LinkedBlockingQueue linkedBlockingQueue;
    
        public Producer(LinkedBlockingQueue linkedBlockingQueue){
            this.linkedBlockingQueue = linkedBlockingQueue;
        }
    
        @Override
        public void run() {
            for (int i = 0; i < 3; i++) {
                Product product = new Product();
                product.setId(i);
                product.setProductName("商品" + i + "号");
                try {
                    //加入阻塞队列
                    linkedBlockingQueue.put(product);
                    System.out.println("生产者"  + i + "号生产完毕");
                    Thread.sleep(50);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    随后创建一个消费者线程Consumer类,用于消费产品

    /**
     * 消费者线程
     * @Author: zhenghuisheng
     * @Date: 2023/10/8 20:21
     */
    @Data
    public class Consumer implements Runnable {
        private LinkedBlockingQueue linkedBlockingQueue;
        public Consumer(LinkedBlockingQueue linkedBlockingQueue){
            this.linkedBlockingQueue = linkedBlockingQueue;
        }
    
        @Override
        public void run() {
            for (int i = 0; i < 3; i++) {
                try {
                    //消费者消费
                    Object take = linkedBlockingQueue.take();
                    System.out.println(take);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            System.out.println("消费者消费完毕");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    打印结果如下:

    生产者0号生产完毕
    Product(id=0, productName=商品0号)
    Product(id=1, productName=商品1号)
    生产者1号生产完毕
    生产者2号生产完毕
    Product(id=2, productName=商品2号)

    可以发现,其结果和通过数组实现的方式的整个流程是一模一样的,结果也大致相同

    2,LinkedBlockingQueue的底层源码设计

    2.3,LinkedBlockingQueue类的基本属性

    在研究这个put和take的源码之前,先看一下这个类的基本属性和构造方法,该类也是继承了一个抽象类

    public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>
    
    • 1

    首先查看他的构造方法,内部有两个构造方法,一个是无参构造,如果在开发中不传这个链表的数量,那么默认是使用整数的最大值,如果传了的话就使用外部传入的值。随后会创建一个Node结点组成的单向链表

    public LinkedBlockingQueue() {	//无参
        this(Integer.MAX_VALUE);	//整型最大值
    }
    public LinkedBlockingQueue(int capacity) {	//可以自定义参数
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);	//创建一个单向链表
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    Node结点的属性如下,是一个内部的静态类,由于只有一个next指针,因此可以知道是一个单向链表

    static class Node<E> {
        E item;
        Node<E> next;
        Node(E x) { item = x; }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    由于是单向链表,因此会有指针记录链表的头结点和尾结点

    transient Node<E> head;			//链表头结点
    private transient Node<E> last;	//链表尾结点
    
    • 1
    • 2

    除了上面的这些之外,还有两个重要的属性,和数组实现的最大的区别来了,在链表内部使用了两把互斥锁,内部也使用了两个祖苏队列,出队时为空的阻塞队列和入队时满了的阻塞队列

    //take出队使用的互斥锁
    private final ReentrantLock takeLock = new ReentrantLock();
    //出队时队列为空的阻塞队列
    private final Condition notEmpty = takeLock.newCondition();
    //put入队使用的互斥锁
    private final ReentrantLock putLock = new ReentrantLock();
    //put入队队列满了的阻塞队列
    private final Condition notFull = putLock.newCondition();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    2.2,LinkedBlockingQueue入队操作

    接下来重点研究put入队的操作,put的方法如下,put方法中使用的是putLock这把互斥锁,和数组一样,也是队列没满就加入,队列满了就阻塞。但是,内部多加了几个逻辑,比如内部容量没满则会唤醒生产者,从条件队列中加入到同步队列,并且做了一个数量为0的判断,一开始是-1,当为0时表示有值,则会去唤醒消费者去消费

    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;	//入队的互斥锁
        final AtomicInteger count = this.count;		//队列中结点的数量
        putLock.lockInterruptibly();				//该锁支持可中断
        try {	
            while (count.get() == capacity) {		//如果队列满了
                notFull.await();					//将结点加入到条件队列中,线程阻塞
            }
            enqueue(node);							//没满则入队
            c = count.getAndIncrement();			//结点数+1
            if (c + 1 < capacity)					//此时队列中结点没满,可能被消费者批量消费了
                notFull.signal();					//唤醒被阻塞生产者线程,提高吞吐量
        } finally {
            putLock.unlock();						//解锁
        }
        if (c == 0)
            signalNotEmpty();						
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    在下面的这段代码中,就是通过判断容量是否为满了来实现自身的唤醒,从而实现条件队列中的结点出队,然后转换到同步队列中,这样子就不必要等到消费者来唤醒,自身生产者就能唤醒,这样增加了吞吐量。

    其主要原因是在消费者take的时候,只有在这个c == 容量的时候,才会去唤醒这个生产者消费,假设生产者每次生产5个,消费者每次消费4个,那么在生产端和消费端不对等的条件下,一定会出现容量不满,但是条件队列有又不被唤醒,因此在自身内部唤醒,大大的提高的系统的吞吐量。况且如果在容量不为满的时候,要是消费者不消费,那不是得一直阻塞着。

    if (c + 1 < capacity)
    	notFull.signal();
    if (c == capacity)
        signalNotFull();
    
    • 1
    • 2
    • 3
    • 4

    入队的方法如下,由于是链表入队,因此比较简单,第一步是last记录的尾指针结点的next指针指向这个Node结点,第二步就是将这个Node结点作为尾指针记录

    private void enqueue(Node<E> node) {
        last = last.next = node;	//结点入队
    }
    
    • 1
    • 2
    • 3

    入队之后会有一个finally,主要是用于解锁,就是将同步队列的结点唤醒,队头出队等功能

     finally {
        putLock.unlock();						//解锁
    }
    
    • 1
    • 2
    • 3

    2.3,LinkedBlockingQueue出队操作

    在讲解完put操作之后,再来讲解消费者的take操作,其实现如下。内部使用的是另外一把互斥锁takeLock,如果队列为空,则阻塞,否则进入出队的逻辑

    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;	//互斥锁
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {	//如果队列为空
                notEmpty.await();		//将结点加入条件队列阻塞
            }
            x = dequeue();				//结点不为空则出队
            c = count.getAndDecrement();	//减1
            if (c > 1)					//这里的设计和上面的一样
                notEmpty.signal();	//主要是为put方法c = 0才会唤醒,如果消费和生产不一致,则主动唤醒
        } finally {
            takeLock.unlock();	//解锁
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    出队的逻辑如下,如果队列不为空,则出队。内部主要是将头结点的下一个结点做为头结点。并且为了更好的GC,将原来的头结点自己指向自己,这样减少GC root的引用链,根据可达性的分析的算法,这个结点会被垃圾回收器回收

    private E dequeue() {
        Node<E> h = head;	//获取头结点
        Node<E> first = h.next;	//将头结点的下一个结点作为头结点
        h.next = h; 			//自己指向自己,没有被gcroot引用,会被回收
        head = first;			//head指针指向头结点
        E x = first.item;	
        first.item = null;
        return x;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    内部有一段和put类似的,就是如果是队列还有值,就唤醒因为队列为空而阻塞的线程结点,因为在put方法中只有为0时才会唤醒,如果生产者和消费者的速度不一致,那么肯定会出现队列不为空,但是不唤醒的操作,因此在这了引入了这个自身唤醒的方法,从而提高吞吐量

    if (c > 1)
        notEmpty.signal();
    if (c == 0)	//put方法中唤醒消费者的条件
        signalNotEmpty();	
    
    • 1
    • 2
    • 3
    • 4

    最后就是这个finally中的unlock,主要是用于解锁,就是将同步队列的结点唤醒,队头出队等功能

     finally {
        putLock.unlock();						//解锁
    }
    
    • 1
    • 2
    • 3

    3,总结

    如果说数组实现的精髓在于环状数组+双指针的设计,那么链表实现的精髓就在于这个put和take提前唤醒条件队列中被阻塞的结点,从而提高入队和出队的效率,减少大量节点的阻塞问题。

    链表实现的阻塞队列内部用了两把互斥锁,以及两个阻塞队列,让take和put的职责更加的单一,从而提高整个系统的吞吐量。在队列为空是出队的线程结点会被阻塞,在队列满是入队的线程结点会被阻塞。

    数组和链表实现的阻塞队列都是通过条件等待队列+同步等待队列来实现,但是链表的吞吐量会高于数组,链表需要注意的是容量的设置,如果不设置容量的参数,那么会很容易的出现OOM的情况,因此在实际的开发中,在面临数组和链表实现阻塞队列时,可以优先的考虑使用设置容量的链表阻塞队列,效率相对会更高。

    默认这个LinkedBlockingQueue是一个无界队列,如果在构造方法中传参,那么也可以认为是一个有界队列

  • 相关阅读:
    鸿蒙会成为安卓的终结者吗?
    机器学习中的核方法
    市场整改篇之应用宝报告
    Redis设计与实现笔记 - 数据结构篇
    【开源】基于Vue.js的车险自助理赔系统的设计和实现
    开源 RPA 和并行处理的力量
    DPDK vhost库
    网站测试都要测试哪些?如何进行测试?
    使用Matlab软件绘制函数图像
    CAS机制的的解释和总结
  • 原文地址:https://blog.csdn.net/zhenghuishengq/article/details/133723652