• 生产者消费者模型


    生产者消费者模型

    为何要使用生产者消费者模型

    如果我们在主函数中调用某一函数,主函数调用某一函数其实就是主函数把数据交给其函数进行处理的过程,但是我们必须等该函数体执行完后才继续执行主函数的后续代码,因此函数调用本质上是一种紧耦合。
    如下图:
    函数和函数之间交互的本质,其实也是数据通信。
    在这里插入图片描述函数和函数间的交互非常低效,必须要等待另一方处理完成后才能往下执行。但是如果我们把两个函数变成两个执行流时,两个执行流通过临界资源进行交互,此时两个函数就有可能并行运行了。

    生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。

    如下图:
    在这里插入图片描述

    生产者消费者模型的特点

    生产者消费者模型是多线程同步与互斥的一个经典场景,其特点如下:

    • 三种关系: 生产者和生产者(互斥关系)、消费者和消费者(互斥关系)、生产者和消费者(互斥关系、同步关系)。
    • 两种角色: 生产者和消费者。(通常由进程或线程承担)
    • 一个交易场所: 通常指的是内存中的一段缓冲区。(可以自己通过某种方式组织起来)

    我们用代码编写生产者消费者模型的时候,本质就是对这三个特点进行维护。

    生产者和生产者、消费者和消费者、生产者和消费者,它们之间为什么会存在互斥关系?

    介于生产者和消费者之间的容器可能会被多个执行流同时访问,因此我们需要将该临界资源用互斥锁保护起来。它们都需要竞争锁资源,所以他们之间存在互斥关系。

    生产者和消费者之间为什么会存在同步关系?

    • 如果让生产者一直生产,那么当生产者生产的数据将容器塞满后,生产者再生产数据就会生产失败。
    • 反之,让消费者一直消费,那么当容器当中的数据被消费完后,消费者再进行消费就会消费失败。

    虽然这样不会造成任何数据不一致的问题,但是这样会引起另一方的"饥饿问题",是非常低效的。我们应该让生产者和消费者访问该容器时具有一定的顺序性,比如让生产者先生产,然后再让消费者进行消费。

    生产者消费者模型优点

    • 解耦。
    • 支持并发。
    • 支持忙闲不均。

    基于BlockingQueue的生产者消费者模型

    进程通信,不管是管道还是共享内存,首先要让不同的进程看到同一块资源。我们换个视角来看待进程通信其实就是生产者消费者模型,所以管道自带同步和互斥的机制,管道和共享内存就是交易场所。

    BlockingQueue

    在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)

    我们先设计一个单消费者单生产者模型的阻塞队列,一个生产者线程给队列放入数据,另一个消费者线程往队列拿数据,队列就是交易场所,现在我们有了1个交易场所,两中角色,现在我们只需要维护角色之间的一个个关系即可,因为我们是单消费者单生产者,生产者和生产者,消费者和消费者,不需要维护。队列为先进先出,锁此时我们的生产者必须要在队列不满时放入数据,消费者必须在队列不为空时拿数据,消费者和生产者使用条件变量能清楚的知道读写条件是否满足,条件变量只是用于阻塞或唤醒线程,例如接下来的代码当队列满时生产者等待并且友好的唤醒消费者,消费者也类似。在这里插入图片描述
    代买实现:

    code.cpp 文件代码

    //.h .cc .cpp
    //.hpp -> 开源软件使用 -> 声明和实现可以放在一个文件里
    
    #include "./Block_Queue.hpp"
    #include 
    #include 
    #include 
    
    using namespace ns_blockqueue;
    
    void *consumer(void *args)
    {
        BlockQueue<int> *bq = (BlockQueue<int>*)args;
        while(true){
            sleep(2);
            int data = 0;
            bq->Pop(&data);
    
            std::cout << "消费者消费了一个数据: " <<  data << std::endl;
        }
    }
    
    void *producter(void *args)
    {
        BlockQueue<int> *bq = (BlockQueue<int>*)args;
        while(true){
            // sleep(2);
            //1. 制造数据,生产者的数据(task)从哪里来??
            int data = rand()%20 + 1;
            std::cout << "生产者生产数据: " << data << std::endl;
            bq->Push(data);
        }
    }
    
    int main()
    {
        srand((long long)time(nullptr));
        BlockQueue<int> *bq = new BlockQueue<int>();
    
        pthread_t c,p;
        pthread_create(&c, nullptr, consumer, (void*)bq);
        pthread_create(&p, nullptr, producter, (void*)bq);
    
        pthread_join(c, nullptr);
        pthread_join(p, nullptr);
    
    
        return 0;
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50

    说明:

    • 创建生产者和消费者线程,生产者不断的往队列里加入数据,消费者不断的往队列里拿数据,分别调用Block_Queue对象的push()和pop()函数。

    Block_Queue.hpp代码:

    #pragma once
    #include 
    #include 
    #include 
    
    namespace ns_blockqueue
    {
        const int default_cap = 5;
    
        template <class T>
        class BlockQueue
        {
        private:
            std::queue<T> bq_; //我们的阻塞队列
            int cap_;          //队列的元素上限
            pthread_mutex_t mtx_; //保护临界资源的锁
        public:
            BlockQueue(int cap = default_cap):cap_(cap)
            {   }
            ~BlockQueue() 
            {  }
        public:
            void Push(const T &in)
            {
                //向队列中放数据,生产函数
                bq_.push(in);
            }
    
            void Pop(T *out)
            {
                //从队列中拿数据,消费函数函数 
            }
        };
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    说明:

    • 将Block_Queue当中存储的数据模板化,方便以后需要时进行复用。
    • 这里设置Block_Queue存储数据的上限为5。
    • 阻塞队列是会被生产者和消费者同时访问的临界资源,因此我们需要用一把互斥锁将其保护起来。
    • 总体的框架出来了,这时候我们要处理push(),pop()函数的逻辑达到消费者与生产者为互斥和同步的效果。

    push()函数函数说明

    • 当生产满了的时候,就应该不要生产了(不要竞争锁了),而应该让消费者来消费。
    • 条件变量is_full_ 代表bq_满的, 当队列为空时消费者在该条件变量下等待。
    • 不论是生产者线程还是消费者线程,它们都是先申请到锁进入临界区后再判断是否满足生产或消费条件的,如果对应条件不满足,那么对应线程就会被挂起。但此时该线程是拿着锁的,为了避免死锁问题,在调用pthread_cond_wait函数时就需要传入当前线程手中的互斥锁,此时当该线程被挂起时就会自动释放手中的互斥锁,而当该线程被唤醒时又会自动获取到该互斥锁。

    pop()函数说明

    • 当消费空了,就不应该消费(不要竞争锁了),应该让生产者来进行生产。
    • 条件变量is_empty_ 代表bq_空的, 当队列为满时生产者在该条件变量下等待。

    总体代码实现

    #pragma once
    #include 
    #include 
    #include 
    
    namespace ns_blockqueue
    {
        const int default_cap = 5;
    
        template <class T>
        class BlockQueue
        {
        private:
            std::queue<T> bq_; //我们的阻塞队列
            int cap_;          //队列的元素上限
            pthread_mutex_t mtx_; //保护临界资源的锁
            //1. 当生产满了的时候,就应该不要生产了(不要竞争锁了),而应该让消费者来消费
            //2. 当消费空了,就不应该消费(不要竞争锁了),应该让生产者来进行生产
            pthread_cond_t is_full_; //bq_满的, 消费者在该条件变量下等待
            pthread_cond_t is_empty_; //bq_空的,生产者在该条件变量下等待
        private:
            bool IsFull()  { return bq_.size() == cap_; }
            bool IsEmpty()  {    return bq_.size() == 0;}
            void LockQueue()   {    pthread_mutex_lock(&mtx_);}
            void UnlockQueue()   {  pthread_mutex_unlock(&mtx_);   }
            void ProducterWait()
            {
                //pthread_cond_wait
                //1. 调用的时候,会首先自动释放mtx_!,然后再挂起自己
                //2. 返回的时候,会首先自动竞争锁,获取到锁之后,才能返回!
                pthread_cond_wait(&is_empty_, &mtx_);
            }
            void ConsumerWait()  {    pthread_cond_wait(&is_full_, &mtx_);    }
            void WakeupComsumer()   { pthread_cond_signal(&is_full_); }
            void WakeupProducter()   {   pthread_cond_signal(&is_empty_); }      
        public:
            BlockQueue(int cap = default_cap):cap_(cap)
            {
                pthread_mutex_init(&mtx_, nullptr);
                pthread_cond_init(&is_empty_, nullptr);
                pthread_cond_init(&is_full_, nullptr);
            }
            ~BlockQueue() 
            {
                pthread_mutex_destroy(&mtx_);
                pthread_cond_destroy(&is_empty_);
                pthread_cond_destroy(&is_full_);
            }
        public:
      
            void Push(const T &in)
            {
                LockQueue();
                //临界区
                if(IsFull()){ //bug?
                    //等待的,把线程挂起,等待条件变量唤醒,我们当前是持有锁的
                    ProducterWait();
                }
                //向队列中放数据,生产函数
                bq_.push(in);
    
                // 满足条件时唤醒消费者
                if(bq_.size() > cap_/2 ) WakeupComsumer();
                UnlockQueue();
                //WakeupComsumer();
    
            }
    
            void Pop(T *out)
            {
                LockQueue();
                //从队列中拿数据,消费函数函数
                if(IsEmpty()){ //bug?
                    //无法消费,等待条件变量唤醒
                    ConsumerWait();
                }
                *out = bq_.front();
                bq_.pop();
                // 满足条件时唤醒生产者
                if(bq_.size() < cap_/2 ) WakeupProducter();
                UnlockQueue();
                
            }
        };
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87

    说明:

    • 判断是否满足具备生成条件或者具备消费条件时不能用if,而是应该用while:
    • 原因是pthread_cond_wait函数会造成伪唤醒的两种情况。
    • 第一种:pthread_cond_wait函数调用失败(调用失败后函数会往下执行,如果这时候就应该等待但却没有等待,就出问题了)
    • 第二种:被其他线程伪唤醒,例如调用了pthread_cond_broadcast函数。
    • 所以我们需要在循环的判断,目的就是确保线程具备生产条件或者具备消费条件。

    基于计算任务的生产者消费者模型

    上述生产者把数据交到交易场所,消费再从交易场所拿取数据,如果只是单纯的数据传送是没有意义的。我们应该有两步:1、让生产者负责生产数据(tack),让消费者拿取数据(tack)。2、生产者继续在满足条件时生产,消费者在拿取数据(tack)后运行处理数据函数。这里我们就可以看出生产者消费者模型优点了。

    Task 代码

    #pragma once
    #include 
    #include 
    class Task 
    {
    private:
        int _a;
        int _b;
        char _op; //运算符
    
    public:
        Task (int a = 0, int b = 0, char op = 0)
            : _a(a), _b(b), _op(op)
        {
        }
    
        int run()
        {
            int ret;
            switch (_op)
            {
            case '+':
                ret = _a + _b;
                break;
            case '-':
                ret = _a - _b;
                break;
            case '*':
                ret = _a * _b;
                break;
            case '/':
                ret = _a / _b;
                break;
            default:
                break;
            }
            printf("消费者:%lu %d %c %d = %d\n", pthread_self(), _a, _op, _b, ret);
            return ret;
        }
        //仿函数
        int operator()()
        {
            return run();
        }
    };
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46

    消费者生产者代码

    void *consumer(void *args)
    {
        BlockQueue<Task > *bq = (BlockQueue<Task >*)args;
        while(true){
            sleep(2);
            Task data;
            bq->Pop(&data);
            //运行处理任务函数
            data();
        }
    }
    
    void *producter(void *args)
    {
        BlockQueue<Task > *bq = (BlockQueue<Task >*)args;
        const char* arr = "+-*/%";
        while(true){
            // sleep(2);
            //1. 制造数据,生产者的数据(task)从哪里来??
            int x = rand() % 100;
    		int y = rand() % 100;
    		char op = arr[rand() % 5];
    		Task t(x, y, op);
            printf("producer:%d %c %d =?\n",x,op,y);
    		bq->Push(t); //生产数据
        }
    }
    
    int main()
    {
        srand((long long)time(nullptr));
        BlockQueue<Task > *bq = new BlockQueue<Task >();
    
        pthread_t c,p;
        pthread_create(&c, nullptr, consumer, (void*)bq);
        pthread_create(&p, nullptr, producter, (void*)bq);
    
        pthread_join(c, nullptr);
        pthread_join(p, nullptr);
    
    
        return 0;
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45

    运行结果:

    在这里插入图片描述

    总结:

    第一步:生产者生产数据前,生产者的数据(Task)从哪里来?// 网络
    第二步:生产者将任务派发到任务队列。
    我们应该意识到数据的获取是需要时间,并且数据的处理也需要时间,时间是主要矛盾,生产者消费者模型主要解决的是如果:制造数据比较慢,在任务队列还有任务的前提下,消费者就能同步的继续消费,这里就体现了并发性了。

  • 相关阅读:
    MySQL大量脏数据,如何只保留最新的一条?
    编写一个Book类,需要进行封装,对外提供set和get方法且对页数进行控制
    [ACTF2020 新生赛] Include1
    数据挖掘对道路地理位置的修正
    数据结构——归并排序
    (小脚本) (python) 批量修改文件后缀名
    2022年最新前端面试题
    Gitlab服务器配置LDAP指导
    vue video 多个视频切换后视频不显示的解决方法
    docker启动出现Error response from daemon: Cannot restart container的报错
  • 原文地址:https://blog.csdn.net/weixin_58004346/article/details/126641359