From 4954d3978cf1967729a5a2d5b90f6baef18974da Mon Sep 17 00:00:00 2001
From: zhou zhou <3272660260@qq.com>
Date: 星期一, 23 三月 2026 09:35:10 +0800
Subject: [PATCH] #ai redis+页面优化
---
rsf-server/src/main/java/com/vincent/rsf/server/ai/service/impl/AiChatServiceImpl.java | 315 +++++++++++++++++++++++++++++++++++++++++++++-------
 1 file changed, 272 insertions(+), 43 deletions(-)
diff --git a/rsf-server/src/main/java/com/vincent/rsf/server/ai/service/impl/AiChatServiceImpl.java b/rsf-server/src/main/java/com/vincent/rsf/server/ai/service/impl/AiChatServiceImpl.java
index 40d5594..320421e 100644
--- a/rsf-server/src/main/java/com/vincent/rsf/server/ai/service/impl/AiChatServiceImpl.java
+++ b/rsf-server/src/main/java/com/vincent/rsf/server/ai/service/impl/AiChatServiceImpl.java
@@ -8,12 +8,14 @@
import com.vincent.rsf.server.ai.dto.AiChatErrorDto;
import com.vincent.rsf.server.ai.dto.AiChatMemoryDto;
import com.vincent.rsf.server.ai.dto.AiChatMessageDto;
+import com.vincent.rsf.server.ai.dto.AiChatModelOptionDto;
import com.vincent.rsf.server.ai.dto.AiChatRequest;
import com.vincent.rsf.server.ai.dto.AiChatRuntimeDto;
import com.vincent.rsf.server.ai.dto.AiChatStatusDto;
import com.vincent.rsf.server.ai.dto.AiChatSessionDto;
import com.vincent.rsf.server.ai.dto.AiChatSessionPinRequest;
import com.vincent.rsf.server.ai.dto.AiChatSessionRenameRequest;
+import com.vincent.rsf.server.ai.dto.AiChatThinkingEventDto;
import com.vincent.rsf.server.ai.dto.AiChatToolEventDto;
import com.vincent.rsf.server.ai.dto.AiResolvedConfig;
import com.vincent.rsf.server.ai.entity.AiCallLog;
@@ -26,6 +28,7 @@
import com.vincent.rsf.server.ai.service.AiChatService;
import com.vincent.rsf.server.ai.service.AiChatMemoryService;
import com.vincent.rsf.server.ai.service.AiConfigResolverService;
+import com.vincent.rsf.server.ai.service.AiParamService;
import com.vincent.rsf.server.ai.service.MountedToolCallback;
import com.vincent.rsf.server.ai.service.McpMountRuntimeFactory;
import io.micrometer.observation.ObservationRegistry;
@@ -77,8 +80,10 @@
private final AiConfigResolverService aiConfigResolverService;
private final AiChatMemoryService aiChatMemoryService;
+ private final AiParamService aiParamService;
private final McpMountRuntimeFactory mcpMountRuntimeFactory;
private final AiCallLogService aiCallLogService;
+ private final AiRedisSupport aiRedisSupport;
private final GenericApplicationContext applicationContext;
private final ObservationRegistry observationRegistry;
private final ObjectMapper objectMapper;
@@ -90,24 +95,31 @@
* 璇ユ柟娉曚笉浼氳Е鍙戞ā鍨嬭皟鐢紝鑰屾槸鎶婇厤缃В鏋愮粨鏋滃拰浼氳瘽璁板繂鑱氬悎鎴愬墠绔竴娆℃覆鏌撴墍闇�鐨勫揩鐓с��
*/
@Override
- public AiChatRuntimeDto getRuntime(String promptCode, Long sessionId, Long userId, Long tenantId) {
- AiResolvedConfig config = aiConfigResolverService.resolve(promptCode, tenantId);
+ public AiChatRuntimeDto getRuntime(String promptCode, Long sessionId, Long aiParamId, Long userId, Long tenantId) {
+ AiResolvedConfig config = aiConfigResolverService.resolve(promptCode, tenantId, aiParamId);
+ Long runtimeCacheAiParamId = aiParamId;
+ // runtime 是配置快照和会话记忆的聚合视图，单独缓存能减少一次页面进入时的重复拼装。
+ AiChatRuntimeDto cached = aiRedisSupport.getRuntime(tenantId, userId, config.getPromptCode(), sessionId, runtimeCacheAiParamId);
+ if (cached != null) {
+ return cached;
+ }
AiChatMemoryDto memory = aiChatMemoryService.getMemory(userId, tenantId, config.getPromptCode(), sessionId);
- return AiChatRuntimeDto.builder()
- .requestId(null)
- .sessionId(memory.getSessionId())
- .promptCode(config.getPromptCode())
- .promptName(config.getPrompt().getName())
- .model(config.getAiParam().getModel())
- .configuredMcpCount(config.getMcpMounts().size())
- .mountedMcpCount(config.getMcpMounts().size())
- .mountedMcpNames(config.getMcpMounts().stream().map(item -> item.getName()).toList())
- .mountErrors(List.of())
- .memorySummary(memory.getMemorySummary())
- .memoryFacts(memory.getMemoryFacts())
- .recentMessageCount(memory.getRecentMessageCount())
- .persistedMessages(memory.getPersistedMessages())
- .build();
+ List<AiChatModelOptionDto> modelOptions = aiParamService.listChatModelOptions(tenantId);
+ AiChatRuntimeDto runtime = buildRuntimeSnapshot(
+ null,
+ memory.getSessionId(),
+ config,
+ modelOptions,
+ config.getMcpMounts().size(),
+ config.getMcpMounts().stream().map(item -> item.getName()).toList(),
+ List.of(),
+ memory
+ );
+ aiRedisSupport.cacheRuntime(tenantId, userId, config.getPromptCode(), sessionId, runtimeCacheAiParamId, runtime);
+ if (memory.getSessionId() != null && !Objects.equals(memory.getSessionId(), sessionId)) {
+ aiRedisSupport.cacheRuntime(tenantId, userId, config.getPromptCode(), memory.getSessionId(), runtimeCacheAiParamId, runtime);
+ }
+ return runtime;
}
/**
@@ -173,13 +185,23 @@
Long sessionId = request.getSessionId();
Long callLogId = null;
String model = null;
+ String resolvedPromptCode = request.getPromptCode();
+ ThinkingTraceEmitter thinkingTraceEmitter = null;
try {
ensureIdentity(userId, tenantId);
AiResolvedConfig config = resolveConfig(request, tenantId);
+ List<AiChatModelOptionDto> modelOptions = aiParamService.listChatModelOptions(tenantId);
+ resolvedPromptCode = config.getPromptCode();
+ if (!aiRedisSupport.allowChatRequest(tenantId, userId, config.getPromptCode())) {
+ throw buildAiException("AI_RATE_LIMITED", AiErrorCategory.REQUEST, "RATE_LIMIT",
+ "当前提问过于频繁，请稍后再试", null);
+ }
final String resolvedModel = config.getAiParam().getModel();
model = resolvedModel;
AiChatSession session = resolveSession(request, userId, tenantId, config.getPromptCode());
sessionId = session.getId();
+ // 流状态落 Redis 的目标是给多实例和后续运维查询留统一入口，不替代数据库日志。
+ aiRedisSupport.markStreamState(requestId, tenantId, userId, sessionId, config.getPromptCode(), "RUNNING", null);
AiChatMemoryDto memory = loadMemory(userId, tenantId, config.getPromptCode(), session.getId());
List<AiChatMessageDto> mergedMessages = mergeMessages(memory.getShortMemoryMessages(), request.getMessages());
AiCallLog callLog = aiCallLogService.startCallLog(
@@ -196,21 +218,16 @@
);
callLogId = callLog.getId();
try (McpMountRuntimeFactory.McpMountRuntime runtime = createRuntime(config, userId)) {
- emitStrict(emitter, "start", AiChatRuntimeDto.builder()
- .requestId(requestId)
- .sessionId(session.getId())
- .promptCode(config.getPromptCode())
- .promptName(config.getPrompt().getName())
- .model(config.getAiParam().getModel())
- .configuredMcpCount(config.getMcpMounts().size())
- .mountedMcpCount(runtime.getMountedCount())
- .mountedMcpNames(runtime.getMountedNames())
- .mountErrors(runtime.getErrors())
- .memorySummary(memory.getMemorySummary())
- .memoryFacts(memory.getMemoryFacts())
- .recentMessageCount(memory.getRecentMessageCount())
- .persistedMessages(memory.getPersistedMessages())
- .build());
+ emitStrict(emitter, "start", buildRuntimeSnapshot(
+ requestId,
+ session.getId(),
+ config,
+ modelOptions,
+ runtime.getMountedCount(),
+ runtime.getMountedNames(),
+ runtime.getErrors(),
+ memory
+ ));
emitSafely(emitter, "status", AiChatStatusDto.builder()
.requestId(requestId)
.sessionId(session.getId())
@@ -221,10 +238,13 @@
.build());
log.info("AI chat started, requestId={}, userId={}, tenantId={}, sessionId={}, model={}",
requestId, userId, tenantId, session.getId(), resolvedModel);
+ thinkingTraceEmitter = new ThinkingTraceEmitter(emitter, requestId, session.getId());
+ thinkingTraceEmitter.startAnalyze();
+ ThinkingTraceEmitter activeThinkingTraceEmitter = thinkingTraceEmitter;
ToolCallback[] observableToolCallbacks = wrapToolCallbacks(
runtime.getToolCallbacks(), emitter, requestId, session.getId(), toolCallSequence,
- toolSuccessCount, toolFailureCount, callLogId, userId, tenantId
+ toolSuccessCount, toolFailureCount, callLogId, userId, tenantId, activeThinkingTraceEmitter
);
Prompt prompt = new Prompt(
buildPromptMessages(memory, mergedMessages, config.getPrompt(), request.getMetadata()),
@@ -237,9 +257,10 @@
String content = extractContent(response);
aiChatMemoryService.saveRound(session, userId, tenantId, request.getMessages(), content);
if (StringUtils.hasText(content)) {
- markFirstToken(firstTokenAtRef, emitter, requestId, session.getId(), resolvedModel, startedAt);
+ markFirstToken(firstTokenAtRef, emitter, requestId, session.getId(), resolvedModel, startedAt, activeThinkingTraceEmitter);
emitStrict(emitter, "delta", buildMessagePayload("requestId", requestId, "content", content));
}
+ activeThinkingTraceEmitter.completeCurrentPhase();
emitDone(emitter, requestId, response.getMetadata(), config.getAiParam().getModel(), session.getId(), startedAt, firstTokenAtRef.get());
emitSafely(emitter, "status", buildTerminalStatus(requestId, session.getId(), "COMPLETED", resolvedModel, startedAt, firstTokenAtRef.get()));
aiCallLogService.completeCallLog(
@@ -253,6 +274,7 @@
toolSuccessCount.get(),
toolFailureCount.get()
);
+ aiRedisSupport.markStreamState(requestId, tenantId, userId, session.getId(), config.getPromptCode(), "COMPLETED", null);
log.info("AI chat completed, requestId={}, sessionId={}, elapsedMs={}, firstTokenLatencyMs={}",
requestId, session.getId(), System.currentTimeMillis() - startedAt, resolveFirstTokenLatency(startedAt, firstTokenAtRef.get()));
emitter.complete();
@@ -267,7 +289,7 @@
lastMetadata.set(response.getMetadata());
String content = extractContent(response);
if (StringUtils.hasText(content)) {
- markFirstToken(firstTokenAtRef, emitter, requestId, session.getId(), resolvedModel, startedAt);
+ markFirstToken(firstTokenAtRef, emitter, requestId, session.getId(), resolvedModel, startedAt, activeThinkingTraceEmitter);
assistantContent.append(content);
emitStrict(emitter, "delta", buildMessagePayload("requestId", requestId, "content", content));
}
@@ -278,6 +300,7 @@
e == null ? "AI 妯″瀷娴佸紡璋冪敤澶辫触" : e.getMessage(), e);
}
aiChatMemoryService.saveRound(session, userId, tenantId, request.getMessages(), assistantContent.toString());
+ activeThinkingTraceEmitter.completeCurrentPhase();
emitDone(emitter, requestId, lastMetadata.get(), config.getAiParam().getModel(), session.getId(), startedAt, firstTokenAtRef.get());
emitSafely(emitter, "status", buildTerminalStatus(requestId, session.getId(), "COMPLETED", resolvedModel, startedAt, firstTokenAtRef.get()));
aiCallLogService.completeCallLog(
@@ -291,18 +314,21 @@
toolSuccessCount.get(),
toolFailureCount.get()
);
+ aiRedisSupport.markStreamState(requestId, tenantId, userId, session.getId(), config.getPromptCode(), "COMPLETED", null);
log.info("AI chat completed, requestId={}, sessionId={}, elapsedMs={}, firstTokenLatencyMs={}",
requestId, session.getId(), System.currentTimeMillis() - startedAt, resolveFirstTokenLatency(startedAt, firstTokenAtRef.get()));
emitter.complete();
}
} catch (AiChatException e) {
handleStreamFailure(emitter, requestId, sessionId, model, startedAt, firstTokenAtRef.get(), e,
- callLogId, toolSuccessCount.get(), toolFailureCount.get());
+ callLogId, toolSuccessCount.get(), toolFailureCount.get(), thinkingTraceEmitter,
+ tenantId, userId, resolvedPromptCode);
} catch (Exception e) {
handleStreamFailure(emitter, requestId, sessionId, model, startedAt, firstTokenAtRef.get(),
buildAiException("AI_INTERNAL_ERROR", AiErrorCategory.INTERNAL, "INTERNAL",
e == null ? "AI 瀵硅瘽澶辫触" : e.getMessage(), e),
- callLogId, toolSuccessCount.get(), toolFailureCount.get());
+ callLogId, toolSuccessCount.get(), toolFailureCount.get(), thinkingTraceEmitter,
+ tenantId, userId, resolvedPromptCode);
} finally {
log.debug("AI chat stream finished, requestId={}", requestId);
}
@@ -320,11 +346,34 @@
private AiResolvedConfig resolveConfig(AiChatRequest request, Long tenantId) {
/** 鎶婅姹傞噷鐨� Prompt 鍦烘櫙瑙f瀽鎴愪竴浠藉彲鐩存帴鎵ц鐨� AI 閰嶇疆銆� */
try {
- return aiConfigResolverService.resolve(request.getPromptCode(), tenantId);
+ return aiConfigResolverService.resolve(request.getPromptCode(), tenantId, request.getAiParamId());
} catch (Exception e) {
throw buildAiException("AI_CONFIG_RESOLVE_ERROR", AiErrorCategory.CONFIG, "CONFIG_RESOLVE",
e == null ? "AI 閰嶇疆瑙f瀽澶辫触" : e.getMessage(), e);
}
+ }
+
+ private AiChatRuntimeDto buildRuntimeSnapshot(String requestId, Long sessionId, AiResolvedConfig config,
+ List<AiChatModelOptionDto> modelOptions, Integer mountedMcpCount,
+ List<String> mountedMcpNames, List<String> mountErrors,
+ AiChatMemoryDto memory) {
+ return AiChatRuntimeDto.builder()
+ .requestId(requestId)
+ .sessionId(sessionId)
+ .aiParamId(config.getAiParam().getId())
+ .promptCode(config.getPromptCode())
+ .promptName(config.getPrompt().getName())
+ .model(config.getAiParam().getModel())
+ .modelOptions(modelOptions)
+ .configuredMcpCount(config.getMcpMounts().size())
+ .mountedMcpCount(mountedMcpCount)
+ .mountedMcpNames(mountedMcpNames)
+ .mountErrors(mountErrors)
+ .memorySummary(memory.getMemorySummary())
+ .memoryFacts(memory.getMemoryFacts())
+ .recentMessageCount(memory.getRecentMessageCount())
+ .persistedMessages(memory.getPersistedMessages())
+ .build();
}
private AiChatSession resolveSession(AiChatRequest request, Long userId, Long tenantId, String promptCode) {
@@ -375,9 +424,13 @@
}
}
- private void markFirstToken(AtomicReference<Long> firstTokenAtRef, SseEmitter emitter, String requestId, Long sessionId, String model, long startedAt) {
+ private void markFirstToken(AtomicReference<Long> firstTokenAtRef, SseEmitter emitter, String requestId,
+ Long sessionId, String model, long startedAt, ThinkingTraceEmitter thinkingTraceEmitter) {
if (!firstTokenAtRef.compareAndSet(null, System.currentTimeMillis())) {
return;
+ }
+ if (thinkingTraceEmitter != null) {
+ thinkingTraceEmitter.startAnswer();
}
emitSafely(emitter, "status", AiChatStatusDto.builder()
.requestId(requestId)
@@ -408,10 +461,15 @@
private void handleStreamFailure(SseEmitter emitter, String requestId, Long sessionId, String model, long startedAt,
Long firstTokenAt, AiChatException exception, Long callLogId,
- long toolSuccessCount, long toolFailureCount) {
+ long toolSuccessCount, long toolFailureCount,
+ ThinkingTraceEmitter thinkingTraceEmitter,
+ Long tenantId, Long userId, String promptCode) {
if (isClientAbortException(exception)) {
log.warn("AI chat aborted by client, requestId={}, sessionId={}, stage={}, message={}",
requestId, sessionId, exception.getStage(), exception.getMessage());
+ if (thinkingTraceEmitter != null) {
+ thinkingTraceEmitter.markTerminated("ABORTED");
+ }
emitSafely(emitter, "status", buildTerminalStatus(requestId, sessionId, "ABORTED", model, startedAt, firstTokenAt));
aiCallLogService.failCallLog(
callLogId,
@@ -424,11 +482,15 @@
toolSuccessCount,
toolFailureCount
);
+ aiRedisSupport.markStreamState(requestId, tenantId, userId, sessionId, promptCode, "ABORTED", exception.getMessage());
emitter.completeWithError(exception);
return;
}
log.error("AI chat failed, requestId={}, sessionId={}, category={}, stage={}, message={}",
requestId, sessionId, exception.getCategory(), exception.getStage(), exception.getMessage(), exception);
+ if (thinkingTraceEmitter != null) {
+ thinkingTraceEmitter.markTerminated("FAILED");
+ }
emitSafely(emitter, "status", buildTerminalStatus(requestId, sessionId, "FAILED", model, startedAt, firstTokenAt));
emitSafely(emitter, "error", AiChatErrorDto.builder()
.requestId(requestId)
@@ -450,6 +512,7 @@
toolSuccessCount,
toolFailureCount
);
+ aiRedisSupport.markStreamState(requestId, tenantId, userId, sessionId, promptCode, "FAILED", exception.getMessage());
emitter.completeWithError(exception);
}
@@ -521,7 +584,8 @@
private ToolCallback[] wrapToolCallbacks(ToolCallback[] toolCallbacks, SseEmitter emitter, String requestId,
Long sessionId, AtomicLong toolCallSequence,
AtomicLong toolSuccessCount, AtomicLong toolFailureCount,
- Long callLogId, Long userId, Long tenantId) {
+ Long callLogId, Long userId, Long tenantId,
+ ThinkingTraceEmitter thinkingTraceEmitter) {
/** 缁欐墍鏈夊伐鍏峰洖璋冨涓婁竴灞傚彲瑙傛祴鍖呰锛岀敤浜庡疄鏃� SSE 杞ㄨ抗鍜屽璁℃棩蹇楄惤搴撱�� */
if (Cools.isEmpty(toolCallbacks)) {
return toolCallbacks;
@@ -532,7 +596,7 @@
continue;
}
wrappedCallbacks.add(new ObservableToolCallback(callback, emitter, requestId, sessionId, toolCallSequence,
- toolSuccessCount, toolFailureCount, callLogId, userId, tenantId));
+ toolSuccessCount, toolFailureCount, callLogId, userId, tenantId, thinkingTraceEmitter));
}
return wrappedCallbacks.toArray(new ToolCallback[0]);
}
@@ -725,6 +789,125 @@
return false;
}
+ private class ThinkingTraceEmitter {
+
+ private final SseEmitter emitter;
+ private final String requestId;
+ private final Long sessionId;
+ private String currentPhase;
+ private String currentStatus;
+
+ private ThinkingTraceEmitter(SseEmitter emitter, String requestId, Long sessionId) {
+ this.emitter = emitter;
+ this.requestId = requestId;
+ this.sessionId = sessionId;
+ }
+
+ private void startAnalyze() {
+ if (currentPhase != null) {
+ return;
+ }
+ currentPhase = "ANALYZE";
+ currentStatus = "STARTED";
+ emitThinkingEvent("ANALYZE", "STARTED", "正在分析问题",
+ "已接收你的问题，正在理解意图并判断是否需要调用工具。", null);
+ }
+
+ private void onToolStart(String toolName, String toolCallId) {
+ switchPhase("TOOL_CALL", "STARTED", "正在调用工具", "已判断需要调用工具，正在查询相关信息。", null);
+ currentStatus = "UPDATED";
+ emitThinkingEvent("TOOL_CALL", "UPDATED", "正在调用工具",
+ "正在调用工具 " + safeLabel(toolName, "未知工具") + " 获取所需信息。", toolCallId);
+ }
+
+ private void onToolResult(String toolName, String toolCallId, boolean failed) {
+ currentPhase = "TOOL_CALL";
+ currentStatus = failed ? "FAILED" : "UPDATED";
+ emitThinkingEvent("TOOL_CALL", failed ? "FAILED" : "UPDATED",
+ failed ? "工具调用失败" : "工具调用完成",
+ failed
+ ? "工具 " + safeLabel(toolName, "未知工具") + " 调用失败，正在评估失败影响并整理可用信息。"
+ : "工具 " + safeLabel(toolName, "未知工具") + " 已返回结果，正在继续分析并提炼关键信息。",
+ toolCallId);
+ }
+
+ private void startAnswer() {
+ switchPhase("ANSWER", "STARTED", "正在整理答案", "已完成分析，正在组织最终回复内容。", null);
+ }
+
+ private void completeCurrentPhase() {
+ if (!StringUtils.hasText(currentPhase) || isTerminalStatus(currentStatus)) {
+ return;
+ }
+ currentStatus = "COMPLETED";
+ emitThinkingEvent(currentPhase, "COMPLETED", resolveCompleteTitle(currentPhase),
+ resolveCompleteContent(currentPhase), null);
+ }
+
+ private void markTerminated(String terminalStatus) {
+ if (!StringUtils.hasText(currentPhase) || isTerminalStatus(currentStatus)) {
+ return;
+ }
+ currentStatus = terminalStatus;
+ emitThinkingEvent(currentPhase, terminalStatus,
+ "ABORTED".equals(terminalStatus) ? "思考已中断" : "思考失败",
+ "ABORTED".equals(terminalStatus)
+ ? "本轮对话已被中断，思考过程提前结束。"
+ : "本轮对话在生成答案前失败，当前思考过程已停止。",
+ null);
+ }
+
+ private void switchPhase(String nextPhase, String nextStatus, String title, String content, String toolCallId) {
+ if (!Objects.equals(currentPhase, nextPhase)) {
+ completeCurrentPhase();
+ }
+ currentPhase = nextPhase;
+ currentStatus = nextStatus;
+ emitThinkingEvent(nextPhase, nextStatus, title, content, toolCallId);
+ }
+
+ private void emitThinkingEvent(String phase, String status, String title, String content, String toolCallId) {
+ emitSafely(emitter, "thinking", AiChatThinkingEventDto.builder()
+ .requestId(requestId)
+ .sessionId(sessionId)
+ .phase(phase)
+ .status(status)
+ .title(title)
+ .content(content)
+ .toolCallId(toolCallId)
+ .timestamp(Instant.now().toEpochMilli())
+ .build());
+ }
+
+ private boolean isTerminalStatus(String status) {
+ return "COMPLETED".equals(status) || "FAILED".equals(status) || "ABORTED".equals(status);
+ }
+
+ private String resolveCompleteTitle(String phase) {
+ if ("ANSWER".equals(phase)) {
+ return "答案整理完成";
+ }
+ if ("TOOL_CALL".equals(phase)) {
+ return "工具分析完成";
+ }
+ return "问题分析完成";
+ }
+
+ private String resolveCompleteContent(String phase) {
+ if ("ANSWER".equals(phase)) {
+ return "最终答复已生成完成。";
+ }
+ if ("TOOL_CALL".equals(phase)) {
+ return "工具调用阶段已结束，相关信息已整理完毕。";
+ }
+ return "问题意图和处理方向已分析完成。";
+ }
+
+ private String safeLabel(String value, String fallback) {
+ return StringUtils.hasText(value) ? value : fallback;
+ }
+ }
+
private class ObservableToolCallback implements ToolCallback {
private final ToolCallback delegate;
@@ -737,11 +920,13 @@
private final Long callLogId;
private final Long userId;
private final Long tenantId;
+ private final ThinkingTraceEmitter thinkingTraceEmitter;
private ObservableToolCallback(ToolCallback delegate, SseEmitter emitter, String requestId,
Long sessionId, AtomicLong toolCallSequence,
AtomicLong toolSuccessCount, AtomicLong toolFailureCount,
- Long callLogId, Long userId, Long tenantId) {
+ Long callLogId, Long userId, Long tenantId,
+ ThinkingTraceEmitter thinkingTraceEmitter) {
this.delegate = delegate;
this.emitter = emitter;
this.requestId = requestId;
@@ -752,6 +937,7 @@
this.callLogId = callLogId;
this.userId = userId;
this.tenantId = tenantId;
+ this.thinkingTraceEmitter = thinkingTraceEmitter;
}
@Override
@@ -780,6 +966,41 @@
String mountName = delegate instanceof MountedToolCallback ? ((MountedToolCallback) delegate).getMountName() : null;
String toolCallId = requestId + "-tool-" + toolCallSequence.incrementAndGet();
long startedAt = System.currentTimeMillis();
+ // 这里只对同一 request 内的重复工具调用做短期复用，避免把跨请求结果误当成通用缓存。
+ AiRedisSupport.CachedToolResult cachedToolResult = aiRedisSupport.getToolResult(tenantId, requestId, toolName, toolInput);
+ if (cachedToolResult != null) {
+ emitSafely(emitter, "tool_result", AiChatToolEventDto.builder()
+ .requestId(requestId)
+ .sessionId(sessionId)
+ .toolCallId(toolCallId)
+ .toolName(toolName)
+ .mountName(mountName)
+ .status(cachedToolResult.isSuccess() ? "COMPLETED" : "FAILED")
+ .inputSummary(summarizeToolPayload(toolInput, 400))
+ .outputSummary(summarizeToolPayload(cachedToolResult.getOutput(), 600))
+ .errorMessage(cachedToolResult.getErrorMessage())
+ .durationMs(0L)
+ .timestamp(System.currentTimeMillis())
+ .build());
+ if (thinkingTraceEmitter != null) {
+ thinkingTraceEmitter.onToolResult(toolName, toolCallId, !cachedToolResult.isSuccess());
+ }
+ if (cachedToolResult.isSuccess()) {
+ toolSuccessCount.incrementAndGet();
+ aiCallLogService.saveMcpCallLog(callLogId, requestId, sessionId, toolCallId, mountName, toolName,
+ "COMPLETED", summarizeToolPayload(toolInput, 400), summarizeToolPayload(cachedToolResult.getOutput(), 600),
+ null, 0L, userId, tenantId);
+ return cachedToolResult.getOutput();
+ }
+ toolFailureCount.incrementAndGet();
+ aiCallLogService.saveMcpCallLog(callLogId, requestId, sessionId, toolCallId, mountName, toolName,
+ "FAILED", summarizeToolPayload(toolInput, 400), null, cachedToolResult.getErrorMessage(),
+ 0L, userId, tenantId);
+ throw new CoolException(cachedToolResult.getErrorMessage());
+ }
+ if (thinkingTraceEmitter != null) {
+ thinkingTraceEmitter.onToolStart(toolName, toolCallId);
+ }
emitSafely(emitter, "tool_start", AiChatToolEventDto.builder()
.requestId(requestId)
.sessionId(sessionId)
@@ -805,6 +1026,10 @@
.durationMs(durationMs)
.timestamp(System.currentTimeMillis())
.build());
+ if (thinkingTraceEmitter != null) {
+ thinkingTraceEmitter.onToolResult(toolName, toolCallId, false);
+ }
+ aiRedisSupport.cacheToolResult(tenantId, requestId, toolName, toolInput, true, output, null);
toolSuccessCount.incrementAndGet();
aiCallLogService.saveMcpCallLog(callLogId, requestId, sessionId, toolCallId, mountName, toolName,
"COMPLETED", summarizeToolPayload(toolInput, 400), summarizeToolPayload(output, 600),
@@ -824,6 +1049,10 @@
.durationMs(durationMs)
.timestamp(System.currentTimeMillis())
.build());
+ if (thinkingTraceEmitter != null) {
+ thinkingTraceEmitter.onToolResult(toolName, toolCallId, true);
+ }
+ aiRedisSupport.cacheToolResult(tenantId, requestId, toolName, toolInput, false, null, e.getMessage());
toolFailureCount.incrementAndGet();
aiCallLogService.saveMcpCallLog(callLogId, requestId, sessionId, toolCallId, mountName, toolName,
"FAILED", summarizeToolPayload(toolInput, 400), null, e.getMessage(),
--
Gitblit v1.9.1