zhou zhou
22 小时以前 4954d3978cf1967729a5a2d5b90f6baef18974da
rsf-server/src/main/java/com/vincent/rsf/server/ai/service/impl/AiChatMemoryServiceImpl.java
@@ -16,29 +16,55 @@
import com.vincent.rsf.server.ai.service.AiChatMemoryService;
import com.vincent.rsf.server.system.enums.StatusType;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ConcurrentHashMap;
@Service
@Slf4j
@RequiredArgsConstructor
public class AiChatMemoryServiceImpl implements AiChatMemoryService {
    private final AiChatSessionMapper aiChatSessionMapper;
    private final AiChatMessageMapper aiChatMessageMapper;
    private final AiRedisSupport aiRedisSupport;
    @Qualifier("aiMemoryTaskExecutor")
    private final Executor aiMemoryTaskExecutor;
    /**
     * 用两个本地集合把“同一个会话的摘要刷新”合并成串行任务,避免连续消息把重复任务塞满线程池。
     */
    private final Set<Long> refreshingSessionIds = ConcurrentHashMap.newKeySet();
    private final Set<Long> pendingRefreshSessionIds = ConcurrentHashMap.newKeySet();
    /**
     * 读取会话记忆快照。
     * 返回结果同时包含完整落库历史、短期记忆窗口以及摘要/事实记忆,
     * 便于调用方按不同用途选择数据粒度。
     */
    @Override
    public AiChatMemoryDto getMemory(Long userId, Long tenantId, String promptCode, Long sessionId) {
        ensureIdentity(userId, tenantId);
        String resolvedPromptCode = requirePromptCode(promptCode);
        // 会话记忆属于典型“读多写少”数据,先走短 TTL 缓存能明显减轻抽屉初始化和切会话压力。
        AiChatMemoryDto cached = aiRedisSupport.getMemory(tenantId, userId, resolvedPromptCode, sessionId);
        if (cached != null) {
            return cached;
        }
        AiChatSession session = sessionId == null
                ? findLatestSession(userId, tenantId, resolvedPromptCode)
                : getSession(sessionId, userId, tenantId, resolvedPromptCode);
        AiChatMemoryDto memory;
        if (session == null) {
            return AiChatMemoryDto.builder()
            memory = AiChatMemoryDto.builder()
                    .sessionId(null)
                    .memorySummary(null)
                    .memoryFacts(null)
@@ -46,10 +72,12 @@
                    .persistedMessages(List.of())
                    .shortMemoryMessages(List.of())
                    .build();
            aiRedisSupport.cacheMemory(tenantId, userId, resolvedPromptCode, sessionId, memory);
            return memory;
        }
        List<AiChatMessageDto> persistedMessages = listMessages(session.getId());
        List<AiChatMessageDto> shortMemoryMessages = tailMessagesByRounds(persistedMessages, AiDefaults.MEMORY_RECENT_ROUNDS);
        return AiChatMemoryDto.builder()
        memory = AiChatMemoryDto.builder()
                .sessionId(session.getId())
                .memorySummary(session.getMemorySummary())
                .memoryFacts(session.getMemoryFacts())
@@ -57,12 +85,25 @@
                .persistedMessages(persistedMessages)
                .shortMemoryMessages(shortMemoryMessages)
                .build();
        aiRedisSupport.cacheMemory(tenantId, userId, resolvedPromptCode, session.getId(), memory);
        if (sessionId == null || !session.getId().equals(sessionId)) {
            aiRedisSupport.cacheMemory(tenantId, userId, resolvedPromptCode, null, memory);
        }
        return memory;
    }
    /**
     * 查询当前用户在某个 Prompt 下的会话列表。
     * 列表只返回用于侧边栏展示的摘要信息,不返回完整对话内容。
     */
    @Override
    public List<AiChatSessionDto> listSessions(Long userId, Long tenantId, String promptCode, String keyword) {
        ensureIdentity(userId, tenantId);
        String resolvedPromptCode = requirePromptCode(promptCode);
        List<AiChatSessionDto> cached = aiRedisSupport.getSessionList(tenantId, userId, resolvedPromptCode, keyword);
        if (cached != null) {
            return cached;
        }
        List<AiChatSession> sessions = aiChatSessionMapper.selectList(new LambdaQueryWrapper<AiChatSession>()
                .eq(AiChatSession::getUserId, userId)
                .eq(AiChatSession::getTenantId, tenantId)
@@ -74,15 +115,21 @@
                .orderByDesc(AiChatSession::getLastMessageTime)
                .orderByDesc(AiChatSession::getId));
        if (Cools.isEmpty(sessions)) {
            aiRedisSupport.cacheSessionList(tenantId, userId, resolvedPromptCode, keyword, List.of());
            return List.of();
        }
        List<AiChatSessionDto> result = new ArrayList<>();
        for (AiChatSession session : sessions) {
            result.add(buildSessionDto(session));
        }
        aiRedisSupport.cacheSessionList(tenantId, userId, resolvedPromptCode, keyword, result);
        return result;
    }
    /**
     * 解析本轮请求应该落到哪个会话。
     * 如果前端带了 sessionId 则做归属校验并复用;否则自动创建新会话。
     */
    @Override
    public AiChatSession resolveSession(Long userId, Long tenantId, String promptCode, Long sessionId, String titleSeed) {
        ensureIdentity(userId, tenantId);
@@ -105,9 +152,15 @@
                .setUpdateBy(userId)
                .setUpdateTime(now);
        aiChatSessionMapper.insert(session);
        evictConversationCaches(tenantId, userId);
        return session;
    }
    /**
     * 落库保存一整轮对话。
     * 这里会顺序写入本轮用户消息和模型回复,并在最后刷新会话标题和活跃时间。
     * 记忆画像改为后台异步刷新,避免把摘要重算耗时压在用户本轮响应尾部。
     */
    @Override
    public void saveRound(AiChatSession session, Long userId, Long tenantId, List<AiChatMessageDto> memoryMessages, String assistantContent) {
        if (session == null || session.getId() == null) {
@@ -133,9 +186,11 @@
                .setUpdateBy(userId)
                .setUpdateTime(now);
        aiChatSessionMapper.updateById(update);
        refreshMemoryProfile(session.getId(), userId);
        evictConversationCaches(tenantId, userId);
        scheduleMemoryProfileRefresh(session.getId(), userId, tenantId);
    }
    /** 删除整个会话及其消息。 */
    @Override
    public void removeSession(Long userId, Long tenantId, Long sessionId) {
        ensureIdentity(userId, tenantId);
@@ -167,8 +222,10 @@
                    .setDeleted(1);
            aiChatMessageMapper.updateById(updateMessage);
        }
        evictConversationCaches(tenantId, userId);
    }
    /** 更新会话标题并返回最新会话摘要。 */
    @Override
    public AiChatSessionDto renameSession(Long userId, Long tenantId, Long sessionId, AiChatSessionRenameRequest request) {
        ensureIdentity(userId, tenantId);
@@ -183,9 +240,12 @@
                .setUpdateBy(userId)
                .setUpdateTime(now);
        aiChatSessionMapper.updateById(update);
        return buildSessionDto(requireOwnedSession(sessionId, userId, tenantId));
        AiChatSessionDto sessionDto = buildSessionDto(requireOwnedSession(sessionId, userId, tenantId));
        evictConversationCaches(tenantId, userId);
        return sessionDto;
    }
    /** 更新会话置顶状态。 */
    @Override
    public AiChatSessionDto pinSession(Long userId, Long tenantId, Long sessionId, AiChatSessionPinRequest request) {
        ensureIdentity(userId, tenantId);
@@ -200,9 +260,12 @@
                .setUpdateBy(userId)
                .setUpdateTime(now);
        aiChatSessionMapper.updateById(update);
        return buildSessionDto(requireOwnedSession(sessionId, userId, tenantId));
        AiChatSessionDto sessionDto = buildSessionDto(requireOwnedSession(sessionId, userId, tenantId));
        evictConversationCaches(tenantId, userId);
        return sessionDto;
    }
    /** 清空某个会话的全部消息和派生记忆字段。 */
    @Override
    public void clearSessionMemory(Long userId, Long tenantId, Long sessionId) {
        ensureIdentity(userId, tenantId);
@@ -222,8 +285,10 @@
                .setUpdateBy(userId)
                .setUpdateTime(new Date())
                .setLastMessageTime(session.getCreateTime()));
        evictConversationCaches(tenantId, userId);
    }
    /** 只保留最近一轮问答,用于手动裁剪长会话。 */
    @Override
    public void retainLatestRound(Long userId, Long tenantId, Long sessionId) {
        ensureIdentity(userId, tenantId);
@@ -241,7 +306,46 @@
                        .setDeleted(1));
            }
        }
        refreshMemoryProfile(sessionId, userId);
        evictConversationCaches(tenantId, userId);
        scheduleMemoryProfileRefresh(sessionId, userId, tenantId);
    }
    private void evictConversationCaches(Long tenantId, Long userId) {
        // 会话标题、摘要、最近消息和 runtime 都会互相影响,统一按用户维度一起失效更稳妥。
        aiRedisSupport.evictUserConversationCaches(tenantId, userId);
    }
    private void scheduleMemoryProfileRefresh(Long sessionId, Long userId, Long tenantId) {
        if (sessionId == null) {
            return;
        }
        if (!refreshingSessionIds.add(sessionId)) {
            pendingRefreshSessionIds.add(sessionId);
            return;
        }
        aiMemoryTaskExecutor.execute(() -> runMemoryProfileRefreshLoop(sessionId, userId, tenantId));
    }
    private void runMemoryProfileRefreshLoop(Long sessionId, Long userId, Long tenantId) {
        try {
            boolean shouldContinue;
            do {
                pendingRefreshSessionIds.remove(sessionId);
                try {
                    refreshMemoryProfile(sessionId, userId);
                    evictConversationCaches(tenantId, userId);
                } catch (Exception e) {
                    log.warn("AI memory profile refresh failed, sessionId={}, userId={}, tenantId={}, message={}",
                            sessionId, userId, tenantId, e.getMessage(), e);
                }
                shouldContinue = pendingRefreshSessionIds.remove(sessionId);
            } while (shouldContinue);
        } finally {
            refreshingSessionIds.remove(sessionId);
            if (pendingRefreshSessionIds.remove(sessionId) && refreshingSessionIds.add(sessionId)) {
                aiMemoryTaskExecutor.execute(() -> runMemoryProfileRefreshLoop(sessionId, userId, tenantId));
            }
        }
    }
    private AiChatSession findLatestSession(Long userId, Long tenantId, String promptCode) {
@@ -307,6 +411,7 @@
    }
    private List<AiChatMessageDto> normalizeMessages(List<AiChatMessageDto> memoryMessages) {
        /** 清洗前端上传的内存消息,只允许 user/assistant 两类角色落库。 */
        List<AiChatMessageDto> normalized = new ArrayList<>();
        if (Cools.isEmpty(memoryMessages)) {
            return normalized;
@@ -372,6 +477,10 @@
    }
    private String buildSessionTitle(String titleSeed) {
        /**
         * 把首轮用户问题压缩成适合作为会话标题的短摘要。
         * 这里会去掉换行、连续空白,并优先在自然语义断点处截断。
         */
        if (!StringUtils.hasText(titleSeed)) {
            throw new CoolException("AI 会话标题不能为空");
        }
@@ -429,6 +538,11 @@
    }
    private void refreshMemoryProfile(Long sessionId, Long userId) {
        /**
         * 重新计算会话的摘要记忆和关键事实。
         * 这是“持久化消息”和“模型上下文治理”之间的桥梁方法。
         * 现在它运行在后台线程里,因此允许短时间最终一致,而不是强制本轮同步完成。
         */
        List<AiChatMessageDto> messages = listMessages(sessionId);
        List<AiChatMessageDto> shortMemoryMessages = tailMessagesByRounds(messages, AiDefaults.MEMORY_RECENT_ROUNDS);
        List<AiChatMessageDto> historyMessages = messages.size() > shortMemoryMessages.size()
@@ -454,6 +568,7 @@
    }
    private List<AiChatMessageDto> tailMessagesByRounds(List<AiChatMessageDto> source, int rounds) {
        /** 按“用户发言轮次”裁剪最近消息,而不是简单按条数截断。 */
        if (Cools.isEmpty(source) || rounds <= 0) {
            return List.of();
        }
@@ -492,6 +607,7 @@
    }
    private String buildMemorySummary(List<AiChatMessageDto> historyMessages) {
        /** 为较早历史生成可直接插入系统消息的文本摘要。 */
        StringBuilder builder = new StringBuilder("较早对话摘要:\n");
        for (AiChatMessageDto item : historyMessages) {
            if (item == null || !StringUtils.hasText(item.getContent())) {
@@ -511,6 +627,7 @@
    }
    private String buildMemoryFacts(List<AiChatMessageDto> messages) {
        /** 从最近用户关注点中提炼关键事实,作为轻量持久记忆。 */
        if (Cools.isEmpty(messages)) {
            return null;
        }