• C++ mutex 与 condition_variable


    导语:整理以下多线程的互斥与同步。

    mutex 使用起来很简单,需要处理一段临界区代码时,只要调用 lock 或 try_lock 即可。lock 和 unlock 是没有返回值的,try_lock 的返回值是一个 bool,它用于尝试锁定互斥,成功返回 true,失败返回 false。

    1. #include
    2. #include
    3. #include
    4. #include
    5. std::chrono::milliseconds interval(100);
    6. std::mutex mutex;
    7. int job_shared = 0; // 两个线程都能修改 'job_shared',mutex 将保护此变量
    8. int job_exclusive = 0; // 只有一个线程能修改 'job_exclusive'
    9. // 此线程能修改 'job_shared' 和 'job_exclusive'
    10. void job_1() {
    11. std::this_thread::sleep_for(interval); // 令 'job_2' 持锁
    12. while (true) {
    13. // 尝试锁定 mutex 以修改 'job_shared'
    14. if (mutex.try_lock()) {
    15. std::cout << "job shared (" << job_shared << ")\n";
    16. mutex.unlock();
    17. return;
    18. } else {
    19. // 不能获取锁以修改 'job_shared'
    20. // 但有其他工作可做
    21. ++job_exclusive;
    22. std::cout << "job exclusive (" << job_exclusive << ")\n";
    23. std::this_thread::sleep_for(interval);
    24. }
    25. }
    26. }
    27. // 此线程只能修改 'job_shared'
    28. void job_2() {
    29. mutex.lock();
    30. std::this_thread::sleep_for(5 * interval);
    31. ++job_shared;
    32. mutex.unlock();
    33. }
    34. int main() {
    35. std::thread thread_1(job_1);
    36. std::thread thread_2(job_2);
    37. thread_1.join();
    38. thread_2.join();
    39. return 0;
    40. }
    41. 可能的输出结果:
    42. job exclusive (1)
    43. job exclusive (2)
    44. job exclusive (3)
    45. job exclusive (4)
    46. job shared (1)

    job_2 会先持有锁,所以 job_1 内的 while 循环会先递增 job_exclusive 的值,直到 job_2 处理完后释放锁,才会打印 job_shared 的值。

    1. #include
    2. #include
    3. #include
    4. #include
    5. #include
    6. #include
    7. std::mutex m;
    8. std::condition_variable cv;
    9. std::string data;
    10. bool ready = false;
    11. bool processed = false;
    12. void worker_thread() {
    13. // 等待直至 main() 发送数据
    14. std::unique_lock lk(m);
    15. cv.wait(lk, []{return ready;});
    16. // 等待后,我们占有锁。
    17. std::cout << "Worker thread is processing data\n";
    18. data += " after processing";
    19. // 发送数据回 main()
    20. processed = true;
    21. std::cout << "Worker thread signals data processing completed\n";
    22. // 通知前完成手动解锁,以避免等待线程才被唤醒就阻塞(细节见 notify_one )
    23. lk.unlock();
    24. cv.notify_one();
    25. }
    26. int main() {
    27. std::thread worker(worker_thread);
    28. std::cout << "wait for a moment" << std::endl;
    29. std::this_thread::sleep_for(std::chrono::seconds(1));
    30. data = "Example data";
    31. // 发送 ready 信号到 worker 线程
    32. {
    33. std::lock_guard lk(m);
    34. ready = true;
    35. std::cout << "main() signals data ready for processing\n";
    36. }
    37. cv.notify_one();
    38. // 等候 worker
    39. {
    40. std::unique_lock lk(m);
    41. cv.wait(lk, []{return processed;});
    42. }
    43. std::cout << "Back in main(), data = " << data << '\n';
    44. worker.join();
    45. return 0;
    46. }

    在 worker_thread 内获取了互斥锁,但是真正的执行要等到 main 函数中触发 notify_one 才真正开始。

    1. #include
    2. #include
    3. #include
    4. #include
    5. std::condition_variable cv;
    6. std::mutex cv_m;
    7. int i = 0;
    8. bool done = false;
    9. void waits() {
    10. std::unique_lock lk(cv_m);
    11. std::cout << "Waiting... \n";
    12. cv.wait(lk, []{return i == 1;});
    13. std::cout << "...finished waiting. i == 1\n";
    14. done = true;
    15. }
    16. void signals() {
    17. std::this_thread::sleep_for(std::chrono::seconds(1));
    18. std::cout << "Notifying falsely...\n";
    19. cv.notify_one(); // 等待线程被通知 i == 0.
    20. // cv.wait 唤醒,检查 i ,再回到等待
    21. std::unique_lock lk(cv_m);
    22. i = 1;
    23. while (!done)
    24. {
    25. std::cout << "Notifying true change...\n";
    26. lk.unlock();
    27. cv.notify_one(); // 等待线程被通知 i == 1 , cv.wait 返回
    28. std::this_thread::sleep_for(std::chrono::seconds(1));
    29. lk.lock();
    30. }
    31. }
    32. int main()
    33. {
    34. std::thread t1(waits);
    35. std::this_thread::sleep_for(std::chrono::milliseconds(100));
    36. std::thread t2(signals);
    37. t1.join();
    38. t2.join();
    39. return 0;
    40. }

     

    1. #include
    2. #include
    3. #include
    4. #include
    5. #include
    6. using namespace std::chrono_literals;
    7. std::condition_variable cv;
    8. std::mutex cv_m;
    9. int i;
    10. void waits(int idx) {
    11. std::unique_lock lk(cv_m);
    12. if(cv.wait_for(lk, idx*100ms, []{return i == 1;}))
    13. std::cerr << "Thread " << idx << " finished waiting. i == " << i << '\n';
    14. else
    15. std::cerr << "Thread " << idx << " timed out. i == " << i << '\n';
    16. }
    17. void signals() {
    18. std::this_thread::sleep_for(120ms);
    19. std::cerr << "Notifying...\n";
    20. cv.notify_all();
    21. std::this_thread::sleep_for(100ms);
    22. {
    23. std::lock_guard lk(cv_m);
    24. i = 1;
    25. }
    26. std::cerr << "Notifying again...\n";
    27. cv.notify_all();
    28. }
    29. int main() {
    30. std::thread t1(waits, 1), t2(waits, 2), t3(waits, 3), t4(signals);
    31. t1.join(); t2.join(), t3.join(), t4.join();
    32. }
    33. 输出:
    34. Thread 1 timed out. i == 0
    35. Notifying...
    36. Thread 2 timed out. i == 0
    37. Notifying again...
    38. Thread 3 finished waiting. i == 1

    std::lock_gaurd 和 std::unique_lock 的区别

    lock_guard:没有提供加锁和解锁的接口。通过构造函数和析构函数控制锁的作用范围,创造对象的时候加锁,离开作用域的时候解锁;

    unique_lock:提供了 lock() 和 unlock() 接口,可以通过构造函数和析构函数控制锁的作用范围。在构造函数中延时加锁,可以在需要的时候手动加锁和解锁。在析构的时候,会根据当前状态来决定是否要进行解锁(lock_guard 就一定会解锁)。

    1. #include
    2. #include
    3. #include
    4. struct Box {
    5. explicit Box(int num) : num_things{num} {}
    6. int num_things;
    7. std::mutex m;
    8. };
    9. void transfer(Box &from, Box &to, int num) {
    10. // don't actually take the locks yet
    11. std::unique_lock lock1{from.m, std::defer_lock};
    12. std::unique_lock lock2{to.m, std::defer_lock};
    13. // lock both unique_locks without deadlock
    14. std::lock(lock1, lock2);
    15. from.num_things -= num;
    16. to.num_things += num;
    17. // 'from.m' and 'to.m' mutexes unlocked in 'unique_lock' dtors
    18. }
    19. int main() {
    20. Box acc1{100};
    21. Box acc2{50};
    22. std::thread t1{transfer, std::ref(acc1), std::ref(acc2), 10};
    23. std::thread t2{transfer, std::ref(acc2), std::ref(acc1), 5};
    24. t1.join();
    25. t2.join();
    26. std::cout << "acc1: " << acc1.num_things << "\\n"
    27. "acc2: " << acc2.num_things << '\\n';
    28. }
    1. std::mutex mu;
    2. void Test() {
    3. int b = 1;
    4. std::unique_lock lock(mu, std::defer_lock);
    5. lock.lock();
    6. b++;
    7. lock.unlock();
    8. cout << "b: " << b << endl;
    9. }

    使用 unique_lock 时,设置 std::defer_lock 才能手动加锁,没有设置则不能。延迟加锁的设置不会影响解锁,不管有没有设置都可以手动解锁。而 lock_guard 则没有加锁和解锁的接口。 

    std::shared_mutex

    shared_mutex 是 C++ 的原生读写锁实现,有共享和独占两种锁模式,适用于并发高的读场景下,通过 reader 之前共享锁来提升性能。在 C++17 之前,只能自己通过独占锁和条件变量自己实现读写锁或使用 C++14 加入的性能较差的 std::shared_timed_mutex。以下是通过 shared_mutex 实现的线程安全计数器:

    1. #include
    2. #include // 对于 std::unique_lock
    3. #include
    4. #include
    5. class ThreadSafeCounter {
    6. public:
    7. ThreadSafeCounter() = default;
    8. // 多个线程/读者能同时读计数器的值。
    9. unsigned int get() const {
    10. std::shared_lock lock(mutex_);
    11. return value_;
    12. }
    13. // 只有一个线程/写者能增加/写线程的值。
    14. void increment() {
    15. std::unique_lock lock(mutex_);
    16. value_++;
    17. // 可以在程序结尾处手动解锁
    18. }
    19. // 只有一个线程/写者能重置/写线程的值。
    20. void reset() {
    21. std::unique_lock lock(mutex_);
    22. value_ = 0;
    23. }
    24. private:
    25. mutable std::shared_mutex mutex_;
    26. unsigned int value_ = 0;
    27. };
    28. int main() {
    29. ThreadSafeCounter counter;
    30. std::mutex mutex_;
    31. auto increment_and_print = [&counter, &mutex_]() {
    32. for (int i = 0; i < 3; i++) {
    33. counter.increment();
    34. std::lock_guard lock(mutex_);
    35. std::cout << std::this_thread::get_id() << ' ' << counter.get() << '\\n';
    36. // 注意:写入 std::cout 实际上也要由另一互斥同步。省略它以保持示例简洁。
    37. }
    38. };
    39. std::thread thread1(increment_and_print);
    40. std::thread thread2(increment_and_print);
    41. thread1.join();
    42. thread2.join();
    43. }

  • 相关阅读:
    把握出租车行驶的数据脉搏 :出租车轨迹数据给你答案!
    ESP32学习记录 PICO DK 踩坑记录
    D-Bus:busctl的使用
    python字符串相关
    第28讲:多表查询之子查询概念以及典型案例
    [附源码]计算机毕业设计点餐系统
    asp.net mvc实现系统登录及验证功能
    4 H3C网络设备模拟器
    年薪高达50W的测开,到底是做什么的?
    【Matlab】蒙特卡罗法模拟圆周率+对应解析的GIF生成【超详细的注释和解释】
  • 原文地址:https://blog.csdn.net/qq_38289815/article/details/126681933