• 从语言层面了解线程(std::thread)使用的里里外外


    从语言层面了解线程(std::thread)使用的里里外外

    使用 C++标准库提供的 std::thread,配合 std::mutex、std::condition_variable 等工具可以很方便地编写并发程序。C++标准库针对不同的操作系统中并发编程相关的系统调用做了很好的封装,提供了非常简洁易用的接口,使得我们可以快速方便地进行并发编程,开发出可移植性好的程序。并发编程是每个程序员成长的必经之路,而迈入并发编程的第一步,应该是用好操作系统或者是编程语言提供的并发工具,熟练使用这些基本工具后,然后在实战中不断成长。

    本文将主要从语言层面总结 C++ 标准库中 thread 的用法,不深究操作系统层级的线程的概念,从一个较高的抽象角度理解 thread 中的 joinable、unjoinable,以及与 thread 配合使用的 promise,future等概念,然后总结如何正确使用它们。

    理解 thread 的 joinable 和 unjoinable

    在开始本话题之前,先进行以下几点说明:

    • 本文用 线程 指代操作系统层级 软件线程 的概念,想较于 软件线程 而言的是 硬件线程
      硬件线程 是真实执行计算的线程,现代计算机体系结构为每个CPU核心提供一个或者多个 硬件线程
      软件线程 是操作系统管理的在 硬件线程 上执行的线程。
    • 本文用 thread 表示 C++ 语言层面对 软件线程 概念的抽象,也特指C++标准库中的 std::thread

    从下面的一段程序开始,理解 thread 的 joinable 和 unjoinable 状态,也可以把它看成 thread 的一种属性。

    void func()
    {
        std::cout << "func start..." << std::endl;
    
        // 一些耗时操作,这里用 sleep_for 模拟耗时操作
        std::this_thread::sleep_for(std::chrono::seconds(3));
    
        std::cout << "func end..." << std::endl;
    }
    
    int main()
    {
        std::cout << "main start..." << std::endl;
    
        std::thread t1(func);
        // 执行一些耗时操作
        std::this_thread::sleep_for(std::chrono::seconds(1));
    
        std::cout << "main end..." << std::endl;
    }
    
    /*
    运行结果:
    main start...
    func start...
    main end...
    terminate called without an active exception
    Aborted
    */
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    简单解释一下上面的程序,以及程序产生的结果:

    • main 函数(主线程)先执行第13行代码,然后创建了一个子线程(第15行)。
    • 主线程和子线程并行(并发)执行,主线程执行一些耗时操作;子线程执行第3行进行输出,然后执行一些耗时操作。
    • 主线程先比子线程执行完毕;线程执行结束后需要释放占用的相关系统资源,因此主线程需要销毁其创建的临时变量,对于上述程序,需要销毁 thread 对象 t1。先调用 t1 的析构函数,在 thread 的析构函数中,会先调用 thread 的成员函数 joinable() 检查 thread 对象是 joinable 还是 unjoinable,若 thread 对象是 joinable,则调用 std::terminate()
      调用 std::terminate() 会导致程序非正常终止,并报 terminate called without an active exception 的提示信息。而对于上面的程序,第15行代码创建 thread 对象 t1 后,t1 处于 joinable 状态,因此主线程先结束后销毁 t1 对象,导致程序异常终止。

    通过上面的一段代码,我们引出了 thread 的 joinable 和 unjoinable 概念。joinable 和 unjoinable 表示的是 thread 的一种状态。如何理解呢?

    A thread object is joinable if it represents a thread of execution.

    joinable 和 unjoinable 指的是主线程和子线程之间的某种状态关系! 当 thread 表示一个可执行的软件线程,且没有和主线程脱离关系(被主线程 detach 掉),则表示该 thread 是 joinable;反之,该 thread 是 unjoinable。

    (主线程 detach 子线程是怎么个事?请先保持该困惑,但不要受其影响,下文会解释)

    接下来看看 thread 对象在什么情况下处于 joinable 状态,什么情况下处于 unjoinable 状态。

    在以下任何一种情况下,thread 对象是 unjoinable:

    • thread 对象是默认构造的,即初始化 thread 对象时没有传入可调用对象作为第一个参数。
      因为默认构造没有指定软件线程要调用的实体(传入的可调用对象参数),不符合 joinable 的含义,因此该情况下是 unjoinable。
    • thread 对象已经被移动(用于构造另一个 thread 对象。thread 对象只能移动构造和移动赋值,不能拷贝构造和拷贝赋值)。
      这点也很好理解,被移动后,当前 thread 对象就不再具有 thread 对象所管理的 线程实体 的拥有权,因此该情况下是 unjoinable。
    • thread 对象调用 join() 或者 detach() 函数后。
      • thread 对象调用 detach() 函数后,该 thread 对象就会变为 unjoinable。这点也很好理解,因为主线程和子线程分离了。
      • 如何理解 thread 对象调用 join() 函数后,该 thread 对象变为 unjoinable?
        因为调用 join 方法后,调用线程会被该线程对象阻塞,直到该 thread 对象所管理的软件线程被操作系统调度并执行完毕。而线程执行完毕,就不再是可执行的,因此为 unjoinable。

    thread 对象要么是 joinable,要么是 unjoinable,因此除上述 unjoinable 的情况外,thread 对象都是 joinable。

    好像啰里啰唆用了一大段来解释 thread 对象的 joinable 和 unjoinable,为什么要费老大劲来解释呢?答案藏在最开始的那段代码示例中。当thread 对象是 joinable 时,其析构函数会调用 std::terminate(),导致程序终止。我们可不想写的并发程序那么脆弱,因此在并发编程中避免 thread 对象在调用析构函数时是 joinable 变得非常重要。

    为什么 thread 的析构函数的行为要这样设计呢? 《Effective Modern C++》的 Item 37 给出很好的解释,建议认真看看。

    主线程 detach 子线程

    thead 对象在调用析构函数时若是 joinable,会导致程序终止。为了避免这种情况的发生,可以在创建 thread 对象后 thread 对象析构前,调用 join() 函数,使 thread 对象变为 unjoinable,而调用 join() 函数会使调用线程阻塞,直到被调用线程运行结束。在并发编程中,各个线程之间的执行流程是不可控的,即各个线程何时执行完成是不可预期的,需要通过信号量等机制来通信线程间的运行状态,这里不对此情况进行讨论。

    另一种方式是调用 detach() 函数,使主线程(调用线程)和子线程(被调用线程)进行分离,这样两个线程之间的运行互不干扰。(注意:这里说的互补干扰并不是指线程之间就不存在了干扰,指的是在没有共享变量、互斥量、信号量等互斥同步操作的情况下,主线程和子线程分离后,两者处于独立的运行流程,主线程先结束不会影响子线程)。

    看下面一个示例,主线程指定的函数对象 func 运行结束了,其分离的子线程的写日志操作(writeLog 函数)任然能够完成。

    void writeLog(const char* filename)
    {
        std::ofstream outputFile(filename);
    
        if (!outputFile.is_open()) {
            std::cerr << "无法打开文件" << std::endl;
            return;
        }
    
        outputFile << "input first line.\n";
        std::this_thread::sleep_for(std::chrono::seconds(20));
        outputFile << "input second line.\n";
        std::this_thread::sleep_for(std::chrono::seconds(30));
        outputFile << "input third line.\n";
    }
    
    void func()
    {
        std::cout << "start func()" << std::endl;
        std::this_thread::sleep_for(std::chrono::seconds(15));
    
        std::thread t(writeLog, "log.txt");
    
        std::this_thread::sleep_for(std::chrono::seconds(15));
        t.detach();
        std::this_thread::sleep_for(std::chrono::seconds(10));
    
        std::cout << "end func()" << std::endl;
    }
    
    int main()
    {
        std::cout << "start main" << std::endl;
    
        std::thread t(func);
        t.detach();
        std::this_thread::sleep_for(std::chrono::seconds(120));
    
        std::cout << "end main" << std::endl;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40

    需要注意的是,主线程 detach 子线程后,主线程和子线程应该避免之间存在局部变量的相互依赖。若主线程 detach 子线程后,主线程先结束退出,子线程仍在运行,且子线程捕获了主线程中的局部变量,则可能产生未定义的行为,如下示例所示:

    void func()
    {
        std::vector<int> nums{1, 2, 3, 4, 5};
        
        std::thread t([&nums](){
            for (auto num : nums) {
                std::cout << num << std::endl;
                std::this_thread::sleep_for(std::chrono::seconds(2));
            }
        });
        t.detach();
    
        std::this_thread::sleep_for(std::chrono::seconds(3));
    }
    
    int main()
    {
        func();
        std::this_thread::sleep_for(std::chrono::seconds(20));
    }
    
    /*
    一种可能的运行结果:
    1
    2
    29003792
    0
    5
    */
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    小结:创建 thread 对象后调用 detach() 函数,主线程不用关心子线程何时结束,但是需要注意,不要在分离的子线程中捕获主线程中的局部变量,这可能导致未定义的行为。

    使用future和promise在线程之间通信

    线程之间通信的方式有:互斥量、条件变量、信号量、屏障、消息队列、共享内存。本小节不会去介绍这些“传统”的线程通信方法,而是介绍现代C++标准库提供的两个线程之间通信的利器:future 和 promise,主要从语言层面来学习如何使用这两个工具进行并发编程,而不过多的去讨论这两者的底层实现。

    可以把 promise 和 future 理解为一个管道(channel)的两端,promise 相当于通道的 发送方,future 相当于通道的接收方。如下图所示。

    在这里插入图片描述

    (该图截至《Effective Modern C++》Item38)

    从语言层面抽象来看,promise 和 future 是连接不同线程(thread)之间的管道,为不同线程之间的通信(数据传输)提供便利。接下来用一小段白话来解释上图所阐述的 promise 和 future 之间的关系,试图理解C++大师们为了简化并发编程的开发难度而设计这两个利器的良苦用心。

    如上图所示,promise 是数据的发送方,future 是数据的接收方,Shared State 是用来”承载“数据的,也即我们抽象理解为的管道。一种简单的理解方法为,不同线程之间的通信需要不同的管道,通信即传递数据,我们用变量来保存数据,即管道为变量的一种抽象。那上图中的 Shared State 就好理解了,是用来承载数据的变量的一种抽象。下文同一将 Shared State 表述为管道。

    Shared State 作为连接 promise 和 future 之间的载体,理解了上图中的 Shared State,promise 和 future 之间的关系就很好理解了。把 promise 理解为管道的创建者,得先有 promise 在有 future,即 promise 和 future 是一种强绑定的关系。promise 和 future 通过 ”管道“ 这一抽象进行绑定,如何理解这点呢?你可以这样理解,promise 类在初始化一个对象时,会先创建 ”管道“ 这一数据结构(暂不去剖析其底层实现),然后创建 future 对象,二者通过 ”管道“ 连接起来。

    解释下来还是有点饶,在凝练一点,promise 和 future 的设计其实就是对使用共享内存进行线程通信的一种高度抽象。(个人理解,还没看过源码,有待证实,但其本质就是通过抽象来屏蔽底层的实现细节,简化并发编程的难度)

    那继续往下思考,这种抽象带来了多大的好处,其中蕴含了怎样的设计思想?我们也可以自己手动的管理共享变量来实现线程之间的通信,这种手动的方式会带来哪些不便?也许学会了使用 promise 和 future 在线程之间通信,这些疑惑自然就烟消云散了。

    接下来先从整体上看看如何使用 promise 和 future 在线程之间通信,然后在各自讨论 promise 和 future 在实际编码中的一些注意要点。

    在具体实现上,由调用方(通常指主线程)创建一个 promise 对象(promise 和 future 都是类模板),然后将该 promise 对象作为 thread 对象(通常指子线程)的参数传入,然后子线程就可以调用 promise 的 set_value 成员函数向这个管道中发送数据,当管道中有数据后,主线程就可以调用 promise 所“绑定”的 future 对象的 get 成员函数来获取管道中的数据了。

    先来看下面一段代码示例:

    #include 
    #include 
    #include 
    
    int main()
    {
        std::promise<int> prom;
        auto fu = prom.get_future();
    
        std::thread caller([&prom]{
            prom.set_value(10);
        });
    
        std::cout << fu.get() << std::endl;
    
        caller.join();
    }
    
    /*
    运行结果:
    10
    */
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    简单解释一下:

    • 第7、8行代码,主线程(main函数)创建了 promise 对象 prom 以管道中数据的接收方 future 对象 fu。promise 是一个类模板,需要指定发送的数据类型。
    • 第10行代码,主线程创建了一个子线程,然后将管道的发送权力给子线程(lambda表达式的引用捕获),子线程 caller 往管道中发送数据 10,调用 promise 的 set_value 成员函数来实现。
    • 第14行代码,主线程通过调用 future 的 get 成员函数来获取管道中的数据。

    如上示例所示,使用 promise 和 future 在线程之间通信很简单,两个类提供的成员函数也很少,掌握如何正确使用两者难度不大,难点在理解其背后的设计思想以及其底层是如何实现的。promise 和 future 的底层实现是一个非常值得探讨的问题,当然这超出了本文的讨论方位。

    好了,接下来继续讨论 promise 和 future 在使用上的一些细节。

    promise

    文档中对 promise 的接口描述的很详细,这里不再赘述,本文强调以下几点:

    1. promise 既可以往管道中传数据也可以传异常。
    2. promise 不能拷贝,只能移动。
    3. promise 类的 get_future 成员函数只能调用一次,再次调用会抛出 future_already_retrieved 异常。
    4. promise 类的 set_value 和 set_exception 成员函数只能调用一次,再次调用会抛出 std::future_error: Promise already satisfied 异常。

    下面依次解释一下以上几点。

    1. promise 既可以往管道中传数据也可以传异常。
      promise 对象通过调用 set_value() 函数来往管道内发送数据,并在传入成功后设置管道可用。若使用 thread 创建的线程在执行中有异常抛出且没有被捕获,则会导致整个程序终止。promise 类提供了 set_exception 成员函数将在当前线程中捕获的异常通过管道发送给调用线程,调用线程只需要和获取管道中的数据一样调用 get 成员函数就可以获取被调用线程的异常信息,然后使用 try catch 语句块进行捕获即可。如下代码示例所示:
    #include 
    #include 
    #include 
    
    int main()
    {
        std::promise<int> p;
        auto f = p.get_future();    // f 的类型为 std::future
    
        std::thread t([&p] {
            try {
                // 模拟在线程中抛出了一个异常
                throw std::runtime_error("Example");
            }
            catch (...) {   // 对线程中可能出现的异常进行捕获
                
                try {
                    // 通过 set_exception 函数将异常发送至管道
                    p.set_exception(std::current_exception());
                }
                catch (...) {
                    // set_exception 可能还会再抛出异常
                }
            }
        });
    
        try {   // 使用 future 对象的 get 成员函数获取管道中的数据
                // 管道中存储的也可能是异常信息
            std::cout << f.get() << std::endl;
        }
        catch (const std::exception& e) {
            std::cout << "Exception from the thread: " << e.what() << std::endl;
        }
        t.join();
    }
    
    /*
    运行结果:
    Exception from the thread: Example
    */
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    1. promise 不能拷贝,只能移动。
      这点其实很好理解。promise 用来创建两个线程之间通信的管道,因此在两个线程之间,该通信管道的所有权应该是唯一的,即 promise 应该只能被移动而不能被拷贝。

    这里看一个示例,这段代码和上面的代码几乎相同,只不过是把 lambda 表达式写成了一个单独的函数。这里需要注意的是第2行代码中的引用传参和第13行代码中 std::ref 的用法。这里涉及了移动语义的知识,就不过多叙述了。

    // promise 不能值传递
    void func(std::promise<int> &p)
    {
        // 省略相关步骤
    }
    
    int main()
    {
        std::promise<int> p;
        auto f = p.get_future();    // f 的类型为 std::future
    
    	// 需要调用 std::ref(p) 进行包装
        std::thread t(func, std::ref(p));
    
    	// 省略相关步骤
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    这里补充一点,promise 创建并管理了线程之间的通信 “管道”,即你可以理解为 promise 类对象内部有一个指针指向了该 ”管道“ 对象,且 promise 对象内部的这个指针是由 shared_ptr 来管理。

    1. promise 类的 get_future 成员函数只能调用一次,再次调用会抛出 future_already_retrieved 异常。
      这是因为 get_future 返回一个 future 对象,future 对象是不可拷贝,只能移动的。而你也可以继续理解为 future 对象内部也会有一个指向该 ”管道“ 对象的指针。而 future 作为管道中数据的接收方,其所有权应该也需要是唯一的。

    2. promise 类的 set_value 和 set_exception 成员函数只能调用一次,再次调用会抛出 std::future_error: Promise already satisfied 异常。
      这点我暂时还没能理解,为什么要这样设计。(哭,等弄明白了再来更新)

    future

    future 的使用和 promise 一样简单,详细可以看文档。下面强调几点:

    1. future 是不可拷贝的,只能移动。
    2. future 的 get 成员函数,可能导致阻塞。
    3. future 的 get 成员函数只能调用一次,再次调用会抛出 std::future_error: Promise already satisfied 异常。

    下面依次解释一下以上几点。

    1. future 是不可拷贝的,只能移动。
      promise 中第三点注意事项进行了解释。

    2. future 的 get 成员函数,可能导致阻塞。
      这里需要补充一点,promise 和 future 之间的这个管道是一种“有状态”的管道,一种“会阻塞”的管道。只有当调用 promise 的 set_xxx (set_value、set_exception 时调用后管道可读,set_value_at_thread_exit、set_exception_at_thread_exit)函数后,管道才处于“可读”的状态,否则管道属于“不可读”的状态。当调用 future 的 get 函数时,调用线程会阻塞在该处,直到管道处于“可读”的状态。

    3. future 的 get 成员函数只能调用一次,再次调用会抛出 std::future_error: Promise already satisfied 异常。
      promise 和 future 之间的通信是一次性的,即只可往管道只能有一次数据的往来。(不太明白为什么这样设计,感觉这样的限制不能发挥 promise 和 future 在线程之间通信的最大功力。)

    总结

    • 本文主要从语言层面介绍了C++标准库中 thread 的使用。thread 是对底层操作系统提供的多线程接口的高级封装,使得我们可以很简单的使用它进行并发编程,当你需要使用底层操作系统的原始接口进行打交道时,可以使用 std::thread::native_handle
    • 标准库中还提供了许多其他的并发编程工具,例如,async、packaged_task,多用多查自然孰能生巧。
    • 并发编程离不开大量的实战练习,可以自己构思一些并发应用的场景并实现。

    参考链接

    https://blog.csdn.net/Dontla/article/details/127724101
    https://stackoverflow.com/questions/7381757/c-terminate-called-without-an-active-exception

  • 相关阅读:
    PHP刷leetcode第一弹: 两数之和
    为了拿捏后端打工人都要会的 Redis 数据结构,我画了 20 张图
    three.js学习之vR展厅
    乐鑫发布 Arduino ESP32 v3.0.0
    jenkins 从机连接主机显示 404 Not Found
    Revit API: Pipe & Duct -管道和风管
    LLM 构建Data Muti-Agents 赋能数据分析平台的实践之①:数据采集
    齐聚绿城 | 锦江都城酒店聚焦中高端酒店投资新方向
    国家开放大学 考试试题训练
    大模型开发06:LangChain 概述
  • 原文地址:https://blog.csdn.net/weixin_42655901/article/details/134009103