第十一章:并发编程——无畏并行的艺术

第十一章:并发编程——无畏并行的艺术

本章导读:想象一家餐厅:如果只有一个服务员,顾客需要排队等待;如果有多个服务员同时工作,效率就会大大提升。并发编程就是让程序像多个服务员一样,同时处理多个任务。但多个人同时工作会带来协调问题——两个人不能同时端同一个盘子。Rust 的"无畏并发"哲学,让你在编译期就能发现并发错误,而不是等到生产环境才崩溃。


🧵 11.1 并发 vs 并行:概念澄清

在深入代码之前,我们先理解两个常被混淆的概念。

🔄 11.1.1 并发(Concurrency)

并发是指多个任务在逻辑上同时进行,但物理上可能交替执行。就像一个人同时处理多件事:写代码时回复消息、等待编译时看文档。

// 单线程上的并发:任务交替执行
// 时间片 1: 任务 A 执行一部分
// 时间片 2: 任务 B 执行一部分
// 时间片 3: 任务 A 继续执行
// ...

⚡ 11.1.2 并行(Parallelism)

并行是指多个任务在物理上同时执行,需要多核处理器。就像多人协作:一个人写前端,另一个人写后端。

// 多线程并行:任务真正同时执行
// CPU 核心 1: 任务 A
// CPU 核心 2: 任务 B
// CPU 核心 3: 任务 C

比喻时刻:并发是一个人同时抛多个球,并行是多个人各抛一个球。Rust 让你能够轻松写出既能并发又能并行的代码。


🚀 11.2 创建线程:thread::spawn

线程是操作系统调度的最小单位。Rust 使用 std::thread 模块创建和管理线程。

🌱 11.2.1 基本线程创建

use std::thread;
use std::time::Duration;

fn main() {
    println!("主线程开始");

    // spawn 创建新线程,返回 JoinHandle
    let handle = thread::spawn(|| {
        for i in 1..=5 {
            println!("子线程计数: {}", i);
            thread::sleep(Duration::from_millis(100));
        }
    });

    // 主线程继续执行
    for i in 1..=3 {
        println!("主线程计数: {}", i);
        thread::sleep(Duration::from_millis(150));
    }

    // 等待子线程完成(阻塞)
    handle.join().unwrap();
    println!("所有线程完成");
}

运行结果可能是:

主线程开始
主线程计数: 1
子线程计数: 1
子线程计数: 2
主线程计数: 2
子线程计数: 3
...

注意:每次运行的输出顺序可能不同,因为线程调度由操作系统决定。

🔒 11.2.2 闭包捕获与 move

线程可能比创建它的作用域存活更久,所以闭包需要获取捕获变量的所有权:

use std::thread;

fn main() {
    let message = String::from("来自主线程的问候");

    // 错误:线程可能在 message 被释放后还在运行
    // thread::spawn(|| {
    //     println!("{}", message);
    // });

    // 正确:使用 move 转移所有权
    let handle = thread::spawn(move || {
        println!("子线程收到: {}", message);
        // message 的所有权现在属于子线程
    });

    // println!("{}", message);  // 错误:message 已被移动

    handle.join().unwrap();
}

🏷️ 11.2.3 线程命名与信息

use std::thread;

fn main() {
    let handle = thread::Builder::new()
        .name("worker-1".to_string())  // 给线程命名
        .stack_size(4 * 1024 * 1024)   // 设置栈大小:4MB
        .spawn(|| {
            // 获取当前线程信息
            let current = thread::current();
            println!("线程名: {:?}", current.name());
        })
        .unwrap();

    handle.join().unwrap();
}

📨 11.3 消息传递:Channel

"不要通过共享内存来通信,而要通过通信来共享内存"——这是 Go 语言的哲学,Rust 同样支持这种模式。

📬 11.3.1 创建 Channel

use std::sync::mpsc;  // multi-producer, single-consumer
use std::thread;

fn main() {
    // 创建通道,返回 (发送端, 接收端)
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let message = String::from("你好,来自子线程");
        // 发送消息(可能失败,如果接收端已关闭)
        tx.send(message).unwrap();
        // message 的所有权已转移
        // println!("{}", message);  // 错误
    });

    // 接收消息(阻塞直到收到或通道关闭)
    let received = rx.recv().unwrap();
    println!("收到: {}", received);
}

