目录
生产者消费者模型是多线程的一个经典案例,在多个线程并发执行时,会有线程来生产数据这就是生产者,同时还有线程来消费数据这是消费者,生产者和消费者共同完成多线程并发任务.
1.生产者和消费者在进行协作时必然会出现一些问题,比如生产者生产数据过快,而消费者消费数据过慢就导致生产者会处于阻塞等待,同理消费者消费数据过快,而生产者生产过慢,就会导致生产者会阻塞等待.
2.生产者和消费者在进行协作时,如果有一个出现异常,就会影响另外一个.
生产者和消费者模型是通过阻塞队列来实现,通过阻塞队列能够使生产者和消费者耦合性降低,通过阻塞队列,将一些压力全部交给阻塞队列能够"削峰减谷",平衡生产者和消费者的处理能力.
在软件开发中会有一个名词: 高内聚,低耦合,首先明确耦合度低比较好,内聚高比较好.
什么是耦合性高?? 什么是耦合性低??
举例 : 比如在工作敲代码的时候,你不能离开你的电脑,要保持工作状态,电脑和我关联性就很大,谁也离不开谁,这就是耦合高,相反如果在不工作,玩的时候,两者关系不是十分紧密,这就是耦合性低.
放到服务器上也是一样的,当两个服务器直接进行交互,联系十分紧密,这种就是耦合性比较高,相反第二种两个服务器不直接进行交互,耦合性较低.
而在上述情景,如果两个服务器联系十分紧密,就会导致出现,一个服务器出现异常,另一个也会受到影响,对于第二种情况,两者不进行交互,一个服务器出现异常,不会导致另外一个也出现异常.
所以,通过一个阻塞队列能够使得生产者和消费者之间的耦合性降低
对于左边情景来说:
如果多个客户端访问服务器A,就可能导致A服务器挂掉,而A又与B进行直接交互,A的异常会直接影响到服务器B.
对于右边情景来说:
多个客户端访问服务器A时,服务器A将所有数据全部交给阻塞队列,而阻塞队列正常的给服务器B发送数据,将压力交给阻塞队列,不会导致出现异常.--->这就能在高峰期避免服务器异常.A与B不直接进行交互,而是A与阻塞队列交互,B与阻塞队列交互.
所以,通过阻塞队列能够平衡生产者和消费者处理能力,能够"削峰减谷".
在java标准库里阻塞队列是利用BlockingQueue,这个接口实现了LinkedBlockingQueue,能够保证线程安全
在BlockingQueue中的put方法和take方法是保证线程安全的,而对于其他offer,add,poll,等等不不保证线程安全.
- //实现一个阻塞队列
- public class MyBlockingQueue {
- private int[] elem;//将元素放到一个数组里面private volatile int usedSize;//数组有效个数--也用来判断队列空还是满private int front;//对头private int rear;//队尾//构造方法--初始化public MyBlockingQueue(){
- this.elem = new int[1000];//数组初始化1000个元素this.usedSize =0;//数组有效个数初始化this.front =0;//初始对头下标为0this.rear = 0;//初始队尾下标为0
- }
-
- //实现put方法public void put(int val) throws InterruptedException {
- //这里加锁是为了有修改操作,可能有的线程在读有的在写,并且wait要在synchronized中使用
- synchronized(this) {
- //判断队列是否为满,如果满需要等待.//为了避免阻塞时间过长,要再次进行判断--使用whilewhile (this.usedSize == this.elem.length) {
- this.wait();
- }
- //添加元素将val放在rear位置上,rear往后走this.elem[rear] = val;
- rear++;
- //如果rear走到末尾位置,就要从到开始,避免浪费空间if(this.rear==this.elem.length){
- this.rear= 0;
- }
- this.usedSize++;//添加元素成功有效个数+1this.notify();//用来通知已经添加了一个元素--队列不为空
- }
- }
-
- //实现take方法--删除对头元素并返回public Integer take() throws InterruptedException {
- int ret =0;
- synchronized(this){
- //用来判断队列是否为空,如果为空就要进行等待;//等待队列添加元素,在通知--呼应put方法的通知notifywhile(this.usedSize==0){
- this.wait();
- }
- //存放对头元素,front往后走相当于将对头元素删除
- ret = this.elem[front];
- this.front++;
- //front走到队尾越界就从头开始--避免队列浪费空间if(front==this.elem.length){
- this.front = 0;
- }
- this.usedSize--;
- this.notify();//删除对头元素来通知队列不满了可以添加元素--呼应put判断是否满
- }
- return ret;//将对头元素返回去
- }
-
- //测试 生产者-消费者模型public static void main(String[] args) {
- MyBlockingQueue queue = new MyBlockingQueue();
- //生产者
- Thread producer = new Thread(()->{
- int n =0;
- while(true){
- try {
- queue.put(n);
- System.out.println("生产了 " + n);
- n++;
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- });
- producer.start();
-
- //消费者
- Thread customer = new Thread(()->{
- while(true){
- try {
- int x = queue.take();
- System.out.println("消费了"+x);
- Thread.sleep(500);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- });
- customer.start();
- }
-
- }
注意理解这里的等待和通知
这里使用循环的原因是,因为在等待时要释放锁,然后notify拿到锁之后在进行通知,在synchronized代码块执行完之后,wait在尝试获取锁,这里的间隔时间不确定,避免条件再次成立,所以使用while,再次确保条件成立