目录
SynchronousQueue同步队列,jdk8源码中的文档注释对其解释如下,
* A {@linkplain BlockingQueue blocking queue} in which each insert * operation must wait for a corresponding remove operation by another * thread, and vice versa. A synchronous queue does not have any * internal capacity, not even a capacity of one. You cannot * {@code peek} at a synchronous queue because an element is only * present when you try to remove it; you cannot insert an element * (using any method) unless another thread is trying to remove it; * you cannot iterate as there is nothing to iterate. The * head of the queue is the element that the first queued * inserting thread is trying to add to the queue; if there is no such * queued thread then no element is available for removal and * {@code poll()} will return {@code null}. For purposes of other * {@code Collection} methods (for example {@code contains}), a * {@code SynchronousQueue} acts as an empty collection. This queue * does not permit {@code null} elements.简译如下:
(线程A)对一个同步队列(SynchronousQueue)的insert-写入元素操作,必须在另一个线程B的remove-移除/读出元素之后执行。
一个同步队列是不含内部容量的(亦即:capacity=0),因为元素element-a仅在线程B尝试remove-读出它(元素a)的瞬间存在;同样,开发者也无法使用线程A主动向同步队列insert-写入元素,除非此时存在一个线程B正在准备读取(remove)同步队列中的元素。
(上面提到,同步队列的capacity为0,因此,)开发者无法遍历/迭代同步队列。
同步队列的头元素,就是正在排队的第一个线程正在写入的那个元素;但是,如果不存在负责写入元素操作的线程,那么,正在等待从同步队列中读出元素的线程B,通过poll()方法读取到的元素只能是null。
同步队列就等价于一个空的Collection集合对象,也不允许存入null-空元素。
如下,我们创建线程A,用于从同步队列中读出数据;创建线程B,用于从同步队列中写入数据。
首先使用take()方法读取数据,此时,当同步队列为空时,负责读出元素的线程A就会进入长期的阻塞状态。示例代码如下,
- import java.util.PriorityQueue;
- import java.util.concurrent.SynchronousQueue;
-
-
- public class SynchronousQueue_Class {
- //properties
-
- //methods
- public static void main(String[] args) throws InterruptedException {
- //SynchronousQueue-同步队列
- SynchronousQueue
synchronousQueue = new SynchronousQueue<>(); - //创建一个线程-先取出数据-take()方法-取出数据
- new Thread(() -> {
- while (true){
- try {
- Object take = synchronousQueue.take();
- System.out.print(Thread.currentThread().getName()+"取出数据-");
- System.out.println(take);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }).start();
- //创建一个线程-写入数据-put方法会导致线程阻塞
- new Thread(() -> {
- try {
- System.out.println(Thread.currentThread().getName()+"写入数据...");
- synchronousQueue.put("aaa");
- synchronousQueue.put("bbb");
- synchronousQueue.put("ccc");
- synchronousQueue.put("ddd");
- synchronousQueue.put("eee");
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }).start();
- }
- }

其次使用poll()方法读取数据,当负责读出元素的线程A预先设定阻塞时间时,当同步队列为空时,就会在等待若干时长之后,若返回值仍然为null空,就会自动结束阻塞状态。示例代码如下,
- public static void main(String[] args) throws InterruptedException {
- poll_test();
- }
-
- private static void poll_test(){
- SynchronousQueue
synchronousQueue = new SynchronousQueue<>(); - //线程A-poll方法-读出数据
- new Thread(new Runnable() {
- @Override
- public void run() {
- while (true){
- String poll = null;//阻塞5s,若返回值仍然为null,自动结束子线程A
- try {
- poll = synchronousQueue.poll(5, TimeUnit.SECONDS);
- if (Objects.isNull(poll))
- break;
- else
- System.out.println(Thread.currentThread().getName()+"读出数据:"+poll);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }).start();
- //线程B-put()方法-写入数据
- new Thread(new Runnable() {
- @Override
- public void run() {
- System.out.println(Thread.currentThread().getName()+"写入数据...");
- try {
- synchronousQueue.put("aaa");
- synchronousQueue.put("bbb");
- synchronousQueue.put("ccc");
- synchronousQueue.put("ddd");
- synchronousQueue.put("eee");
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }).start();
-
- }
