隐私保护委托是一种创新的 AI 系统架构,旨在解决现代 AI 应用中的核心矛盾:高性能与隐私保护。在传统方案中,用户要么选择功能强大但可能泄露隐私的大型云端模型,要么选择隐私安全但性能有限的本地小模型。
PAPILLON(PrivAcy Preservation from Internet-based and Local Language MOdel ENsembles)系统提出了第三种路径——隐私保护委托(privacy-preserving delegation):
GEPA(Genetic-Pareto,遗传-帕累托反思式提示进化)代表了 AI 系统优化的新范式。与传统优化器不同,GEPA 具备以下革命性特点:
GEPA 的核心创新在于将大语言模型的反思能力与进化优化算法相结合,形成了一个自适应的优化闭环:
DspyAdapter 捕获程序执行的完整轨迹:
# Illustrative snippet: bridges a DSPy program into GEPA's adapter interface.
# `GEPAAdapter`, `bootstrap_trace_data` and `EvaluationBatch` are defined in
# the GEPA/DSPy libraries; the trailing `...` elides further fields.
class DspyAdapter(GEPAAdapter):
def evaluate(self, batch, candidate, capture_traces=True):
# Build a concrete program instance from the candidate's instructions
program = self.build_program(candidate)
# Capture detailed execution trajectories for reflection
trajs = bootstrap_trace_data(
program=program,
dataset=batch,
metric=self.metric_fn,
capture_failed_parses=True, # keep failed parses as learning signal
raise_on_error=False # tolerate per-example failures
)
return EvaluationBatch(trajectories=trajs, ...)
# Shape of one example handed to GEPA's reflection LM.
class ReflectiveExample(TypedDict):
Inputs: dict[str, Any] # predictor inputs
Generated_Outputs: dict | str # generated outputs, or error text on failure
Feedback: str # detailed feedback string for the reflection step
每个反思示例都包含:
candidate_selection_strategy="pareto"
# Sample a candidate at random from the quality/privacy Pareto frontier
candidate_selection_strategy="current_best"
# Always pick the currently best-scoring candidate
component_selector="round_robin"
# Optimize each component in turn
component_selector="all"
# Optimize all components simultaneously
# Illustrative stub: a custom hook deciding which predictors GEPA rewrites next.
class SmartComponentSelector(ReflectionComponentSelector):
def select_components(self, state, trajectories):
# Pick components to optimize based on observed performance data.
# (pseudo-code: `selected_components` stands in for the actual computation)
return selected_components
auto_budget 方法实现了智能预算分配:
def auto_budget(self, num_preds, num_candidates, valset_size,
                minibatch_size=35, full_eval_steps=5):
    """Estimate the total number of metric calls a GEPA run will spend.

    Budget = one initial full evaluation + candidate bootstrapping
           + per-trial minibatch evaluations + periodic full evaluations.
    """
    import numpy as np

    # Trial count scales with the predictor count and the log of the
    # candidate pool, floored at 1.5x the number of candidates.
    trials = int(max(
        2 * (num_preds * 2) * np.log2(num_candidates),
        1.5 * num_candidates,
    ))

    bootstrap_cost = num_candidates * 5       # candidate bootstrapping
    minibatch_cost = trials * minibatch_size  # per-trial minibatch evals
    # One full validation pass every `full_eval_steps` trials, plus a final one.
    full_evals = (trials + 1) // full_eval_steps + 1

    return valset_size + bootstrap_cost + minibatch_cost + full_evals * valset_size
