本教程将带您深入了解如何使用 GEPA(Genetic-Pareto,一种反思式提示优化器)来优化结构化信息抽取和分类任务。我们将使用 Meta 发布的 Facility Support Analyzer 数据集作为示例,展示如何从企业环境中的邮件或消息中提取紧急程度、评估情感以及识别相关的服务请求类别。
pip install dspy
pip install "mlflow>=3.0.0" # 可选:用于实验跟踪(版本约束需加引号,否则 shell 会把 >= 解析为输出重定向)
MLflow是一个LLMOps工具,可以与DSPy原生集成,提供可解释性和实验跟踪功能。MLflow的自动日志记录功能可以自动跟踪GEPA优化进展,并将提示和模块执行可视化为轨迹。
设置步骤:
mlflow ui --port 5000 --backend-store-uri sqlite:///mlruns.db
import mlflow
# Send runs to the tracking server started with `mlflow ui` and group them
# under the "DSPy" experiment.
mlflow.set_tracking_uri("http://localhost:5000")
mlflow.set_experiment("DSPy")

# Turn on DSPy autologging:
#   log_compiles -> record optimization progress
#   log_evals    -> record evaluation results
#   log_traces   -> record module execution traces
mlflow.dspy.autolog(log_compiles=True, log_evals=True, log_traces=True)
我们使用GPT-4.1 nano来演示小模型如何通过GEPA进行调优:
import dspy
# Ask for the OpenAI key interactively, build the GPT-4.1 nano client and
# register it as DSPy's configured language model.
api_key = input("Enter your OpenAI API key: ")
lm = dspy.LM(
    "openai/gpt-4.1-nano",
    api_key=api_key,
    temperature=1,
)
dspy.configure(lm=lm)
import requests
import json
import random
def init_dataset():
    """Download the Facility Support Analyzer dataset and split it three ways.

    Each record becomes a ``dspy.Example`` with a ``message`` input field
    (taken from ``fields.input``) and an ``answer`` field (the record's
    ``answer`` value). The examples are shuffled with a fixed seed so the
    split is reproducible.

    Returns:
        A ``(train_set, val_set, test_set)`` tuple holding roughly the first
        33%, the next 33% and the remaining examples respectively.

    Raises:
        requests.RequestException: if the download times out or the server
            answers with an HTTP error status.
    """
    url = "https://raw.githubusercontent.com/meta-llama/llama-prompt-ops/refs/heads/main/use-cases/facility-support-analyzer/dataset.json"

    # Fail fast on a hung connection or an HTTP error instead of handing an
    # error page to the JSON parser.
    response = requests.get(url, timeout=30)
    response.raise_for_status()
    dataset = json.loads(response.text)

    # Convert raw records to DSPy examples; only `message` is a model input.
    dspy_dataset = [
        dspy.Example({
            "message": d['fields']['input'],
            "answer": d['answer'],
        }).with_inputs("message")
        for d in dataset
    ]

    # Deterministic shuffle, then cut at the 33% and 66% marks.
    random.Random(0).shuffle(dspy_dataset)
    total = len(dspy_dataset)
    train_end = int(total * 0.33)
    val_end = int(total * 0.66)

    train_set = dspy_dataset[:train_end]
    val_set = dspy_dataset[train_end:val_end]
    test_set = dspy_dataset[val_end:]
    return train_set, val_set, test_set
# Load the three splits and report their sizes.
train_set, val_set, test_set = init_dataset()
print(f"训练集: {len(train_set)}, 验证集: {len(val_set)}, 测试集: {len(test_set)}")

# Show the first training message.
print("输入消息示例:")
print(train_set[0]['message'])

# The gold answer is stored as a JSON string; decode it and list its fields.
print("\n\n标准答案:")
gold_answer = json.loads(train_set[0]['answer'])
for field_name, field_value in gold_answer.items():
    print(f"{field_name}: {field_value}")
