Loading...
正在加载...
请稍候

代码长歌行:Friday 终端里的数字心智与流光戏影

小凯 (C3P0) 2026年06月05日 06:15

在人工智能的演进史诗中,智能体(Agent)不再仅仅是冷冰冰的“输入-输出”映射函数,而是演变成了一个具备思考、行动、反馈与协作能力的数字生命体。

本文将为您逐行剖析基于 AgentScope v2 框架构建的多轮交互流式 Demo——as1.py。我们将拆解其如何通过底层的 事件流(Event Stream) 机制,将大模型的晦涩心智转换为终端舞台上璀璨的“流光戏影”。


🧭 核心概念前瞻

在深入代码之前,我们有必要厘清几个数字心智构建的关键概念:

概念注释 1:事件驱动架构 (Event-Driven Architecture)
事件驱动架构是指系统通过产生、捕获和响应“事件”来驱动业务流程的软件设计模式。在 Agent 交互中,大模型生成的不仅仅是纯文本,还伴随着“模型开始调用”、“思维链展开”、“工具调用生成”、“工具结果返回”等一系列离散的生命周期事件。通过监听这些事件,我们可以实现精细化的流式 UI 渲染。

概念注释 2:协程与非阻塞异步 (Coroutine & Asynchronous I/O)
协程是 Python 中由 async/await 关键字修饰的轻量级并发结构。相较于传统多线程,协程在单线程上通过主动让出执行权(Yielding)来实现并发。在处理网络请求(LLM API 调用)或等待用户输入时,异步 I/O 可以防止程序主线程发生“死锁”或卡顿。


🛠️ as1.py 逐行深度解读

接下来,我们将代码切分为五大核心乐章,层层递进地剖析其逻辑机理。

⚙️ 1. 序曲:通往数字心智的路径搭建

# -*- coding: utf-8 -*-
import os
import sys
import asyncio

# Ensure that the local agentscope source directory is in python path
sys.path.insert(
    0,
    os.path.abspath(
        os.path.join(os.path.dirname(__file__), "../agentscope/src")
    ),
)

from agentscope.agent import Agent
from agentscope.tool import Toolkit, Bash, Grep, Glob, Read, Write, Edit
from agentscope.credential import DashScopeCredential
from agentscope.model import DashScopeChatModel
from agentscope.message import UserMsg
from agentscope.event import EventType, ConfirmResult, UserConfirmResultEvent

💡 逻辑剖析:

  • sys.path.insert(0, ...):此乃本 Demo 运行之根基。它通过计算当前脚本的相对路径,动态将本地的 ../agentscope/src 强行置于 Python 依赖检索路径的顶端。这意味着即使全局环境未曾安装 agentscope,亦能完美加载本地最新的 v2 源码。
  • from agentscope.event import ...:引入了核心事件机制。EventType 用于事件分发,ConfirmResultUserConfirmResultEvent 则是人机协同(Human-in-the-loop)中安全审核授权反馈的协议载体。

🎨 2. 妆容与戏台:色彩与流光加载 (Color & Spinner)

为了消除大模型在 Prefill(首 Token 生成前之延迟)阶段给用户带来的焦灼感,我们构筑了一个极具现代人机交互(HCI)美学的加载动效:

# ANSI Colors for Terminal Aesthetics
class Color:
    PURPLE = " 033[95m"
    CYAN = " 033[96m"
    BLUE = " 033[94m"
    GREEN = " 033[92m"
    YELLOW = " 033[93m"
    RED = " 033[91m"
    GRAY = " 033[90m"
    BOLD = " 033[1m"
    END = " 033[0m"


# Spinner Animation for Waiting/Thinking Latency
class Spinner:
    def __init__(self) -> None:
        self.message = "思考中"
        self.spin_chars = ["⏳", "🧠", "⚙️", "💭", "🤔"]
        self.task: asyncio.Task | None = None
        self._running = False

    def start(self, message: str = "思考中") -> None:
        self.message = message
        if not self._running:
            self._running = True
            self.task = asyncio.create_task(self._spin())

    async def _spin(self) -> None:
        idx = 0
        while self._running:
            char = self.spin_chars[idx % len(self.spin_chars)]
            # Write spinner at the beginning of the line
            sys.stdout.write(f" r{Color.GRAY}[{char} Friday {self.message}...]{Color.END}")
            sys.stdout.flush()
            idx += 1
            try:
                await asyncio.sleep(0.25)
            except asyncio.CancelledError:
                break

    def stop(self) -> None:
        if self._running:
            self._running = False
            if self.task:
                self.task.cancel()
            sys.stdout.write(" r 033[K")  # Clear the line
            sys.stdout.flush()

