您正在查看静态缓存页面 · 查看完整动态版本 · 登录 参与讨论

当智能体学会“用笔记本”:把你的 Agent 接上独立 Jupyter Server 的完全指南

QianXun (QianXun) 2025年11月24日 08:03 0 次浏览
你现在手里已经有了一个很聪明的大模型。 但它还在“黑框框”的命令行里,一段段地跑 Python,忘性还特别大。 今天这篇,就是教你:如何给它配上一台真正的“远程工作站”,一个有状态、算力强、可控、安全的 Jupyter Server。

🧭 故事的起点:为什么一定要是 Jupyter,而不是命令行沙箱?

现代智能体框架几乎清一色都提供了“代码执行”能力:
要么是 Function Calling,
要么是 MCP、内置 code executor、Azure 动态容器之类。

看上去都能“让模型跑代码”,但有个致命差异——有状态 vs 无状态

  • 传统的“Python 命令行沙箱”:
- 每一段代码执行时,都是一个干净进程或干净上下文; - 你上一次创建的变量、载入的数据表,下一次根本看不见; - 智能体只能在 prompt 里“自述”自己做了什么,而不是真的在同一个 Python 会话里逐步推进。
  • 真实世界里的数据分析师:
- 用 Jupyter Notebook 打开一个 df; - df.head() 看一眼列名与类型; - 根据实际列名写出下一段代码,过滤、聚合、建模; - 整个过程天然是交互式、有记忆的会话
换句话说: 命令行沙箱像是一次性便利贴,写完就扔; Jupyter 则是一本持续写满上下文的“实验日志本”。
你在文章中用的例子也非常典型: 让智能体对一份陌生的 csv 做清洗 + 分析。 如果运行环境是“无状态命令行”,模型根本无法像人一样,先看数据长啥样,再逐步调整代码。

Jupyter Kernel,天生就是为这种“看一眼,再想下一步”的交互循环设计的。
我们要做的,就是把这套能力,接到你的智能体系统背后


🧱 第一块积木:构建一个可定制的 Jupyter Kernel 容器

你的目标并不是“本机随便玩玩”,而是:

  • 可部署在企业内部或 vast.ai 等算力平台;
  • 有隔离、安全控制;
  • 能被多个智能体应用共享;
  • 还能自动回收闲置资源。
这就很适合通过 Docker 来封装一个Jupyter Kernel 容器

🧩 Dockerfile:从轻量 Python 镜像起步

你没有直接用官方 Jupyter 镜像,而是从 python:3.13-slim-bookworm 起步,再自己装 Jupyter 相关组件——这给了你足够的灵活定制空间。

# Dockerfile.jupyter
FROM python:3.13-slim-bookworm

WORKDIR /app

COPY requirements.txt /app/requirements.txt

RUN pip install --no-cache-dir jupyter_kernel_gateway ipykernel numpy pandas sympy scipy --upgrade

RUN pip install --no-cache-dir -r requirements.txt --upgrade

EXPOSE 8888

ENV TOKEN="UNSET"
CMD python -m jupyter kernelgateway \
    --KernelGatewayApp.ip=0.0.0.0 \
    --KernelGatewayApp.port=8888 \
    --KernelGatewayApp.auth_token="${TOKEN}" \
    --JupyterApp.answer_yes=true \
    --JupyterWebsocketPersonality.list_kernels=true

对应的 requirements.txt

matplotlib
xlrd
openpyxl
小贴士 jupyter_kernel_gateway 是这套体系的关键。 它把 Jupyter Kernel 暴露成一个 HTTP / WebSocket API 服务, 智能体框架只要能对它发 REST 请求,就能远程跑代码。
然后本地构建镜像:
docker build -t jupyter-server .

这一步结束,你已经有了一个“可远程调用的 Jupyter Kernel 网关镜像”。


🤖 第二块积木:安装 Autogen,并先体验官方的 Docker 启动方案

在市面上的智能体框架里,真正对“Jupyter 代码沙箱”给予一等公民支持的,目前主要就是 Autogen。并且它没有把这部分锁进商业服务,而是直接开源在 autogen-ext 里。

先安装:

pip install -U "autogen-agentchat"
pip install "autogen-ext[docker-jupyter-executor]"

Autogen 中和 Jupyter 相关的三个核心角色:

  • DockerJupyterServer
- 通过 Docker API 启动容器; - 管理挂载目录; - 暴露出 Jupyter 连接信息(host/port/token 等)。
  • DockerJupyterCodeExecutor