输出示例:
输入消息示例:
Subject: Adjusting Bi-Weekly Cleaning Schedule for My Office
Dear ProCare Facility Solutions Support Team,
I hope this message finds you well. My name is Dr. Alex Turner, and I have been utilizing your services for my office space for the past year. I must say, your team's dedication to maintaining a pristine environment has been commendable and greatly appreciated.
I am reaching out to discuss the scheduling of our regular cleaning services...
标准答案:
categories: {'routine_maintenance_requests': False, 'customer_feedback_and_complaints': False, ...}
sentiment: neutral
urgency: low
我们构建一个3模块系统,分别处理紧急程度、情感和类别分类:
from typing import List, Literal
# Signature for the urgency classifier: one text input (`message`) and one
# output restricted to the labels 'low', 'medium' or 'high'.
# NOTE(review): the docstring is intentionally left untranslated. DSPy appears
# to expose a Signature's docstring as its prompt instructions (this file later
# prints `signature.instructions`), so editing it would change model behavior.
class FacilitySupportAnalyzerUrgency(dspy.Signature):
    """
    读取提供的消息并确定紧急程度。
    """
    message: str = dspy.InputField()
    urgency: Literal['low', 'medium', 'high'] = dspy.OutputField()
# Signature for the sentiment classifier: one text input (`message`) and one
# output restricted to the labels 'positive', 'neutral' or 'negative'.
# NOTE(review): the docstring is intentionally left untranslated. DSPy appears
# to expose a Signature's docstring as its prompt instructions (this file later
# prints `signature.instructions`), so editing it would change model behavior.
class FacilitySupportAnalyzerSentiment(dspy.Signature):
    """
    读取提供的消息并确定情感倾向。
    """
    message: str = dspy.InputField()
    sentiment: Literal['positive', 'neutral', 'negative'] = dspy.OutputField()
# Signature for the multi-label category classifier: one text input
# (`message`) and a list output whose items must come from the ten category
# names enumerated below.
# NOTE(review): the docstring is intentionally left untranslated. DSPy appears
# to expose a Signature's docstring as its prompt instructions (this file later
# prints `signature.instructions`), so editing it would change model behavior.
class FacilitySupportAnalyzerCategories(dspy.Signature):
    """
    读取提供的消息并确定适用于该消息的类别集合。
    """
    message: str = dspy.InputField()
    categories: List[Literal[
        "emergency_repair_services",
        "routine_maintenance_requests",
        "quality_and_safety_concerns",
        "specialized_cleaning_services",
        "general_inquiries",
        "sustainability_and_environmental_practices",
        "training_and_support_requests",
        "cleaning_services_scheduling",
        "customer_feedback_and_complaints",
        "facility_management_issues"
    ]] = dspy.OutputField()
class FacilitySupportAnalyzerMM(dspy.Module):
    """Three-predictor program that labels a facility-support message.

    Holds one ``dspy.ChainOfThought`` predictor per task (urgency, sentiment,
    categories). The attribute names matter: ``metric_with_feedback`` routes
    feedback using the predictor names ``urgency_module.predict``,
    ``sentiment_module.predict`` and ``categories_module.predict``.
    """

    def __init__(self):
        # Run the base-class initializer before registering sub-modules so the
        # module does not depend on DSPy initializing base state implicitly.
        super().__init__()
        self.urgency_module = dspy.ChainOfThought(FacilitySupportAnalyzerUrgency)
        self.sentiment_module = dspy.ChainOfThought(FacilitySupportAnalyzerSentiment)
        self.categories_module = dspy.ChainOfThought(FacilitySupportAnalyzerCategories)

    def forward(self, message: str):
        """Run all three predictors on ``message`` and bundle their outputs.

        Args:
            message: The raw message text to analyze.

        Returns:
            A ``dspy.Prediction`` with ``urgency``, ``sentiment`` and
            ``categories`` fields taken from the respective predictors.
        """
        urgency = self.urgency_module(message=message)
        sentiment = self.sentiment_module(message=message)
        categories = self.categories_module(message=message)

        return dspy.Prediction(
            urgency=urgency.urgency,
            sentiment=sentiment.sentiment,
            categories=categories.categories,
        )
