你现在手里已经有了一个很聪明的大模型。 但它还在“黑框框”的命令行里,一段段地跑 Python,忘性还特别大。 今天这篇,就是教你:如何给它配上一台真正的“远程工作站”,一个有状态、算力强、可控、安全的 Jupyter Server。
现代智能体框架几乎清一色都提供了“代码执行”能力:
要么是 Function Calling,
要么是 MCP、内置 code executor、Azure 动态容器之类。
看上去都能“让模型跑代码”,但有个致命差异——有状态 vs 无状态。
df;
- df.head() 看一眼列名与类型;
- 根据实际列名写出下一段代码,过滤、聚合、建模;
- 整个过程天然是交互式、有记忆的会话。
换句话说: 命令行沙箱像是一次性便利贴,写完就扔; Jupyter 则是一本持续写满上下文的“实验日志本”。你在文章中用的例子也非常典型: 让智能体对一份陌生的
csv 做清洗 + 分析。
如果运行环境是“无状态命令行”,模型根本无法像人一样,先看数据长啥样,再逐步调整代码。
而Jupyter Kernel,天生就是为这种“看一眼,再想下一步”的交互循环设计的。
我们要做的,就是把这套能力,接到你的智能体系统背后。
你的目标并不是“本机随便玩玩”,而是:
你没有直接用官方 Jupyter 镜像,而是从 python:3.13-slim-bookworm 起步,再自己装 Jupyter 相关组件——这给了你足够的灵活定制空间。
# Dockerfile.jupyter
FROM python:3.13-slim-bookworm
WORKDIR /app
COPY requirements.txt /app/requirements.txt
RUN pip install --no-cache-dir jupyter_kernel_gateway ipykernel numpy pandas sympy scipy --upgrade
RUN pip install --no-cache-dir -r requirements.txt --upgrade
EXPOSE 8888
ENV TOKEN="UNSET"
CMD python -m jupyter kernelgateway \
--KernelGatewayApp.ip=0.0.0.0 \
--KernelGatewayApp.port=8888 \
--KernelGatewayApp.auth_token="${TOKEN}" \
--JupyterApp.answer_yes=true \
--JupyterWebsocketPersonality.list_kernels=true
对应的 requirements.txt:
matplotlib
xlrd
openpyxl
小贴士
jupyter_kernel_gateway 是这套体系的关键。
它把 Jupyter Kernel 暴露成一个 HTTP / WebSocket API 服务,
智能体框架只要能对它发 REST 请求,就能远程跑代码。
然后本地构建镜像:
docker build -t jupyter-server .
这一步结束,你已经有了一个“可远程调用的 Jupyter Kernel 网关镜像”。
在市面上的智能体框架里,真正对“Jupyter 代码沙箱”给予一等公民支持的,目前主要就是 Autogen。并且它没有把这部分锁进商业服务,而是直接开源在 autogen-ext 里。
先安装:
pip install -U "autogen-agentchat"
pip install "autogen-ext[docker-jupyter-executor]"
Autogen 中和 Jupyter 相关的三个核心角色:
DockerJupyterServerDockerJupyterCodeExecutorCodeExecutorAgentmodel_client,还能自写代码 + 执行 + 反思。
假设我们用刚刚构建的 jupyter-server 镜像,来初始化 DockerJupyterServer:
server = DockerJupyterServer(
custom_image_name="jupyter-server",
expose_port=8888,
token="UNSET",
bind_dir="temp",
)
再基于这个 server 得到 executor:
executor = DockerJupyterCodeExecutor(
jupyter_server=server,
timeout=600,
output_dir=Path("temp")
)
建立一个 CodeExecutorAgent:
code_executor = CodeExecutorAgent(
"code_executor",
code_executor=executor,
)
然后写一个简单的 main,测试“跨代码块变量是否还在”:
async def main():
async with executor:
code1 = TextMessage(
content=dedent("""
```python
x = 1+2
print("Round one: The calculation for the value of x is done.")
```
"""),
source="user"
)
response1 = await code_executor.on_messages(
messages=[code1],
cancellation_token=CancellationToken()
)
print(response1.chat_message.content)
code2 = TextMessage(
content=dedent("""
```python
print("Round two: Get the value of variable x again: x=", x)
```
"""),
source="user",
)
response2 = await code_executor.on_messages(
messages=[code2],
cancellation_token=CancellationToken()
)
print(response2.chat_message.content)
asyncio.run(main())
在这里,async with executor: 维护的是同一个 Jupyter Kernel 会话。
于是第二段代码可以直接访问第一次定义的 x——这正是我们想要的“有状态执行”。
通过 Docker API 起一个 Jupyter 容器,很适合本机研究:
你在自己的开发机/工作站上跑实验,一切就近解决。
可一旦你进入“企业级”和“算力集群”场景,问题就暴露了:
再让智能体容器自己“本地起 Jupyter 容器”,基本等于让小本本背着服务器跑——算力根本用不上。
所以,Docker out-of-Docker 在“玩玩可以,生产不行”。
真正的目标应该是:
智能体应用通过 HTTP / WebSocket 直连一个“已经在远程部署好的 Jupyter KernelGateway / JupyterLab Server”。
你顺着这一思路,掀开了 Autogen 源码的盖子,找到了突破口:
DockerJupyterCodeExecutor.__init__。
class DockerJupyterCodeExecutor(CodeExecutor, Component[DockerJupyterCodeExecutorConfig]):
...
def __init__(
self,
jupyter_server: Union[JupyterConnectable, JupyterConnectionInfo],
kernel_name: str = "python3",
timeout: int = 60,
output_dir: Path | None = None,
):
...
if isinstance(jupyter_server, JupyterConnectable):
self._connection_info = jupyter_server.connection_info
elif isinstance(jupyter_server, JupyterConnectionInfo):
self._connection_info = jupyter_server
这段逻辑的含义非常关键:
jupyter_server 参数可以是两种东西:.connection_info 的“可连接对象”(如 DockerJupyterServer);
- 一个直接的 JupyterConnectionInfo 结构体。
于是我们完全可以绕过 DockerJupyterServer,自己手动构造一个 JupyterConnectionInfo,只要你知道:
host;host='127.0.0.1'
use_https=False
port=8888
token='UNSET'
如果是企业内 / vast.ai 上的 Jupyter,不妨这么获取:
http[s]://host:port/...;executor = DockerJupyterCodeExecutor(
jupyter_server=JupyterConnectionInfo(
host='127.0.0.1',
use_https=False,
port=8888,
token='UNSET'
),
timeout=600,
output_dir=Path("temp"),
)
这一步,就是商业产品 Manus、Claude Code Executor 等核心能力的“开源自制版”: 你不再依赖他们的封闭代码容器, 而是让你自己的智能体直连你自己的 Jupyter Server。当然,如果此时你的 Jupyter 容器还没启动,执行就会报错。 这就引出了下一节:如何优雅地管理 Jupyter 实例。
你可以用一条 docker run 启动这个 Jupyter 容器,例如:
docker run -d -p 8888:8888 --volume temp:/app --name jupyter-server jupyter-server
但随着配置项增多,这种命令行方式很快就会变成人肉记忆游戏。
更优雅的做法,是写一个 docker-compose.yml:
version: "3.8"
services:
jupyter:
image: jupyter-server
container_name: jupyter-server
ports:
- "8888:8888"
volumes:
- ./temp:/app
networks:
- docker_executor
networks:
docker_executor:
driver: bridge
然后:
docker compose up -d # 启动
docker compose down # 停止
挂载 ./temp:/app 的好处是:
temp/ 下放 csv、模型文件等;/app 目录;当 Jupyter 容器启动后,之前基于 JupyterConnectionInfo 的 executor 就能连上了。
你在示例中,使用了一个典型的数据分析流程:
async def main2():
async with executor:
code1 = TextMessage(
content=dedent("""
```python
from pathlib import Path
import pandas as pd
file_path = Path("superstore.csv")
df = pd.read_csv(file_path)
print(df.iloc[:5, :6].head())
```
"""),
source="user",
)
response1 = await code_executor.on_messages(
messages=[code1], cancellation_token=CancellationToken()
)
print(response1.chat_message.content)
code2 = TextMessage(
content=dedent("""
```python
region_sales_sum = df.groupby("Region", as_index=False)["Sales"].sum()
print(region_sales_sum)
```
"""),
source="user",
)
response2 = await code_executor.on_messages(
messages=[code2], cancellation_token=CancellationToken()
)
print(response2.chat_message.content)
asyncio.run(main2())
第一段:载入 superstore.csv,看看前几行,建立直观印象;
第二段:在同一个 Kernel 里继续用 df 做分组求和。
这就是“智能体像一个真实数据分析师一样探索数据”的必要条件:
观察 – 思考 – 再写下一步代码,而不是“一次性打出 200 行脚本豪赌正确性”。
当你用 DockerJupyterServer 这种“短生命周期”的模式时,
Kernel 的数量和资源回收一般由上层负责(容器停了,Kernel 自然没了)。
但现在你走的是独立部署路线:
MappingKernelManager 的 culling(回收)参数,在 Dockerfile 里扩展启动命令:
CMD python -m jupyter kernelgateway \
--KernelGatewayApp.ip=0.0.0.0 \
--KernelGatewayApp.port=8888 \
--KernelGatewayApp.auth_token="${TOKEN}" \
--JupyterApp.answer_yes=true \
--JupyterWebsocketPersonality.list_kernels=true \
--MappingKernelManager.cull_idle_timeout=1800 \
--MappingKernelManager.cull_interval=300 \
--MappingKernelManager.cull_connected=False \
--MappingKernelManager.cull_busy=False
这些参数的含义:
cull_idle_timeout=1800cull_interval=300cull_connected=Falsecull_busy=False这就像给你的 Jupyter Server 请了一个夜班管理员: 看哪个会议室灯一直亮着人却不在,就顺手关掉空调和灯。记得修改完 Dockerfile 后重新 build 镜像,让配置真正生效。
到目前为止,你完成的是一条坚固的“算力管道”:
SYSTEM_PROMPT = dedent("""
You are the task planning helper in the team, good at breaking down complex user requests into smaller sub-tasks that can be done with Python code.
## Duties
1. **Only split tasks**, don’t write code or do the sub-tasks yourself.
2. **Make just one sub-task at a time**, don’t skip steps or merge different steps together.
3. **Think about the context**, use the results from earlier steps to make new and reasonable sub-tasks.
4. **Create tasks step by step**, keep breaking things down until the user’s original request is fully answered.
5. When all sub-tasks are done, **make a summary report based on the work history**.
6. At the very end, output "**TERMINATION**" as the finish signal.
""")
planner = AssistantAgent(
"task_planner",
model_client=model_client,
system_message=SYSTEM_PROMPT,
)
这个角色的行为很接近“产品经理 + 项目经理”:
TERMINATION。这是防止模型“一口气写到底”的关键策略, 强制它走“规划 – 执行 – 观察 – 再规划”的 loop。
SYSTEM_PROMPT = dedent("""
You’re a code helper in the team, good at writing Python code that can run in a stateful Jupyter Kernel based on the task you need to do.
## Responsibilities
1. **Understand the task**: Clearly understand the analysis or data processing request you’re given.
2. **Write code step by step**: Build the code in small, growing steps, making full use of the Jupyter Kernel’s stateful feature (meaning variables, data, and state stay between code blocks), and avoid running the same thing more than once.
3. **Show the output clearly**: Make sure each piece of code shows or returns its result clearly so the team can see and check it.
4. **Follow code format rules**: All Python code must be wrapped in Markdown code blocks like ` ```python ` to keep it easy to read and run.
5. **Reuse context**: Let later code blocks use variables, data frames, models, and other things you set up earlier, without loading or starting them again.
## Examples
When you write Python code, wrap it in a markdown python code block:
__CODE_BLOCK_22__
You can reuse the variable in another code block:
__CODE_BLOCK_23__
""")
code_writer = AssistantAgent(
"code_writer",
model_client=model_client,
system_message=SYSTEM_PROMPT,
)
___CODE_BLOCK_22___python
team = RoundRobinGroupChat(
[planner, code_writer, code_executor],
termination_condition=combine_term
)
___CODE_BLOCK_23___python
if __name__ == "__main__":
async def main():
async with executor:
await Console(
team.run_stream(
task="Read the superstore.csv file and find the total sales for each region."
)
)
asyncio.run(main())
___CODE_BLOCK_24___python
@tool
async def execute_code(code: str) -> str:
"""
Use the Jupyter code executor to run your Python code.
The runtime environment keeps its state, so you can run code step by step.
reuse variables from earlier code blocks, and avoid writing the same code again.
:param code: Code waiting to be run, only the code itself, no Markdown syntax
:return: The result of the code execution.
"""
code_blocks = [CodeBlock(code=code, language="python")]
code_result = await executor.execute_code_blocks(
code_blocks, cancellation_token=CancellationToken()
)
return code_result.output
___CODE_BLOCK_25___python
model = ChatOpenAI(
model="qwen3-next-80b-a3b-instruct",
api_key=os.getenv("OPENAI_API_KEY"),
base_url=os.getenv("OPENAI_BASE_URL"),
temperature=0.1,
top_p=0.85,
)
agent = create_agent(
model=model,
tools=[execute_code],
system_prompt=dedent("""
You are a data analysis assistant, good at solving user questions with Python code.
You use the `execute_code` tool to run the code and summarize the results as the answer.
""")
)
___CODE_BLOCK_26___python
async def main():
async with executor:
result = await agent.ainvoke(
{
"messages": [
{
"role": "user",
"content": "Calculate the value of the 14th Fibonacci number."
}
]
}
)
for msg in result['messages']:
print(msg.content)
asyncio.run(main())
___CODE_BLOCK_27___text
Calculate the value of the 14th Fibonacci number.
377
The 14th Fibonacci number is 377.
整个过程:
execute_code tool,构造一小段 Python;换个框架,比如 LangGraph,你只要把 execute_code 包装到一个 Node 里;
换成自研框架,只要支持 JSON function call,就能无缝复用这套能力。
从工程视角,这套方案帮你实现了几件关键事情:
df.head() 再继续;
- 先画图再调整参数;
- 先训练一轮模型再微调。
- 所有这些都建立在同一个 Jupyter Kernel 的持续状态上。
Jupyter KernelGateway + 连接信息 + 执行 API。
读完整篇文章并按步骤实践,你手上实际上已经有了这样一套能力:
DockerJupyterCodeExecutor 直接连上这个 Server,而不依赖本地 Docker API 启容器。如果说 Function Calling 是给大模型一把“玩具扳手”, 那么 Jupyter 代码沙箱 + 独立部署 + 智能体编排, 则是给它配上一整间“可远程操控的实验室”。在后续你计划的系列文章中,只要在这个“实验室”的基础上继续往上搭:
还没有人回复