第21章 会话管理

第21章 会话管理

在生产环境中,Agent需要跨多次交互保持状态,记住之前的对话历史和上下文。AgentScope-Java提供了完善的会话管理系统,支持状态的持久化和恢复。本章将详细介绍会话管理的设计和使用。

21.1 会话管理概述

21.1.1 为什么需要会话管理

┌─────────────────────────────────────────────────────────────────┐
│                     无会话 vs 有会话                             │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│  无会话管理:                                                    │
│  ┌─────────────────────────────────────────────────────────┐    │
│  │ 用户: "我叫张三"                                          │    │
│  │ Agent: "你好,张三!"                                     │    │
│  │                                                          │    │
│  │ [应用重启或连接断开]                                      │    │
│  │                                                          │    │
│  │ 用户: "还记得我叫什么吗?"                                 │    │
│  │ Agent: "抱歉,我不知道您的名字..."  ← 状态丢失!           │    │
│  └─────────────────────────────────────────────────────────┘    │
│                                                                  │
│  有会话管理:                                                    │
│  ┌─────────────────────────────────────────────────────────┐    │
│  │ 用户: "我叫张三"                                          │    │
│  │ Agent: "你好,张三!" → [状态自动保存]                    │    │
│  │                                                          │    │
│  │ [应用重启或连接断开]                                      │    │
│  │ [状态自动恢复] ←                                         │    │
│  │                                                          │    │
│  │ 用户: "还记得我叫什么吗?"                                 │    │
│  │ Agent: "当然记得,您是张三!"  ← 状态恢复成功!            │    │
│  └─────────────────────────────────────────────────────────┘    │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘

21.1.2 会话管理核心组件

组件说明职责
Session会话存储接口定义状态存取的标准接口
SessionKey会话标识唯一标识一个会话
StateModule状态模块接口定义组件的状态序列化能力
State状态标记接口标识可序列化的状态类
InMemorySession内存会话实现进程内状态存储
JsonSessionJSON文件会话实现基于文件的持久化存储
┌─────────────────────────────────────────────────────────────────┐
│                     会话管理架构                                 │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│  ┌─────────────────────────────────────────────────────────┐    │
│  │                    ReActAgent                            │    │
│  │    implements StateModule                                │    │
│  │                                                          │    │
│  │    ┌──────────┐  ┌──────────┐  ┌──────────┐             │    │
│  │    │  Memory  │  │ Toolkit  │  │PlanNoteb.│             │    │
│  │    │  (状态)   │  │  (状态)   │  │  (状态)   │             │    │
│  │    └────┬─────┘  └────┬─────┘  └────┬─────┘             │    │
│  │         │             │             │                    │    │
│  │         └──────────┬──┴─────────────┘                    │    │
│  │                    │                                     │    │
│  │              saveTo/loadFrom                             │    │
│  │                    ↓                                     │    │
│  └─────────────────────────────────────────────────────────┘    │
│                       │                                          │
│                       ↓                                          │
│  ┌─────────────────────────────────────────────────────────┐    │
│  │                    Session                               │    │
│  │    save(SessionKey, key, State)                         │    │
│  │    get(SessionKey, key, Class<T>)                       │    │
│  │    save(SessionKey, key, List<State>)                   │    │
│  │    getList(SessionKey, key, Class<T>)                   │    │
│  │                                                          │    │
│  │  ┌─────────────────┐  ┌─────────────────┐               │    │
│  │  │ InMemorySession │  │   JsonSession   │               │    │
│  │  │   (内存存储)     │  │   (文件存储)    │               │    │
│  │  └─────────────────┘  └─────────────────┘               │    │
│  └─────────────────────────────────────────────────────────┘    │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘

21.2 Session接口

21.2.1 核心接口定义

/**
 * Session - 会话存储接口
 * 
 * 提供状态对象的持久化存储能力,支持:
 * - 单值存储:save/get
 * - 列表存储:save/getList(支持增量追加)
 * - 会话管理:exists/delete/listSessionKeys
 */
