• 【Rust 笔记】17-并发(下)


    17.3 - 共享可修改状态

    17.3.1 - 互斥量

    • 互斥量(或称锁):用于强制多线程依次访问特定的数据。

    • C++ 的互斥量实现举例:

      void FernEngine::JoinWaitingList(PlayerId player) {
          mutex.Acquire();  // 临界区(critical section):开始
      
          waitingList.push_back(player);
      
          // 如果等待的玩家满足条件则开始游戏
          if (waitingList.length() >= GAME_SIZE) {
              vector<PlayerId> players;
              waitingList.swap(players);
              StartGame(players);
          }
          mutex.Release();  // 临界区(critical section):结束
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
    • 互斥量的作用:

      • 防止数据争用,避免多个线程并发读写同一块内存。
      • 防止不同线程的操作出现相互交错。
      • 互斥量支持通过不变性(invariant)编程,受保护数据由执行者负责初始化,由每个临界区来维护。

    17.3.2-Mutex<T>

    • Rust 中受保护的数据保存在 Mutex 内部:

      let app = Arc::new(FernEmpireApp { // 创建整个应用,分配在堆上
          ...
          waiting_list: Mutex::new(vec![]),
          ...
      });
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • Arc 方便跨线程共享数据。
      • Mutex 方便跨线程共享可修改数据。
    • 举例:使用互斥量

      impl FernEmpireApp {
          fn join_waiting_list(&self, player: PlayerId) {
              let mut guard = self.waiting_list.lock().unwrap();
      
              guard.push(player);
              if guard.len() == GAME_SIZE {
                  let players = guard.split_off(0);
                  self.start_game(players);
              }
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 取得互斥量的数据唯一的方式是调用.lock() 方法。

      • 在阻塞结束,guard 被清除后,锁也会被释放。但也可以手工清除:

        if guard.len() == GAME_SIZE {
            let players = guard.split_off(0);
            drop(guard);
            self.start_game(players);
        }
        
        • 1
        • 2
        • 3
        • 4
        • 5

    17.3.3-mutMutex

    • mut:意味着专有、排他访问(exclusive access)。
    • mut:意味着共享访问(shared access)。
    • Mutex 互斥量,提供对数据的专有 mut 访问权,即使很多线程有对 Mutex 本身的共享(非 mut)访问权。
    • Rust 编译器在编译时,通过类型系统可以动态控制专有访问。

    17.3.4 - 互斥量的问题

    • 只依赖 “并行分叉 — 合并” 的程序具有确定性,不可能死锁。
    • 专门使用通道实现管道操作的程序也具有确定性,虽然消息传输的时间可能不同,但不会影响输出。
    • 数据争用:多线程并发读取同一块内存导致产生无意义的结果。安全的 Rust 代码不会触发数据争用。
    • 互斥量可能存在问题:
      • 有效的 Rust 程序不会出现数据争用,但仍然可能存在竞态条件(race condition),即程序行为取决于线程的执行时间,因此每次运行的结果可能都不相同。以非结构化方式使用互斥量会导致竞态条件。
      • 共享的可修改状态会影响程序设计。通道可以作为代码中抽象的边界。而互斥量鼓励添加一个方法解决问题,可能导致代码纠缠,难以剥离。
      • 互斥量的实现较为复杂。
    • 尽可能使用结构化方式编程,在必需时再使用互斥量 Mutex

    17.3.5 - 死锁

    • 线程在尝试读取自己已经持有的锁时可能会造成死锁。

      let mut guard1 = self.waiting_list.lock().unwrap();
      let mut guard2 = self.waiting_list.lock().unwrap(); // 死锁
      
      • 1
      • 2
    • 使用通道也有可能导致死锁。如两个线程相互阻塞,每个都等待从另一个接收消息。

    17.3.6 - 中毒的互斥量

    • 如果线程在持有

      Mutex
      
      • 1

      时诧异了,那么 Rust 会将

      Mutex
      
      • 1

      标记为已中毒。

      • 后续想要锁住这个受污染的 Mutex 的尝试都会得到一个错误结果。
      • .unwrap() 调用告诉 Rust 在这种情况下要诧异,把其他线程的诧异传播到当前线程。
      • 诧异的线程保证了程序其余部分出在安全状态。
    • Rust 通过毒化这个互斥量来防止其他线程在不经意间也出现这种局面。

      • 在完全互斥的情况下,可以锁住中毒的互斥量,同时访问其中的数据。
      • 详见 PoisonError::into_inner()

    17.3.7 - 使用互斥量的多消费者通道

    • 一个通道只有一个 Receiver

      • 任何线程池都不能有多个线程使用一个 mpsc 通道共享工作成功。

      • 绕过这个限制的例外方法:可以为 Receiver 添加一个 Mutex,然后再共享。

        pub mod shared_channel {
            use std::sync::{Arc, Mutex};
            use std::sync::mpsc::{channel, Sender, Receiver};
        
            /// 对Receiver的线程安全的封装
            #[derive(Clone)]
            pub struct SharedReceiver<T>(Arc<Mutex<Receiver<T>>>); 
            // Arc<Mutex<Receiver<T>>>是嵌套的泛型
            impl <T> Iterator for SharedReceiver<T> {
                type Item = T;
        
                /// 从封装的接收者获取下一项
                fn next(&mut self) -> Option<T> {
                    let guard = self.0.lock().unwrap();
                    guard.recv().ok()
                }
            }
        
            /// 创建一个新通道,其接收者可以跨线程共享。
            /// 返回一个发送者和一个接收者,与stdlib的channel()类似。
            /// 有时候可以直接代替它使用。
            pub fn shared_channel<T>() -> (Sender<T>, SharedReceiver<T>) {
                let (sender, receiver) = channel();
                (sender, SharedReceiver(Arc::new(Mutex::new(receiver))))
            }
        }
        
        • 1
        • 2
        • 3
        • 4
        • 5
        • 6
        • 7
        • 8
        • 9
        • 10
        • 11
        • 12
        • 13
        • 14
        • 15
        • 16
        • 17
        • 18
        • 19
        • 20
        • 21
        • 22
        • 23
        • 24
        • 25
        • 26

    17.3.8 - 读写锁与 RwLock<T>

    • 互斥量使用 lock 方法读写数据。

    • 读写锁则使用 readwrite 两个方法读写数据。

      • RwLock::write 方法:与 Mutex::lock 类似,都是等待获取对受保护数据的专有 mut 访问。
      • RwLock::read 方法提供了非 mut 访问,几乎不需要等待,因为多个线程可以同时安全读取数据。
    • 互斥量与读写锁的区别:

      • 任意给定时刻,受保护数据只能有一个读取器或写入器。
      • 在使用读写锁的情况下,则可以有一个写入器或多个读取器,类似于 Rust 引用。
      • 优先推荐使用读写锁,而不是互斥量。
    • 优化上述 FernEmpireApp 程序:

      • 创建一个结构体保存配置信息,并由 RwLock 保护。

        use std::sync::RwLock;
        struct FernEmpireApp {
            ...
            config: RwLock<AppConfig>,
            ...
        }
        
        • 1
        • 2
        • 3
        • 4
        • 5
        • 6
      • 读取这个配置的方法可以使用 RwLock::read()

        fn mushrooms_enabled(&self) -> bool {
            let config_guard = self.config.read().unwrap();
            config_guard.mushrooms_enabled
        }
        
        • 1
        • 2
        • 3
        • 4
      • 重新加载这个配置的方法,则使用 RwLock::write()

        fn reload_config(&self) -> io::Result<()> {
            let new_config = AppConfig::load()?;
            let mut config_guard = self.config.write().unwrap();
            *config_guard = new_config;
            Ok(())
        }
        
        • 1
        • 2
        • 3
        • 4
        • 5
        • 6
      • self.config.read() 返回一个守卫,可以提供对 AppConfig 的非 mut(即共享)访问;

      • self.config.write() 返回一个不同类型的守卫,可以提供 mut(即专有)访问。

    17.3.9 - 条件变量(Condvar

    • 一个线程经常需要等待某个条件变为

      true
      
      • 1

      • 在服务器关机期间,主线程可能需要等待所有其他线程完全退出。
      • 工作线程在闲置时,需要等待要处理的数据。
      • 实现分布式共识协议的线程,需要等待足够多对等线程的响应。
    • 针对需要等待的条件,会有方便的阻塞 API,如果没有,那么可以使用条件变量(condition variable)来构建自己的 API。

      • std::sync::Condvar 类型实现了条件变量。
      • 它的.wait() 方法可以阻塞到某些线程调用其.notify_all()
    • MutexCondvar 有直接关联:条件变量始终代表由某个 Mutex 保护的数据或真或假的条件。

    17.3.10 - 原子类型

    • std::sync::atomio
      
      • 1

      模块包含无锁并发编程所使用的原子类型。

      • AtomicIsizeAtomicUsize:是共享的整数类型,对应单线程的 isizeusize 类型。
      • AtomicBool:是一个共享的 bool 值。
      • AtomicPtr<T>:是不安全的指针类型 *mut T 的共享值。
    • 原子的最简单应用是取消操作。假设有一个线程正在执行某个耗时的计算任何,比如渲染视频,而我们希望能够异步取消这个操作。那么可以通过一个共享的 AtomicBool 来实现。渲染完每个像素,线程都会调用.load() 方法检查取消标志的值。

    • 原子操作永远不会使用系统调用。加载和存储会编译为一个 CPU 指令。

    • 原子、MutexRwLock 的方法可以用 self 的共享引用为参数。它们也可以作为简单的全局变量来使用。

    17.3.11 - 全局变量

    • 对于全局变量:必须通过某种方式保证线程安全。静态变量必须既是 Sync,又是非 mut
    • 静态初始化器不能调用函数。可以使用 lazy_staic 包来实现。

    详见《Rust 程序设计》(吉姆 - 布兰迪、贾森 - 奥伦多夫著,李松峰译)第十九章
    原文地址

  • 相关阅读:
    2022年第二季度全球网络攻击创新高,如何有效防范网络攻击
    Pandas行列转换
    【教程】在 visual studio 共享和重用项目属性
    如何解决网页中的pdf文件无法下载?pdf打印显示空白怎么办?
    Linux网络基础-6
    activiti7.0工作流,举个例子实现自定义任务监听和执行监听代码
    Vue中的生命周期
    qt mingw 生成DUMP文件,和pdb文件
    2023最新SSM计算机毕业设计选题大全(附源码+LW)之java星光之夜香水网站的设计与开发bfmcr
    jsp528口腔牙医诊所预约挂号ssm+mysql Springboot
  • 原文地址:https://blog.csdn.net/feiyanaffection/article/details/125575417