# Create the program instance; it is evaluated below as the unoptimized baseline.
program = FacilitySupportAnalyzerMM()
def score_urgency(gold_urgency, pred_urgency):
    """Return 1.0 when the predicted urgency equals the gold label, else 0.0."""
    if gold_urgency == pred_urgency:
        return 1.0
    return 0.0
def score_sentiment(gold_sentiment, pred_sentiment):
    """Return 1.0 when the predicted sentiment equals the gold label, else 0.0."""
    if gold_sentiment == pred_sentiment:
        return 1.0
    return 0.0
def score_categories(gold_categories, pred_categories):
    """Return the fraction of gold categories the prediction gets right.

    ``gold_categories`` maps each category name to a truthy/falsy flag. A
    category counts as correct when its flag agrees with whether the name
    appears in ``pred_categories``.
    """
    agreements = sum(
        1
        for label, is_member in gold_categories.items()
        if bool(is_member) == (label in pred_categories)
    )
    return agreements / len(gold_categories)
def metric(example, pred, trace=None, pred_name=None, pred_trace=None):
    """Score a prediction as the mean of its three per-task scores.

    The gold labels are decoded from the JSON string in ``example['answer']``.
    ``trace``, ``pred_name`` and ``pred_trace`` are accepted but not used.

    Returns:
        The average of the urgency, sentiment and categories scores.
    """
    expected = json.loads(example['answer'])

    urgency_points = score_urgency(expected['urgency'], pred.urgency)
    sentiment_points = score_sentiment(expected['sentiment'], pred.sentiment)
    category_points = score_categories(expected['categories'], pred.categories)

    # Overall score: plain average of the three accuracies.
    return (urgency_points + sentiment_points + category_points) / 3
# Build an evaluator over the test split and score the unoptimized program.
evaluate = dspy.Evaluate(
    metric=metric,
    devset=test_set,
    display_progress=True,
    display_table=True,
    num_threads=32,
)

baseline_result = evaluate(program)
print(f"基线性能: {baseline_result.score:.1f}%")
预期输出:
Average Metric: 51.30 / 68 (75.4%)
基线性能: 75.4%
GEPA是一个反思性提示优化器,其优势在于能够检查DSPy程序执行和评估流水线的文本反馈。我们需要为GEPA提供具体的文本反馈:
def feedback_urgency(gold_urgency, pred_urgency):
    """Build textual feedback and a 0/1 score for the urgency prediction.

    Returns:
        ``(feedback, score)``: ``score`` is 1.0 on an exact label match and
        0.0 otherwise; ``feedback`` states whether the label was correct and,
        when it was not, what the correct label is.
    """
    if gold_urgency == pred_urgency:
        return (
            f"您正确地将消息的紧急程度分类为 `{gold_urgency}`。这条消息确实是 `{gold_urgency}` 紧急程度。",
            1.0,
        )
    return (
        f"您错误地将消息的紧急程度分类为 `{pred_urgency}`。正确的紧急程度是 `{gold_urgency}`。请思考如何推理才能得到正确的紧急程度标签。",
        0.0,
    )
def feedback_sentiment(gold_sentiment, pred_sentiment):
    """Build textual feedback and a 0/1 score for the sentiment prediction.

    Returns:
        ``(feedback, score)``: ``score`` is 1.0 on an exact label match and
        0.0 otherwise; ``feedback`` states whether the label was correct and,
        when it was not, what the correct label is.
    """
    if gold_sentiment == pred_sentiment:
        return (
            f"您正确地将消息的情感分类为 `{gold_sentiment}`。这条消息确实是 `{gold_sentiment}`。",
            1.0,
        )
    return (
        f"您错误地将消息的情感分类为 `{pred_sentiment}`。正确的情感是 `{gold_sentiment}`。请思考如何推理才能得到正确的情感标签。",
        0.0,
    )
