package com.vincent.rsf.server.ai.service.diagnosis; import com.fasterxml.jackson.databind.JsonNode; import com.vincent.rsf.server.ai.constant.AiSceneCode; import com.vincent.rsf.server.ai.dto.GatewayChatMessage; import com.vincent.rsf.server.ai.dto.GatewayChatRequest; import com.vincent.rsf.server.ai.model.AiDiagnosticToolResult; import com.vincent.rsf.server.ai.model.AiPromptContext; import com.vincent.rsf.server.ai.service.AiGatewayClient; import com.vincent.rsf.server.ai.service.AiModelRouteRuntimeService; import com.vincent.rsf.server.ai.service.AiPromptRuntimeService; import com.vincent.rsf.server.system.entity.AiDiagnosisPlan; import com.vincent.rsf.server.system.entity.AiDiagnosisRecord; import com.vincent.rsf.server.system.service.AiDiagnosisPlanService; import org.springframework.stereotype.Service; import jakarta.annotation.Resource; import java.io.InterruptedIOException; import java.util.Date; import java.util.List; @Service public class AiDiagnosisPlanRunnerService { @Resource private AiDiagnosisPlanService aiDiagnosisPlanService; @Resource private AiDiagnosticToolService aiDiagnosticToolService; @Resource private AiModelRouteRuntimeService aiModelRouteRuntimeService; @Resource private AiPromptRuntimeService aiPromptRuntimeService; @Resource private AiGatewayClient aiGatewayClient; @Resource private AiDiagnosisRuntimeService aiDiagnosisRuntimeService; /** * 执行一次巡检计划。 * 计划执行本质上复用诊断主链路,只是输入来源从人工提问变成了计划配置。 */ public void runPlan(Long planId, boolean manualTrigger) { AiDiagnosisPlan plan = aiDiagnosisPlanService.getById(planId); if (plan == null) { return; } Date finishTime = new Date(); Date nextRunTime = Integer.valueOf(1).equals(plan.getStatus()) ? aiDiagnosisPlanService.calculateNextRunTime(plan.getCronExpr(), finishTime) : null; Long userId = plan.getUpdateBy() == null ? plan.getCreateBy() : plan.getUpdateBy(); String sessionId = "plan-" + plan.getId() + "-" + System.currentTimeMillis(); String question = plan.getPrompt(); if (question == null || question.trim().isEmpty()) { question = "请对当前WMS系统进行一次巡检诊断,结合库存、任务、设备站点数据识别异常并给出处理建议。"; } AiPromptContext promptContext = new AiPromptContext() .setTenantId(plan.getTenantId()) .setUserId(userId) .setSessionId(sessionId) .setModelCode(plan.getPreferredModelCode()) .setQuestion(question) .setSceneCode(plan.getSceneCode() == null || plan.getSceneCode().trim().isEmpty() ? AiSceneCode.SYSTEM_DIAGNOSE : plan.getSceneCode()); List diagnosticResults = aiDiagnosticToolService.collect(promptContext); String toolSummary = aiDiagnosticToolService.serializeResults(diagnosticResults); AiDiagnosisRecord diagnosisRecord = aiDiagnosisRuntimeService.startDiagnosis( plan.getTenantId(), userId, sessionId, promptContext.getSceneCode(), question ); StringBuilder assistantReply = new StringBuilder(); String finalModelCode = plan.getPreferredModelCode(); String finalErrorMessage = null; boolean success = false; try { List candidates = aiModelRouteRuntimeService.resolveCandidates( plan.getTenantId(), promptContext.getSceneCode(), plan.getPreferredModelCode() ); if (candidates.isEmpty()) { finalErrorMessage = "未找到可用的AI模型配置"; } else { int attemptNo = 1; for (AiModelRouteRuntimeService.RouteCandidate candidate : candidates) { AttemptState attemptState = new AttemptState(); Date requestTime = new Date(); try { GatewayChatRequest gatewayChatRequest = buildGatewayRequest( sessionId, question, candidate, promptContext, diagnosticResults, attemptNo ); aiGatewayClient.stream(gatewayChatRequest, event -> handleGatewayEvent(event, assistantReply, attemptState)); } catch (Exception e) { attemptState.setSuccess(false); attemptState.setErrorMessage(e.getMessage()); attemptState.setInterrupted(isInterruptedError(e)); attemptState.setResponseTime(new Date()); } if (attemptState.getResponseTime() == null) { attemptState.setResponseTime(new Date()); } String actualModelCode = attemptState.getActualModelCode() == null ? candidate.getAttemptModelCode() : attemptState.getActualModelCode(); finalModelCode = actualModelCode; aiDiagnosisRuntimeService.saveCallLog( plan.getTenantId(), userId, sessionId, diagnosisRecord.getId(), candidate.getRouteCode(), actualModelCode, attemptNo, requestTime, attemptState.getResponseTime(), Boolean.TRUE.equals(attemptState.getSuccess()) ? 1 : 0, attemptState.getErrorMessage() ); if (Boolean.TRUE.equals(attemptState.getSuccess())) { aiModelRouteRuntimeService.markSuccess(candidate.getRouteId()); success = assistantReply.length() > 0; if (!success) { finalErrorMessage = "模型未返回有效内容"; } break; } if (!attemptState.isInterrupted()) { aiModelRouteRuntimeService.markFailure(candidate.getRouteId()); } finalErrorMessage = attemptState.getErrorMessage(); if (attemptState.isReceivedDelta() || attemptNo >= candidates.size()) { break; } attemptNo++; } } if (success) { aiDiagnosisRuntimeService.finishDiagnosisSuccess(diagnosisRecord, assistantReply.toString(), finalModelCode, toolSummary); aiDiagnosisPlanService.finishExecution( plan.getId(), 1, diagnosisRecord.getId(), buildPlanMessage(assistantReply.toString(), manualTrigger ? "手动执行成功" : "计划执行成功"), new Date(), nextRunTime ); return; } aiDiagnosisRuntimeService.finishDiagnosisFailure(diagnosisRecord, assistantReply.toString(), finalErrorMessage, toolSummary); aiDiagnosisPlanService.finishExecution( plan.getId(), 0, diagnosisRecord.getId(), buildPlanMessage(finalErrorMessage, manualTrigger ? "手动执行失败" : "计划执行失败"), new Date(), nextRunTime ); } catch (Exception e) { aiDiagnosisRuntimeService.finishDiagnosisFailure(diagnosisRecord, assistantReply.toString(), e.getMessage(), toolSummary); aiDiagnosisPlanService.finishExecution( plan.getId(), 0, diagnosisRecord.getId(), buildPlanMessage(e.getMessage(), manualTrigger ? "手动执行失败" : "计划执行失败"), new Date(), nextRunTime ); } } /** * 为巡检计划组装网关请求。 */ private GatewayChatRequest buildGatewayRequest(String sessionId, String question, AiModelRouteRuntimeService.RouteCandidate candidate, AiPromptContext promptContext, List diagnosticResults, Integer attemptNo) { GatewayChatRequest request = new GatewayChatRequest(); request.setSessionId(sessionId); request.setModelCode(candidate.getAttemptModelCode()); request.setRouteCode(candidate.getRouteCode()); request.setAttemptNo(attemptNo); request.setSystemPrompt(aiPromptRuntimeService.buildSystemPrompt( promptContext.getSceneCode(), candidate.getRuntimeConfig().getSystemPrompt(), promptContext, diagnosticResults )); request.setChatUrl(candidate.getRuntimeConfig().getChatUrl()); request.setApiKey(candidate.getRuntimeConfig().getApiKey()); request.setModelName(candidate.getRuntimeConfig().getModelName()); GatewayChatMessage message = new GatewayChatMessage(); message.setRole("user"); message.setContent(question); request.getMessages().add(message); return request; } /** * 消费计划执行时的流式网关事件。 */ private boolean handleGatewayEvent(JsonNode event, StringBuilder assistantReply, AttemptState attemptState) { String type = event.path("type").asText(); String modelCode = event.path("modelCode").asText(); if ("delta".equals(type)) { String content = event.path("content").asText(""); assistantReply.append(content); attemptState.setReceivedDelta(true); attemptState.setActualModelCode(modelCode); return true; } if ("error".equals(type)) { attemptState.setSuccess(false); attemptState.setActualModelCode(modelCode); attemptState.setErrorMessage(event.path("message").asText("模型调用失败")); attemptState.setResponseTime(parseResponseTime(event)); attemptState.setInterrupted(isInterruptedMessage(attemptState.getErrorMessage())); return false; } if ("done".equals(type)) { attemptState.setSuccess(true); attemptState.setActualModelCode(modelCode); attemptState.setResponseTime(parseResponseTime(event)); return false; } return true; } /** * 从网关事件中提取响应时间。 */ private Date parseResponseTime(JsonNode event) { long millis = event.path("responseTime").asLong(0L); return millis <= 0L ? new Date() : new Date(millis); } /** * 将计划执行结果压缩成适合回写到计划记录的短消息。 */ private String buildPlanMessage(String text, String fallback) { String source = text == null ? "" : text.trim(); if (source.isEmpty()) { return fallback; } return source.length() > 120 ? source.substring(0, 120) : source; } /** * 判断异常链中是否包含中断类错误。 */ private boolean isInterruptedError(Throwable throwable) { Throwable current = throwable; while (current != null) { if (current instanceof InterruptedException || current instanceof InterruptedIOException) { return true; } if (isInterruptedMessage(current.getMessage())) { return true; } current = current.getCause(); } return false; } /** * 根据异常消息文本判断是否属于连接中断类错误。 */ private boolean isInterruptedMessage(String message) { if (message == null || message.trim().isEmpty()) { return false; } String normalized = message.toLowerCase(); return normalized.contains("interrupted") || normalized.contains("broken pipe") || normalized.contains("connection reset") || normalized.contains("forcibly closed"); } private static class AttemptState { private Boolean success; private String actualModelCode; private String errorMessage; private boolean receivedDelta; private boolean interrupted; private Date responseTime; public Boolean getSuccess() { return success; } public void setSuccess(Boolean success) { this.success = success; } public String getActualModelCode() { return actualModelCode; } public void setActualModelCode(String actualModelCode) { this.actualModelCode = actualModelCode; } public String getErrorMessage() { return errorMessage; } public void setErrorMessage(String errorMessage) { this.errorMessage = errorMessage; } public boolean isReceivedDelta() { return receivedDelta; } public void setReceivedDelta(boolean receivedDelta) { this.receivedDelta = receivedDelta; } public boolean isInterrupted() { return interrupted; } public void setInterrupted(boolean interrupted) { this.interrupted = interrupted; } public Date getResponseTime() { return responseTime; } public void setResponseTime(Date responseTime) { this.responseTime = responseTime; } } }