- 基于上面的连接信息,调用 KernelGateway 的 API; - 提交代码块、获取执行输出。
  • CodeExecutorAgent
- Autogen 世界的“代码执行专用智能体”; - 可以直接接收 Python 代码消息并执行; - 如果配置了 model_client,还能自写代码 + 执行 + 反思。

🧪 用 Docker API 启动 Jupyter 容器,先感受“有状态沙箱”

假设我们用刚刚构建的 jupyter-server 镜像,来初始化 DockerJupyterServer

server = DockerJupyterServer(
    custom_image_name="jupyter-server",
    expose_port=8888,
    token="UNSET",
    bind_dir="temp",
)

再基于这个 server 得到 executor:

executor = DockerJupyterCodeExecutor(
    jupyter_server=server,
    timeout=600,
    output_dir=Path("temp")
)

建立一个 CodeExecutorAgent

code_executor = CodeExecutorAgent(
    "code_executor",
    code_executor=executor,
)

然后写一个简单的 main,测试“跨代码块变量是否还在”:

async def main():
    async with executor:
        code1 = TextMessage(
            content=dedent("""
            ```python
            x = 1+2
            print("Round one: The calculation for the value of x is done.")
            ```
            """),
            source="user"
        )
        response1 = await code_executor.on_messages(
            messages=[code1],
            cancellation_token=CancellationToken()
        )
        print(response1.chat_message.content)

        code2 = TextMessage(
            content=dedent("""
            ```python
            print("Round two: Get the value of variable x again: x=", x)
            ```
            """),
            source="user",
        )
        response2 = await code_executor.on_messages(
            messages=[code2],
            cancellation_token=CancellationToken()
        )
        print(response2.chat_message.content)

asyncio.run(main())

在这里,async with executor: 维护的是同一个 Jupyter Kernel 会话
于是第二段代码可以直接访问第一次定义的 x——这正是我们想要的“有状态执行”。


🧨 但问题来了:Docker out-of-Docker 的天然局限

通过 Docker API 起一个 Jupyter 容器,很适合本机研究
你在自己的开发机/工作站上跑实验,一切就近解决。

可一旦你进入“企业级”和“算力集群”场景,问题就暴露了:

  1. 算力与数据位置不对等
- 真实生产环境里,大数据分析往往跑在: - 内网 GPU 集群; - 大内存服务器; - 或者像 vast.ai 这样的远程高配机器。 - 你的智能体应用可能跑在: - 轻量的 Web 服务容器; - API 网关所在节点; - 与算力节点完全不同的机房 / VPC。

再让智能体容器自己“本地起 Jupyter 容器”,基本等于让小本本背着服务器跑——算力根本用不上。

  1. 网络隔离与安全策略
- 企业内通常会对 Docker 容器网络做隔离; - 智能体容器可能根本没有权限直接访问宿主 Docker; - 就算能访问,也不一定能跨网络访问远端算力节点。
  1. 资源复用与运维
- 你不会愿意: - 每个智能体实例都单独起自己的一套 Jupyter 容器; - 更不会把算力节点塞进 Web 前端服务器。 - 理想模式是: - Jupyter Server 独立部署在算力节点; - 多个智能体应用通过网络去“租用” Kernel; - 再在上层自由切换 Autogen、LangChain、LangGraph 等框架。

所以,Docker out-of-Docker 在“玩玩可以,生产不行”。

真正的目标应该是:

智能体应用通过 HTTP / WebSocket 直连一个“已经在远程部署好的 Jupyter KernelGateway / JupyterLab Server”。

🔌 关键突破:让 DockerJupyterCodeExecutor 直连独立 Jupyter Server

你顺着这一思路,掀开了 Autogen 源码的盖子,找到了突破口:
DockerJupyterCodeExecutor.__init__

class DockerJupyterCodeExecutor(CodeExecutor, Component[DockerJupyterCodeExecutorConfig]):
    ...

    def __init__(
        self,
        jupyter_server: Union[JupyterConnectable, JupyterConnectionInfo],
        kernel_name: str = "python3",
        timeout: int = 60,
        output_dir: Path | None = None,
    ):
        ...
        
        if isinstance(jupyter_server, JupyterConnectable):
            self._connection_info = jupyter_server.connection_info
        elif isinstance(jupyter_server, JupyterConnectionInfo):
            self._connection_info = jupyter_server

这段逻辑的含义非常关键:

  • jupyter_server 参数可以是两种东西:
- 一个能提供 .connection_info 的“可连接对象”(如 DockerJupyterServer); - 一个直接的 JupyterConnectionInfo 结构体。

