第21章 会话管理
在生产环境中,Agent需要跨多次交互保持状态,记住之前的对话历史和上下文。AgentScope-Java提供了完善的会话管理系统,支持状态的持久化和恢复。本章将详细介绍会话管理的设计和使用。
21.1 会话管理概述
21.1.1 为什么需要会话管理
┌─────────────────────────────────────────────────────────────────┐
│ 无会话 vs 有会话 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 无会话管理: │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ 用户: "我叫张三" │ │
│ │ Agent: "你好,张三!" │ │
│ │ │ │
│ │ [应用重启或连接断开] │ │
│ │ │ │
│ │ 用户: "还记得我叫什么吗?" │ │
│ │ Agent: "抱歉,我不知道您的名字..." ← 状态丢失! │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
│ 有会话管理: │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ 用户: "我叫张三" │ │
│ │ Agent: "你好,张三!" → [状态自动保存] │ │
│ │ │ │
│ │ [应用重启或连接断开] │ │
│ │ [状态自动恢复] ← │ │
│ │ │ │
│ │ 用户: "还记得我叫什么吗?" │ │
│ │ Agent: "当然记得,您是张三!" ← 状态恢复成功! │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
21.1.2 会话管理核心组件
| 组件 | 说明 | 职责 |
|---|---|---|
| Session | 会话存储接口 | 定义状态存取的标准接口 |
| SessionKey | 会话标识 | 唯一标识一个会话 |
| StateModule | 状态模块接口 | 定义组件的状态序列化能力 |
| State | 状态标记接口 | 标识可序列化的状态类 |
| InMemorySession | 内存会话实现 | 进程内状态存储 |
| JsonSession | JSON文件会话实现 | 基于文件的持久化存储 |
┌─────────────────────────────────────────────────────────────────┐
│ 会话管理架构 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ ReActAgent │ │
│ │ implements StateModule │ │
│ │ │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ Memory │ │ Toolkit │ │PlanNoteb.│ │ │
│ │ │ (状态) │ │ (状态) │ │ (状态) │ │ │
│ │ └────┬─────┘ └────┬─────┘ └────┬─────┘ │ │
│ │ │ │ │ │ │
│ │ └──────────┬──┴─────────────┘ │ │
│ │ │ │ │
│ │ saveTo/loadFrom │ │
│ │ ↓ │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │ │
│ ↓ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Session │ │
│ │ save(SessionKey, key, State) │ │
│ │ get(SessionKey, key, Class<T>) │ │
│ │ save(SessionKey, key, List<State>) │ │
│ │ getList(SessionKey, key, Class<T>) │ │
│ │ │ │
│ │ ┌─────────────────┐ ┌─────────────────┐ │ │
│ │ │ InMemorySession │ │ JsonSession │ │ │
│ │ │ (内存存储) │ │ (文件存储) │ │ │
│ │ └─────────────────┘ └─────────────────┘ │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
21.2 Session接口
21.2.1 核心接口定义
/**
* Session - 会话存储接口
*
* 提供状态对象的持久化存储能力,支持:
* - 单值存储:save/get
* - 列表存储:save/getList(支持增量追加)
* - 会话管理:exists/delete/listSessionKeys
*/
public interface Session {
/**
* 保存单个状态值(完整替换)
*
* @param sessionKey 会话标识
* @param key 状态键名(如"agent_meta", "toolkit_activeGroups")
* @param value 状态值
*/
void save(SessionKey sessionKey, String key, State value);
/**
* 保存状态列表
*
* 不同实现可能使用不同存储策略:
* - JsonSession: 增量追加,只写入新元素
* - InMemorySession: 完整替换
*
* @param sessionKey 会话标识
* @param key 状态键名(如"memory_messages")
* @param values 完整状态列表
*/
void save(SessionKey sessionKey, String key, List<? extends State> values);
/**
* 获取单个状态值
*
* @return 状态值,不存在返回Optional.empty()
*/
<T extends State> Optional<T> get(
SessionKey sessionKey, String key, Class<T> type);
/**
* 获取状态列表
*
* @return 状态列表,不存在返回空列表
*/
<T extends State> List<T> getList(
SessionKey sessionKey, String key, Class<T> itemType);
/**
* 检查会话是否存在
*/
boolean exists(SessionKey sessionKey);
/**
* 删除会话及其所有数据
*/
void delete(SessionKey sessionKey);
/**
* 列出所有会话键
*/
Set<SessionKey> listSessionKeys();
/**
* 清理资源
*/
default void close() {}
}
21.2.2 SessionKey接口
/**
* SessionKey - 会话标识接口
*
* 支持自定义会话标识结构,适应多租户等复杂场景
*/
public interface SessionKey {
/**
* 转换为字符串标识
* 用于存储(目录名、数据库键、Redis前缀等)
*/
default String toIdentifier() {
return JsonUtils.getJsonCodec().toJson(this);
}
}
/**
* 简单会话键实现
*/
public record SimpleSessionKey(String sessionId) implements SessionKey {
public static SimpleSessionKey of(String sessionId) {
return new SimpleSessionKey(sessionId);
}
@Override
public String toIdentifier() {
return sessionId; // 直接返回ID,更易读
}
}
/**
* 多租户会话键示例
*/
public record TenantSessionKey(
String tenantId,
String userId,
String sessionId
) implements SessionKey {
// 自动使用JSON序列化
}
21.3 Session实现
21.3.1 InMemorySession
/**
* 内存会话实现
*
* 特点:
* - 线程安全(使用ConcurrentHashMap)
* - 适合单进程应用
* - JVM退出时数据丢失
*/
public class InMemorySession implements Session {
private final Map<String, SessionData> sessions = new ConcurrentHashMap<>();
@Override
public void save(SessionKey sessionKey, String key, State value) {
String keyStr = serializeSessionKey(sessionKey);
SessionData data = sessions.computeIfAbsent(keyStr, k -> new SessionData());
data.setSingleState(key, value);
}
@Override
public void save(SessionKey sessionKey, String key, List<? extends State> values) {
String keyStr = serializeSessionKey(sessionKey);
SessionData data = sessions.computeIfAbsent(keyStr, k -> new SessionData());
// 完整替换列表(创建不可变副本)
data.setListState(key, List.copyOf(values));
}
@Override
public <T extends State> Optional<T> get(
SessionKey sessionKey, String key, Class<T> type) {
String keyStr = serializeSessionKey(sessionKey);
SessionData data = sessions.get(keyStr);
if (data == null) return Optional.empty();
State state = data.getSingleState(key);
if (state == null) return Optional.empty();
return Optional.of(type.cast(state));
}
@Override
public <T extends State> List<T> getList(
SessionKey sessionKey, String key, Class<T> itemType) {
String keyStr = serializeSessionKey(sessionKey);
SessionData data = sessions.get(keyStr);
if (data == null) return List.of();
List<? extends State> list = data.getListState(key);
return list == null ? List.of() : (List<T>) list;
}
@Override
public boolean exists(SessionKey sessionKey) {
return sessions.containsKey(serializeSessionKey(sessionKey));
}
@Override
public void delete(SessionKey sessionKey) {
sessions.remove(serializeSessionKey(sessionKey));
}
@Override
public Set<SessionKey> listSessionKeys() {
return sessions.keySet().stream()
.map(SimpleSessionKey::of)
.collect(Collectors.toSet());
}
/**
* 获取活跃会话数量
*/
public int getSessionCount() {
return sessions.size();
}
/**
* 清空所有会话
*/
public void clearAll() {
sessions.clear();
}
}
// 使用示例
Session session = new InMemorySession();
SessionKey key = SimpleSessionKey.of("user_123");
// 保存状态
session.save(key, "agent_meta", agentState);
// 读取状态
Optional<AgentMetaState> meta = session.get(key, "agent_meta", AgentMetaState.class);
21.3.2 JsonSession
/**
* JSON文件会话实现
*
* 特点:
* - 基于文件系统持久化
* - 支持增量追加(列表类型)
* - 使用哈希检测变更
* - UTF-8编码
*/
public class JsonSession implements Session {
private final Path sessionDirectory;
/**
* 使用默认目录 ~/.agentscope/sessions
*/
public JsonSession() {
this(Paths.get(System.getProperty("user.home"), ".agentscope", "sessions"));
}
/**
* 使用自定义目录
*/
public JsonSession(Path sessionDirectory) {
this.sessionDirectory = sessionDirectory;
try {
Files.createDirectories(sessionDirectory);
} catch (IOException e) {
throw new RuntimeException("Failed to create session directory", e);
}
}
@Override
public void save(SessionKey sessionKey, String key, State value) {
Path file = getStatePath(sessionKey, key);
ensureDirectoryExists(file.getParent());
try {
String json = JsonUtils.getJsonCodec().toPrettyJson(value);
Files.writeString(file, json, StandardCharsets.UTF_8);
} catch (IOException e) {
throw new RuntimeException("Failed to save state: " + key, e);
}
}
@Override
public void save(SessionKey sessionKey, String key, List<? extends State> values) {
Path file = getListPath(sessionKey, key);
Path hashFile = getHashPath(sessionKey, key);
ensureDirectoryExists(file.getParent());
try {
// 计算当前哈希
String currentHash = ListHashUtil.computeHash(values);
String storedHash = readHashFile(hashFile);
long existingCount = countLines(file);
// 判断是否需要完整重写
boolean needsFullRewrite = ListHashUtil.needsFullRewrite(
currentHash, storedHash, values.size(), (int) existingCount);
if (needsFullRewrite) {
// 完整重写
rewriteEntireList(file, values);
} else if (values.size() > existingCount) {
// 增量追加
appendNewItems(file, values, (int) existingCount);
}
// else: 无变化,跳过
// 更新哈希文件
writeHashFile(hashFile, currentHash);
} catch (IOException e) {
throw new RuntimeException("Failed to save list: " + key, e);
}
}
@Override
public <T extends State> Optional<T> get(
SessionKey sessionKey, String key, Class<T> type) {
Path file = getStatePath(sessionKey, key);
if (!Files.exists(file)) {
return Optional.empty();
}
try {
String json = Files.readString(file, StandardCharsets.UTF_8);
T state = JsonUtils.getJsonCodec().fromJson(json, type);
return Optional.of(state);
} catch (IOException e) {
throw new RuntimeException("Failed to load state: " + key, e);
}
}
@Override
public <T extends State> List<T> getList(
SessionKey sessionKey, String key, Class<T> itemType) {
Path file = getListPath(sessionKey, key);
if (!Files.exists(file)) {
return List.of();
}
try {
List<T> result = new ArrayList<>();
try (BufferedReader reader = Files.newBufferedReader(file)) {
String line;
while ((line = reader.readLine()) != null) {
if (!line.isBlank()) {
T item = JsonUtils.getJsonCodec().fromJson(line, itemType);
result.add(item);
}
}
}
return result;
} catch (IOException e) {
throw new RuntimeException("Failed to load list: " + key, e);
}
}
// 文件路径工具方法
private Path getStatePath(SessionKey sessionKey, String key) {
return sessionDirectory
.resolve(sessionKey.toIdentifier())
.resolve(key + ".json");
}
private Path getListPath(SessionKey sessionKey, String key) {
return sessionDirectory
.resolve(sessionKey.toIdentifier())
.resolve(key + ".jsonl");
}
}
// 使用示例
Session session = new JsonSession(Path.of("./data/sessions"));
SessionKey key = SimpleSessionKey.of("user_123");
// 保存和加载
session.save(key, "agent_meta", agentState);
Optional<AgentMetaState> loaded = session.get(key, "agent_meta", AgentMetaState.class);
21.4 StateModule接口
21.4.1 接口定义
/**
* StateModule - 状态模块接口
*
* 所有需要持久化的组件应实现此接口
*/
public interface StateModule {
/**
* 保存状态到会话
*/
default void saveTo(Session session, SessionKey sessionKey) {
// 默认空实现,子类按需覆盖
}
/**
* 使用字符串ID保存(便捷方法)
*/
default void saveTo(Session session, String sessionId) {
saveTo(session, SimpleSessionKey.of(sessionId));
}
/**
* 从会话加载状态
*/
default void loadFrom(Session session, SessionKey sessionKey) {
// 默认空实现,子类按需覆盖
}
/**
* 使用字符串ID加载(便捷方法)
*/
default void loadFrom(Session session, String sessionId) {
loadFrom(session, SimpleSessionKey.of(sessionId));
}
/**
* 如果会话存在则加载状态
*
* @return true表示加载成功,false表示会话不存在
*/
default boolean loadIfExists(Session session, SessionKey sessionKey) {
if (session.exists(sessionKey)) {
loadFrom(session, sessionKey);
return true;
}
return false;
}
}
21.4.2 Agent中的实现
/**
* ReActAgent的状态持久化实现
*/
public class ReActAgent extends AgentBase implements StateModule {
private static final String KEY_AGENT_META = "agent_meta";
private static final String KEY_MEMORY_MESSAGES = "memory_messages";
@Override
public void saveTo(Session session, SessionKey sessionKey) {
// 保存Agent元数据
AgentMetaState meta = new AgentMetaState(
getAgentId(),
getName(),
getDescription(),
getSystemPrompt()
);
session.save(sessionKey, KEY_AGENT_META, meta);
// 保存Memory消息(如果Memory实现了StateModule)
if (memory instanceof StateModule stateMemory) {
stateMemory.saveTo(session, sessionKey);
} else {
// 直接保存消息列表
session.save(sessionKey, KEY_MEMORY_MESSAGES, memory.getMessages());
}
// 保存其他状态模块
if (toolkit instanceof StateModule stateToolkit) {
stateToolkit.saveTo(session, sessionKey);
}
if (planNotebook != null) {
planNotebook.saveTo(session, sessionKey);
}
}
@Override
public void loadFrom(Session session, SessionKey sessionKey) {
// 加载Agent元数据
session.get(sessionKey, KEY_AGENT_META, AgentMetaState.class)
.ifPresent(meta -> {
// 恢复元数据(如果需要)
});
// 加载Memory消息
if (memory instanceof StateModule stateMemory) {
stateMemory.loadFrom(session, sessionKey);
} else {
List<Msg> messages = session.getList(
sessionKey, KEY_MEMORY_MESSAGES, Msg.class);
memory.clear();
messages.forEach(memory::addMessage);
}
// 加载其他状态模块
if (toolkit instanceof StateModule stateToolkit) {
stateToolkit.loadFrom(session, sessionKey);
}
if (planNotebook != null) {
planNotebook.loadFrom(session, sessionKey);
}
}
}
21.5 使用会话管理
21.5.1 基本使用流程
// 1. 创建Session
Session session = new JsonSession(Path.of("./data/sessions"));
// 2. 创建Agent
ReActAgent agent = ReActAgent.builder()
.name("Assistant")
.model(model)
.sysPrompt("你是智能助手")
.build();
// 3. 定义会话ID(通常基于用户ID或会话ID)
String sessionId = "user_" + userId;
// 4. 尝试恢复已存在的会话
boolean resumed = agent.loadIfExists(session, sessionId);
if (resumed) {
System.out.println("欢迎回来!我记得我们之前的对话。");
} else {
System.out.println("你好!这是我们的首次对话。");
}
// 5. 进行对话
Msg response = agent.call(Msg.user("你好")).block();
System.out.println(response.getText());
// 6. 保存会话状态
agent.saveTo(session, sessionId);
// 7. 清理资源(可选)
session.close();
21.5.2 使用Builder配置会话
// 在Builder中直接配置会话
ReActAgent agent = ReActAgent.builder()
.name("Assistant")
.model(model)
.session(session) // 配置Session
.sessionKey(SimpleSessionKey.of("user_123")) // 配置SessionKey
.autoSave(true) // 自动保存
.build();
// Agent会自动:
// - 创建时尝试加载已有会话
// - 每次call后自动保存(如果autoSave=true)
21.5.3 多用户场景
/**
* 多用户会话管理示例
*/
public class MultiUserChatService {
private final Session session;
private final Map<String, ReActAgent> agents;
private final Model model;
public MultiUserChatService(Model model) {
this.session = new JsonSession(Path.of("./data/sessions"));
this.agents = new ConcurrentHashMap<>();
this.model = model;
}
/**
* 获取或创建用户的Agent
*/
public ReActAgent getAgent(String userId) {
return agents.computeIfAbsent(userId, this::createAgent);
}
private ReActAgent createAgent(String userId) {
ReActAgent agent = ReActAgent.builder()
.name("Assistant")
.model(model)
.sysPrompt("你是智能助手,正在与用户对话。")
.build();
// 尝试恢复会话
agent.loadIfExists(session, "user_" + userId);
return agent;
}
/**
* 处理用户消息
*/
public Msg chat(String userId, String message) {
ReActAgent agent = getAgent(userId);
// 处理消息
Msg response = agent.call(Msg.user(message)).block();
// 保存会话
agent.saveTo(session, "user_" + userId);
return response;
}
/**
* 清除用户会话
*/
public void clearSession(String userId) {
session.delete(SimpleSessionKey.of("user_" + userId));
agents.remove(userId);
}
/**
* 列出所有用户会话
*/
public Set<String> listUserSessions() {
return session.listSessionKeys().stream()
.map(SessionKey::toIdentifier)
.filter(id -> id.startsWith("user_"))
.map(id -> id.substring(5))
.collect(Collectors.toSet());
}
}
21.6 自定义Session实现
21.6.1 Redis Session
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
/**
* Redis会话实现
* 适合分布式环境
*/
public class RedisSession implements Session {
private final JedisPool pool;
private final String keyPrefix;
private final ObjectMapper objectMapper;
public RedisSession(JedisPool pool, String keyPrefix) {
this.pool = pool;
this.keyPrefix = keyPrefix;
this.objectMapper = new ObjectMapper();
}
@Override
public void save(SessionKey sessionKey, String key, State value) {
String redisKey = buildKey(sessionKey, key);
try (Jedis jedis = pool.getResource()) {
String json = objectMapper.writeValueAsString(value);
jedis.set(redisKey, json);
} catch (Exception e) {
throw new RuntimeException("Failed to save state", e);
}
}
@Override
public void save(SessionKey sessionKey, String key, List<? extends State> values) {
String redisKey = buildKey(sessionKey, key);
try (Jedis jedis = pool.getResource()) {
// 使用Redis List存储
jedis.del(redisKey);
for (State value : values) {
String json = objectMapper.writeValueAsString(value);
jedis.rpush(redisKey, json);
}
} catch (Exception e) {
throw new RuntimeException("Failed to save list", e);
}
}
@Override
public <T extends State> Optional<T> get(
SessionKey sessionKey, String key, Class<T> type) {
String redisKey = buildKey(sessionKey, key);
try (Jedis jedis = pool.getResource()) {
String json = jedis.get(redisKey);
if (json == null) return Optional.empty();
T value = objectMapper.readValue(json, type);
return Optional.of(value);
} catch (Exception e) {
throw new RuntimeException("Failed to get state", e);
}
}
@Override
public <T extends State> List<T> getList(
SessionKey sessionKey, String key, Class<T> itemType) {
String redisKey = buildKey(sessionKey, key);
try (Jedis jedis = pool.getResource()) {
List<String> jsonList = jedis.lrange(redisKey, 0, -1);
List<T> result = new ArrayList<>();
for (String json : jsonList) {
T item = objectMapper.readValue(json, itemType);
result.add(item);
}
return result;
} catch (Exception e) {
throw new RuntimeException("Failed to get list", e);
}
}
@Override
public boolean exists(SessionKey sessionKey) {
String pattern = buildKey(sessionKey, "*");
try (Jedis jedis = pool.getResource()) {
Set<String> keys = jedis.keys(pattern);
return !keys.isEmpty();
}
}
@Override
public void delete(SessionKey sessionKey) {
String pattern = buildKey(sessionKey, "*");
try (Jedis jedis = pool.getResource()) {
Set<String> keys = jedis.keys(pattern);
if (!keys.isEmpty()) {
jedis.del(keys.toArray(new String[0]));
}
}
}
@Override
public Set<SessionKey> listSessionKeys() {
try (Jedis jedis = pool.getResource()) {
Set<String> keys = jedis.keys(keyPrefix + ":*");
return keys.stream()
.map(k -> k.split(":")[1])
.distinct()
.map(SimpleSessionKey::of)
.collect(Collectors.toSet());
}
}
private String buildKey(SessionKey sessionKey, String key) {
return keyPrefix + ":" + sessionKey.toIdentifier() + ":" + key;
}
@Override
public void close() {
pool.close();
}
}
// 使用Redis Session
JedisPool pool = new JedisPool("localhost", 6379);
Session session = new RedisSession(pool, "agentscope");
21.6.2 数据库Session
/**
* 数据库会话实现
*/
public class DatabaseSession implements Session {
private final JdbcTemplate jdbcTemplate;
private final ObjectMapper objectMapper;
public DatabaseSession(DataSource dataSource) {
this.jdbcTemplate = new JdbcTemplate(dataSource);
this.objectMapper = new ObjectMapper();
initializeSchema();
}
private void initializeSchema() {
jdbcTemplate.execute("""
CREATE TABLE IF NOT EXISTS session_states (
session_key VARCHAR(255) NOT NULL,
state_key VARCHAR(255) NOT NULL,
state_value TEXT NOT NULL,
is_list BOOLEAN DEFAULT FALSE,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (session_key, state_key)
)
""");
}
@Override
public void save(SessionKey sessionKey, String key, State value) {
try {
String json = objectMapper.writeValueAsString(value);
jdbcTemplate.update("""
INSERT INTO session_states (session_key, state_key, state_value, is_list, updated_at)
VALUES (?, ?, ?, FALSE, CURRENT_TIMESTAMP)
ON CONFLICT (session_key, state_key)
DO UPDATE SET state_value = ?, updated_at = CURRENT_TIMESTAMP
""",
sessionKey.toIdentifier(), key, json, json
);
} catch (Exception e) {
throw new RuntimeException("Failed to save state", e);
}
}
@Override
public void save(SessionKey sessionKey, String key, List<? extends State> values) {
try {
// 转换为JSON数组
String json = objectMapper.writeValueAsString(values);
jdbcTemplate.update("""
INSERT INTO session_states (session_key, state_key, state_value, is_list, updated_at)
VALUES (?, ?, ?, TRUE, CURRENT_TIMESTAMP)
ON CONFLICT (session_key, state_key)
DO UPDATE SET state_value = ?, updated_at = CURRENT_TIMESTAMP
""",
sessionKey.toIdentifier(), key, json, json
);
} catch (Exception e) {
throw new RuntimeException("Failed to save list", e);
}
}
@Override
public <T extends State> Optional<T> get(
SessionKey sessionKey, String key, Class<T> type) {
List<String> results = jdbcTemplate.queryForList(
"SELECT state_value FROM session_states WHERE session_key = ? AND state_key = ? AND is_list = FALSE",
String.class,
sessionKey.toIdentifier(), key
);
if (results.isEmpty()) return Optional.empty();
try {
T value = objectMapper.readValue(results.get(0), type);
return Optional.of(value);
} catch (Exception e) {
throw new RuntimeException("Failed to deserialize state", e);
}
}
@Override
public <T extends State> List<T> getList(
SessionKey sessionKey, String key, Class<T> itemType) {
List<String> results = jdbcTemplate.queryForList(
"SELECT state_value FROM session_states WHERE session_key = ? AND state_key = ? AND is_list = TRUE",
String.class,
sessionKey.toIdentifier(), key
);
if (results.isEmpty()) return List.of();
try {
JavaType listType = objectMapper.getTypeFactory()
.constructCollectionType(List.class, itemType);
return objectMapper.readValue(results.get(0), listType);
} catch (Exception e) {
throw new RuntimeException("Failed to deserialize list", e);
}
}
@Override
public boolean exists(SessionKey sessionKey) {
Integer count = jdbcTemplate.queryForObject(
"SELECT COUNT(*) FROM session_states WHERE session_key = ?",
Integer.class,
sessionKey.toIdentifier()
);
return count != null && count > 0;
}
@Override
public void delete(SessionKey sessionKey) {
jdbcTemplate.update(
"DELETE FROM session_states WHERE session_key = ?",
sessionKey.toIdentifier()
);
}
@Override
public Set<SessionKey> listSessionKeys() {
List<String> keys = jdbcTemplate.queryForList(
"SELECT DISTINCT session_key FROM session_states",
String.class
);
return keys.stream()
.map(SimpleSessionKey::of)
.collect(Collectors.toSet());
}
}
21.7 会话管理最佳实践
21.7.1 会话ID设计
| 场景 | 推荐格式 | 示例 |
|---|---|---|
| 单用户应用 | user_{userId} | user_12345 |
| 多会话 | user{userId}session_{sessionId} | user12345session_abc |
| 多租户 | tenant{tenantId}user_{userId} | tenantacmeuser_12345 |
| 临时会话 | temp_{uuid} | temp_550e8400-e29b... |
21.7.2 会话清理策略
/**
* 会话清理服务
*/
public class SessionCleanupService {
private final Session session;
private final Duration maxAge;
public SessionCleanupService(Session session, Duration maxAge) {
this.session = session;
this.maxAge = maxAge;
}
/**
* 清理过期会话
*/
public int cleanupExpiredSessions() {
int cleaned = 0;
for (SessionKey key : session.listSessionKeys()) {
Optional<SessionInfo> info = session.get(key, "_session_info", SessionInfo.class);
if (info.isPresent()) {
Instant lastAccess = Instant.parse(info.get().lastAccessTime());
if (Instant.now().minus(maxAge).isAfter(lastAccess)) {
session.delete(key);
cleaned++;
}
}
}
return cleaned;
}
/**
* 更新会话访问时间
*/
public void touchSession(SessionKey key) {
SessionInfo info = new SessionInfo(Instant.now().toString());
session.save(key, "_session_info", info);
}
}
// 定期清理
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
SessionCleanupService cleanup = new SessionCleanupService(session, Duration.ofDays(30));
scheduler.scheduleAtFixedRate(
cleanup::cleanupExpiredSessions,
0, 1, TimeUnit.HOURS
);
21.7.3 错误处理
/**
* 带错误处理的会话操作
*/
public class RobustSessionManager {
private final Session session;
public void saveWithRetry(SessionKey key, String stateKey, State value) {
int maxRetries = 3;
int attempt = 0;
while (attempt < maxRetries) {
try {
session.save(key, stateKey, value);
return;
} catch (Exception e) {
attempt++;
if (attempt >= maxRetries) {
log.error("Failed to save state after {} attempts", maxRetries, e);
throw new SessionException("Save failed", e);
}
// 等待后重试
try {
Thread.sleep(100 * attempt);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new SessionException("Interrupted during retry", ie);
}
}
}
}
public <T extends State> Optional<T> getWithFallback(
SessionKey key, String stateKey, Class<T> type, T fallback) {
try {
return session.get(key, stateKey, type);
} catch (Exception e) {
log.warn("Failed to load state, using fallback", e);
return Optional.ofNullable(fallback);
}
}
}
21.8 本章小结
本章详细介绍了AgentScope-Java的会话管理系统:
- Session接口:定义了状态存储的标准接口,支持单值和列表存储
- SessionKey:灵活的会话标识设计,支持自定义结构
- StateModule:组件状态序列化接口,Agent和Memory等均实现此接口
- 内置实现:InMemorySession用于开发测试,JsonSession用于文件持久化
- 自定义实现:可扩展为Redis、数据库等存储后端
- 最佳实践:会话ID设计、清理策略、错误处理
会话管理是构建生产级Agent应用的基础能力,确保用户体验的连续性和状态的可靠性。