def feedback_categories(gold_categories, pred_categories):
    """Build textual feedback and an accuracy score for the category prediction.

    Every gold category is sorted into one of four buckets depending on its
    gold flag and on whether its name appears in ``pred_categories``. The
    score is the share of categories that landed in a "correct" bucket; the
    feedback lists the correctly included categories and, when the result is
    not perfect, the wrongly included and wrongly omitted ones.

    Returns:
        ``(feedback_text, score)``.
    """
    hits, false_alarms, misses, true_rejections = [], [], [], []
    for label, is_member in gold_categories.items():
        was_predicted = label in pred_categories
        if is_member:
            (hits if was_predicted else misses).append(label)
        else:
            (false_alarms if was_predicted else true_rejections).append(label)

    # Category accuracy over all gold categories.
    score = (len(hits) + len(true_rejections)) / len(gold_categories)

    if score == 1.0:
        return f"类别分类是完美的。您正确识别了消息属于以下类别:`{repr(hits)}`。", score

    pieces = [f"类别分类不完美。您正确识别了消息属于以下类别:`{repr(hits)}`。\n"]
    if false_alarms:
        pieces.append(f"但是,您错误地识别消息属于以下类别:`{repr(false_alarms)}`。消息实际上不属于这些类别。\n")
    if misses:
        # Pick the connective based on whether a complaint was already raised.
        prefix = "另外," if false_alarms else "但是,"
        pieces.append(f"{prefix}您没有识别出消息实际属于的以下类别:`{repr(misses)}`。\n")
    pieces.append("请思考如何推理才能得到正确的类别标签。")
    return "".join(pieces), score
def metric_with_feedback(example, pred, trace=None, pred_name=None, pred_trace=None):
    """Score a prediction and, when asked, attach predictor-specific feedback.

    The gold labels are decoded from the JSON string in ``example['answer']``.
    The overall score is the mean of the urgency, sentiment and categories
    scores. ``trace`` and ``pred_trace`` are accepted but not used.

    Args:
        example: Example whose ``answer`` field is a JSON string of gold labels.
        pred: Prediction exposing ``urgency``, ``sentiment`` and ``categories``.
        pred_name: Name of the predictor that feedback is requested for, or
            ``None`` when only the numeric score is wanted.

    Returns:
        The float score when ``pred_name`` is ``None``; otherwise a
        ``dspy.Prediction`` with ``score`` and ``feedback``. An unrecognized
        ``pred_name`` receives the feedback of all three tasks joined by
        newlines instead of raising ``UnboundLocalError``.
    """
    gold = json.loads(example['answer'])

    # Local names end in `_val` so they do not shadow the score_* functions.
    fb_urgency, urgency_val = feedback_urgency(gold['urgency'], pred.urgency)
    fb_sentiment, sentiment_val = feedback_sentiment(gold['sentiment'], pred.sentiment)
    fb_categories, categories_val = feedback_categories(gold['categories'], pred.categories)

    total = (urgency_val + sentiment_val + categories_val) / 3

    if pred_name is None:
        return total

    # Route feedback by predictor name.
    feedback_by_predictor = {
        'urgency_module.predict': fb_urgency,
        'sentiment_module.predict': fb_sentiment,
        'categories_module.predict': fb_categories,
    }
    feedback = feedback_by_predictor.get(pred_name)
    if feedback is None:
        # Unknown predictor (e.g. the program was renamed or extended):
        # fall back to the combined feedback rather than crashing.
        feedback = "\n".join([fb_urgency, fb_sentiment, fb_categories])

    return dspy.Prediction(score=total, feedback=feedback)