public interface Session {
    
    /**
     * 保存单个状态值(完整替换)
     * 
     * @param sessionKey 会话标识
     * @param key 状态键名(如"agent_meta", "toolkit_activeGroups")
     * @param value 状态值
     */
    void save(SessionKey sessionKey, String key, State value);
    
    /**
     * 保存状态列表
     * 
     * 不同实现可能使用不同存储策略:
     * - JsonSession: 增量追加,只写入新元素
     * - InMemorySession: 完整替换
     * 
     * @param sessionKey 会话标识
     * @param key 状态键名(如"memory_messages")
     * @param values 完整状态列表
     */
    void save(SessionKey sessionKey, String key, List<? extends State> values);
    
    /**
     * 获取单个状态值
     * 
     * @return 状态值,不存在返回Optional.empty()
     */
    <T extends State> Optional<T> get(
        SessionKey sessionKey, String key, Class<T> type);
    
    /**
     * 获取状态列表
     * 
     * @return 状态列表,不存在返回空列表
     */
    <T extends State> List<T> getList(
        SessionKey sessionKey, String key, Class<T> itemType);
    
    /**
     * 检查会话是否存在
     */
    boolean exists(SessionKey sessionKey);
    
    /**
     * 删除会话及其所有数据
     */
    void delete(SessionKey sessionKey);
    
    /**
     * 列出所有会话键
     */
    Set<SessionKey> listSessionKeys();
    
    /**
     * 清理资源
     */
    default void close() {}
}

21.2.2 SessionKey接口

/**
 * SessionKey - 会话标识接口
 * 
 * 支持自定义会话标识结构,适应多租户等复杂场景
 */
public interface SessionKey {
    
    /**
     * 转换为字符串标识
     * 用于存储(目录名、数据库键、Redis前缀等)
     */
    default String toIdentifier() {
        return JsonUtils.getJsonCodec().toJson(this);
    }
}

/**
 * 简单会话键实现
 */
public record SimpleSessionKey(String sessionId) implements SessionKey {
    
    public static SimpleSessionKey of(String sessionId) {
        return new SimpleSessionKey(sessionId);
    }
    
    @Override
    public String toIdentifier() {
        return sessionId;  // 直接返回ID,更易读
    }
}

/**
 * 多租户会话键示例
 */
public record TenantSessionKey(
    String tenantId,
    String userId,
    String sessionId
) implements SessionKey {
    // 自动使用JSON序列化
}

21.3 Session实现

21.3.1 InMemorySession

/**
 * 内存会话实现
 * 
 * 特点:
 * - 线程安全(使用ConcurrentHashMap)
 * - 适合单进程应用
 * - JVM退出时数据丢失
 */
public class InMemorySession implements Session {
    
    private final Map<String, SessionData> sessions = new ConcurrentHashMap<>();
    
    @Override
    public void save(SessionKey sessionKey, String key, State value) {
        String keyStr = serializeSessionKey(sessionKey);
        SessionData data = sessions.computeIfAbsent(keyStr, k -> new SessionData());
        data.setSingleState(key, value);
    }
    
    @Override
    public void save(SessionKey sessionKey, String key, List<? extends State> values) {
        String keyStr = serializeSessionKey(sessionKey);
        SessionData data = sessions.computeIfAbsent(keyStr, k -> new SessionData());
        // 完整替换列表(创建不可变副本)
        data.setListState(key, List.copyOf(values));
    }
    
    @Override
    public <T extends State> Optional<T> get(
            SessionKey sessionKey, String key, Class<T> type) {
        String keyStr = serializeSessionKey(sessionKey);
        SessionData data = sessions.get(keyStr);
        if (data == null) return Optional.empty();
        
        State state = data.getSingleState(key);
        if (state == null) return Optional.empty();
        
        return Optional.of(type.cast(state));
    }
    
