• C++多线程学习09 条件变量


    一、条件变量

    以上一节中的生产者消费者模型为例,其消费者是这样的:

    void XMsgServer::Main()
    {
        while (!is_exit())
        {
            sleep_for(10ms);
            unique_lock<mutex> lock(mux_);
            if (msgs_.empty())
                continue;
            while (!msgs_.empty())
            {
                //消息处理业务逻辑
                cout << "recv : " << msgs_.front() << endl;
                msgs_.pop_front();
            }
    
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    这样的生产者消费者模型存在的问题:
    若是让消费者不停地while去等待生产者生产会占用CPU资源,若是每次循环后加上sleep则可能会错过生产者刚好完成生产的时间点,因此无法从时间上精细控制沉睡时间,因此需要有一种方式,能够在生产者没生产好时不占cpu,一直阻塞在那里直到生产好,因此我们需要引入条件变量。
    条件变量定义:

    condition_variable cv;
    
    • 1

    条件变量与信号量的区别:
    信号量是有值的,每次v操作后资源值会+1,每次P前先检查资源数量,资源数量>=0时才往后执行
    条件变量是没有值的,仅实现了排队等待的功能

    条件变量的两种操作:
    1、wait函数,wait()可依次拆分为三个操作:释放互斥锁、等待在条件变量上、再次获取互斥锁:
    (1)wait(unique_lock &lck)

    当前线程的执行会被阻塞,直到收到 notify 为止。

    (2)wait(unique_lock &lck,Predicate pred)

    当前线程仅在pred=false时阻塞;如果pred=true时,不阻塞。

    2、notify_one:
    notify_one():没有参数、没有返回值,用于给wait()发信号

    二、写线程

    01.准备好条件变量
    02.用互斥量锁住
    03.生产完毕后解锁
    04.条件变量发送信号

    void ThreadWrite()
    {
        for (int i = 0;;i++)
        {
            stringstream ss;
            ss << "Write msg " << i;
            unique_lock<mutex> lock(mux);
            msgs_.push_back(ss.str());
            lock.unlock();
            cv.notify_one(); //发送信号
            //cv.notify_all();
            this_thread::sleep_for(3s);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    一般在进程退出时才cv.notify_all()通知所有线程退出

    三、读线程

    01 获得与改变共享变量线程共同的mute
    02条件变量cv通过wait() 等待信号通知,在收到通知前一直保持阻塞状态,不会占资源也不会有时延
    进入wait()后先解锁,再去尝试获取信号,
    wait()的两种表达:

    void wait(unique_lock<mutex>& _Lck) { // wait for signal
    // Nothing to do to comply with LWG‐2135 because std::mutex lock/unlock are
    nothrow
    _Check_C_return(_Cnd_wait(_Mycnd(), _Lck.mutex()>_Mymtx()));
    }
    template <class _Predicate>
    void wait(unique_lock<mutex>& _Lck, _Predicate _Pred) { // wait for signal and test
    predicate
    while (!_Pred())
    { wait(_Lck);
    }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    没有使用lamda表达式:当收到信号通知时会立刻把锁锁住,之后运行下面的代码就是线程安全的
    使用lamda表达式:当收到信号后表达式返回true时运行下面的代码,返回false继续阻塞(这里的话就是当队列非空时运行下面的代码,空的时候继续阻塞),然后让其他消费者去判断
    03获取信号后锁定并开始消费

    void ThreadRead(int i)
    {
        for (;;)
        {
            cout << "read msg" << endl;
            unique_lock<mutex> lock(mux);
            //cv.wait(lock);//解锁、阻塞等待信号
            cv.wait(lock, [i] 
                {
                    cout << i << " wait" << endl;
                    return !msgs_.empty(); 
                });
            //获取信号后锁定
            while (!msgs_.empty())
            {
                cout << i << " read " << msgs_.front() << endl;
                msgs_.pop_front();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    三、main

    生产者线程每隔三秒生成往队列中放入一个数据,并用条件变量发送信号,然后将生产者线程detach,不用去管理其线程资源
    生成三个消费者线程,每个线程也做detach(),

    int main(int argc, char* argv[])
    {
        thread th(ThreadWrite);
        th.detach();
        for (int i = 0; i < 3; i++)
        {
            thread th(ThreadRead, i + 1);
            th.detach();
        }
        getchar();
        return 0;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    在这里插入图片描述

    四、将msgserve改为条件变量版本

    产生消息了之后可以立即相应,不像以前还要在while里不停判断当前队列是否非空:
    以前的msgserver:

    void XMsgServer::Main()
    {
        while (!is_exit())
        {
            sleep_for(10ms);
            unique_lock<mutex> lock(mux_);
            if (msgs_.empty())
                continue;
            while (!msgs_.empty())
            {
                //消息处理业务逻辑
                cout << "recv : " << msgs_.front() << endl;
                msgs_.pop_front();
            }
    
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    条件变量版本:不要忘了this指针哦,当线程退出时也要通知wait该退出了

    void XMsgServer::Main()
    {
        while (!is_exit())
        {
            //sleep_for(10ms);
            unique_lock<mutex> lock(mux_);
            cv_.wait(lock, [this] 
                {
                    cout << "wait cv" << endl;
                    return !msgs_.empty(); 
                });
            while (!msgs_.empty())
            {
                //消息处理业务逻辑
                cout << "recv : " << msgs_.front() << endl;
                msgs_.pop_front();
            }
    
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    给消费者发消息的也需要改:先解锁,再给一个消费者发通知

    void XMsgServer::SendMsg(std::string msg)
    {
        unique_lock<mutex> lock(mux_);
        msgs_.push_back(msg);
        lock.unlock();
        cv_.notify_one();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    消费者线程的停止也需要修改,此时要给所有的消费者线程发信号(虽然本例中只有一个消费者进程):

    void XMsgServer::Stop()
    {
        is_exit_ = true;
        cv_.notify_all();
        Wait();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    在这里插入图片描述
    可以发现最后卡在cv表达式的lamda表达式的cout中,这是因此最后一次进入CV的wait后,队列非空,!msgs_.empty(); 返回false,将阻塞在这。因此当线程退出is_exit=TRUE时,将不再进行!msgs_.empty(); 的判断,而是直接返回true执行cv表达式后的代码,直到线程结束:

    void XMsgServer::Main()
    {
        while (!is_exit())
        {
            //sleep_for(10ms);
            unique_lock<mutex> lock(mux_);
            cv_.wait(lock, [this] 
                {
                    cout << "wait cv" << endl;
                    if (is_exit())return true;
                    return !msgs_.empty(); 
                });
            while (!msgs_.empty())
            {
                //消息处理业务逻辑
                cout << "recv : " << msgs_.front() << endl;
                msgs_.pop_front();
            }
    
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    正常退出了:
    在这里插入图片描述

  • 相关阅读:
    Java通过反射注解赋值
    基于docker和cri-dockerd部署kubernetes v1.25.3
    C/C++ Linux 用户态协议栈的实现
    详解Python的pyyaml模块
    Linux操作系统——http协议(一)
    基于Yolov8的工业端面小目标计数检测(1)
    QSqlTableModel使用简介
    sqlserver刷新全部视图
    【Rust日报】2022-11-05 Slint语言的新变化
    在 Clojure 中,如何实现高效的并发编程以处理大规模数据处理任务?
  • 原文地址:https://blog.csdn.net/qq_42567607/article/details/126060425