第12章 MsgHub 消息中心

第12章 MsgHub 消息中心

本章目标

  • 理解 MsgHub 的设计理念和核心功能
  • 掌握多智能体消息广播机制
  • 学会动态管理参与者
  • 了解 MsgHub 与 Pipeline 的区别和适用场景

12.1 MsgHub 概述

12.1.1 什么是 MsgHub?

MsgHub(消息中心) 是一个专为多智能体对话设计的消息广播机制。当多个智能体加入同一个 MsgHub 后,任何智能体的发言都会自动广播给其他所有参与者,无需手动传递消息。

┌─────────────────────────────────────────────────────────────┐
│                        MsgHub                                │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│   ┌─────────────────────────────────────────────────────┐   │
│   │                   消息广播中心                        │   │
│   │                                                      │   │
│   │   ┌───────┐    ┌───────┐    ┌───────┐              │   │
│   │   │ Alice │◀──▶│ MsgHub│◀──▶│  Bob  │              │   │
│   │   └───────┘    └───┬───┘    └───────┘              │   │
│   │                    │                                │   │
│   │                    ▼                                │   │
│   │               ┌───────┐                             │   │
│   │               │ Carol │                             │   │
│   │               └───────┘                             │   │
│   │                                                      │   │
│   │   Alice 发言 ──▶ 自动广播给 Bob 和 Carol            │   │
│   │   Bob 发言 ──▶ 自动广播给 Alice 和 Carol            │   │
│   │   Carol 发言 ──▶ 自动广播给 Alice 和 Bob            │   │
│   └─────────────────────────────────────────────────────┘   │
│                                                             │
└─────────────────────────────────────────────────────────────┘

12.1.2 为什么需要 MsgHub?

手动消息传递(繁琐):

// 没有 MsgHub 时的对话代码 - 需要手动传递消息
Msg aliceReply = alice.call().block();
bob.observe(aliceReply).block();
carol.observe(aliceReply).block();

Msg bobReply = bob.call().block();
alice.observe(bobReply).block();
carol.observe(bobReply).block();

Msg carolReply = carol.call().block();
alice.observe(carolReply).block();
bob.observe(carolReply).block();

使用 MsgHub(简洁):

// 使用 MsgHub - 自动广播消息
try (MsgHub hub = MsgHub.builder()
        .participants(alice, bob, carol)
        .build()) {
    
    hub.enter().block();
    
    alice.call().block();  // 自动广播给 bob 和 carol
    bob.call().block();    // 自动广播给 alice 和 carol
    carol.call().block();  // 自动广播给 alice 和 bob
}

12.1.3 核心特性

特性说明
自动广播参与者的消息自动广播给其他所有参与者
动态参与者支持运行时添加或移除参与者
公告消息进入时可广播初始公告
生命周期管理实现 AutoCloseable,支持 try-with-resources
线程安全使用 CopyOnWriteArrayList 管理参与者

12.2 基本用法

12.2.1 创建和使用 MsgHub

import io.agentscope.core.ReActAgent;
import io.agentscope.core.pipeline.MsgHub;
import io.agentscope.core.message.Msg;
import io.agentscope.core.message.MsgRole;
import io.agentscope.core.message.TextBlock;

// 创建参与对话的智能体
ReActAgent alice = createAgent("Alice", "你是 Alice,一位友好的讨论者。");
ReActAgent bob = createAgent("Bob", "你是 Bob,一位理性的分析者。");

// 创建公告消息
Msg announcement = Msg.builder()
    .name("system")
    .role(MsgRole.SYSTEM)
    .content(TextBlock.builder()
        .text("欢迎参加今天的讨论!话题是:人工智能的未来。")
        .build())
    .build();

// 使用 try-with-resources 确保资源释放
try (MsgHub hub = MsgHub.builder()
        .name("AI讨论室")              // 可选:设置 Hub 名称
        .participants(alice, bob)      // 设置参与者
        .announcement(announcement)    // 设置进入时的公告
        .build()) {
    
    // 进入 Hub(广播公告,建立订阅关系)
    hub.enter().block();
    
    // 开始对话 - 消息自动广播
    alice.call().block();  // Alice 的回复自动广播给 Bob
    bob.call().block();    // Bob 的回复自动广播给 Alice
    alice.call().block();  // 继续对话...
    
}  // 自动调用 close(),清理订阅关系

