zhou zhou
20 小时以前 4954d3978cf1967729a5a2d5b90f6baef18974da
rsf-server/src/main/java/com/vincent/rsf/server/ai/service/impl/AiChatMemoryServiceImpl.java
@@ -16,19 +16,34 @@
import com.vincent.rsf.server.ai.service.AiChatMemoryService;
import com.vincent.rsf.server.system.enums.StatusType;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ConcurrentHashMap;
@Service
@Slf4j
@RequiredArgsConstructor
public class AiChatMemoryServiceImpl implements AiChatMemoryService {
    private final AiChatSessionMapper aiChatSessionMapper;
    private final AiChatMessageMapper aiChatMessageMapper;
    private final AiRedisSupport aiRedisSupport;
    @Qualifier("aiMemoryTaskExecutor")
    private final Executor aiMemoryTaskExecutor;
    /**
     * 用两个本地集合把“同一个会话的摘要刷新”合并成串行任务,避免连续消息把重复任务塞满线程池。
     */
    private final Set<Long> refreshingSessionIds = ConcurrentHashMap.newKeySet();
    private final Set<Long> pendingRefreshSessionIds = ConcurrentHashMap.newKeySet();
    /**
     * 读取会话记忆快照。
@@ -39,11 +54,17 @@
    public AiChatMemoryDto getMemory(Long userId, Long tenantId, String promptCode, Long sessionId) {
        ensureIdentity(userId, tenantId);
        String resolvedPromptCode = requirePromptCode(promptCode);
        // 会话记忆属于典型“读多写少”数据,先走短 TTL 缓存能明显减轻抽屉初始化和切会话压力。
        AiChatMemoryDto cached = aiRedisSupport.getMemory(tenantId, userId, resolvedPromptCode, sessionId);
        if (cached != null) {
            return cached;
        }
        AiChatSession session = sessionId == null
                ? findLatestSession(userId, tenantId, resolvedPromptCode)
                : getSession(sessionId, userId, tenantId, resolvedPromptCode);
        AiChatMemoryDto memory;
        if (session == null) {
            return AiChatMemoryDto.builder()
            memory = AiChatMemoryDto.builder()
                    .sessionId(null)
                    .memorySummary(null)
                    .memoryFacts(null)
@@ -51,10 +72,12 @@
                    .persistedMessages(List.of())
                    .shortMemoryMessages(List.of())
                    .build();
            aiRedisSupport.cacheMemory(tenantId, userId, resolvedPromptCode, sessionId, memory);
            return memory;
        }
        List<AiChatMessageDto> persistedMessages = listMessages(session.getId());
        List<AiChatMessageDto> shortMemoryMessages = tailMessagesByRounds(persistedMessages, AiDefaults.MEMORY_RECENT_ROUNDS);
        return AiChatMemoryDto.builder()
        memory = AiChatMemoryDto.builder()
                .sessionId(session.getId())
                .memorySummary(session.getMemorySummary())
                .memoryFacts(session.getMemoryFacts())
@@ -62,6 +85,11 @@
                .persistedMessages(persistedMessages)
                .shortMemoryMessages(shortMemoryMessages)
                .build();
        aiRedisSupport.cacheMemory(tenantId, userId, resolvedPromptCode, session.getId(), memory);
        if (sessionId == null || !session.getId().equals(sessionId)) {
            aiRedisSupport.cacheMemory(tenantId, userId, resolvedPromptCode, null, memory);
        }
        return memory;
    }
    /**
@@ -72,6 +100,10 @@
    public List<AiChatSessionDto> listSessions(Long userId, Long tenantId, String promptCode, String keyword) {
        ensureIdentity(userId, tenantId);
        String resolvedPromptCode = requirePromptCode(promptCode);
        List<AiChatSessionDto> cached = aiRedisSupport.getSessionList(tenantId, userId, resolvedPromptCode, keyword);
        if (cached != null) {
            return cached;
        }
        List<AiChatSession> sessions = aiChatSessionMapper.selectList(new LambdaQueryWrapper<AiChatSession>()
                .eq(AiChatSession::getUserId, userId)
                .eq(AiChatSession::getTenantId, tenantId)
@@ -83,12 +115,14 @@
                .orderByDesc(AiChatSession::getLastMessageTime)
                .orderByDesc(AiChatSession::getId));
        if (Cools.isEmpty(sessions)) {
            aiRedisSupport.cacheSessionList(tenantId, userId, resolvedPromptCode, keyword, List.of());
            return List.of();
        }
        List<AiChatSessionDto> result = new ArrayList<>();
        for (AiChatSession session : sessions) {
            result.add(buildSessionDto(session));
        }
        aiRedisSupport.cacheSessionList(tenantId, userId, resolvedPromptCode, keyword, result);
        return result;
    }
@@ -118,12 +152,14 @@
                .setUpdateBy(userId)
                .setUpdateTime(now);
        aiChatSessionMapper.insert(session);
        evictConversationCaches(tenantId, userId);
        return session;
    }
    /**
     * 落库保存一整轮对话。
     * 这里会顺序写入本轮用户消息和模型回复,并在最后刷新会话标题、最后活跃时间和记忆画像。
     * 这里会顺序写入本轮用户消息和模型回复,并在最后刷新会话标题和活跃时间。
     * 记忆画像改为后台异步刷新,避免把摘要重算耗时压在用户本轮响应尾部。
     */
    @Override
    public void saveRound(AiChatSession session, Long userId, Long tenantId, List<AiChatMessageDto> memoryMessages, String assistantContent) {
@@ -150,7 +186,8 @@
                .setUpdateBy(userId)
                .setUpdateTime(now);
        aiChatSessionMapper.updateById(update);
        refreshMemoryProfile(session.getId(), userId);
        evictConversationCaches(tenantId, userId);
        scheduleMemoryProfileRefresh(session.getId(), userId, tenantId);
    }
    /** 删除整个会话及其消息。 */
@@ -185,6 +222,7 @@
                    .setDeleted(1);
            aiChatMessageMapper.updateById(updateMessage);
        }
        evictConversationCaches(tenantId, userId);
    }
    /** 更新会话标题并返回最新会话摘要。 */
