• 【Linux】详解线程第三篇——线程同步和生产消费者模型


    在这里插入图片描述
    本篇博客由 CSDN@先搞面包再谈爱 原创,转载请标注清楚,请勿抄袭。

    前言

    本篇线程同步的内容是完全基于线程互斥来讲的,如果屏幕前的你对于线程互斥还不是很了解的话,可以看看我上一篇博客:【Linux】详解线程第二篇——用黄牛抢陈奕迅演唱会门票的例子来讲解【 线程互斥与锁 】

    正式开始

    上篇线程互斥中重点讲了互斥锁,虽然解决了多线程并发导致的临界资源不安全的问题,但是还存在一个比较重要的问题:访问临界资源合理性的问题。

    再次用黄牛抢票来讲解线程同步的思想

    再举一下我上篇博客中黄牛抢票的例子。

    上一篇博客的例子中,只有黄牛和票这两个元素,对应的就是线程和临界资源,既然互斥锁已经讲了,那么就能多一个锁这个元素了,也就可以理解为多了一个售票员。所以这里一共三个主要元素:黄牛(线程)、票(临界资源)、售票员(锁)。

    前面博客中我未加usleep对黄牛买票进行限制的例子中,出现了一个黄牛将所有票抢完的例子,也就是说整个流程中,只有一个线程对临界资源进行了访问,其他的线程虽然想要访问临界资源但是都没有访问到,这种情况不能说有错误,只能说设计的不合理,会造成其他线程的饥饿问题,一个人把所有的活全包了,其他人挣不到钱,就没饭吃了,虽然这个例子有点极端。但是确实是一个问题。

    还有一个问题,假如此刻票被抢完了,但是票卖完后隔一段时间还会再次补票,但是无法确定票补充的时间,会在随机时刻进行补发(临界资源未准备就绪,但有可能随意时刻准备好)。这样的话,所有的黄牛都想抢票,但是都不知道什么时候会补票,于是所有的黄牛无时无刻都在问售票员票是否补发,这样的话,就会很浪费所有黄牛和售票员的时间,也即浪费所有线程向锁申请资源的时间,导致运行效率下降。虽然没做错,但是不合理。

    上面这两个问题都是访问临界资源的合理性的问题。而引入线程同步就是为了解决这个问题。

    想一想我们现实生活中是怎样售票的,当票卖完后,我们不需要一直询问售票员是否有票,只要等待通知即可,比如说12306,补到票了会通知你,当你进行补票的时候,是会排队的,也就是为什么你点击补票会显示当前人数是多还是中等还是少的信息。如果你补的比较早,那么你排队就会靠前一点,同理,补得比较晚,就会排的靠后一点。这里最重要的一点就是排队,使得整个流程有了一定的顺序。也就是让所有的线程按照一定的顺序去访问临界资源。这里排队可以解决一部分问题,也就是不断询问是否有票的问题。

    那么还有一个黄牛将所有的票抢完的问题。再拿12306来说,我们每个人只能买一张票,如果想买多张,就要多给一个乘员信息,比如说你给你自己买了票,还想给你朋友买票的话,得要你朋友的身份证号码等信息。也就票和人是一对一的。那这里搞得极端一点,规定每次让黄牛只能买一张票,买完票后禁止继续买票,再加上上面的队列,如果卖完票的黄牛还想再买票,就必须去队尾重新排队购买。

    上面其实已经把线程同步的思想讲解出来了,不过都是以黄牛的角度来说的,这里改口成线程来总结一下:

    1. 所有的线程想要访问临界资源时,都必须排队。
    2. 访问完临界资源的线程,若想要继续对临界资源进行访问,就必须跑到队尾等待其前面的线程访问完才能轮到当前线程。

    这样让访问临界资源的线程,按照一定的顺序进行临界资源的访问,就是线程同步最重要的思想。

    那么如何实现线程同步呢?

    通过条件变量来实现线程同步

    我们在申请临界资源前,先要做临界资源是否存在的检测,而检测也是需要访问临界资源的,那么对临界资源的检测也是一定需要在加锁和解锁之间的。

    我把前面博客中的的例子改一改,加上补票机制:

    #include 
    using namespace std;
    
    #include 
    #include 
    #include 
    
    #include 
    
    // 线程数据,包含线程名字,也就是黄牛名字,还有线程对应的互斥锁
    class ThreadData
    {
    public:
        ThreadData(const string& name, pthread_mutex_t* pmtx)
            :_name(name),
             _pmtx(pmtx)
        {}
    
    public:
        string _name;
        pthread_mutex_t* _pmtx;
    };
    
    // 黄牛人数
    #define THREAD_NUM 5
    
    // 陈奕迅演唱会的票数tickets
    int tickets = 1000;
    
    // 黄牛抢票执行的方法
    void* getTicket(void* arg)
    {
        ThreadData* ptd = (ThreadData*)arg;
        // 每个黄牛疯狂抢tickets
        while(1)
        {
            pthread_mutex_lock(ptd->_pmtx);
            if(tickets > 0)
            {
                usleep(rand() % 2000);
                tickets--;
                printf("%s抢到票了, 此时ticket num ::%d\n", ptd->_name.c_str(), tickets);
                
                // 当对临界资源访问完后就解锁
                pthread_mutex_unlock(ptd->_pmtx);
            }
            else
            {
                // 当对临界资源访问完后就解锁,这里是当tickets == 0的情况,也要解锁
                pthread_mutex_unlock(ptd->_pmtx);
                printf("%s没抢到票, 票抢空了\n", ptd->_name.c_str());
            	// 这里不再让黄牛抢不到票就退出,而是继续检查是否有票
            }
            
            // 抢到或没抢到票都执行一下后续动作,这里直接用usleep替代
            usleep(rand() % 5000);
        }
    
    
        // 记得要释放掉线程数据,不然内存泄漏
        delete ptd;
    
        return nullptr;
    }
    
    int main()
    {
        // 局部锁
        pthread_mutex_t mtx;
        // 默认给空就行
        pthread_mutex_init(&mtx, nullptr);
    
        // 种一颗随机数种子
        srand((unsigned int)time(nullptr) ^ getpid() ^ 0x24233342);
        // 假设此时有三个黄牛进行抢票
        pthread_t tid[THREAD_NUM];
    
        for(int i = 0; i < THREAD_NUM; ++i)
        {
            string tmp;
            tmp += to_string(i + 1) + "号黄牛";
            ThreadData* ptd = new ThreadData(tmp, &mtx);
    
            // 每个黄牛去抢票
            pthread_create(tid + i, nullptr, getTicket, (void*)ptd);
        }
    
    	// 补票机制
    	while(1)
    	{
    		if(tickets == 0)
    		{
    			cout << "票抢光了,准备补票中..." << endl;
    			sleep(rand() % 10);
    			tickets = 500;
    			break;
    		}
    		sleep(1);
    	}
    
        // 等待每个黄牛抢完票后退出
        for(int i = 0; i < THREAD_NUM; ++i)
        {
            pthread_join(tid[i], nullptr);
        }
    
        pthread_mutex_destroy(&mtx);
        return 0;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109

    运行:
    我截了三张图,一张是第一次票抢光之后准备补票:

    在这里插入图片描述

    第二张是票补上了之后:
    在这里插入图片描述

    第三张是补票抢完之后:
    在这里插入图片描述

    这里可以发现,当票抢光之后,5个线程一直在查询当前是否有票,正如开始所说的那样。

    这里的代码,会在 if(tickets > 0) 前进行加锁,因为tickets > 0就访问了临界资源,没票之后会补票,但是线程不会退出,而是一直在查询是否补票,这样效率就很低,一直循环着申请锁和解锁,虽然没错,但是不合理。故这种常规方式检测条件也就注定了其必须频繁的申请加锁和解锁。

    但是我们可以改一改:

    1. 不要让线程再频繁的去自己检测临界资源是否准备就绪,如果未准备就绪就让当前线程等待,也就是进入S状态。
    2. 当临界资源就绪的时候,再唤醒想要访问临界资源的线程。

    这样效率就会大大提高。

    想要实现这个功能的话,可以通过条件变量来实现。

    条件变量接口介绍

    条件,condition,pthread库中用起来条件变量的接口都是中间加上cond。

    初始化和销毁

    定义一个条件变量和定义一个锁一样,也分全局和局部,初始化也是全局可以直接用宏,局部要用init:
    在这里插入图片描述

    全局初始化的宏:
    在这里插入图片描述

    局部初始化的时候第一个参数就是条件变量的地址,第二个参数给空让它以默认方式初始化就行。
    还是,以pthread开头的接口返回值绝大部分都是正确返回0,错误返回错误码。

    destroy,就是销毁条件变量,没啥好讲的。

    pthread_cond_wait

    在这里插入图片描述
    就是传一个条件变量指针,然后一个锁,为啥要传一个锁后面再说。只要调用了这个函数的线程就会进入阻塞状态,也就是从运行队列进入了等待队列中。这里就相当于是黄牛开始排队了,而且注意,这里的队伍是在等待cond这个条件准备就绪,也就是说,这里的队伍是专门为cond开辟的一个队,不同于普通的等待队列。

    这里就相当于是前面的if判断资源是否存在了,但是是直接让线程进入等待队列中。

    还有一个timewait,多了一个参数,前两个参数和wait一样,第三个参数是一个时间,让线程等待时,当第三个参数一到会自动醒来。

    signal和broadcast

    在这里插入图片描述

    上面的pthread_cond_wait是让线程进入到与cond相关的等待队列中,当signal被调用时,就会有一个线程出队,就相当于是等待资源准备就绪了,此时就会唤醒一个线程。不过这里前提是signal的参数中的cond要和wait的参数中的cond指向是一样的。不匹配就无法唤醒wait对应cond的队列中的线程。broadcast,广播的意思,当调用这个函数时,会将cond对应等待队列中的所有线程都唤醒,此时所有的线程会按照顺序出队。

    下面我写一个全新的例子来演示一下这里的cond用法:

    // 线程个数
    const static int THREAD_NUM = 4;
    
    // 不同线程的执行方法
    /**********************************************************************************/
    void Thread_1_Func(const string& name)
    {
        cout << name << "is doing" << " 加密工作" << endl;
    }
    
    void Thread_2_Func(const string& name)
    {
        cout << name << "is doing" << " 持久化工作" << endl;
    }
    
    void Thread_3_Func(const string& name)
    {
        cout << name << "is doing" << " 查询工作" << endl;
    }
    
    void Thread_4_Func(const string& name)
    {
        cout << name << "is doing" << " 管理工作" << endl;
    }
    /**********************************************************************************/
    
    // 函数指针,指向线程所要执行的函数
    typedef void(*pfunc)(const string& name);
    
    // 每个线程最有用的数据
    class ThreadData
    {
    public:
        ThreadData(const string& name, pthread_mutex_t* pmtx, pthread_cond_t* pcond, pfunc pf)
            :_name(name),
             _pmtx(pmtx),
             _pcond(pcond),
             _pf(pf)
        {}
    
    public:
        string _name; // 线程名
        pthread_mutex_t* _pmtx; // 锁
        pthread_cond_t* _pcond; // 条件变量
        pfunc _pf; // 回调函数
    };
    
    // 判断线程是否退出
    static bool quit = false;
    
    // pthread_create的回调方法
    void* ThreadRoutine(void* arg)
    {
        ThreadData* ptd = (ThreadData*)arg;
    
        while(!quit)
        {
            // 访问临界资源前上锁
            pthread_mutex_lock(ptd->_pmtx);
    
            // 相当于if判断,此时线程直接阻塞
            pthread_cond_wait(ptd->_pcond, ptd->_pmtx);
            
            if(!quit)
            {
                // 去调用线程对应的方法
                ptd->_pf(ptd->_name);
            }
            else
            {
                // 退出
                cout << ptd->_name << " quit" << endl;    
            }
    
            // 访问完后解锁
            pthread_mutex_unlock(ptd->_pmtx);
        }
    
    
        // 记得释放传来的对象,不然内存泄漏了
        delete ptd;
        ptd = nullptr;
    
        return nullptr;
    }
    
    int main()
    {
        // 局部条件变量
        pthread_cond_t cond;
        pthread_cond_init(&cond, nullptr);
    
        // 局部锁
        pthread_mutex_t mtx;
        pthread_mutex_init(&mtx, nullptr);
    
        // 各个函数执行的方法
        pfunc funcs[THREAD_NUM] = 
        { Thread_1_Func
        , Thread_2_Func
        , Thread_3_Func
        , Thread_4_Func
        };
    
        // 多个线程
        pthread_t tids[THREAD_NUM];
    
        // 创建THREAD_NUM个线程
        for(int i = 0; i < THREAD_NUM; ++i)
        {
            ThreadData* tmp = new ThreadData(to_string(i + 1) + "号线程", &mtx, &cond, funcs[i]);
            pthread_create(tids + i, nullptr, ThreadRoutine, (void*)tmp);
        }
    
        cout << "prepare to do the jobs" << endl;
    
        sleep(1);
        cout << "start doing jobs" << endl;
    
        // 发signal,让等待队列中的线程执行其方法
        int count = 0;
        while(count != 8)
        {
            pthread_cond_signal(&cond);
            ++count;
            sleep(1);
        }
    
        quit = true;
        // quit改为true时,其他线程已经在等待队列中了,
        //得让各个线程都执行其一次方法才会循环上去判断quit改变了
        pthread_cond_broadcast(&cond);
    
        cout << "jobs done" << endl;
    
        for(int i = 0; i < THREAD_NUM; ++i)
        {
            pthread_join(tids[i], nullptr);
        }
    
        return 0;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142

    上面的代码中,各线程执行pthread_cond_wait就会进入cond对应的等待队列中,当main线程执行一次pthread_cond_signal就会唤醒一个线程。

    运行起来:
    在这里插入图片描述

    这里用的是signal来唤醒各个线程的,还可以用broadcast:
    在这里插入图片描述
    这样一次就会唤醒一批线程。

    运行:
    在这里插入图片描述

    生产消费者模型

    带着两个问题来讲一讲生产消费者模型:

    1. 条件满足的时候我们再唤醒指定的线程,但是我们怎么知道条件是否满足?
    2. 互斥所在线程同步中的意义,以及为何将pthread_cond_wait放在加锁和解锁之间?

    学习生产消费者模型可以帮助我们解决第一个问题。
    在编写生产消费者模型的某种场景的代码时,可以帮助我们理解互斥锁的意义。

    下面来举一个生活中的生产消费者模型的例子。

    现在有一个超市,屏幕前的你是一名消费者。

    超市中的商品,并不是由超市直接提供的,而是由供应商提供的,超市本质上是一个商品缓冲区。如图:
    在这里插入图片描述
    上图中,有【两种角色:消费者和生产者。一个交易场所。】

    我们买东西时不会去供应商处买,而是去超市,这样能够提高效率。假如说消费者想要买一包方便面(下面都以方便面来说),如果直接去供应商那里买的话,供应商还要开机器给你做一包,这样对于供应商来说成本太高了,一包就得开一次机器,电费都比那一包方便面贵,没这个必要。直接一次性生产一大堆,然后提供给超市,这样,想要买方便面的消费者就去超市。这样就不会浪费那么多的人力和物力去一包一包的生产方便面。

    有了超市,供货商只负责生产,不用为消费者准备东西。消费者只需要去超市,不用再跑到供货商处,这样逻辑上就实现了解耦。

    三种关系

    来说说各角色间的关系。

    • 生产者和生产者

    假设图中的供应商都生产的是同种资源,比如说都生产的是方便面,不过品牌不一样,这家是康师傅的,这家是今麦郎的……,那么各个生产者之间就是竞争关系,用计算机术语来说的话,生产者和生产者之间是互斥关系。

    • 消费者和消费者

    如果说疫情前快要封城了,所有的居民都要去超市抢购物资,此时超市的方便面已经快被抢空了,极端点,就剩一包了,那么有很多的居民都想要这一包方便面,居民也就是消费者,此时的消费者和消费者之间就是竞争关系,都去竞争那一包方便面,还是用计算机术语来说。在资源很少而需要同种资源的消费者很多的情况下,消费者之间是存在互斥关系的。

    上面这两种互斥还可以这样来解释:

    超市的空间是一个共享资源,比如说某一货架,不能让供应商全部都抢着去摆放资源,这样物品会放乱,同种不同类的方便面都摆放在一起,这样会造成混乱。也不能让顾客去某一空间抢着争夺资源,这样可能会导致消费者本来想拿康师傅但拿成了今麦郎。所以必须要保证生产者生产的过程是安全的,消费者消费的过程也要是安全的。

    • 故生产者之间是互斥关系,消费者之间也是互斥关系。
    • 生产者和消费者

    生产者生产时不能让消费者消费,不然数据未传输完毕时,部分数据已经被消费者拿走,且消费者不会再次消费,这样就会导致生产者和消费者数据不一致问题。比如说一包方便面只造了不到一半就被消费者拿走,此时生产者仍然在生产,而且从开始到结束不会停止,直到生产完整一包方便面才停止,所以生产者所知道的信息是其会生产一整包方便面,但是消费者把一半拿走了,消费者得到的信息是其只拿到了半包方便面,此时就可以说二者的数据不一致。所以说生产者和消费者之间存在互斥关系。

    再比如说过年期间,超市生意非常好,所有的居民都忙着进年货,当超市中的某一种商品被抢空时就要通知对应的生产者去生产,如果未通知就会出现本篇刚开始将的抢票问题,会不断有消费者询问是否有商品被补充。同样,当超市商品已摆放满了,也要通知生产者停止生产。故当缺资源的时候通知生产者生产,当资源补充上来时就通知消费者消费;当资源盈余时就通知生产者停止生产,同时刺激消费者消费。即同步。所以生产者和消费者之间也存在同步关系。

    所以说生产者和消费者之间存在同步和互斥两种关系。

    那么我们写代码的时候怎么编写一个生产消费者模型呢?
    只需要掌握住三个原则即可。

    1. 一个交易场所。
    2. 两个角色——生产者和消费者。
    3. 三种关系,生生(互斥)、消消(互斥)、生消(同步和互斥)。

    不过第三点有个特例,当只有一个生产者 和 一个消费者时就只需要维护生消这一种关系,这一点后面会再谈,这里先暂不考虑。

    通过锁和条件变量来体现出三种关系。

    用基本工程师思维再次理解

    代码中我们要用线程来体现出生产者和消费者,也就是要给线程进行角色划分。

    超市是用某种数据结构来表示的缓冲区。

    商品即某些数据。

    超市里是否有新增货物,生产者最清楚。因为生产者一成功生产就会新增货物。
    超市里剩余多少空间让生产者生产,消费者最清楚。因为消费者每次消费都会新增空余空间。

    所以这里就可以解决最初说的第一个问题:条件满足的时候我们再唤醒指定的线程,但是我们怎么知道条件是否满足?

    生产者生产商品后就可立马通知消费者来消费,消费者将商品拿走后就可通知生产者去生产。故可以让生产者线程和消费者线程互相同步完成对应的生产消费者模型。这句话中的通知就是唤醒。加粗的字样就是对这个问题的解答。

    再来看一个图:
    在这里插入图片描述

    这里要强调一点,生产和消费的过程不仅仅是从生产者生产到仓库再让消费者拿走。重要的点不在这,而是生产者从获取到数据开始生产和消费者拿掉数据开始处理的两个过程,中间传递数据的过程耗时是非常短的,至于为什么等会写代码的时候就知道了。

    我前面讲进程间通信的时候说过:进程间通信的本质是让两个进程看到同一块空间。
    校正一下:进程间通信的前提是让两个进程看到同一块空间,进程间通信的本质就是生产消费者模型。比如说管道,自带同步和互斥的属性,正常情况下一端写一端读,当管道为空的时候读端会阻塞,当管道满的时候写端会阻塞。这里就和生产消费者模型很像,写端读入数据后就让读端去读,读端读好了数据后就让写端去写。当写端关闭时读端就相当于读到了文件末尾,read会返回零;当读端关闭的时候写端直接出错,进程就退出了。
    【注】如果我这里讲的内容你不太会的话,可以看看我这篇博客:【Linux】进程间通信(匿名管道、命名管道、共享内存等,包含代码示例)

    基于生产消费者模型的阻塞队列

    下面就来写写生产消费者模型的代码。

    我会写两个版本,第一个版本细节比较少,第二个版本会基于第一个版本稍微进行一点优化。

    版本一

    先来说说大致思路:

    1. 一个交易场所,前面超市的那个例子说了,超市就是用某个数据结构表示的缓冲区,这里我就以队列来表示这个缓冲区,不过我不会再自己手搓一个队列,直接用STL库的那个,如果有同学不懂STL的队列,可以看看这篇:【C++】STL栈和队列基本功能介绍、题目练习和模拟实现(容器适配器)

    2. 两种角色,生产者和消费者的表示,这里我就先来简单一点的,一个生产者的线程和一个消费者的线程。后面的那个例子再来多个生产者和消费者。

    3. 3种关系中,生生、消消、生消都要保持两两互斥,生消还要有一个同步的关系。我们可以用一个类来实现,其中可以用一个锁来表示所有的互斥,用两个条件变量来表示生消的同步,这个类还可以将第一点中的队列包含在在内。

    先把这个类简单给出来:

    
    template<class T>
    class CPqueue
    {
    public:
        // 构造
        CPqueue(int capacity)
            : _capacity(capacity)
        {
        	// 互斥锁、条件变量都要用接口来初始化
            pthread_mutex_init(&_mtx, nullptr);
            pthread_cond_init(&_full, nullptr);
            pthread_cond_init(&_empty, nullptr);
        }
        
    private:
        // 用队列这个数据结构来表示
        std::queue<T> _q;
        // 超市中能够存放的最大容量
        int _capacity;
        // 判断超市是否已经放满了
        pthread_cond_t _full;
        // 判断超市是否是空的
        pthread_cond_t _empty;
        // 互斥锁,用来两两互斥
        pthread_mutex_t _mtx;
    };
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    上面的两个条件变量,一个是判断当前队列中存放的数据满了没有,一个是判断当前队列中是否空了。

    如果满了,就得让生产者线程阻塞,不能继续生产了,并通知消费者来消费,等接到生产信号了再去生产;
    如果空了,就得让消费者线程阻塞,不能继续消费了,并通知生产者去生产,等接到消费信号了再去消费。

    而这里对应的生产的动作就是往队列中push数据,消费的动作就是从队列中pop数据。

    所以要提供两个接口,一个是让生产者push的,一个是让消费者pop的。

    下面的代码是在CPqueue中的:

    // 判断队列是否为空
    bool IsEmpty()
    {
        return _q.size() == 0;
    }
    
    // 判断队列是否已满
    bool IsFull()
    {
        return _q.size() == _capacity;
    }
    
    void PushData(const T& data)
    {
        // 进来先上锁
        pthread_mutex_lock(&_mtx);
    
        /* 上了锁之后先判断临界资源是
        否准备就绪,也就是队列是否满了*/
        if(IsFull()) pthread_cond_wait(&_full, &_mtx);
    
        // 到此处就说明队列不满,就可以push数据了
        _q.push(data);
    
        // push完了先发送让消费者消费的信号
        pthread_cond_signal(&_empty);
    
        // 解锁
        pthread_mutex_unlock(&_mtx);
    }
    
    void PopData(T& data)
    {
        // 先上锁
        pthread_mutex_lock(&_mtx);
    
        /* 上锁后,先判断临界资源是
        否准备就绪,也就是队列是否为空*/
        if(IsEmpty()) pthread_cond_wait(&_empty, &_mtx);
    
        // 到此处说明队列不为空,就可以pop了
        data = _q.front(); // 先拿数据再pop
        _q.pop();
    
        // pop完了发送让生产者生产的信号
        pthread_cond_signal(&_full);
        
        // 解锁
        pthread_mutex_unlock(&_mtx);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50

    这里就是同步和互斥的逻辑。包含了一个交易场所和三种关系中的生消同步和互斥。

    下面来写生产者和消费者的线程:

    // 生产者执行的方法
    void* Productor(void* arg)
    {
        CPqueue<int>* pq = (CPqueue<int>*)arg;
        
        int data = 10;
        while(1)
        {
            std::cout << "productor send data ::" << data << std::endl;
            pq->PushData(data);
            data++;
            sleep(1);
        }
    }
    
    // 消费者执行的方法
    void* Consumer(void* arg)
    {
        CPqueue<int>* pq = (CPqueue<int>*)arg;
    
        while(1)
        {
            int data = 0;
    
            pq->PopData(data);
            std::cout << "consumer get data ::" << data << std::endl  << std::endl;
        }
    }
    
    int main()
    {
        CPqueue<int>* pq = new CPqueue<int>(6);
    
        // 消费者线程
        pthread_t consumerThread;
        // 生产者线程
        pthread_t productorThread;
    
        // 消费者线程初始化
        pthread_create(&consumerThread, nullptr, Consumer, (void*)pq);
        // 生产者线程初始化
        pthread_create(&productorThread, nullptr, Productor, (void*)pq);
    
    	// 等待从线程退出
        pthread_join(consumerThread, nullptr);
        pthread_join(productorThread, nullptr);
    
        delete pq;
        return 0;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50

    运行起来效果就是这样的:
    在这里插入图片描述

    还可以控制消费者的消费时间点,比如说队列满了再让消费者消费:
    在这里插入图片描述

    运行:
    在这里插入图片描述
    同样的还可以队列装一半了再退出,这里就不演示了。

    这里可以说一下pthread_cond_wait的作用了:

    • pthread_cond_wait是在临界区中的,来一个问题:说如果线程等待了,会持有锁等待吗?

      答案是不会的,pthread_cond_wait的第二个参数是一把锁的指针,意义在于,当pthread_cond_wait调用成功后,传入pthread_cond_wait的锁会被自动解开,所以不用担心线程在pthread_cond_wait的时候会带锁wait。

      线程阻塞并恢复之后,从哪里阻塞就会从哪里唤醒,也就是pthread_cond_wait这里,当线程被唤醒时,pthread_cond_wait会自动帮助线程申请其本来调用pthread_cond_wait成功时解开的那把锁,因线程被唤醒时仍然在临界区中。

    不过这里唤醒有多种情况,就先说一个生产者和一个消费者的。
    比如说我这里代码写的是先唤醒对方线程,再解锁:
    在这里插入图片描述

    当另一方醒来时wait会自动申请锁,但此时锁是被当前方的线程占用的,所以另一方又会在锁上面进行阻塞等待(申请锁时,若锁被占用,则当前线程就阻塞等待锁资源),等到当前方解锁后,另一方就会自动拿到锁。

    先解锁后唤醒时,比如这样:
    在这里插入图片描述
    此时就是另一方被唤醒时锁未被占用,那么就会直接得到锁。

    这里是单生产者和单消费者的,还有多生产者和多消费者的情况,同理,不过是在锁上等待的线程会更多,这里就不再多讲了。

    需要注意的是,唤醒和解锁的先后顺序是都可以的,只要发生了生成——消费这一行为即可。
    不过我个人更推荐先唤醒后解锁。

    前面说了,生产消费者模型能够提高工作效率。如果单从生产 -> 消费的角度来看问题的话,其实这里并没有什么效率上的提高。因为这一步只是进行了简单的拷贝而已,真正的效率提高在生产者和消费者可同时工作这一点上。生产者生产完后,消费者去取数据并执行后续操作,在这个过程中,生产者还可以继续接受生产的任务,比如网络发来的数据 / 标准输入的数据,接收好就送到缓冲区(超市)中,重在二者可以并发执行。就像餐馆里面的大厨和服务员一样,大厨做好饭后不需要亲自将饭送到餐桌上,而是让端盘的服务员去送,服务员送的同时,大厨仍可继续做饭,服务员送完菜后也可
    处理顾客的其他要求(拿酒、盛米饭等)。这样服务员和大厨间的工作是互不影响的,效率就高了。

    真正的时间消耗在生产者获取到数据过程 和 消费者获取到数据后的后续处理过程,中间的拷贝耗时相对来说是非常短的,当消费者的后续处理动作很耗时时,可以搞多个消费者线程并发执行该动作(总线程个数尽量不要超过CPU核数,相等最好),这样效率会更高。

    版本二
    • pthread_cond_wait是一个函数,只要是一个函数就可能会调用失败,拿push来说:
    if(IsFull()) pthread_cond_wait();
    
    • 1

    pthread_cond_wait是有返回值的,是一个int,调用失败了返回的是一个错误码。
    在这里插入图片描述

    如果这里pthread_cond_wait调用失败了,那就可能导致队列是满的但是代码仍向下执行了,就会超出规定的队列容量(STL的queue中会自动扩容,我们写的那个capacity只是我们自己规定的,如果出现不合法的行为也是不好检测的),那么这样就不合理了。pthread_cond_wait还可能存在伪唤醒的情况,意思就是条件变量_full/_empty并不满足条件但是当前线程被唤醒了。比如若其他线程误操作发送了信号,就会导致当前线程跳出其所阻塞的队列中,并进行后续操作,也是不合理的。

    所以要把if改一改,换成while:
    在这里插入图片描述

    这样就算调用失败了,或者伪唤醒了,都会再上去判断一次IsFull(),此时如果队列还是满的,就会再次调用pthread_cond_wait,若还失败了,又会上去再判断……直到真正被唤醒并且队列不是满的了就会跳出while循环,这里push是这样,pop也是同理的:
    在这里插入图片描述

    故当跳出while后,后面的代码是一定能正确的进行生产和消费的。

    还有这里的阻塞队列不用关心哪一个线程先执行。
    如果是生产者先执行的话,就直接生产即可。
    如果是消费者先执行的话,队列为空,会阻塞住,此时就还会让生产者来生产。
    所以逻辑上,一定是能让生产者先往队列中生产者东西的。

    注意我上面代码中用了模版,而且上面演示的类型是int,这里我要改一改了,改成一个自定义类型。简单实现一个计算器,其成员变量为两个值,一个function包装器(如果有屏幕前的你对包装器不了解的话,可以看这篇:【C++】C++11中比较重要的内容介绍):

    // 对function包装器重命名一下
    typedef std::function<int(int, int)> func;
    
    class Caculator
    {
    public:
        Caculator(){}
    
        Caculator(int x, int y, func fun)
            : _x(x)
            , _y(y)
            , _fun(fun)
        {}
    
        int operator()()
        {
            return _fun(_x, _y);
        }
    
    public:
        int _x;
        int _y;
        func _fun;
    };
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    阻塞队列的代码不变。线程执行方法变一下:

    // 存放各个函数的做法和接口
    std::vector<std::pair<char, func>> kv;
    
    // 这里搞了4个计算功能,加减乘除
    const int KVSIZE = 4;
    
    // 生产者执行方法
    void* Productor(void* arg)
    {
        CPqueue<Caculator>* pq = (CPqueue<Caculator>*)arg;
        
        while(1)
        {
       		// 随机分配给消费者任务
            int task = rand() % KVSIZE;
            func fun = kv[task].second;
            int x = rand() % 200;
            int y = rand() % 500;
            std::cout << "productor send task ::" << task << "-->" << x << (kv[task].first) << y << '=' << '?' << std::endl;
    
            pq->PushData(Caculator(x, y, fun));
            sleep(1);
        }
    }
    
    // 消费者执行方法
    void* Consumer(void* arg)
    {
        CPqueue<Caculator>* pq = (CPqueue<Caculator>*)arg;
    
        while(1)
        {
            Caculator tmp;
            pq->PopData(tmp);
            std::cout << "consumer get task ::" << tmp() << std::endl  << std::endl;
        }
    }
    
    int main()
    {
        srand((unsigned int)time(nullptr) ^ getpid() ^ 0x323424);
        func MyAdd = [](int x, int y){ return x + y; };
        func MySub = [](int x, int y){ return x - y; };
        func MyMul = [](int x, int y){ return x * y; };
        func MyDiv = [](int x, int y){ return x / y; };
    	
    	// 这里用到了lambda表达式,如果屏幕前的你不懂,可以看看我刚刚给的那个链接
        kv.push_back(std::pair<char, func>('+', MyAdd));
        kv.push_back(std::pair<char, func>('-', MySub));
        kv.push_back(std::pair<char, func>('*', MyMul));
        kv.push_back(std::pair<char, func>('/', MyDiv));
    
        CPqueue<Caculator>* pq = new CPqueue<Caculator>(4);
    
        // 消费者线程
        pthread_t consumerThread;
        // 生产者线程
        pthread_t productorThread;
    
        // 消费者线程初始化
        pthread_create(&consumerThread, nullptr, Consumer, (void*)pq);
        // 生产者线程初始化
        pthread_create(&productorThread, nullptr, Productor, (void*)pq);
    
        pthread_join(consumerThread, nullptr);
        pthread_join(productorThread, nullptr);
    
        delete pq;
        return 0;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70

    再加上阻塞队列,运行起来就是这样:
    在这里插入图片描述

    这里除法有一丁点问题,我就不改了,各位想改的自己动手试试,改正浮点数就行。

    多生多消

    这里搞简单点,就两个生产者、两个消费者:
    在这里插入图片描述

    其它代码就只用改消费者打印换行一次,所有从线程打印一下线程id就行。

    运行:
    在这里插入图片描述

    可以看到,有两个消费者和两个生产者,成功。

    这里就和线程池有点像了,不过我不打算在这篇讲线程池,下一篇再详谈。

    利用RAII来对锁进行优化

    RAII,学过智能指针的同学应该知道是啥,如果你不懂,看这篇:【C++】智能指针

    这里写一个类,专门用来管理锁资源:

    class LockGuard
    {
    public:
        LockGuard(pthread_mutex_t* pmtx)
            :_pmtx(pmtx)
        {
            pthread_mutex_lock(_pmtx);
        }
    
        ~LockGuard()
        {
            pthread_mutex_unlock(_pmtx);
        }
    
    public:
        pthread_mutex_t* _pmtx;
    };
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    再加锁的时候就不需要调用pthread库中的函数了,直接定义一个局部的对象就行,定义时自定调用构造,就会进行加锁,析构就会调用解锁。

    void PushData(const T& data)
    {
        // 直接让对象来管理锁
        LockGuard LG(&_mtx); // 构造加锁
        
        /* 上了锁之后先判断临界资源是
        否准备就绪,也就是队列是否满了*/
        while(IsFull()) pthread_cond_wait(&_full, &_mtx);
    
        // 到此处就说明队列不满,就可以push数据了
        _q.push(data);
    
        // 发送消费者消费的信号
        pthread_cond_signal(&_empty);
        
    }	// 析构解锁
    
    void PopData(T& data)
    {
        // 直接让对象来管理锁
        LockGuard LG(&_mtx);// 构造加锁
    
        /* 上锁后,先判断临界资源是
        否准备就绪,也就是队列是否为空*/
        while(IsEmpty()) pthread_cond_wait(&_empty, &_mtx);
    
        // 到此处说明队列不为空,就可以pop了
        data = _q.front(); // 先拿数据再pop
        _q.pop();
    
        // pop完了发送让生产者生产的信号
        pthread_cond_signal(&_full);
        
    }	// 析构解锁
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    运行就是:
    在这里插入图片描述

    和前面没啥区别。不过RAII的思想放到这里非常的妙。
    这里也就是RALL风格的加锁。

    这篇就讲到这,下一篇细说信号量等知识。

    到此结束。。。

  • 相关阅读:
    STM32在STM32CubeIDE平台下的RT-Thread Nano移植
    Vue3+TypeScript学习
    Ubuntu上Jenkins自动化部署Gitee上VUE项目
    Java设计模式之解释器模式
    shell 提醒工具
    对象关系数据库中的SQL
    javaweb足球资讯网站
    再下一城 | “GBASE数据库中华行—上海站”圆满落幕
    【C++ 程序】函数积分(使用 std::function)
    百度测开初面面试题分享
  • 原文地址:https://blog.csdn.net/m0_62782700/article/details/133280314