# Preset budget profiles for the optimizer's `auto` setting. `n` presumably
# controls how many candidates are explored — verify against the GEPA docs.
# The description strings are runtime data and are left verbatim.
AUTO_RUN_SETTINGS = {
"light": {"n": 10, "description": "快速实验"},
"medium": {"n": 25, "description": "平衡优化"},
"heavy": {"n": 50, "description": "深度优化"}
}
GEPA 的合并机制能够整合多个成功候选的优点:
use_merge=True # enable merge-based optimization
max_merge_invocations=5 # upper bound on merge attempts
合并过程通过以下步骤实现:
PAPILLON 系统的设计遵循最小权限原则和信息流控制两大安全原则:
# Redaction signature. NOTE: in DSPy, the class docstring and the field
# `desc` strings below are live prompt text sent to the LM at runtime,
# so they are deliberately left untranslated.
class CraftRedactedRequest(dspy.Signature):
"""智能隐私保护请求生成器
核心功能:
1. 敏感信息检测与分类
2. 上下文保持的匿名化
3. 任务完整性维护
"""
user_query = dspy.InputField(desc="包含敏感信息的原始用户查询")
llm_request = dspy.OutputField(desc="隐私安全的外部模型请求")
处理策略详解:
# Response-merging signature. NOTE: the docstring and `desc` strings are
# live prompt text in DSPy and are deliberately left untranslated.
class RespondToQuery(dspy.Signature):
"""智能响应整合与上下文恢复
核心功能:
1. 外部响应与本地上下文的融合
2. 个性化信息的恢复与注入
3. 响应质量的优化与校验
"""
related_llm_request = dspy.InputField(desc="发送给外部模型的请求")
related_llm_response = dspy.InputField(desc="外部模型的高质量响应")
user_query = dspy.InputField(desc="用户的原始查询")
response = dspy.OutputField(desc="整合后的最终响应")
整合策略:
class PAPILLON(dspy.Module):
    """PAPILLON main controller.

    Pipeline:
      1. receive the user query
      2. redact it with the trusted local model
      3. delegate the redacted request to the untrusted external model
      4. merge the external answer back with the private context
      5. fail soft on any error

    Args:
        untrusted_model: callable LM client; ``untrusted_model(request)[0]``
            is expected to return the external model's text response.
    """

    def __init__(self, untrusted_model):
        # Chain-of-thought for the privacy-critical redaction decision.
        self.craft_redacted_request = dspy.ChainOfThought(CraftRedactedRequest)
        # Plain prediction suffices for response merging.
        self.respond_to_query = dspy.Predict(RespondToQuery)
        # Keep a handle on the external (untrusted) model.
        self.untrusted_model = untrusted_model

    def forward(self, user_query):
        try:
            # Stage 1: privacy-preserving redaction (runs on the local LM).
            privacy_result = self.craft_redacted_request(user_query=user_query)
            llm_request = privacy_result.llm_request
            # Stage 2: delegate the sanitized request to the external model.
            llm_response = self.untrusted_model(llm_request)[0]
            # Stage 3: merge the external answer with the original query.
            final_result = self.respond_to_query(
                related_llm_request=llm_request,
                related_llm_response=llm_response,
                user_query=user_query,
            )
            return dspy.Prediction(
                llm_request=llm_request,
                llm_response=llm_response,
                response=final_result.response,
            )
        except Exception as e:
            # BUGFIX: the original referenced `logger`, which is never defined
            # anywhere in this file — the error path itself raised NameError.
            # Use the stdlib logging module instead (local import keeps the
            # snippet self-contained).
            import logging
            logging.getLogger(__name__).warning(f"PAPILLON处理异常: {e}")
            return dspy.Prediction(
                llm_request="",
                llm_response="",
                response="抱歉,系统暂时无法处理您的请求,请稍后重试。",
            )