于是我们完全可以绕过 DockerJupyterServer,自己手动构造一个 JupyterConnectionInfo,只要你知道:

  • Jupyter Server 的地址 host
  • 是否是 HTTPS;
  • 端口号;
  • 访问 token。
如果你是本机跑的容器,典型连接信息就是:
host='127.0.0.1'
use_https=False
port=8888
token='UNSET'

如果是企业内 / vast.ai 上的 Jupyter,不妨这么获取:

  • 打开 JupyterLab / Notebook 的 Web UI;
  • 浏览器地址栏里的 http[s]://host:port/...
  • 再从 URL 或配置中拿到 token。
然后用这些信息直接初始化 executor:
executor = DockerJupyterCodeExecutor(
    jupyter_server=JupyterConnectionInfo(
        host='127.0.0.1',
        use_https=False,
        port=8888,
        token='UNSET'
    ),
    timeout=600,
    output_dir=Path("temp"),
)
这一步,就是商业产品 Manus、Claude Code Executor 等核心能力的“开源自制版”: 你不再依赖他们的封闭代码容器, 而是让你自己的智能体直连你自己的 Jupyter Server。
当然,如果此时你的 Jupyter 容器还没启动,执行就会报错。 这就引出了下一节:如何优雅地管理 Jupyter 实例

🧮 用 Docker Compose 把 Jupyter Server 管理得漂漂亮亮

你可以用一条 docker run 启动这个 Jupyter 容器,例如:

docker run -d -p 8888:8888 --volume temp:/app --name jupyter-server jupyter-server

但随着配置项增多,这种命令行方式很快就会变成人肉记忆游戏。
更优雅的做法,是写一个 docker-compose.yml

version: "3.8"
services:
  jupyter:
    image: jupyter-server
    container_name: jupyter-server
    ports:
      - "8888:8888"
    volumes:
      - ./temp:/app
    networks:
      - docker_executor

networks:
  docker_executor:
    driver: bridge

然后:

docker compose up -d   # 启动
docker compose down    # 停止

挂载 ./temp:/app 的好处是:

  • 你的智能体应用可以在宿主机的 temp/ 下放 csv、模型文件等;
  • 容器内的 Jupyter Kernel 看到的就是 /app 目录;
  • 两边自然共享同一份文件系统视角(对这一项目而言)。

📊 再次验证:在有状态环境里进行真正的数据分析

当 Jupyter 容器启动后,之前基于 JupyterConnectionInfo 的 executor 就能连上了。
你在示例中,使用了一个典型的数据分析流程:

async def main2():
    async with executor:
        code1 = TextMessage(
            content=dedent("""
            ```python
            from pathlib import Path
            import pandas as pd
            
            file_path = Path("superstore.csv")
            df = pd.read_csv(file_path)
            
            print(df.iloc[:5, :6].head())
            ```
            """),
            source="user",
        )
        response1 = await code_executor.on_messages(
            messages=[code1], cancellation_token=CancellationToken()
        )
        print(response1.chat_message.content)

        code2 = TextMessage(
            content=dedent("""
            ```python
            region_sales_sum = df.groupby("Region", as_index=False)["Sales"].sum()
            print(region_sales_sum)
            ```
            """),
            source="user",
        )
        response2 = await code_executor.on_messages(
            messages=[code2], cancellation_token=CancellationToken()
        )
        print(response2.chat_message.content)

asyncio.run(main2())

第一段:载入 superstore.csv,看看前几行,建立直观印象;
第二段:在同一个 Kernel 里继续用 df 做分组求和。

这就是“智能体像一个真实数据分析师一样探索数据”的必要条件:
观察 – 思考 – 再写下一步代码,而不是“一次性打出 200 行脚本豪赌正确性”。


🧹 第四块积木:给 Jupyter 镜像加上“自动打扫屋子”的能力

当你用 DockerJupyterServer 这种“短生命周期”的模式时,
Kernel 的数量和资源回收一般由上层负责(容器停了,Kernel 自然没了)。

但现在你走的是独立部署路线:

  • Jupyter Server 常驻在算力节点;
  • 每当一个 executor 连上来,都会新建一个 Kernel;
  • 如果不进行垃圾回收,这些 Kernel 会越来越多,最后搞到 OOM。
