第11章 Pipeline 管道
本章目标
- 理解 Pipeline 管道的设计理念和应用场景
- 掌握 SequentialPipeline(顺序管道)的使用方法
- 掌握 FanoutPipeline(扇出管道)的使用方法
- 学会使用 Pipelines 工具类快速创建管道
- 了解管道组合与高级用法
11.1 Pipeline 概述
11.1.1 什么是 Pipeline?
Pipeline(管道) 是一种智能体编排模式,用于协调多个智能体按照特定的流程执行任务。它提供了一种声明式的方式来定义智能体之间的执行顺序和数据流向。
┌─────────────────────────────────────────────────────────────┐
│ Pipeline 模式 │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ SequentialPipeline │ │
│ │ (顺序管道) │ │
│ │ │ │
│ │ Input ──▶ Agent1 ──▶ Agent2 ──▶ Agent3 ──▶ Output │ │
│ │ │ │ │ │ │
│ │ 翻译 摘要 情感分析 │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ FanoutPipeline │ │
│ │ (扇出管道) │ │
│ │ │ │
│ │ ┌──▶ Agent1 ──┐ │ │
│ │ │ 评审者1 │ │ │
│ │ Input ─────────┼──▶ Agent2 ──┼──▶ [Results] │ │
│ │ │ 评审者2 │ │ │
│ │ └──▶ Agent3 ──┘ │ │
│ │ 评审者3 │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
11.1.2 Pipeline 接口
所有管道都实现 Pipeline 接口:
package io.agentscope.core.pipeline;
import io.agentscope.core.message.Msg;
import reactor.core.publisher.Mono;
/**
* Pipeline 基础接口
*
* @param <T> 管道执行结果的类型
*/
public interface Pipeline<T> {
/**
* 执行管道
*
* @param input 输入消息
* @return 包含执行结果的 Mono
*/
Mono<T> execute(Msg input);
/**
* 执行管道(带结构化输出)
*
* @param input 输入消息
* @param structuredOutputClass 结构化输出的类型
* @return 包含执行结果的 Mono
*/
Mono<T> execute(Msg input, Class<?> structuredOutputClass);
/**
* 无输入执行
*/
default Mono<T> execute() {
return execute((Msg) null);
}
/**
* 获取管道描述
*/
default String getDescription() {
return getClass().getSimpleName();
}
}
11.1.3 两种管道类型
| 管道类型 | 类名 | 输入/输出 | 执行模式 | 适用场景 |
|---|---|---|---|---|
| 顺序管道 | SequentialPipeline | 单输入 → 单输出 | 串行链式 | 多步骤处理流程 |
| 扇出管道 | FanoutPipeline | 单输入 → 多输出 | 并行/串行 | 多视角分析、投票 |
11.2 SequentialPipeline 顺序管道
11.2.1 核心概念
顺序管道将多个智能体串联成一条处理链,每个智能体的输出成为下一个智能体的输入:
Input ──▶ Agent1 ──▶ Agent2 ──▶ Agent3 ──▶ Output
│ │ │
output1 output2 output3
│ │ │
└───────────┴───────────┘
每个输出成为下一个的输入
11.2.2 基本用法
方式一:使用 Builder 模式
import io.agentscope.core.ReActAgent;
import io.agentscope.core.pipeline.SequentialPipeline;
import io.agentscope.core.message.Msg;
import io.agentscope.core.message.MsgRole;
import io.agentscope.core.message.TextBlock;
// 创建三个专业智能体
ReActAgent translator = createTranslatorAgent(); // 翻译
ReActAgent summarizer = createSummarizerAgent(); // 摘要
ReActAgent analyzer = createSentimentAnalyzerAgent();// 情感分析
// 使用 Builder 构建顺序管道
SequentialPipeline pipeline = SequentialPipeline.builder()
.addAgent(translator) // 第一步:翻译
.addAgent(summarizer) // 第二步:摘要
.addAgent(analyzer) // 第三步:情感分析
.build();
// 准备输入
Msg input = Msg.builder()
.role(MsgRole.USER)
.content(TextBlock.builder()
.text("Artificial Intelligence is transforming industries...")
.build())
.build();
// 执行管道
Msg result = pipeline.execute(input).block();
// 获取最终结果(情感分析的输出)
System.out.println("最终结果: " + result.getTextContent());
方式二:使用构造函数
import java.util.List;
// 直接传入智能体列表
SequentialPipeline pipeline = new SequentialPipeline(
List.of(translator, summarizer, analyzer)
);
Msg result = pipeline.execute(input).block();
方式三:使用 Pipelines 工具类
import io.agentscope.core.pipeline.Pipelines;
// 静态方法快速执行
Msg result = Pipelines.sequential(
List.of(translator, summarizer, analyzer),
input
).block();
// 或创建可复用的管道
SequentialPipeline reusablePipeline = Pipelines.createSequential(
List.of(translator, summarizer, analyzer)
);
11.2.3 完整示例:内容处理流水线
import io.agentscope.core.ReActAgent;
import io.agentscope.core.formatter.dashscope.DashScopeChatFormatter;
import io.agentscope.core.memory.InMemoryMemory;
import io.agentscope.core.message.Msg;
import io.agentscope.core.message.MsgRole;
import io.agentscope.core.message.TextBlock;
import io.agentscope.core.model.DashScopeChatModel;
import io.agentscope.core.pipeline.SequentialPipeline;
import io.agentscope.core.tool.Toolkit;
/**
* 内容处理流水线示例
* 演示如何使用 SequentialPipeline 构建多步骤处理流程
*/
public class ContentProcessingPipeline {
public static void main(String[] args) {
String apiKey = System.getenv("DASHSCOPE_API_KEY");
// ========================================
// 步骤 1:创建专业智能体
// ========================================
// 翻译智能体:英译中
ReActAgent translator = ReActAgent.builder()
.name("Translator")
.sysPrompt("""
你是一位专业翻译。请将给定的英文文本准确翻译成中文。
保持原文的语气和风格。
只输出翻译结果,不要添加解释。
""")
.model(createModel(apiKey))
.memory(new InMemoryMemory())
.toolkit(new Toolkit())
.build();
// 摘要智能体:生成摘要
ReActAgent summarizer = ReActAgent.builder()
.name("Summarizer")
.sysPrompt("""
你是一位内容摘要专家。请为给定的文本生成一个简洁的摘要。
摘要应该包含2-3个要点,总长度不超过100字。
保持与输入相同的语言。
只输出摘要内容。
""")
.model(createModel(apiKey))
.memory(new InMemoryMemory())
.toolkit(new Toolkit())
.build();
// 情感分析智能体
ReActAgent sentimentAnalyzer = ReActAgent.builder()
.name("SentimentAnalyzer")
.sysPrompt("""
你是一位情感分析专家。分析给定文本的情感倾向。
请按以下格式输出:
情感: [正面/负面/中性/混合]
理由: [1-2句解释]
内容概要: [重复输入的摘要内容]
""")
.model(createModel(apiKey))
.memory(new InMemoryMemory())
.toolkit(new Toolkit())
.build();
// ========================================
// 步骤 2:构建顺序管道
// ========================================
SequentialPipeline pipeline = SequentialPipeline.builder()
.addAgent(translator) // 第一步
.addAgent(summarizer) // 第二步
.addAgent(sentimentAnalyzer) // 第三步
.build();
System.out.println("管道创建完成: " + pipeline.getDescription());
System.out.println("智能体数量: " + pipeline.size());
// ========================================
// 步骤 3:执行管道
// ========================================
String article = """
Artificial Intelligence has revolutionized the technology industry.
Machine learning algorithms now power everything from recommendation
systems to autonomous vehicles. While AI brings tremendous opportunities,
it also raises important questions about ethics, privacy, and employment.
""";
Msg input = Msg.builder()
.role(MsgRole.USER)
.content(TextBlock.builder().text(article).build())
.build();
System.out.println("\n原文 (英文):");
System.out.println("=".repeat(50));
System.out.println(article);
System.out.println("\n执行管道中...\n");
long startTime = System.currentTimeMillis();
Msg result = pipeline.execute(input).block();
long duration = System.currentTimeMillis() - startTime;
// ========================================
// 步骤 4:输出结果
// ========================================
System.out.println("最终结果:");
System.out.println("=".repeat(50));
System.out.println(result.getTextContent());
System.out.println("\n执行时间: " + duration + "ms");
}
private static DashScopeChatModel createModel(String apiKey) {
return DashScopeChatModel.builder()
.apiKey(apiKey)
.modelName("qwen-plus")
.stream(true)
.enableThinking(false)
.formatter(new DashScopeChatFormatter())
.build();
}
}
11.2.4 结构化输出
顺序管道支持在最后一个智能体使用结构化输出:
// 定义输出结构
public record AnalysisResult(
String sentiment,
String reason,
String summary
) {}
// 执行管道并获取结构化结果
Msg result = pipeline
.execute(input, AnalysisResult.class)
.block();
// 提取结构化数据
AnalysisResult analysis = result.getStructuredData(AnalysisResult.class);
System.out.println("情感: " + analysis.sentiment());
System.out.println("理由: " + analysis.reason());
11.3 FanoutPipeline 扇出管道
11.3.1 核心概念
扇出管道将同一输入分发给多个智能体,并收集所有输出。支持并行和串行两种执行模式:
┌──▶ Agent1 ──┐
│ (视角1) │
Input ─────────────┼──▶ Agent2 ──┼──▶ [Output1, Output2, Output3]
│ (视角2) │
└──▶ Agent3 ──┘
(视角3)
11.3.2 基本用法
并行执行(默认)
import io.agentscope.core.pipeline.FanoutPipeline;
import java.util.List;
// 创建三个评审智能体
ReActAgent reviewer1 = createReviewer("技术专家");
ReActAgent reviewer2 = createReviewer("产品经理");
ReActAgent reviewer3 = createReviewer("用户体验设计师");
// 构建扇出管道(默认并行执行)
FanoutPipeline pipeline = FanoutPipeline.builder()
.addAgent(reviewer1)
.addAgent(reviewer2)
.addAgent(reviewer3)
.concurrent() // 并行执行(默认)
.build();
// 执行管道 - 所有智能体并行处理同一输入
List<Msg> results = pipeline.execute(proposalMsg).block();
// 处理所有评审结果
for (int i = 0; i < results.size(); i++) {
System.out.printf("评审者 %d: %s%n", i + 1, results.get(i).getTextContent());
}
串行执行
// 构建串行扇出管道
FanoutPipeline sequentialPipeline = FanoutPipeline.builder()
.addAgent(reviewer1)
.addAgent(reviewer2)
.addAgent(reviewer3)
.sequential() // 串行执行
.build();
// 执行 - 智能体依次处理(但各自独立处理相同输入)
List<Msg> results = sequentialPipeline.execute(input).block();
使用 Pipelines 工具类
import io.agentscope.core.pipeline.Pipelines;
// 并行执行
List<Msg> parallelResults = Pipelines.fanout(
List.of(reviewer1, reviewer2, reviewer3),
input
).block();
// 串行执行
List<Msg> sequentialResults = Pipelines.fanoutSequential(
List.of(reviewer1, reviewer2, reviewer3),
input
).block();
11.3.3 完整示例:多视角代码审查
import io.agentscope.core.ReActAgent;
import io.agentscope.core.message.Msg;
import io.agentscope.core.message.MsgRole;
import io.agentscope.core.message.TextBlock;
import io.agentscope.core.pipeline.FanoutPipeline;
import java.util.List;
/**
* 多视角代码审查示例
* 使用 FanoutPipeline 从多个角度并行审查代码
*/
public class MultiPerspectiveCodeReview {
public static void main(String[] args) {
String apiKey = System.getenv("DASHSCOPE_API_KEY");
// ========================================
// 创建多个审查视角的智能体
// ========================================
// 安全专家视角
ReActAgent securityReviewer = createReviewerAgent(apiKey, "SecurityExpert",
"""
你是一位安全专家,专注于代码安全审查。
请检查代码中的潜在安全漏洞,包括:
- SQL 注入
- XSS 攻击
- 敏感信息泄露
- 权限控制问题
输出格式:
[安全评分]: 0-10 分
[发现问题]: 问题列表
[改进建议]: 具体建议
""");
// 性能专家视角
ReActAgent performanceReviewer = createReviewerAgent(apiKey, "PerformanceExpert",
"""
你是一位性能优化专家,专注于代码性能审查。
请检查代码中的性能问题,包括:
- 算法复杂度
- 内存使用
- I/O 效率
- 并发问题
输出格式:
[性能评分]: 0-10 分
[发现问题]: 问题列表
[优化建议]: 具体建议
""");
// 可维护性专家视角
ReActAgent maintainabilityReviewer = createReviewerAgent(apiKey, "MaintainabilityExpert",
"""
你是一位代码质量专家,专注于可维护性审查。
请检查代码的可维护性问题,包括:
- 代码可读性
- 命名规范
- 模块化设计
- 文档注释
输出格式:
[可维护性评分]: 0-10 分
[发现问题]: 问题列表
[改进建议]: 具体建议
""");
// ========================================
// 构建并行扇出管道
// ========================================
FanoutPipeline reviewPipeline = FanoutPipeline.builder()
.addAgent(securityReviewer)
.addAgent(performanceReviewer)
.addAgent(maintainabilityReviewer)
.concurrent() // 并行执行,提高效率
.build();
System.out.println("审查管道: " + reviewPipeline.getDescription());
System.out.println("并行模式: " + reviewPipeline.isConcurrentEnabled());
// ========================================
// 准备待审查的代码
// ========================================
String codeToReview = """
public class UserService {
private Connection conn;
public User findUser(String username) {
String sql = "SELECT * FROM users WHERE name = '" + username + "'";
ResultSet rs = conn.createStatement().executeQuery(sql);
// ... 处理结果
return user;
}
public void updatePassword(String userId, String newPassword) {
String sql = "UPDATE users SET password = '" + newPassword + "' WHERE id = " + userId;
conn.createStatement().executeUpdate(sql);
}
}
""";
Msg codeMsg = Msg.builder()
.role(MsgRole.USER)
.content(TextBlock.builder().text(codeToReview).build())
.build();
System.out.println("\n待审查代码:");
System.out.println("=".repeat(60));
System.out.println(codeToReview);
// ========================================
// 并行执行审查
// ========================================
System.out.println("\n开始并行审查...\n");
long startTime = System.currentTimeMillis();
List<Msg> reviews = reviewPipeline.execute(codeMsg).block();
long duration = System.currentTimeMillis() - startTime;
// ========================================
// 输出所有审查结果
// ========================================
String[] perspectives = {"安全专家", "性能专家", "可维护性专家"};
for (int i = 0; i < reviews.size(); i++) {
System.out.println("=".repeat(60));
System.out.println(perspectives[i] + " 审查意见:");
System.out.println("=".repeat(60));
System.out.println(reviews.get(i).getTextContent());
System.out.println();
}
System.out.println("总执行时间: " + duration + "ms (并行执行)");
System.out.println("如果串行执行,预计需要: " + (duration * 3) + "ms");
}
private static ReActAgent createReviewerAgent(String apiKey, String name, String prompt) {
return ReActAgent.builder()
.name(name)
.sysPrompt(prompt)
.model(DashScopeChatModel.builder()
.apiKey(apiKey)
.modelName("qwen-plus")
.stream(true)
.build())
.memory(new InMemoryMemory())
.toolkit(new Toolkit())
.build();
}
}
11.3.4 自定义调度器
对于并行执行,可以自定义调度器来控制并发行为:
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
// 使用固定大小的线程池
Scheduler customScheduler = Schedulers.newBoundedElastic(
10, // 线程数
100, // 队列大小
"agent-pool"
);
FanoutPipeline pipeline = FanoutPipeline.builder()
.addAgents(List.of(agent1, agent2, agent3, agent4, agent5))
.concurrent()
.scheduler(customScheduler) // 自定义调度器
.build();
11.3.5 错误处理
FanoutPipeline 会收集所有智能体的执行错误,并在执行完成后统一报告:
try {
List<Msg> results = pipeline.execute(input).block();
} catch (CompositeAgentException e) {
// 获取所有失败的智能体信息
for (CompositeAgentException.AgentExceptionInfo info : e.getAgentExceptions()) {
System.err.printf("智能体 %s (ID: %s) 失败: %s%n",
info.getAgentName(),
info.getAgentId(),
info.getThrowable().getMessage());
}
}
11.4 Pipelines 工具类
Pipelines 提供静态方法,用于快速创建和执行管道:
11.4.1 快速执行方法
import io.agentscope.core.pipeline.Pipelines;
// 顺序执行
Msg result = Pipelines.sequential(agents, input).block();
// 并行扇出
List<Msg> results = Pipelines.fanout(agents, input).block();
// 串行扇出
List<Msg> results = Pipelines.fanoutSequential(agents, input).block();
11.4.2 创建可复用管道
// 创建顺序管道
SequentialPipeline seqPipeline = Pipelines.createSequential(agents);
// 创建并行扇出管道
FanoutPipeline parallelPipeline = Pipelines.createFanout(agents);
// 创建串行扇出管道
FanoutPipeline serialPipeline = Pipelines.createFanoutSequential(agents);
11.4.3 管道组合
// 组合两个顺序管道
Pipeline<Msg> composedPipeline = Pipelines.compose(pipeline1, pipeline2);
// 执行组合管道
// 相当于: input → pipeline1 → pipeline2 → output
Msg result = composedPipeline.execute(input).block();
11.5 高级用法
11.5.1 顺序 + 扇出组合
// 场景:先处理输入,再从多个视角分析
// 第一阶段:预处理
ReActAgent preprocessor = createPreprocessorAgent();
// 第二阶段:多视角分析
List<ReActAgent> analyzers = List.of(
createTechnicalAnalyzer(),
createBusinessAnalyzer(),
createUserAnalyzer()
);
// 执行流程
Msg preprocessed = preprocessor.call(input).block();
List<Msg> analyses = Pipelines.fanout(analyzers, preprocessed).block();
11.5.2 扇出 + 汇总
// 场景:多个评审者评审,然后汇总意见
// 多个评审者并行评审
FanoutPipeline reviewPipeline = FanoutPipeline.builder()
.addAgents(reviewers)
.concurrent()
.build();
List<Msg> reviews = reviewPipeline.execute(proposal).block();
// 汇总智能体处理所有评审意见
ReActAgent summarizer = createSummarizerAgent();
// 构建汇总输入:合并所有评审意见
String combinedReviews = reviews.stream()
.map(Msg::getTextContent)
.collect(Collectors.joining("\n\n---\n\n"));
Msg summaryInput = Msg.builder()
.role(MsgRole.USER)
.content(TextBlock.builder().text(combinedReviews).build())
.build();
Msg finalSummary = summarizer.call(summaryInput).block();
11.5.3 条件分支
// 场景:根据条件选择不同的处理管道
// 分类智能体
ReActAgent classifier = createClassifierAgent();
// 不同类型的处理管道
SequentialPipeline technicalPipeline = SequentialPipeline.builder()
.addAgent(techExpert1)
.addAgent(techExpert2)
.build();
SequentialPipeline businessPipeline = SequentialPipeline.builder()
.addAgent(bizExpert1)
.addAgent(bizExpert2)
.build();
// 先分类,再路由到对应管道
Msg classificationResult = classifier.call(input).block();
String category = extractCategory(classificationResult);
Msg finalResult;
if ("technical".equals(category)) {
finalResult = technicalPipeline.execute(input).block();
} else {
finalResult = businessPipeline.execute(input).block();
}
11.6 最佳实践
11.6.1 管道设计建议
| 建议 | 说明 |
|---|---|
| 单一职责 | 每个智能体专注一个任务,便于复用和调试 |
| 清晰的提示词 | 明确指定输入输出格式,减少歧义 |
| 合理的管道长度 | 顺序管道不宜过长(建议 3-5 个智能体) |
| 错误处理 | 考虑中间环节失败的处理策略 |
| 性能优化 | 能并行的尽量并行,减少总执行时间 |
11.6.2 选择合适的管道类型
// 使用 SequentialPipeline 当:
// - 每个步骤依赖前一步的输出
// - 需要渐进式处理数据
// - 有明确的处理顺序
SequentialPipeline processChain = ...;
// 使用 FanoutPipeline 当:
// - 需要从多个视角分析同一数据
// - 任务之间相互独立
// - 需要并行提高效率
FanoutPipeline multiReview = ...;
11.6.3 监控和调试
// 添加日志观察执行过程
import io.agentscope.core.hook.Hook;
import io.agentscope.core.hook.LoggingHook;
// 为每个智能体添加日志 Hook
ReActAgent agentWithLogging = ReActAgent.builder()
.name("MyAgent")
.model(model)
.hooks(List.of(new LoggingHook())) // 添加日志
.build();
// 输出会显示每个智能体的执行过程
11.7 本章小结
本章介绍了 AgentScope-Java 的 Pipeline 管道系统:
- Pipeline 接口:定义管道的标准执行方法
- SequentialPipeline:顺序执行,输出传递给下一个智能体
- FanoutPipeline:扇出执行,支持并行和串行模式
- Pipelines 工具类:快速创建和执行管道的静态方法
- 组合模式:顺序 + 扇出、扇出 + 汇总等高级用法
管道模式是构建复杂多智能体工作流的基础,掌握它能够高效地编排多个智能体协同工作。
练习
- 创建一个顺序管道,实现"翻译 → 扩写 → 校对"的文本处理流程
- 创建一个扇出管道,让三个"评委智能体"从不同角度评价一篇文章
- 组合使用顺序管道和扇出管道,实现"预处理 → 多视角分析 → 汇总"的复合流程
- 比较并行和串行扇出管道的执行时间差异
上一章:第10章-RAG知识检索 | 下一章:第12章-MsgHub消息中心