环境与依赖要求:
# Core dependencies
# BUGFIX: version specifiers are quoted — unquoted `>=` is parsed by POSIX
# shells as output redirection (e.g. `pip install dspy-ai>=2.5.0` runs
# `pip install dspy-ai` and redirects stdout to a file named `=2.5.0`).
pip install "dspy-ai>=2.5.0"
pip install "datasets>=2.0.0"
pip install "numpy>=1.21.0"
pip install "pandas>=1.3.0"
# MLflow integration (experiment tracking)
pip install "mlflow>=3.0.0"
# WandB integration (optional)
pip install "wandb>=0.16.0"
# Multimodal support (optional)
pip install "pillow>=8.0.0"
# Performance-monitoring dependencies
pip install "psutil>=5.8.0"
pip install "tqdm>=4.60.0"
import dspy
import os
from typing import Optional
class ModelManager:
    """Central place to construct the three LMs used throughout the tutorial.

    Resolves the OpenAI API key (argument > environment > interactive
    prompt) and builds a three-tier stack: a trusted local model, an
    untrusted large model, and a reflection model for GEPA.
    """

    def __init__(self, api_key: Optional[str] = None):
        # Resolution order: explicit argument, then environment, then prompt.
        resolved = api_key or os.getenv("OPENAI_API_KEY")
        if not resolved:
            resolved = input("请输入您的 OpenAI API 密钥: ")
        self.api_key = resolved

    def setup_models(self, local_model: str = "openai/gpt-4.1-nano",
                     large_model: str = "openai/gpt-4.1-mini",
                     reflection_model: str = "openai/gpt-4.1"):
        """Build the three-tier model stack and make the local LM the default.

        Returns the tuple ``(local_lm, large_lm, reflection_lm)``.
        """
        def _make_lm(model, max_tokens, temperature, timeout):
            # Shared constructor: the tiers differ only in their knobs.
            return dspy.LM(
                model=model,
                api_key=self.api_key,
                max_tokens=max_tokens,
                temperature=temperature,
                timeout=timeout,
            )

        # Local privacy-guard model: low temperature for consistent redaction.
        self.local_lm = _make_lm(local_model, 4000, 0.1, 30)
        # External large model (delegation target): moderate temperature
        # balances creativity and accuracy.
        self.large_lm = _make_lm(large_model, 8000, 0.3, 60)
        # Reflection model for GEPA: large token budget for deep analysis,
        # high temperature to encourage creative rewrites.
        self.reflection_lm = _make_lm(reflection_model, 32000, 1.0, 120)

        # Make the local model the framework-wide default.
        dspy.configure(lm=self.local_lm)
        return self.local_lm, self.large_lm, self.reflection_lm
# Usage example: build the three-tier stack once, up front.
model_manager = ModelManager()
local_lm, large_lm, reflection_lm = model_manager.setup_models()
print(f"模型配置完成:")
print(f"- 本地模型: {local_lm.model}")
print(f"- 大型模型: {large_lm.model}")
print(f"- 反思模型: {reflection_lm.model}")
import mlflow
import mlflow.dspy
from datetime import datetime
class ExperimentTracker:
    """Manages MLflow experiment tracking for GEPA optimization runs."""

    def __init__(self, tracking_uri: str = "http://localhost:5000",
                 experiment_name: str = None):
        self.tracking_uri = tracking_uri
        # Fall back to a timestamped name so repeated runs never collide.
        if experiment_name:
            self.experiment_name = experiment_name
        else:
            stamp = datetime.now().strftime('%Y%m%d-%H%M%S')
            self.experiment_name = f"GEPA-PAPILLON-{stamp}"

    def setup_mlflow(self):
        """Point MLflow at the tracking server and enable DSPy autologging.

        Returns True on success; False when tracking could not be
        configured (execution continues without experiment logging).
        """
        autolog_flags = dict(
            log_compiles=True,          # record compile/optimization passes
            log_evals=True,             # record evaluation results
            log_traces=True,            # record execution traces
            log_datasets=True,          # record dataset info
            log_metrics=True,           # record performance metrics
            capture_input_output=True,  # capture raw inputs/outputs
        )
        try:
            mlflow.set_tracking_uri(self.tracking_uri)
            mlflow.set_experiment(self.experiment_name)
            mlflow.dspy.autolog(**autolog_flags)
            print(f"MLflow 跟踪已启动: {self.tracking_uri}")
            print(f"实验名称: {self.experiment_name}")
            return True
        except Exception as e:
            print(f"MLflow 配置失败: {e}")
            print("继续执行但不记录实验数据")
            return False

    def log_custom_metrics(self, metrics: dict):
        """Forward each (name, value) pair to MLflow as a metric."""
        for name, value in metrics.items():
            mlflow.log_metric(name, value)