解决办法很优雅: 直接利用 Jupyter 的 MappingKernelManager 的 culling(回收)参数,在 Dockerfile 里扩展启动命令:
CMD python -m jupyter kernelgateway \
    --KernelGatewayApp.ip=0.0.0.0 \
    --KernelGatewayApp.port=8888 \
    --KernelGatewayApp.auth_token="${TOKEN}" \
    --JupyterApp.answer_yes=true \
    --JupyterWebsocketPersonality.list_kernels=true \
    --MappingKernelManager.cull_idle_timeout=1800 \
    --MappingKernelManager.cull_interval=300 \
    --MappingKernelManager.cull_connected=False \
    --MappingKernelManager.cull_busy=False

这些参数的含义:

  • cull_idle_timeout=1800
Kernel 空闲超过 1800 秒(30 分钟)就回收;
  • cull_interval=300
每隔 300 秒(5 分钟)检查一次;
  • cull_connected=False
哪怕还有连接,只要符合空闲标准也可以回收(你也可以按需改为 True);
  • cull_busy=False
不回收处于执行状态的 Kernel。
这就像给你的 Jupyter Server 请了一个夜班管理员: 看哪个会议室灯一直亮着人却不在,就顺手关掉空调和灯。
记得修改完 Dockerfile 后重新 build 镜像,让配置真正生效。

🧠 第五块积木:搭起一个多智能体协作的“数据分析小分队”

到目前为止,你完成的是一条坚固的“算力管道”:

  • 智能体应用 ↔ DockerJupyterCodeExecutor ↔ Jupyter KernelGateway ↔ 算力服务器。
但要构建一个真正“能办事”的数据分析 Agent,你还需要在上层组织工作流程。 你给出了两个典型架构方向:
  1. 多-Agent 协作(Autogen 风格);
  2. 单 Agent + Tool(LangChain / LangGraph 风格)。
先看多智能体版本。

🧩 角色一:taskplanner——只负责“拆任务,不写代码”

SYSTEM_PROMPT = dedent("""
You are the task planning helper in the team, good at breaking down complex user requests into smaller sub-tasks that can be done with Python code.

## Duties
1. **Only split tasks**, don’t write code or do the sub-tasks yourself.
2. **Make just one sub-task at a time**, don’t skip steps or merge different steps together.
3. **Think about the context**, use the results from earlier steps to make new and reasonable sub-tasks.
4. **Create tasks step by step**, keep breaking things down until the user’s original request is fully answered.
5. When all sub-tasks are done, **make a summary report based on the work history**.
6. At the very end, output "**TERMINATION**" as the finish signal.
""")

planner = AssistantAgent(
    "task_planner",
    model_client=model_client,
    system_message=SYSTEM_PROMPT,
)

这个角色的行为很接近“产品经理 + 项目经理”:

  • 拆解需求;
  • 每次只发一个可执行的小任务;
  • 根据已完成的上下文调整下一步;
  • 收尾时给出总结,并输出 TERMINATION
这是防止模型“一口气写到底”的关键策略, 强制它走“规划 – 执行 – 观察 – 再规划”的 loop。

🧩 角色二:codewriter——把任务翻译成可复用的 Python 代码块

SYSTEM_PROMPT = dedent("""
You’re a code helper in the team, good at writing Python code that can run in a stateful Jupyter Kernel based on the task you need to do.

## Responsibilities
1. **Understand the task**: Clearly understand the analysis or data processing request you’re given.
2. **Write code step by step**: Build the code in small, growing steps, making full use of the Jupyter Kernel’s stateful feature (meaning variables, data, and state stay between code blocks), and avoid running the same thing more than once.
3. **Show the output clearly**: Make sure each piece of code shows or returns its result clearly so the team can see and check it.
4. **Follow code format rules**: All Python code must be wrapped in Markdown code blocks like ` ```python ` to keep it easy to read and run.
5. **Reuse context**: Let later code blocks use variables, data frames, models, and other things you set up earlier, without loading or starting them again.

## Examples
When you write Python code, wrap it in a markdown python code block:
__CODE_BLOCK_22__
You can reuse the variable in another code block:
__CODE_BLOCK_23__
""")

code_writer = AssistantAgent(
    "code_writer",
    model_client=model_client,
    system_message=SYSTEM_PROMPT,
)
___CODE_BLOCK_22___python
team = RoundRobinGroupChat(
    [planner, code_writer, code_executor],
    termination_condition=combine_term
)
___CODE_BLOCK_23___python
if __name__ == "__main__":
    async def main():
        async with executor:
            await Console(
                team.run_stream(
                    task="Read the superstore.csv file and find the total sales for each region."
                )
            )

    asyncio.run(main())
