首先我们来看一看阻塞队列家族
BlockingQueue接口定义了以下方法
无非就是“入队”和“出队”两类动作,全部交由五个不同的实现类实现。
听名字我们就知道他的数据结构一定是数组,除此之外,数组上具有两个指针,分别指向头尾,头表示下一个该出队的元素,尾表示下一个该入队的元素位置。
// 数据结构-数组
final Object[] items;
// 下一个该出队的元素 头指针
int takeIndex;
// 下一个该入队的元素位置 尾指针
int putIndex;
// 数组中存在的元素个数
int count;
下一步,我们考虑阻塞的实现,队列存在两种等待条件,一种是队列空时,出队需要等待,一种是队列满时,入队需要等待,synchronized只有一个等待条件,不符合要求,因此使用ReentrantLock。
// 锁
final ReentrantLock lock;
// 队列空时的等待队列
private final Condition notEmpty;
// 队列满时的等待队列
private final Condition notFull;
数组构造方法一定需要指定容量,因为数组需要预分配空间,
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
// 预分配空间
this.items = new Object[capacity];
// 初始化锁与条件
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
首先,我们实现在不使用锁的情况下进行入队方法,再在外层套一层壳即可。
private void enqueue(E x) {
final Object[] items = this.items;
// 直接给尾指针赋值
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
// 队列当前元素加一
count++;
// 通知等待队列不空的线程
notEmpty.signal();
}
下面,为确保线程安全,需要使用锁,而有三个操作都能入队,它们的实现也有细微区别,它们分别是:
先来看看offer的实现,
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 队列满时,返回false
if (count == items.length)
return false;
// 入队
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
在入队之前,先锁住队列实例,除开本线程外,其他线程既不能入队也不能出队。
add就是在offer基础上套一层判断,
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
而put的实现,无非就是在判断队列满时,当前线程进入notNull等待队列等待,lockInterruptibly是指当线程阻塞时,能响应中断,注意线程等待一般都需要使用while,让线程再度检查。
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 队列满时,等待
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
同样,先看在不加锁时的出队方法,
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
// 环形
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
// 通知等待队列不满的线程
notFull.signal();
return x;
}
可以发现,由于双指针,入队和出队从逻辑上首位连到了一起,成了环形数组。
与入队一样,有两个操作都能出队,
先来看poll的实现,
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 若队列空,则返回null
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
在出队之前,先锁住队列实例,并且使用try catch来保证锁释放。
take()的实现已经呼之欲出了,
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
在队列空时,进入notEmpty等待,等待队列不空这个条件。
看到Linked,就知道它的数据结构一定是链表,链表需要定义节点,
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}
也就是节点的元素,节点的next指针。
与数组实现类似,该链表需要头尾指针,链表的最大容量(默认为Integer.MAX_VALUE),和当前链表中存储的元素数量(使用并发安全的Integer,这是由于链表实现使用两把锁,不能保证并发安全)。
transient Node<E> head;
private transient Node<E> last;
private final int capacity;
private final AtomicInteger count = new AtomicInteger();
阻塞的实现和ArrayBlockingQueue有些许不同,我们知道,数组形式的实现入队出队使用的是同一把锁,而链表实现入队和出队使用两把不同的锁,入队与出队不再互斥,效率更高。
// 出队锁
private final ReentrantLock takeLock = new ReentrantLock();
// 入队锁
private final ReentrantLock putLock = new ReentrantLock();
// 两个条件变量
private final Condition notEmpty = takeLock.newCondition();
private final Condition notFull = putLock.newCondition();
无参构造,调用有参构造,默认链表容量为Integer.MAX_VALUE
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
有参构造,初始化容量,头尾指针指向null的头结点。
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
// 带头结点的链表
last = head = new Node<E>(null);
}
首先是不使用锁的实现,尾插法,直接将node接到尾节点last后。
private void enqueue(Node<E> node) {
last = last.next = node;
}
然后加上锁,特别的,这里是入队锁(putLock),有两种入队方式
首先看一下offer的实现,
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
// 容量已满
if (count.get() == capacity)
return false;
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
// 入队锁
putLock.lock();
try {
// 再次检查容量(双重检查)
if (count.get() < capacity) {
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
// 通知入队阻塞线程
notFull.signal();
}
} finally {
putLock.unlock();
}
if (c == 0)
// 如果队列中加入的元素是第一个,通知出队阻塞线程
signalNotEmpty();
return c >= 0;
}
可以发现,链表结构的入队与数组结构的入队大不相同,首先,链表入队使用的是入队锁,其次,入队阻塞线程不完全由出队线程唤醒,而是可以由入队线程唤醒。
如果队列中加入的元素是第一个,则需要去唤醒出队阻塞线程,
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
这是由于使用了两把锁,若入队阻塞线程都由出队线程唤醒,那么,出队线程在出队后,又要去获得入队锁唤醒线程,那么这两把锁就相当于还是一把,因为每次操作都要同时获得两把锁。
put的实现也就在offer基础上,改变了队满的处理方法而已。
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
// 线程阻塞
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
同样先看不使用锁的实现,
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
Node<E> h = head;
Node<E> first = h.next;
// 头结点断开
h.next = h; // help GC
head = first;
// 取出第一个结点的元素
E x = first.item;
// 第一个结点成为新的头结点
first.item = null;
return x;
}
出队的实现有些特殊,将头结点断开,取出第一个结点的元素,并让第一个结点成为头节点(置null)。
现在我们加上出队锁,
首先看poll()方法,
public E poll() {
final AtomicInteger count = this.count;
// 队列为空
if (count.get() == 0)
return null;
// 返回的元素,默认为空
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
// 出队锁
takeLock.lock();
try {
// 双重检查,拿到锁后再次检查队列
if (count.get() > 0) {
// 出队
x = dequeue();
c = count.getAndDecrement();
// 若队里还有元素,通知其他出队线程
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
// 从满队出队,通知入队线程
if (c == capacity)
signalNotFull();
return x;
}
使用出队锁保证与入队动作无阻塞,同样,出队线程去通知另外的出队线程,当从满队出队时,去通知入队线程。
看懂了poll,take的实现也就小菜一碟了,
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
// 当队列为空时,等待队列不空
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
使用while循环,当再度被唤醒时,再次校验当前元素数量。