• Linux----生产者和消费者模型


    生产者和消费者模型的概念

    优点:
    1,将生产环节和消费环节解耦。
    2,极大的提高效率。
    3,支持并发。

    321原则
    3种关系:
    1,生产者和生产者:竞争 和 互斥。
    2,消费者和消费者:竞争 和 互斥。
    3,生产者和消费者: 同步 和 互斥。

    2种角色:
    生产者和消费者:本质是执行流

    1个交易场所:一段缓冲区(缓冲区)。
    在这里插入图片描述

    基于阻塞队列下的生产者和消费者模型

    当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素,
    当队列满时,往队列里存放元素的操作也会被阻塞。
    主要练习的是条件变量,并对条件变量的操作函数进行掌握
    在这里插入图片描述

    C++代码

    amespace chen
    {
        const int default_cap = 5;
        template <class T>
        class block_queue
        {
        public:
            block_queue()
                : _cap(default_cap)
            {
                pthread_mutex_init(&_mtx, nullptr);
                pthread_cond_init(&_isfull, nullptr);
                pthread_cond_init(&_isempty, nullptr);
            }
    
            //加锁
            void pq_lock()
            {
                pthread_mutex_lock(&_mtx);
            }
    
            //解锁
            void pq_unlock()
            {
                pthread_mutex_unlock(&_mtx);
            }
            
            //让消费者等待,当队列为空的时候
            void consumer_wait()
            {
                pthread_cond_wait(&_isempty, &_mtx);
            }
            
            //唤醒消费者
            void wakeup_consumer()
            {
                pthread_cond_signal(&_isempty);
            }
    
            void product_wait()
            {
                pthread_cond_wait(&_isfull, &_mtx);
            }
    
            void wakeup_product()
            {
                pthread_cond_signal(&_isfull);
            }
    
            bool isfull()
            {
                return _cap == _bq.size();
            }
    
            bool isempty()
            {
                return _bq.size() == 0;
            }
    
            //生产者生产
            void Push(const T& in)
            { 
                //会有多个生产者线程同时访问,临界区需要加锁
                pq_lock();
    
                //为什么要while不用if呢?
                //防止挂起失败或者伪唤醒,如果是if就是直接向下执行了,但是队列任然是满的
                while(isfull()) 
                {
                    cout << "队列为满: " << _cap <<  endl;
                    product_wait();
                } 
                
                _bq.push(in);
                //整个位置说明已经有数据了,如果说消费者被挂起了,应该被唤醒
                wakeup_consumer();
                pq_unlock();
    
            }
              
            //消费者消费
            void Pop(T* out)
            {
                //多个消费者线程访问,需要加锁的
                pq_lock();
                while(isempty())
                {
                    consumer_wait();
                }
    
                *out = _bq.front();
                _bq.pop();
                //消费者已经消费了,说明有位置了,如果生产者被挂起了,这时候应该唤醒它
                wakeup_product();
                pq_unlock();
            }
    
            ~block_queue()
            {
                pthread_mutex_destroy(&_mtx);
                pthread_cond_destroy(&_isfull);
                pthread_cond_destroy(&_isempty);
            }
    
        private:
            queue<T> _bq;
            int _cap;
            pthread_mutex_t _mtx;      //保护临界资源的锁
            pthread_cond_t _isfull;      //条件变量,用来表示队列为满,消费者该消费
            pthread_cond_t _isempty;     //条件变量,表示队列为空,生产者该生产了
        };
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113

    信号量

    什么是信号量呢?
    本质就是一个计数器,描述临界资源的数量。
    临界资源本质上可以划分一个一个的小资源,通过信号量,完全可以让多个线程去
    同时访问临界资源中的不同区域,从而实现并发。

    线程操作函数:
    在这里插入图片描述

    基于循环队列下的生产者和消费者模型

    复习一下循环队列
    在这里插入图片描述
    我们如何实现基于循环队列下的生产者和消费者模型呢? 我们这里不需要多加一个位置
    利用信号量:定义两个信号量。 一个表示sem_blank 临界资源格子的数量, 一个表示sem_data临界资源数据的数量。

    同一个位置,为空或者为满,生产者和消费者需要同步互斥。
    不在同一个位置,可以并发执行。

    C++代码:

    #pragma once
    
    #include 
    #include 
    #include 
    #include 
    #include 
    #include 
    using namespace std;
    
    namespace chen
    {
       template <class T>
       class ring_queue
       {
       public:
          ring_queue(int cap = 10)
              : _cap(cap) //循环队列默认容量大小为10
                ,_pstep(0)
                ,_cstep(0)
          {
             _ring_queue.resize(_cap,0);
             pthread_mutex_init(&_pmtx, nullptr);
             pthread_mutex_init(&_cmtx, nullptr);
    
             sem_init(&_blanksem, 0, 10); // 0表示线程间共享,10是个数
             sem_init(&_datasem, 0, 0);
          }
    
          void Push(const T &val)
          {
             //申请信号量
             sem_wait(&_blanksem); // p()操作 --
    
             pthread_mutex_lock(&_pmtx);
             _ring_queue[_pstep] = val;
             cout << "生产下标" << _pstep << endl;
             _pstep++;
             _pstep %= _cap;
             pthread_mutex_unlock(&_pmtx);
             
    
             sem_post(&_datasem); // v()操作 ++
          }
    
          void Pop(T *out)
          {
             sem_wait(&_datasem);
             
             //要多消费者,需要加锁 ,step相当于是临界资源了
             pthread_mutex_lock(&_cmtx);
             *out = _ring_queue[_cstep];
    
             cout << "消费下标" << _cstep << endl;
    
             _cstep++; 
             _cstep %= _cap;
             pthread_mutex_unlock(&_cmtx);
    
             sem_post(&_blanksem);
          }
    
          ~ring_queue()
          {
             pthread_mutex_destroy(&_pmtx);
             pthread_mutex_destroy(&_cmtx);
    
             sem_destroy(&_blanksem);
             sem_destroy(&_datasem);
          }
    
       private:
          vector<T> _ring_queue;
          int _cap;
    
          //两把锁,多生产,多消费,因为pstep,_cstep对于生产者和消费者是临界资源
          pthread_mutex_t _pmtx;
          pthread_mutex_t _cmtx;
    
          sem_t _blanksem; //信号量,格子的个数,生产者关注的
          sem_t _datasem;  //信号量,数据的个数,消费者关注的
    
          int _pstep; //下标位置
          int _cstep;
       };
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87

    线程池的概念

    在这里插入图片描述

    #pragma once
    
    #include 
    #include 
    #include 
    #include 
    
    using namespace std;
    
    namespace chen
    {
        template<class T>
        class thread_pool
        {
        public:
               thread_pool()
               :_num(5)
               {
                   pthread_cond_init(&_cond, nullptr);
                   pthread_mutex_init(&_mtx, nullptr);
               }
               
               void Lock()
               {
                  pthread_mutex_lock(&_mtx);
               }
    
               void Unlock()
               {
                  pthread_mutex_unlock(&_mtx);
               }
    
               void Wait()
               {
                  pthread_cond_wait(&_cond, &_mtx);
               }
    
               void Wakeup()
               {
                  pthread_cond_signal(&_cond);
               }
    
              // 在类中要让线程执行类内成员方法,是不行的,因为this指针要占用第一个传参位置
              // 必须让线程执行静态方法,
               static void* pthread_run(void* args)
               {
                   pthread_detach(pthread_self()); //线程分离,不需要等待。
    
                   thread_pool<T>* tp = (thread_pool<T>*)args;
    
                   while(true)
                   {
                        tp->Lock();
                        while(tp->_task_queue.empty())
                        {
                            tp->Wait();
                        }
                          
                        tp->pop_task(); 
    
                        tp->Unlock();
    
                        sleep(5);
                   }
                    
               }
    
               void init_thread_pool()
               {
                   pthread_t tid;
                   for(int i = 0; i < _num; i++)
                   {
                       pthread_create(&tid, nullptr, pthread_run, (void*)this); //这里必须要传参
                   }
               }
    
               void push_task(const T& in)
               {
                    //多个线程可能会产生任务,也是需要加锁的,为了方便,这里只用一个线程产生任务
        
                    _task_queue.push(in);
                 
                     
                    Wakeup(); //可以唤醒消费线程来执行了
               }
    
               void pop_task()
               {
                  // *out = _task_queue.front();
                   _task_queue.pop();
                   cout << "我是线程:" << pthread_self() << "正在处理任务" << endl;
               }
    
    
               ~thread_pool()
               {
                   pthread_cond_destroy(&_cond);
                   pthread_mutex_destroy(&_mtx);
               }
        private:
                int _num;
                queue<T> _task_queue;
                pthread_cond_t  _cond; //当任务队列中没有任务的时候,所有的线程都要挂起等待。
                pthread_mutex_t _mtx;  //保护临界区
        };
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106

    读者-写者模型

    适合的是多读少写的场景
    写独占,读共享,读锁优先级高

    321原则
    3:3种关系:
    读者和读者 (无关系,因为读者不会取走数据,与消费者生产者模型不同,是因为消费者会取走资源)
    写者和写者:互斥
    读者和写者:互斥,同步

    2:2种角色 2个执行流(线程)

    1个交易场所:一段缓冲区 或者一个STL容器(queue,vector)

    优先级的概念:
    读者优先:当读者和写者同时到来的时候,让读者先访问。

    写者优先:当读者和写者同时来的时候, 当前读者和将要晚来的读者都要等待,都不进入临界区,当临界区没有读者的时候
    该写者才进入临界区。

    自旋锁

    在这里插入图片描述

  • 相关阅读:
    JSP学生交流论坛系统myeclipse开发mysql数据库bs框架java编程jdbc
    线性代数与解析几何——Part4 欧式空间 & 酉空间
    【计算机网络】IP协议第二讲(Mac帧、IP地址、碰撞检测、ARP协议介绍)
    Redis学习笔记——操作Hash
    OPPO手机如何添加日程桌面插件?
    django自动生成问卷表的软件的设计与实现毕业设计源码291138
    扫描点读笔搭载北京君正X2000多核异构跨界处理器的案例
    渗透测试流程是什么?7个步骤给你讲清楚!
    分布式之日志系统平台ELK
    Python第三方库之MedPy
  • 原文地址:https://blog.csdn.net/CL2426/article/details/127759446