💡 逻辑剖析:

  • Spinner._spin:这是个精巧的异步生成器。 r 转义符使光标回到行首,配合 sys.stdout.write 原地刷新不同的 Emoji 字符,实现了控制台加载动效。
  • r 033[K:此乃 ANSI 控制字符之妙用。当收到真实数据时,Spinner.stop() 发送此序列。 033[K 会清除自光标至行尾的全部内容,从而将加载字符无痕抹去,让出位置给正式的内容流。

🔄 3. 灵魂律动:流式事件的捕获与反馈循环

这是本程序最为核心的生命周期控制中枢:

async def handle_agent_stream(agent: Agent, user_msg: UserMsg) -> None:
    current_inputs = user_msg
    spinner = Spinner()

    try:
        while current_inputs is not None:
            next_inputs = None
            thinking_started = False

            async for evt in agent.reply_stream(current_inputs):
                match evt.type:
                    case EventType.REPLY_START:
                        print(f" n{Color.GREEN}{Color.BOLD}[{evt.name}]:{Color.END} ", end="", flush=True)
                        spinner.start("思考中")

                    case EventType.THINKING_BLOCK_DELTA:
                        spinner.stop()
                        if not thinking_started:
                            print(f" n{Color.GRAY}[思维链]: ", end="", flush=True)
                            thinking_started = True
                        print(f"{Color.GRAY}{evt.delta}{Color.END}", end="", flush=True)

                    case EventType.TEXT_BLOCK_DELTA:
                        spinner.stop()
                        print(evt.delta, end="", flush=True)

💡 逻辑剖析:

  • while current_inputs is not None::这构筑了一个多轮迭代反馈环。在一次回复中,若 Agent 产生需要外部人工确认或执行的交互,next_inputs 将会被赋值为确认结果事件,外部循环将此结果再次投喂给 reply_stream 驱动其继续产生,直至 Agent 彻底完成本次任务。
  • async for evt in agent.reply_stream(current_inputs)::这是对异步生成器的迭代。每一个事件块 evt 包含其类型和数据,通过 match-case 进行了流式状态切换。一旦侦测到 BLOCK_DELTA 数据块产生,立即调用 spinner.stop() 停止等待动画。

⚠️ 4. 契约之眼:人机协同的工具授权机制

大模型虽有翻天覆地之能,但若任由其无限制地执行系统级命令(如 rm -rf 等危险指令),灾祸将至。故 AgentScope 引入了权限审核网闸:

                    case EventType.REQUIRE_USER_CONFIRM:
                        spinner.stop()
                        print(f" n n{Color.RED}{Color.BOLD}⚠️ 【安全审核:工具执行请求授权】{Color.END}")
                        confirm_results = []
                        for tool_call in evt.tool_calls:
                            print(f"  - 工具名称: {Color.YELLOW}{tool_call.name}{Color.END}")
                            print(f"  - 传入参数: {Color.YELLOW}{tool_call.arguments}{Color.END}")

                            # Wait for user input asynchronously
                            choice = await asyncio.to_thread(
                                input,
                                "是否授权执行此操作?(y: 授权 / n: 拒绝 / q: 退出对话): ",
                            )
                            choice = choice.strip().lower()
                            if choice in ["y", "yes"]:
                                print(f"{Color.GREEN}→ 授权通过。{Color.END}")
                                confirm_results.append(
                                    ConfirmResult(confirmed=True, tool_call=tool_call)
                                )
                            elif choice in ["q", "quit", "exit"]:
                                print(f"{Color.RED}→ 交互终止。{Color.END}")
                                return
                            else:
                                print(f"{Color.RED}→ 授权被拒绝。{Color.END}")
                                confirm_results.append(
                                    ConfirmResult(confirmed=False, tool_call=tool_call)
                                )

                        # Feed the confirmation results back to the agent in the next turn
                        next_inputs = UserConfirmResultEvent(
                            reply_id=evt.reply_id,
                            confirm_results=confirm_results,
                        )

💡 逻辑剖析:

  • EventType.REQUIRE_USER_CONFIRM:当 Agent 在执行逻辑链条中,发现某工具调用可能具备破坏性或需强制合规校验时,它会暂停自身推理,向外广播此事件。
  • asyncio.to_thread:由于 Python 的 input() 函数是同步阻塞式的,若直接在异步循环中调用,将导致整个 Event Loop 被锁死,其他背景协程(如加载动画)将瞬间停滞。此处将其封装至一个独立的线程(Thread)中运行,确保了异步环境的弹性和活性。
  • UserConfirmResultEvent:将用户的决策(True/False)包裹,赋予 next_inputs,并在下一轮 while 中反哺给大模型。

🗣️ 5. 多轮终章:人机对弈的永续剧场

最后,我们将上述所有机制,串联成为一个可以持续运行、状态驻留的终端多轮聊天主循环:

async def main() -> None:
    # 检查并设置 DashScope API Key
    api_key = os.environ.get("DASHSCOPE_API_KEY")
    if not api_key:
        print(f"{Color.YELLOW}【提示】未检测到环境变量 DASHSCOPE_API_KEY。{Color.END}")
        api_key = input("请输入您的 DashScope API Key: ").strip()
        if not api_key:
            print(f"{Color.RED}【错误】未提供 API Key,程序退出。{Color.END}")
            return
        os.environ["DASHSCOPE_API_KEY"] = api_key

    # 1. 实例化 DashScope Chat 模型
    model = DashScopeChatModel(
        credential=DashScopeCredential(api_key=api_key),
        model="qwen3.6-plus",
    )

    # 2. 实例化 Agent,配备内置工具包 (Toolkit)
    agent = Agent(
        name="Friday",
        system_prompt=(
            "You're a helpful assistant named Friday. "
            "You can use various tools to help the user if needed."
        ),
        model=model,
        toolkit=Toolkit(
            tools=[Bash(), Grep(), Glob(), Read(), Write(), Edit()]
        ),
    )

    print(f" n{Color.GREEN}Friday 已就绪!输入 'exit' 或 'quit' 可以退出对话。{Color.END}")

    # 3. 终端多轮对话交互循环
    while True:
        try:
            prompt_text = await asyncio.to_thread(
                input, f" n{Color.BOLD}[Tony]:{Color.END} "
            )
            prompt_text = prompt_text.strip()
            if not prompt_text:
                continue

            if prompt_text.lower() in ["exit", "quit", "q", "退出"]:
                print(f"{Color.CYAN}对话结束,再见!{Color.END}")
                break

            user_msg = UserMsg("Tony", prompt_text)
            await handle_agent_stream(agent, user_msg)

💡 逻辑剖析:

  • Toolkit(tools=[...]):为 Friday 注册了一套全能的工具箱。这套工具箱赋予了她操纵外在物理世界(如执行 bash 指令、搜索文件、改写代码)的能力,这些能力均受到上述安全机制的严格规锁。
  • UserMsg:为用户输入的数据包赋予角色属性和类型标识,投喂给流处理引擎 handle_agent_stream

📈 Friday 的事件生命周期状态跃迁表

如下所示,展示了 Friday 在一次完整的“用户请求 -> 工具调用 -> 授权确认 -> 最终返回”周期中的事件生命周期跃迁流程:


📚 归档信息与技术元数据

元数据项目 具体内容
代码实体 as1.py
底层核心框架 AgentScope v2 (WIP branch v2_dev)
支撑模型 Qwen 3.6 Plus (DashScope OpenAI-Compatible API)
内置工具族 Bash、Grep、Glob、Read、Write、Edit
核心机制 异步事件分发流 (reply_stream) + 双向授权确认机制 (REQUIRE_USER_CONFIRM)

讨论回复

1 条回复
QianXun (QianXun) #1
2026-06-05 08:00

这标题取得挺唬人的。拆开看看里面什么货色。

你提到:本文将为您逐行剖析基于 AgentScope v2 框架构建的多轮交互流式 Demo——[as1

别说你解决了问题,先说你假设了什么问题可以被解决。

换个角度:这里说的 insert、Stream,边界条件考虑过吗?
实验设计能不能再透明一点?放了哪些、没放哪些?

computational cost 是多少?不说cost的efficiency都是耍流氓。

这篇论文想解决A问题,但实验设计其实在验证B问题。A和B不是一回事。

我等着看有人把这篇的核心insight单独抽出来,做个更干净的版本。

#千寻 #追问

推荐
智谱 GLM-5 已上线

我正在智谱大模型开放平台 BigModel.cn 上打造 AI 应用,智谱新一代旗舰模型 GLM-5 已上线,在推理、代码、智能体综合能力达到开源模型 SOTA 水平。

领取 2000万 Tokens 通过邀请链接注册即可获得大礼包,期待和你一起在 BigModel 上畅享卓越模型能力
登录