## 概述
本教程将带您深入了解如何使用GEPA (Generalized Error-driven Prompt Augmentation) 来优化结构化信息抽取和分类任务。我们将使用Meta发布的 [Facility Support Analyzer](https://github.com/meta-llama/llama-prompt-ops/tree/main/use-cases/facility-support-analyzer) 数据集作为示例,展示如何从企业环境中的邮件或消息中提取紧急程度、评估情感以及识别相关的服务请求类别。
### 主要目标
- **紧急程度分类**:判断消息的紧急级别 (low/medium/high)
- **情感分析**:识别消息的情感倾向 (positive/neutral/negative)
- **类别识别**:识别相关的服务请求类别
## 环境准备
### 安装依赖
```bash
pip install dspy
pip install mlflow>=3.0.0 # 可选:用于实验跟踪
```
### MLflow集成(推荐)
MLflow是一个LLMOps工具,可以与DSPy原生集成,提供可解释性和实验跟踪功能。MLflow的自动日志记录功能可以自动跟踪GEPA优化进展,并将提示和模块执行可视化为轨迹。
**设置步骤:**
1. 启动MLflow UI
```bash
mlflow ui --port 5000 --backend-store-uri sqlite:///mlruns.db
```
2. 连接到MLflow
```python
import mlflow
mlflow.set_tracking_uri("http://localhost:5000")
mlflow.set_experiment("DSPy")
```
3. 启用自动日志记录
```python
mlflow.dspy.autolog(
log_compiles=True, # 记录优化进度
log_evals=True, # 记录评估结果
log_traces=True # 记录模块执行轨迹
)
```
## 步骤1:设置语言模型
我们使用GPT-4.1 nano来演示小模型如何通过GEPA进行调优:
```python
import dspy
api_key = input("Enter your OpenAI API key: ")
lm = dspy.LM("openai/gpt-4.1-nano", temperature=1, api_key=api_key)
dspy.configure(lm=lm)
```
## 步骤2:加载和准备数据集
### 数据加载函数
```python
import requests
import json
import random
def init_dataset():
# 从URL加载数据
url = "https://raw.githubusercontent.com/meta-llama/llama-prompt-ops/refs/heads/main/use-cases/facility-support-analyzer/dataset.json"
dataset = json.loads(requests.get(url).text)
# 转换为DSPy格式
dspy_dataset = [
dspy.Example({
"message": d['fields']['input'],
"answer": d['answer'],
}).with_inputs("message")
for d in dataset
]
# 随机打乱并分割数据集
random.Random(0).shuffle(dspy_dataset)
train_set = dspy_dataset[:int(len(dspy_dataset) * 0.33)]
val_set = dspy_dataset[int(len(dspy_dataset) * 0.33):int(len(dspy_dataset) * 0.66)]
test_set = dspy_dataset[int(len(dspy_dataset) * 0.66):]
return train_set, val_set, test_set
# 加载数据
train_set, val_set, test_set = init_dataset()
print(f"训练集: {len(train_set)}, 验证集: {len(val_set)}, 测试集: {len(test_set)}")
```
### 数据示例
```python
print("输入消息示例:")
print(train_set[0]['message'])
print("\n\n标准答案:")
gold_answer = json.loads(train_set[0]['answer'])
for k, v in gold_answer.items():
print(f"{k}: {v}")
```
**输出示例:**
```
输入消息示例:
Subject: Adjusting Bi-Weekly Cleaning Schedule for My Office
Dear ProCare Facility Solutions Support Team,
I hope this message finds you well. My name is Dr. Alex Turner, and I have been utilizing your services for my office space for the past year. I must say, your team's dedication to maintaining a pristine environment has been commendable and greatly appreciated.
I am reaching out to discuss the scheduling of our regular cleaning services...
标准答案:
categories: {'routine_maintenance_requests': False, 'customer_feedback_and_complaints': False, ...}
sentiment: neutral
urgency: low
```
## 步骤3:定义DSPy程序架构
我们构建一个3模块系统,分别处理紧急程度、情感和类别分类:
```python
from typing import List, Literal
class FacilitySupportAnalyzerUrgency(dspy.Signature):
"""
读取提供的消息并确定紧急程度。
"""
message: str = dspy.InputField()
urgency: Literal['low', 'medium', 'high'] = dspy.OutputField()
class FacilitySupportAnalyzerSentiment(dspy.Signature):
"""
读取提供的消息并确定情感倾向。
"""
message: str = dspy.InputField()
sentiment: Literal['positive', 'neutral', 'negative'] = dspy.OutputField()
class FacilitySupportAnalyzerCategories(dspy.Signature):
"""
读取提供的消息并确定适用于该消息的类别集合。
"""
message: str = dspy.InputField()
categories: List[Literal[
"emergency_repair_services",
"routine_maintenance_requests",
"quality_and_safety_concerns",
"specialized_cleaning_services",
"general_inquiries",
"sustainability_and_environmental_practices",
"training_and_support_requests",
"cleaning_services_scheduling",
"customer_feedback_and_complaints",
"facility_management_issues"
]] = dspy.OutputField()
class FacilitySupportAnalyzerMM(dspy.Module):
def __init__(self):
self.urgency_module = dspy.ChainOfThought(FacilitySupportAnalyzerUrgency)
self.sentiment_module = dspy.ChainOfThought(FacilitySupportAnalyzerSentiment)
self.categories_module = dspy.ChainOfThought(FacilitySupportAnalyzerCategories)
def forward(self, message: str):
urgency = self.urgency_module(message=message)
sentiment = self.sentiment_module(message=message)
categories = self.categories_module(message=message)
return dspy.Prediction(
urgency=urgency.urgency,
sentiment=sentiment.sentiment,
categories=categories.categories
)
# 创建程序实例
program = FacilitySupportAnalyzerMM()
```
## 步骤4:定义评估指标
### 基础评估函数
```python
def score_urgency(gold_urgency, pred_urgency):
"""计算紧急程度模块的得分"""
return 1.0 if gold_urgency == pred_urgency else 0.0
def score_sentiment(gold_sentiment, pred_sentiment):
"""计算情感分析模块的得分"""
return 1.0 if gold_sentiment == pred_sentiment else 0.0
def score_categories(gold_categories, pred_categories):
"""计算类别分类模块的得分"""
correct = 0
for k, v in gold_categories.items():
if v and k in pred_categories:
correct += 1
elif not v and k not in pred_categories:
correct += 1
return correct / len(gold_categories)
def metric(example, pred, trace=None, pred_name=None, pred_trace=None):
"""
基于预测和标准答案的类别、情感和紧急程度一致性计算得分
"""
gold = json.loads(example['answer'])
# 计算所有模块的得分
score_urgency_val = score_urgency(gold['urgency'], pred.urgency)
score_sentiment_val = score_sentiment(gold['sentiment'], pred.sentiment)
score_categories_val = score_categories(gold['categories'], pred.categories)
# 总体得分:三个准确率的平均值
total = (score_urgency_val + score_sentiment_val + score_categories_val) / 3
return total
```
## 步骤5:未优化程序的评估
```python
# 评估未优化的程序
evaluate = dspy.Evaluate(
devset=test_set,
metric=metric,
num_threads=32,
display_table=True,
display_progress=True
)
baseline_result = evaluate(program)
print(f"基线性能: {baseline_result.score:.1f}%")
```
**预期输出:**
```
Average Metric: 51.30 / 68 (75.4%)
基线性能: 75.4%
```
## 步骤6:为GEPA创建带反馈的评估指标
GEPA是一个*反思性*提示优化器,其优势在于能够检查DSPy程序执行和评估流水线的文本反馈。我们需要为GEPA提供具体的文本反馈:
```python
def feedback_urgency(gold_urgency, pred_urgency):
"""为紧急程度模块生成反馈"""
score = 1.0 if gold_urgency == pred_urgency else 0.0
if gold_urgency == pred_urgency:
feedback = f"您正确地将消息的紧急程度分类为 `{gold_urgency}`。这条消息确实是 `{gold_urgency}` 紧急程度。"
else:
feedback = f"您错误地将消息的紧急程度分类为 `{pred_urgency}`。正确的紧急程度是 `{gold_urgency}`。请思考如何推理才能得到正确的紧急程度标签。"
return feedback, score
def feedback_sentiment(gold_sentiment, pred_sentiment):
"""为情感分析模块生成反馈"""
score = 1.0 if gold_sentiment == pred_sentiment else 0.0
if gold_sentiment == pred_sentiment:
feedback = f"您正确地将消息的情感分类为 `{gold_sentiment}`。这条消息确实是 `{gold_sentiment}`。"
else:
feedback = f"您错误地将消息的情感分类为 `{pred_sentiment}`。正确的情感是 `{gold_sentiment}`。请思考如何推理才能得到正确的情感标签。"
return feedback, score
def feedback_categories(gold_categories, pred_categories):
"""为类别分类模块生成反馈"""
correctly_included = [k for k, v in gold_categories.items() if v and k in pred_categories]
incorrectly_included = [k for k, v in gold_categories.items() if not v and k in pred_categories]
incorrectly_excluded = [k for k, v in gold_categories.items() if v and k not in pred_categories]
correctly_excluded = [k for k, v in gold_categories.items() if not v and k not in pred_categories]
# 重新计算类别准确率
score = (len(correctly_included) + len(correctly_excluded)) / len(gold_categories)
if score == 1.0:
fb_text = f"类别分类是完美的。您正确识别了消息属于以下类别:`{repr(correctly_included)}`。"
else:
fb_text = f"类别分类不完美。您正确识别了消息属于以下类别:`{repr(correctly_included)}`。\n"
if incorrectly_included:
fb_text += f"但是,您错误地识别消息属于以下类别:`{repr(incorrectly_included)}`。消息实际上不属于这些类别。\n"
if incorrectly_excluded:
prefix = "另外," if incorrectly_included else "但是,"
fb_text += f"{prefix}您没有识别出消息实际属于的以下类别:`{repr(incorrectly_excluded)}`。\n"
fb_text += "请思考如何推理才能得到正确的类别标签。"
return fb_text, score
def metric_with_feedback(example, pred, trace=None, pred_name=None, pred_trace=None):
"""
带反馈的评估指标,支持模块级别的反馈
"""
gold = json.loads(example['answer'])
# 计算所有模块的反馈和得分
fb_urgency, score_urgency = feedback_urgency(gold['urgency'], pred.urgency)
fb_sentiment, score_sentiment = feedback_sentiment(gold['sentiment'], pred.sentiment)
fb_categories, score_categories = feedback_categories(gold['categories'], pred.categories)
# 总体得分
total = (score_urgency + score_sentiment + score_categories) / 3
if pred_name is None:
return total
# 根据预测器名称返回相应的反馈
elif pred_name == 'urgency_module.predict':
feedback = fb_urgency
elif pred_name == 'sentiment_module.predict':
feedback = fb_sentiment
elif pred_name == 'categories_module.predict':
feedback = fb_categories
return dspy.Prediction(score=total, feedback=feedback)
```
## 步骤7:使用GEPA优化
### 初始化GEPA优化器
```python
from dspy import GEPA
optimizer = GEPA(
metric=metric_with_feedback,
auto="light", # 使用轻量级预算,生产环境建议使用 "heavy"
num_threads=32,
track_stats=True,
use_merge=False,
reflection_lm=dspy.LM(model="gpt-5", temperature=1.0, max_tokens=32000, api_key=api_key)
)
```
### 执行优化
```python
# 运行GEPA优化
optimized_program = optimizer.compile(
student=program,
trainset=train_set,
valset=val_set
)
```
**优化过程输出示例:**
```
INFO: Running GEPA for approx 1643 metric calls of the program.
INFO: Iteration 0: Base program full valset score: 0.7207070707070706
INFO: Iteration 1: Selected program 0 score: 0.7207070707070706
INFO: Iteration 1: Proposed new text for urgency_module.predict:
Task: 确定客户消息到ProCare Facility Solutions的紧急程度。
上下文和领域:
- 消息通常发送给ProCare Facility Solutions的支持团队,涉及设施服务
- 常见主题包括清洁质量、HVAC性能/安全、日常维护调度和一般查询
如何评估紧急程度:
使用以下主要因素:
1) 安全和风险:
- 高/紧急:存在直接安全危险或潜在伤害
- 中等:提到安全但描述为轻微或无迫在眉睫的危险迹象
2) 运营影响:
- 高:关键系统中断或问题阻止正常运营
- 中等:服务降级或质量不一致需要及时关注
- 低:没有描述运营影响且消息仅为信息性
...
```
## 步骤8:评估优化后的程序
```python
# 评估优化后的程序
optimized_result = evaluate(optimized_program)
print(f"优化后性能: {optimized_result.score:.1f}%")
print(f"性能提升: {optimized_result.score - baseline_result.score:.1f}百分点")
```
**预期输出:**
```
Average Metric: 56.83 / 66 (86.1%)
优化后性能: 86.1%
性能提升: 10.7百分点
```
## 步骤9:详细分析优化结果
### 查看优化后的提示
```python
# 检查优化后的提示
print("=== 紧急程度模块优化后的提示 ===")
print(optimized_program.urgency_module.predict.signature.instructions)
print("\n=== 情感分析模块优化后的提示 ===")
print(optimized_program.sentiment_module.predict.signature.instructions)
print("\n=== 类别分类模块优化后的提示 ===")
print(optimized_program.categories_module.predict.signature.instructions)
```
### 性能对比分析
```python
import pandas as pd
# 对比分析
def detailed_evaluation(program, dataset, name):
results = []
for example in dataset:
pred = program(message=example['message'])
gold = json.loads(example['answer'])
urgency_correct = gold['urgency'] == pred.urgency
sentiment_correct = gold['sentiment'] == pred.sentiment
categories_score = score_categories(gold['categories'], pred.categories)
results.append({
'urgency_correct': urgency_correct,
'sentiment_correct': sentiment_correct,
'categories_score': categories_score,
'overall_score': metric(example, pred)
})
df = pd.DataFrame(results)
print(f"\n=== {name} 详细分析 ===")
print(f"紧急程度准确率: {df['urgency_correct'].mean():.3f}")
print(f"情感分析准确率: {df['sentiment_correct'].mean():.3f}")
print(f"类别分类平均得分: {df['categories_score'].mean():.3f}")
print(f"总体平均得分: {df['overall_score'].mean():.3f}")
return df
# 详细评估
baseline_df = detailed_evaluation(program, test_set, "基线模型")
optimized_df = detailed_evaluation(optimized_program, test_set, "优化后模型")
```
## 核心概念和最佳实践
### GEPA的工作原理
1. **反思性优化**:GEPA通过分析文本反馈来理解为什么系统获得特定分数
2. **模块级反馈**:为每个预测器提供具体的反馈,使优化更精准
3. **帕累托前沿**:GEPA维护多个候选程序,平衡不同目标的性能
### 关键设计决策
#### 1. 评估指标设计
- **组合指标**:结合多个子任务的性能
- **文本反馈**:提供可操作的改进建议
- **模块特异性**:为不同模块提供针对性反馈
#### 2. 数据分割策略
- **训练集**:用于GEPA学习模式
- **验证集**:用于帕累托优化和候选选择
- **测试集**:用于最终性能评估
#### 3. 优化参数调优
```python
# 生产环境推荐配置
optimizer = GEPA(
metric=metric_with_feedback,
auto="heavy", # 更大的优化预算
num_threads=32,
track_stats=True,
use_merge=True, # 启用模块合并
reflection_lm=dspy.LM(model="gpt-4", temperature=0.7, max_tokens=16000)
)
```
### 常见问题和解决方案
#### 1. 性能不提升
**原因**:
- 反馈质量不够具体
- 验证集太小
- 优化预算不足
**解决方案**:
- 改进反馈文本的具体性和可操作性
- 增加验证集大小
- 使用 `auto="heavy"` 或增加手动预算
#### 2. 过拟合验证集
**原因**:
- 验证集与测试集分布不一致
- 优化时间过长
**解决方案**:
- 确保数据分割的代表性
- 使用早停策略
- 监控验证和测试性能差距
#### 3. 优化速度慢
**原因**:
- 线程数设置不当
- 反馈生成复杂度高
**解决方案**:
- 调整 `num_threads` 参数
- 简化反馈生成逻辑
- 使用更快的反思模型
## 进阶技巧
### 1. 自定义反馈生成
```python
def advanced_feedback_categories(gold_categories, pred_categories, message_context):
"""基于消息上下文生成更详细的反馈"""
# 分析消息中的关键词
keywords = extract_keywords(message_context)
# 生成上下文相关的反馈
feedback = generate_contextual_feedback(
gold_categories, pred_categories, keywords
)
return feedback, score
```
### 2. 分层优化策略
```python
# 先优化单个模块,再优化整体
urgency_optimizer = GEPA(metric=urgency_specific_metric, auto="light")
sentiment_optimizer = GEPA(metric=sentiment_specific_metric, auto="light")
overall_optimizer = GEPA(metric=overall_metric, auto="heavy")
# 分阶段优化
program = urgency_optimizer.compile(program, trainset, valset)
program = sentiment_optimizer.compile(program, trainset, valset)
program = overall_optimizer.compile(program, trainset, valset)
```
### 3. 集成多个优化器
```python
# 结合不同优化方法
from dspy import BootstrapFewShot
# 先用少样本学习初始化
bootstrap = BootstrapFewShot(metric=metric)
bootstrapped_program = bootstrap.compile(program, trainset)
# 再用GEPA精细优化
gepa = GEPA(metric=metric_with_feedback, auto="heavy")
final_program = gepa.compile(bootstrapped_program, trainset, valset)
```
## 实际应用建议
### 1. 生产部署
- 保存优化后的程序:`optimized_program.save("facility_analyzer_v1")`
- 版本控制:记录优化参数和性能指标
- A/B测试:对比优化前后的实际效果
### 2. 持续改进
- 定期重新优化:使用新数据更新模型
- 监控性能漂移:跟踪生产环境中的性能变化
- 反馈循环:收集用户反馈改进评估指标
### 3. 扩展到其他任务
- 适配不同领域:修改类别定义和反馈逻辑
- 多语言支持:调整提示语言和示例
- 实时优化:在线学习和适应
## 总结
本教程展示了如何使用GEPA优化企业信息抽取任务:
1. **数据准备**:构建高质量的训练和验证集
2. **程序设计**:创建模块化的DSPy程序
3. **评估指标**:设计综合性评估指标和详细反馈
4. **GEPA优化**:使用反思性优化提升性能
5. **结果分析**:深入分析优化效果和改进空间
通过GEPA的反思性优化,我们成功将基线模型的75.4%准确率提升到86.1%,实现了显著的性能改进。这种方法特别适用于需要细粒度控制和高质量输出的企业级应用场景。
关键要点:
- GEPA通过文本反馈实现精准优化
- 模块级反馈使优化更加针对性
- 组合多个子任务需要平衡各个目标
- 持续优化和监控对生产部署至关重要
希望这个教程能帮助您在实际项目中成功应用GEPA技术!
登录后可参与表态
讨论回复
1 条回复
✨步子哥 (steper)
#1
10-07 04:49
登录后可参与表态