线程可以用于同时执行几个完全不相关的任务。
编写一个单线程的程序:
fn process_files(filenames: Vec<String>) -> io::Result<()> {
for document in filenames {
let text = load(&document)?; // 读取源文件
let results = process(text); // 计算统计值
save(&document, results)?; // 写入输出文件
}
Ok(())
}
使用 “并行分叉 — 合并” 模式,可以实现多线程任务执行。
使用 std::thread::spawn 函数可以产生一个新线程:
spawn(|| {
println!("hello from a child thread");
})
FnOnce 闭包或函数。使用 spawn 实现前面 process_files 函数的并行版本:
use std::thead::spawn;
fn process_files_in_parallel(filenames: Vec<String>) -> io::Result<()> {
// 将工作分成几块
const NTHREADS: usize = 8;
let worklists = split_vec_into_chunks(filenames, NTHREADS);
// 分叉:每个块产生一个线程来处理
let mut thread_handles = vec![];
for worklist in worklists {
thread_handles.push(spawn(move || process_files(worklist)));
}
// 合并:等待所有线程完成
for handle in thread_handles {
handle.join.unwrap()?;
}
Ok(())
}
把文件名列表转换为工作线程的过程:
for 循环定义并转移 worklist;move 闭包时,wordlist 被转移到闭包中;spawn 将闭包(以及 wordlist 向量)转移到新的子线程中。handle.join()
方法:
std::thread::Result,如果子线程诧异则是一个错误。诧异不会自动从一个线程传播到依赖它的其他线程。
一个线程的诧异在其他线程中会体现为包含错误的 Result。
handle.join().unwrap():.unwrap() 执行断言操作,实现诧异的传播。如果子线程诧异后,那么会返回 Ok 结果。那么其调用的父线程也会诧异。相当于显示地将诧异从子线程传播到父线程。
在向函数中传引用的时候,如果一个线程触发了 IO 错误,就可能导致调用函数在其他线程完成前退出。那么其他子线程就有可能在主线程被释放之后还继续使用传入的参数,这样就会导致数据发生争用。
在 Rust 中是不允许这种情况发生的。只要有任何线程拥有 Arc<GigabyteMap>,映射就不会释放,即使父线程已经退出了。因为 Arc 中的数据是不可修改的,不会出现任何数据争用。
use std::sync::Arc;
fn process_files_in_parallel(filenames: Vec<String>, glossary: Arc<GigabyteMap>)
-> io::Result<()>
{
...
for worklist in worklists {
// 调用.clone(),克隆Arc并触发引用计数。不会克隆GigabyteMap
let glossary_for_child = glossary.clone();
thread_handles.push(
spawn(move || process_files(worklist, &glossary_for_child))
);
}
...
}
Rayon 库:专门为 “并行分叉 — 合并” 模式设计,提供两种运行并发任务的方式:
extern crate rayon;
use rayon::prelude::*;
// 并行实现两个任务
let (v1, v2) = rayon::join(fn1, fn2);
// 并行实现N个任务
giant_vector.par_iter().for_each(|value| { // 创建一个ParallelIterator,类似迭代器。
do_thing_with_value(value);
});
使用 Rayon 重写 process_files_in_parallel:
extern crate rayon;
use rayon::prelude::*;
fn process_files_in_parallel(filenames: Vec<String>, glossary: &GigabyteMap)
-> io::Result<()>
{
filenames.par_iter() // 创建并行迭代器
// 调用文件名,得到一个ParallelIterator
.map(|filename| process_file(filename, glossary))
// 组合结果,返回一个Option,只有filename为空是才是None
// 任何后台发生的并行处理,都可保证reduce_with返回时才完成。
.reduce_with(|r1, r2| {
if r1.is_err() {
r1
} else {
r2
}
})
.unwrap_or(Ok(())) // 让结果Ok(())
}
Rayon 也支持在线程间共享引用。
要使用 Rayon 需要添加以下代码:
main.rs 中:
extern crate rayon;
use rayon::prelude::*;
Cargo.toml 中:
[dependencies]
rayon = "0.4"
通道(channel):把值从一个线程发送到另一个线程的单向管道,本质是一个线程安全的队列。
std::sync::mps 模块的一部分。sender.send(item)
把一个值放进通道,
receiver.recv()
则移除一个值。
receiver.recv() 会一直阻塞直到有值发送。Rust 通道比管道快:
倒排索引(inverted index):一种数据库,可以查询哪个关键词在哪里出现过。是实现搜索引擎的关键之一。
启动读取文件线程的代码:
use std::fs::File;
use std::io::prelude::*; // 使用Read::read_to_string
use std::thread::spawn;
use std::sync::mpsc::channel;
let (sender, receiver) = channel(); // 队列数据结构,返回一对值:发送者和接收者。
let handle = spawn( // 启动线程std::thread::spawn。
// 发送者的所有权会通过move闭包返回转移给新线程。
move || { // Rust执行类型推断,判断通道的类型
for filename in documents {
let mut f = File::open(filename)?; // 从磁盘读取文件
let mut text = String::new();
f.read_to_string(&mut text)?;
if sender.send(text).is_err() { // 读取文件后,把其文本内容text发送给通道
break;
}
}
Ok(()) // 线程读取完所有文档后,程序返回Ok(())
}
);
创建第二个线程循环调用 receiver.recv()。
// while循环实现
while let Ok(text) = receiver.recv() {
do_something_with(text);
}
// for循环实现
for text in receiver {
do_something_with(text);
}
接收者线程示例:
fn start_file_indexing_thread(texts: Receiver<>)
-> (Receiver<InMemoryIndex>, JoinHandle<()>)
{
let (sender, receiver) = channel();
let handle = spawn(
move || {
for (doc_id, text) in texts.into_iter().enumerate() {
let index = InMemoryIndex::from_single_document(doc_id, text);
if sender.send(index).is_err() {
break;
}
}
}
);
(receiver, handle)
}
接收时,如果线程发生了 IO 错误,会立即退出,而错误会存储在线程的 JoinHandle 中。包装组合接收者、返回者和新线程的 JoinHandle 代码如下:
fn start_file_reader_thread(documents: Vec<PathBuf>)
-> (Receiver<Strig>, JoinHandle<io::Result<()>>)
{
let (sender, receiver) = channel();
let handle = spawn(
move || { ... }
);
(receiver, handle)
}
在内存中合并索引,直至足够大:
fn start_in_memory_merge_thread(file_indexes: Receiver<InMemoryIndex>)
-> (Receiver<InMemoryIndex>, JoinHandle<()>)
把大索引写入磁盘:
fn start_index_write_thread(big_indexes: Receiver<InMemoryIndex>, output_dir: &Path)
-> (Receiver<PathBuf>, JoinHandle<io::Result<()>>)
如果有多个大文件,则使用基于文件的合并算法,将它们合并起来:
fn merge_index_files(files: Receiver<PathBuf>, output_dir: &Path)
-> io::Result<()>
启动线程,并检查错误:
fn run_pipeline(documents: Vec<PathBuf>, output_dir: PathBuf)
-> io::Result<()>
{
// 启动管道的全部5个阶段
let (texts, h1) = start_file_reader_thread(documents);
let (pints, h2) = start_file_indexing_thread(texts);
let (gallons, h3) = start_in_memory_merge_thread(pints);
let (files, h4) = start_index_writer_thread(gallons, &output_dir);
let result = merge_index_files(files, &output_dir);
// 等待线程完成,保存任何错误
let r1 = h1.join().unwrap();
h2.join().unwrap();
h3.join().unwrap();
let r4 = h4.join().unwrap();
// 返回遇到的第一个错误
// h2和h3不会失败,因为它们是纯内存数据处理
r1?;
r4?;
result
}
管道实现流水线作业,整体性能受限于最慢阶段的生成能力。
std::sync::mps 特型:mpsc(multi-producer, single-consumer)是多生产者,单消费者。
Sender<T> 实现了 Clone 特型:实现创建一个常规通道,然后再克隆发送者。可以把每个 Sender 值转移到不同线程。
Receiver<T> 无法克隆,如果需要多个线程从同一个通道接收值,就需要使用 Mutex。
反压力(backpressure):如果发送值的速度超过接收和处理值的速度,就会导致通道内部的值越积越多。
同步通道(synchronous channel):Unix 的每个管道都有固定大小,如果一个进程尝试向随时可能满的管道写入数据,系统就会直接阻塞该进程,直至管道中有了空间。
use std::sync::mpsc::sync_channel;
let (sender, receiver) = sync_channel(1000);
sender.send(value) 是一个潜在的阻塞操作。std::marker::Send 特型:实现 Send 的类型,可以安全地把值传到另一个线程,即实现线程间转移值。std::marker::Sync 特型:实现 Sync 的类型,可以安全地把不可修改引用传到另一个线程,即他们可以在线程间共享值。#[derive] 派生。Send 和 Sync 的类型主要用于在非线程安全的条件下提供可修改能力。例如引用计数指针类型 std::rc::Rc<T>。Rust 要求在通过 spawn 创建线程时,传入的闭包必须是 Send。下述实现适用于所有迭代器。
use std::thread::spawn;
impl<T> OffThreadExt for T where T: Iterator + Send + 'static, T::Item: Send + 'static {
fn off_thread(self) -> mpsc::IntoIter<Self::Item> {
// 创建一个通道并将工作线程中的项传出来
let (sender, receiver) = mpsc::sync_channel(1024);
// 把这个迭代器转移到一个新线程中并在那里运行它
spawn(
move || {
for item in self {
if sender.send(item).is_err() {
break;
}
}
}
);
// 返回一个从通道提取值的迭代器
receiver.into_iter()
}
}
详见《Rust 程序设计》(吉姆 - 布兰迪、贾森 - 奥伦多夫著,李松峰译)第十九章
原文地址