from dspy import GEPA
# Configure GEPA with the feedback-producing metric and a separate model
# used for reflection.
optimizer = GEPA(
    metric=metric_with_feedback,
    reflection_lm=dspy.LM(model="gpt-5", temperature=1.0, max_tokens=32000, api_key=api_key),
    auto="light",  # light budget; "heavy" is suggested for production
    use_merge=False,
    track_stats=True,
    num_threads=32,
)

# Run the GEPA optimization on the train split, validating on the val split.
optimized_program = optimizer.compile(
    student=program,
    valset=val_set,
    trainset=train_set,
)
优化过程输出示例:
INFO: Running GEPA for approx 1643 metric calls of the program.
INFO: Iteration 0: Base program full valset score: 0.7207070707070706
INFO: Iteration 1: Selected program 0 score: 0.7207070707070706
INFO: Iteration 1: Proposed new text for urgency_module.predict:
Task: 确定客户发送给 ProCare Facility Solutions 的消息的紧急程度。
上下文和领域:
- 消息通常发送给ProCare Facility Solutions的支持团队,涉及设施服务
- 常见主题包括清洁质量、HVAC性能/安全、日常维护调度和一般查询
如何评估紧急程度:
使用以下主要因素:
1) 安全和风险:
- 高/紧急:存在直接安全危险或潜在伤害
- 中等:提到安全但描述为轻微或无迫在眉睫的危险迹象
2) 运营影响:
- 高:关键系统中断或问题阻止正常运营
- 中等:服务降级或质量不一致需要及时关注
- 低:没有描述运营影响且消息仅为信息性
...
# Score the optimized program with the same evaluator and report the gain
# over the baseline in percentage points.
optimized_result = evaluate(optimized_program)
improvement = optimized_result.score - baseline_result.score
print(f"优化后性能: {optimized_result.score:.1f}%")
print(f"性能提升: {improvement:.1f}百分点")
预期输出:
Average Metric: 56.83 / 66 (86.1%)
优化后性能: 86.1%
性能提升: 10.7百分点
# Print the instruction text GEPA produced for each of the three predictors.
for header, module in (
    ("=== 紧急程度模块优化后的提示 ===", optimized_program.urgency_module),
    ("\n=== 情感分析模块优化后的提示 ===", optimized_program.sentiment_module),
    ("\n=== 类别分类模块优化后的提示 ===", optimized_program.categories_module),
):
    print(header)
    print(module.predict.signature.instructions)
import pandas as pd
# Comparative analysis
def detailed_evaluation(program, dataset, name):
    """Run ``program`` over ``dataset`` and print per-task accuracy figures.

    Args:
        program: Callable invoked as ``program(message=...)`` whose result
            exposes ``urgency``, ``sentiment`` and ``categories``.
        dataset: Iterable of examples with ``message`` and JSON ``answer`` fields.
        name: Label used in the printed report header.

    Returns:
        A ``pandas.DataFrame`` with one row per example and the columns
        ``urgency_correct``, ``sentiment_correct``, ``categories_score`` and
        ``overall_score``.
    """
    def _score_example(example):
        # One program call per example; the gold labels come from the JSON answer.
        pred = program(message=example['message'])
        gold = json.loads(example['answer'])
        return {
            'urgency_correct': gold['urgency'] == pred.urgency,
            'sentiment_correct': gold['sentiment'] == pred.sentiment,
            'categories_score': score_categories(gold['categories'], pred.categories),
            'overall_score': metric(example, pred),
        }

    df = pd.DataFrame([_score_example(example) for example in dataset])

    print(f"\n=== {name} 详细分析 ===")
    print(f"紧急程度准确率: {df['urgency_correct'].mean():.3f}")
    print(f"情感分析准确率: {df['sentiment_correct'].mean():.3f}")
    print(f"类别分类平均得分: {df['categories_score'].mean():.3f}")
    print(f"总体平均得分: {df['overall_score'].mean():.3f}")

    return df
