第十一章:并发编程——无畏并行的艺术
本章导读:想象一家餐厅:如果只有一个服务员,顾客需要排队等待;如果有多个服务员同时工作,效率就会大大提升。并发编程就是让程序像多个服务员一样,同时处理多个任务。但多个人同时工作会带来协调问题——两个人不能同时端同一个盘子。Rust 的"无畏并发"哲学,让你在编译期就能发现并发错误,而不是等到生产环境才崩溃。
🧵 11.1 并发 vs 并行:概念澄清
在深入代码之前,我们先理解两个常被混淆的概念。
🔄 11.1.1 并发(Concurrency)
并发是指多个任务在逻辑上同时进行,但物理上可能交替执行。就像一个人同时处理多件事:写代码时回复消息、等待编译时看文档。
// 单线程上的并发:任务交替执行
// 时间片 1: 任务 A 执行一部分
// 时间片 2: 任务 B 执行一部分
// 时间片 3: 任务 A 继续执行
// ...
⚡ 11.1.2 并行(Parallelism)
并行是指多个任务在物理上同时执行,需要多核处理器。就像多人协作:一个人写前端,另一个人写后端。
// 多线程并行:任务真正同时执行
// CPU 核心 1: 任务 A
// CPU 核心 2: 任务 B
// CPU 核心 3: 任务 C
比喻时刻:并发是一个人同时抛多个球,并行是多个人各抛一个球。Rust 让你能够轻松写出既能并发又能并行的代码。
🚀 11.2 创建线程:thread::spawn
线程是操作系统调度的最小单位。Rust 使用 std::thread 模块创建和管理线程。
🌱 11.2.1 基本线程创建
use std::thread;
use std::time::Duration;
fn main() {
println!("主线程开始");
// spawn 创建新线程,返回 JoinHandle
let handle = thread::spawn(|| {
for i in 1..=5 {
println!("子线程计数: {}", i);
thread::sleep(Duration::from_millis(100));
}
});
// 主线程继续执行
for i in 1..=3 {
println!("主线程计数: {}", i);
thread::sleep(Duration::from_millis(150));
}
// 等待子线程完成(阻塞)
handle.join().unwrap();
println!("所有线程完成");
}
运行结果可能是:
主线程开始
主线程计数: 1
子线程计数: 1
子线程计数: 2
主线程计数: 2
子线程计数: 3
...
注意:每次运行的输出顺序可能不同,因为线程调度由操作系统决定。
🔒 11.2.2 闭包捕获与 move
线程可能比创建它的作用域存活更久,所以闭包需要获取捕获变量的所有权:
use std::thread;
fn main() {
let message = String::from("来自主线程的问候");
// 错误:线程可能在 message 被释放后还在运行
// thread::spawn(|| {
// println!("{}", message);
// });
// 正确:使用 move 转移所有权
let handle = thread::spawn(move || {
println!("子线程收到: {}", message);
// message 的所有权现在属于子线程
});
// println!("{}", message); // 错误:message 已被移动
handle.join().unwrap();
}
🏷️ 11.2.3 线程命名与信息
use std::thread;
fn main() {
let handle = thread::Builder::new()
.name("worker-1".to_string()) // 给线程命名
.stack_size(4 * 1024 * 1024) // 设置栈大小:4MB
.spawn(|| {
// 获取当前线程信息
let current = thread::current();
println!("线程名: {:?}", current.name());
})
.unwrap();
handle.join().unwrap();
}
📨 11.3 消息传递:Channel
"不要通过共享内存来通信,而要通过通信来共享内存"——这是 Go 语言的哲学,Rust 同样支持这种模式。
📬 11.3.1 创建 Channel
use std::sync::mpsc; // multi-producer, single-consumer
use std::thread;
fn main() {
// 创建通道,返回 (发送端, 接收端)
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let message = String::from("你好,来自子线程");
// 发送消息(可能失败,如果接收端已关闭)
tx.send(message).unwrap();
// message 的所有权已转移
// println!("{}", message); // 错误
});
// 接收消息(阻塞直到收到或通道关闭)
let received = rx.recv().unwrap();
println!("收到: {}", received);
}
🔄 11.3.2 发送多个消息
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let messages = vec![
"第一条消息",
"第二条消息",
"第三条消息",
];
for msg in messages {
tx.send(msg.to_string()).unwrap();
thread::sleep(Duration::from_millis(200));
}
});
// 使用迭代器持续接收
// 当通道关闭时迭代结束
for received in rx {
println!("收到: {}", received);
}
println!("通道已关闭");
}
📡 11.3.3 多生产者
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
let tx2 = tx.clone(); // 克隆发送端
thread::spawn(move || {
let msgs = vec![
"生产者1: 消息A",
"生产者1: 消息B",
];
for msg in msgs {
tx.send(msg).unwrap();
}
});
thread::spawn(move || {
let msgs = vec![
"生产者2: 消息X",
"生产者2: 消息Y",
];
for msg in msgs {
tx2.send(msg).unwrap();
}
});
// 单个接收端处理所有消息
for received in rx {
println!("接收: {}", received);
}
}
架构启示:Channel 实现了生产者-消费者模式。想象一个工厂:多条生产线(生产者)制造产品,一条包装线(消费者)统一处理。这种解耦让系统更清晰。
🔒 11.4 互斥锁:Mutex<T>
当多个线程需要访问同一数据时,我们需要一种机制来保证同一时间只有一个线程能修改它。
🚪 11.4.1 Mutex 基础
use std::sync::Mutex;
fn main() {
// Mutex 包装数据
let m = Mutex::new(5);
{
// lock() 返回 MutexGuard,实现了 Deref
// 如果锁被占用,当前线程会阻塞
let mut num = m.lock().unwrap();
*num += 1;
// 离开作用域时自动释放锁
}
println!("m = {:?}", m);
}
比喻时刻:Mutex 就像公共卫生间——同一时间只能一个人使用。获得锁就像进入卫生间,释放锁就像离开。如果里面有人,你必须在外面等待。
🧵 11.4.2 多线程共享 Mutex
直接共享 Mutex 会遇到所有权问题:
use std::sync::{Arc, Mutex};
use std::thread;
fn main() {
// Arc: 原子引用计数,允许多线程共享所有权
// Mutex: 保证互斥访问
let counter = Arc::new(Mutex::new(0));
let mut handles = vec![];
for _ in 0..10 {
let counter = Arc::clone(&counter);
let handle = thread::spawn(move || {
let mut num = counter.lock().unwrap();
*num += 1;
// lock 在这里自动释放
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("结果: {}", *counter.lock().unwrap()); // 10
}
⚠️ 11.4.3 死锁陷阱
use std::sync::Mutex;
fn main() {
let data1 = Mutex::new(());
let data2 = Mutex::new(());
// 死锁场景:两个线程以不同顺序获取锁
// 线程 1: 先锁 data1,再锁 data2
// 线程 2: 先锁 data2,再锁 data1
// 结果:互相等待,永远阻塞
// 解决方案:
// 1. 始终以相同顺序获取锁
// 2. 尽量减少锁的持有时间
// 3. 考虑使用其他同步原语
}
📖 11.5 读写锁:RwLock<T>
Mutex 无论读写都独占访问。但多个读取操作其实是安全的——只有写入才需要互斥。RwLock 正是为这种场景设计的。
📚 11.5.1 基本用法
use std::sync::RwLock;
fn main() {
let lock = RwLock::new(5);
// 多个读取可以同时进行
{
let r1 = lock.read().unwrap();
let r2 = lock.read().unwrap();
println!("读取1: {}, 读取2: {}", *r1, *r2);
// r1 和 r2 可以共存
}
// 写入需要独占访问
{
let mut w = lock.write().unwrap();
*w += 1;
println!("写入: {}", *w);
// 在 w 存在期间,任何 read() 或 write() 都会阻塞
}
println!("最终值: {}", *lock.read().unwrap());
}
🎯 11.5.2 使用场景
use std::sync::{Arc, RwLock};
use std::thread;
use std::collections::HashMap;
fn main() {
// 共享的键值存储
let store = Arc::new(RwLock::new(HashMap::new()));
let mut handles = vec![];
// 写入线程
for i in 0..3 {
let store = Arc::clone(&store);
handles.push(thread::spawn(move || {
let mut w = store.write().unwrap();
w.insert(format!("key{}", i), i);
println!("写入 key{}", i);
}));
}
// 读取线程
for i in 0..3 {
let store = Arc::clone(&store);
handles.push(thread::spawn(move || {
// 短暂等待,让写入先完成
thread::sleep(std::time::Duration::from_millis(10));
let r = store.read().unwrap();
println!("读取: {:?}", *r);
}));
}
for handle in handles {
handle.join().unwrap();
}
}
📊 11.5.3 Mutex vs RwLock
| 特性 | Mutex | RwLock |
|---|---|---|
| 读取 | 独占 | 共享 |
| 写入 | 独占 | 独占 |
| 开销 | 较低 | 较高 |
| 适用场景 | 读写混合、频繁写入 | 读多写少 |
选择指南:如果读操作远多于写操作,RwLock 能显著提升并发性能。如果读写比例接近或写入频繁,Mutex 的简单性可能更合适。
🏷️ 11.6 Send 和 Sync Trait
Rust 如何保证线程安全?答案在于 Send 和 Sync 这两个 marker trait。
📤 11.6.1 Send:可以在线程间移动
Send trait 表示类型可以安全地在线程间转移所有权。
// 大多数类型自动实现 Send
// - i32, String, Vec<T> (如果 T: Send)
// - Box<T> (如果 T: Send)
// - Mutex<T> (如果 T: Send)
// 非 Send 类型
// - Rc<T>: 非原子引用计数
// - Raw pointers: 原始指针
// - std::cell 中的类型: 单线程内部可变性
use std::rc::Rc;
use std::thread;
fn main() {
let rc = Rc::new(5);
// 编译错误!Rc 不是 Send
// thread::spawn(move || {
// println!("{}", rc);
// });
// 改用 Arc
use std::sync::Arc;
let arc = Arc::new(5);
thread::spawn(move || {
println!("{}", arc);
}).join().unwrap();
}
🤝 11.6.2 Sync:可以在线程间共享引用
Sync trait 表示类型可以安全地被多个线程同时持有不可变引用。
// T: Sync 意味着 &T: Send
// 即:可以发送 &T 到其他线程
// 自动实现 Sync 的类型
// - i32, String, Vec<T>
// - Mutex<T>: 通过内部锁定保证安全
// - Arc<T>: 如果 T: Sync
// 非 Sync 类型
// - Cell<T>, RefCell<T>: 非线程安全的内部可变性
// - Rc<T>
use std::cell::RefCell;
use std::sync::Arc;
fn main() {
// RefCell 不是 Sync,所以 Arc<RefCell> 不能跨线程
// let data = Arc::new(RefCell::new(0));
// 改用 Mutex
use std::sync::Mutex;
let data = Arc::new(Mutex::new(0));
// Mutex<T> 是 Sync(如果 T: Send)
// 所以 Arc<Mutex<T>> 是 Send + Sync
}
🔐 11.6.3 手动实现 Send/Sync(危险!)
// ⚠️ 极其危险!只有在完全理解后果时才这样做
// unsafe impl Send for MyType {}
// unsafe impl Sync for MyType {}
// Rust 的安全保证来自编译器的自动推导
// 手动实现可能破坏内存安全
第一性原理:Send 和 Sync 是 Rust 并发安全的核心。它们不是运行时检查,而是编译时保证。如果代码编译通过,就意味着不存在数据竞争(data race)。
🌊 11.7 Barrier:同步点
有时我们需要让多个线程在某个点汇合,然后一起继续。
use std::sync::{Arc, Barrier};
use std::thread;
fn main() {
let n = 3;
let barrier = Arc::new(Barrier::new(n));
let mut handles = vec![];
for i in 0..n {
let barrier = Arc::clone(&barrier);
handles.push(thread::spawn(move || {
println!("线程 {} 到达第一阶段", i);
barrier.wait(); // 等待所有线程到达
println!("线程 {} 进入第二阶段", i);
barrier.wait(); // 再次同步
println!("线程 {} 完成", i);
}));
}
for handle in handles {
handle.join().unwrap();
}
}
比喻:Barrier 就像登山队的会合点——所有队员必须到齐后才能继续前进。
🧪 11.8 实战:并行数据处理
让我们构建一个并行处理大量数据的实用程序——并行计算目录下所有文件的哈希值。
use std::collections::HashMap;
use std::fs;
use std::path::Path;
use std::sync::{Arc, Mutex};
use std::thread;
fn main() {
let path = "."; // 当前目录
let results = Arc::new(Mutex::new(HashMap::new()));
let mut handles = vec![];
// 收集所有文件
let files: Vec<_> = fs::read_dir(path)
.unwrap()
.filter_map(|e| e.ok())
.filter(|e| e.path().is_file())
.collect();
println!("发现 {} 个文件,开始并行处理...", files.len());
let start = std::time::Instant::now();
// 每个文件一个线程处理
for entry in files {
let results = Arc::clone(&results);
let file_path = entry.path();
handles.push(thread::spawn(move || {
// 模拟耗时操作(实际可用 blake3 或 sha2 计算)
let content = fs::read(&file_path).unwrap_or_default();
// 简单的"哈希":文件大小和首字节
let hash = if !content.is_empty() {
format!("size:{}:first:{:02x}", content.len(), content[0])
} else {
"empty".to_string()
};
let file_name = file_path.display().to_string();
// 安全地写入共享结果
let mut map = results.lock().unwrap();
map.insert(file_name, hash);
println!("处理完成: {}", file_path.display());
}));
}
// 等待所有线程完成
for handle in handles {
handle.join().unwrap();
}
let duration = start.elapsed();
println!("\n处理完成!耗时: {:?}", duration);
// 显示结果
let map = results.lock().unwrap();
for (file, hash) in map.iter() {
println!("{}: {}", file, hash);
}
}
🚀 11.8.1 优化:线程池
创建线程有开销,大量小任务应该使用线程池:
use std::sync::{Arc, Mutex};
use std::sync::mpsc;
use std::thread;
struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
struct Worker {
id: usize,
thread: Option<thread::JoinHandle<()>>,
}
impl ThreadPool {
fn new(size: usize) -> Self {
assert!(size > 0);
let (sender, receiver) = mpsc::channel::<Job>();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Self {
let thread = thread::spawn(move || loop {
// 获取任务(阻塞)
let job = receiver.lock().unwrap().recv();
match job {
Ok(job) => {
println!("Worker {} 执行任务", id);
job();
}
Err(_) => {
println!("Worker {} 退出", id);
break;
}
}
});
Worker {
id,
thread: Some(thread),
}
}
}
fn main() {
let pool = ThreadPool::new(4);
for i in 0..10 {
pool.execute(move || {
println!("任务 {} 开始", i);
thread::sleep(std::time::Duration::from_millis(100));
println!("任务 {} 完成", i);
});
}
// 给任务一些时间完成
thread::sleep(std::time::Duration::from_secs(2));
println!("主线程结束");
}
生产建议:实际项目中推荐使用成熟的线程池库,如
rayon或tokio的运行时。
📝 本章小结
本章我们探索了 Rust 的"无畏并发":
| 工具 | 用途 | 关键特性 |
|---|---|---|
thread::spawn | 创建线程 | 返回 JoinHandle |
mpsc::channel | 消息传递 | 生产者-消费者模式 |
Mutex<T> | 互斥访问 | 同一时间只有一个访问者 |
RwLock<T> | 读写分离 | 多读者或单写者 |
Arc<T> | 原子引用计数 | 跨线程共享所有权 |
Barrier | 同步点 | 等待所有线程汇合 |
核心概念:
- Send:类型可以在线程间移动
- Sync:类型可以在线程间共享引用
- 无畏并发:编译器帮助防止数据竞争
费曼技巧提问:为什么
Arc<Mutex>是线程安全的?Rc<RefCell>不是?提示:考虑 Send 和 Sync trait。
动手实验:
- 使用 Channel 实现一个简单的任务队列:主线程发送任务,工作线程接收并执行。
- 使用
Arc<RwLock<T>>实现一个线程安全的缓存,支持读写操作。- 比较使用单线程 vs 4 线程处理 1000 个小任务的时间差异(每个任务 sleep 1ms)。
- 解释为什么以下代码无法编译,并修复它:
``
rust let data = Rc::new(RefCell::new(0)); thread::spawn(move || { *data.borrow_mut() += 1; });``