# Start experiment tracking (degrades gracefully if MLflow is unreachable)
tracker = ExperimentTracker()
tracker.setup_mlflow()
import wandb
import os
def setup_wandb(project_name: str = "gepa-papillon",
                api_key: Optional[str] = None):
    """Initialize a Weights & Biases run for the optimization experiment.

    The key is taken from the argument or the WANDB_API_KEY environment
    variable; when found it is (re-)exported to the environment before
    ``wandb.init``. Returns the live ``wandb.config`` object.
    """
    key = api_key or os.getenv("WANDB_API_KEY")
    if key:
        os.environ["WANDB_API_KEY"] = key
    run_name = f"papillon-optimization-{datetime.now().strftime('%Y%m%d-%H%M%S')}"
    wandb.init(
        project=project_name,
        name=run_name,
        config={
            "optimizer": "GEPA",
            "task": "privacy-preserving-delegation",
            "framework": "DSPy",
        },
    )
    return wandb.config
# 可选的 WandB 配置
# wandb_config = setup_wandb()
import psutil
import time
from threading import Thread
class PerformanceMonitor:
    """Samples system CPU and memory usage on a background daemon thread.

    ``start_monitoring()`` spawns a daemon thread that appends one sample
    dict to ``self.stats`` every ``interval`` seconds; ``stop_monitoring()``
    stops sampling and returns ``get_summary()``.
    """

    def __init__(self, interval: float = 5.0):
        """interval: seconds between samples (previously hard-coded to 5)."""
        self.monitoring = False
        self.stats = []
        self.interval = interval
        self._thread = None  # handle to the sampler thread, if any

    def start_monitoring(self):
        """Start the sampler thread.

        BUGFIX: guarded against double-start — calling this twice previously
        spawned a second loop that double-appended samples.
        """
        if self.monitoring:
            return
        self.monitoring = True
        self._thread = Thread(target=self._monitor_loop)
        self._thread.daemon = True
        self._thread.start()

    def stop_monitoring(self):
        """Stop sampling and return the aggregate summary."""
        self.monitoring = False
        return self.get_summary()

    def _monitor_loop(self):
        """Sampler loop: one psutil snapshot per interval until stopped."""
        while self.monitoring:
            self.stats.append({
                'timestamp': time.time(),
                'cpu_percent': psutil.cpu_percent(),
                'memory_percent': psutil.virtual_memory().percent,
                'memory_used_gb': psutil.virtual_memory().used / (1024**3),
            })
            time.sleep(self.interval)

    def get_summary(self):
        """Aggregate the collected samples; returns {} when none were taken."""
        if not self.stats:
            return {}
        n = len(self.stats)
        return {
            'average_cpu_percent': sum(s['cpu_percent'] for s in self.stats) / n,
            'average_memory_percent': sum(s['memory_percent'] for s in self.stats) / n,
            'peak_memory_gb': max(s['memory_used_gb'] for s in self.stats),
            'duration_minutes': (self.stats[-1]['timestamp'] - self.stats[0]['timestamp']) / 60,
        }