12.2.2 多轮对话

try (MsgHub hub = MsgHub.builder()
        .participants(alice, bob, carol)
        .announcement(announcement)
        .build()) {
    
    hub.enter().block();
    
    // 多轮对话循环
    int rounds = 3;
    for (int i = 0; i < rounds; i++) {
        System.out.println("=== 第 " + (i + 1) + " 轮 ===");
        
        // 每个参与者依次发言
        alice.call().block();
        bob.call().block();
        carol.call().block();
    }
}

12.2.3 MultiAgentFormatter

在多智能体对话中,需要使用 MultiAgentFormatter 来正确格式化消息,使 LLM 能够区分不同发言者:

import io.agentscope.core.formatter.dashscope.DashScopeMultiAgentFormatter;

// 创建支持多智能体对话的智能体
ReActAgent alice = ReActAgent.builder()
    .name("Alice")
    .sysPrompt("你是 Alice,一位热情的讨论参与者。")
    .model(DashScopeChatModel.builder()
        .apiKey(apiKey)
        .modelName("qwen-plus")
        .stream(true)
        // 关键:使用多智能体格式化器
        .formatter(new DashScopeMultiAgentFormatter())
        .build())
    .memory(new InMemoryMemory())
    .build();

重要:在 MsgHub 场景中,必须使用对应模型的 MultiAgentFormatter,否则智能体无法正确识别其他参与者的消息。


12.3 动态参与者管理

12.3.1 添加参与者

try (MsgHub hub = MsgHub.builder()
        .participants(alice, bob)
        .build()) {
    
    hub.enter().block();
    
    // 初始对话
    alice.call().block();
    bob.call().block();
    
    // 动态添加新参与者
    ReActAgent carol = createAgent("Carol", "你是 Carol,刚加入讨论的新成员。");
    hub.add(carol).block();
    
    System.out.println("Carol 加入了讨论!");
    System.out.println("当前参与者: " + hub.getParticipants().size());
    
    // Carol 现在可以接收之后的消息
    alice.call().block();  // Carol 也能收到
    carol.call().block();  // Alice 和 Bob 都能收到
}

12.3.2 移除参与者

try (MsgHub hub = MsgHub.builder()
        .participants(alice, bob, carol)
        .build()) {
    
    hub.enter().block();
    
    // 对话进行中
    alice.call().block();
    bob.call().block();
    
    // 移除参与者
    hub.delete(carol).block();
    System.out.println("Carol 离开了讨论。");
    
    // Carol 不再收到后续消息
    alice.call().block();  // 只有 Bob 收到
    bob.call().block();    // 只有 Alice 收到
}

12.3.3 添加多个参与者

// 添加多个参与者
hub.add(carol, dave, eve).block();

// 或使用列表
hub.add(List.of(carol, dave, eve)).block();

// 移除多个参与者
hub.delete(dave, eve).block();

12.4 广播控制

12.4.1 手动广播消息

除了自动广播外,还可以手动广播消息:

try (MsgHub hub = MsgHub.builder()
        .participants(alice, bob, carol)
        .build()) {
    
    hub.enter().block();
    
    // 手动广播系统消息
    Msg systemNotice = Msg.builder()
        .name("system")
        .role(MsgRole.SYSTEM)
        .content(TextBlock.builder()
            .text("请注意:讨论时间还剩 5 分钟。")
            .build())
        .build();
    
    hub.broadcast(systemNotice).block();
    
    // 广播多条消息
    hub.broadcast(List.of(msg1, msg2, msg3)).block();
}

12.4.2 禁用自动广播

可以禁用自动广播,仅使用手动广播:

// 构建时禁用自动广播
MsgHub hub = MsgHub.builder()
    .participants(alice, bob)
    .enableAutoBroadcast(false)  // 禁用自动广播
    .build();

try {
    hub.enter().block();
    
    // 智能体调用不会自动广播
    Msg aliceReply = alice.call().block();
    
    // 需要手动广播
    hub.broadcast(aliceReply).block();
    
} finally {
    hub.close();
}

// 或运行时切换
hub.setAutoBroadcast(false);  // 关闭自动广播
hub.setAutoBroadcast(true);   // 重新开启

