The following examples show how to build a multi-agent collaboration system based on the A2A protocol with Spring AI Alibaba.
Basic Configuration
First, add the Spring AI Alibaba dependency to the project:
<dependency>
    <groupId>com.alibaba.cloud.ai</groupId>
    <artifactId>spring-ai-alibaba-starter</artifactId>
    <version>1.0.0.2</version>
</dependency>
Then configure the A2A protocol parameters in application.yml:
spring:
  ai:
    alibaba:
      dashscope:
        api-key: ${DASHSCOPE_API_KEY}
      a2a:
        enabled: true
        agent-id: "customer-service-agent"
        registry:
          type: "nacos"
          server-addr: "localhost:8848"
Creating an Agent
The following code shows how to create an agent that supports the A2A protocol:
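import java.util.HashMap;
import java.util.List;
import java.util.Map;

import jakarta.annotation.PostConstruct;
import org.springframework.ai.chat.model.ChatModel;
import org.springframework.ai.chat.model.ChatResponse;
import org.springframework.ai.chat.prompt.Prompt;
import org.springframework.stereotype.Component;
// Imports for A2AClient, AgentCard, Capability, Task, TaskResult, TaskStatus and
// @A2ATaskHandler are omitted here; take them from the A2A module you are using.

@Component
public class CustomerServiceAgent {

    private final ChatModel chatModel;
    private final A2AClient a2aClient;

    public CustomerServiceAgent(ChatModel chatModel, A2AClient a2aClient) {
        this.chatModel = chatModel;
        this.a2aClient = a2aClient;
    }

    // Publish this agent's card to the registry once the bean has been initialized.
    @PostConstruct
    public void registerAgent() {
        AgentCard agentCard = AgentCard.builder()
                .agentId("customer-service-agent")
                .name("Customer Service Agent")
                .description("Handles customer inquiries and complaints")
                .capabilities(List.of(
                        Capability.builder()
                                .name("handle_inquiry")
                                .description("Handles customer inquiries")
                                .inputSchema("{\"type\":\"object\",\"properties\":{\"query\":{\"type\":\"string\"}}}")
                                .outputSchema("{\"type\":\"object\",\"properties\":{\"response\":{\"type\":\"string\"}}}")
                                .build()
                ))
                .build();
        a2aClient.registerAgent(agentCard);
    }

    // Invoked for tasks whose type matches the "handle_inquiry" capability declared above.
    @A2ATaskHandler("handle_inquiry")
    public TaskResult handleInquiry(Task task) {
        String query = (String) task.getInput().get("query");
        Prompt prompt = new Prompt(
                "As a customer service representative, please answer the following question: " + query);
        ChatResponse response = chatModel.call(prompt);

        Map<String, Object> output = new HashMap<>();
        // Spring AI 1.0 exposes the reply text via getText(); earlier milestones used getContent().
        output.put("response", response.getResult().getOutput().getText());

        return TaskResult.builder()
                .taskId(task.getTaskId())
                .status(TaskStatus.COMPLETED)
                .output(output)
                .build();
    }
}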
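To make the handler mechanism concrete without depending on the A2A classes, here is a minimal plain-Java sketch of the kind of routing that an annotation such as @A2ATaskHandler presumably performs: look up a handler by task type and report a failure when none is registered. SimpleTask, SimpleResult, and TaskDispatcher are hypothetical names introduced for illustration; they are not part of Spring AI Alibaba, and the real framework may work differently.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-ins for the Task/TaskResult types used above; not library classes.
record SimpleTask(String taskId, String taskType, Map<String, Object> input) {}

record SimpleResult(String taskId, boolean completed, Map<String, Object> output) {}

// Routes each incoming task to the handler registered for its task type.
class TaskDispatcher {
    private final Map<String, Function<SimpleTask, SimpleResult>> handlers = new HashMap<>();

    void register(String taskType, Function<SimpleTask, SimpleResult> handler) {
        handlers.put(taskType, handler);
    }

    SimpleResult dispatch(SimpleTask task) {
        Function<SimpleTask, SimpleResult> handler = handlers.get(task.taskType());
        if (handler == null) {
            // Unknown capability: return a failed result instead of throwing.
            return new SimpleResult(task.taskId(), false,
                    Map.of("error", "no handler for " + task.taskType()));
        }
        return handler.apply(task);
    }
}
```

A handler for handle_inquiry would be registered once at startup, and every incoming task would then go through dispatch, so unsupported task types surface as failed results rather than exceptions.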
Agent Collaboration
The following code shows how agents can collaborate with one another:
import java.util.Map;

import org.springframework.ai.chat.model.ChatModel;
import org.springframework.stereotype.Service;
// Imports for A2AClient, Task, TaskResult, TaskStatus, OrderRequest and OrderResult are omitted.

@Service
public class OrderProcessingService {

    private final A2AClient a2aClient;
    private final ChatModel chatModel;

    public OrderProcessingService(A2AClient a2aClient, ChatModel chatModel) {
        this.a2aClient = a2aClient;
        this.chatModel = chatModel;
    }

    public OrderResult processOrder(OrderRequest orderRequest) {
        // Step 1: ask the inventory agent whether the product is in stock.
        // Note: Map.of rejects null keys and values, so the request fields must be non-null.
        Task inventoryTask = Task.builder()
                .taskType("check_inventory")
                .input(Map.of(
                        "productId", orderRequest.getProductId(),
                        "quantity", orderRequest.getQuantity()
                ))
                .build();
        TaskResult inventoryResult = a2aClient.invokeAgent("inventory-agent", inventoryTask);
        if (inventoryResult.getStatus() != TaskStatus.COMPLETED) {
            return OrderResult.failure("Inventory check failed");
        }

        // Step 2: if the product is out of stock, ask the purchase agent to restock.
        // Boolean.TRUE.equals avoids a NullPointerException when "inStock" is missing.
        boolean inStock = Boolean.TRUE.equals(inventoryResult.getOutput().get("inStock"));
        if (!inStock) {
            Task purchaseTask = Task.builder()
                    .taskType("request_purchase")
                    .input(Map.of(
                            "productId", orderRequest.getProductId(),
                            "quantity", orderRequest.getQuantity()
                    ))
                    .build();
            TaskResult purchaseResult = a2aClient.invokeAgent("purchase-agent", purchaseTask);
            if (purchaseResult.getStatus() != TaskStatus.COMPLETED) {
                return OrderResult.failure("Purchase request failed");
            }
        }

        // Step 3: arrange delivery through the logistics agent.
        Task logisticsTask = Task.builder()
                .taskType("arrange_delivery")
                .input(Map.of(
                        "productId", orderRequest.getProductId(),
                        "quantity", orderRequest.getQuantity(),
                        "address", orderRequest.getDeliveryAddress()
                ))
                .build();
        TaskResult logisticsResult = a2aClient.invokeAgent("logistics-agent", logisticsTask);
        if (logisticsResult.getStatus() != TaskStatus.COMPLETED) {
            return OrderResult.failure("Delivery arrangement failed");
        }
        String trackingNumber = (String) logisticsResult.getOutput().get("trackingNumber");

        // Step 4: notify the customer. The result is ignored, so a failed
        // notification does not fail the order.
        Task notificationTask = Task.builder()
                .taskType("send_notification")
                .input(Map.of(
                        "customerId", orderRequest.getCustomerId(),
                        "message", "Your order has been processed. Tracking number: " + trackingNumber
                ))
                .build();
        a2aClient.invokeAgent("customer-service-agent", notificationTask);

        return OrderResult.success(trackingNumber);
    }
}
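The control flow above can be exercised without running real agents if the remote call is hidden behind a small interface. The following sketch assumes exactly that: AgentInvoker, StepResult, and OrderFlow are hypothetical names invented for this illustration, and the agent IDs and task types are simply copied from the example above.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical result type: a completed flag plus the agent's output map.
record StepResult(boolean completed, Map<String, Object> output) {}

// Hypothetical abstraction over a call such as a2aClient.invokeAgent(...).
interface AgentInvoker {
    StepResult invoke(String agentId, String taskType, Map<String, Object> input);
}

class OrderFlow {
    private final AgentInvoker invoker;

    OrderFlow(AgentInvoker invoker) {
        this.invoker = invoker;
    }

    // Returns the tracking number, or an empty Optional when a required step fails.
    Optional<String> process(String productId, int quantity, String address) {
        Map<String, Object> item = Map.of("productId", productId, "quantity", quantity);

        StepResult inventory = invoker.invoke("inventory-agent", "check_inventory", item);
        if (!inventory.completed()) {
            return Optional.empty();
        }
        // The purchase agent is only involved when the product is out of stock.
        if (!Boolean.TRUE.equals(inventory.output().get("inStock"))) {
            StepResult purchase = invoker.invoke("purchase-agent", "request_purchase", item);
            if (!purchase.completed()) {
                return Optional.empty();
            }
        }
        StepResult logistics = invoker.invoke("logistics-agent", "arrange_delivery",
                Map.of("productId", productId, "quantity", quantity, "address", address));
        if (!logistics.completed()) {
            return Optional.empty();
        }
        return Optional.ofNullable((String) logistics.output().get("trackingNumber"));
    }
}
```

In a unit test, AgentInvoker can be implemented as a lambda that returns canned results and records which agents were called, which makes it easy to verify both the in-stock and out-of-stock branches.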
Building a Workflow with Graph
The following code shows how to build a multi-agent workflow with the Graph framework of Spring AI Alibaba:
import java.util.HashMap;
import java.util.Map;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Service;
// Imports for A2AClient, Graph, Node, NodeType, Edge, GraphExecutionResult,
// ExecutionStatus, OrderRequest and OrderResult are omitted.

@Configuration
public class OrderProcessingWorkflow {

    @Bean
    public Graph orderProcessingGraph(A2AClient a2aClient) {
        return Graph.builder()
                .id("order-processing-workflow")
                .nodes(
                        // Agent nodes: each one delegates a task type to a remote agent.
                        Node.builder()
                                .id("check-inventory")
                                .type(NodeType.AGENT)
                                .agentId("inventory-agent")
                                .taskType("check_inventory")
                                .build(),
                        Node.builder()
                                .id("request-purchase")
                                .type(NodeType.AGENT)
                                .agentId("purchase-agent")
                                .taskType("request_purchase")
                                .build(),
                        Node.builder()
                                .id("arrange-delivery")
                                .type(NodeType.AGENT)
                                .agentId("logistics-agent")
                                .taskType("arrange_delivery")
                                .build(),
                        Node.builder()
                                .id("send-notification")
                                .type(NodeType.AGENT)
                                .agentId("customer-service-agent")
                                .taskType("send_notification")
                                .build(),
                        // Condition node: returns the label of the outgoing edge to follow.
                        Node.builder()
                                .id("check-stock-condition")
                                .type(NodeType.CONDITION)
                                .conditionFunction(state -> {
                                    // Null-safe: a missing "inStock" entry counts as out of stock.
                                    boolean inStock = Boolean.TRUE.equals(state.get("inStock"));
                                    return inStock ? "arrange-delivery" : "request-purchase";
                                })
                                .build()
                )
                .edges(
                        Edge.builder()
                                .source("check-inventory")
                                .target("check-stock-condition")
                                .build(),
                        Edge.builder()
                                .source("check-stock-condition")
                                .target("arrange-delivery")
                                .condition("arrange-delivery")
                                .build(),
                        Edge.builder()
                                .source("check-stock-condition")
                                .target("request-purchase")
                                .condition("request-purchase")
                                .build(),
                        Edge.builder()
                                .source("request-purchase")
                                .target("arrange-delivery")
                                .build(),
                        Edge.builder()
                                .source("arrange-delivery")
                                .target("send-notification")
                                .build()
                )
                .build();
    }
}

// Declared as its own top-level class so that Spring can instantiate it.
// It replaces the hand-written OrderProcessingService from the previous section.
@Service
public class OrderProcessingService {

    private final Graph orderProcessingGraph;

    public OrderProcessingService(Graph orderProcessingGraph) {
        this.orderProcessingGraph = orderProcessingGraph;
    }

    public OrderResult processOrder(OrderRequest orderRequest) {
        // The initial state carries every input the agent nodes need.
        Map<String, Object> initialState = new HashMap<>();
        initialState.put("productId", orderRequest.getProductId());
        initialState.put("quantity", orderRequest.getQuantity());
        initialState.put("address", orderRequest.getDeliveryAddress());
        initialState.put("customerId", orderRequest.getCustomerId());

        GraphExecutionResult result = orderProcessingGraph.execute(initialState);
        if (result.getStatus() == ExecutionStatus.COMPLETED) {
            String trackingNumber = (String) result.getFinalState().get("trackingNumber");
            return OrderResult.success(trackingNumber);
        } else {
            return OrderResult.failure(result.getErrorMessage());
        }
    }
}
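To illustrate how a workflow of this shape moves state from node to node and picks a branch, here is a deliberately small plain-Java sketch. MiniGraph is a hypothetical class written for this explanation only; it is not the Spring AI Alibaba Graph API, and it assumes that each node returns the state entries it wants to add and that each source node has at most one routing function.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical mini workflow engine; illustrates the idea only.
class MiniGraph {
    private static final int MAX_STEPS = 100; // guards against accidental cycles

    // Each node reads the shared state and returns the entries it wants to add.
    private final Map<String, Function<Map<String, Object>, Map<String, Object>>> nodes = new HashMap<>();
    // Each router inspects the state after its node ran and names the next node (null stops).
    private final Map<String, Function<Map<String, Object>, String>> routers = new HashMap<>();

    MiniGraph node(String id, Function<Map<String, Object>, Map<String, Object>> action) {
        nodes.put(id, action);
        return this;
    }

    // Unconditional edge: always continue with the same target.
    MiniGraph edge(String source, String target) {
        return route(source, state -> target);
    }

    // Conditional edge: the router decides the target from the current state.
    MiniGraph route(String source, Function<Map<String, Object>, String> router) {
        routers.put(source, router);
        return this;
    }

    // Runs from the start node, updates the state in place, and returns the visited node ids.
    List<String> run(String start, Map<String, Object> state) {
        List<String> visited = new ArrayList<>();
        String current = start;
        while (current != null) {
            Function<Map<String, Object>, Map<String, Object>> action = nodes.get(current);
            if (action == null) {
                throw new IllegalStateException("unknown node: " + current);
            }
            if (visited.size() >= MAX_STEPS) {
                throw new IllegalStateException("step limit exceeded, possible cycle");
            }
            visited.add(current);
            state.putAll(action.apply(state));
            Function<Map<String, Object>, String> router = routers.get(current);
            current = (router == null) ? null : router.apply(state);
        }
        return visited;
    }
}
```

Compared with the hand-written orchestration, the branching rule lives in one routing function and the order of steps is data, which is the main appeal of describing the workflow as a graph.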
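Best Practices
When implementing the A2A protocol with Spring AI Alibaba, consider the following practices:
- Design agent boundaries carefully: give each agent a single responsibility and avoid overlapping functionality.
- Make tasks idempotent: ensure that the tasks an agent handles can be repeated safely, which simplifies retries and error recovery.
- Add timeouts: set reasonable timeouts for communication between agents so that a slow agent cannot block the system.
- Apply the circuit breaker pattern: when an agent is unavailable, fail fast and provide a fallback.
- Log in detail: record the interactions between agents to support troubleshooting and system tuning.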
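The timeout and circuit breaker recommendations can be sketched in plain Java as follows. GuardedCall is a hypothetical helper written for this illustration, with arbitrary parameter names; a production system would more likely rely on an established resilience library. The sketch assumes a call counts as failed when it throws or exceeds the timeout, and that the circuit stays open for a fixed cool-down period after the failure threshold is reached.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

// Hypothetical helper: wraps a remote agent call with a timeout and a simple circuit breaker.
class GuardedCall<T> {
    private final int failureThreshold;
    private final long timeoutMillis;
    private final long coolDownMillis;
    private int consecutiveFailures = 0;
    private long lastFailureNanos = 0L;

    GuardedCall(int failureThreshold, long timeoutMillis, long coolDownMillis) {
        this.failureThreshold = failureThreshold;
        this.timeoutMillis = timeoutMillis;
        this.coolDownMillis = coolDownMillis;
    }

    // synchronized keeps the counters consistent; it also serializes calls,
    // which is acceptable for a sketch but not for high-throughput code.
    synchronized T call(Supplier<T> remoteCall, Supplier<T> fallback) {
        if (isOpen()) {
            return fallback.get(); // fail fast without touching the remote agent
        }
        try {
            T result = CompletableFuture.supplyAsync(remoteCall)
                    .get(timeoutMillis, TimeUnit.MILLISECONDS);
            consecutiveFailures = 0; // any success closes the circuit again
            return result;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return fail(fallback);
        } catch (ExecutionException | TimeoutException e) {
            // On timeout the underlying task keeps running in the background;
            // only the caller stops waiting for it.
            return fail(fallback);
        }
    }

    // Open means: too many consecutive failures and the cool-down has not elapsed yet.
    private boolean isOpen() {
        long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - lastFailureNanos);
        return consecutiveFailures >= failureThreshold && elapsedMillis < coolDownMillis;
    }

    private T fail(Supplier<T> fallback) {
        consecutiveFailures++;
        lastFailureNanos = System.nanoTime();
        return fallback.get();
    }
}
```

Once the cool-down has elapsed, the next call is let through as a trial: a success resets the failure counter, while another failure reopens the circuit for a further cool-down period.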