@@ -202,7 +240,9 @@
                .setUpdateBy(userId)
                .setUpdateTime(now);
        aiChatSessionMapper.updateById(update);
        return buildSessionDto(requireOwnedSession(sessionId, userId, tenantId));
        AiChatSessionDto sessionDto = buildSessionDto(requireOwnedSession(sessionId, userId, tenantId));
        evictConversationCaches(tenantId, userId);
        return sessionDto;
    }
    /** 更新会话置顶状态。 */
@@ -220,7 +260,9 @@
                .setUpdateBy(userId)
                .setUpdateTime(now);
        aiChatSessionMapper.updateById(update);
        return buildSessionDto(requireOwnedSession(sessionId, userId, tenantId));
        AiChatSessionDto sessionDto = buildSessionDto(requireOwnedSession(sessionId, userId, tenantId));
        evictConversationCaches(tenantId, userId);
        return sessionDto;
    }
    /** 清空某个会话的全部消息和派生记忆字段。 */
@@ -243,6 +285,7 @@
                .setUpdateBy(userId)
                .setUpdateTime(new Date())
                .setLastMessageTime(session.getCreateTime()));
        evictConversationCaches(tenantId, userId);
    }
    /** 只保留最近一轮问答,用于手动裁剪长会话。 */
@@ -263,7 +306,46 @@
                        .setDeleted(1));
            }
        }
        refreshMemoryProfile(sessionId, userId);
        evictConversationCaches(tenantId, userId);
        scheduleMemoryProfileRefresh(sessionId, userId, tenantId);
    }
    /**
     * Evicts the conversation-related caches of one user by delegating to
     * {@code aiRedisSupport.evictUserConversationCaches}.
     *
     * @param tenantId tenant the user belongs to
     * @param userId   user whose conversation caches are evicted
     */
    private void evictConversationCaches(Long tenantId, Long userId) {
        // Session titles, summaries, recent messages and runtime state all affect one another,
        // so invalidating them together at user granularity is the safer choice.
        aiRedisSupport.evictUserConversationCaches(tenantId, userId);
    }
    private void scheduleMemoryProfileRefresh(Long sessionId, Long userId, Long tenantId) {
        if (sessionId == null) {
            return;
        }
        if (!refreshingSessionIds.add(sessionId)) {
            pendingRefreshSessionIds.add(sessionId);
            return;
        }
        aiMemoryTaskExecutor.execute(() -> runMemoryProfileRefreshLoop(sessionId, userId, tenantId));
    }
    private void runMemoryProfileRefreshLoop(Long sessionId, Long userId, Long tenantId) {
        try {
            boolean shouldContinue;
            do {
                pendingRefreshSessionIds.remove(sessionId);
                try {
                    refreshMemoryProfile(sessionId, userId);
                    evictConversationCaches(tenantId, userId);
                } catch (Exception e) {
                    log.warn("AI memory profile refresh failed, sessionId={}, userId={}, tenantId={}, message={}",
                            sessionId, userId, tenantId, e.getMessage(), e);
                }
                shouldContinue = pendingRefreshSessionIds.remove(sessionId);
            } while (shouldContinue);
        } finally {
            refreshingSessionIds.remove(sessionId);
            if (pendingRefreshSessionIds.remove(sessionId) && refreshingSessionIds.add(sessionId)) {
                aiMemoryTaskExecutor.execute(() -> runMemoryProfileRefreshLoop(sessionId, userId, tenantId));
            }
        }
    }
    private AiChatSession findLatestSession(Long userId, Long tenantId, String promptCode) {
@@ -459,6 +541,7 @@
        /**
         * 重新计算会话的摘要记忆和关键事实。
         * 这是“持久化消息”和“模型上下文治理”之间的桥梁方法。
         * 现在它运行在后台线程里,因此允许短时间最终一致,而不是强制本轮同步完成。
         */
        List<AiChatMessageDto> messages = listMessages(sessionId);
        List<AiChatMessageDto> shortMemoryMessages = tailMessagesByRounds(messages, AiDefaults.MEMORY_RECENT_ROUNDS);