    @Override
    public <T extends State> List<T> getList(
            SessionKey sessionKey, String key, Class<T> itemType) {
        String keyStr = serializeSessionKey(sessionKey);
        SessionData data = sessions.get(keyStr);
        if (data == null) return List.of();
        
        List<? extends State> list = data.getListState(key);
        return list == null ? List.of() : (List<T>) list;
    }
    
    @Override
    public boolean exists(SessionKey sessionKey) {
        return sessions.containsKey(serializeSessionKey(sessionKey));
    }
    
    @Override
    public void delete(SessionKey sessionKey) {
        sessions.remove(serializeSessionKey(sessionKey));
    }
    
    @Override
    public Set<SessionKey> listSessionKeys() {
        return sessions.keySet().stream()
            .map(SimpleSessionKey::of)
            .collect(Collectors.toSet());
    }
    
    /**
     * 获取活跃会话数量
     */
    public int getSessionCount() {
        return sessions.size();
    }
    
    /**
     * 清空所有会话
     */
    public void clearAll() {
        sessions.clear();
    }
}

// 使用示例
Session session = new InMemorySession();
SessionKey key = SimpleSessionKey.of("user_123");

// 保存状态
session.save(key, "agent_meta", agentState);

// 读取状态
Optional<AgentMetaState> meta = session.get(key, "agent_meta", AgentMetaState.class);

21.3.2 JsonSession

/**
 * JSON文件会话实现
 * 
 * 特点:
 * - 基于文件系统持久化
 * - 支持增量追加(列表类型)
 * - 使用哈希检测变更
 * - UTF-8编码
 */
public class JsonSession implements Session {
    
    private final Path sessionDirectory;
    
    /**
     * 使用默认目录 ~/.agentscope/sessions
     */
    public JsonSession() {
        this(Paths.get(System.getProperty("user.home"), ".agentscope", "sessions"));
    }
    
    /**
     * 使用自定义目录
     */
    public JsonSession(Path sessionDirectory) {
        this.sessionDirectory = sessionDirectory;
        try {
            Files.createDirectories(sessionDirectory);
        } catch (IOException e) {
            throw new RuntimeException("Failed to create session directory", e);
        }
    }
    