🔄 11.3.2 发送多个消息

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let messages = vec![
            "第一条消息",
            "第二条消息",
            "第三条消息",
        ];

        for msg in messages {
            tx.send(msg.to_string()).unwrap();
            thread::sleep(Duration::from_millis(200));
        }
    });

    // 使用迭代器持续接收
    // 当通道关闭时迭代结束
    for received in rx {
        println!("收到: {}", received);
    }

    println!("通道已关闭");
}

📡 11.3.3 多生产者

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();
    let tx2 = tx.clone();  // 克隆发送端

    thread::spawn(move || {
        let msgs = vec![
            "生产者1: 消息A",
            "生产者1: 消息B",
        ];
        for msg in msgs {
            tx.send(msg).unwrap();
        }
    });

    thread::spawn(move || {
        let msgs = vec![
            "生产者2: 消息X",
            "生产者2: 消息Y",
        ];
        for msg in msgs {
            tx2.send(msg).unwrap();
        }
    });

    // 单个接收端处理所有消息
    for received in rx {
        println!("接收: {}", received);
    }
}

架构启示:Channel 实现了生产者-消费者模式。想象一个工厂:多条生产线(生产者)制造产品,一条包装线(消费者)统一处理。这种解耦让系统更清晰。


🔒 11.4 互斥锁:Mutex<T>

当多个线程需要访问同一数据时,我们需要一种机制来保证同一时间只有一个线程能修改它。

🚪 11.4.1 Mutex 基础

use std::sync::Mutex;

fn main() {
    // Mutex 包装数据
    let m = Mutex::new(5);

    {
        // lock() 返回 MutexGuard,实现了 Deref
        // 如果锁被占用,当前线程会阻塞
        let mut num = m.lock().unwrap();
        *num += 1;
        // 离开作用域时自动释放锁
    }

    println!("m = {:?}", m);
}

比喻时刻:Mutex 就像公共卫生间——同一时间只能一个人使用。获得锁就像进入卫生间,释放锁就像离开。如果里面有人,你必须在外面等待。

🧵 11.4.2 多线程共享 Mutex

直接共享 Mutex 会遇到所有权问题:

use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    // Arc: 原子引用计数,允许多线程共享所有权
    // Mutex: 保证互斥访问
    let counter = Arc::new(Mutex::new(0));
    let mut handles = vec![];

    for _ in 0..10 {
        let counter = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();
            *num += 1;
            // lock 在这里自动释放
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("结果: {}", *counter.lock().unwrap());  // 10
}

⚠️ 11.4.3 死锁陷阱

use std::sync::Mutex;

fn main() {
    let data1 = Mutex::new(());
    let data2 = Mutex::new(());

    // 死锁场景:两个线程以不同顺序获取锁
    // 线程 1: 先锁 data1,再锁 data2
    // 线程 2: 先锁 data2,再锁 data1
    // 结果:互相等待,永远阻塞

    // 解决方案:
    // 1. 始终以相同顺序获取锁
    // 2. 尽量减少锁的持有时间
    // 3. 考虑使用其他同步原语
}

📖 11.5 读写锁:RwLock<T>

Mutex 无论读写都独占访问。但多个读取操作其实是安全的——只有写入才需要互斥。RwLock 正是为这种场景设计的。

📚 11.5.1 基本用法

use std::sync::RwLock;

fn main() {
    let lock = RwLock::new(5);

    // 多个读取可以同时进行
    {
        let r1 = lock.read().unwrap();
        let r2 = lock.read().unwrap();
        println!("读取1: {}, 读取2: {}", *r1, *r2);
        // r1 和 r2 可以共存
    }

    // 写入需要独占访问
    {
        let mut w = lock.write().unwrap();
        *w += 1;
        println!("写入: {}", *w);
        // 在 w 存在期间,任何 read() 或 write() 都会阻塞
    }

    println!("最终值: {}", *lock.read().unwrap());
}

🎯 11.5.2 使用场景

use std::sync::{Arc, RwLock};
use std::thread;
use std::collections::HashMap;

