互斥量(或称锁):用于强制多线程依次访问特定的数据。
C++ 的互斥量实现举例:
void FernEngine::JoinWaitingList(PlayerId player) {
mutex.Acquire(); // 临界区(critical section):开始
waitingList.push_back(player);
// 如果等待的玩家满足条件则开始游戏
if (waitingList.length() >= GAME_SIZE) {
vector<PlayerId> players;
waitingList.swap(players);
StartGame(players);
}
mutex.Release(); // 临界区(critical section):结束
}
互斥量的作用:
Mutex<T>Rust 中受保护的数据保存在 Mutex 内部:
let app = Arc::new(FernEmpireApp { // 创建整个应用,分配在堆上
...
waiting_list: Mutex::new(vec![]),
...
});
Arc 方便跨线程共享数据。Mutex 方便跨线程共享可修改数据。举例:使用互斥量
impl FernEmpireApp {
fn join_waiting_list(&self, player: PlayerId) {
let mut guard = self.waiting_list.lock().unwrap();
guard.push(player);
if guard.len() == GAME_SIZE {
let players = guard.split_off(0);
self.start_game(players);
}
}
}
取得互斥量的数据唯一的方式是调用.lock() 方法。
在阻塞结束,guard 被清除后,锁也会被释放。但也可以手工清除:
if guard.len() == GAME_SIZE {
let players = guard.split_off(0);
drop(guard);
self.start_game(players);
}
mut 与 Mutexmut:意味着专有、排他访问(exclusive access)。mut:意味着共享访问(shared access)。Mutex 互斥量,提供对数据的专有 mut 访问权,即使很多线程有对 Mutex 本身的共享(非 mut)访问权。Mutex。线程在尝试读取自己已经持有的锁时可能会造成死锁。
let mut guard1 = self.waiting_list.lock().unwrap();
let mut guard2 = self.waiting_list.lock().unwrap(); // 死锁
使用通道也有可能导致死锁。如两个线程相互阻塞,每个都等待从另一个接收消息。
如果线程在持有
Mutex
时诧异了,那么 Rust 会将
Mutex
标记为已中毒。
Mutex 的尝试都会得到一个错误结果。.unwrap() 调用告诉 Rust 在这种情况下要诧异,把其他线程的诧异传播到当前线程。Rust 通过毒化这个互斥量来防止其他线程在不经意间也出现这种局面。
PoisonError::into_inner()。一个通道只有一个 Receiver。
任何线程池都不能有多个线程使用一个 mpsc 通道共享工作成功。
绕过这个限制的例外方法:可以为 Receiver 添加一个 Mutex,然后再共享。
pub mod shared_channel {
use std::sync::{Arc, Mutex};
use std::sync::mpsc::{channel, Sender, Receiver};
/// 对Receiver的线程安全的封装
#[derive(Clone)]
pub struct SharedReceiver<T>(Arc<Mutex<Receiver<T>>>);
// Arc<Mutex<Receiver<T>>>是嵌套的泛型
impl <T> Iterator for SharedReceiver<T> {
type Item = T;
/// 从封装的接收者获取下一项
fn next(&mut self) -> Option<T> {
let guard = self.0.lock().unwrap();
guard.recv().ok()
}
}
/// 创建一个新通道,其接收者可以跨线程共享。
/// 返回一个发送者和一个接收者,与stdlib的channel()类似。
/// 有时候可以直接代替它使用。
pub fn shared_channel<T>() -> (Sender<T>, SharedReceiver<T>) {
let (sender, receiver) = channel();
(sender, SharedReceiver(Arc::new(Mutex::new(receiver))))
}
}
RwLock<T>互斥量使用 lock 方法读写数据。
读写锁则使用 read 和 write 两个方法读写数据。
RwLock::write 方法:与 Mutex::lock 类似,都是等待获取对受保护数据的专有 mut 访问。RwLock::read 方法提供了非 mut 访问,几乎不需要等待,因为多个线程可以同时安全读取数据。互斥量与读写锁的区别:
优化上述 FernEmpireApp 程序:
创建一个结构体保存配置信息,并由 RwLock 保护。
use std::sync::RwLock;
struct FernEmpireApp {
...
config: RwLock<AppConfig>,
...
}
读取这个配置的方法可以使用 RwLock::read():
fn mushrooms_enabled(&self) -> bool {
let config_guard = self.config.read().unwrap();
config_guard.mushrooms_enabled
}
重新加载这个配置的方法,则使用 RwLock::write():
fn reload_config(&self) -> io::Result<()> {
let new_config = AppConfig::load()?;
let mut config_guard = self.config.write().unwrap();
*config_guard = new_config;
Ok(())
}
self.config.read() 返回一个守卫,可以提供对 AppConfig 的非 mut(即共享)访问;
self.config.write() 返回一个不同类型的守卫,可以提供 mut(即专有)访问。
Condvar)一个线程经常需要等待某个条件变为
true
:
针对需要等待的条件,会有方便的阻塞 API,如果没有,那么可以使用条件变量(condition variable)来构建自己的 API。
std::sync::Condvar 类型实现了条件变量。.wait() 方法可以阻塞到某些线程调用其.notify_all()。Mutex 与 Condvar 有直接关联:条件变量始终代表由某个 Mutex 保护的数据或真或假的条件。
std::sync::atomio
模块包含无锁并发编程所使用的原子类型。
AtomicIsize 和 AtomicUsize:是共享的整数类型,对应单线程的 isize 和 usize 类型。AtomicBool:是一个共享的 bool 值。AtomicPtr<T>:是不安全的指针类型 *mut T 的共享值。原子的最简单应用是取消操作。假设有一个线程正在执行某个耗时的计算任何,比如渲染视频,而我们希望能够异步取消这个操作。那么可以通过一个共享的 AtomicBool 来实现。渲染完每个像素,线程都会调用.load() 方法检查取消标志的值。
原子操作永远不会使用系统调用。加载和存储会编译为一个 CPU 指令。
原子、Mutex、RwLock 的方法可以用 self 的共享引用为参数。它们也可以作为简单的全局变量来使用。
Sync,又是非 mut。lazy_staic 包来实现。详见《Rust 程序设计》(吉姆 - 布兰迪、贾森 - 奥伦多夫著,李松峰译)第十九章
原文地址