• rust 多线程


    多线程并发编程

    使用多线程

    use std::thread;
    use std::time::Duration;
    
    fn main() {
        thread::spawn(|| {
            for i in 1..10 {
                println!("hi number {} from the spawned thread!", i);
                thread::sleep(Duration::from_millis(1));
            }
        });
    
        for i in 1..5 {
            println!("hi number {} from the main thread!", i);
            thread::sleep(Duration::from_millis(1));
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    有几点值得注意:

    • 线程内部的代码使用闭包来执行
    • main 线程一旦结束,程序就立刻结束,因此需要保持它的存活,直到其它子线程完成自己的任务
    • thread::sleep 会让当前线程休眠指定的时间,随后其它线程会被调度运行,因此就算你的电脑只有一个 CPU 核心,该程序也会表现的如同多 CPU 核心一般,这就是并发!
    在线程闭包中使用 move

    move 关键字在闭包中的使用可以让该闭包拿走环境中某个值的所有权,同样地,你可以使用 move 来将所有权从一个线程转移到另外一个线程。

    use std::thread;
    
    fn main() {
        let v = vec![1, 2, 3];
    
        let handle = thread::spawn(move || {
            println!("Here's a vector: {:?}", v);
        });
    
        handle.join().unwrap();
    
        // 下面代码会报错borrow of moved value: `v`
        // println!("{:?}",v);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    线程局部变量(Thread Local Variable)
    标准库
    use std::cell::RefCell;
    use std::thread;
    
    thread_local!(static FOO: RefCell<u32> = RefCell::new(1));
    
    FOO.with(|f| {
        assert_eq!(*f.borrow(), 1);
        *f.borrow_mut() = 2;
    });
    
    // 每个线程开始时都会拿到线程局部变量的FOO的初始值
    let t = thread::spawn(move|| {
        FOO.with(|f| {
            assert_eq!(*f.borrow(), 1);
            *f.borrow_mut() = 3;
        });
    });
    
    // 等待线程完成
    t.join().unwrap();
    
    // 尽管子线程中修改为了3,我们在这里依然拥有main线程中的局部值:2
    FOO.with(|f| {
        assert_eq!(*f.borrow(), 2);
    });
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    第三方库
    use thread_local::ThreadLocal;
    use std::sync::Arc;
    use std::cell::Cell;
    use std::thread;
    
    let tls = Arc::new(ThreadLocal::new());
    
    // 创建多个线程
    for _ in 0..5 {
        let tls2 = tls.clone();
        thread::spawn(move || {
            // 将计数器加1
            let cell = tls2.get_or(|| Cell::new(0));
            cell.set(cell.get() + 1);
        }).join().unwrap();
    }
    
    // 一旦所有子线程结束,收集它们的线程局部变量中的计数器值,然后进行求和
    let tls = Arc::try_unwrap(tls).unwrap();
    let total = tls.into_iter().fold(0, |x, y| x + y.get());
    
    // 和为5
    assert_eq!(total, 5);
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    线程间的消息传递

    use std::sync::mpsc;
    use std::thread;
    
    fn main() {
        // 创建一个消息通道, 返回一个元组:(发送者,接收者)
        let (tx, rx) = mpsc::channel();
    
        // 创建线程,并发送消息
        thread::spawn(move || {
            // 发送一个数字1, send方法返回Result,通过unwrap进行快速错误处理
            tx.send(1).unwrap();
    
            // 下面代码将报错,因为编译器自动推导出通道传递的值是i32类型,那么Option类型将产生不匹配错误
            // tx.send(Some(1)).unwrap()
        });
    
        // 在主线程中接收子线程发送的消息并输出
        println!("receive {}", rx.recv().unwrap());
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 若值的类型实现了Copy特征,则直接复制一份该值,然后传输过去,例如之前的i32类型
    • 若值没有实现Copy,则它的所有权会被转移给接收端,在发送端继续使用该值将报错
    使用多发送者
    use std::sync::mpsc;
    use std::thread;
    
    fn main() {
        let (tx, rx) = mpsc::channel();
        let tx1 = tx.clone();
        thread::spawn(move || {
            tx.send(String::from("hi from raw tx")).unwrap();
        });
    
        thread::spawn(move || {
            tx1.send(String::from("hi from cloned tx")).unwrap();
        });
    
        for received in rx {
            println!("Got: {}", received);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    以上代码并不复杂,但仍有几点需要注意:

    • tx,rx对应发送者和接收者,它们的类型由编译器自动推导: tx.send(1)发送了整数,因此它们分别是mpsc::Sender和mpsc::Receiver类型,需要注意,由于内部是泛型实现,一旦类型被推导确定,该通道就只能传递对应类型的值, 例如此例中非i32类型的值将导致编译错误
    • 接收消息的操作rx.recv()会阻塞当前线程,直到读取到值,或者通道被关闭
    • 需要使用move将tx的所有权转移到子线程的闭包中
    同步和异步通道
    异步通道
    use std::sync::mpsc;
    use std::thread;
    use std::time::Duration;
    fn main() {
        let (tx, rx)= mpsc::channel();
    
        let handle = thread::spawn(move || {
            println!("发送之前");
            tx.send(1).unwrap();
            println!("发送之后");
        });
    
        println!("睡眠之前");
        thread::sleep(Duration::from_secs(3));
        println!("睡眠之后");
    
        println!("receive {}", rx.recv().unwrap());
        handle.join().unwrap();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    同步通道
    use std::sync::mpsc;
    use std::thread;
    use std::time::Duration;
    fn main() {
        // 设置消息队列大小
        let (tx, rx)= mpsc::sync_channel(0);
    
        let handle = thread::spawn(move || {
            println!("发送之前");
            tx.send(1).unwrap();
            println!("发送之后");
        });
    
        println!("睡眠之前");
        thread::sleep(Duration::from_secs(3));
        println!("睡眠之后");
    
        println!("receive {}", rx.recv().unwrap());
        handle.join().unwrap();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    传输多种类型的数据
    use std::sync::mpsc::{self, Receiver, Sender};
    
    enum Fruit {
        Apple(u8),
        Orange(String)
    }
    
    fn main() {
        let (tx, rx): (Sender, Receiver) = mpsc::channel();
    
        tx.send(Fruit::Orange("sweet".to_string())).unwrap();
        tx.send(Fruit::Apple(2)).unwrap();
    
        for _ in 0..2 {
            match rx.recv().unwrap() {
                Fruit::Apple(count) => println!("received {} apples", count),
                Fruit::Orange(flavor) => println!("received {} oranges", flavor),
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    线程同步:锁、Condvar 和信号量

    共享内存可以说是同步的灵魂,因为消息传递的底层实际上也是通过共享内存来实现,两者的区别如下:

    • 共享内存相对消息传递能节省多次内存拷贝的成本

    • 共享内存的实现简洁的多

    • 共享内存的锁竞争更多
      消息传递适用的场景很多,我们下面列出了几个主要的使用场景:

    • 需要可靠和简单的(简单不等于简洁)实现时

    • 需要模拟现实世界,例如用消息去通知某个目标执行相应的操作时

    • 需要一个任务处理流水线(管道)时,等等

    互斥锁 Mutex
    use std::sync::{Arc, Mutex};
    use std::thread;
    
    fn main() {
        let counter = Arc::new(Mutex::new(0));
        let mut handles = vec![];
    
        for _ in 0..10 {
            let counter = Arc::clone(&counter);
            let handle = thread::spawn(move || {
                let mut num = counter.lock().unwrap();
    
                *num += 1;
            });
            handles.push(handle);
        }
    
        for handle in handles {
            handle.join().unwrap();
        }
    
        println!("Result: {}", *counter.lock().unwrap());
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    读写锁 RwLock
    use std::sync::RwLock;
    
    fn main() {
        let lock = RwLock::new(5);
    
        // 同一时间允许多个读
        {
            let r1 = lock.read().unwrap();
            let r2 = lock.read().unwrap();
            assert_eq!(*r1, 5);
            assert_eq!(*r2, 5);
        } // 读锁在此处被drop
    
        // 同一时间只允许一个写
        {
            let mut w = lock.write().unwrap();
            *w += 1;
            assert_eq!(*w, 6);
    
            // 以下代码会panic,因为读和写不允许同时存在
            // 写锁w直到该语句块结束才被释放,因此下面的读锁依然处于`w`的作用域中
            // let r1 = lock.read();
            // println!("{:?}",r1);
        }// 写锁在此处被drop
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    同时允许多个读,但最多只能有一个写
    读和写不能同时存在
    读可以使用read、try_read,写write、try_write, 在实际项目中,try_xxx会安全的多

    线程同步:Atomic 原子类型与内存顺序

    原子指的是一系列不可被 CPU 上下文交换的机器指令,这些指令组合在一起就形成了原子操作。在多核 CPU 下,当某个 CPU 核心开始运行原子操作时,会先暂停其它 CPU 内核对内存的操作,以保证原子操作不会被其它 CPU 内核所干扰。

    use std::ops::Sub;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::thread::{self, JoinHandle};
    use std::time::Instant;
    
    const N_TIMES: u64 = 10000000;
    const N_THREADS: usize = 10;
    
    static R: AtomicU64 = AtomicU64::new(0);
    
    fn add_n_times(n: u64) -> JoinHandle<()> {
        thread::spawn(move || {
            for _ in 0..n {
                R.fetch_add(1, Ordering::Relaxed);
            }
        })
    }
    
    fn main() {
        let s = Instant::now();
        let mut threads = Vec::with_capacity(N_THREADS);
    
        for _ in 0..N_THREADS {
            threads.push(add_n_times(N_TIMES));
        }
    
        for thread in threads {
            thread.join().unwrap();
        }
    
        assert_eq!(N_TIMES * N_THREADS as u64, R.load(Ordering::Relaxed));
    
        println!("{:?}",Instant::now().sub(s));
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    Atomic 能替代锁吗

    对于复杂的场景下,锁的使用简单粗暴,不容易有坑
    std::sync::atomic包中仅提供了数值类型的原子操作:AtomicBool, AtomicIsize, AtomicUsize, AtomicI8, AtomicU16等,而锁可以应用于各种类型
    在有些情况下,必须使用锁来配合,例如上一章节中使用Mutex配合Condvar

    基于 Send 和 Sync 的线程安全

  • 相关阅读:
    【Java】三种方案实现 Redis 分布式锁
    Leetcode 1584. 连接所有点的最小费用(手撸普利姆算法)
    【C++】类和对象(上)
    力扣刷题-数组-二分查找总结
    HTTP和HTTPS详解
    【FastAPI】实现服务器向客户端发送SSE(Server-Sent Events)广播
    一站式BI解决方案:从数据采集到处理分析,全面满足决策支持需求
    QT QAxWidget控件 使用详解
    我第一个开源AI小产品-video2blog即将正式发布
    随身WIFI刷真Linux(Debian)系统搭配拓展坞做超低功耗服务器
  • 原文地址:https://blog.csdn.net/studycodeday/article/details/133715797