第十二章:异步编程——async/await与Tokio
本章导读:想象一家咖啡店:如果每个顾客点单后,咖啡师都全程等待(磨豆、萃取、打奶),那一分钟只能服务几个人。但如果咖啡师在等待咖啡萃取时,去接待下一位顾客,效率就会大大提升。异步编程就是这种"等待时做其他事"的艺术。Rust 的 async/await 让你用同步的方式写异步代码,兼具可读性和高性能。
🌊 12.1 为什么需要异步?
🚗 12.1.1 同步 vs 异步
同步(阻塞)模型:一个任务完成后才能开始下一个。
use std::thread;
use std::time::Duration;
fn main() {
let start = std::time::Instant::now();
// 三个任务,每个"阻塞"1秒
fetch_data("用户1");
fetch_data("用户2");
fetch_data("用户3");
println!("总耗时: {:?}", start.elapsed()); // 约3秒
}
fn fetch_data(user: &str) {
println!("开始获取 {} 的数据", user);
thread::sleep(Duration::from_secs(1)); // 模拟网络请求
println!("完成获取 {} 的数据", user);
}
异步(非阻塞)模型:等待时可以处理其他任务。
use std::time::Duration;
#[tokio::main]
async fn main() {
let start = std::time::Instant::now();
// 同时发起三个请求
let h1 = tokio::spawn(fetch_data("用户1"));
let h2 = tokio::spawn(fetch_data("用户2"));
let h3 = tokio::spawn(fetch_data("用户3"));
// 等待所有完成
let _ = tokio::join!(h1, h2, h3);
println!("总耗时: {:?}", start.elapsed()); // 约1秒
}
async fn fetch_data(user: &str) {
println!("开始获取 {} 的数据", user);
tokio::time::sleep(Duration::from_secs(1)).await; // 非阻塞等待
println!("完成获取 {} 的数据", user);
}
📊 12.1.2 线程 vs 异步
| 特性 | 多线程 | 异步 |
|---|---|---|
| 内存开销 | 每线程约2MB栈空间 | 每任务约几KB |
| 切换成本 | 操作系统调度,开销大 | 用户态调度,开销小 |
| 适用场景 | CPU密集型 | I/O密集型 |
| 代码复杂度 | 需要同步原语 | async/await 较直观 |
比喻:线程是雇佣多个服务员,每个都需要工资(内存);异步是一个聪明的服务员,在等待咖啡萃取时接待其他顾客。
⚡ 12.2 Future:异步的核心
Rust 的异步基于 Future trait。
🔮 12.2.1 Future Trait
// 简化的 Future 定义
pub trait Future {
type Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
pub enum Poll<T> {
Ready(T), // 任务完成
Pending, // 任务进行中,稍后再问
}
Future 是惰性的——只有被 await 或轮询时才会执行。
use std::future::Future;
// async fn 返回 Future
async fn say_hello() {
println!("Hello!");
}
fn main() {
// 仅仅调用不会执行任何代码
let future = say_hello();
// 需要 .await 或执行器来运行
// future.await; // 需要在 async 上下文中
}
⏳ 12.2.2 async/.await 语法
async fn learn_song() -> String {
println!("学习歌曲...");
"一首好歌".to_string()
}
async fn sing_song(song: String) {
println!("演唱: {}", song);
}
async fn dance() {
println!("跳舞...");
}
// async 函数返回 Future
async fn perform() {
// .await 会暂停当前函数,让出控制权
let song = learn_song().await; // 等待学习完成
sing_song(song).await; // 等待演唱完成
dance().await; // 等待舞蹈完成
}
// 并行执行
async fn perform_parallel() {
// 学习和跳舞可以同时进行
let learn = learn_song();
let dance_future = dance();
// futures::join! 或 tokio::join! 同时等待多个
let (song, _) = tokio::join!(learn, dance_future);
sing_song(song).await;
}
🏃 12.3 Tokio:最流行的异步运行时
Tokio 是 Rust 生态中最成熟的异步运行时,提供:
- 异步 I/O(网络、文件)
- 定时器
- 任务调度
- 同步原语
📦 12.3.1 添加依赖
# Cargo.toml
[dependencies]
tokio = { version = "1", features = ["full"] }
🚀 12.3.2 Tokio 入门
// 方式1:使用 #[tokio::main] 宏
#[tokio::main]
async fn main() {
println!("Hello from async!");
}
// 方式2:手动创建运行时
fn main() {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
println!("Hello from async!");
});
}
🌐 12.3.3 异步网络编程
use tokio::net::{TcpListener, TcpStream};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// 绑定端口
let listener = TcpListener::bind("127.0.0.1:8080").await?;
println!("服务器启动在 127.0.0.1:8080");
loop {
// 接受连接(异步)
let (mut socket, addr) = listener.accept().await?;
println!("新连接: {}", addr);
// 为每个连接创建任务
tokio::spawn(async move {
let mut buf = [0; 1024];
// 读取数据(异步)
match socket.read(&mut buf).await {
Ok(n) if n > 0 => {
// 回显数据
socket.write_all(&buf[..n]).await.unwrap();
}
_ => {}
}
});
}
}
📡 12.3.4 异步 HTTP 客户端
// 添加依赖: reqwest = { version = "0.11", features = ["json"] }
use reqwest;
use serde::Deserialize;
#[derive(Deserialize, Debug)]
struct User {
id: u32,
name: String,
email: String,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// 异步 HTTP 请求
let response = reqwest::get("https://jsonplaceholder.typicode.com/users/1")
.await?
.json::<User>()
.await?;
println!("用户信息: {:?}", response);
Ok(())
}
🔄 12.4 并发执行多个 Future
🤝 12.4.1 join!:等待所有完成
use tokio::time::{sleep, Duration};
async fn task(name: &str, secs: u64) -> String {
println!("{} 开始", name);
sleep(Duration::from_secs(secs)).await;
println!("{} 完成", name);
format!("{} 的结果", name)
}
#[tokio::main]
async fn main() {
let start = std::time::Instant::now();
// 同时执行,等待所有完成
let (r1, r2, r3) = tokio::join!(
task("任务1", 2),
task("任务2", 1),
task("任务3", 3),
);
println!("结果: {}, {}, {}", r1, r2, r3);
println!("总耗时: {:?}", start.elapsed()); // 约3秒(最长的那个)
}
🏆 12.4.2 select!:等待任一完成
use tokio::time::{sleep, Duration, timeout};
async fn fast_task() -> &'static str {
sleep(Duration::from_millis(100)).await;
"快速任务完成"
}
async fn slow_task() -> &'static str {
sleep(Duration::from_secs(10)).await;
"慢速任务完成"
}
#[tokio::main]
async fn main() {
// select! 等待第一个完成的分支
let result = tokio::select! {
r = fast_task() => r,
r = slow_task() => r,
};
println!("结果: {}", result); // "快速任务完成"
// 带超时
let result = timeout(Duration::from_millis(200), slow_task()).await;
match result {
Ok(msg) => println!("完成: {}", msg),
Err(_) => println!("超时!"),
}
}
🌊 12.4.3 流(Stream)处理
use tokio_stream::{Stream, StreamExt};
use futures::stream;
#[tokio::main]
async fn main() {
// 创建流
let numbers = stream::iter(1..=5);
// 异步处理每个元素
numbers
.then(|n| async move {
println!("处理 {}", n);
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
n * 2
})
.collect::<Vec<_>>()
.await;
// 并发处理(最多3个同时)
use futures::stream;
let results: Vec<_> = stream::iter(1..=10)
.map(|n| async move {
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
n * 2
})
.buffer_unordered(3) // 最多3个并发
.collect()
.await;
println!("结果: {:?}", results);
}
🔧 12.5 异步同步原语
Tokio 提供异步版本的同步原语。
🔒 12.5.1 tokio::sync::Mutex
use tokio::sync::Mutex;
use std::sync::Arc;
#[tokio::main]
async fn main() {
let data = Arc::new(Mutex::new(0));
let mut handles = vec![];
for _ in 0..10 {
let data = Arc::clone(&data);
handles.push(tokio::spawn(async move {
// 异步获取锁(不阻塞线程)
let mut num = data.lock().await;
*num += 1;
}));
}
for handle in handles {
handle.await.unwrap();
}
println!("结果: {}", *data.lock().await); // 10
}
何时用 tokio::sync::Mutex? 当锁的持有需要
.await时。如果锁内没有异步操作,std::sync::Mutex性能更好。
📨 12.5.2 tokio::sync::mpsc
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
// 创建有界通道(容量32)
let (tx, mut rx) = mpsc::channel(32);
// 生产者
tokio::spawn(async move {
for i in 0..10 {
// 异步发送
if let Err(_) = tx.send(i).await {
println!("接收端已关闭");
}
}
});
// 消费者
while let Some(msg) = rx.recv().await {
println!("收到: {}", msg);
}
}
🔖 12.5.3 tokio::sync::Semaphore
use tokio::sync::Semaphore;
#[tokio::main]
async fn main() {
// 限制并发数为3
let semaphore = Arc::new(Semaphore::new(3));
let mut handles = vec![];
for i in 0..10 {
let permit = semaphore.clone();
handles.push(tokio::spawn(async move {
// 获取许可(如果3个都在用,会等待)
let _permit = permit.acquire().await.unwrap();
println!("任务 {} 开始", i);
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
println!("任务 {} 完成", i);
// _permit 离开作用域时自动释放
}));
}
for handle in handles {
handle.await.unwrap();
}
}
🧪 12.6 实战:构建异步爬虫
让我们构建一个并发的网页爬虫,展示异步编程的实际应用:
use reqwest;
use scraper::{Html, Selector};
use std::collections::HashSet;
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::time::{timeout, Duration};
// 简化的爬虫结构
struct Crawler {
client: reqwest::Client,
visited: Arc<Mutex<HashSet<String>>>,
max_depth: usize,
}
impl Crawler {
fn new(max_depth: usize) -> Self {
Self {
client: reqwest::Client::builder()
.timeout(Duration::from_secs(10))
.user_agent("RustCrawler/1.0")
.build()
.unwrap(),
visited: Arc::new(Mutex::new(HashSet::new())),
max_depth,
}
}
async fn crawl(&self, url: &str, depth: usize) {
// 检查深度
if depth > self.max_depth {
return;
}
// 检查是否已访问
{
let mut visited = self.visited.lock().await;
if visited.contains(url) {
return;
}
visited.insert(url.to_string());
}
println!("爬取 [深度 {}]: {}", depth, url);
// 获取页面
let body = match self.fetch_page(url).await {
Ok(body) => body,
Err(e) => {
println!("错误: {} - {}", url, e);
return;
}
};
// 解析并打印标题
if let Some(title) = self.extract_title(&body) {
println!(" 标题: {}", title);
}
// 提取链接(这里简化处理)
let links = self.extract_links(&body, url);
// 递归爬取(限制并发数)
let semaphore = Arc::new(tokio::sync::Semaphore::new(5));
let mut tasks = vec![];
for link in links.iter().take(5) { // 限制每页最多爬5个链接
let permit = semaphore.clone();
let crawler = Crawler::new(self.max_depth);
let link = link.clone();
tasks.push(tokio::spawn(async move {
let _permit = permit.acquire().await.unwrap();
crawler.crawl(&link, depth + 1).await;
}));
}
// 等待所有子任务
for task in tasks {
let _ = task.await;
}
}
async fn fetch_page(&self, url: &str) -> Result<String, Box<dyn std::error::Error>> {
let response = timeout(Duration::from_secs(10), self.client.get(url).send()).await??;
let body = response.text().await?;
Ok(body)
}
fn extract_title(&self, html: &str) -> Option<String> {
let doc = Html::parse_document(html);
let selector = Selector::parse("title").ok()?;
doc.select(&selector)
.next()
.map(|el| el.text().collect::<String>())
}
fn extract_links(&self, _html: &str, _base: &str) -> Vec<String> {
// 简化:实际应解析 HTML 提取 <a href>
vec![]
}
}
#[tokio::main]
async fn main() {
let crawler = Crawler::new(2); // 最大深度2
// 添加依赖到 Cargo.toml:
// [dependencies]
// tokio = { version = "1", features = ["full"] }
// reqwest = "0.11"
// scraper = "0.17"
// 示例:爬取单个页面
// crawler.crawl("https://example.com", 0).await;
println!("爬虫演示完成!");
}
📊 12.7 其他异步运行时
🌙 12.7.1 async-std
async-std 重新实现了标准库的异步版本:
// async-std 版本
#[async_std::main]
async fn main() -> std::io::Result<()> {
let listener = async_std::net::TcpListener::bind("127.0.0.1:8080").await?;
println!("服务器启动");
loop {
let (stream, addr) = listener.accept().await?;
println!("连接: {}", addr);
}
}
🔧 12.7.2 smol
smol 是一个轻量级的异步运行时:
use smol::{Async, Timer};
use std::time::Duration;
fn main() -> std::io::Result<()> {
smol::block_on(async {
println!("等待1秒...");
Timer::after(Duration::from_secs(1)).await;
println!("完成!");
});
Ok(())
}
选择指南:
- Tokio:生态最成熟,适合大多数项目
- async-std:API 接近标准库,迁移成本低
- smol:追求轻量、简单
📝 本章小结
本章我们学习了 Rust 的异步编程:
| 概念 | 说明 |
|---|---|
Future | 表示异步计算,惰性执行 |
async fn | 定义异步函数,返回 Future |
.await | 等待 Future 完成,让出控制权 |
tokio::spawn | 创建异步任务 |
join! | 等待多个 Future 全部完成 |
select! | 等待多个 Future 任一完成 |
核心要点:
- 异步适合 I/O 密集型任务
- Future 是惰性的,必须被 await 或执行
- Tokio 是最流行的异步运行时
- 异步代码看起来像同步,但执行方式完全不同
费曼技巧提问:为什么异步代码可以"同时"处理多个任务,而实际上只用一个线程?提示:想想"让出控制权"意味着什么。
动手实验:
- 使用
tokio::join!同时发起 5 个 HTTP 请求,测量总耗时,与顺序请求对比。- 使用
select!实现一个带超时的异步操作:如果 2 秒内未完成则返回超时错误。- 使用
tokio::sync::mpsc实现一个生产者-消费者系统:生产者每秒发送一个数字,消费者打印它。- 解释为什么以下代码会死锁,如何修复:
``
rust let mutex = std::sync::Mutex::new(0); async fn bad() { let guard = mutex.lock().unwrap(); somethingasync().await; // 持有锁时 await }``