___CODE_BLOCK_24___python
@tool
async def execute_code(code: str) -> str:
    """
    Use the Jupyter code executor to run your Python code.
    The runtime environment keeps its state, so you can run code step by step.
    reuse variables from earlier code blocks, and avoid writing the same code again.
    :param code: Code waiting to be run, only the code itself, no Markdown syntax
    :return: The result of the code execution.
    """
    code_blocks = [CodeBlock(code=code, language="python")]
    code_result = await executor.execute_code_blocks(
        code_blocks, cancellation_token=CancellationToken()
    )

    return code_result.output
___CODE_BLOCK_25___python
model = ChatOpenAI(
    model="qwen3-next-80b-a3b-instruct",
    api_key=os.getenv("OPENAI_API_KEY"),
    base_url=os.getenv("OPENAI_BASE_URL"),
    temperature=0.1,
    top_p=0.85,
)

agent = create_agent(
    model=model,
    tools=[execute_code],
    system_prompt=dedent("""
    You are a data analysis assistant, good at solving user questions with Python code.
    You use the `execute_code` tool to run the code and summarize the results as the answer.
    """)
)
___CODE_BLOCK_26___python
async def main():
    async with executor:
        result = await agent.ainvoke(
            {
                "messages": [
                    {
                        "role": "user",
                        "content": "Calculate the value of the 14th Fibonacci number."
                    }
                ]
            }
        )
        for msg in result['messages']:
            print(msg.content)

asyncio.run(main())
___CODE_BLOCK_27___text
Calculate the value of the 14th Fibonacci number.

377
The 14th Fibonacci number is 377.

整个过程:

  1. 模型理解到“需要算 Fibonacci 第 14 项”;
  2. 自动调用 execute_code tool,构造一小段 Python;
  3. 在 Jupyter Kernel 中执行;
  4. 把数值结果整合进最终自然语言回答中。
换个框架,比如 LangGraph,你只要把 execute_code 包装到一个 Node 里; 换成自研框架,只要支持 JSON function call,就能无缝复用这套能力。

🔭 视野扩展:为什么这是“企业级深度数据分析智能体”的底层基建?

从工程视角,这套方案帮你实现了几件关键事情:

  1. 摆脱云端封闭沙箱的锁定
- 不再强制依赖 Azure Dynamic Code Container、Claude Code Executor 等商业服务; - 你可以选 vast.ai、自建 GPU 集群、企业内部算力池; - 成本可控、数据留在内网,合规可审计。
  1. 获得真正的“有状态算力后端”
- 智能体可以像人一样: - 先看 df.head() 再继续; - 先画图再调整参数; - 先训练一轮模型再微调。 - 所有这些都建立在同一个 Jupyter Kernel 的持续状态上。
  1. 多框架可复用的统一算力接口
- 上层可以是: - Autogen 多 Agent 协作; - LangChain / LangGraph 的 Tool + Graph; - 甚至你自己的轻量 Agent 框架; - 底层都共用同一个:Jupyter KernelGateway + 连接信息 + 执行 API
  1. 面向生产的资源管理与部署规范
- 通过 Dockerfile + Docker Compose: - 固化环境依赖; - 挂载数据卷; - 管理网络与端口; - 通过 Kernel culling 参数: - 自动回收闲置 Kernel; - 避免“跑一天挂一台机”的资源浪费。

🧩 最后的小结:你现在已经具备了什么?

读完整篇文章并按步骤实践,你手上实际上已经有了这样一套能力:

  1. 能在任意算力平台(本机、企业内网、vast.ai)部署一个可远程连接的 Jupyter KernelGateway
  2. 能用 Autogen 的 DockerJupyterCodeExecutor 直接连上这个 Server,而不依赖本地 Docker API 启容器。
  3. 能通过多 Agent 编排(planner + codewriter + codeexecutor),让大模型逐步生成、逐步执行、逐步反思代码。
  4. 能把这套 Jupyter 执行能力封装成一个 Tool,给任意支持 function calling 的框架使用。
  5. 能通过 Kernel 生命周期管理,长期稳定地在生产环境中运行,而不担心算力泄漏。
如果说 Function Calling 是给大模型一把“玩具扳手”, 那么 Jupyter 代码沙箱 + 独立部署 + 智能体编排, 则是给它配上一整间“可远程操控的实验室”。
在后续你计划的系列文章中,只要在这个“实验室”的基础上继续往上搭:
  • 更强的规划与执行策略;
  • 数据权限与行级访问控制;
  • 模型选择与路由;
  • 多人协作与审计日志;
你完全可以把这个体系演进成一个企业级、可审计、可扩展的深度数据分析智能体平台——而不是一个“只能在自己电脑上跑跑 demo 的玩具”。

讨论回复

0 条回复

还没有人回复