    @Override
    public void save(SessionKey sessionKey, String key, State value) {
        Path file = getStatePath(sessionKey, key);
        ensureDirectoryExists(file.getParent());
        
        try {
            String json = JsonUtils.getJsonCodec().toPrettyJson(value);
            Files.writeString(file, json, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new RuntimeException("Failed to save state: " + key, e);
        }
    }
    
    @Override
    public void save(SessionKey sessionKey, String key, List<? extends State> values) {
        Path file = getListPath(sessionKey, key);
        Path hashFile = getHashPath(sessionKey, key);
        ensureDirectoryExists(file.getParent());
        
        try {
            // 计算当前哈希
            String currentHash = ListHashUtil.computeHash(values);
            String storedHash = readHashFile(hashFile);
            long existingCount = countLines(file);
            
            // 判断是否需要完整重写
            boolean needsFullRewrite = ListHashUtil.needsFullRewrite(
                currentHash, storedHash, values.size(), (int) existingCount);
            
            if (needsFullRewrite) {
                // 完整重写
                rewriteEntireList(file, values);
            } else if (values.size() > existingCount) {
                // 增量追加
                appendNewItems(file, values, (int) existingCount);
            }
            // else: 无变化,跳过
            
            // 更新哈希文件
            writeHashFile(hashFile, currentHash);
            
        } catch (IOException e) {
            throw new RuntimeException("Failed to save list: " + key, e);
        }
    }
    
    @Override
    public <T extends State> Optional<T> get(
            SessionKey sessionKey, String key, Class<T> type) {
        Path file = getStatePath(sessionKey, key);
        
        if (!Files.exists(file)) {
            return Optional.empty();
        }
        
        try {
            String json = Files.readString(file, StandardCharsets.UTF_8);
            T state = JsonUtils.getJsonCodec().fromJson(json, type);
            return Optional.of(state);
        } catch (IOException e) {
            throw new RuntimeException("Failed to load state: " + key, e);
        }
    }
    
    @Override
    public <T extends State> List<T> getList(
            SessionKey sessionKey, String key, Class<T> itemType) {
        Path file = getListPath(sessionKey, key);
        
        if (!Files.exists(file)) {
            return List.of();
        }
        
        try {
            List<T> result = new ArrayList<>();
            try (BufferedReader reader = Files.newBufferedReader(file)) {
                String line;
                while ((line = reader.readLine()) != null) {
                    if (!line.isBlank()) {
                        T item = JsonUtils.getJsonCodec().fromJson(line, itemType);
                        result.add(item);
                    }
                }
            }
            return result;
        } catch (IOException e) {
            throw new RuntimeException("Failed to load list: " + key, e);
        }
    }
    
    // 文件路径工具方法
    private Path getStatePath(SessionKey sessionKey, String key) {
        return sessionDirectory
            .resolve(sessionKey.toIdentifier())
            .resolve(key + ".json");
    }
    
    private Path getListPath(SessionKey sessionKey, String key) {
        return sessionDirectory
            .resolve(sessionKey.toIdentifier())
            .resolve(key + ".jsonl");
    }
}

// 使用示例
Session session = new JsonSession(Path.of("./data/sessions"));
SessionKey key = SimpleSessionKey.of("user_123");

// 保存和加载
session.save(key, "agent_meta", agentState);
Optional<AgentMetaState> loaded = session.get(key, "agent_meta", AgentMetaState.class);

21.4 StateModule接口

21.4.1 接口定义

/**
 * StateModule - 状态模块接口
 * 
 * 所有需要持久化的组件应实现此接口
 */
public interface StateModule {
    
    /**
     * 保存状态到会话
     */
    default void saveTo(Session session, SessionKey sessionKey) {
        // 默认空实现,子类按需覆盖
    }
    
    /**
     * 使用字符串ID保存(便捷方法)
     */
    default void saveTo(Session session, String sessionId) {
        saveTo(session, SimpleSessionKey.of(sessionId));
    }
    
    /**
     * 从会话加载状态
     */
    default void loadFrom(Session session, SessionKey sessionKey) {
        // 默认空实现,子类按需覆盖
    }
    
    /**
     * 使用字符串ID加载(便捷方法)
     */
    default void loadFrom(Session session, String sessionId) {
        loadFrom(session, SimpleSessionKey.of(sessionId));
    }
    
    /**
     * 如果会话存在则加载状态
     * 
     * @return true表示加载成功,false表示会话不存在
     */
    default boolean loadIfExists(Session session, SessionKey sessionKey) {
        if (session.exists(sessionKey)) {
            loadFrom(session, sessionKey);
            return true;
        }
        return false;
    }
}

21.4.2 Agent中的实现

/**
 * ReActAgent的状态持久化实现
 */
public class ReActAgent extends AgentBase implements StateModule {
    
    private static final String KEY_AGENT_META = "agent_meta";
    private static final String KEY_MEMORY_MESSAGES = "memory_messages";
    
    @Override
    public void saveTo(Session session, SessionKey sessionKey) {
        // 保存Agent元数据
        AgentMetaState meta = new AgentMetaState(
            getAgentId(),
            getName(),
            getDescription(),
            getSystemPrompt()
        );
        session.save(sessionKey, KEY_AGENT_META, meta);
        
        // 保存Memory消息(如果Memory实现了StateModule)
        if (memory instanceof StateModule stateMemory) {
            stateMemory.saveTo(session, sessionKey);
        } else {
            // 直接保存消息列表
            session.save(sessionKey, KEY_MEMORY_MESSAGES, memory.getMessages());
        }
        
        // 保存其他状态模块
        if (toolkit instanceof StateModule stateToolkit) {
            stateToolkit.saveTo(session, sessionKey);
        }
        if (planNotebook != null) {
            planNotebook.saveTo(session, sessionKey);
        }
    }
    
