第十二章:异步编程——async/await与Tokio

第十二章:异步编程——async/await与Tokio

本章导读:想象一家咖啡店:如果每个顾客点单后,咖啡师都全程等待(磨豆、萃取、打奶),那一分钟只能服务几个人。但如果咖啡师在等待咖啡萃取时,去接待下一位顾客,效率就会大大提升。异步编程就是这种"等待时做其他事"的艺术。Rust 的 async/await 让你用同步的方式写异步代码,兼具可读性和高性能。


🌊 12.1 为什么需要异步?

🚗 12.1.1 同步 vs 异步

同步(阻塞)模型:一个任务完成后才能开始下一个。

use std::thread;
use std::time::Duration;

fn main() {
    let start = std::time::Instant::now();

    // 三个任务,每个"阻塞"1秒
    fetch_data("用户1");
    fetch_data("用户2");
    fetch_data("用户3");

    println!("总耗时: {:?}", start.elapsed());  // 约3秒
}

fn fetch_data(user: &str) {
    println!("开始获取 {} 的数据", user);
    thread::sleep(Duration::from_secs(1));  // 模拟网络请求
    println!("完成获取 {} 的数据", user);
}

异步(非阻塞)模型:等待时可以处理其他任务。

use std::time::Duration;

#[tokio::main]
async fn main() {
    let start = std::time::Instant::now();

    // 同时发起三个请求
    let h1 = tokio::spawn(fetch_data("用户1"));
    let h2 = tokio::spawn(fetch_data("用户2"));
    let h3 = tokio::spawn(fetch_data("用户3"));

    // 等待所有完成
    let _ = tokio::join!(h1, h2, h3);

    println!("总耗时: {:?}", start.elapsed());  // 约1秒
}

async fn fetch_data(user: &str) {
    println!("开始获取 {} 的数据", user);
    tokio::time::sleep(Duration::from_secs(1)).await;  // 非阻塞等待
    println!("完成获取 {} 的数据", user);
}

📊 12.1.2 线程 vs 异步

特性多线程异步
内存开销每线程约2MB栈空间每任务约几KB
切换成本操作系统调度,开销大用户态调度,开销小
适用场景CPU密集型I/O密集型
代码复杂度需要同步原语async/await 较直观

比喻:线程是雇佣多个服务员,每个都需要工资(内存);异步是一个聪明的服务员,在等待咖啡萃取时接待其他顾客。


⚡ 12.2 Future:异步的核心

Rust 的异步基于 Future trait。

🔮 12.2.1 Future Trait

// 简化的 Future 定义
pub trait Future {
    type Output;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

pub enum Poll<T> {
    Ready(T),    // 任务完成
    Pending,     // 任务进行中,稍后再问
}

Future 是惰性的——只有被 await 或轮询时才会执行。

use std::future::Future;

// async fn 返回 Future
async fn say_hello() {
    println!("Hello!");
}

fn main() {
    // 仅仅调用不会执行任何代码
    let future = say_hello();

    // 需要 .await 或执行器来运行
    // future.await;  // 需要在 async 上下文中
}

⏳ 12.2.2 async/.await 语法

async fn learn_song() -> String {
    println!("学习歌曲...");
    "一首好歌".to_string()
}

async fn sing_song(song: String) {
    println!("演唱: {}", song);
}

async fn dance() {
    println!("跳舞...");
}

// async 函数返回 Future
async fn perform() {
    // .await 会暂停当前函数,让出控制权
    let song = learn_song().await;  // 等待学习完成
    sing_song(song).await;          // 等待演唱完成
    dance().await;                  // 等待舞蹈完成
}

// 并行执行
async fn perform_parallel() {
    // 学习和跳舞可以同时进行
    let learn = learn_song();
    let dance_future = dance();

    // futures::join! 或 tokio::join! 同时等待多个
    let (song, _) = tokio::join!(learn, dance_future);
    sing_song(song).await;
}

🏃 12.3 Tokio:最流行的异步运行时

Tokio 是 Rust 生态中最成熟的异步运行时,提供:

  • 异步 I/O(网络、文件)
  • 定时器
  • 任务调度
  • 同步原语

📦 12.3.1 添加依赖

# Cargo.toml
[dependencies]
tokio = { version = "1", features = ["full"] }

🚀 12.3.2 Tokio 入门

// 方式1:使用 #[tokio::main] 宏
#[tokio::main]
async fn main() {
    println!("Hello from async!");
}

// 方式2:手动创建运行时
fn main() {
    let rt = tokio::runtime::Runtime::new().unwrap();
    rt.block_on(async {
        println!("Hello from async!");
    });
}

🌐 12.3.3 异步网络编程

use tokio::net::{TcpListener, TcpStream};
use tokio::io::{AsyncReadExt, AsyncWriteExt};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 绑定端口
    let listener = TcpListener::bind("127.0.0.1:8080").await?;
    println!("服务器启动在 127.0.0.1:8080");

    loop {
        // 接受连接(异步)
        let (mut socket, addr) = listener.accept().await?;
        println!("新连接: {}", addr);

        // 为每个连接创建任务
        tokio::spawn(async move {
            let mut buf = [0; 1024];

            // 读取数据(异步)
            match socket.read(&mut buf).await {
                Ok(n) if n > 0 => {
                    // 回显数据
                    socket.write_all(&buf[..n]).await.unwrap();
                }
                _ => {}
            }
        });
    }
}

📡 12.3.4 异步 HTTP 客户端

// 添加依赖: reqwest = { version = "0.11", features = ["json"] }

use reqwest;
use serde::Deserialize;

#[derive(Deserialize, Debug)]
struct User {
    id: u32,
    name: String,
    email: String,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 异步 HTTP 请求
    let response = reqwest::get("https://jsonplaceholder.typicode.com/users/1")
        .await?
        .json::<User>()
        .await?;

    println!("用户信息: {:?}", response);
    Ok(())
}

🔄 12.4 并发执行多个 Future

🤝 12.4.1 join!:等待所有完成

use tokio::time::{sleep, Duration};

async fn task(name: &str, secs: u64) -> String {
    println!("{} 开始", name);
    sleep(Duration::from_secs(secs)).await;
    println!("{} 完成", name);
    format!("{} 的结果", name)
}

#[tokio::main]
async fn main() {
    let start = std::time::Instant::now();

    // 同时执行,等待所有完成
    let (r1, r2, r3) = tokio::join!(
        task("任务1", 2),
        task("任务2", 1),
        task("任务3", 3),
    );

    println!("结果: {}, {}, {}", r1, r2, r3);
    println!("总耗时: {:?}", start.elapsed());  // 约3秒(最长的那个)
}

🏆 12.4.2 select!:等待任一完成

use tokio::time::{sleep, Duration, timeout};

async fn fast_task() -> &'static str {
    sleep(Duration::from_millis(100)).await;
    "快速任务完成"
}

async fn slow_task() -> &'static str {
    sleep(Duration::from_secs(10)).await;
    "慢速任务完成"
}

#[tokio::main]
async fn main() {
    // select! 等待第一个完成的分支
    let result = tokio::select! {
        r = fast_task() => r,
        r = slow_task() => r,
    };

    println!("结果: {}", result);  // "快速任务完成"

    // 带超时
    let result = timeout(Duration::from_millis(200), slow_task()).await;
    match result {
        Ok(msg) => println!("完成: {}", msg),
        Err(_) => println!("超时!"),
    }
}

🌊 12.4.3 流(Stream)处理

use tokio_stream::{Stream, StreamExt};
use futures::stream;