12.5 完整示例:多智能体讨论

import io.agentscope.core.ReActAgent;
import io.agentscope.core.formatter.dashscope.DashScopeMultiAgentFormatter;
import io.agentscope.core.memory.InMemoryMemory;
import io.agentscope.core.message.Msg;
import io.agentscope.core.message.MsgRole;
import io.agentscope.core.message.TextBlock;
import io.agentscope.core.model.DashScopeChatModel;
import io.agentscope.core.pipeline.MsgHub;
import io.agentscope.core.tool.Toolkit;

/**
 * 多智能体圆桌讨论示例
 * 使用 MsgHub 实现多个智能体围绕话题进行自由讨论
 */
public class RoundtableDiscussion {

    public static void main(String[] args) {
        String apiKey = System.getenv("DASHSCOPE_API_KEY");

        // ========================================
        // 创建具有不同视角的智能体
        // ========================================
        
        // 技术乐观主义者
        ReActAgent optimist = createDiscussant(apiKey, "Optimist",
            """
            你是一位技术乐观主义者,名叫 Optimist。
            你相信 AI 技术将为人类带来巨大的福祉和进步。
            在讨论中,你会强调 AI 的积极面和潜力。
            保持专业但热情的语气,每次发言控制在 3-4 句话。
            记住倾听其他人的观点并做出回应。
            """);

        // 技术怀疑论者
        ReActAgent skeptic = createDiscussant(apiKey, "Skeptic",
            """
            你是一位技术怀疑论者,名叫 Skeptic。
            你对 AI 的快速发展持谨慎态度,关注其潜在风险。
            在讨论中,你会提出建设性的质疑和担忧。
            保持理性和尊重的语气,每次发言控制在 3-4 句话。
            认真听取其他人的观点并给出回应。
            """);

        // 中立调停者
        ReActAgent moderator = createDiscussant(apiKey, "Moderator",
            """
            你是一位中立的讨论主持人,名叫 Moderator。
            你的任务是平衡双方观点,引导讨论深入。
            在讨论中,你会总结观点、提出新问题、促进理解。
            保持客观公正的语气,每次发言控制在 2-3 句话。
            帮助双方找到共同点。
            """);

        // ========================================
        // 创建讨论话题公告
        // ========================================
        Msg announcement = Msg.builder()
            .name("System")
            .role(MsgRole.SYSTEM)
            .content(TextBlock.builder()
                .text("""
                    欢迎参加今天的圆桌讨论!
                    
                    讨论话题:《生成式 AI 对就业市场的影响》
                    
                    规则:
                    1. 每位参与者依次发言
                    2. 请尊重其他参与者的观点
                    3. 讨论将进行 3 轮
                    
                    请 Optimist 先开始分享你的观点。
                    """)
                .build())
            .build();

        // ========================================
        // 使用 MsgHub 进行讨论
        // ========================================
        try (MsgHub hub = MsgHub.builder()
                .name("AI讨论圆桌")
                .participants(optimist, skeptic, moderator)
                .announcement(announcement)
                .build()) {
            
            hub.enter().block();
            
            System.out.println("=== 讨论开始 ===\n");
            
            // 进行多轮讨论
            for (int round = 1; round <= 3; round++) {
                System.out.println("--- 第 " + round + " 轮 ---\n");
                
                // Optimist 发言
                System.out.print("Optimist: ");
                Msg optimistReply = optimist.call().block();
                System.out.println(optimistReply.getTextContent() + "\n");
                
                // Skeptic 回应
                System.out.print("Skeptic: ");
                Msg skepticReply = skeptic.call().block();
                System.out.println(skepticReply.getTextContent() + "\n");
                
                // Moderator 总结
                System.out.print("Moderator: ");
                Msg moderatorReply = moderator.call().block();
                System.out.println(moderatorReply.getTextContent() + "\n");
            }
            
            // 发送结束信号
            Msg closing = Msg.builder()
                .name("System")
                .role(MsgRole.SYSTEM)
                .content(TextBlock.builder()
                    .text("讨论时间结束,请 Moderator 做最后总结。")
                    .build())
                .build();
            
            hub.broadcast(closing).block();
            
            System.out.println("--- 最终总结 ---\n");
            System.out.print("Moderator: ");
            Msg finalSummary = moderator.call().block();
            System.out.println(finalSummary.getTextContent());
            
            System.out.println("\n=== 讨论结束 ===");
            
        }  // 自动清理
    }