    @Override
    public void loadFrom(Session session, SessionKey sessionKey) {
        // 加载Agent元数据
        session.get(sessionKey, KEY_AGENT_META, AgentMetaState.class)
            .ifPresent(meta -> {
                // 恢复元数据(如果需要)
            });
        
        // 加载Memory消息
        if (memory instanceof StateModule stateMemory) {
            stateMemory.loadFrom(session, sessionKey);
        } else {
            List<Msg> messages = session.getList(
                sessionKey, KEY_MEMORY_MESSAGES, Msg.class);
            memory.clear();
            messages.forEach(memory::addMessage);
        }
        
        // 加载其他状态模块
        if (toolkit instanceof StateModule stateToolkit) {
            stateToolkit.loadFrom(session, sessionKey);
        }
        if (planNotebook != null) {
            planNotebook.loadFrom(session, sessionKey);
        }
    }
}

21.5 使用会话管理

21.5.1 基本使用流程

// 1. 创建Session
Session session = new JsonSession(Path.of("./data/sessions"));

// 2. 创建Agent
ReActAgent agent = ReActAgent.builder()
    .name("Assistant")
    .model(model)
    .sysPrompt("你是智能助手")
    .build();

// 3. 定义会话ID(通常基于用户ID或会话ID)
String sessionId = "user_" + userId;

// 4. 尝试恢复已存在的会话
boolean resumed = agent.loadIfExists(session, sessionId);
if (resumed) {
    System.out.println("欢迎回来!我记得我们之前的对话。");
} else {
    System.out.println("你好!这是我们的首次对话。");
}

// 5. 进行对话
Msg response = agent.call(Msg.user("你好")).block();
System.out.println(response.getText());

// 6. 保存会话状态
agent.saveTo(session, sessionId);

// 7. 清理资源(可选)
session.close();

21.5.2 使用Builder配置会话

// 在Builder中直接配置会话
ReActAgent agent = ReActAgent.builder()
    .name("Assistant")
    .model(model)
    .session(session)                           // 配置Session
    .sessionKey(SimpleSessionKey.of("user_123")) // 配置SessionKey
    .autoSave(true)                             // 自动保存
    .build();

// Agent会自动:
// - 创建时尝试加载已有会话
// - 每次call后自动保存(如果autoSave=true)

21.5.3 多用户场景

/**
 * 多用户会话管理示例
 */
public class MultiUserChatService {
    
    private final Session session;
    private final Map<String, ReActAgent> agents;
    private final Model model;
    
    public MultiUserChatService(Model model) {
        this.session = new JsonSession(Path.of("./data/sessions"));
        this.agents = new ConcurrentHashMap<>();
        this.model = model;
    }
    
    /**
     * 获取或创建用户的Agent
     */
    public ReActAgent getAgent(String userId) {
        return agents.computeIfAbsent(userId, this::createAgent);
    }
    
    private ReActAgent createAgent(String userId) {
        ReActAgent agent = ReActAgent.builder()
            .name("Assistant")
            .model(model)
            .sysPrompt("你是智能助手,正在与用户对话。")
            .build();
        
        // 尝试恢复会话
        agent.loadIfExists(session, "user_" + userId);
        
        return agent;
    }
    
    /**
     * 处理用户消息
     */
    public Msg chat(String userId, String message) {
        ReActAgent agent = getAgent(userId);
        
        // 处理消息
        Msg response = agent.call(Msg.user(message)).block();
        
        // 保存会话
        agent.saveTo(session, "user_" + userId);
        
        return response;
    }
    
    /**
     * 清除用户会话
     */
    public void clearSession(String userId) {
        session.delete(SimpleSessionKey.of("user_" + userId));
        agents.remove(userId);
    }
    