#[tokio::main]
async fn main() {
    // 创建流
    let numbers = stream::iter(1..=5);

    // 异步处理每个元素
    numbers
        .then(|n| async move {
            println!("处理 {}", n);
            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
            n * 2
        })
        .collect::<Vec<_>>()
        .await;

    // 并发处理(最多3个同时)
    use futures::stream;
    let results: Vec<_> = stream::iter(1..=10)
        .map(|n| async move {
            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
            n * 2
        })
        .buffer_unordered(3)  // 最多3个并发
        .collect()
        .await;

    println!("结果: {:?}", results);
}

🔧 12.5 异步同步原语

Tokio 提供异步版本的同步原语。

🔒 12.5.1 tokio::sync::Mutex

use tokio::sync::Mutex;
use std::sync::Arc;

#[tokio::main]
async fn main() {
    let data = Arc::new(Mutex::new(0));
    let mut handles = vec![];

    for _ in 0..10 {
        let data = Arc::clone(&data);
        handles.push(tokio::spawn(async move {
            // 异步获取锁(不阻塞线程)
            let mut num = data.lock().await;
            *num += 1;
        }));
    }

    for handle in handles {
        handle.await.unwrap();
    }

    println!("结果: {}", *data.lock().await);  // 10
}

何时用 tokio::sync::Mutex? 当锁的持有需要 .await 时。如果锁内没有异步操作,std::sync::Mutex 性能更好。

📨 12.5.2 tokio::sync::mpsc

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // 创建有界通道(容量32)
    let (tx, mut rx) = mpsc::channel(32);

    // 生产者
    tokio::spawn(async move {
        for i in 0..10 {
            // 异步发送
            if let Err(_) = tx.send(i).await {
                println!("接收端已关闭");
            }
        }
    });

    // 消费者
    while let Some(msg) = rx.recv().await {
        println!("收到: {}", msg);
    }
}

🔖 12.5.3 tokio::sync::Semaphore

use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    // 限制并发数为3
    let semaphore = Arc::new(Semaphore::new(3));
    let mut handles = vec![];

    for i in 0..10 {
        let permit = semaphore.clone();
        handles.push(tokio::spawn(async move {
            // 获取许可(如果3个都在用,会等待)
            let _permit = permit.acquire().await.unwrap();

            println!("任务 {} 开始", i);
            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
            println!("任务 {} 完成", i);

            // _permit 离开作用域时自动释放
        }));
    }

    for handle in handles {
        handle.await.unwrap();
    }
}

🧪 12.6 实战:构建异步爬虫

让我们构建一个并发的网页爬虫,展示异步编程的实际应用:

use reqwest;
use scraper::{Html, Selector};
use std::collections::HashSet;
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::time::{timeout, Duration};

// 简化的爬虫结构
struct Crawler {
    client: reqwest::Client,
    visited: Arc<Mutex<HashSet<String>>>,
    max_depth: usize,
}

impl Crawler {
    fn new(max_depth: usize) -> Self {
        Self {
            client: reqwest::Client::builder()
                .timeout(Duration::from_secs(10))
                .user_agent("RustCrawler/1.0")
                .build()
                .unwrap(),
            visited: Arc::new(Mutex::new(HashSet::new())),
            max_depth,
        }
    }

    async fn crawl(&self, url: &str, depth: usize) {
        // 检查深度
        if depth > self.max_depth {
            return;
        }

        // 检查是否已访问
        {
            let mut visited = self.visited.lock().await;
            if visited.contains(url) {
                return;
            }
            visited.insert(url.to_string());
        }

        println!("爬取 [深度 {}]: {}", depth, url);

        // 获取页面
        let body = match self.fetch_page(url).await {
            Ok(body) => body,
            Err(e) => {
                println!("错误: {} - {}", url, e);
                return;
            }
        };

        // 解析并打印标题
        if let Some(title) = self.extract_title(&body) {
            println!("  标题: {}", title);
        }

        // 提取链接(这里简化处理)
        let links = self.extract_links(&body, url);

        // 递归爬取(限制并发数)
        let semaphore = Arc::new(tokio::sync::Semaphore::new(5));
        let mut tasks = vec![];

        for link in links.iter().take(5) {  // 限制每页最多爬5个链接
            let permit = semaphore.clone();
            let crawler = Crawler::new(self.max_depth);
            let link = link.clone();

            tasks.push(tokio::spawn(async move {
                let _permit = permit.acquire().await.unwrap();
                crawler.crawl(&link, depth + 1).await;
            }));
        }

        // 等待所有子任务
        for task in tasks {
            let _ = task.await;
        }
    }

