第7章:Bitswap 协议

第7章:Bitswap 协议

7.1 Bitswap 的作用

Bitswap 是 IPFS 的核心数据交换协议,负责在对等节点(peer)之间高效、可靠地交换内容寻址的数据块(blocks)。它并非简单的“请求-响应”传输层,而是融合了激励机制、策略决策与网络适应能力的智能分发子系统,是 IPFS 实现去中心化内容分发与协作共享的关键基础设施。

💡 提示:Bitswap 不直接处理文件或目录结构,而是以 CID(Content Identifier)为单位操作底层数据块。文件的组装(如通过 DAG 解析)由上层模块(如 UnixFS)完成,Bitswap 仅确保这些块能被及时、公平地获取。

主要功能

  • 块请求管理:维护并同步本地 want-list(待获取块列表),主动向连接的对等节点广播需求,并响应对方的请求;
  • 信用系统(Ledger-based Accounting):基于“发送即增信、接收即减信”的双向账本模型,量化节点间的贡献与索取关系;
  • 策略引擎:根据信用余额、块稀有度、网络延迟、连接质量等多维因素,动态决定是否响应请求、优先服务谁、以及何时发起反向请求;
  • 网络优化支持:为并行获取、智能路由、预取与缓存提供协议级接口和状态支撑。

⚠️ 注意:Bitswap 的信用不具跨会话持久性(默认不写入磁盘),也不构成经济价值;它是一种轻量级、本地化的声誉信号,旨在抑制搭便车(free-riding),而非构建链上通证体系。

7.2 数据块交换机制

Bitswap 采用基于信用的异步块交换机制,其交互围绕 want-list 展开——每个节点持续广播自己当前所需的数据块 CID 列表,并监听其他节点的 want-list 以发现潜在供给方。

典型请求流程

  1. 节点 A 需要某数据块(由 CID 标识);
  2. 节点 A 将该 CID 加入本地 want-list,并通过 Bitswap 协议消息广播给所有活跃连接的对等节点;
  3. 节点 B 收到广播后,检查本地存储(Blockstore)是否持有该块;若存在且策略允许,则准备响应;
  4. 节点 B 向节点 A 发送包含该块数据的响应消息;
  5. 节点 A 成功接收后,更新本地信用账本(对节点 B 增加“欠款”,即记录自身需偿还的信用额度)。

💡 提示want-list 是动态的、增量更新的。节点不会重复广播已满足的请求,也不会因单次失败而终止重试——Bitswap 内置退避重传与多源并发逻辑。

代码示例(Node.js,基于 ipfs-core v0.19+ 接口):

// 注意:现代 IPFS 实现中,Bitswap 已深度集成于 ipfs-core,默认启用
// 无需手动实例化;以下为概念性示意,反映底层语义
const { create } = require('ipfs-core');

async function example() {
  const node = await create({
    // 配置略:含 libp2p、blockstore 等依赖注入
  });

  // 获取单个数据块(CID)
  async function getBlock(cid) {
    const block = await node.blocks.get(cid);
    return block.data; // Buffer
  }

  // 提供(宣告持有)一个数据块
  async function provideBlock(block) {
    await node.blocks.put(block); // 写入本地 Blockstore
    // 注:宣告“可提供”还需通过 DHT announce(见 7.5),Bitswap 自动响应已知持有者
  }
}

⚠️ 注意node.blocks.get() 在内部触发 Bitswap 流程,但开发者通常无需直接调用 Bitswap 类。IPFS 高阶 API(如 cat, get, ls)均透明复用 Bitswap,应优先使用它们。

7.3 信用系统与策略引擎

Bitswap 的信用系统本质是一个本地、无共识、非加密签名的双向账本,用于在每次块交换后实时调整节点间的服务优先级。

信用计算模型(简化示意):

class CreditManager {
  constructor() {
    // Map<PeerId, { sent: number, received: number, balance: number }>
    this.ledgers = new Map();
  }

  // 节点 A 向 B 发送大小为 blockSize 的块 → A 的 ledger[B].sent += blockSize
  recordSent(peerId, blockSize) {
    const ledger = this.getOrCreateLedger(peerId);
    ledger.sent += blockSize;
    ledger.balance = ledger.sent - ledger.received;
  }