    /**
     * 创建讨论参与者智能体
     */
    private static ReActAgent createDiscussant(String apiKey, String name, String prompt) {
        return ReActAgent.builder()
            .name(name)
            .sysPrompt(prompt)
            .model(DashScopeChatModel.builder()
                .apiKey(apiKey)
                .modelName("qwen-plus")
                .stream(true)
                .enableThinking(false)
                // 关键:使用多智能体格式化器
                .formatter(new DashScopeMultiAgentFormatter())
                .build())
            .memory(new InMemoryMemory())
            .toolkit(new Toolkit())
            .build();
    }
}

12.6 MsgHub vs Pipeline

12.6.1 对比

特性MsgHubPipeline
执行模式交互式对话流水线处理
消息流向广播(一对多)链式(一对一)或扇出
状态共享共享对话上下文独立处理
典型场景多智能体讨论、辩论任务处理、多视角分析
控制方式灵活的对话控制预定义的执行流程

12.6.2 选择指南

// 使用 MsgHub 当:
// - 需要智能体之间相互感知和回应
// - 模拟真实的多人对话场景
// - 需要动态调整参与者
// - 讨论、辩论、协商等交互场景
MsgHub hub = MsgHub.builder()
    .participants(agent1, agent2, agent3)
    .build();

// 使用 Pipeline 当:
// - 任务有明确的处理步骤
// - 智能体之间是生产者-消费者关系
// - 需要并行处理提高效率
// - 翻译、摘要、分析等处理场景
SequentialPipeline pipeline = SequentialPipeline.builder()
    .addAgent(translator)
    .addAgent(summarizer)
    .build();

12.7 最佳实践

12.7.1 设计建议

建议说明
使用 MultiAgentFormatter确保智能体能正确识别不同发言者
控制发言长度在提示词中限制回复长度,避免信息过载
设置明确的规则通过公告消息设定讨论规则和预期
合理安排发言顺序考虑角色定位安排发言顺序
使用 try-with-resources确保资源正确释放

12.7.2 提示词设计

// 好的多智能体提示词设计
String systemPrompt = """
    你是 [角色名],[角色描述]。
    
    在多人讨论中:
    1. 认真倾听其他参与者的观点
    2. 针对他们的观点做出回应
    3. 保持 [期望的语气和风格]
    4. 每次发言控制在 [长度限制]
    
    当前讨论话题:[由公告消息指定]
    """;

12.7.3 错误处理

try (MsgHub hub = MsgHub.builder()
        .participants(alice, bob)
        .build()) {
    
    hub.enter().block();
    
    try {
        alice.call().block();
    } catch (Exception e) {
        // 单个智能体失败不影响整个 Hub
        System.err.println("Alice 回复失败: " + e.getMessage());
        
        // 可以广播一条提示消息
        hub.broadcast(Msg.builder()
            .name("System")
            .role(MsgRole.SYSTEM)
            .content(TextBlock.builder()
                .text("Alice 暂时无法回复,请继续。")
                .build())
            .build()).block();
    }
    
    bob.call().block();  // 继续对话
}

12.8 本章小结

本章介绍了 MsgHub 消息中心:

  1. 核心功能:自动广播消息给所有参与者
  2. 生命周期enter() 进入、exit()/close() 退出
  3. 动态管理add() 添加、delete() 移除参与者
  4. 广播控制:自动广播与手动 broadcast()
  5. 格式化器:必须使用 MultiAgentFormatter

MsgHub 是实现多智能体自由对话的核心组件,适用于讨论、辩论、角色扮演等交互式场景。


练习

  1. 创建一个三人辩论场景,使用 MsgHub 实现正方、反方和评委的对话
  2. 实现一个动态参与者场景:讨论进行一半时有新成员加入
  3. 尝试禁用自动广播,手动控制消息的传递流程
  4. 比较相同任务使用 MsgHub 和 Pipeline 的代码差异

上一章:第11章-Pipeline管道 | 下一章:第13章-多智能体辩论

← 返回目录