• 从一道面试题开始学习C++标准库提供的并发编程工具


    一个空列表,用两个函数(只可调用一次)轮流写入值(一个写奇数,一个写偶数), 最终实现列表的值为1-100,有序排列。

    简单分析:假设这两个函数分别为A和B,A函数往列表中写奇数,B函数往列表中写偶数。因为要求交替写,若A先写,则在B写一个偶数之前需要等待A先把上一个奇数写完,B写完一个偶数之后需要通知A,A写完一个奇数之后要通知B,这就存在同步关系了,自然就想到了使用条件变量。而两个函数只可调用一次,那自然想到了使用线程,让两个函数独立运行,并使用条件变量来同步写操作。

    来看看使用标准库提供的并发API如何实现上述功能,代码示例如下:

    #include 
    #include 
    #include 
    #include 
    #include 
    #include 
    
    std::mutex mtx;
    std::condition_variable cv;
    const int NUM = 100;
    int current_tid = 0;    // 通过id来控制线程之间的同步顺序
    
    std::vector<int> nums(NUM);
    
    // 通过参数 tid 来标识线程
    void work_odd(int tid) {
        for (int i = 1; i <= NUM; i++) {
    	    std::unique_lock<std::mutex> locker(mtx);
            cv.wait(locker, [=](){ return current_tid == tid; });
    
            if (i % 2 == 1) {
                nums[i - 1] = i;
            }
    
            current_tid = (current_tid + 1) % 2;
            cv.notify_one();      // 唤醒阻塞在条件变量上的一个线程
        }
    }
    
    
    void work_even(int tid) {
        for (int i = 1; i <= NUM; i++) {
    	    std::unique_lock<std::mutex> locker(mtx);
            cv.wait(locker, [=](){ return current_tid == tid; });
    
            if (i % 2 == 0) {
                nums[i - 1] = i;
            }
    
            current_tid = (current_tid + 1) % 2;
            cv.notify_one();
        }
    }
    
    
    int main()
    {
        std::thread t1(work_odd, 0);
        std::thread t2(work_even, 1);
    
        t1.join();
        t2.join();
    
        std::for_each(nums.begin(), nums.end(), [](auto e){ std::cout << e << ' '; });
        std::cout << std::endl;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56

    以上面的代码为例,先来快速上手一下,在标准库中,如何使用 thread 开启一个新的线程,如何使用互斥量 mutex 来互斥的访问临界区,以及如何使用条件变量 condition_variable 来实现线程之间的同步。

    std::thread

    thread 的声明如下所示,第一个参数为一个可调用对象,第二参数表示一个可变参数。

    template <class Fn, class... Args>
    explicit thread (Fn&& fn, Args&&... args);
    
    • 1
    • 2

    按照如上函数声明初始化一个 thread 对象后,即开启了一个新的线程。在使用 thread 创建线程进行并发编程时,需要注意以下几点:

    1. 在开启一个新的线程后,需要在恰当的位置调用 join 或 detach。调用 join 函数会使 调用线程 阻塞,直至被调用线程运行结束。调用 detach 函数会使调用线程和被调用线程分离。
    2. thread 对象不能显示地传递返回值给 调用线程,可以间接通过 promise 和 future 来实现。
    3. 当使用 thread 进行并发编程时,若线程执行过程中有异常产生,会直接终止程序。因此在使用 thread 进行并发编程时,需要在被调用线程中进行异常处理。

    这里对上述注意事项中的第三点进行一个补充,代码示例如下:

    void func()
    {
        std::cout << "start func" << std::endl;
        // 运行过程中有异常产生,没有进行捕获
        throw std::runtime_error("runtime error");
        std::cout << "end func" << std::endl;
    }
    
    int main()
    {
        std::cout << "start main" << std::endl;
    
    	// 尝试捕获异常,但是无效!
        try {
            std::thread t1(func);
            t1.join();     // 这样写是不对的,《Effective Modern C++》Item35 和 Item37 有解释
        } catch(const std::exception& e) {
            std::cout << e.what() << std::endl;
        }
        std::cout << "end main" << std::endl;
    }
    
    /*
    运行结果为:
    start main
    start func
    terminate called after throwing an instance of 'std::runtime_error'
      what():  runtime error
    Aborted
    */
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    thread 的其他 API 使用方法,文档中已有详细介绍,这里不再赘述。对于上述列的三点注意事项,展开说来又是一篇文章了,可以参考这篇文章 从语言层面了解线程(std::thread)使用的里里外外

    std::mutex

    使用 thread 开启一个新的线程非常简单,一行代码就搞定。接下来介绍互斥量 (mutex) 的基本使用。

    在C++标准库中,提供了好几种互斥量类型,mutex、recursive_mutex、timed_mutex、recursive_timed_mutex,C++14增加了shared_timed_mutex,C++17增加了shared_mutex。本文只介绍 mutex 的基本使用。

    mutex 是一种排他的互斥量,在并发环境中,进入临界区前先对互斥量进行加锁操作,临界区访问结束后对互斥量进行解锁操作。mutex 的使用也很简单,如下代码示例所示:

    std::mutex mtx;   // 创建了一个互斥量
    
    // 进入临界区前先加锁,若加锁失败(当前线程之前,已有其他线程加锁),当前线程会被阻塞在该处
    mtx.lock(); 
    // 临界区
    // ......
    // 临界区
    mtx.unlock();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    如上示例所示,使用C++标准库提供的 mutex 非常方便。但是上述形式的用法可能存在以下两个问题,在并发编程中要尽量避免。

    1. 上述第8行的 mtx.unlock() 漏写,导致互斥量没被解锁,产生死锁现象。
    2. 临界区内有异常发生且未被正确捕获,则产生异常处之后的代码不会被执行,即 mtx.unlock() 不会被执行,产生死锁。

    为避免上述两种的情况,C++标准库提供了非常方便的 mutex 管理类,lock_guard 和 unique_lock(基于C++11),C++14增加了shared_lock,C++17增加了scoped_lock。本文只介绍 unique_lock,若要全面介绍这四种 mutex 管理类及其使用场景,又是另一篇文章了。

    使用基于 unique_lock 解决使用原始 mutex 可能产生的两个问题,代码示例如下:

    std::mutex mtx;   // 创建了一个互斥量
    
    // 使用花括号限定 unique_lock 的作用域
    {
    	std::unique_lock<std::mutex> locker(mtx);
    	// 临界区
    	// ......
    	// 临界区
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    unique_lock 类定义等价于如下代码:

    class unique_lock {
    public:
    	explicit unique_lock(std::mutex& m):mtx(m) {
    		mtx.lock();
    	} 
    
    	unique_lock(const unique_lock&) = delete;
    
    	~unique_lock() {
    		mtx.unlock();
    	}
    	
    private:
    	std::mutex& mtx;
    };
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    因此,使用 unique_lock 来管理 mutex 是一种资源获取即初始化(Resource Acquisition Is Initialization;RAII)的思想。

    std::condition_variable

    在多线程环境中,线程的执行过程在某个时间段内可能存在先后关系,比如B线程运行到某个时刻点时,需要等待A线程的某个特定事件发生后才能继续往下执行,这种关系又称为同步。解决这种线程通信的问题的一种方案为 条件变量。

    在C++标准库中,条件变量 std::condition_variable 的使用和 thread、mutex 一样简单,C++标准库提供了非常简洁的接口。接下来先来看看条件变量的基本用法长什么样,然后结合上述的面试题,来尝试总结如何使用条件变量解决线程间的同步关系。

    条件变量的基本用法如下所示:

    std::condition_variable cv;         //事件的条件变量
    std::mutex mtx;                       //配合cv使用的mutex
    
    // 关键代码部分
    {
    	std::unique_lock<std::mutex> locker(mtx);
    	cv.wait(mtx, [](){ /* 等待事件是否发生的条件判断 */ });
    
    	// 对事件进行反应,执行相关操作。此时 mtx 已经上锁
    	// ...
    
    
    	// 可选的操作,通知一个或所有等待该事件的线程
    	// cv.notify_one();
    	// cv.notify_all();
    }  // 退出该作用域,unique_lock执行析构函数,调用mtx.unlock()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    以上述的面试题为例,看看 std::condition_variable 如何使用。简化的代码示例如下:

    std::condition_variable cv; //事件的条件变量
    std::mutex m; //配合cv使用的mutex
    
    // 用来控制事件变化的变量
    int current_tid = 0;  
    
    void func(int tid)
    {
    	std::unique_lock<std::mutex> locker(mtx);
    	cv.wait(locker, [=](){ return current_tid == tid; });
    
    	// ...
    	// 相关操作
    	// ...
    
    	current_tid = (current_tid + 1) % 2;   // 改变条件
    	cv.notify_one();      // 唤醒阻塞在条件变量上的一个线程
    }
    
    int main()
    {
    	std::thread t1(func, 0);
    	std::thread t2(func, 1);
    
    	// 省略一些代码...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    解释一下上述代码:

    • 若执行 func 函数的线程被阻塞,则有可能有两种情况:
      1. 进入函数体,刚执行第9行语句时,mutex 因被其他线程先调用 mtx.lock() 而被阻塞;
      2. 进入函数体后,std::unique_lock locker(mtx); 语句将 mtx 锁住之后,调用 cv.wait() 语句;因为cv.wait() 语句的第二个参数返回 false (在上面示例中,等价于 current_tid != tid),cv.wait() 该语句将当前线程阻塞,在阻塞前会调用 mtx.unlock() 释放互斥锁;然后当前被阻塞,等待其他线程调用 cv.notify_one() 或 cv.notify_all() 将该线程唤醒。
    • 若执行 func 函数的线程没被阻塞:
      线程顺利获取到 mutex ,然后调用 cv.wait() 语句,第二个参数返回 ture,逻辑流程继续往下执行,然后执行相关操作,然后改变 current_tid (控制事件变化的变量)值,然后调用 cv.notify_one() 唤醒阻塞在该条件变量上的线程。

    小结:
    使用条件变量控制线程之间的同步关系时,关键在如何将事件变化的关系抽象出来,用一个合适的变量(数据结构)来表示该事件的状态,通过改变变量的值(事件的状态)来控制线程之间的同步关系。

  • 相关阅读:
    Django--ORM 多表查询
    SPI 实验
    git PR合并提交(rebase方式)
    机器学习中的数学基础(一)
    Solana流支付协议Zebec完成850万美元融资,CircleVentures等参投
    hive基于新浪微博的日志数据分析——项目及源码
    100 # mongoose 的使用
    虚拟列表方案实现
    JAVA 实现《推箱子升级版》游戏
    扩展卡尔曼滤波EKF
  • 原文地址:https://blog.csdn.net/weixin_42655901/article/details/133936660