    /**
     * 列出所有用户会话
     */
    public Set<String> listUserSessions() {
        return session.listSessionKeys().stream()
            .map(SessionKey::toIdentifier)
            .filter(id -> id.startsWith("user_"))
            .map(id -> id.substring(5))
            .collect(Collectors.toSet());
    }
}

21.6 自定义Session实现

21.6.1 Redis Session

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

/**
 * Redis会话实现
 * 适合分布式环境
 */
public class RedisSession implements Session {
    
    private final JedisPool pool;
    private final String keyPrefix;
    private final ObjectMapper objectMapper;
    
    public RedisSession(JedisPool pool, String keyPrefix) {
        this.pool = pool;
        this.keyPrefix = keyPrefix;
        this.objectMapper = new ObjectMapper();
    }
    
    @Override
    public void save(SessionKey sessionKey, String key, State value) {
        String redisKey = buildKey(sessionKey, key);
        try (Jedis jedis = pool.getResource()) {
            String json = objectMapper.writeValueAsString(value);
            jedis.set(redisKey, json);
        } catch (Exception e) {
            throw new RuntimeException("Failed to save state", e);
        }
    }
    
    @Override
    public void save(SessionKey sessionKey, String key, List<? extends State> values) {
        String redisKey = buildKey(sessionKey, key);
        try (Jedis jedis = pool.getResource()) {
            // 使用Redis List存储
            jedis.del(redisKey);
            for (State value : values) {
                String json = objectMapper.writeValueAsString(value);
                jedis.rpush(redisKey, json);
            }
        } catch (Exception e) {
            throw new RuntimeException("Failed to save list", e);
        }
    }
    
    @Override
    public <T extends State> Optional<T> get(
            SessionKey sessionKey, String key, Class<T> type) {
        String redisKey = buildKey(sessionKey, key);
        try (Jedis jedis = pool.getResource()) {
            String json = jedis.get(redisKey);
            if (json == null) return Optional.empty();
            
            T value = objectMapper.readValue(json, type);
            return Optional.of(value);
        } catch (Exception e) {
            throw new RuntimeException("Failed to get state", e);
        }
    }
    
    @Override
    public <T extends State> List<T> getList(
            SessionKey sessionKey, String key, Class<T> itemType) {
        String redisKey = buildKey(sessionKey, key);
        try (Jedis jedis = pool.getResource()) {
            List<String> jsonList = jedis.lrange(redisKey, 0, -1);
            
            List<T> result = new ArrayList<>();
            for (String json : jsonList) {
                T item = objectMapper.readValue(json, itemType);
                result.add(item);
            }
            return result;
        } catch (Exception e) {
            throw new RuntimeException("Failed to get list", e);
        }
    }
    
    @Override
    public boolean exists(SessionKey sessionKey) {
        String pattern = buildKey(sessionKey, "*");
        try (Jedis jedis = pool.getResource()) {
            Set<String> keys = jedis.keys(pattern);
            return !keys.isEmpty();
        }
    }
    
    @Override
    public void delete(SessionKey sessionKey) {
        String pattern = buildKey(sessionKey, "*");
        try (Jedis jedis = pool.getResource()) {
            Set<String> keys = jedis.keys(pattern);
            if (!keys.isEmpty()) {
                jedis.del(keys.toArray(new String[0]));
            }
        }
    }
    
    @Override
    public Set<SessionKey> listSessionKeys() {
        try (Jedis jedis = pool.getResource()) {
            Set<String> keys = jedis.keys(keyPrefix + ":*");
            return keys.stream()
                .map(k -> k.split(":")[1])
                .distinct()
                .map(SimpleSessionKey::of)
                .collect(Collectors.toSet());
        }
    }
    
    private String buildKey(SessionKey sessionKey, String key) {
        return keyPrefix + ":" + sessionKey.toIdentifier() + ":" + key;
    }
    