# Start system performance monitoring for the optimization run
perf_monitor = PerformanceMonitor()
perf_monitor.start_monitoring()
# Condensed redaction signature used in the runnable example.
# NOTE: the docstring is live prompt text (DSPy instructions) — left as-is.
class CraftRedactedRequest(dspy.Signature):
"""
给定私人用户查询,创建隐私保护的外部 LLM 请求。
"""
user_query = dspy.InputField()
llm_request = dspy.OutputField()
# Condensed response-merging signature used in the runnable example.
# NOTE: the docstring and `desc` strings are live prompt text — left as-is.
class RespondToQuery(dspy.Signature):
"""
基于外部 LLM 响应回答用户查询。
"""
related_llm_request = dspy.InputField()
related_llm_response = dspy.InputField(desc="外部 LLM 的响应信息")
user_query = dspy.InputField(desc="用户的原始请求")
response = dspy.OutputField(desc="最终响应")
# Minimal privacy-preserving delegation pipeline: redact -> delegate -> merge.
class PAPILLON(dspy.Module):
def __init__(self, untrusted_model):
# Chain-of-thought for the privacy-critical redaction step.
self.craft_redacted_request = dspy.ChainOfThought(CraftRedactedRequest)
self.respond_to_query = dspy.Predict(RespondToQuery)
# untrusted_model(request)[0] is expected to yield the external model's text.
self.untrusted_model = untrusted_model
def forward(self, user_query):
try:
# Step 1: create the privacy-preserving (redacted) request
llm_request = self.craft_redacted_request(user_query=user_query).llm_request
# Step 2: get the external model's response to the redacted request
llm_response = self.untrusted_model(llm_request)[0]
# Step 3: merge the external answer with the original (private) query
response = self.respond_to_query(
related_llm_request=llm_request,
related_llm_response=llm_response,
user_query=user_query
).response
except Exception:
# Deliberate fail-soft: any error yields an empty prediction so batch
# evaluation never aborts on a single example.
return dspy.Prediction(llm_request="", llm_response="", response="")
return dspy.Prediction(llm_request=llm_request, llm_response=llm_response, response=response)
from datasets import load_dataset
# Load the PUPA dataset (user queries annotated with PII units)
pupa_new = load_dataset("Columbia-NLP/PUPA", "pupa_new")
examples = [
dspy.Example({
"target_response": x["target_response"],
"user_query": x["user_query"],
"pii_str": x["pii_units"] # presumably "||"-separated PII units; see LLMJudge
}).with_inputs("user_query")
for x in pupa_new["train"]
]
# Fixed 225/225/rest split — NOTE(review): assumes the train split has >450 rows.
trainset, devset, testset = examples[:225], examples[225:450], examples[450:]
print(f"训练集: {len(trainset)}, 开发集: {len(devset)}, 测试集: {len(testset)}")
# Pairwise quality judge. NOTE: the docstring and `desc` strings are live
# prompt text — left untranslated.
class JudgeQuality(dspy.Signature):
"""比较两个响应的质量"""
user_query = dspy.InputField(desc="用户请求")
response_A = dspy.InputField()
response_B = dspy.InputField()
judgment: bool = dspy.OutputField() # presumably True when response_A wins — verify against LLMJudge usage
# PII-leakage judge. NOTE: the docstring is live prompt text — left untranslated.
class JudgeLeakage(dspy.Signature):
"""评估 PII 信息泄露程度"""
pii = dspy.InputField()
prompt = dspy.InputField()
num_pii_leaked: int = dspy.OutputField() # count of PII units found in `prompt`
# Composite judge: position-debiased response quality + PII leakage rate.
class LLMJudge(dspy.Module):
def __init__(self):
self.quality_judge = dspy.ChainOfThought(JudgeQuality)
self.fact_checker = dspy.ChainOfThought(JudgeLeakage)
def forward(self, user_query, og_resp, new_resp=None, updated_query=None, pii_str=None):
# Quality: judge twice with the response order swapped to offset position bias.
judgment_1 = self.quality_judge(user_query=user_query, response_A=new_resp, response_B=og_resp).judgment
judgment_2 = self.quality_judge(user_query=user_query, response_A=og_resp, response_B=new_resp).judgment
# Verdict: True when the new response wins outright, or when the two
# orderings returned the same value (a tie counts in the new response's
# favor). Assumes `judgment` means "response_A wins" — TODO confirm.
judgment = judgment_1 or (judgment_1 == judgment_2)
# Leakage: fraction of distinct "||"-separated PII units that the judge
# finds in the redacted request sent to the external model.
pii = list(set(pii_str.split("||")))
pii_score = self.fact_checker(pii=pii, prompt=updated_query).num_pii_leaked
pii_score = pii_score / len(pii) if len(pii) > 0 else 0
return dspy.Prediction(quality=judgment, leakage=pii_score)
# Instantiate the judge and pin it to the large model for reliable grading.
llm_judge = LLMJudge()
llm_judge.set_lm(large_lm)
def compute_metrics(gold, pred, trace=None):
    """Run the LLM judge on one (gold, prediction) pair.

    Returns the judge's Prediction with `quality` and `leakage` fields.
    """
    judge_inputs = {
        "user_query": gold.user_query,
        "new_resp": pred.response,
        "og_resp": gold.target_response,
        "updated_query": pred.llm_request,
        "pii_str": gold.pii_str,
    }
    return llm_judge(**judge_inputs)
def compute_overall_score(gold, pred, trace=None):
    """Scalar score in [0, 1]: mean of quality and (1 - leakage)."""
    judged = compute_metrics(gold, pred, trace)
    quality_term = judged.quality
    privacy_term = 1 - judged.leakage
    return (quality_term + privacy_term) / 2.0
