• c++线程


    pthread(部分内容来自菜鸟教程)

    创建线程

    创建一个 POSIX 线程:

    #include 
    pthread_create (thread, attr, start_routine, arg) 
    
    • 1
    • 2

    pthread_create 创建一个新的线程,并让它可执行。

    参数:

    • thread :指向线程标识符指针
    • attr :一个不透明的属性对象,可以被用来设置线程属性。可以指定线程属性对象,也可以使用默认值 NULL。
    • start_routine :线程运行函数起始地址,一旦线程被创建就会执行。
    • arg :运行函数的参数。它必须通过把引用作为指针强制转换为 void 类型进行传递。如果没有传递参数,则使用 NULL。

    终止线程

    #include 
    pthread_exit (status) 
    
    • 1
    • 2

    向线程传递参数

    在线程回调中传递任意的数据类型,因为它指向 void。

    使用 pthread_create() 函数创建了 5 个线程,并接收传入的参数。每个线程打印一个 “Hello Runoob!” 消息,并输出接收的参数,然后调用 pthread_exit() 终止线程。

    #include 
    #include 
    #include 
     
    using namespace std;
     
    #define NUM_THREADS     5
     
    struct thread_data{
       int  thread_id;
       char *message;
    };
     
    void *PrintHello(void *threadarg)
    {
       struct thread_data *my_data;
     
       my_data = (struct thread_data *) threadarg;
     
       cout << "Thread ID : " << my_data->thread_id ;
       cout << " Message : " << my_data->message << endl;
     
       pthread_exit(NULL);
    }
     
    int main ()
    {
       pthread_t threads[NUM_THREADS];
       struct thread_data td[NUM_THREADS];
       int rc;
       int i;
     
       for( i=0; i < NUM_THREADS; i++ ){
          cout <<"main() : creating thread, " << i << endl;
          td[i].thread_id = i;
          td[i].message = (char*)"This is message";
          rc = pthread_create(&threads[i], NULL,
                              PrintHello, (void *)&td[i]);
          if (rc){
             cout << "Error:unable to create thread," << rc << endl;
             exit(-1);
          }
       }
       pthread_exit(NULL);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45

    连接和分离线程

    pthread_join (threadid, status) 
    pthread_detach (threadid) 
    
    • 1
    • 2

    pthread_join() 子程序阻碍调用程序,直到指定的 threadid 线程终止为止。当创建一个线程时,它的某个属性会定义它是否是可连接的(joinable)或可分离的(detached)。只有创建时定义为可连接的线程才可以被连接。如果线程创建时被定义为可分离的,则它永远也不能被连接。

    #include 
    #include 
    #include 
    #include 
     
    using namespace std;
     
    #define NUM_THREADS     5
     
    void *wait(void *t)
    {
       int i;
       long tid;
     
       tid = (long)t;
     
       sleep(1);
       cout << "Sleeping in thread " << endl;
       cout << "Thread with id : " << tid << "  ...exiting " << endl;
       pthread_exit(NULL);
    }
     
    int main ()
    {
       int rc;
       int i;
       pthread_t threads[NUM_THREADS];
       pthread_attr_t attr;
       void *status;
     
       // 初始化并设置线程为可连接的(joinable)
       pthread_attr_init(&attr);
       pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
     
       for( i=0; i < NUM_THREADS; i++ ){
          cout << "main() : creating thread, " << i << endl;
          rc = pthread_create(&threads[i], NULL, wait, (void *)&i );
          if (rc){
             cout << "Error:unable to create thread," << rc << endl;
             exit(-1);
          }
       }
     
       // 删除属性,并等待其他线程
       pthread_attr_destroy(&attr);
       for( i=0; i < NUM_THREADS; i++ ){
          rc = pthread_join(threads[i], &status);
          if (rc){
             cout << "Error:unable to join," << rc << endl;
             exit(-1);
          }
          cout << "Main: completed thread id :" << i ;
          cout << "  exiting with status :" << status << endl;
       }
     
       cout << "Main: program exiting." << endl;
       pthread_exit(NULL);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58

    同步实现–锁

    lock有很多实现的方式,linux这里使用的是mutex,由于mutex本身是结构体,所以声名的时候别忘了init一下。

    #include "pthread.h"
    
    pthread_mutex_t mutex; //这里要保证是global var,
    pthread_mutex_init(&mutex, NULL); //init mutex,
    
    pthread_mutex_lock(&mutex); //准备进入临界区
    
    // critical section //
    
    pthread_mutex_unlock(&mutex); //释放锁,别的线程可能要使用
    pthread_mutex_destroy(&mutex); // 销毁锁,锁不能在用了
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    Condition vatiables(cv)

    cv的含义其实有点偏向于MPI的思想,传递消息;可以让threads wait,也可以notify or broadcast线程让他们起来干活。主要的操作有一下三种:

    phtread_cond_init();
    
    pthread_cond_wait(&theCV,&somelock); //这里sleep线程的同时也将somelock释放掉了,要不其他线程无法取得lock就没办法执行(甚至是叫醒它了)
    pthread_cond_signal(&theCV); 
    pthread_cond_boardcast(&theCV);
    
    • 1
    • 2
    • 3
    • 4
    • 5

    在临界区中处理写share量的时候,需要保证不同线程对其的访问是可控的,否则可能在不同线程中读取到的写share量不一致进而影响CV的工作,因为一般情况下临界区中的写share量就是我们CV工作中的重要判断量。因此,虽然这个条件相对严格,但是是有必要的。

    semaphore

    这个东西可以看作是lock的一个自然延申。也就是一个资源可以同时被多少执行单元使用。我们之前讲到的lock就可以看做是一个binary semaphore。这里就只是简要的谈谈,因为这个东西使用的时候很让人头大,弄不好就会死锁。而且虽然semaphore属于POSIX标准,但是严格来讲的话,它不属于pthread。

    #include 
    
    sem_t sem;
    sem_init(&sem);
    sem_wait(&sem);
    
    // critical section
    
    sem_post(&sem);
    sem_destroy(&sem);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    semaphore可以控制同一个时间有多少thread可以访问临界区,需要注意的就是上面声明–释放的匹配关系不要忘记。

    std::thread

    创建线程

    通过 detach() 函数,将子线程和主线分离,子线程可以独立继续运行,即使主线程结束,子线程也不会结束。

    #include 
    #include 
    using namespace std::literals::chrono_literals;
    using namespace std;
    
    void test() {
        cout << "Hello World" << endl;
    }
    
    int main() {
        std::thread t1(test);
        t1.detach();
        this_thread::sleep_for(10ms); // c++ 17
        // 低于C++17使用这行代码  this_thread::sleep_for(chrono::milliseconds(10));
        return 0;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    传递参数

    join() 函数可以在当前线程等待线程运行结束。

    #include 
    #include 
    using namespace std::literals::chrono_literals;
    using namespace std;
    
    void test(int Id,string name) {
        cout << "Hello World" << endl;
            cout << "Id : " <
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    线程休眠

    using namespace std::literals::chrono_literals;
    // 让当前线程睡眠 10 毫秒
    this_thread::sleep_for(10ms);
    // 低于C++17使用这行代码  this_thread::sleep_for(chrono::milliseconds(10));
    // 让当前线程睡眠 5 秒
    this_thread::sleep_for(5s);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    condition_variable

    使用 condition_variable 实现生产者和消费者的实验,通过 wait 进入线程等待,知道有其它的线程把当前线程唤醒。

    当 std::condition_variable 对象的某个 wait 函数被调用的时候,它使用 std::unique_lock(通过 std::mutex) 来锁住当前线程。当前线程会一直被阻塞,直到另外一个线程在相同的 std::condition_variable 对象上调用了 notification 函数来唤醒当前线程。

    当前线程调用 wait() 后将被阻塞(调用wait()前先获取线程锁std::unique_lockstd::mutex lock(g_mutex))同时会自动调用 unlock() 释放锁(g_mutex),直到另外某个线程调用 notify_*(g_con.notify_one()、g_con.notify_all()) 唤醒了当前线程。唤醒当前线程时wait() 函数也是自动调用 lock()(g_mutex:加锁,进入临界区),使得 g_mutex 的状态和 wait 函数被调用时相同。

    在线程被阻塞时,该函数会自动调用 lck.unlock() 释放锁,使得其他被阻塞在锁竞争上的线程得以继续执行。另外,一旦当前线程获得通知(notified,另外某个线程调用 notify_* 唤醒了当前线程),wait() 函数也是自动调用 lck.lock(),使得 lck 的状态和 wait 函数被调用时相同。

    #include 
    #include 
    #include 
    using namespace std::literals::chrono_literals;
    using namespace std;
    
    std::mutex g_mutex; // 线程锁
    condition_variable g_con; // cv
    
    list products;
    
    void test() {
        int product_id = 0;
        while (true) {
            products.push_back(++product_id);
            cout << "products 生产: " << product_id << endl;
            std::unique_lock lock(g_mutex);  // 进入临界区
            // 通知消费者消费
            g_con.notify_one(); 						// 唤醒 g_con.wait(lock)
            // g_con.notify_all(); // 唤醒所有线程(多个线程同时使用 g_con g_mutex)
            lock.unlock(); 								// 提前释放锁,供其他线程进入临界区
            if (product_id > 50) {
                break;
            }
            this_thread::sleep_for(2ms);
        }
    }
    
    int main() {
        std::thread t1(test);
        while (true) {
            std::unique_lock lock(g_mutex); // 离开代码作用域后自动解锁
            if (products.empty()) {
                cout << "没有产品,等待" << endl;
                // 进入等待,知道有新产品
                g_con.wait(lock);  // 释放锁 线程阻塞;唤醒时,自动加锁,与调用wait函数之前一行
                
       			if(!products.empty()){
                    int product_id = products.front();
                    products.pop_front();
                    cout << "消费产品 " << product_id << endl;
                    this_thread::sleep_for(2ms);
                    if (product_id > 50) break;                
                }
            }
        }
        t1.join();
        return 0;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49

    std::unique_lock 出作用域后会自动解锁

    thread_local

    C++11中提供了thread_local,thread_local定义的变量在每个线程都保存一份副本,而且互不干扰,在线程退出的时候自动销毁。

    #include 
    #include 
    using namespace std::literals::chrono_literals;
    using namespace std;
    thread_local int t_l_counter = 0;
    
    void test() {
        cout << "flag1 t_l_counter: " << t_l_counter << endl;
        t_l_counter = 2;
    }
    
    int main() {
        t_l_counter = 1;
        std::thread t1(test);
        t1.join();
        cout << "flag2 t_l_counter: " << t_l_counter << endl;
        return 0;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    同步锁

    如果需要同一时间只有一个线程在test函数中执行代码,那么就要加锁,lock() 用于加锁,而unlock() 解锁。

    #include 
    #include 
    using namespace std::literals::chrono_literals;
    using namespace std;
    std::mutex g_mutex;
    
    void test() {
        g_mutex.lock();
        cout << "task start thread ID: " << this_thread::get_id() << endl;
        this_thread::sleep_for(10ms);
        cout << "task end thread ID: " << this_thread::get_id() << endl;
        g_mutex.unlock();
    }
    int main() {
        std::thread t1(test);
        std::thread t2(test);
        std::thread t3(test);
        t1.join();
        t2.join();
        t3.join();
        return 0;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    除了std::mutex(非递归的互斥量),还有std::timed_mutex(带超时的非递归互斥量),std::recursive_mutex(递归互斥量)、std::recursive_timed_mutex(带超时的递归互斥量)。

    lambda在线程中的使用

    lambda 的语法
    [capture](parameters) mutalble->return-type{statement};
    
    • 1

    编译器会自动生成一个匿名类,该类重载了 () 运算符。

    int id = 0;
    auto f = [id]() mutable {
        cout << "id: " << id << endl;
        ++id;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    capture
    • [] :什么也不捕获
    • [=] : 按值的方式捕获所有变量
    • [&] : 按引用方式捕获所有变量
    • [boo] : 值捕获boo的值
    • [=,&a] : 按值捕获所有局部变量,按引用捕获变量a
    • [=,&a,&b,&c] : 同上
    • [&,a] : 按引用捕获所有局部变量,按值捕获方式捕获a
    • [&,a,b,c] : 同上
    • [this] : 在成员函数中,直接捕获this指针
    mutable

    值捕获后,在匿名函数中对该值是不能做修改的,如果想要做修改,必须加上 mutable 关键字,并且在匿名函数中做的修改结果在函数外是不会生效的。

    parameters

    参数列表也是可以将外部的值传递给匿名函数内部的;与正常函数的形参一样。

    return-type

    对于编译器能自动推导的返回类型,可以省略 return-type,但是如果无法推导的类型,就必须添加上返回类型

    当函数不止一个return语句时,就需要加上返回类型了

    线程中使用lambda
    #include 
    #include 
    using namespace std;
    int main() {
        std::thread t1([]{
            cout << "task start thread ID: " << this_thread::get_id() << endl;
        }};
        t1.join();
        return 0;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    获取返回值

    future
    #include 
    #include 
    
    void func(std::promise && p) {
        p.set_value(1);
    }
    
    std::promise p;
    auto f = p.get_future();
    std::thread t(&func, std::move(p));
    t.join(); // 必须使用join,等待子线程结束
    int i = f.get();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    std::async

    线程和 future 的更高级别包装器

    #include 
    #include 
    int func() { return 1; }
    std::future ret = std::async(&func);
    int i = ret.get();
    
    • 1
    • 2
    • 3
    • 4
    • 5
  • 相关阅读:
    PS笔记2_钢笔工具的形状和路径
    python多任务、面向对象、命令行启动动态绑定端口号静态web服务器代码实现
    Visual Studio 2022 编译新版 Mission Planner 地面站
    高颜值测试报告- XTestRunner
    安泰:精密电流源电路原理及应用
    初次选购云服务器宽带如何选择?3M够吗?
    PostgreSQL 的时间差DATEDIFF
    【云原生之Docker实战】使用Docker部署个人CMS点播平台
    简单的卷积神经网络编程,卷积神经网络算法代码
    冯诺依曼结构体系
  • 原文地址:https://blog.csdn.net/qq_27953479/article/details/132673742