• 生产者消费者模型


    生产者消费者模型

    什么是生产者消费者模型

    我们可以把这个模型想象成工厂里的两条流水线,我们管他们叫生产者流水线和消费者流水线,生产者流水线生产出来的产品给消费者流水线使用,其中生产者流水线先把生产出来的产品放在仓库,然后消费者流水线再去仓库拿。这个仓库就叫做阻塞队列。
    那么,这个仓库的实现有什么要求呢?

    • 第一,仓库满的时候,不能继续往里放了,生产者流水线要停止生产。
    • 第二,仓库空的时候,拿不出来了,消费者流水线要停止取商品。
    • 第三,生产者流水线和消费者流水线都可以使用仓库。也就是说仓库是共享变量,要注意线程安全。

    下面我们先来设计一下这个仓库(阻塞队列):
    关于阻塞队列的设计,有几点需要我们思考:

    • ①为什么要给阻塞队列容量的限制?如果生产者生成的效率比消费者消费的效率快,那么生产者产生的“产品”会不断的在队列中累积起来,最终耗尽内存。如果使用有界队列,那么当队列满时,生产者会阻塞并且不能继续工作,而消费者可以赶上工作进度。
    • ②如何保证阻塞队列的线程安全?由于生产者和消费者都是对阻塞队列进行修改操作,所以我们既需要保证操作的可见性又需要保证原子性,实现的方法有很多种:一种是将阻塞队列设计成线程安全的类,另一种是将生产者和消费者对阻塞队列的操作设计成原子操作。
    • ③阻塞队列是如何实现阻塞的?wait/notify 或者 await/signal都可以实现阻塞与唤醒操作。

    生产者消费者模型的代码实现

    将阻塞队列设计成线程安全的类

    import java.util.LinkedList;
    import java.util.Queue;
    
    public class BlockQueueplus {
        Queue queue = new LinkedList();
        int capacity ; //阻塞队列的容量
    
        public BlockQueueplus(int capacity) {
            this.capacity = capacity;
    
        }
    
        /**
         * 将数据放入阻塞队列中
         * @param i 放入的元素
         * @throws InterruptedException
         */
        public synchronized void put(Integer i) throws InterruptedException {
            while(capacity <= queue.size()){
                wait();
            }
            queue.offer(i);
            System.out.println(Thread.currentThread().getName() + "生产了value, value的当前值是" + i );
            notify();
        }
    
        /**
         * 从阻塞队列中取出数据
         * @return
         * @throws InterruptedException
         */
        public synchronized int take() throws InterruptedException {
            if(size() == 0){
                wait();
            }
            Integer result = queue.poll();
            System.out.println(Thread.currentThread().getName() + "消费了value, value的当前值是" + result );
            notify();
            return result;
        }
    
        public int size(){
            return queue.size();
        }
    
        public Boolean isEmpty(){
            return queue.isEmpty();
        }
    
        public Boolean isFull(){
            return this.size()==this.capacity;
        }
    
    }
    
    

    synchronized关键字用在方法上的作用

    有的同学可能不懂synchronized关键字用在方法上有什么作用,我来讲解一下:
    在我的BlockQueueplus类中,put()和take()都添加了synchronized关键字,当进入synchronized修饰的方法时,锁住的是当前实例类,所以当调用put()方式时,put()方法拿到了当前实例的锁,take()想运行也需要拿到锁,就要等待put()方法运行完释放锁。所以就实现了put操作和take操作的互斥。

    wait和notify的使用

    首先呢,Java 中每个对象都有一把称之为 monitor 监视器的锁,调用synchronized方法时,会获取monitor锁。当调用wait方法时,会释放monitor锁。在我的BlockQueueplus类中,当队列满时,put方法会调用wait方法,进入阻塞,此时take方法就可以获取锁,运行取操作。take操作在执行完取操作之后,会调用notify()方法,通知一个正在wait阻塞中的线程让它继续运行。

    生产者线程与消费者线程

    用Work把阻塞队列封装一下,只提供插入和取两种方法。

    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;
    
    public class Work{
        private static BlockQueueplus blockQueueplus = new BlockQueueplus(100);
        public void set(int i)
        {
                try {
                    blockQueueplus.put(i);
                }catch (InterruptedException e) {
                    e.printStackTrace();
                }
        }
    
        public void get() 
        {
            try {
                Integer i = blockQueueplus.take();
                
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        
    }
    
    

    测试类

    开启两个线程,生产者不停的往阻塞队列中插入数据,消费者不停的从阻塞队列中取数据

    class WorkTest {
        public static void main(String[] args) {
            Work work = new Work();
            Runnable producerRunnable = new Runnable() {
                @Override
                public void run() {
                    for (int i = 0; i < Integer.MAX_VALUE; i++)
                        work.set(i);
                }
            };
            Runnable customerRunnable = new Runnable() {
                @Override
                public void run() {
                    for (int i = 0; i < Integer.MAX_VALUE; i++)
                        work.get();
                }
            };
            Thread ProducerThread = new Thread(producerRunnable);
            ProducerThread.setName("Producer");
            Thread ConsumerThread = new Thread(customerRunnable);
            ConsumerThread.setName("Consumer");
            ProducerThread.start();
            ConsumerThread.start();
        }
    }
    

    运行结果:

    从运行结果中可以发现,生产者线程与消费者线程交接的时候,他们生产的数和消费的数的差正好为99,也证明了阻塞队列设计的成功。

    还可以改进的地方

    生产者线程可能是多个,消费者线程也可以是多个,如果继续使用暴力的wait和notify,就有可能会出现生产者A唤醒生产者B的错误,我们可以尝试使用await和signal来优雅的唤醒需要唤醒的线程。


    __EOF__

  • 本文作者: classic
  • 本文链接: https://www.cnblogs.com/classicltl/p/17147741.html
  • 关于博主: 评论和私信会在第一时间回复。或者直接私信我。
  • 版权声明: 本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!
  • 声援博主: 如果您觉得文章对您有帮助,可以点击文章右下角推荐一下。
  • 相关阅读:
    SQLite 日期 & 时间
    Nvidia Jetson/Orin +FPGA+AI大算力边缘计算盒子:无人机自主飞行软件平台
    摘要与关键词 写作
    【功能栏】基于session的模拟短信注册功能
    【设计模式】外观模式
    Mac basictex缺少xxx.sty文件
    知识储备 网站收集
    服务注册发现_Eureka概述
    Linux 文本处理三剑客:grep、sed 和 awk
    SaaSBase:什么是讯众通信?
  • 原文地址:https://www.cnblogs.com/classicltl/p/17147741.html