    @Override
    public void close() {
        pool.close();
    }
}

// 使用Redis Session
JedisPool pool = new JedisPool("localhost", 6379);
Session session = new RedisSession(pool, "agentscope");

21.6.2 数据库Session

/**
 * 数据库会话实现
 */
public class DatabaseSession implements Session {
    
    private final JdbcTemplate jdbcTemplate;
    private final ObjectMapper objectMapper;
    
    public DatabaseSession(DataSource dataSource) {
        this.jdbcTemplate = new JdbcTemplate(dataSource);
        this.objectMapper = new ObjectMapper();
        initializeSchema();
    }
    
    private void initializeSchema() {
        jdbcTemplate.execute("""
            CREATE TABLE IF NOT EXISTS session_states (
                session_key VARCHAR(255) NOT NULL,
                state_key VARCHAR(255) NOT NULL,
                state_value TEXT NOT NULL,
                is_list BOOLEAN DEFAULT FALSE,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                PRIMARY KEY (session_key, state_key)
            )
            """);
    }
    
    @Override
    public void save(SessionKey sessionKey, String key, State value) {
        try {
            String json = objectMapper.writeValueAsString(value);
            
            jdbcTemplate.update("""
                INSERT INTO session_states (session_key, state_key, state_value, is_list, updated_at)
                VALUES (?, ?, ?, FALSE, CURRENT_TIMESTAMP)
                ON CONFLICT (session_key, state_key) 
                DO UPDATE SET state_value = ?, updated_at = CURRENT_TIMESTAMP
                """,
                sessionKey.toIdentifier(), key, json, json
            );
        } catch (Exception e) {
            throw new RuntimeException("Failed to save state", e);
        }
    }
    
    @Override
    public void save(SessionKey sessionKey, String key, List<? extends State> values) {
        try {
            // 转换为JSON数组
            String json = objectMapper.writeValueAsString(values);
            
            jdbcTemplate.update("""
                INSERT INTO session_states (session_key, state_key, state_value, is_list, updated_at)
                VALUES (?, ?, ?, TRUE, CURRENT_TIMESTAMP)
                ON CONFLICT (session_key, state_key) 
                DO UPDATE SET state_value = ?, updated_at = CURRENT_TIMESTAMP
                """,
                sessionKey.toIdentifier(), key, json, json
            );
        } catch (Exception e) {
            throw new RuntimeException("Failed to save list", e);
        }
    }
    
    @Override
    public <T extends State> Optional<T> get(
            SessionKey sessionKey, String key, Class<T> type) {
        List<String> results = jdbcTemplate.queryForList(
            "SELECT state_value FROM session_states WHERE session_key = ? AND state_key = ? AND is_list = FALSE",
            String.class,
            sessionKey.toIdentifier(), key
        );
        
        if (results.isEmpty()) return Optional.empty();
        
        try {
            T value = objectMapper.readValue(results.get(0), type);
            return Optional.of(value);
        } catch (Exception e) {
            throw new RuntimeException("Failed to deserialize state", e);
        }
    }
    
    @Override
    public <T extends State> List<T> getList(
            SessionKey sessionKey, String key, Class<T> itemType) {
        List<String> results = jdbcTemplate.queryForList(
            "SELECT state_value FROM session_states WHERE session_key = ? AND state_key = ? AND is_list = TRUE",
            String.class,
            sessionKey.toIdentifier(), key
        );
        
        if (results.isEmpty()) return List.of();
        
        try {
            JavaType listType = objectMapper.getTypeFactory()
                .constructCollectionType(List.class, itemType);
            return objectMapper.readValue(results.get(0), listType);
        } catch (Exception e) {
            throw new RuntimeException("Failed to deserialize list", e);
        }
    }
    
    @Override
    public boolean exists(SessionKey sessionKey) {
        Integer count = jdbcTemplate.queryForObject(
            "SELECT COUNT(*) FROM session_states WHERE session_key = ?",
            Integer.class,
            sessionKey.toIdentifier()
        );
        return count != null && count > 0;
    }
    
