package com.vincent.rsf.server.ai.service.diagnosis;
|
|
import com.fasterxml.jackson.databind.JsonNode;
|
import com.vincent.rsf.server.ai.constant.AiSceneCode;
|
import com.vincent.rsf.server.ai.dto.GatewayChatMessage;
|
import com.vincent.rsf.server.ai.dto.GatewayChatRequest;
|
import com.vincent.rsf.server.ai.model.AiDiagnosticToolResult;
|
import com.vincent.rsf.server.ai.model.AiPromptContext;
|
import com.vincent.rsf.server.ai.service.AiGatewayClient;
|
import com.vincent.rsf.server.ai.service.AiModelRouteRuntimeService;
|
import com.vincent.rsf.server.ai.service.AiPromptRuntimeService;
|
import com.vincent.rsf.server.system.entity.AiDiagnosisPlan;
|
import com.vincent.rsf.server.system.entity.AiDiagnosisRecord;
|
import com.vincent.rsf.server.system.service.AiDiagnosisPlanService;
|
import org.springframework.stereotype.Service;
|
|
import javax.annotation.Resource;
|
import java.io.InterruptedIOException;
|
import java.util.Date;
|
import java.util.List;
|
|
@Service
|
public class AiDiagnosisPlanRunnerService {
|
|
@Resource
|
private AiDiagnosisPlanService aiDiagnosisPlanService;
|
@Resource
|
private AiDiagnosticToolService aiDiagnosticToolService;
|
@Resource
|
private AiModelRouteRuntimeService aiModelRouteRuntimeService;
|
@Resource
|
private AiPromptRuntimeService aiPromptRuntimeService;
|
@Resource
|
private AiGatewayClient aiGatewayClient;
|
@Resource
|
private AiDiagnosisRuntimeService aiDiagnosisRuntimeService;
|
|
/**
|
* 执行一次巡检计划。
|
* 计划执行本质上复用诊断主链路,只是输入来源从人工提问变成了计划配置。
|
*/
|
public void runPlan(Long planId, boolean manualTrigger) {
|
AiDiagnosisPlan plan = aiDiagnosisPlanService.getById(planId);
|
if (plan == null) {
|
return;
|
}
|
Date finishTime = new Date();
|
Date nextRunTime = Integer.valueOf(1).equals(plan.getStatus())
|
? aiDiagnosisPlanService.calculateNextRunTime(plan.getCronExpr(), finishTime)
|
: null;
|
Long userId = plan.getUpdateBy() == null ? plan.getCreateBy() : plan.getUpdateBy();
|
String sessionId = "plan-" + plan.getId() + "-" + System.currentTimeMillis();
|
String question = plan.getPrompt();
|
if (question == null || question.trim().isEmpty()) {
|
question = "请对当前WMS系统进行一次巡检诊断,结合库存、任务、设备站点数据识别异常并给出处理建议。";
|
}
|
|
AiPromptContext promptContext = new AiPromptContext()
|
.setTenantId(plan.getTenantId())
|
.setUserId(userId)
|
.setSessionId(sessionId)
|
.setModelCode(plan.getPreferredModelCode())
|
.setQuestion(question)
|
.setSceneCode(plan.getSceneCode() == null || plan.getSceneCode().trim().isEmpty()
|
? AiSceneCode.SYSTEM_DIAGNOSE
|
: plan.getSceneCode());
|
|
List<AiDiagnosticToolResult> diagnosticResults = aiDiagnosticToolService.collect(promptContext);
|
String toolSummary = aiDiagnosticToolService.serializeResults(diagnosticResults);
|
AiDiagnosisRecord diagnosisRecord = aiDiagnosisRuntimeService.startDiagnosis(
|
plan.getTenantId(),
|
userId,
|
sessionId,
|
promptContext.getSceneCode(),
|
question
|
);
|
|
StringBuilder assistantReply = new StringBuilder();
|
String finalModelCode = plan.getPreferredModelCode();
|
String finalErrorMessage = null;
|
boolean success = false;
|
|
try {
|
List<AiModelRouteRuntimeService.RouteCandidate> candidates = aiModelRouteRuntimeService.resolveCandidates(
|
plan.getTenantId(),
|
promptContext.getSceneCode(),
|
plan.getPreferredModelCode()
|
);
|
if (candidates.isEmpty()) {
|
finalErrorMessage = "未找到可用的AI模型配置";
|
} else {
|
int attemptNo = 1;
|
for (AiModelRouteRuntimeService.RouteCandidate candidate : candidates) {
|
AttemptState attemptState = new AttemptState();
|
Date requestTime = new Date();
|
try {
|
GatewayChatRequest gatewayChatRequest = buildGatewayRequest(
|
sessionId,
|
question,
|
candidate,
|
promptContext,
|
diagnosticResults,
|
attemptNo
|
);
|
aiGatewayClient.stream(gatewayChatRequest, event -> handleGatewayEvent(event, assistantReply, attemptState));
|
} catch (Exception e) {
|
attemptState.setSuccess(false);
|
attemptState.setErrorMessage(e.getMessage());
|
attemptState.setInterrupted(isInterruptedError(e));
|
attemptState.setResponseTime(new Date());
|
}
|
if (attemptState.getResponseTime() == null) {
|
attemptState.setResponseTime(new Date());
|
}
|
String actualModelCode = attemptState.getActualModelCode() == null
|
? candidate.getAttemptModelCode()
|
: attemptState.getActualModelCode();
|
finalModelCode = actualModelCode;
|
aiDiagnosisRuntimeService.saveCallLog(
|
plan.getTenantId(),
|
userId,
|
sessionId,
|
diagnosisRecord.getId(),
|
candidate.getRouteCode(),
|
actualModelCode,
|
attemptNo,
|
requestTime,
|
attemptState.getResponseTime(),
|
Boolean.TRUE.equals(attemptState.getSuccess()) ? 1 : 0,
|
attemptState.getErrorMessage()
|
);
|
if (Boolean.TRUE.equals(attemptState.getSuccess())) {
|
aiModelRouteRuntimeService.markSuccess(candidate.getRouteId());
|
success = assistantReply.length() > 0;
|
if (!success) {
|
finalErrorMessage = "模型未返回有效内容";
|
}
|
break;
|
}
|
if (!attemptState.isInterrupted()) {
|
aiModelRouteRuntimeService.markFailure(candidate.getRouteId());
|
}
|
finalErrorMessage = attemptState.getErrorMessage();
|
if (attemptState.isReceivedDelta() || attemptNo >= candidates.size()) {
|
break;
|
}
|
attemptNo++;
|
}
|
}
|
|
if (success) {
|
aiDiagnosisRuntimeService.finishDiagnosisSuccess(diagnosisRecord, assistantReply.toString(), finalModelCode, toolSummary);
|
aiDiagnosisPlanService.finishExecution(
|
plan.getId(),
|
1,
|
diagnosisRecord.getId(),
|
buildPlanMessage(assistantReply.toString(), manualTrigger ? "手动执行成功" : "计划执行成功"),
|
new Date(),
|
nextRunTime
|
);
|
return;
|
}
|
aiDiagnosisRuntimeService.finishDiagnosisFailure(diagnosisRecord, assistantReply.toString(), finalErrorMessage, toolSummary);
|
aiDiagnosisPlanService.finishExecution(
|
plan.getId(),
|
0,
|
diagnosisRecord.getId(),
|
buildPlanMessage(finalErrorMessage, manualTrigger ? "手动执行失败" : "计划执行失败"),
|
new Date(),
|
nextRunTime
|
);
|
} catch (Exception e) {
|
aiDiagnosisRuntimeService.finishDiagnosisFailure(diagnosisRecord, assistantReply.toString(), e.getMessage(), toolSummary);
|
aiDiagnosisPlanService.finishExecution(
|
plan.getId(),
|
0,
|
diagnosisRecord.getId(),
|
buildPlanMessage(e.getMessage(), manualTrigger ? "手动执行失败" : "计划执行失败"),
|
new Date(),
|
nextRunTime
|
);
|
}
|
}
|
|
/**
|
* 为巡检计划组装网关请求。
|
*/
|
private GatewayChatRequest buildGatewayRequest(String sessionId,
|
String question,
|
AiModelRouteRuntimeService.RouteCandidate candidate,
|
AiPromptContext promptContext,
|
List<AiDiagnosticToolResult> diagnosticResults,
|
Integer attemptNo) {
|
GatewayChatRequest request = new GatewayChatRequest();
|
request.setSessionId(sessionId);
|
request.setModelCode(candidate.getAttemptModelCode());
|
request.setRouteCode(candidate.getRouteCode());
|
request.setAttemptNo(attemptNo);
|
request.setSystemPrompt(aiPromptRuntimeService.buildSystemPrompt(
|
promptContext.getSceneCode(),
|
candidate.getRuntimeConfig().getSystemPrompt(),
|
promptContext,
|
diagnosticResults
|
));
|
request.setChatUrl(candidate.getRuntimeConfig().getChatUrl());
|
request.setApiKey(candidate.getRuntimeConfig().getApiKey());
|
request.setModelName(candidate.getRuntimeConfig().getModelName());
|
|
GatewayChatMessage message = new GatewayChatMessage();
|
message.setRole("user");
|
message.setContent(question);
|
request.getMessages().add(message);
|
return request;
|
}
|
|
/**
|
* 消费计划执行时的流式网关事件。
|
*/
|
private boolean handleGatewayEvent(JsonNode event, StringBuilder assistantReply, AttemptState attemptState) {
|
String type = event.path("type").asText();
|
String modelCode = event.path("modelCode").asText();
|
if ("delta".equals(type)) {
|
String content = event.path("content").asText("");
|
assistantReply.append(content);
|
attemptState.setReceivedDelta(true);
|
attemptState.setActualModelCode(modelCode);
|
return true;
|
}
|
if ("error".equals(type)) {
|
attemptState.setSuccess(false);
|
attemptState.setActualModelCode(modelCode);
|
attemptState.setErrorMessage(event.path("message").asText("模型调用失败"));
|
attemptState.setResponseTime(parseResponseTime(event));
|
attemptState.setInterrupted(isInterruptedMessage(attemptState.getErrorMessage()));
|
return false;
|
}
|
if ("done".equals(type)) {
|
attemptState.setSuccess(true);
|
attemptState.setActualModelCode(modelCode);
|
attemptState.setResponseTime(parseResponseTime(event));
|
return false;
|
}
|
return true;
|
}
|
|
/**
|
* 从网关事件中提取响应时间。
|
*/
|
private Date parseResponseTime(JsonNode event) {
|
long millis = event.path("responseTime").asLong(0L);
|
return millis <= 0L ? new Date() : new Date(millis);
|
}
|
|
/**
|
* 将计划执行结果压缩成适合回写到计划记录的短消息。
|
*/
|
private String buildPlanMessage(String text, String fallback) {
|
String source = text == null ? "" : text.trim();
|
if (source.isEmpty()) {
|
return fallback;
|
}
|
return source.length() > 120 ? source.substring(0, 120) : source;
|
}
|
|
/**
|
* 判断异常链中是否包含中断类错误。
|
*/
|
private boolean isInterruptedError(Throwable throwable) {
|
Throwable current = throwable;
|
while (current != null) {
|
if (current instanceof InterruptedException || current instanceof InterruptedIOException) {
|
return true;
|
}
|
if (isInterruptedMessage(current.getMessage())) {
|
return true;
|
}
|
current = current.getCause();
|
}
|
return false;
|
}
|
|
/**
|
* 根据异常消息文本判断是否属于连接中断类错误。
|
*/
|
private boolean isInterruptedMessage(String message) {
|
if (message == null || message.trim().isEmpty()) {
|
return false;
|
}
|
String normalized = message.toLowerCase();
|
return normalized.contains("interrupted")
|
|| normalized.contains("broken pipe")
|
|| normalized.contains("connection reset")
|
|| normalized.contains("forcibly closed");
|
}
|
|
private static class AttemptState {
|
private Boolean success;
|
private String actualModelCode;
|
private String errorMessage;
|
private boolean receivedDelta;
|
private boolean interrupted;
|
private Date responseTime;
|
|
public Boolean getSuccess() {
|
return success;
|
}
|
|
public void setSuccess(Boolean success) {
|
this.success = success;
|
}
|
|
public String getActualModelCode() {
|
return actualModelCode;
|
}
|
|
public void setActualModelCode(String actualModelCode) {
|
this.actualModelCode = actualModelCode;
|
}
|
|
public String getErrorMessage() {
|
return errorMessage;
|
}
|
|
public void setErrorMessage(String errorMessage) {
|
this.errorMessage = errorMessage;
|
}
|
|
public boolean isReceivedDelta() {
|
return receivedDelta;
|
}
|
|
public void setReceivedDelta(boolean receivedDelta) {
|
this.receivedDelta = receivedDelta;
|
}
|
|
public boolean isInterrupted() {
|
return interrupted;
|
}
|
|
public void setInterrupted(boolean interrupted) {
|
this.interrupted = interrupted;
|
}
|
|
public Date getResponseTime() {
|
return responseTime;
|
}
|
|
public void setResponseTime(Date responseTime) {
|
this.responseTime = responseTime;
|
}
|
}
|
}
|