• 一文带你吃透阻塞队列


    什么是阻塞队列

    我们在线程池参数中经常用到一个参数叫做阻塞队列,很多人不知道阻塞队列应该怎么选择,为了解决这个问题,我们需要先了解什么是阻塞队列。

    要了解这个,我们需要先知道什么是队列。

    队列是一种先进先出的线性表,在一端插入,另一端输出,可以把它想象成一个没有感情的单行道,里面的元素就类似于道上的车子。

    那么让我们来看看阻塞队列又是什么呢?

    一个队列的实现无非就是数组、链表等数据结构,它也有自己的大小,这里可以分为有界队列和无界队列,有界队列的大小是有限的,无界队列的大小为Integer.MAX,一定程度上可以理解为无限大,那么当有界队列满了,一端还在继续插入元素的时候,就应该阻塞,又或者队列空了,一端不断获取元素,也应该阻塞。

    阻塞队列的作用

    我们了解了什么是阻塞队列,那么它到底有什么用呢?阻塞队列在线程池和各种消息队列中经常用到,主要作用是缓冲,为了防止大量请求对服务器造成的压力过大,开辟了一块缓冲区,来限制流量。

    阻塞队列的原理

    阻塞队列的本质是利用了生产者消费者模式,利用lock锁以及Condition对线程进行操作来控制队列的状态,如果满了则对线程挂起并释放锁,即Condition.await(),如果添加元素成功或者删除元素成功则对指定线程进行唤醒,即Condition.signal()。

    我们来看下简单实现

    public class ConditionBlockedQueueExample {
        //表示阻塞队列中的容器
        private List<String> items;
    
        //元素个数(表示已经添加的元素个数)
        private volatile int size;
    
        //数组的容量
        private volatile int count;
    
        private Lock lock = new ReentrantLock();
    
        //让take方法阻塞
        private final Condition notEmpty = lock.newCondition();
        //让add方法阻塞
        private final Condition notFull = lock.newCondition();
    
        public ConditionBlockedQueueExample(int count) {
            this.count = count;
            items = new ArrayList<>(count);
        }
    
        //添加一个元素并且阻塞添加
        public void put(String item) throws InterruptedException {
            lock.lock();
            try {
                if (size >= count) {
                    System.out.println("阻塞队列满了,需要等待");
                    notFull.await();
                }
                ++size;
                items.add(item);
                notEmpty.signal();
            } finally {
                lock.unlock();
            }
        }
    
        public String take() throws InterruptedException {
            lock.lock();
            try {
                if (size == 0) {
                    System.out.println("阻塞队列空了,需要等待");
                    notEmpty.await();
                }
                --size;
                String item = items.remove(0);
                notFull.signal();
                return item;
            } finally {
                lock.unlock();
            }
        }
    
        public static void main(String[] args) throws InterruptedException {
            ConditionBlockedQueueExample cbqe = new ConditionBlockedQueueExample(10);
            Thread t1 = new Thread(() -> {
                Random random = new Random();
                for (int i = 0; i < 1000; i++) {
                    String item = "item-" + i;
                    try {
                        cbqe.put(item);
                        System.out.println("生产一个元素:" + item);
                        Thread.sleep(random.nextInt(1000));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
            t1.start();
    
            Thread.sleep(1000);
    
            Thread t2 = new Thread(() -> {
                Random random = new Random();
                for (;;) {
                    try {
                        String item = cbqe.take();
                        System.out.println("消费一个元素:" + item);
                        Thread.sleep(random.nextInt(1000));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
            t2.start();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89

    阻塞队列的方法

    了解了阻塞队列的原理后,我们回想线程池中的拒绝策略,类比这里,是不是所有的阻塞队列满了以后就都是进行阻塞呢?
    答案显然是否定的。
    在阻塞队列中,不管是添加还是移除元素,都存在不同的策略。
    我们以ArrayBlockingQueue为例。

    阻塞队列抛出异常有返回值等待阻塞等待超时
    入队方法add()offer()put()offer(值,超时时间,超时单位)
    出队方法remove()poll()take()poll(超时时间,超时单位)
    获取队顶方法element()peek()
    队满或队空输出结果抛出异常返回false或null一直阻塞等待超时

    JUC中的阻塞队列

    JUC中有很多种阻塞队列,不同的阻塞队列的使用场景不同。

    • ArrayBlockingQueue 基于数组结构
    • LinkedBlockingQueue 基于链表结构
    • PriorityBlockingQueue 基于优先级队列
    • DelayQueue 允许延时执行的队列
    @ToString
    public class DelayQueueExampleTask implements Delayed {
        private String orderId;
        private long start = System.currentTimeMillis();
        private long time;
    
        public DelayQueueExampleTask(String orderId, long time) {
            this.orderId = orderId;
            this.time = time;
        }
    
        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert((start+time)-System.currentTimeMillis(),TimeUnit.MILLISECONDS);
        }
    
        @Override
        public int compareTo(Delayed o) {
            return (int)(this.getDelay(TimeUnit.MILLISECONDS)-o.getDelay(TimeUnit.MILLISECONDS));
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    public class DelayQueueTest {
        private static DelayQueue<DelayQueueExampleTask> delayQueue = new DelayQueue<>();
        public static void main(String[] args) throws InterruptedException {
            delayQueue.offer(new DelayQueueExampleTask("1001",5000));
            delayQueue.offer(new DelayQueueExampleTask("1002",4000));
            delayQueue.offer(new DelayQueueExampleTask("1003",3000));
            delayQueue.offer(new DelayQueueExampleTask("1004",2000));
            delayQueue.offer(new DelayQueueExampleTask("1005",1000));
            while (!delayQueue.isEmpty()){
                DelayQueueExampleTask task = delayQueue.take();
                System.out.println(task);
            }
    
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    在这里插入图片描述

    • SynchronousQueue 没有任何存储结构的队列(在线程池中newCacheThreadPool中可用到,可以处理非常大请求的任务)
    • LinkedTransferQueue 由链表构建的无界阻塞队列,保证生产者和消费者是一对一的关系
    • LinkedBlockingDeque 双向链表组成的队列,支持双向的插入和移除,在一定程度上能够解决多线程的竞争问题,ForkJoin的使用的就是这种。

    阻塞队列的使用

    阻塞队列在平时很多场景都可以使用,比如责任链模式,构建一条执行链路。

    首先定义一个责任链接口

    public interface IRequestProcessor {
        void processRequest(Request request);
    }
    
    • 1
    • 2
    • 3

    再构建一个链式的实现

    public class ValidProcessor extends Thread implements IRequestProcessor{
    
        protected IRequestProcessor nextProcessor;
    
        protected BlockingQueue<Request> requests = new LinkedBlockingDeque<>();
    
        public ValidProcessor(IRequestProcessor nextProcessor) {
            this.nextProcessor = nextProcessor;
        }
    
        @Override
        public void processRequest(Request request) {
            requests.add(request);
        }
    
        @Override
        public void run() {
            while (true){
                try {
                    Request request = requests.take();
                    System.out.println("执行了" + this.getClass().getSimpleName() + "责任链");
                    Optional.ofNullable(nextProcessor).ifPresent(k->nextProcessor.processRequest(request));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    public class SaveRequestProcessor extends Thread implements IRequestProcessor{
    
        protected IRequestProcessor nextProcessor;
    
        protected BlockingQueue<Request> requests = new LinkedBlockingDeque<>();
    
        public SaveRequestProcessor(IRequestProcessor nextProcessor) {
            this.nextProcessor = nextProcessor;
        }
    
        @Override
        public void processRequest(Request request) {
            requests.add(request);
        }
    
        @Override
        public void run() {
            while (true){
                try {
                    Request request = requests.take();
                    System.out.println("执行了" + this.getClass().getSimpleName() + "责任链");
                    Optional.ofNullable(nextProcessor).ifPresent(k->nextProcessor.processRequest(request));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    public class PrintProcessor extends Thread implements IRequestProcessor{
    
        protected IRequestProcessor nextProcessor;
    
        protected BlockingQueue<Request> requests = new LinkedBlockingDeque<>();
    
        public PrintProcessor(IRequestProcessor nextProcessor) {
            this.nextProcessor = nextProcessor;
        }
    
        @Override
        public void processRequest(Request request) {
            requests.add(request);
        }
    
        @Override
        public void run() {
            while (true){
                try {
                    Request request = requests.take();
                    System.out.println("执行了" + this.getClass().getSimpleName() + "责任链");
                    Optional.ofNullable(nextProcessor).ifPresent(k->nextProcessor.processRequest(request));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    public class FinalRequestProcessor  extends Thread implements IRequestProcessor {
    
        protected IRequestProcessor nextProcessor;
    
        protected BlockingQueue<Request> requests = new LinkedBlockingDeque<>();
    
        @Override
        public void processRequest(Request request) {
            requests.add(request);
        }
    
        @Override
        public void run() {
            while (true){
                try {
                    Request request = requests.take();
                    System.out.println("执行了" + this.getClass().getSimpleName() + "责任链");
                    Optional.ofNullable(nextProcessor).ifPresent(k->nextProcessor.processRequest(request));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    通过线程将责任链的请求加到阻塞队列中,在构建链式结构后启动线程,即可通过生产者消费者模式异步处理链上的请求。

    public class ChainExample {
        public static void main(String[] args) {
            FinalRequestProcessor finalRequestProcessor = new FinalRequestProcessor();
            finalRequestProcessor.start();
            SaveRequestProcessor saveRequestProcessor = new SaveRequestProcessor(finalRequestProcessor);
            saveRequestProcessor.start();
            PrintProcessor printProcessor = new PrintProcessor(saveRequestProcessor);
            printProcessor.start();
            ValidProcessor validProcessor = new ValidProcessor(printProcessor);
            validProcessor.start();
            Request request = new Request();
            validProcessor.processRequest(request);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    通过阻塞队列可以做很多非实时的数据处理,比如订单详情信息,物流实时信息等,这种方式可以防止大量请求堆积,在某些场景下可以提升较多的系统性能。

  • 相关阅读:
    算法通关村第三关-青铜挑战数组专题
    PyQt5开发相关
    在自定义数据集上微调Alpaca和LLaMA
    mysql的监控大屏
    golang、gin、gorm、casbin访问权限控制
    多通道图片的卷积过程
    Google Guava精讲(一)-Guava是什么?能做什么?
    [附源码]计算机毕业设计微录播室预约管理系统Springboot程序
    leetcode刷题集:单调栈(python代码)
    网络编程打开的第一节预备课-----关于socket
  • 原文地址:https://blog.csdn.net/qq_40359381/article/details/125614543