  // 节点 A 从 B 接收大小为 blockSize 的块 → A 的 ledger[B].received += blockSize
  recordReceived(peerId, blockSize) {
    const ledger = this.getOrCreateLedger(peerId);
    ledger.received += blockSize;
    ledger.balance = ledger.sent - ledger.received;
  }

  getBalance(peerId) {
    return this.ledgers.get(peerId)?.balance ?? 0;
  }

  getOrCreateLedger(peerId) {
    if (!this.ledgers.has(peerId)) {
      this.ledgers.set(peerId, { sent: 0, received: 0, balance: 0 });
    }
    return this.ledgers.get(peerId);
  }
}

💡 提示:信用余额为负值(balance < 0)表示该节点“净索取”,此时其后续请求可能被降权;正值(balance > 0)表示“净贡献”,更易获得优先服务。Bitswap 默认不拒绝负余额请求,但策略引擎可据此干预。

策略引擎核心逻辑(决定是否响应 want 请求):

class BitswapStrategy {
  constructor(creditManager, blockstore, dht) {
    this.creditManager = creditManager;
    this.blockstore = blockstore;
    this.dht = dht;
  }

  // 入口:判断是否向 peerId 发送指定 block
  shouldSendBlock(peerId, block, wantList) {
    const balance = this.creditManager.getBalance(peerId);
    const blockSize = block.data.length;

    // ✅ 信用充足:直接服务(正向激励)
    if (balance >= blockSize) {
      return true;
    }

    // ⚠️ 信用不足但块稀有:按概率试探性服务(维持网络连通性)
    if (this.isRareBlock(block.cid)) {
      // 稀有度可基于 DHT 查找结果(找到的提供者数 < 阈值)
      return Math.random() < 0.25; // 保守设为 25%,避免过度透支
    }

    // ❌ 普通块且信用不足:暂不响应,等待对方贡献
    return false;
  }

  isRareBlock(cid) {
    // 实际实现:查询 DHT 获取该 CID 的提供者数量(近似值)
    // 此处简化为伪代码
    return this.dht.getProviders(cid).length < 3;
  }
}

⚠️ 注意:策略引擎高度可插拔。IPFS 允许运行时替换策略(如 bitswap: 'basic' | 'adaptive'),生产环境推荐使用自适应策略(adaptive),它还会结合 RTT、带宽、连接稳定性等指标。

7.4 性能优化策略

Bitswap 在协议设计层面原生支持多种性能增强机制,开发者可通过配置或扩展方式启用:

并行块获取(提升吞吐):

// ✅ 推荐:利用 IPFS 内置并行能力(自动调度 Bitswap 请求)
async function getBlocksInParallel(cids) {
  const blocks = await Promise.all(
    cids.map(cid => node.blocks.get(cid))
  );
  return blocks;
}

// ⚠️ 注意:过度并发可能压垮对等节点或本地资源
// 建议配合限流(如 p-limit)或使用 node.cat()(自动流控)

智能路由选择(降低延迟):

class SmartRouter {
  constructor(creditManager, pingService) {
    this.creditManager = creditManager;
    this.pingService = pingService; // 提供 RTT 估算
  }

  // 为给定 CID 选取最优的 3 个候选提供者
  async selectBestPeers(cid, candidatePeers) {
    // 步骤1:过滤已知持有者(通过 DHT 或本地缓存)
    const providers = await this.getProviders(cid);
    const validPeers = candidatePeers.filter(p => providers.includes(p));

    // 步骤2:按综合得分排序(信用权重 60% + 延迟倒数权重 40%)
    return validPeers
      .map(peer => ({
        peer,
        score:
          0.6 * Math.max(0, this.creditManager.getBalance(peer)) +
          0.4 * (1 / Math.max(10, this.pingService.rtt(peer))) // ms → 归一化
      }))
      .sort((a, b) => b.score - a.score)
      .slice(0, 3)
      .map(item => item.peer);
  }
}

💡 提示getProviders(cid) 应优先查本地 DHT 缓存,再触发 DHT 查询;频繁调用需考虑缓存 TTL 与一致性。

预取策略(提升用户体验):

class PrefetchManager {
  constructor(node, accessPatternAnalyzer) {
    this.node = node;
    this.analyzer = accessPatternAnalyzer;
    this.activePrefetches = new Set(); // 防止重复预取
  }