    @Override
    public void delete(SessionKey sessionKey) {
        jdbcTemplate.update(
            "DELETE FROM session_states WHERE session_key = ?",
            sessionKey.toIdentifier()
        );
    }
    
    @Override
    public Set<SessionKey> listSessionKeys() {
        List<String> keys = jdbcTemplate.queryForList(
            "SELECT DISTINCT session_key FROM session_states",
            String.class
        );
        return keys.stream()
            .map(SimpleSessionKey::of)
            .collect(Collectors.toSet());
    }
}

21.7 会话管理最佳实践

21.7.1 会话ID设计

场景推荐格式示例
单用户应用user_{userId}user_12345
多会话user{userId}session_{sessionId}user12345session_abc
多租户tenant{tenantId}user_{userId}tenantacmeuser_12345
临时会话temp_{uuid}temp_550e8400-e29b...

21.7.2 会话清理策略

/**
 * 会话清理服务
 */
public class SessionCleanupService {
    
    private final Session session;
    private final Duration maxAge;
    
    public SessionCleanupService(Session session, Duration maxAge) {
        this.session = session;
        this.maxAge = maxAge;
    }
    
    /**
     * 清理过期会话
     */
    public int cleanupExpiredSessions() {
        int cleaned = 0;
        
        for (SessionKey key : session.listSessionKeys()) {
            Optional<SessionInfo> info = session.get(key, "_session_info", SessionInfo.class);
            
            if (info.isPresent()) {
                Instant lastAccess = Instant.parse(info.get().lastAccessTime());
                if (Instant.now().minus(maxAge).isAfter(lastAccess)) {
                    session.delete(key);
                    cleaned++;
                }
            }
        }
        
        return cleaned;
    }
    
    /**
     * 更新会话访问时间
     */
    public void touchSession(SessionKey key) {
        SessionInfo info = new SessionInfo(Instant.now().toString());
        session.save(key, "_session_info", info);
    }
}

// 定期清理
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
SessionCleanupService cleanup = new SessionCleanupService(session, Duration.ofDays(30));

scheduler.scheduleAtFixedRate(
    cleanup::cleanupExpiredSessions,
    0, 1, TimeUnit.HOURS
);

21.7.3 错误处理

/**
 * 带错误处理的会话操作
 */
public class RobustSessionManager {
    
    private final Session session;
    
    public void saveWithRetry(SessionKey key, String stateKey, State value) {
        int maxRetries = 3;
        int attempt = 0;
        
        while (attempt < maxRetries) {
            try {
                session.save(key, stateKey, value);
                return;
            } catch (Exception e) {
                attempt++;
                if (attempt >= maxRetries) {
                    log.error("Failed to save state after {} attempts", maxRetries, e);
                    throw new SessionException("Save failed", e);
                }
                // 等待后重试
                try {
                    Thread.sleep(100 * attempt);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new SessionException("Interrupted during retry", ie);
                }
            }
        }
    }
    
    public <T extends State> Optional<T> getWithFallback(
            SessionKey key, String stateKey, Class<T> type, T fallback) {
        try {
            return session.get(key, stateKey, type);
        } catch (Exception e) {
            log.warn("Failed to load state, using fallback", e);
            return Optional.ofNullable(fallback);
        }
    }
}

21.8 本章小结

本章详细介绍了AgentScope-Java的会话管理系统:

  1. Session接口:定义了状态存储的标准接口,支持单值和列表存储
  2. SessionKey:灵活的会话标识设计,支持自定义结构
  3. StateModule:组件状态序列化接口,Agent和Memory等均实现此接口
  4. 内置实现:InMemorySession用于开发测试,JsonSession用于文件持久化
  5. 自定义实现:可扩展为Redis、数据库等存储后端
  6. 最佳实践:会话ID设计、清理策略、错误处理

会话管理是构建生产级Agent应用的基础能力,确保用户体验的连续性和状态的可靠性。

← 返回目录