fn main() {
    // 共享的键值存储
    let store = Arc::new(RwLock::new(HashMap::new()));
    let mut handles = vec![];

    // 写入线程
    for i in 0..3 {
        let store = Arc::clone(&store);
        handles.push(thread::spawn(move || {
            let mut w = store.write().unwrap();
            w.insert(format!("key{}", i), i);
            println!("写入 key{}", i);
        }));
    }

    // 读取线程
    for i in 0..3 {
        let store = Arc::clone(&store);
        handles.push(thread::spawn(move || {
            // 短暂等待,让写入先完成
            thread::sleep(std::time::Duration::from_millis(10));
            let r = store.read().unwrap();
            println!("读取: {:?}", *r);
        }));
    }

    for handle in handles {
        handle.join().unwrap();
    }
}

📊 11.5.3 Mutex vs RwLock

特性MutexRwLock
读取独占共享
写入独占独占
开销较低较高
适用场景读写混合、频繁写入读多写少

选择指南:如果读操作远多于写操作,RwLock 能显著提升并发性能。如果读写比例接近或写入频繁,Mutex 的简单性可能更合适。


🏷️ 11.6 Send 和 Sync Trait

Rust 如何保证线程安全?答案在于 SendSync 这两个 marker trait。

📤 11.6.1 Send:可以在线程间移动

Send trait 表示类型可以安全地在线程间转移所有权。

// 大多数类型自动实现 Send
// - i32, String, Vec<T> (如果 T: Send)
// - Box<T> (如果 T: Send)
// - Mutex<T> (如果 T: Send)

// 非 Send 类型
// - Rc<T>: 非原子引用计数
// - Raw pointers: 原始指针
// - std::cell 中的类型: 单线程内部可变性

use std::rc::Rc;
use std::thread;

fn main() {
    let rc = Rc::new(5);

    // 编译错误!Rc 不是 Send
    // thread::spawn(move || {
    //     println!("{}", rc);
    // });

    // 改用 Arc
    use std::sync::Arc;
    let arc = Arc::new(5);
    thread::spawn(move || {
        println!("{}", arc);
    }).join().unwrap();
}

🤝 11.6.2 Sync:可以在线程间共享引用

Sync trait 表示类型可以安全地被多个线程同时持有不可变引用。

// T: Sync 意味着 &T: Send
// 即:可以发送 &T 到其他线程

// 自动实现 Sync 的类型
// - i32, String, Vec<T>
// - Mutex<T>: 通过内部锁定保证安全
// - Arc<T>: 如果 T: Sync

// 非 Sync 类型
// - Cell<T>, RefCell<T>: 非线程安全的内部可变性
// - Rc<T>

use std::cell::RefCell;
use std::sync::Arc;

fn main() {
    // RefCell 不是 Sync,所以 Arc<RefCell> 不能跨线程
    // let data = Arc::new(RefCell::new(0));

    // 改用 Mutex
    use std::sync::Mutex;
    let data = Arc::new(Mutex::new(0));
    // Mutex<T> 是 Sync(如果 T: Send)
    // 所以 Arc<Mutex<T>> 是 Send + Sync
}

🔐 11.6.3 手动实现 Send/Sync(危险!)

// ⚠️ 极其危险!只有在完全理解后果时才这样做
// unsafe impl Send for MyType {}
// unsafe impl Sync for MyType {}

// Rust 的安全保证来自编译器的自动推导
// 手动实现可能破坏内存安全

第一性原理:Send 和 Sync 是 Rust 并发安全的核心。它们不是运行时检查,而是编译时保证。如果代码编译通过,就意味着不存在数据竞争(data race)。


🌊 11.7 Barrier:同步点

有时我们需要让多个线程在某个点汇合,然后一起继续。

use std::sync::{Arc, Barrier};
use std::thread;

fn main() {
    let n = 3;
    let barrier = Arc::new(Barrier::new(n));
    let mut handles = vec![];

    for i in 0..n {
        let barrier = Arc::clone(&barrier);
        handles.push(thread::spawn(move || {
            println!("线程 {} 到达第一阶段", i);
            barrier.wait();  // 等待所有线程到达

            println!("线程 {} 进入第二阶段", i);
            barrier.wait();  // 再次同步

            println!("线程 {} 完成", i);
        }));
    }

    for handle in handles {
        handle.join().unwrap();
    }
}

比喻:Barrier 就像登山队的会合点——所有队员必须到齐后才能继续前进。


🧪 11.8 实战:并行数据处理

