第12章 MsgHub 消息中心
本章目标
- 理解 MsgHub 的设计理念和核心功能
- 掌握多智能体消息广播机制
- 学会动态管理参与者
- 了解 MsgHub 与 Pipeline 的区别和适用场景
12.1 MsgHub 概述
12.1.1 什么是 MsgHub?
MsgHub(消息中心) 是一个专为多智能体对话设计的消息广播机制。当多个智能体加入同一个 MsgHub 后,任何智能体的发言都会自动广播给其他所有参与者,无需手动传递消息。
┌─────────────────────────────────────────────────────────────┐
│ MsgHub │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 消息广播中心 │ │
│ │ │ │
│ │ ┌───────┐ ┌───────┐ ┌───────┐ │ │
│ │ │ Alice │◀──▶│ MsgHub│◀──▶│ Bob │ │ │
│ │ └───────┘ └───┬───┘ └───────┘ │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ ┌───────┐ │ │
│ │ │ Carol │ │ │
│ │ └───────┘ │ │
│ │ │ │
│ │ Alice 发言 ──▶ 自动广播给 Bob 和 Carol │ │
│ │ Bob 发言 ──▶ 自动广播给 Alice 和 Carol │ │
│ │ Carol 发言 ──▶ 自动广播给 Alice 和 Bob │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
12.1.2 为什么需要 MsgHub?
手动消息传递(繁琐):
// 没有 MsgHub 时的对话代码 - 需要手动传递消息
Msg aliceReply = alice.call().block();
bob.observe(aliceReply).block();
carol.observe(aliceReply).block();
Msg bobReply = bob.call().block();
alice.observe(bobReply).block();
carol.observe(bobReply).block();
Msg carolReply = carol.call().block();
alice.observe(carolReply).block();
bob.observe(carolReply).block();
使用 MsgHub(简洁):
// 使用 MsgHub - 自动广播消息
try (MsgHub hub = MsgHub.builder()
.participants(alice, bob, carol)
.build()) {
hub.enter().block();
alice.call().block(); // 自动广播给 bob 和 carol
bob.call().block(); // 自动广播给 alice 和 carol
carol.call().block(); // 自动广播给 alice 和 bob
}
12.1.3 核心特性
| 特性 | 说明 |
|---|---|
| 自动广播 | 参与者的消息自动广播给其他所有参与者 |
| 动态参与者 | 支持运行时添加或移除参与者 |
| 公告消息 | 进入时可广播初始公告 |
| 生命周期管理 | 实现 AutoCloseable,支持 try-with-resources |
| 线程安全 | 使用 CopyOnWriteArrayList 管理参与者 |
12.2 基本用法
12.2.1 创建和使用 MsgHub
import io.agentscope.core.ReActAgent;
import io.agentscope.core.pipeline.MsgHub;
import io.agentscope.core.message.Msg;
import io.agentscope.core.message.MsgRole;
import io.agentscope.core.message.TextBlock;
// 创建参与对话的智能体
ReActAgent alice = createAgent("Alice", "你是 Alice,一位友好的讨论者。");
ReActAgent bob = createAgent("Bob", "你是 Bob,一位理性的分析者。");
// 创建公告消息
Msg announcement = Msg.builder()
.name("system")
.role(MsgRole.SYSTEM)
.content(TextBlock.builder()
.text("欢迎参加今天的讨论!话题是:人工智能的未来。")
.build())
.build();
// 使用 try-with-resources 确保资源释放
try (MsgHub hub = MsgHub.builder()
.name("AI讨论室") // 可选:设置 Hub 名称
.participants(alice, bob) // 设置参与者
.announcement(announcement) // 设置进入时的公告
.build()) {
// 进入 Hub(广播公告,建立订阅关系)
hub.enter().block();
// 开始对话 - 消息自动广播
alice.call().block(); // Alice 的回复自动广播给 Bob
bob.call().block(); // Bob 的回复自动广播给 Alice
alice.call().block(); // 继续对话...
} // 自动调用 close(),清理订阅关系
12.2.2 多轮对话
try (MsgHub hub = MsgHub.builder()
.participants(alice, bob, carol)
.announcement(announcement)
.build()) {
hub.enter().block();
// 多轮对话循环
int rounds = 3;
for (int i = 0; i < rounds; i++) {
System.out.println("=== 第 " + (i + 1) + " 轮 ===");
// 每个参与者依次发言
alice.call().block();
bob.call().block();
carol.call().block();
}
}
12.2.3 MultiAgentFormatter
在多智能体对话中,需要使用 MultiAgentFormatter 来正确格式化消息,使 LLM 能够区分不同发言者:
import io.agentscope.core.formatter.dashscope.DashScopeMultiAgentFormatter;
// 创建支持多智能体对话的智能体
ReActAgent alice = ReActAgent.builder()
.name("Alice")
.sysPrompt("你是 Alice,一位热情的讨论参与者。")
.model(DashScopeChatModel.builder()
.apiKey(apiKey)
.modelName("qwen-plus")
.stream(true)
// 关键:使用多智能体格式化器
.formatter(new DashScopeMultiAgentFormatter())
.build())
.memory(new InMemoryMemory())
.build();
重要:在 MsgHub 场景中,必须使用对应模型的
MultiAgentFormatter,否则智能体无法正确识别其他参与者的消息。
12.3 动态参与者管理
12.3.1 添加参与者
try (MsgHub hub = MsgHub.builder()
.participants(alice, bob)
.build()) {
hub.enter().block();
// 初始对话
alice.call().block();
bob.call().block();
// 动态添加新参与者
ReActAgent carol = createAgent("Carol", "你是 Carol,刚加入讨论的新成员。");
hub.add(carol).block();
System.out.println("Carol 加入了讨论!");
System.out.println("当前参与者: " + hub.getParticipants().size());
// Carol 现在可以接收之后的消息
alice.call().block(); // Carol 也能收到
carol.call().block(); // Alice 和 Bob 都能收到
}
12.3.2 移除参与者
try (MsgHub hub = MsgHub.builder()
.participants(alice, bob, carol)
.build()) {
hub.enter().block();
// 对话进行中
alice.call().block();
bob.call().block();
// 移除参与者
hub.delete(carol).block();
System.out.println("Carol 离开了讨论。");
// Carol 不再收到后续消息
alice.call().block(); // 只有 Bob 收到
bob.call().block(); // 只有 Alice 收到
}
12.3.3 添加多个参与者
// 添加多个参与者
hub.add(carol, dave, eve).block();
// 或使用列表
hub.add(List.of(carol, dave, eve)).block();
// 移除多个参与者
hub.delete(dave, eve).block();
12.4 广播控制
12.4.1 手动广播消息
除了自动广播外,还可以手动广播消息:
try (MsgHub hub = MsgHub.builder()
.participants(alice, bob, carol)
.build()) {
hub.enter().block();
// 手动广播系统消息
Msg systemNotice = Msg.builder()
.name("system")
.role(MsgRole.SYSTEM)
.content(TextBlock.builder()
.text("请注意:讨论时间还剩 5 分钟。")
.build())
.build();
hub.broadcast(systemNotice).block();
// 广播多条消息
hub.broadcast(List.of(msg1, msg2, msg3)).block();
}
12.4.2 禁用自动广播
可以禁用自动广播,仅使用手动广播:
// 构建时禁用自动广播
MsgHub hub = MsgHub.builder()
.participants(alice, bob)
.enableAutoBroadcast(false) // 禁用自动广播
.build();
try {
hub.enter().block();
// 智能体调用不会自动广播
Msg aliceReply = alice.call().block();
// 需要手动广播
hub.broadcast(aliceReply).block();
} finally {
hub.close();
}
// 或运行时切换
hub.setAutoBroadcast(false); // 关闭自动广播
hub.setAutoBroadcast(true); // 重新开启
12.5 完整示例:多智能体讨论
import io.agentscope.core.ReActAgent;
import io.agentscope.core.formatter.dashscope.DashScopeMultiAgentFormatter;
import io.agentscope.core.memory.InMemoryMemory;
import io.agentscope.core.message.Msg;
import io.agentscope.core.message.MsgRole;
import io.agentscope.core.message.TextBlock;
import io.agentscope.core.model.DashScopeChatModel;
import io.agentscope.core.pipeline.MsgHub;
import io.agentscope.core.tool.Toolkit;
/**
* 多智能体圆桌讨论示例
* 使用 MsgHub 实现多个智能体围绕话题进行自由讨论
*/
public class RoundtableDiscussion {
public static void main(String[] args) {
String apiKey = System.getenv("DASHSCOPE_API_KEY");
// ========================================
// 创建具有不同视角的智能体
// ========================================
// 技术乐观主义者
ReActAgent optimist = createDiscussant(apiKey, "Optimist",
"""
你是一位技术乐观主义者,名叫 Optimist。
你相信 AI 技术将为人类带来巨大的福祉和进步。
在讨论中,你会强调 AI 的积极面和潜力。
保持专业但热情的语气,每次发言控制在 3-4 句话。
记住倾听其他人的观点并做出回应。
""");
// 技术怀疑论者
ReActAgent skeptic = createDiscussant(apiKey, "Skeptic",
"""
你是一位技术怀疑论者,名叫 Skeptic。
你对 AI 的快速发展持谨慎态度,关注其潜在风险。
在讨论中,你会提出建设性的质疑和担忧。
保持理性和尊重的语气,每次发言控制在 3-4 句话。
认真听取其他人的观点并给出回应。
""");
// 中立调停者
ReActAgent moderator = createDiscussant(apiKey, "Moderator",
"""
你是一位中立的讨论主持人,名叫 Moderator。
你的任务是平衡双方观点,引导讨论深入。
在讨论中,你会总结观点、提出新问题、促进理解。
保持客观公正的语气,每次发言控制在 2-3 句话。
帮助双方找到共同点。
""");
// ========================================
// 创建讨论话题公告
// ========================================
Msg announcement = Msg.builder()
.name("System")
.role(MsgRole.SYSTEM)
.content(TextBlock.builder()
.text("""
欢迎参加今天的圆桌讨论!
讨论话题:《生成式 AI 对就业市场的影响》
规则:
1. 每位参与者依次发言
2. 请尊重其他参与者的观点
3. 讨论将进行 3 轮
请 Optimist 先开始分享你的观点。
""")
.build())
.build();
// ========================================
// 使用 MsgHub 进行讨论
// ========================================
try (MsgHub hub = MsgHub.builder()
.name("AI讨论圆桌")
.participants(optimist, skeptic, moderator)
.announcement(announcement)
.build()) {
hub.enter().block();
System.out.println("=== 讨论开始 ===\n");
// 进行多轮讨论
for (int round = 1; round <= 3; round++) {
System.out.println("--- 第 " + round + " 轮 ---\n");
// Optimist 发言
System.out.print("Optimist: ");
Msg optimistReply = optimist.call().block();
System.out.println(optimistReply.getTextContent() + "\n");
// Skeptic 回应
System.out.print("Skeptic: ");
Msg skepticReply = skeptic.call().block();
System.out.println(skepticReply.getTextContent() + "\n");
// Moderator 总结
System.out.print("Moderator: ");
Msg moderatorReply = moderator.call().block();
System.out.println(moderatorReply.getTextContent() + "\n");
}
// 发送结束信号
Msg closing = Msg.builder()
.name("System")
.role(MsgRole.SYSTEM)
.content(TextBlock.builder()
.text("讨论时间结束,请 Moderator 做最后总结。")
.build())
.build();
hub.broadcast(closing).block();
System.out.println("--- 最终总结 ---\n");
System.out.print("Moderator: ");
Msg finalSummary = moderator.call().block();
System.out.println(finalSummary.getTextContent());
System.out.println("\n=== 讨论结束 ===");
} // 自动清理
}
/**
* 创建讨论参与者智能体
*/
private static ReActAgent createDiscussant(String apiKey, String name, String prompt) {
return ReActAgent.builder()
.name(name)
.sysPrompt(prompt)
.model(DashScopeChatModel.builder()
.apiKey(apiKey)
.modelName("qwen-plus")
.stream(true)
.enableThinking(false)
// 关键:使用多智能体格式化器
.formatter(new DashScopeMultiAgentFormatter())
.build())
.memory(new InMemoryMemory())
.toolkit(new Toolkit())
.build();
}
}
12.6 MsgHub vs Pipeline
12.6.1 对比
| 特性 | MsgHub | Pipeline |
|---|---|---|
| 执行模式 | 交互式对话 | 流水线处理 |
| 消息流向 | 广播(一对多) | 链式(一对一)或扇出 |
| 状态共享 | 共享对话上下文 | 独立处理 |
| 典型场景 | 多智能体讨论、辩论 | 任务处理、多视角分析 |
| 控制方式 | 灵活的对话控制 | 预定义的执行流程 |
12.6.2 选择指南
// 使用 MsgHub 当:
// - 需要智能体之间相互感知和回应
// - 模拟真实的多人对话场景
// - 需要动态调整参与者
// - 讨论、辩论、协商等交互场景
MsgHub hub = MsgHub.builder()
.participants(agent1, agent2, agent3)
.build();
// 使用 Pipeline 当:
// - 任务有明确的处理步骤
// - 智能体之间是生产者-消费者关系
// - 需要并行处理提高效率
// - 翻译、摘要、分析等处理场景
SequentialPipeline pipeline = SequentialPipeline.builder()
.addAgent(translator)
.addAgent(summarizer)
.build();
12.7 最佳实践
12.7.1 设计建议
| 建议 | 说明 |
|---|---|
| 使用 MultiAgentFormatter | 确保智能体能正确识别不同发言者 |
| 控制发言长度 | 在提示词中限制回复长度,避免信息过载 |
| 设置明确的规则 | 通过公告消息设定讨论规则和预期 |
| 合理安排发言顺序 | 考虑角色定位安排发言顺序 |
| 使用 try-with-resources | 确保资源正确释放 |
12.7.2 提示词设计
// 好的多智能体提示词设计
String systemPrompt = """
你是 [角色名],[角色描述]。
在多人讨论中:
1. 认真倾听其他参与者的观点
2. 针对他们的观点做出回应
3. 保持 [期望的语气和风格]
4. 每次发言控制在 [长度限制]
当前讨论话题:[由公告消息指定]
""";
12.7.3 错误处理
try (MsgHub hub = MsgHub.builder()
.participants(alice, bob)
.build()) {
hub.enter().block();
try {
alice.call().block();
} catch (Exception e) {
// 单个智能体失败不影响整个 Hub
System.err.println("Alice 回复失败: " + e.getMessage());
// 可以广播一条提示消息
hub.broadcast(Msg.builder()
.name("System")
.role(MsgRole.SYSTEM)
.content(TextBlock.builder()
.text("Alice 暂时无法回复,请继续。")
.build())
.build()).block();
}
bob.call().block(); // 继续对话
}
12.8 本章小结
本章介绍了 MsgHub 消息中心:
- 核心功能:自动广播消息给所有参与者
- 生命周期:
enter()进入、exit()/close()退出 - 动态管理:
add()添加、delete()移除参与者 - 广播控制:自动广播与手动
broadcast() - 格式化器:必须使用
MultiAgentFormatter
MsgHub 是实现多智能体自由对话的核心组件,适用于讨论、辩论、角色扮演等交互式场景。
练习
- 创建一个三人辩论场景,使用 MsgHub 实现正方、反方和评委的对话
- 实现一个动态参与者场景:讨论进行一半时有新成员加入
- 尝试禁用自动广播,手动控制消息的传递流程
- 比较相同任务使用 MsgHub 和 Pipeline 的代码差异
上一章:第11章-Pipeline管道 | 下一章:第13章-多智能体辩论