导语:整理以下多线程的互斥与同步。

mutex 使用起来很简单,需要处理一段临界区代码时,只要调用 lock 或 try_lock 即可。lock 和 unlock 是没有返回值的,try_lock 的返回值是一个 bool,它用于尝试锁定互斥,成功返回 true,失败返回 false。
- #include
- #include
- #include
- #include
-
- std::chrono::milliseconds interval(100);
-
- std::mutex mutex;
- int job_shared = 0; // 两个线程都能修改 'job_shared',mutex 将保护此变量
- int job_exclusive = 0; // 只有一个线程能修改 'job_exclusive'
-
- // 此线程能修改 'job_shared' 和 'job_exclusive'
- void job_1() {
- std::this_thread::sleep_for(interval); // 令 'job_2' 持锁
-
- while (true) {
- // 尝试锁定 mutex 以修改 'job_shared'
- if (mutex.try_lock()) {
- std::cout << "job shared (" << job_shared << ")\n";
- mutex.unlock();
- return;
- } else {
- // 不能获取锁以修改 'job_shared'
- // 但有其他工作可做
- ++job_exclusive;
- std::cout << "job exclusive (" << job_exclusive << ")\n";
- std::this_thread::sleep_for(interval);
- }
- }
- }
-
- // 此线程只能修改 'job_shared'
- void job_2() {
- mutex.lock();
- std::this_thread::sleep_for(5 * interval);
- ++job_shared;
- mutex.unlock();
- }
-
- int main() {
- std::thread thread_1(job_1);
- std::thread thread_2(job_2);
-
- thread_1.join();
- thread_2.join();
- return 0;
- }
-
- 可能的输出结果:
- job exclusive (1)
- job exclusive (2)
- job exclusive (3)
- job exclusive (4)
- job shared (1)
job_2 会先持有锁,所以 job_1 内的 while 循环会先递增 job_exclusive 的值,直到 job_2 处理完后释放锁,才会打印 job_shared 的值。

- #include
- #include
- #include
- #include
- #include
- #include
-
- std::mutex m;
- std::condition_variable cv;
- std::string data;
- bool ready = false;
- bool processed = false;
-
- void worker_thread() {
- // 等待直至 main() 发送数据
- std::unique_lock
lk(m) ; - cv.wait(lk, []{return ready;});
-
- // 等待后,我们占有锁。
- std::cout << "Worker thread is processing data\n";
- data += " after processing";
-
- // 发送数据回 main()
- processed = true;
- std::cout << "Worker thread signals data processing completed\n";
-
- // 通知前完成手动解锁,以避免等待线程才被唤醒就阻塞(细节见 notify_one )
- lk.unlock();
- cv.notify_one();
- }
-
- int main() {
- std::thread worker(worker_thread);
- std::cout << "wait for a moment" << std::endl;
- std::this_thread::sleep_for(std::chrono::seconds(1));
-
- data = "Example data";
- // 发送 ready 信号到 worker 线程
- {
- std::lock_guard
lk(m) ; - ready = true;
- std::cout << "main() signals data ready for processing\n";
- }
- cv.notify_one();
-
- // 等候 worker
- {
- std::unique_lock
lk(m) ; - cv.wait(lk, []{return processed;});
- }
- std::cout << "Back in main(), data = " << data << '\n';
-
- worker.join();
- return 0;
- }
在 worker_thread 内获取了互斥锁,但是真正的执行要等到 main 函数中触发 notify_one 才真正开始。

- #include
- #include
- #include
- #include
-
- std::condition_variable cv;
- std::mutex cv_m;
- int i = 0;
- bool done = false;
-
- void waits() {
- std::unique_lock
lk(cv_m) ; - std::cout << "Waiting... \n";
- cv.wait(lk, []{return i == 1;});
- std::cout << "...finished waiting. i == 1\n";
- done = true;
- }
-
- void signals() {
- std::this_thread::sleep_for(std::chrono::seconds(1));
- std::cout << "Notifying falsely...\n";
- cv.notify_one(); // 等待线程被通知 i == 0.
- // cv.wait 唤醒,检查 i ,再回到等待
-
- std::unique_lock
lk(cv_m) ; - i = 1;
- while (!done)
- {
- std::cout << "Notifying true change...\n";
- lk.unlock();
- cv.notify_one(); // 等待线程被通知 i == 1 , cv.wait 返回
- std::this_thread::sleep_for(std::chrono::seconds(1));
- lk.lock();
- }
- }
-
- int main()
- {
- std::thread t1(waits);
- std::this_thread::sleep_for(std::chrono::milliseconds(100));
- std::thread t2(signals);
- t1.join();
- t2.join();
- return 0;
- }