让我们构建一个并行处理大量数据的实用程序——并行计算目录下所有文件的哈希值。

use std::collections::HashMap;
use std::fs;
use std::path::Path;
use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let path = ".";  // 当前目录
    let results = Arc::new(Mutex::new(HashMap::new()));
    let mut handles = vec![];

    // 收集所有文件
    let files: Vec<_> = fs::read_dir(path)
        .unwrap()
        .filter_map(|e| e.ok())
        .filter(|e| e.path().is_file())
        .collect();

    println!("发现 {} 个文件,开始并行处理...", files.len());

    let start = std::time::Instant::now();

    // 每个文件一个线程处理
    for entry in files {
        let results = Arc::clone(&results);
        let file_path = entry.path();

        handles.push(thread::spawn(move || {
            // 模拟耗时操作(实际可用 blake3 或 sha2 计算)
            let content = fs::read(&file_path).unwrap_or_default();

            // 简单的"哈希":文件大小和首字节
            let hash = if !content.is_empty() {
                format!("size:{}:first:{:02x}", content.len(), content[0])
            } else {
                "empty".to_string()
            };

            let file_name = file_path.display().to_string();

            // 安全地写入共享结果
            let mut map = results.lock().unwrap();
            map.insert(file_name, hash);

            println!("处理完成: {}", file_path.display());
        }));
    }

    // 等待所有线程完成
    for handle in handles {
        handle.join().unwrap();
    }

    let duration = start.elapsed();
    println!("\n处理完成!耗时: {:?}", duration);

    // 显示结果
    let map = results.lock().unwrap();
    for (file, hash) in map.iter() {
        println!("{}: {}", file, hash);
    }
}

🚀 11.8.1 优化:线程池

创建线程有开销,大量小任务应该使用线程池:

use std::sync::{Arc, Mutex};
use std::sync::mpsc;
use std::thread;

struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

struct Worker {
    id: usize,
    thread: Option<thread::JoinHandle<()>>,
}

impl ThreadPool {
    fn new(size: usize) -> Self {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel::<Job>();
        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);
        self.sender.send(job).unwrap();
    }
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Self {
        let thread = thread::spawn(move || loop {
            // 获取任务(阻塞)
            let job = receiver.lock().unwrap().recv();

            match job {
                Ok(job) => {
                    println!("Worker {} 执行任务", id);
                    job();
                }
                Err(_) => {
                    println!("Worker {} 退出", id);
                    break;
                }
            }
        });

        Worker {
            id,
            thread: Some(thread),
        }
    }
}

fn main() {
    let pool = ThreadPool::new(4);

    for i in 0..10 {
        pool.execute(move || {
            println!("任务 {} 开始", i);
            thread::sleep(std::time::Duration::from_millis(100));
            println!("任务 {} 完成", i);
        });
    }

    // 给任务一些时间完成
    thread::sleep(std::time::Duration::from_secs(2));
    println!("主线程结束");
}

生产建议:实际项目中推荐使用成熟的线程池库,如 rayontokio 的运行时。


📝 本章小结

本章我们探索了 Rust 的"无畏并发":

工具用途关键特性
thread::spawn创建线程返回 JoinHandle
mpsc::channel消息传递生产者-消费者模式
Mutex<T>互斥访问同一时间只有一个访问者
RwLock<T>读写分离多读者或单写者
Arc<T>原子引用计数跨线程共享所有权
Barrier同步点等待所有线程汇合

核心概念:

  • Send:类型可以在线程间移动
  • Sync:类型可以在线程间共享引用
  • 无畏并发:编译器帮助防止数据竞争

费曼技巧提问:为什么 Arc<Mutex> 是线程安全的?Rc<RefCell> 不是?提示:考虑 Send 和 Sync trait。


动手实验

  1. 使用 Channel 实现一个简单的任务队列:主线程发送任务,工作线程接收并执行。
  2. 使用 Arc<RwLock<T>> 实现一个线程安全的缓存,支持读写操作。
  3. 比较使用单线程 vs 4 线程处理 1000 个小任务的时间差异(每个任务 sleep 1ms)。
  4. 解释为什么以下代码无法编译,并修复它:

``rust let data = Rc::new(RefCell::new(0)); thread::spawn(move || { *data.borrow_mut() += 1; }); ``

← 返回目录