目录
方法2:提供不抛出异常的拷贝构造函数,或不抛出异常的移动构造函数
3.2.6 运用std::unique_lock<>灵活加锁
C++标准中也定义了数据竞争这个术语:并发的去修改一个独立对象(参见5.1.2节),数据竞争是未定义行为的起因。
最简单的办法就是对数据结构采用某种保护机制,确保只有修改线程才能看到不变量的中间状态。从其他访问线程的角度来看,修改不是已经完成了,就是还没开始。
另一个选择是对数据结构和不变量进行修改,修改完的结构必须能完成一系列不可分割的变化,也就保证了每个不变量的状态,这就是所谓的无锁编程。
另一种处理条件竞争的方式,是使用事务的方式去处理数据结构的更新(这里的"处理"就如同对数据库进行更新一样)。所需的一些数据和读取都存储在事务日志中,然后将之前的操作进行合并,再进行提交。当数据结构被另一个线程修改后,或处理已经重启的情况下,提交就会无法进行,这称作为“软件事务内存”(software transactional memory (STM))
访问共享数据前,将数据锁住,在访问结束后,再将数据解锁。线程库需要保证,当线程使用互斥量锁住共享数据时,其他的线程都必须等到之前那个线程对数据进行解锁后,才能进行访问数据。
互斥量是C++保护数据最通用的机制,但也需要编排代码来保护数据的正确性(见3.2.2节),并避免接口间的条件竞争(见3.2.3节)也非常重要。不过,互斥量也会造成死锁(见3.2.4节),或对数据保护的太多(或太少)(见3.2.8节)。
3.2.1 在C++中使用互斥
C++标准库为互斥量提供了RAII模板类std::lock_guard
,在构造时就能提供已锁的互斥量,并在析构时进行解锁,从而保证了互斥量能被正确解锁。
- #include
- #include
- #include
-
- std::list<int> some_list; // 1
- std::mutex some_mutex; // 2
-
- void add_to_list(int new_value)
- {
- std::lock_guard
guard(some_mutex) ; // 3 - some_list.push_back(new_value);
- }
-
- bool list_contains(int value_to_find)
- {
- std::lock_guard
guard(some_mutex) ; // 4 - return std::find(some_list.begin(),some_list.end(),value_to_find) != some_list.end();
- }
C++17中添加了一个新特性,称为模板类参数推导,类似std::lock_guard
这样简单的模板类型,其模板参数列表可以省略。③和④的代码可以简化成:
std::lock_guard guard(some_mutex);
具体的模板参数类型推导则交给C++17的编译器完成。3.2.4节中,会介绍C++17中的一种加强版数据保护机制——std::scoped_lock
,所以在C++17的环境下,上面的这行代码也可以写成:
std::scoped_lock guard(some_mutex);
代码3.2 无意中传递了保护数据的引用:
- class some_data
- {
- int a;
- std::string b;
- public:
- void do_something();
- };
-
- class data_wrapper
- {
- private:
- some_data data;
- std::mutex m;
- public:
- template<typename Function>
- void process_data(Function func)
- {
- std::lock_guard
l(m) ; - func(data); // 1 传递“保护”数据给用户函数
- }
- };
-
- some_data* unprotected;
-
- void malicious_function(some_data& protected_data)
- {
- unprotected=&protected_data;
- }
-
- data_wrapper x;
- void foo()
- {
- x.process_data(malicious_function); // 2 传递一个恶意函数
- unprotected->do_something(); // 3 在无保护的情况下访问保护数据
- }
这段代码的问题在于根本没有保护,只是将所有可访问的数据结构代码标记为互斥。函数foo()
中调用unprotected->do_something()
的代码未能被标记为互斥。这种情况下,C++无法提供任何帮助,只能由开发者使用正确的互斥锁来保护数据。从乐观的角度上看,还是有方法的:切勿将受保护数据的指针或引用传递到互斥锁作用域之外。
为了能让线程安全地删除一个节点,需要确保防止对这三个节点(待删除的节点及其前后相邻的节点)的并发访问。如果只对指向每个节点的指针进行访问保护,那就和没有使用互斥量一样,条件竞争仍会发生——除了指针,整个数据结构和整个删除操作需要保护。这种情况下最简单的解决方案就是使用互斥量来保护整个链表。
代码3.3 std::stack
容器的实现:
- template<typename T,typename Container=std::deque
> - class stack
- {
- public:
- explicit stack(const Container&);
- explicit stack(Container&& = Container());
- template <class Alloc> explicit stack(const Alloc&);
- template <class Alloc> stack(const Container&, const Alloc&);
- template <class Alloc> stack(Container&&, const Alloc&);
- template <class Alloc> stack(stack&&, const Alloc&);
-
- bool empty() const;
- size_t size() const;
- T& top();
- T const& top() const;
- void push(T const&);
- void push(T&&);
- void pop();
- void swap(stack&&);
- template <class... Args> void emplace(Args&&... args); // C++14的新特性
- };
非共享的栈对象,如果栈非空,使用empty()检查再调用top()访问栈顶部的元素是安全的。如下代码所示:
- stack<int> s;
- if (! s.empty()){ // 1
- int const value = s.top(); // 2
- s.pop(); // 3
- do_something(value);
- }
不仅在单线程代码中安全,而且在空堆栈上调用top()是未定义的行为也符合预期。对于共享的栈对象,这样的调用顺序就不再安全,因为在调用empty()①和调用top()②之间,可能有来自另一个线程的pop()调用并删除了最后一个元素。这是一个经典的条件竞争,使用互斥量对栈内部数据进行保护,但依旧不能阻止条件竞争的发生,这就是接口固有的问题。
第一个选项是将变量的引用作为参数,传入pop()函数中获取“弹出值”:
- std::vector<int> result;
- some_stack.pop(result);
这种方式还不错,缺点也很明显:需要构造出一个栈中类型的实例,用于接收目标值。对于一些类型,这样做是不现实的,因为临时构造一个实例,从时间和资源的角度上来看都不划算。对于其他的类型,这样也不总行得通,因为构造函数需要的参数,在这个阶段不一定可用。最后,需要可赋值的存储类型,这是一个重大限制:即使支持移动构造,甚至是拷贝构造(从而允许返回一个值),很多用户自定义类型可能都不支持赋值操作。
对于有返回值的pop()函数来说,只有“异常安全”方面的担忧(当返回值时可以抛出一个异常)。很多类型都有拷贝构造函数,它们不会抛出异常,并且随着新标准中对“右值引用”的支持(详见附录A,A.1节),很多类型都将会有一个移动构造函数,即使他们和拷贝构造函数做着相同的事情,也不会抛出异常。一个有用的选项可以限制对线程安全栈的使用,并且能让栈安全的返回所需的值,而不抛出异常。
虽然安全,但非可靠。尽管能在编译时可使用std::is_nothrow_copy_constructible
和std::is_nothrow_move_constructible
,让拷贝或移动构造函数不抛出异常,但是这种方式的局限性太强。用户自定义的类型中,会有不抛出异常的拷贝构造函数或移动构造函数的类型, 那些有抛出异常的拷贝构造函数,但没有移动构造函数的类型往往更多。如果这些类型不能存储在线程安全的栈中,那将是多么的不幸。
第三个选择是返回一个指向弹出元素的指针,而不是直接返回值。指针的优势是自由拷贝,并且不会产生异常,这样就能避免Cargill提到的异常问题了。缺点就是返回指针需要对对象的内存分配进行管理,对于简单数据类型(比如:int),内存管理的开销要远大于直接返回值。对于这个方案,使用std::shared_ptr
是个不错的选择,不仅能避免内存泄露(因为当对象中指针销毁时,对象也会被销毁),而且标准库能够完全控制内存分配方案,就不需要new和delete操作。这种优化是很重要的:因为堆栈中的每个对象,都需要用new进行独立的内存分配,相较于非线程安全版本,这个方案的开销相当大。
对于通用的代码来说,灵活性不应忽视。当已经选择了选项2或3时,再去选择1也是很容易的。这些选项提供给用户,让用户自己选择最合适,最经济的方案。
代码3.5 线程安全的栈容器类:
- #include
- #include
- #include
- #include
-
- struct empty_stack: std::exception
- {
- const char* what() const throw() {
- return "empty stack!";
- };
- };
-
- template<typename T>
- class threadsafe_stack
- {
- private:
- std::stack
data; - mutable std::mutex m;
-
- public:
- threadsafe_stack()
- : data(std::stack
()){} -
- threadsafe_stack(const threadsafe_stack& other)
- {
- std::lock_guard
lock(other.m) ; - data = other.data; // 1 在构造函数体中的执行拷贝
- }
-
- threadsafe_stack& operator=(const threadsafe_stack&) = delete;
-
- void push(T new_value)
- {
- std::lock_guard
lock(m) ; - data.push(new_value);
- }
-
- std::shared_ptr
pop() - {
- std::lock_guard
lock(m) ; - if(data.empty()) throw empty_stack(); // 在调用pop前,检查栈是否为空
-
- std::shared_ptr
const res(std::make_shared(data.top())) ; // 在修改堆栈前,分配出返回值 - data.pop();
- return res;
- }
-
- void pop(T& value)
- {
- std::lock_guard
lock(m) ; - if(data.empty()) throw empty_stack();
-
- value=data.top();
- data.pop();
- }
-
- bool empty() const
- {
- std::lock_guard
lock(m) ; - return data.empty();
- }
- };
一对线程需要对他们所有的互斥量做一些操作,其中每个线程都有一个互斥量,且等待另一个解锁。因为他们都在等待对方释放互斥量,没有线程能工作。这种情况就是死锁,它的问题就是由两个或两个以上的互斥量进行锁定。
避免死锁的一般建议,就是让两个互斥量以相同的顺序上锁:总在互斥量B之前锁住互斥量A,就永远不会死锁。某些情况下是可以这样用,因为不同的互斥量用于不同的地方。不过,当有多个互斥量保护同一个类的独立实例时,一个操作对同一个类的两个不同实例进行数据的交换操作,为了保证数据交换操作的正确性,就要避免并发修改数据,并确保每个实例上的互斥量都能锁住自己要保护的区域。不过,选择一个固定的顺序(例如,实例提供的第一互斥量作为第一个参数,提供的第二个互斥量为第二个参数),可能会适得其反:在参数交换了之后,两个线程试图在相同的两个实例间进行数据交换时,程序又死锁了!
std::lock
——可以一次性锁住多个(两个以上)的互斥量,并且没有副作用(死锁风险)。下面的程序代码中,就来看一下怎么在一个简单的交换操作中使用std::lock
。
代码3.6 交换操作中使用std::lock()
和std::lock_guard:
- // 这里的std::lock()需要包含
头文件 - class some_big_object;
- void swap(some_big_object& lhs,some_big_object& rhs);
- class X
- {
- private:
- some_big_object some_detail;
- std::mutex m;
- public:
- X(some_big_object const& sd):some_detail(sd){}
-
- friend void swap(X& lhs, X& rhs)
- {
- if(&lhs==&rhs)
- return;
- std::lock(lhs.m,rhs.m); // 1
- std::lock_guard
lock_a(lhs.m,std::adopt_lock) ; // 2 - std::lock_guard
lock_b(rhs.m,std::adopt_lock) ; // 3 - swap(lhs.some_detail,rhs.some_detail);
- }
- };
死锁是多线程编程中令人相当头痛的问题,并且死锁经常是不可预见的,因为在大部分时间里,所有工作都能很好的完成。不过,一些相对简单的规则能帮助写出“无死锁”的代码。
无锁的情况下,仅需要两个线程std::thread
对象互相调用join()就能产生死锁。这种情况下,没有线程可以继续运行,因为他们正在互相等待。这种情况很常见,一个线程会等待另一个线程,其他线程同时也会等待第一个线程结束,所以三个或更多线程的互相等待也会发生死锁。为了避免死锁,这里意见:不要谦让。
第一个建议往往是最简单的:线程获得一个锁时,就别再去获取第二个。每个线程只持有一个锁,就不会产生死锁。当需要获取多个锁,使用std::lock
来做这件事(对获取锁的操作上锁),避免产生死锁。
第二个建议是次简单的:因为代码是外部提供的,所以没有办法确定外部要做什么。外部程序可能做任何事情,包括获取锁。在持有锁的情况下,如果用外部代码要获取一个锁,就会违反第一个指导意见,并造成死锁(有时这是无法避免的)。当写通用代码时(例如3.2.3中的栈),每一个操作的参数类型,都是外部提供的定义,这就需要其他指导意见来帮助你了。
当硬性要求获取两个或两个以上的锁,并且不能使用std::lock
单独操作来获取它们时,最好在每个线程上,用固定的顺序获取它们(锁)。3.2.4节中提到,当需要获取两个互斥量时,需要以一定的顺序获取锁。
这里提供一种避免死锁的方式,定义遍历的顺序,一个线程必须先锁住A才能获取B的锁,在锁住B之后才能获取C的锁。这将消除死锁,不允许反向遍历链表。类似的约定常用于建立其他的数据结构。
虽然,定义锁的顺序是一种特殊情况,但层次锁的意义在于,在运行时会约定是否进行检查。这个建议需要对应用进行分层,并且识别在给定层上所有互斥量。当代码试图对互斥量上锁,而低层已持有该层锁时,不允许锁定。可以通过每个互斥量对应的层数,以及每个线程使用的互斥量,在运行时检查锁定操作是否可以进行。
代码3.7 使用层级防范死锁
- hierarchical_mutex high_level_mutex(10000); // 1
- hierarchical_mutex low_level_mutex(5000); // 2
- hierarchical_mutex other_mutex(6000); // 3
-
- int do_low_level_stuff();
-
- int low_level_func()
- {
- std::lock_guard
lk(low_level_mutex) ; // 4 - return do_low_level_stuff();
- }
-
- void high_level_stuff(int some_param);
-
- void high_level_func()
- {
- std::lock_guard
lk(high_level_mutex) ; // 6 - high_level_stuff(low_level_func()); // 5
- }
-
- void thread_a() // 7
- {
- high_level_func();
- }
-
- void do_other_stuff();
-
- void other_stuff()
- {
- high_level_func(); // 10
- do_other_stuff();
- }
-
- void thread_b() // 8
- {
- std::lock_guard
lk(other_mutex) ; // 9 - other_stuff();
- }
代码3.8 简单的层级互斥:
- class hierarchical_mutex
- {
- std::mutex internal_mutex;
-
- unsigned long const hierarchy_value;
- unsigned long previous_hierarchy_value;
-
- static thread_local unsigned long this_thread_hierarchy_value; // 1
-
- void check_for_hierarchy_violation()
- {
- if(this_thread_hierarchy_value <= hierarchy_value) // 2
- {
- throw std::logic_error(“mutex hierarchy violated”);
- }
- }
-
- void update_hierarchy_value()
- {
- previous_hierarchy_value=this_thread_hierarchy_value; // 3
- this_thread_hierarchy_value=hierarchy_value;
- }
-
- public:
- explicit hierarchical_mutex(unsigned long value):
- hierarchy_value(value),
- previous_hierarchy_value(0)
- {}
-
- void lock()
- {
- check_for_hierarchy_violation();
- internal_mutex.lock(); // 4
- update_hierarchy_value(); // 5
- }
-
- void unlock()
- {
- if(this_thread_hierarchy_value!=hierarchy_value)
- throw std::logic_error(“mutex hierarchy violated”); // 9
- this_thread_hierarchy_value=previous_hierarchy_value; // 6
- internal_mutex.unlock();
- }
-
- bool try_lock()
- {
- check_for_hierarchy_violation();
- if(!internal_mutex.try_lock()) // 7
- return false;
- update_hierarchy_value();
- return true;
- }
- };
- thread_local unsigned long
- hierarchical_mutex::this_thread_hierarchy_value(ULONG_MAX); // 8
5. 将准则推广到锁操作以外:
死锁不仅仅会发生在锁之间,也会发生在同步构造中(可能会产生一个等待循环),这也需要有指导意见,例如:获取嵌套锁,等待一个持有锁的线程,都是很糟糕的决定(因为线程为了能继续运行可能需要获取对应的锁)。如果去等待一个线程结束,应该确定这个线程的层级,这样一个线程只需要等待比其层级低的线程结束即可。用一个简单的办法便可确定,添加的线程是否在同一函数中启动,如同在3.1.2节和3.3节中描述的那样。
代码已能规避死锁,std::lock()
和std::lock_guard
可组成简单的锁,并覆盖大多数情况,但有时需要更多的灵活性,可以使用标准库提供的std::unique_lock
模板。如 std::lock_guard
,这是一个参数化的互斥量模板类,它提供很多RAII类型锁用来管理std::lock_guard
类型,可以让代码更加灵活。
- class some_big_object;
- void swap(some_big_object& lhs,some_big_object& rhs);
- class X
- {
- private:
- some_big_object some_detail;
- std::mutex m;
- public:
- X(some_big_object const& sd):some_detail(sd){}
- friend void swap(X& lhs, X& rhs)
- {
- if(&lhs==&rhs)
- return;
- std::unique_lock
lock_a(lhs.m,std::defer_lock) ; // 1 - std::unique_lock
lock_b(rhs.m,std::defer_lock) ; // 1 std::defer_lock 留下未上锁的互斥量 - std::lock(lock_a,lock_b); // 2 互斥量在这里上锁
- swap(lhs.some_detail,rhs.some_detail);
- }
- };
std::unique_lock
实例没有与自身相关的互斥量,互斥量的所有权可以通过移动操作,在不同的实例中进行传递。某些情况下,这种转移是自动发生的,例如:当函数返回一个实例。另一种情况下,需要显式的调用std::move()
来执行移动操作。本质上来说,需要依赖于源值是否是左值——一个实际的值或是引用——或一个右值——一个临时类型。当源值是一个右值,为了避免转移所有权过程出错,就必须显式移动成左值。std::unique_lock
是可移动,但不可赋值的类型。
转移有一种用途:准许函数锁定互斥,然后把互斥的归属权转移给函数调用者,好让他在同一个锁的保护下执行其他操作。
- std::unique_lock
get_lock() - {
- extern std::mutex some_mutex;
- std::unique_lock
lk(some_mutex) ; - prepare_data();
- return lk; // 1
- }
- void process_data()
- {
- std::unique_lock
lk(get_lock()) ; // 2 - do_something();
- }
通道(gate way)类是一种利用锁转移的具体形式,锁的角色是其数据成员,用于保证只有正确加锁才能够访问受保护数据,而不再充当函数的返回值。这样,所有数据必须通过通道类访问:若想访问数据,则需先取得通道类的实例(由函数调用返回,如上例中的 get_lock()),再借它执行加锁操作,然后通过通道对象的成员函数才得以访问数据。我们在访问完成后销毁通道对象,锁便随之释放,别的线程遂可以重新访问受保护的数据。这类通道对象几乎是可移动的(只有这样,函数才有可能向外转移归属权),因此锁对象作为其数据成员也必须是可移动的。
一个细粒度锁(a fine-grained lock)能够保护较小的数据量,一个粗粒度锁(a coarse-grained lock)能够保护较多的数据量。
std::unique_lock
在这种情况下工作正常,调用unlock()时,代码不需要再访问共享数据。当再次需要对共享数据进行访问时,再调用lock()就可以了。
- void get_and_process_data()
- {
- std::unique_lock
my_lock(the_mutex) ; - some_class data_to_process=get_next_data_chunk();
- my_lock.unlock(); // 1 不要让锁住的互斥量越过process()函数的调用
- result_type result=process(data_to_process);
- my_lock.lock(); // 2 为了写入数据,对互斥量再次上锁
- write_result(data_to_process,result);
- }
C++标准委员会也认为条件竞争的处理很重要,所以C++标准库提供了std::once_flag
和std::call_once
来处理这种情况。比起锁住互斥量并显式的检查指针,每个线程只需要使用std::call_once
就可以,在std::call_once
的结束时,就能安全的知晓指针已经被其他的线程初始化了。使用std::call_once
比显式使用互斥量消耗的资源更少,特别是当初始化完成后
代码3.12利用std::call_once()函数对类X的数据成员实施线程安全的延迟初始化
- class X
- {
- private:
- connection_info connection_details;
- connection_handle connection;
- std::once_flag connection_init_flag;
-
- void open_connection()
- {
- connection=connection_manager.open(connection_details);
- }
- public:
- X(connection_info const& connection_details_):
- connection_details(connection_details_)
- {}
- void send_data(data_packet const& data) // 1
- {
- std::call_once(connection_init_flag,&X::open_connection,this); // 2
- connection.send_data(data);
- }
- data_packet receive_data() // 3
- {
- std::call_once(connection_init_flag,&X::open_connection,this); // 2
- return connection.receive_data();
- }
- };
值得注意的是,std::mutex
和std::once_flag
的实例不能拷贝和移动,需要通过显式定义相应的成员函数,对这些类成员进行操作。
比起使用std::mutex
实例进行同步,不如使用std::shared_mutex
来做同步。对于更新操作,可以使用std::lock_guard
和std::unique_lock
上锁。作为std::mutex
的替代方案,与std::mutex
所做的一样,这就能保证更新线程的独占访问。那些无需修改数据结构的线程,可以使用std::shared_lock
获取访问权。这种RAII类型模板是在C++14中的新特性,这与使用std::unique_lock
一样,除了多线程可以同时获取同一个std::shared_mutex
的共享锁。唯一的限制:当有线程拥有共享锁时,尝试获取独占锁的线程会被阻塞,直到所有其他线程放弃锁。当任一线程拥有一个独占锁时,其他线程就无法获得共享锁或独占锁,直到第一个线程放弃其拥有的锁。
如同之前描述的那样,下面的代码清单展示了一个简单的DNS缓存,使用std::map
持有缓存数据,使用std::shared_mutex
进行保护。
代码3.13 运用std::shared_mutex保护数据结构
- #include
- #include
- #include
- #include
-
- class dns_entry;
-
- class dns_cache
- {
- std::map
entries; - mutable std::shared_mutex entry_mutex;
- public:
- dns_entry find_entry(std::string const& domain) const
- {
- std::shared_lock
lk(entry_mutex) ; // 1 - std::map
::const_iterator const it= - entries.find(domain);
- return (it==entries.end())?dns_entry():it->second;
- }
- void update_or_add_entry(std::string const& domain,
- dns_entry const& dns_details)
- {
- std::lock_guard
lk(entry_mutex) ; // 2 - entries[domain]=dns_details;
- }
- };
线程对已经获取的std::mutex
(已经上锁)再次上锁是错误的,尝试这样做会导致未定义行为。在某些情况下,一个线程会尝试在释放一个互斥量前多次获取。因此,C++标准库提供了std::recursive_mutex
类。除了可以在同一线程的单个实例上多次上锁,其他功能与std::mutex
相同。其他线程对互斥量上锁前,当前线程必须释放拥有的所有锁,所以如果你调用lock()三次,也必须调用unlock()三次。正确使用std::lock_guard
和std::unique_lock
可以帮你处理这些问题。