    async fn fetch_page(&self, url: &str) -> Result<String, Box<dyn std::error::Error>> {
        let response = timeout(Duration::from_secs(10), self.client.get(url).send()).await??;
        let body = response.text().await?;
        Ok(body)
    }

    fn extract_title(&self, html: &str) -> Option<String> {
        let doc = Html::parse_document(html);
        let selector = Selector::parse("title").ok()?;
        doc.select(&selector)
            .next()
            .map(|el| el.text().collect::<String>())
    }

    fn extract_links(&self, _html: &str, _base: &str) -> Vec<String> {
        // 简化:实际应解析 HTML 提取 <a href>
        vec![]
    }
}

#[tokio::main]
async fn main() {
    let crawler = Crawler::new(2);  // 最大深度2

    // 添加依赖到 Cargo.toml:
    // [dependencies]
    // tokio = { version = "1", features = ["full"] }
    // reqwest = "0.11"
    // scraper = "0.17"

    // 示例:爬取单个页面
    // crawler.crawl("https://example.com", 0).await;

    println!("爬虫演示完成!");
}

📊 12.7 其他异步运行时

🌙 12.7.1 async-std

async-std 重新实现了标准库的异步版本:

// async-std 版本
#[async_std::main]
async fn main() -> std::io::Result<()> {
    let listener = async_std::net::TcpListener::bind("127.0.0.1:8080").await?;
    println!("服务器启动");

    loop {
        let (stream, addr) = listener.accept().await?;
        println!("连接: {}", addr);
    }
}

🔧 12.7.2 smol

smol 是一个轻量级的异步运行时:

use smol::{Async, Timer};
use std::time::Duration;

fn main() -> std::io::Result<()> {
    smol::block_on(async {
        println!("等待1秒...");
        Timer::after(Duration::from_secs(1)).await;
        println!("完成!");
    });
    Ok(())
}

选择指南

  • Tokio:生态最成熟,适合大多数项目
  • async-std:API 接近标准库,迁移成本低
  • smol:追求轻量、简单

📝 本章小结

本章我们学习了 Rust 的异步编程:

概念说明
Future表示异步计算,惰性执行
async fn定义异步函数,返回 Future
.await等待 Future 完成,让出控制权
tokio::spawn创建异步任务
join!等待多个 Future 全部完成
select!等待多个 Future 任一完成

核心要点:

  • 异步适合 I/O 密集型任务
  • Future 是惰性的,必须被 await 或执行
  • Tokio 是最流行的异步运行时
  • 异步代码看起来像同步,但执行方式完全不同

费曼技巧提问:为什么异步代码可以"同时"处理多个任务,而实际上只用一个线程?提示:想想"让出控制权"意味着什么。


动手实验

  1. 使用 tokio::join! 同时发起 5 个 HTTP 请求,测量总耗时,与顺序请求对比。
  2. 使用 select! 实现一个带超时的异步操作:如果 2 秒内未完成则返回超时错误。
  3. 使用 tokio::sync::mpsc 实现一个生产者-消费者系统:生产者每秒发送一个数字,消费者打印它。
  4. 解释为什么以下代码会死锁,如何修复:

``rust let mutex = std::sync::Mutex::new(0); async fn bad() { let guard = mutex.lock().unwrap(); somethingasync().await; // 持有锁时 await } ``

← 返回目录