# GEPA-specific metric returning a score plus natural-language feedback.
def compute_overall_score_with_feedback(gold, pred, trace=None, pred_name=None, pred_trace=None):
    """GEPA-facing metric: scalar score plus feedback for the reflection LM.

    The feedback string is what GEPA's reflection LM reads, so it must state
    the scoring formula accurately and only request improvement when there
    is actually room for it.
    """
    metrics = compute_metrics(gold, pred, trace)
    overall_score = (metrics.quality + (1 - metrics.leakage)) / 2.0
    # BUGFIX: the original feedback text read "质量得分(...) + 隐私得分(...) / 2",
    # which misstates operator precedence; parenthesize the sum.
    feedback_text = (
        f"总体得分: {overall_score:.2f} = "
        f"(质量得分({metrics.quality:.2f}) + 隐私得分({1-metrics.leakage:.2f})) / 2."
    )
    # Only ask for improvement when the example is not already perfect —
    # unconditional advice misleads the reflection LM on perfect scores.
    if overall_score < 1.0:
        feedback_text += " 需要提高响应质量并减少 PII 泄露。"
    return dspy.Prediction(score=overall_score, feedback=feedback_text)
# Build the unoptimized (zero-shot) system as the baseline
zeroshot = PAPILLON(untrusted_model=large_lm)
# Evaluation configuration
kwargs = dict(num_threads=16, display_progress=True, display_table=5, max_errors=100)
# NOTE(review): the `devset` argument is given the held-out testset here —
# presumably intentional for final reporting, but verify.
evaluate = dspy.Evaluate(metric=compute_overall_score, devset=testset, **kwargs)
# Run the baseline evaluation
baseline_result = evaluate(zeroshot)
# Observed result in the authors' run: 163.71 / 214 (76.5%)
from dspy import GEPA

# Build the student program: the untrusted large LM handles delegation,
# while the trusted local LM runs PAPILLON's own predictors.
papillon = PAPILLON(untrusted_model=large_lm)
papillon.set_lm(local_lm)

# Configure the GEPA optimizer.
compiler = GEPA(
    metric=compute_overall_score_with_feedback,
    # BUGFIX: the original passed an undefined name `api_key` (NameError);
    # reuse the reflection LM already built by ModelManager.setup_models().
    reflection_lm=reflection_lm,
    num_threads=16,
    track_stats=True,
    track_best_outputs=True,
    max_full_evals=1,  # demo budget; use auto="heavy" for real runs
)

# Run the optimization.
optimized_papillon = compiler.compile(
    student=papillon,
    trainset=trainset,
    valset=devset,
)

# Inspect the evolved instructions GEPA produced for the redaction step.
print(optimized_papillon.craft_redacted_request.predict.signature.instructions)
GEPA 生成的优化提示包含:
# Evaluate the optimized system on the same held-out set
optimized_result = evaluate(optimized_papillon)
# Observed result in the authors' run: 184.26 / 214 (86.1%)
# NOTE(review): the percentages below are hard-coded from one run rather
# than recomputed from baseline_result/optimized_result.
print(f"基线性能: 76.5%")
print(f"优化后性能: 86.1%")
print(f"性能提升: +9.6%")
# Recommended production configuration
compiler = GEPA(
metric=compute_overall_score_with_feedback,
auto="heavy", # heavy budget for best final quality
num_threads=16
)
# Quick smoke-test configuration (note: rebinds `compiler` — use one or the other)
compiler = GEPA(
metric=compute_overall_score_with_feedback,
max_full_evals=3, # capped budget for fast iteration
num_threads=8
)
# Multi-level privacy-protection presets. The values are runtime strings
# (presumably injected into prompts — verify against usage) and are left verbatim.
PRIVACY_LEVELS = {
'high': '积极匿名化所有敏感信息',
'medium': '选择性处理关键 PII',
'low': '最小化处理仅邮箱等明显标识'
}
本教程展示了如何使用 GEPA 优化器显著提升 PAPILLON 隐私保护委托系统的性能。通过反思性优化和详细的反馈机制,GEPA 能够在极小的评估预算下实现质量和隐私保护的双重提升。
关键收获: