zhou zhou
5 小时以前 88b3f09a702f8f8515af43bc14242ecca2a667db
#AI2. 对话执行链路治理
5个文件已添加
6个文件已修改
397 ■■■■■ 已修改文件
rsf-admin/src/layout/AiChatDrawer.jsx 12 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
rsf-server/src/main/java/com/vincent/rsf/server/ai/config/AiAsyncConfig.java 24 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
rsf-server/src/main/java/com/vincent/rsf/server/ai/controller/AiChatController.java 11 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
rsf-server/src/main/java/com/vincent/rsf/server/ai/dto/AiChatDoneDto.java 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
rsf-server/src/main/java/com/vincent/rsf/server/ai/dto/AiChatErrorDto.java 23 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
rsf-server/src/main/java/com/vincent/rsf/server/ai/dto/AiChatRequest.java 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
rsf-server/src/main/java/com/vincent/rsf/server/ai/dto/AiChatRuntimeDto.java 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
rsf-server/src/main/java/com/vincent/rsf/server/ai/dto/AiChatStatusDto.java 23 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
rsf-server/src/main/java/com/vincent/rsf/server/ai/enums/AiErrorCategory.java 12 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
rsf-server/src/main/java/com/vincent/rsf/server/ai/exception/AiChatException.java 30 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
rsf-server/src/main/java/com/vincent/rsf/server/ai/service/impl/AiChatServiceImpl.java 252 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
rsf-admin/src/layout/AiChatDrawer.jsx
@@ -56,6 +56,7 @@
    const runtimeSummary = useMemo(() => {
        return {
            requestId: runtime?.requestId || "--",
            promptName: runtime?.promptName || "--",
            model: runtime?.model || "--",
            mountedMcpCount: runtime?.mountedMcpCount ?? 0,
@@ -234,8 +235,9 @@
                        }
                        if (eventName === "error") {
                            const message = payload?.message || "AI 对话失败";
                            setDrawerError(message);
                            notify(message, { type: "error" });
                            const displayMessage = payload?.requestId ? `${message} [${payload.requestId}]` : message;
                            setDrawerError(displayMessage);
                            notify(displayMessage, { type: "error" });
                        }
                    },
                }
@@ -363,6 +365,7 @@
                    <Box flex={1} display="flex" flexDirection="column" minHeight={0}>
                        <Box px={2} py={1.5}>
                            <Stack direction="row" spacing={1} flexWrap="wrap" useFlexGap>
                                <Chip size="small" label={`Req: ${runtimeSummary.requestId}`} />
                                <Chip size="small" label={`Session: ${sessionId || "--"}`} />
                                <Chip size="small" label={`Prompt: ${runtimeSummary.promptName}`} />
                                <Chip size="small" label={`Model: ${runtimeSummary.model}`} />