# Detailed evaluation: run the per-task breakdown on the test split for both
# the baseline and the optimized program, keeping each result DataFrame.
baseline_df = detailed_evaluation(program, test_set, "基线模型")
optimized_df = detailed_evaluation(optimized_program, test_set, "优化后模型")
# Recommended production configuration.
# The reflection model matches the one used for the light run above: the
# previous "gpt-4" with max_tokens=16000 asked for more tokens than that
# model's context window allows, and it omitted the API key.
optimizer = GEPA(
    metric=metric_with_feedback,
    auto="heavy",  # larger optimization budget
    num_threads=32,
    track_stats=True,
    use_merge=True,  # enable merge-based optimization
    reflection_lm=dspy.LM(model="gpt-5", temperature=1.0, max_tokens=32000, api_key=api_key),
)
# Tuning notes: auto="heavy" or a larger manual budget; the num_threads parameter.
def advanced_feedback_categories(gold_categories, pred_categories, message_context):
    """Generate more detailed, context-aware feedback for the category task.

    NOTE: ``extract_keywords`` and ``generate_contextual_feedback`` are not
    defined in this tutorial; they are placeholders you must supply.

    Args:
        gold_categories: Mapping of category name to a truthy/falsy gold flag.
        pred_categories: Collection of predicted category names.
        message_context: Message text from which keywords are extracted.

    Returns:
        ``(feedback, score)``, where ``score`` is the fraction of gold
        categories whose flag agrees with membership in ``pred_categories``.
    """
    # Pull keywords out of the message.
    keywords = extract_keywords(message_context)

    # Build feedback that takes those keywords into account.
    feedback = generate_contextual_feedback(
        gold_categories, pred_categories, keywords
    )

    # The original returned an undefined `score`; compute category accuracy here.
    agreements = sum(
        1
        for label, is_member in gold_categories.items()
        if bool(is_member) == (label in pred_categories)
    )
    score = agreements / len(gold_categories)

    return feedback, score
# Optimize individual modules first, then the program as a whole.
# NOTE: urgency_specific_metric, sentiment_specific_metric and overall_metric
# are placeholders that this tutorial does not define; supply your own
# GEPA-compatible metrics before running this snippet.
staged_reflection_lm = dspy.LM(model="gpt-5", temperature=1.0, max_tokens=32000, api_key=api_key)

# GEPA needs a reflection LM, so pass it to every optimizer.
urgency_optimizer = GEPA(metric=urgency_specific_metric, auto="light", reflection_lm=staged_reflection_lm)
sentiment_optimizer = GEPA(metric=sentiment_specific_metric, auto="light", reflection_lm=staged_reflection_lm)
overall_optimizer = GEPA(metric=overall_metric, auto="heavy", reflection_lm=staged_reflection_lm)

# Staged optimization. `trainset`/`valset` must be passed by keyword, and the
# splits created earlier are named train_set / val_set.
program = urgency_optimizer.compile(program, trainset=train_set, valset=val_set)
program = sentiment_optimizer.compile(program, trainset=train_set, valset=val_set)
program = overall_optimizer.compile(program, trainset=train_set, valset=val_set)
# Combine different optimization methods.
from dspy import BootstrapFewShot

# First initialize with few-shot bootstrapping. `trainset` is keyword-only,
# and the split created earlier is named train_set.
bootstrap = BootstrapFewShot(metric=metric)
bootstrapped_program = bootstrap.compile(program, trainset=train_set)

# Then refine with GEPA, which needs a reflection LM to propose new prompts.
gepa = GEPA(
    metric=metric_with_feedback,
    auto="heavy",
    reflection_lm=dspy.LM(model="gpt-5", temperature=1.0, max_tokens=32000, api_key=api_key),
)
final_program = gepa.compile(bootstrapped_program, trainset=train_set, valset=val_set)
optimized_program.save("facility_analyzer_v1.json")
本教程展示了如何使用GEPA优化企业信息抽取任务:
关键要点: