• 2022-08-05 C++并发编程(八)



    前言

    多线程逻辑是乱序的,然而很多时候,我们却需要一定的逻辑顺序,但这种顺序又不简单是单线程的一贯顺序,此时我们需要通过一些手段,来使得多线程变得有序。

    前几篇文章讲述了数据保护,用互斥锁确实可以达到某个线程等待另一个线程的目的,但这不是它的使用场景,我们将引入条件变量和 std::future.


    一、用条件变量进行同步

    条件变量的语义是当条件成立则上锁进行下一步操作,否则等待。由此,我们可以令多线程有逻辑顺序的执行。

    C++的头文件库 有std::condition_variable 和 std::condition_variable_any 两种实现,其区别是前者仅限与互斥 std::mutex 一同使用,高效,更常用。后者可与符合互斥最低标准类型配合,更自由。

    我们用条件变量实现一个简单的 生产–使用 逻辑的程序,

    void dataPreparationThread() 函数负责生产数据,通过加锁 dataQueue 安全的传输数据,并在每产生一个数据后通知 void dataProcessingThread() 函数的条件变量 dataCond。

    后者会取得锁,并停止阻滞,从 dataQueue 进行安全的数据转移。随后处理数据。

    #include 
    #include 
    #include 
    #include 
    #include 
    
    struct dataChunk
    {
        explicit dataChunk(int aa)
            : a(aa)
        {}
    
        void show() const
        {
            std::cout << a;
        }
    
        auto equalNum(int num) const -> bool
        {
            return a == num;
        }
    
      private:
        int a = 0;
    };
    
    std::mutex mut;
    std::queue<dataChunk> dataQueue;
    std::condition_variable dataCond;
    
    auto prepareData() -> dataChunk
    {
        static int dataNum = 0;
        return dataChunk{dataNum++};
    }
    
    auto moreDataToPrepare() -> bool
    {
        static int times = 10;
        return times-- > 0;
    }
    
    void dataPreparationThread()
    {
        while (moreDataToPrepare())
        {
            const dataChunk data = prepareData();
            {
                std::lock_guard<std::mutex> lk(mut);
                dataQueue.push(data);
            }
            dataCond.notify_one();
        }
    }
    
    void process(const dataChunk &data)
    {
        data.show();
    }
    
    auto isLastChunk(const dataChunk &data) -> bool
    {
        return data.equalNum(8);
    }
    
    void dataProcessingThread()
    {
        while (true)
        {
            std::unique_lock<std::mutex> lk(mut);
            dataCond.wait(lk, [] { return !dataQueue.empty(); });
            dataChunk data = dataQueue.front();
            dataQueue.pop();
            lk.unlock();
            process(data);
            if (isLastChunk(data))
            {
                break;
            }
        }
    }
    
    auto main() -> int
    {}
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85

    条件变量的 wait() 函数的等待过程,是持锁,判断条件,条件成立,继续持锁,并向下执行,否则放开锁,继续等待直到条件成立。由于需要持锁,放开锁,所以需要 std::unique_lock 配合。

    此例中wait() 函数的实参是一个锁和一个返回bool值的 lambda 函数,非常方便,当然我们也可以传入更复杂的判断函数。

    另外,由于wait() 函数存在伪唤醒现象,即未经 dataCond.notify_one() 通知就自行持锁,判断条件这一过程,而且根据标准,伪唤醒频率不确定,所以一般不给 wait() 传入含有副作用的判读函数。

    二、利用条件变量构建线程安全的队列

    c++标准库有自己的队列实现,但并非多线程安全,好在改造为线程安全的并不困难。

    首先,禁止复制

    只留下入队,出队,判断是否为空。

    新增等待出队,利用条件变量

    #include 
    #include 
    #include 
    #include 
    
    template <typename T>
    struct threadSafeQueue
    {
        threadSafeQueue() = default;
    
        threadSafeQueue(const threadSafeQueue &rhs)
        {
            std::lock_guard<std::mutex> lk(rhs.mut);
            dataQueue = rhs.dataQueue;
        }
    
        auto operator=(const threadSafeQueue &rhs) -> threadSafeQueue & = delete;
    
        void push(T newVal)
        {
            std::lock_guard<std::mutex> lk(mut);
            dataQueue.push(newVal);
            dataCond.notify_one();
        }
    
        //通过返回 true false 判断赋值是否正确
        auto tryPop(T &val) -> bool
        {
            std::lock_guard<std::mutex> lk(mut);
            if (dataQueue.empty())
            {
                return false;
            }
            val = dataQueue.front();
            dataQueue.pop();
            return true;
        }
    
        //通过返回 nullptr 或有效 shared_ptr 判定是否正确赋值
        auto tryPop() -> std::shared_ptr<T>
        {
            std::lock_guard<std::mutex> lk(mut);
            if (dataQueue.empty())
            {
                //返回 nullptr
                return std::shared_ptr<T>(nullptr);
            }
            std::shared_ptr<T> res(std::make_shared<T>(dataQueue.front()));
            dataQueue.pop();
            return res;
        }
    
        void waitAndPop(T &val)
        {
            std::unique_lock<std::mutex> lk(mut);
            //如果不为空,加锁,执行下面的程序
            //如果为空,等待传来消息,再判断是否为空,为空则继续等
            //不为空则则锁住,继续下面的程序
            //就是没有通知,也会判断是否为空,为空等,不空锁,向下执行
            dataCond.wait(lk, [this] { return !dataQueue.empty(); });
            val = dataQueue.front();
            dataQueue.pop();
        }
    
        auto waitAndPop() -> std::shared_ptr<T>
        {
            std::unique_lock<std::mutex> lk(mut);
            dataCond.wait(lk, [this] { return !dataQueue.empty(); });
            std::shared_ptr<T> res(std::make_shared<T>(dataQueue.front()));
            dataQueue.pop();
            return res;
        }
    
        auto empty() const -> bool
        {
            std::lock_guard<std::mutex> lk(mut);
            return dataQueue.empty();
        }
    
      private:
        //互斥锁必须可变状态,用 mutable 修饰
        mutable std::mutex mut;
        std::queue<T> dataQueue;
        //条件变量
        std::condition_variable dataCond;
    };
    
    auto main() -> int
    {}
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90

    总结

    条件变量的引入,令多线程编程的逻辑顺序可以容易操控,同时要注意条件变量的伪唤醒。

    通过以前文章的锁,本文的条件变量,很容易实现符合多线程逻辑的队列容器,读者可根据自己需要进行改良。

  • 相关阅读:
    Java异常、继承结构、自定义异常、SpringBoot中捕获处理异常
    python第三方库 pip install速度慢的解决办法
    单源最短路径 dijkstra
    shell小技巧分享
    跨境电商如何利用海外代理IP,提高竞争力?
    [Cesium学习]
    Vue中使用Switch开关用来控制商品的上架与下架情况、同时根据数据库商品的状态反应到前台、前台修改商品状态保存到数据库
    文字弹性跳动CSS3代码
    数据接口工程对接BI可视化大屏(五)数据接口发布
    完整的电商平台后端API开发总结
  • 原文地址:https://blog.csdn.net/m0_54206076/article/details/126171007