- #include
- #include
- #include
- #include
- #include
-
- using namespace std::chrono_literals;
-
- std::condition_variable cv;
- std::mutex cv_m;
- int i;
-
- void waits(int idx) {
- std::unique_lock
lk(cv_m) ; - if(cv.wait_for(lk, idx*100ms, []{return i == 1;}))
- std::cerr << "Thread " << idx << " finished waiting. i == " << i << '\n';
- else
- std::cerr << "Thread " << idx << " timed out. i == " << i << '\n';
- }
-
- void signals() {
- std::this_thread::sleep_for(120ms);
- std::cerr << "Notifying...\n";
- cv.notify_all();
- std::this_thread::sleep_for(100ms);
- {
- std::lock_guard
lk(cv_m) ; - i = 1;
- }
- std::cerr << "Notifying again...\n";
- cv.notify_all();
- }
-
- int main() {
- std::thread t1(waits, 1), t2(waits, 2), t3(waits, 3), t4(signals);
- t1.join(); t2.join(), t3.join(), t4.join();
- }
-
- 输出:
- Thread 1 timed out. i == 0
- Notifying...
- Thread 2 timed out. i == 0
- Notifying again...
- Thread 3 finished waiting. i == 1
lock_guard:没有提供加锁和解锁的接口。通过构造函数和析构函数控制锁的作用范围,创造对象的时候加锁,离开作用域的时候解锁;
unique_lock:提供了 lock() 和 unlock() 接口,可以通过构造函数和析构函数控制锁的作用范围。在构造函数中延时加锁,可以在需要的时候手动加锁和解锁。在析构的时候,会根据当前状态来决定是否要进行解锁(lock_guard 就一定会解锁)。
- #include
- #include
- #include
-
- struct Box {
- explicit Box(int num) : num_things{num} {}
-
- int num_things;
- std::mutex m;
- };
-
- void transfer(Box &from, Box &to, int num) {
- // don't actually take the locks yet
- std::unique_lock lock1{from.m, std::defer_lock};
- std::unique_lock lock2{to.m, std::defer_lock};
-
- // lock both unique_locks without deadlock
- std::lock(lock1, lock2);
-
- from.num_things -= num;
- to.num_things += num;
-
- // 'from.m' and 'to.m' mutexes unlocked in 'unique_lock' dtors
- }
-
- int main() {
- Box acc1{100};
- Box acc2{50};
-
- std::thread t1{transfer, std::ref(acc1), std::ref(acc2), 10};
- std::thread t2{transfer, std::ref(acc2), std::ref(acc1), 5};
-
- t1.join();
- t2.join();
-
- std::cout << "acc1: " << acc1.num_things << "\\n"
- "acc2: " << acc2.num_things << '\\n';
- }
- std::mutex mu;
- void Test() {
- int b = 1;
- std::unique_lock
lock(mu, std::defer_lock) ; - lock.lock();
- b++;
- lock.unlock();
- cout << "b: " << b << endl;
- }
使用 unique_lock 时,设置 std::defer_lock 才能手动加锁,没有设置则不能。延迟加锁的设置不会影响解锁,不管有没有设置都可以手动解锁。而 lock_guard 则没有加锁和解锁的接口。
shared_mutex 是 C++ 的原生读写锁实现,有共享和独占两种锁模式,适用于并发高的读场景下,通过 reader 之前共享锁来提升性能。在 C++17 之前,只能自己通过独占锁和条件变量自己实现读写锁或使用 C++14 加入的性能较差的 std::shared_timed_mutex。以下是通过 shared_mutex 实现的线程安全计数器:
- #include
- #include
// 对于 std::unique_lock - #include
- #include
-
- class ThreadSafeCounter {
- public:
- ThreadSafeCounter() = default;
-
- // 多个线程/读者能同时读计数器的值。
- unsigned int get() const {
- std::shared_lock
lock(mutex_) ; - return value_;
- }
-
- // 只有一个线程/写者能增加/写线程的值。
- void increment() {
- std::unique_lock
lock(mutex_) ; - value_++;
- // 可以在程序结尾处手动解锁
- }
-
- // 只有一个线程/写者能重置/写线程的值。
- void reset() {
- std::unique_lock
lock(mutex_) ; - value_ = 0;
- }
-
- private:
- mutable std::shared_mutex mutex_;
- unsigned int value_ = 0;
- };
-
- int main() {
- ThreadSafeCounter counter;
- std::mutex mutex_;
- auto increment_and_print = [&counter, &mutex_]() {
- for (int i = 0; i < 3; i++) {
- counter.increment();
- std::lock_guard
lock(mutex_) ; - std::cout << std::this_thread::get_id() << ' ' << counter.get() << '\\n';
-
- // 注意:写入 std::cout 实际上也要由另一互斥同步。省略它以保持示例简洁。
- }
- };
-
- std::thread thread1(increment_and_print);
- std::thread thread2(increment_and_print);
-
- thread1.join();
- thread2.join();
- }