@@ -437,6 +440,11 @@
                        <Divider />
                        <Box px={2} py={1.5}>
                            {usage?.elapsedMs != null && (
                                <Typography variant="caption" color="text.secondary" display="block" mb={0.5}>
                                    Elapsed: {usage.elapsedMs} ms{usage?.firstTokenLatencyMs != null ? ` / First token: ${usage.firstTokenLatencyMs} ms` : ""}
                                </Typography>
                            )}
                            {usage?.totalTokens != null && (
                                <Typography variant="caption" color="text.secondary" display="block" mb={1}>
                                    Tokens: prompt {usage?.promptTokens ?? 0} / completion {usage?.completionTokens ?? 0} / total {usage?.totalTokens ?? 0}
rsf-server/src/main/java/com/vincent/rsf/server/ai/config/AiAsyncConfig.java
New file
@@ -0,0 +1,24 @@
package com.vincent.rsf.server.ai.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
@Configuration
public class AiAsyncConfig {
    @Bean(name = "aiChatTaskExecutor")
    public Executor aiChatTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(4);
        executor.setMaxPoolSize(8);
        executor.setQueueCapacity(100);
        executor.setThreadNamePrefix("ai-chat-");
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.setAwaitTerminationSeconds(30);
        executor.initialize();
        return executor;
    }
}
rsf-server/src/main/java/com/vincent/rsf/server/ai/controller/AiChatController.java
@@ -5,12 +5,17 @@
import com.vincent.rsf.server.ai.service.AiChatService;
import com.vincent.rsf.server.system.controller.BaseController;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.util.UUID;
@RestController
@Slf4j
@RequiredArgsConstructor
public class AiChatController extends BaseController {
@@ -39,6 +44,12 @@
    @PreAuthorize("isAuthenticated()")
    @PostMapping(value = "/ai/chat/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public SseEmitter stream(@RequestBody AiChatRequest request) {
        String requestId = StringUtils.hasText(request.getRequestId())
                ? request.getRequestId().trim()
                : UUID.randomUUID().toString().replace("-", "");
        request.setRequestId(requestId);
        log.info("AI chat request accepted, requestId={}, userId={}, tenantId={}, sessionId={}",
                requestId, getLoginUserId(), getTenantId(), request.getSessionId());
        return aiChatService.stream(request, getLoginUserId(), getTenantId());
    }
}
rsf-server/src/main/java/com/vincent/rsf/server/ai/dto/AiChatDoneDto.java
@@ -7,10 +7,16 @@
@Builder
public class AiChatDoneDto {
    private String requestId;
    private Long sessionId;
    private String model;
    private Long elapsedMs;
    private Long firstTokenLatencyMs;
    private Integer promptTokens;
    private Integer completionTokens;
rsf-server/src/main/java/com/vincent/rsf/server/ai/dto/AiChatErrorDto.java
New file
@@ -0,0 +1,23 @@
package com.vincent.rsf.server.ai.dto;
import lombok.Builder;
import lombok.Data;
/**
 * SSE payload for the terminal "error" event of an AI chat stream.
 * Serialized to JSON and pushed to the browser; field names are part of the wire contract.
 */
@Data
@Builder
public class AiChatErrorDto {
    /** Correlation id echoed from the originating request. */
    private String requestId;
    /** Chat session the failed stream belonged to, if already resolved (may be null). */
    private Long sessionId;
    /** Stable machine-readable error code, e.g. "AI_MODEL_CALL_ERROR". */
    private String code;
    /** Error category name (AiErrorCategory.name()). */
    private String category;
    /** Pipeline stage at which the failure occurred, e.g. "CONFIG_RESOLVE". */
    private String stage;
    /** Human-readable message surfaced to the user. */
    private String message;
    /** Epoch millis at which the error payload was built. */
    private Long timestamp;
}
rsf-server/src/main/java/com/vincent/rsf/server/ai/dto/AiChatRequest.java
@@ -8,6 +8,8 @@
@Data
public class AiChatRequest {
    private String requestId;
    private Long sessionId;
    private List<AiChatMessageDto> messages;
rsf-server/src/main/java/com/vincent/rsf/server/ai/dto/AiChatRuntimeDto.java
@@ -9,6 +9,8 @@
@Builder
public class AiChatRuntimeDto {
    private String requestId;
    private Long sessionId;
    private String promptCode;
rsf-server/src/main/java/com/vincent/rsf/server/ai/dto/AiChatStatusDto.java
New file
@@ -0,0 +1,23 @@
package com.vincent.rsf.server.ai.dto;
import lombok.Builder;
import lombok.Data;
/**
 * SSE payload for intermediate "status" events of an AI chat stream.
 * Observed statuses in this codebase: STARTED, FIRST_TOKEN, COMPLETED, FAILED, ABORTED.
 */
@Data
@Builder
public class AiChatStatusDto {
    /** Correlation id echoed from the originating request. */
    private String requestId;
    /** Chat session this stream belongs to. */
    private Long sessionId;
    /** Lifecycle status label (see class doc for the known values). */
    private String status;
    /** Model actually used for this round. */
    private String model;
    /** Epoch millis at which the status event was built. */
    private Long timestamp;
    /** Milliseconds elapsed since the stream started. */
    private Long elapsedMs;
    /** Latency from stream start to first token; null if no token was produced yet. */
    private Long firstTokenLatencyMs;
}
rsf-server/src/main/java/com/vincent/rsf/server/ai/enums/AiErrorCategory.java
New file
@@ -0,0 +1,12 @@
package com.vincent.rsf.server.ai.enums;
/**
 * Coarse classification of AI chat failures, carried on AiChatException
 * and surfaced to the client inside the SSE "error" payload.
 */
public enum AiErrorCategory {
    /** Request-side data problems (session resolution, memory load). */
    REQUEST,
    /** Missing caller identity: user or tenant. */
    AUTH,
    /** Prompt/model configuration could not be resolved. */
    CONFIG,
    /** MCP tool mounting failed. */
    MCP,
    /** Upstream model call or streaming invocation failed. */
    MODEL,
    /** SSE emission back to the client failed. */
    STREAM,
    /** Unclassified server-side failure. */
    INTERNAL
}
rsf-server/src/main/java/com/vincent/rsf/server/ai/exception/AiChatException.java
New file
@@ -0,0 +1,30 @@
package com.vincent.rsf.server.ai.exception;
import com.vincent.rsf.framework.exception.CoolException;
import com.vincent.rsf.server.ai.enums.AiErrorCategory;
import lombok.Getter;
@Getter
public class AiChatException extends CoolException {
    private final String code;
    private final AiErrorCategory category;
    private final String stage;
    public AiChatException(String code, AiErrorCategory category, String stage, String message) {
        super(message);
        this.code = code;
        this.category = category;
        this.stage = stage;
    }
    public AiChatException(String code, AiErrorCategory category, String stage, String message, Throwable cause) {
        super(message);
        this.code = code;
        this.category = category;
        this.stage = stage;
        initCause(cause);
    }
}
rsf-server/src/main/java/com/vincent/rsf/server/ai/service/impl/AiChatServiceImpl.java
@@ -5,15 +5,19 @@
import com.vincent.rsf.framework.exception.CoolException;
import com.vincent.rsf.server.ai.config.AiDefaults;
import com.vincent.rsf.server.ai.dto.AiChatDoneDto;
import com.vincent.rsf.server.ai.dto.AiChatErrorDto;
import com.vincent.rsf.server.ai.dto.AiChatMemoryDto;
import com.vincent.rsf.server.ai.dto.AiChatMessageDto;
import com.vincent.rsf.server.ai.dto.AiChatRequest;
import com.vincent.rsf.server.ai.dto.AiChatRuntimeDto;
import com.vincent.rsf.server.ai.dto.AiChatStatusDto;
import com.vincent.rsf.server.ai.dto.AiChatSessionDto;
import com.vincent.rsf.server.ai.dto.AiResolvedConfig;
import com.vincent.rsf.server.ai.entity.AiParam;
import com.vincent.rsf.server.ai.entity.AiPrompt;
import com.vincent.rsf.server.ai.entity.AiChatSession;
import com.vincent.rsf.server.ai.enums.AiErrorCategory;
import com.vincent.rsf.server.ai.exception.AiChatException;
import com.vincent.rsf.server.ai.service.AiChatService;
import com.vincent.rsf.server.ai.service.AiChatMemoryService;
import com.vincent.rsf.server.ai.service.AiConfigResolverService;
@@ -41,6 +45,7 @@
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.http.MediaType;
import org.springframework.http.client.SimpleClientHttpRequestFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import org.springframework.web.client.RestClient;
@@ -49,6 +54,7 @@
import reactor.core.publisher.Flux;
import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
@@ -56,6 +62,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
@Slf4j
@@ -69,12 +76,15 @@
    private final GenericApplicationContext applicationContext;
    private final ObservationRegistry observationRegistry;
    private final ObjectMapper objectMapper;
    @Qualifier("aiChatTaskExecutor")
    private final Executor aiChatTaskExecutor;
    @Override
    public AiChatRuntimeDto getRuntime(String promptCode, Long sessionId, Long userId, Long tenantId) {
        AiResolvedConfig config = aiConfigResolverService.resolve(promptCode, tenantId);
        AiChatMemoryDto memory = aiChatMemoryService.getMemory(userId, tenantId, config.getPromptCode(), sessionId);
        return AiChatRuntimeDto.builder()
                .requestId(null)
                .sessionId(memory.getSessionId())
                .promptCode(config.getPromptCode())
                .promptName(config.getPrompt().getName())
@@ -101,18 +111,28 @@
    @Override
    public SseEmitter stream(AiChatRequest request, Long userId, Long tenantId) {
        SseEmitter emitter = new SseEmitter(AiDefaults.SSE_TIMEOUT_MS);
        CompletableFuture.runAsync(() -> doStream(request, userId, tenantId, emitter));
        CompletableFuture.runAsync(() -> doStream(request, userId, tenantId, emitter), aiChatTaskExecutor);
        return emitter;
    }
    private void doStream(AiChatRequest request, Long userId, Long tenantId, SseEmitter emitter) {
        String requestId = request.getRequestId();
        long startedAt = System.currentTimeMillis();
        AtomicReference<Long> firstTokenAtRef = new AtomicReference<>();
        Long sessionId = request.getSessionId();
        String model = null;
        try {
            AiResolvedConfig config = aiConfigResolverService.resolve(request.getPromptCode(), tenantId);
            AiChatSession session = aiChatMemoryService.resolveSession(userId, tenantId, config.getPromptCode(), request.getSessionId(), resolveTitleSeed(request.getMessages()));
            AiChatMemoryDto memory = aiChatMemoryService.getMemory(userId, tenantId, config.getPromptCode(), session.getId());
            ensureIdentity(userId, tenantId);
            AiResolvedConfig config = resolveConfig(request, tenantId);
            final String resolvedModel = config.getAiParam().getModel();
            model = resolvedModel;
            AiChatSession session = resolveSession(request, userId, tenantId, config.getPromptCode());
            sessionId = session.getId();
            AiChatMemoryDto memory = loadMemory(userId, tenantId, config.getPromptCode(), session.getId());
            List<AiChatMessageDto> mergedMessages = mergeMessages(memory.getPersistedMessages(), request.getMessages());
            try (McpMountRuntimeFactory.McpMountRuntime runtime = mcpMountRuntimeFactory.create(config.getMcpMounts(), userId)) {
                emit(emitter, "start", AiChatRuntimeDto.builder()
            try (McpMountRuntimeFactory.McpMountRuntime runtime = createRuntime(config, userId)) {
                emitStrict(emitter, "start", AiChatRuntimeDto.builder()
                        .requestId(requestId)
                        .sessionId(session.getId())
                        .promptCode(config.getPromptCode())
                        .promptName(config.getPrompt().getName())
@@ -123,6 +143,16 @@
                        .mountErrors(runtime.getErrors())
                        .persistedMessages(memory.getPersistedMessages())
                        .build());
                emitSafely(emitter, "status", AiChatStatusDto.builder()
                        .requestId(requestId)
                        .sessionId(session.getId())
                        .status("STARTED")
                        .model(resolvedModel)
                        .timestamp(Instant.now().toEpochMilli())
                        .elapsedMs(0L)
                        .build());
                log.info("AI chat started, requestId={}, userId={}, tenantId={}, sessionId={}, model={}",
                        requestId, userId, tenantId, session.getId(), resolvedModel);
                Prompt prompt = new Prompt(
                        buildPromptMessages(mergedMessages, config.getPrompt(), request.getMetadata()),
@@ -130,39 +160,172 @@
                );
                OpenAiChatModel chatModel = createChatModel(config.getAiParam());
                if (Boolean.FALSE.equals(config.getAiParam().getStreamingEnabled())) {
                    ChatResponse response = chatModel.call(prompt);
                    ChatResponse response = invokeChatCall(chatModel, prompt);
                    String content = extractContent(response);
                    aiChatMemoryService.saveRound(session, userId, tenantId, request.getMessages(), content);
                    if (StringUtils.hasText(content)) {
                        emit(emitter, "delta", buildMessagePayload("content", content));
                        markFirstToken(firstTokenAtRef, emitter, requestId, session.getId(), resolvedModel, startedAt);
                        emitStrict(emitter, "delta", buildMessagePayload("requestId", requestId, "content", content));
                    }
                    emitDone(emitter, response.getMetadata(), config.getAiParam().getModel(), session.getId());
                    emitDone(emitter, requestId, response.getMetadata(), config.getAiParam().getModel(), session.getId(), startedAt, firstTokenAtRef.get());
                    emitSafely(emitter, "status", buildTerminalStatus(requestId, session.getId(), "COMPLETED", resolvedModel, startedAt, firstTokenAtRef.get()));
                    log.info("AI chat completed, requestId={}, sessionId={}, elapsedMs={}, firstTokenLatencyMs={}",
                            requestId, session.getId(), System.currentTimeMillis() - startedAt, resolveFirstTokenLatency(startedAt, firstTokenAtRef.get()));
                    emitter.complete();
                    return;
                }
                Flux<ChatResponse> responseFlux = chatModel.stream(prompt);
                Flux<ChatResponse> responseFlux = invokeChatStream(chatModel, prompt);
                AtomicReference<ChatResponseMetadata> lastMetadata = new AtomicReference<>();
                StringBuilder assistantContent = new StringBuilder();
                responseFlux.doOnNext(response -> {
                try {
                    responseFlux.doOnNext(response -> {
                            lastMetadata.set(response.getMetadata());
                            String content = extractContent(response);
                            if (StringUtils.hasText(content)) {
                                markFirstToken(firstTokenAtRef, emitter, requestId, session.getId(), resolvedModel, startedAt);
                                assistantContent.append(content);
                                emit(emitter, "delta", buildMessagePayload("content", content));
                                emitStrict(emitter, "delta", buildMessagePayload("requestId", requestId, "content", content));
                            }
                        })
                        .doOnError(error -> emit(emitter, "error", buildMessagePayload("message", error == null ? "AI 对话失败" : error.getMessage())))
                        .blockLast();
                } catch (Exception e) {
                    throw buildAiException("AI_MODEL_STREAM_ERROR", AiErrorCategory.MODEL, "MODEL_STREAM",
                            e == null ? "AI 模型流式调用失败" : e.getMessage(), e);
                }
                aiChatMemoryService.saveRound(session, userId, tenantId, request.getMessages(), assistantContent.toString());
                emitDone(emitter, lastMetadata.get(), config.getAiParam().getModel(), session.getId());
                emitDone(emitter, requestId, lastMetadata.get(), config.getAiParam().getModel(), session.getId(), startedAt, firstTokenAtRef.get());
                emitSafely(emitter, "status", buildTerminalStatus(requestId, session.getId(), "COMPLETED", resolvedModel, startedAt, firstTokenAtRef.get()));
                log.info("AI chat completed, requestId={}, sessionId={}, elapsedMs={}, firstTokenLatencyMs={}",
                        requestId, session.getId(), System.currentTimeMillis() - startedAt, resolveFirstTokenLatency(startedAt, firstTokenAtRef.get()));
                emitter.complete();
            }
        } catch (AiChatException e) {
            handleStreamFailure(emitter, requestId, sessionId, model, startedAt, firstTokenAtRef.get(), e);
        } catch (Exception e) {
            log.error("AI stream error", e);
            emit(emitter, "error", buildMessagePayload("message", e == null ? "AI 对话失败" : e.getMessage()));
            emitter.completeWithError(e);
            handleStreamFailure(emitter, requestId, sessionId, model, startedAt, firstTokenAtRef.get(),
                    buildAiException("AI_INTERNAL_ERROR", AiErrorCategory.INTERNAL, "INTERNAL",
                            e == null ? "AI 对话失败" : e.getMessage(), e));
        } finally {
            log.debug("AI chat stream finished, requestId={}", requestId);
        }
    }
    private void ensureIdentity(Long userId, Long tenantId) {
        if (userId == null) {
            throw buildAiException("AI_AUTH_USER_MISSING", AiErrorCategory.AUTH, "AUTH_VALIDATE", "当前登录用户不存在", null);
        }
        if (tenantId == null) {
            throw buildAiException("AI_AUTH_TENANT_MISSING", AiErrorCategory.AUTH, "AUTH_VALIDATE", "当前租户不存在", null);
        }
    }
    private AiResolvedConfig resolveConfig(AiChatRequest request, Long tenantId) {
        try {
            return aiConfigResolverService.resolve(request.getPromptCode(), tenantId);
        } catch (Exception e) {
            throw buildAiException("AI_CONFIG_RESOLVE_ERROR", AiErrorCategory.CONFIG, "CONFIG_RESOLVE",
                    e == null ? "AI 配置解析失败" : e.getMessage(), e);
        }
    }
    private AiChatSession resolveSession(AiChatRequest request, Long userId, Long tenantId, String promptCode) {
        try {
            return aiChatMemoryService.resolveSession(userId, tenantId, promptCode, request.getSessionId(), resolveTitleSeed(request.getMessages()));
        } catch (Exception e) {
            throw buildAiException("AI_SESSION_RESOLVE_ERROR", AiErrorCategory.REQUEST, "SESSION_RESOLVE",
                    e == null ? "AI 会话解析失败" : e.getMessage(), e);
        }
    }
    private AiChatMemoryDto loadMemory(Long userId, Long tenantId, String promptCode, Long sessionId) {
        try {
            return aiChatMemoryService.getMemory(userId, tenantId, promptCode, sessionId);
        } catch (Exception e) {
            throw buildAiException("AI_MEMORY_LOAD_ERROR", AiErrorCategory.REQUEST, "MEMORY_LOAD",
                    e == null ? "AI 会话记忆加载失败" : e.getMessage(), e);
        }
    }
    private McpMountRuntimeFactory.McpMountRuntime createRuntime(AiResolvedConfig config, Long userId) {
        try {
            return mcpMountRuntimeFactory.create(config.getMcpMounts(), userId);
        } catch (Exception e) {
            throw buildAiException("AI_MCP_MOUNT_ERROR", AiErrorCategory.MCP, "MCP_MOUNT",
                    e == null ? "MCP 挂载失败" : e.getMessage(), e);
        }
    }
    private ChatResponse invokeChatCall(OpenAiChatModel chatModel, Prompt prompt) {
        try {
            return chatModel.call(prompt);
        } catch (Exception e) {
            throw buildAiException("AI_MODEL_CALL_ERROR", AiErrorCategory.MODEL, "MODEL_CALL",
                    e == null ? "AI 模型调用失败" : e.getMessage(), e);
        }
    }
    private Flux<ChatResponse> invokeChatStream(OpenAiChatModel chatModel, Prompt prompt) {
        try {
            return chatModel.stream(prompt);
        } catch (Exception e) {
            throw buildAiException("AI_MODEL_STREAM_ERROR", AiErrorCategory.MODEL, "MODEL_STREAM_INIT",
                    e == null ? "AI 模型流式调用失败" : e.getMessage(), e);
        }
    }
    /**
     * Records the first-token timestamp exactly once per stream and, on the winning
     * call, pushes a best-effort "FIRST_TOKEN" status event to the client.
     * Subsequent calls are no-ops.
     */
    private void markFirstToken(AtomicReference<Long> firstTokenAtRef, SseEmitter emitter, String requestId, Long sessionId, String model, long startedAt) {
        // compareAndSet(null, now) succeeds only for the very first token; losers return early.
        if (!firstTokenAtRef.compareAndSet(null, System.currentTimeMillis())) {
            return;
        }
        // emitSafely: a failed status push must not break the delta stream itself.
        emitSafely(emitter, "status", AiChatStatusDto.builder()
                .requestId(requestId)
                .sessionId(sessionId)
                .status("FIRST_TOKEN")
                .model(model)
                .timestamp(Instant.now().toEpochMilli())
                .elapsedMs(System.currentTimeMillis() - startedAt)
                .firstTokenLatencyMs(resolveFirstTokenLatency(startedAt, firstTokenAtRef.get()))
                .build());
    }
    /**
     * Builds the terminal status payload (COMPLETED / FAILED / ABORTED) with final
     * timing figures measured from the stream's start time.
     *
     * @param firstTokenAt epoch millis of the first token, or null if none was produced
     */
    private AiChatStatusDto buildTerminalStatus(String requestId, Long sessionId, String status, String model, long startedAt, Long firstTokenAt) {
        return AiChatStatusDto.builder()
                .requestId(requestId)
                .sessionId(sessionId)
                .status(status)
                .model(model)
                .timestamp(Instant.now().toEpochMilli())
                .elapsedMs(System.currentTimeMillis() - startedAt)
                .firstTokenLatencyMs(resolveFirstTokenLatency(startedAt, firstTokenAt))
                .build();
    }
    private Long resolveFirstTokenLatency(long startedAt, Long firstTokenAt) {
        return firstTokenAt == null ? null : Math.max(0L, firstTokenAt - startedAt);
    }
    private void handleStreamFailure(SseEmitter emitter, String requestId, Long sessionId, String model, long startedAt, Long firstTokenAt, AiChatException exception) {
        if (isClientAbortException(exception)) {
            log.warn("AI chat aborted by client, requestId={}, sessionId={}, stage={}, message={}",
                    requestId, sessionId, exception.getStage(), exception.getMessage());
            emitSafely(emitter, "status", buildTerminalStatus(requestId, sessionId, "ABORTED", model, startedAt, firstTokenAt));
            emitter.completeWithError(exception);
            return;
        }
        log.error("AI chat failed, requestId={}, sessionId={}, category={}, stage={}, message={}",
                requestId, sessionId, exception.getCategory(), exception.getStage(), exception.getMessage(), exception);
        emitSafely(emitter, "status", buildTerminalStatus(requestId, sessionId, "FAILED", model, startedAt, firstTokenAt));
        emitSafely(emitter, "error", AiChatErrorDto.builder()
                .requestId(requestId)
                .sessionId(sessionId)
                .code(exception.getCode())
                .category(exception.getCategory().name())
                .stage(exception.getStage())
                .message(exception.getMessage())
                .timestamp(Instant.now().toEpochMilli())
                .build());
        emitter.completeWithError(exception);
    }
    private OpenAiChatModel createChatModel(AiParam aiParam) {
@@ -203,7 +366,7 @@
    private OpenAiChatOptions buildChatOptions(AiParam aiParam, ToolCallback[] toolCallbacks, Long userId, Map<String, Object> metadata) {
        if (userId == null) {
            throw new CoolException("当前登录用户不存在");
            throw buildAiException("AI_AUTH_USER_MISSING", AiErrorCategory.AUTH, "OPTIONS_BUILD", "当前登录用户不存在", null);
        }
        OpenAiChatOptions.Builder builder = OpenAiChatOptions.builder()
                .model(aiParam.getModel())
@@ -319,31 +482,72 @@
        return response.getResult().getOutput().getText();
    }
    private void emitDone(SseEmitter emitter, ChatResponseMetadata metadata, String fallbackModel, Long sessionId) {
    private void emitDone(SseEmitter emitter, String requestId, ChatResponseMetadata metadata, String fallbackModel, Long sessionId, long startedAt, Long firstTokenAt) {
        Usage usage = metadata == null ? null : metadata.getUsage();
        emit(emitter, "done", AiChatDoneDto.builder()
        emitStrict(emitter, "done", AiChatDoneDto.builder()
                .requestId(requestId)
                .sessionId(sessionId)
                .model(metadata != null && StringUtils.hasText(metadata.getModel()) ? metadata.getModel() : fallbackModel)
                .elapsedMs(System.currentTimeMillis() - startedAt)
                .firstTokenLatencyMs(resolveFirstTokenLatency(startedAt, firstTokenAt))
                .promptTokens(usage == null ? null : usage.getPromptTokens())
                .completionTokens(usage == null ? null : usage.getCompletionTokens())
                .totalTokens(usage == null ? null : usage.getTotalTokens())
                .build());
    }
    private Map<String, String> buildMessagePayload(String key, String value) {
    private Map<String, String> buildMessagePayload(String... keyValues) {
        Map<String, String> payload = new LinkedHashMap<>();
        payload.put(key, value == null ? "" : value);
        if (keyValues == null || keyValues.length == 0) {
            return payload;
        }
        if (keyValues.length % 2 != 0) {
            throw new CoolException("消息载荷参数必须成对出现");
        }
        for (int i = 0; i < keyValues.length; i += 2) {
            payload.put(keyValues[i], keyValues[i + 1] == null ? "" : keyValues[i + 1]);
        }
        return payload;
    }
    private void emit(SseEmitter emitter, String eventName, Object payload) {
    private void emitStrict(SseEmitter emitter, String eventName, Object payload) {
        try {
            String data = objectMapper.writeValueAsString(payload);
            emitter.send(SseEmitter.event()
                    .name(eventName)
                    .data(data, MediaType.APPLICATION_JSON));
        } catch (IOException e) {
            throw new CoolException("SSE 输出失败: " + e.getMessage());
            throw buildAiException("AI_SSE_EMIT_ERROR", AiErrorCategory.STREAM, "SSE_EMIT", "SSE 输出失败: " + e.getMessage(), e);
        }
    }
    /**
     * Best-effort variant of emitStrict: any emission failure is logged at WARN and
     * swallowed, so auxiliary events (status updates, error payloads) can never mask
     * or replace the primary failure of the stream.
     */
    private void emitSafely(SseEmitter emitter, String eventName, Object payload) {
        try {
            emitStrict(emitter, eventName, payload);
        } catch (Exception e) {
            log.warn("AI SSE event emit skipped, eventName={}, message={}", eventName, e.getMessage());
        }
    }
    /**
     * Single choke point for constructing AiChatException instances so every
     * failure carries a code, category, stage, message, and (optional) cause.
     */
    private AiChatException buildAiException(String code, AiErrorCategory category, String stage, String message, Throwable cause) {
        return new AiChatException(code, category, stage, message, cause);
    }
    private boolean isClientAbortException(Throwable throwable) {
        Throwable current = throwable;
        while (current != null) {
            String message = current.getMessage();
            if (message != null) {
                String normalized = message.toLowerCase();
                if (normalized.contains("broken pipe")
                        || normalized.contains("connection reset")
                        || normalized.contains("forcibly closed")
                        || normalized.contains("abort")) {
                    return true;
                }
            }
            current = current.getCause();
        }
        return false;
    }
}