JUC系列整体栏目
内容 | 链接地址 |
---|---|
【一】深入理解JMM内存模型的底层实现原理 | https://zhenghuisheng.blog.csdn.net/article/details/132400429 |
【二】深入理解CAS底层原理和基本使用 | https://blog.csdn.net/zhenghuishengq/article/details/132478786 |
【三】熟练掌握Atomic原子系列基本使用 | https://blog.csdn.net/zhenghuishengq/article/details/132543379 |
【四】精通Synchronized底层的实现原理 | https://blog.csdn.net/zhenghuishengq/article/details/132740980 |
【五】通过源码分析AQS和ReentrantLock的底层原理 | https://blog.csdn.net/zhenghuishengq/article/details/132857564 |
【六】深入理解Semaphore底层原理和基本使用 | https://blog.csdn.net/zhenghuishengq/article/details/132908068 |
【七】深入理解CountDownLatch底层原理和基本使用 | https://blog.csdn.net/zhenghuishengq/article/details/133343440 |
【八】深入理解CyclicBarrier底层原理和基本使用 | https://blog.csdn.net/zhenghuishengq/article/details/133378623 |
【九】深入理解ReentrantReadWriteLock 读写锁的底层实现 | https://blog.csdn.net/zhenghuishengq/article/details/133629550 |
【十】深入理解ArrayBlockingQueue的底层实现 | https://blog.csdn.net/zhenghuishengq/article/details/133692023 |
【十一】深入理解LinkedBlockingQueue的底层实现 | https://blog.csdn.net/zhenghuishengq/article/details/133723652 |
在上一篇中,了解了阻塞队列的基本api以及使用,同时也了解了通过数组的方式这个BlockingQueue,即ArrayBlockingQueue的底层原理和基本使用,接下来这篇的重点将是通过链表的方式实现这个BlockingQueue,即本文的主角 LinkedBlockingQueue
链表阻塞队列,又被成为无界阻塞队列,虽然说是无界,但是其最大值是整数的最大值,也可以在构造方法中传入具体的值,这样就不会对机器造成大量的负载
// 设置容量为10的阻塞队列
LinkedBlockingQueue queue = new LinkedBlockingQueue(10);
通过链表的方式实现的阻塞队列依旧是适用于生产者消费者模型,相对于数组的实现,链表的实现在吞吐量方面效率会更高,因为链表内部用了两把互斥锁,即生产者和消费者各司其职,而数组内部只有一把互斥锁,需要不断地阻塞和等待。
和上一篇的ArrayBlockingQueue一样,依旧实现选择使用生产者和消费者模型,来对这个LinkedBlockingQueue的使用做一个基本的展示
首先定义一个全局的线程池的根据类ThreadPoolUtil,通过线程池来创建和管理线程
/**
* 线程池工具
* @author zhenghuisheng
* @date : 2023/3/22
*/
public class ThreadPoolUtil {
/**
* io密集型:最大核心线程数为2N,可以给cpu更好的轮换,
* 核心线程数不超过2N即可,可以适当留点空间
* cpu密集型:最大核心线程数为N或者N+1,N可以充分利用cpu资源,N加1是为了防止缺页造成cpu空闲,
* 核心线程数不超过N+1即可
* 使用线程池的时机:1,单个任务处理时间比较短 2,需要处理的任务数量很大
*/
public static synchronized ThreadPoolExecutor getThreadPool() {
if (pool == null) {
//获取当前机器的cpu
int cpuNum = Runtime.getRuntime().availableProcessors();
log.info("当前机器的cpu的个数为:" + cpuNum);
int maximumPoolSize = cpuNum * 2 ;
pool = new ThreadPoolExecutor(
maximumPoolSize - 2,
maximumPoolSize,
5L, //5s
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(), //数组有界队列
Executors.defaultThreadFactory(), //默认的线程工厂
new ThreadPoolExecutor.AbortPolicy()); //直接抛异常,默认异常
}
return pool;
}
}
定义一个实体类Product,定义产品的一些信息
/**
* @Author: zhenghuisheng
* @Date: 2023/10/9 20:24
*/
@Data
public class Product {
private Integer id;
private String productName;
}
随后创建一个生产者的线程任务Producer类,用于生产产品
/**
* 生产者线程
* @Author: zhenghuisheng
* @Date: 2023/10/8 20:21
*/
@Data
public class Producer implements Runnable {
private LinkedBlockingQueue linkedBlockingQueue;
public Producer(LinkedBlockingQueue linkedBlockingQueue){
this.linkedBlockingQueue = linkedBlockingQueue;
}
@Override
public void run() {
for (int i = 0; i < 3; i++) {
Product product = new Product();
product.setId(i);
product.setProductName("商品" + i + "号");
try {
//加入阻塞队列
linkedBlockingQueue.put(product);
System.out.println("生产者" + i + "号生产完毕");
Thread.sleep(50);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
随后创建一个消费者线程Consumer类,用于消费产品
/**
* 消费者线程
* @Author: zhenghuisheng
* @Date: 2023/10/8 20:21
*/
@Data
public class Consumer implements Runnable {
private LinkedBlockingQueue linkedBlockingQueue;
public Consumer(LinkedBlockingQueue linkedBlockingQueue){
this.linkedBlockingQueue = linkedBlockingQueue;
}
@Override
public void run() {
for (int i = 0; i < 3; i++) {
try {
//消费者消费
Object take = linkedBlockingQueue.take();
System.out.println(take);
} catch (Exception e) {
e.printStackTrace();
}
}
System.out.println("消费者消费完毕");
}
}
打印结果如下:
生产者0号生产完毕
Product(id=0, productName=商品0号)
Product(id=1, productName=商品1号)
生产者1号生产完毕
生产者2号生产完毕
Product(id=2, productName=商品2号)
可以发现,其结果和通过数组实现的方式的整个流程是一模一样的,结果也大致相同
在研究这个put和take的源码之前,先看一下这个类的基本属性和构造方法,该类也是继承了一个抽象类
public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>
首先查看他的构造方法,内部有两个构造方法,一个是无参构造,如果在开发中不传这个链表的数量,那么默认是使用整数的最大值,如果传了的话就使用外部传入的值。随后会创建一个Node结点组成的单向链表
public LinkedBlockingQueue() { //无参
this(Integer.MAX_VALUE); //整型最大值
}
public LinkedBlockingQueue(int capacity) { //可以自定义参数
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null); //创建一个单向链表
}
Node结点的属性如下,是一个内部的静态类,由于只有一个next指针,因此可以知道是一个单向链表
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}
由于是单向链表,因此会有指针记录链表的头结点和尾结点
transient Node<E> head; //链表头结点
private transient Node<E> last; //链表尾结点
除了上面的这些之外,还有两个重要的属性,和数组实现的最大的区别来了,在链表内部使用了两把互斥锁,内部也使用了两个祖苏队列,出队时为空的阻塞队列和入队时满了的阻塞队列
//take出队使用的互斥锁
private final ReentrantLock takeLock = new ReentrantLock();
//出队时队列为空的阻塞队列
private final Condition notEmpty = takeLock.newCondition();
//put入队使用的互斥锁
private final ReentrantLock putLock = new ReentrantLock();
//put入队队列满了的阻塞队列
private final Condition notFull = putLock.newCondition();
接下来重点研究put入队的操作,put的方法如下,put方法中使用的是putLock这把互斥锁,和数组一样,也是队列没满就加入,队列满了就阻塞。但是,内部多加了几个逻辑,比如内部容量没满则会唤醒生产者,从条件队列中加入到同步队列,并且做了一个数量为0的判断,一开始是-1,当为0时表示有值,则会去唤醒消费者去消费
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock; //入队的互斥锁
final AtomicInteger count = this.count; //队列中结点的数量
putLock.lockInterruptibly(); //该锁支持可中断
try {
while (count.get() == capacity) { //如果队列满了
notFull.await(); //将结点加入到条件队列中,线程阻塞
}
enqueue(node); //没满则入队
c = count.getAndIncrement(); //结点数+1
if (c + 1 < capacity) //此时队列中结点没满,可能被消费者批量消费了
notFull.signal(); //唤醒被阻塞生产者线程,提高吞吐量
} finally {
putLock.unlock(); //解锁
}
if (c == 0)
signalNotEmpty();
}
在下面的这段代码中,就是通过判断容量是否为满了来实现自身的唤醒,从而实现条件队列中的结点出队,然后转换到同步队列中,这样子就不必要等到消费者来唤醒,自身生产者就能唤醒,这样增加了吞吐量。
其主要原因是在消费者take的时候,只有在这个c == 容量的时候,才会去唤醒这个生产者消费,假设生产者每次生产5个,消费者每次消费4个,那么在生产端和消费端不对等的条件下,一定会出现容量不满,但是条件队列有又不被唤醒,因此在自身内部唤醒,大大的提高的系统的吞吐量。况且如果在容量不为满的时候,要是消费者不消费,那不是得一直阻塞着。
if (c + 1 < capacity)
notFull.signal();
if (c == capacity)
signalNotFull();
入队的方法如下,由于是链表入队,因此比较简单,第一步是last记录的尾指针结点的next指针指向这个Node结点,第二步就是将这个Node结点作为尾指针记录
private void enqueue(Node<E> node) {
last = last.next = node; //结点入队
}
入队之后会有一个finally,主要是用于解锁,就是将同步队列的结点唤醒,队头出队等功能
finally {
putLock.unlock(); //解锁
}
在讲解完put操作之后,再来讲解消费者的take操作,其实现如下。内部使用的是另外一把互斥锁takeLock,如果队列为空,则阻塞,否则进入出队的逻辑
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock; //互斥锁
takeLock.lockInterruptibly();
try {
while (count.get() == 0) { //如果队列为空
notEmpty.await(); //将结点加入条件队列阻塞
}
x = dequeue(); //结点不为空则出队
c = count.getAndDecrement(); //减1
if (c > 1) //这里的设计和上面的一样
notEmpty.signal(); //主要是为put方法c = 0才会唤醒,如果消费和生产不一致,则主动唤醒
} finally {
takeLock.unlock(); //解锁
}
if (c == capacity)
signalNotFull();
return x;
}
出队的逻辑如下,如果队列不为空,则出队。内部主要是将头结点的下一个结点做为头结点。并且为了更好的GC,将原来的头结点自己指向自己,这样减少GC root的引用链,根据可达性的分析的算法,这个结点会被垃圾回收器回收
private E dequeue() {
Node<E> h = head; //获取头结点
Node<E> first = h.next; //将头结点的下一个结点作为头结点
h.next = h; //自己指向自己,没有被gcroot引用,会被回收
head = first; //head指针指向头结点
E x = first.item;
first.item = null;
return x;
}
内部有一段和put类似的,就是如果是队列还有值,就唤醒因为队列为空而阻塞的线程结点,因为在put方法中只有为0时才会唤醒,如果生产者和消费者的速度不一致,那么肯定会出现队列不为空,但是不唤醒的操作,因此在这了引入了这个自身唤醒的方法,从而提高吞吐量
if (c > 1)
notEmpty.signal();
if (c == 0) //put方法中唤醒消费者的条件
signalNotEmpty();
最后就是这个finally中的unlock,主要是用于解锁,就是将同步队列的结点唤醒,队头出队等功能
finally {
putLock.unlock(); //解锁
}
如果说数组实现的精髓在于环状数组+双指针的设计,那么链表实现的精髓就在于这个put和take提前唤醒条件队列中被阻塞的结点,从而提高入队和出队的效率,减少大量节点的阻塞问题。
链表实现的阻塞队列内部用了两把互斥锁,以及两个阻塞队列,让take和put的职责更加的单一,从而提高整个系统的吞吐量。在队列为空是出队的线程结点会被阻塞,在队列满是入队的线程结点会被阻塞。
数组和链表实现的阻塞队列都是通过条件等待队列+同步等待队列来实现,但是链表的吞吐量会高于数组,链表需要注意的是容量的设置,如果不设置容量的参数,那么会很容易的出现OOM的情况,因此在实际的开发中,在面临数组和链表实现阻塞队列时,可以优先的考虑使用设置容量的链表阻塞队列,效率相对会更高。
默认这个LinkedBlockingQueue是一个无界队列,如果在构造方法中传参,那么也可以认为是一个有界队列