  // 基于访问上下文(如 DAG 路径、用户行为日志)推测关联块
  async prefetchRelatedBlocks(rootCid) {
    const relatedCIDs = await this.analyzer.suggestRelated(rootCid);

    for (const cid of relatedCIDs) {
      if (this.activePrefetches.has(cid.toString())) continue;

      // 异步启动,不阻塞主流程
      this.activePrefetches.add(cid.toString());
      this.prefetchSingleBlock(cid).catch(() => {});
    }
  }

  async prefetchSingleBlock(cid) {
    try {
      // 仅当本地未缓存时才发起 Bitswap 请求
      if (!(await this.node.blocks.has(cid))) {
        await this.node.blocks.get(cid, { timeout: 5000 });
      }
    } finally {
      this.activePrefetches.delete(cid.toString());
    }
  }
}

⚠️ 注意:预取需谨慎——错误预测会浪费带宽与存储。建议结合实际场景(如视频流分片、文档章节跳转)设计分析器,并设置全局并发上限(如 maxConcurrentPrefetch: 2)。

7.5 实际应用示例

以下是一个端到端的、符合当前 IPFS 最佳实践的 Bitswap 应用示例(基于 ipfs-core v0.19+ 和标准配置):

const { create } = require('ipfs-core');

async function createIPFSNode() {
  return create({
    repo: `ipfs-${Math.random().toString(16).slice(2)}`, // 临时仓库
    config: {
      Addresses: {
        Swarm: ['/ip4/0.0.0.0/tcp/0'], // 自动分配端口
        API: '/ip4/127.0.0.1/tcp/0',
        Gateway: '/ip4/127.0.0.1/tcp/0'
      },
      Bootstrap: [
        '/ip4/104.131.131.82/tcp/4001/p2p/QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ'
      ]
    },
    // 启用必要模块(Bitswap 默认启用)
    EXPERIMENTAL: {
      ipnsPubsub: true
    }
  });
}

async function shareFile() {
  const node = await createIPFSNode();

  // 添加文本文件(生成 CID 并写入 Blockstore)
  const { cid } = await node.add({
    path: 'hello.txt',
    content: Uint8Array.from('Hello IPFS World!'.split('').map(c => c.charCodeAt(0)))
  });

  console.log('✅ 文件 CID:', cid.toString());

  // 显式固定(pin)以防止垃圾回收
  await node.pin.add(cid);

  // 通过 DHT 宣告:「我是该 CID 的提供者」→ 其他节点可通过 DHT 找到你
  await node.dht.provide(cid);
  console.log('✅ 已向 DHT 宣告提供该文件');

  // 可选:打印监听地址,便于调试
  console.log('📡 节点地址:', (await node.swarm.addrs()).map(a => a.toString()));

  return cid;
}

async function downloadFile(cid) {
  const node = await createIPFSNode();

  // 使用 cat 流式获取(自动触发 Bitswap,支持大文件)
  const chunks = [];
  for await (const chunk of node.cat(cid)) {
    chunks.push(chunk);
  }

  const data = new Uint8Array(
    chunks.reduce((acc, curr) => [...acc, ...curr], [])
  );
  console.log('📥 下载内容:', new TextDecoder().decode(data));

  return data;
}

// 主流程
async function main() {
  console.log('🚀 启动分享节点...');
  const cid = await shareFile();

  console.log('\n⬇️  启动下载节点...');
  await downloadFile(cid);

  console.log('\n✅ 示例执行完毕。');
  process.exit(0);
}

main().catch(err => {
  console.error('❌ 执行出错:', err);
  process.exit(1);
});

💡 提示:此示例展示了 Bitswap 如何在真实场景中“隐身工作”——开发者调用 node.cat()node.add() 时,底层自动协调 Bitswap 请求、信用更新、多源并发与错误恢复。理解 Bitswap 的机制有助于诊断网络延迟、连接失败或数据不可达等问题。

⚠️ 注意:生产部署中,应使用持久化仓库(repo: './my-ipfs-repo')、配置合理超时与重试策略,并监控 Bitswap 指标(如 ipfs stats bitswap)。


第三部分:技术原理深度解析

===

← 返回目录